lance_encoding/encodings/logical/primitive.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::{
    any::Any,
    collections::{HashMap, VecDeque},
    fmt::Debug,
    iter,
    ops::Range,
    sync::Arc,
    vec,
};

use arrow::array::AsArray;
use arrow_array::{make_array, types::UInt64Type, Array, ArrayRef, PrimitiveArray};
use arrow_buffer::{bit_util, BooleanBuffer, NullBuffer, ScalarBuffer};
use arrow_schema::{DataType, Field as ArrowField};
use futures::{future::BoxFuture, stream::FuturesOrdered, FutureExt, TryStreamExt};
use itertools::Itertools;
use lance_arrow::deepcopy::deep_copy_array;
use lance_core::{
    cache::{Context, DeepSizeOf},
    datatypes::{
        STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK,
    },
    error::Error,
    utils::bit::pad_bytes,
    utils::hash::U8SliceKey,
};
use log::{debug, trace};
use snafu::location;

use crate::repdef::{
    build_control_word_iterator, CompositeRepDefUnraveler, ControlWordIterator, ControlWordParser,
    DefinitionInterpretation, RepDefSlicer,
};
use crate::statistics::{ComputeStat, GetStat, Stat};
use crate::utils::bytepack::ByteUnpacker;
use crate::{
    data::{AllNullDataBlock, DataBlock, VariableWidthBlock},
    utils::bytepack::BytepackedIntegerEncoder,
};
use crate::{
    decoder::{FixedPerValueDecompressor, VariablePerValueDecompressor},
    encoder::PerValueDataBlock,
};
use lance_core::{datatypes::Field, utils::tokio::spawn_cpu, Result};

use crate::{
    buffer::LanceBuffer,
    data::{BlockInfo, DataBlockBuilder, FixedWidthDataBlock},
    decoder::{
        BlockDecompressor, ColumnInfo, DecodeArrayTask, DecodePageTask, DecodedArray, DecodedPage,
        DecompressorStrategy, FieldScheduler, FilterExpression, LoadedPage, LogicalPageDecoder,
        MessageType, MiniBlockDecompressor, NextDecodeTask, PageEncoding, PageInfo, PageScheduler,
        PrimitivePageDecoder, PriorityRange, ScheduledScanLine, SchedulerContext, SchedulingJob,
        StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
        StructuralPageDecoder, StructuralSchedulingJob, UnloadedPage,
    },
    encoder::{
        ArrayEncodingStrategy, CompressionStrategy, EncodeTask, EncodedColumn, EncodedPage,
        EncodingOptions, FieldEncoder, MiniBlockChunk, MiniBlockCompressed, OutOfLineBuffers,
    },
    encodings::physical::{decoder_from_array_encoding, ColumnBuffers, PageBuffers},
    format::{pb, ProtobufUtils},
    repdef::{LevelBuffer, RepDefBuilder, RepDefUnraveler},
    EncodingsIo,
};

const FILL_BYTE: u8 = 0xFE;

#[derive(Debug)]
struct PrimitivePage {
    scheduler: Box<dyn PageScheduler>,
    num_rows: u64,
    page_index: u32,
}

/// A field scheduler for primitive fields
///
/// This maps to exactly one column and it assumes that the top-level
/// encoding of each page is "basic".  The basic encoding decodes into an
/// optional buffer of validity and a fixed-width buffer of values
/// which is exactly what we need to create a primitive array.
///
/// Note: we consider booleans and fixed-size-lists of primitive types to be
/// primitive types.  This is slightly different than arrow-rs's definition
#[derive(Debug)]
pub struct PrimitiveFieldScheduler {
    data_type: DataType,
    page_schedulers: Vec<PrimitivePage>,
    num_rows: u64,
    should_validate: bool,
    column_index: u32,
}

impl PrimitiveFieldScheduler {
    pub fn new(
        column_index: u32,
        data_type: DataType,
        pages: Arc<[PageInfo]>,
        buffers: ColumnBuffers,
        should_validate: bool,
    ) -> Self {
        let page_schedulers = pages
            .iter()
            .enumerate()
            // Buggy versions of Lance could sometimes create empty pages
            .filter(|(page_index, page)| {
                if page.num_rows == 0 {
                    log::trace!("Skipping empty page with index {}", page_index);
                }
                page.num_rows > 0
            })
            .map(|(page_index, page)| {
                let page_buffers = PageBuffers {
                    column_buffers: buffers,
                    positions_and_sizes: &page.buffer_offsets_and_sizes,
                };
                let scheduler = decoder_from_array_encoding(
                    page.encoding.as_legacy(),
                    &page_buffers,
                    &data_type,
                );
                PrimitivePage {
                    scheduler,
                    num_rows: page.num_rows,
                    page_index: page_index as u32,
                }
            })
            .collect::<Vec<_>>();
        let num_rows = page_schedulers.iter().map(|p| p.num_rows).sum();
        Self {
            data_type,
            page_schedulers,
            num_rows,
            should_validate,
            column_index,
        }
    }
}

#[derive(Debug)]
struct PrimitiveFieldSchedulingJob<'a> {
    scheduler: &'a PrimitiveFieldScheduler,
    ranges: Vec<Range<u64>>,
    page_idx: usize,
    range_idx: usize,
    range_offset: u64,
    global_row_offset: u64,
}

impl<'a> PrimitiveFieldSchedulingJob<'a> {
    pub fn new(scheduler: &'a PrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
        Self {
            scheduler,
            ranges,
            page_idx: 0,
            range_idx: 0,
            range_offset: 0,
            global_row_offset: 0,
        }
    }
}

impl SchedulingJob for PrimitiveFieldSchedulingJob<'_> {
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        debug_assert!(self.range_idx < self.ranges.len());
        // Get our current range
        let mut range = self.ranges[self.range_idx].clone();
        range.start += self.range_offset;

        let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
        trace!(
            "Current range is {:?} and current page has {} rows",
            range,
            cur_page.num_rows
        );
        // Skip entire pages until we have some overlap with our next range
        while cur_page.num_rows + self.global_row_offset <= range.start {
            self.global_row_offset += cur_page.num_rows;
            self.page_idx += 1;
            trace!("Skipping entire page of {} rows", cur_page.num_rows);
            cur_page = &self.scheduler.page_schedulers[self.page_idx];
        }

        // Now the cur_page has overlap with range.  Continue looping through ranges
        // until we find a range that exceeds the current page

        let mut ranges_in_page = Vec::new();
        while cur_page.num_rows + self.global_row_offset > range.start {
            range.start = range.start.max(self.global_row_offset);
            let start_in_page = range.start - self.global_row_offset;
            let end_in_page = start_in_page + (range.end - range.start);
            let end_in_page = end_in_page.min(cur_page.num_rows);
            let last_in_range = (end_in_page + self.global_row_offset) >= range.end;

            ranges_in_page.push(start_in_page..end_in_page);
            if last_in_range {
                self.range_idx += 1;
                if self.range_idx == self.ranges.len() {
                    break;
                }
                range = self.ranges[self.range_idx].clone();
            } else {
                break;
            }
        }

        let num_rows_in_next = ranges_in_page.iter().map(|r| r.end - r.start).sum();
        trace!(
            "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
            num_rows_in_next,
            ranges_in_page.len(),
            cur_page.num_rows,
            priority.current_priority(),
            self.scheduler.column_index,
            cur_page.page_index,
        );

        self.global_row_offset += cur_page.num_rows;
        self.page_idx += 1;

        let physical_decoder = cur_page.scheduler.schedule_ranges(
            &ranges_in_page,
            context.io(),
            priority.current_priority(),
        );

        let logical_decoder = PrimitiveFieldDecoder {
            data_type: self.scheduler.data_type.clone(),
            column_index: self.scheduler.column_index,
            unloaded_physical_decoder: Some(physical_decoder),
            physical_decoder: None,
            rows_drained: 0,
            num_rows: num_rows_in_next,
            should_validate: self.scheduler.should_validate,
            page_index: cur_page.page_index,
        };

        let decoder = Box::new(logical_decoder);
        let decoder_ready = context.locate_decoder(decoder);
        Ok(ScheduledScanLine {
            decoders: vec![MessageType::DecoderReady(decoder_ready)],
            rows_scheduled: num_rows_in_next,
        })
    }

    fn num_rows(&self) -> u64 {
        self.ranges.iter().map(|r| r.end - r.start).sum()
    }
}

impl FieldScheduler for PrimitiveFieldScheduler {
    fn num_rows(&self) -> u64 {
        self.num_rows
    }

    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[std::ops::Range<u64>],
        // TODO: Could potentially use filter to simplify decode, something of a micro-optimization probably
        _filter: &FilterExpression,
    ) -> Result<Box<dyn SchedulingJob + 'a>> {
        Ok(Box::new(PrimitiveFieldSchedulingJob::new(
            self,
            ranges.to_vec(),
        )))
    }

    fn initialize<'a>(
        &'a self,
        _filter: &'a FilterExpression,
        _context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        // 2.0 schedulers do not need to initialize
        std::future::ready(Ok(())).boxed()
    }
}

/// A trait for figuring out how to schedule the data within
/// a single page.
trait StructuralPageScheduler: std::fmt::Debug + Send {
    /// Fetches any metadata required for the page
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>>;
    /// Loads metadata from a previous initialize call
    fn load(&mut self, data: &Arc<dyn CachedPageData>);
    /// Schedules the read of the given ranges in the page
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>>;
}

/// Metadata describing the size, position, and value count of a mini-block chunk
#[derive(Debug)]
struct ChunkMeta {
    num_values: u64,
    chunk_size_bytes: u64,
    offset_bytes: u64,
}

/// A mini-block chunk that has been decoded and decompressed
#[derive(Debug)]
struct DecodedMiniBlockChunk {
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    values: DataBlock,
}

/// A task to decode one or more mini-blocks of data into an output batch
///
/// Note: Two batches might share the same mini-block of data.  When this happens
/// then each batch gets a copy of the block and each batch decodes the block independently.
///
/// This means we have duplicated work but it is necessary to avoid having to synchronize
/// the decoding of the block. (TODO: test this theory)
#[derive(Debug)]
struct DecodeMiniBlockTask {
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    dictionary_data: Option<Arc<DataBlock>>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    num_buffers: u64,
    max_visible_level: u16,
    instructions: Vec<(ChunkDrainInstructions, LoadedChunk)>,
}

impl DecodeMiniBlockTask {
    fn decode_levels(
        rep_decompressor: &dyn BlockDecompressor,
        levels: LanceBuffer,
        num_levels: u16,
    ) -> Result<ScalarBuffer<u16>> {
        let rep = rep_decompressor.decompress(levels, num_levels as u64)?;
        let mut rep = rep.as_fixed_width().unwrap();
        debug_assert_eq!(rep.num_values, num_levels as u64);
        debug_assert_eq!(rep.bits_per_value, 16);
        Ok(rep.data.borrow_to_typed_slice::<u16>())
    }

    // We are building a LevelBuffer (`levels`) and want to copy into it the values
    // in `range` from `level_buf`, landing at `dest_offset`.
    //
    // We need to handle both the case where `levels` is None (no nulls encountered
    // yet) and the case where `level_buf` is None (the input we are copying from has
    // no nulls)
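    //
    // For example, if `dest_offset` is 2, `range` is 1..3, `levels` is None, and
    // `level_buf` is Some([0, 1, 0]) then we first materialize `levels` as [0, 0]
    // (the two rows already emitted were all valid) and then append [1, 0].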
    fn extend_levels(
        range: Range<u64>,
        levels: &mut Option<LevelBuffer>,
        level_buf: &Option<impl AsRef<[u16]>>,
        dest_offset: usize,
    ) {
        if let Some(level_buf) = level_buf {
            if levels.is_none() {
                // This is the first non-empty def buf we've hit, fill in the past
                // with 0 (valid)
                let mut new_levels_vec =
                    LevelBuffer::with_capacity(dest_offset + (range.end - range.start) as usize);
                new_levels_vec.extend(iter::repeat_n(0, dest_offset));
                *levels = Some(new_levels_vec);
            }
            levels.as_mut().unwrap().extend(
                level_buf.as_ref()[range.start as usize..range.end as usize]
                    .iter()
                    .copied(),
            );
        } else if let Some(levels) = levels {
            let num_values = (range.end - range.start) as usize;
            // This is an all-valid level_buf but we had nulls earlier and so we
            // need to materialize it
            levels.extend(iter::repeat_n(0, num_values));
        }
    }

    /// Maps a range of rows to a range of items and a range of levels
    ///
    /// If there is no repetition information this just returns the range as-is.
    ///
    /// If there is repetition information then we need to do some work to figure out what
    /// range of items corresponds to the requested range of rows.
    ///
    /// For example, if the data is [[1, 2, 3], [4, 5], [6, 7]] and the range is 1..2 (i.e. just row
    /// 1) then the user actually wants items 3..5.  In the above case the rep levels would be:
    ///
    /// Idx: 0 1 2 3 4 5 6
    /// Rep: 1 0 0 1 0 1 0
    ///
    /// So the start (1) maps to the second 1 (idx=3) and the end (2) maps to the third 1 (idx=5)
    ///
    /// If there are invisible items then we don't count them when calculating the range of items we
    /// are interested in but we do count them when calculating the range of levels we are interested
    /// in.  As a result we have to return both the item range (first return value) and the level range
    /// (second return value).
    ///
    /// For example, if the data is [[1, 2, 3], [4, 5], NULL, [6, 7, 8]] and the range is 2..4 then the
    /// user wants items 5..8 but they want levels 5..9.  In the above case the rep/def levels would be:
    ///
    /// Idx: 0 1 2 3 4 5 6 7 8
    /// Rep: 1 0 0 1 0 1 1 0 0
    /// Def: 0 0 0 0 0 1 0 0 0
    /// Itm: 1 2 3 4 5 6 7 8
    ///
    /// Finally, we have to contend with the fact that chunks may or may not start with a "preamble" of
    /// trailing values that finish up a list from the previous chunk.  In this case the first item does
    /// not start at max_rep because it is a continuation of the previous chunk.  For our purposes we do
    /// not consider this a "row" and so the range 0..1 will refer to the first row AFTER the preamble.
    ///
    /// We have a separate parameter (`preamble_action`) to control whether we want the preamble or not.
    ///
    /// Note that the "trailer" is considered a "row" and if we want it we should include it in the range.
    fn map_range(
        range: Range<u64>,
        rep: Option<&impl AsRef<[u16]>>,
        def: Option<&impl AsRef<[u16]>>,
        max_rep: u16,
        max_visible_def: u16,
        // The total number of items (not rows) in the chunk.  This is not quite the same as
        // rep.len() / def.len() because it doesn't count invisible items
        total_items: u64,
        preamble_action: PreambleAction,
    ) -> (Range<u64>, Range<u64>) {
        if let Some(rep) = rep {
            let mut rep = rep.as_ref();
            // If there is a preamble and we need to skip it then do that first.  The work is the same
            // whether there is def information or not
            let mut items_in_preamble = 0;
            let first_row_start = match preamble_action {
                PreambleAction::Skip | PreambleAction::Take => {
                    let first_row_start = if let Some(def) = def.as_ref() {
                        let mut first_row_start = None;
                        for (idx, (rep, def)) in rep.iter().zip(def.as_ref()).enumerate() {
                            if *rep == max_rep {
                                first_row_start = Some(idx);
                                break;
                            }
                            if *def <= max_visible_def {
                                items_in_preamble += 1;
                            }
                        }
                        first_row_start
                    } else {
                        let first_row_start = rep.iter().position(|&r| r == max_rep);
                        items_in_preamble = first_row_start.unwrap_or(rep.len());
                        first_row_start
                    };
                    // It is possible for a chunk to be entirely partial values but if it is then it
                    // should never show up as a preamble to skip
                    if first_row_start.is_none() {
                        assert!(preamble_action == PreambleAction::Take);
                        return (0..total_items, 0..rep.len() as u64);
                    }
                    let first_row_start = first_row_start.unwrap() as u64;
                    rep = &rep[first_row_start as usize..];
                    first_row_start
                }
                PreambleAction::Absent => {
                    debug_assert!(rep[0] == max_rep);
                    0
                }
            };

            // We hit this case when all we needed was the preamble
            if range.start == range.end {
                debug_assert!(preamble_action == PreambleAction::Take);
                return (0..items_in_preamble as u64, 0..first_row_start);
            }
            assert!(range.start < range.end);

            let mut rows_seen = 0;
            let mut new_start = 0;
            let mut new_levels_start = 0;

            if let Some(def) = def {
                let def = &def.as_ref()[first_row_start as usize..];

                // range.start == 0 always maps to 0 (even with invis items), otherwise we need to walk
                let mut lead_invis_seen = 0;

                if range.start > 0 {
                    if def[0] > max_visible_def {
                        lead_invis_seen += 1;
                    }
                    for (idx, (rep, def)) in rep.iter().zip(def).skip(1).enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.start {
                                new_start = idx as u64 + 1 - lead_invis_seen;
                                new_levels_start = idx as u64 + 1;
                                break;
                            }
                            if *def > max_visible_def {
                                lead_invis_seen += 1;
                            }
                        }
                    }
                }

                rows_seen += 1;

                let mut new_end = u64::MAX;
                let mut new_levels_end = rep.len() as u64;
                let new_start_is_visible = def[new_levels_start as usize] <= max_visible_def;
                let mut tail_invis_seen = if new_start_is_visible { 0 } else { 1 };
                for (idx, (rep, def)) in rep[(new_levels_start + 1) as usize..]
                    .iter()
                    .zip(&def[(new_levels_start + 1) as usize..])
                    .enumerate()
                {
                    if *rep == max_rep {
                        rows_seen += 1;
                        if rows_seen == range.end + 1 {
                            new_end = idx as u64 + new_start + 1 - tail_invis_seen;
                            new_levels_end = idx as u64 + new_levels_start + 1;
                            break;
                        }
                        if *def > max_visible_def {
                            tail_invis_seen += 1;
                        }
                    }
                }

                if new_end == u64::MAX {
                    new_levels_end = rep.len() as u64;
                    // This is the total number of visible items (minus any items in the preamble)
                    let total_invis_seen = lead_invis_seen + tail_invis_seen;
                    new_end = rep.len() as u64 - total_invis_seen;
                }

                assert_ne!(new_end, u64::MAX);

                // Adjust for any skipped preamble
                if preamble_action == PreambleAction::Skip {
                    // TODO: Should this be items_in_preamble?  If so, add a
                    // unit test for this case
                    new_start += first_row_start;
                    new_end += first_row_start;
                    new_levels_start += first_row_start;
                    new_levels_end += first_row_start;
                } else if preamble_action == PreambleAction::Take {
                    debug_assert_eq!(new_start, 0);
                    debug_assert_eq!(new_levels_start, 0);
                    new_end += first_row_start;
                    new_levels_end += first_row_start;
                }

                (new_start..new_end, new_levels_start..new_levels_end)
            } else {
                // Easy case, there are no invisible items, so we don't need to check for them
                // The items range and levels range will be the same.  We do still need to walk
                // the rep levels to find the row boundaries

                // range.start == 0 always maps to 0, otherwise we need to walk
                if range.start > 0 {
                    for (idx, rep) in rep.iter().skip(1).enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.start {
                                new_start = idx as u64 + 1;
                                break;
                            }
                        }
                    }
                }
                let mut new_end = rep.len() as u64;
                // range.end == max_items always maps to rep.len(), otherwise we need to walk
                if range.end < total_items {
                    for (idx, rep) in rep[(new_start + 1) as usize..].iter().enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.end {
                                new_end = idx as u64 + new_start + 1;
                                break;
                            }
                        }
                    }
                }

                // Adjust for any skipped preamble
                if preamble_action == PreambleAction::Skip {
                    new_start += first_row_start;
                    new_end += first_row_start;
                } else if preamble_action == PreambleAction::Take {
                    debug_assert_eq!(new_start, 0);
                    new_end += first_row_start;
                }

                (new_start..new_end, new_start..new_end)
            }
        } else {
            // No repetition info, easy case, just use the range as-is and the item
            // and level ranges are the same
            (range.clone(), range)
        }
    }

    // Deserialize a mini-block chunk into its rep/def levels and values
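    //
    // The serialized chunk layout, as parsed below, is (all sizes little-endian u16):
    //
    //   num_levels | [rep_size] | [def_size] | one size per value buffer | padding |
    //   [rep bytes | padding] | [def bytes | padding] | value buffers (each padded)
    //
    // The rep/def sections are only present when the corresponding decompressor
    // exists, and each padding step aligns the next section to MINIBLOCK_ALIGNMENT.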
    fn decode_miniblock_chunk(
        &self,
        buf: &LanceBuffer,
        items_in_chunk: u64,
    ) -> Result<DecodedMiniBlockChunk> {
        let mut offset = 0;
        let num_levels = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
        offset += 2;

        let rep_size = if self.rep_decompressor.is_some() {
            let rep_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
            offset += 2;
            Some(rep_size)
        } else {
            None
        };
        let def_size = if self.def_decompressor.is_some() {
            let def_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
            offset += 2;
            Some(def_size)
        } else {
            None
        };
        let buffer_sizes = (0..self.num_buffers)
            .map(|_| {
                let size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
                offset += 2;
                size
            })
            .collect::<Vec<_>>();

        offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);

        let rep = rep_size.map(|rep_size| {
            let rep = buf.slice_with_length(offset, rep_size as usize);
            offset += rep_size as usize;
            offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
            rep
        });

        let def = def_size.map(|def_size| {
            let def = buf.slice_with_length(offset, def_size as usize);
            offset += def_size as usize;
            offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
            def
        });

        let buffers = buffer_sizes
            .into_iter()
            .map(|buf_size| {
                let buf = buf.slice_with_length(offset, buf_size as usize);
                offset += buf_size as usize;
                offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
                buf
            })
            .collect::<Vec<_>>();

        let values = self
            .value_decompressor
            .decompress(buffers, items_in_chunk)?;

        let rep = rep
            .map(|rep| {
                Self::decode_levels(
                    self.rep_decompressor.as_ref().unwrap().as_ref(),
                    rep,
                    num_levels,
                )
            })
            .transpose()?;
        let def = def
            .map(|def| {
                Self::decode_levels(
                    self.def_decompressor.as_ref().unwrap().as_ref(),
                    def,
                    num_levels,
                )
            })
            .transpose()?;

        Ok(DecodedMiniBlockChunk { rep, def, values })
    }
}

impl DecodePageTask for DecodeMiniBlockTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        // First, we create output buffers for the rep and def and data
        let mut repbuf: Option<LevelBuffer> = None;
        let mut defbuf: Option<LevelBuffer> = None;

        let max_rep = self.def_meaning.iter().filter(|l| l.is_list()).count() as u16;

        // This is probably an over-estimate but it's quick and easy to calculate
        let estimated_size_bytes = self
            .instructions
            .iter()
            .map(|(_, chunk)| chunk.data.len())
            .sum::<usize>()
            * 2;
        let mut data_builder =
            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);

        // We need to keep track of the offset into repbuf/defbuf that we are building up
        let mut level_offset = 0;
        // Now we iterate through each instruction and process it
        for (instructions, chunk) in self.instructions.iter() {
            // TODO: It's very possible that we have duplicate `buf` in self.instructions and we
            // don't want to decode the buf again and again on the same thread.

            let DecodedMiniBlockChunk { rep, def, values } =
                self.decode_miniblock_chunk(&chunk.data, chunk.items_in_chunk)?;

            // Our instructions tell us which rows we want to take from this chunk
            let row_range_start =
                instructions.rows_to_skip + instructions.chunk_instructions.rows_to_skip;
            let row_range_end = row_range_start + instructions.rows_to_take;

            // We use the rep info to map the row range to an item range / levels range
            let (item_range, level_range) = Self::map_range(
                row_range_start..row_range_end,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                self.max_visible_level,
                chunk.items_in_chunk,
                instructions.preamble_action,
            );

            // Now we append the data to the output buffers
            Self::extend_levels(level_range.clone(), &mut repbuf, &rep, level_offset);
            Self::extend_levels(level_range.clone(), &mut defbuf, &def, level_offset);
            level_offset += (level_range.end - level_range.start) as usize;
            data_builder.append(&values, item_range);
        }

        let data = data_builder.finish();

        let unraveler = RepDefUnraveler::new(repbuf, defbuf, self.def_meaning.clone());

        // if dictionary encoding is applied, do dictionary decode here.
        if let Some(dictionary) = &self.dictionary_data {
            // assume the indices are uniformly distributed.
            let estimated_size_bytes = dictionary.data_size()
                * (data.num_values() + dictionary.num_values() - 1)
                / dictionary.num_values();
            let mut data_builder = DataBlockBuilder::with_capacity_estimate(estimated_size_bytes);

            // if dictionary encoding is applied, indices are of type `UInt8`
            if let DataBlock::FixedWidth(mut fixed_width_data_block) = data {
                let indices = fixed_width_data_block.data.borrow_to_typed_slice::<u8>();
                let indices = indices.as_ref();

                indices.iter().for_each(|&idx| {
                    data_builder.append(dictionary, idx as u64..idx as u64 + 1);
                });

                let data = data_builder.finish();
                return Ok(DecodedPage {
                    data,
                    repdef: unraveler,
                });
            }
        }

        Ok(DecodedPage {
            data,
            repdef: unraveler,
        })
    }
}

/// A chunk that has been loaded by the miniblock scheduler (but not
/// yet decoded)
#[derive(Debug)]
struct LoadedChunk {
    data: LanceBuffer,
    items_in_chunk: u64,
    byte_range: Range<u64>,
    chunk_idx: usize,
}

impl Clone for LoadedChunk {
    fn clone(&self) -> Self {
        Self {
            // Safe as we always create borrowed buffers here
            data: self.data.try_clone().unwrap(),
            items_in_chunk: self.items_in_chunk,
            byte_range: self.byte_range.clone(),
            chunk_idx: self.chunk_idx,
        }
    }
}

/// Decodes mini-block formatted data.  See [`PrimitiveStructuralEncoder`] for more
/// details on the different layouts.
#[derive(Debug)]
struct MiniBlockDecoder {
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    loaded_chunks: VecDeque<LoadedChunk>,
    instructions: VecDeque<ChunkInstructions>,
    offset_in_current_chunk: u64,
    num_rows: u64,
    num_buffers: u64,
    dictionary: Option<Arc<DataBlock>>,
}

/// See [`MiniBlockScheduler`] for more details on the scheduling and decoding
/// process for miniblock encoded data.
impl StructuralPageDecoder for MiniBlockDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let mut items_desired = num_rows;
        let mut need_preamble = false;
        let mut skip_in_chunk = self.offset_in_current_chunk;
        let mut drain_instructions = Vec::new();
        while items_desired > 0 || need_preamble {
            let (instructions, consumed) = self
                .instructions
                .front()
                .unwrap()
                .drain_from_instruction(&mut items_desired, &mut need_preamble, &mut skip_in_chunk);

            while self.loaded_chunks.front().unwrap().chunk_idx
                != instructions.chunk_instructions.chunk_idx
            {
                self.loaded_chunks.pop_front();
            }
            drain_instructions.push((instructions, self.loaded_chunks.front().unwrap().clone()));
            if consumed {
                self.instructions.pop_front();
            }
        }
        // We can throw away need_preamble here because it must be false.  If it were true it would mean
        // we were still in the middle of loading rows.  We do need to latch skip_in_chunk though.
        self.offset_in_current_chunk = skip_in_chunk;

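        // The maximum visible def level is the sum of the def levels of the layers
        // before the first list layer.  Def levels above this value mark items that
        // are not visible (e.g. values nested inside a null list).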
        let max_visible_level = self
            .def_meaning
            .iter()
            .take_while(|l| !l.is_list())
            .map(|l| l.num_def_levels())
            .sum::<u16>();

        Ok(Box::new(DecodeMiniBlockTask {
            instructions: drain_instructions,
            def_decompressor: self.def_decompressor.clone(),
            rep_decompressor: self.rep_decompressor.clone(),
            value_decompressor: self.value_decompressor.clone(),
            dictionary_data: self.dictionary.clone(),
            def_meaning: self.def_meaning.clone(),
            num_buffers: self.num_buffers,
            max_visible_level,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

#[derive(Debug)]
struct CachedComplexAllNullState {
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
}

impl DeepSizeOf for CachedComplexAllNullState {
    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
        self.rep.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
            + self.def.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
    }
}

impl CachedPageData for CachedComplexAllNullState {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}

/// A scheduler for all-null data that has repetition and definition levels
///
/// We still need to do some I/O in this case because we need to figure out what kind of null we
/// are dealing with (null list, null struct, what level null struct, etc.)
///
/// TODO: Right now we just load the entire rep/def at initialization time and cache it.  This is a touch
/// RAM aggressive and maybe we want something more lazy in the future.  On the other hand, it's simple
/// and fast so...maybe not :)
#[derive(Debug)]
pub struct ComplexAllNullScheduler {
    // Set from protobuf
    buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    repdef: Option<Arc<CachedComplexAllNullState>>,
}

impl ComplexAllNullScheduler {
    pub fn new(
        buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
        def_meaning: Arc<[DefinitionInterpretation]>,
    ) -> Self {
        Self {
            buffer_offsets_and_sizes,
            def_meaning,
            repdef: None,
        }
    }
}

impl StructuralPageScheduler for ComplexAllNullScheduler {
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        // Fully load the rep & def buffers, as needed
        let (rep_pos, rep_size) = self.buffer_offsets_and_sizes[0];
        let (def_pos, def_size) = self.buffer_offsets_and_sizes[1];
        let has_rep = rep_size > 0;
        let has_def = def_size > 0;

        let mut reads = Vec::with_capacity(2);
        if has_rep {
            reads.push(rep_pos..rep_pos + rep_size);
        }
        if has_def {
            reads.push(def_pos..def_pos + def_size);
        }

        let data = io.submit_request(reads, 0);

        async move {
            let data = data.await?;
            let mut data_iter = data.into_iter();

            let rep = if has_rep {
                let rep = data_iter.next().unwrap();
                let mut rep = LanceBuffer::from_bytes(rep, 2);
                let rep = rep.borrow_to_typed_slice::<u16>();
                Some(rep)
            } else {
                None
            };

            let def = if has_def {
                let def = data_iter.next().unwrap();
                let mut def = LanceBuffer::from_bytes(def, 2);
                let def = def.borrow_to_typed_slice::<u16>();
                Some(def)
            } else {
                None
            };

            let repdef = Arc::new(CachedComplexAllNullState { rep, def });

            self.repdef = Some(repdef.clone());

            Ok(repdef as Arc<dyn CachedPageData>)
        }
        .boxed()
    }

    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
        self.repdef = Some(
            data.clone()
                .as_arc_any()
                .downcast::<CachedComplexAllNullState>()
                .unwrap(),
        );
    }

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        _io: &Arc<dyn EncodingsIo>,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
        let ranges = VecDeque::from_iter(ranges.iter().cloned());
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        Ok(std::future::ready(Ok(Box::new(ComplexAllNullPageDecoder {
            ranges,
            rep: self.repdef.as_ref().unwrap().rep.clone(),
            def: self.repdef.as_ref().unwrap().def.clone(),
            num_rows,
            def_meaning: self.def_meaning.clone(),
        }) as Box<dyn StructuralPageDecoder>))
        .boxed())
    }
}

#[derive(Debug)]
pub struct ComplexAllNullPageDecoder {
    ranges: VecDeque<Range<u64>>,
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    num_rows: u64,
    def_meaning: Arc<[DefinitionInterpretation]>,
}

impl ComplexAllNullPageDecoder {
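    /// Pops the first `num_rows` rows off the front of `self.ranges`, splitting the
    /// last range taken if only part of it is needed.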
    fn drain_ranges(&mut self, num_rows: u64) -> Vec<Range<u64>> {
        let mut rows_desired = num_rows;
        let mut ranges = Vec::with_capacity(self.ranges.len());
        while rows_desired > 0 {
            let front = self.ranges.front_mut().unwrap();
            let avail = front.end - front.start;
            if avail > rows_desired {
                ranges.push(front.start..front.start + rows_desired);
                front.start += rows_desired;
                rows_desired = 0;
            } else {
                ranges.push(self.ranges.pop_front().unwrap());
                rows_desired -= avail;
            }
        }
        ranges
    }
}

impl StructuralPageDecoder for ComplexAllNullPageDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let drained_ranges = self.drain_ranges(num_rows);
        Ok(Box::new(DecodeComplexAllNullTask {
            ranges: drained_ranges,
            rep: self.rep.clone(),
            def: self.def.clone(),
            def_meaning: self.def_meaning.clone(),
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

/// We use `ranges` to slice into `rep` and `def` and create rep/def buffers
/// for the null data.
#[derive(Debug)]
pub struct DecodeComplexAllNullTask {
    ranges: Vec<Range<u64>>,
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    def_meaning: Arc<[DefinitionInterpretation]>,
}

impl DecodeComplexAllNullTask {
    fn decode_level(
        &self,
        levels: &Option<ScalarBuffer<u16>>,
        num_values: u64,
    ) -> Option<Vec<u16>> {
        levels.as_ref().map(|levels| {
            let mut referenced_levels = Vec::with_capacity(num_values as usize);
            for range in &self.ranges {
                referenced_levels.extend(
                    levels[range.start as usize..range.end as usize]
                        .iter()
                        .copied(),
                );
            }
            referenced_levels
        })
    }
}

impl DecodePageTask for DecodeComplexAllNullTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let num_values = self.ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        let data = DataBlock::AllNull(AllNullDataBlock { num_values });
        let rep = self.decode_level(&self.rep, num_values);
        let def = self.decode_level(&self.def, num_values);
        let unraveler = RepDefUnraveler::new(rep, def, self.def_meaning);
        Ok(DecodedPage {
            data,
            repdef: unraveler,
        })
    }
}

/// A scheduler for simple all-null data
///
/// "simple" all-null data is data that is all null and only has a single level of definition and
/// no repetition.  We don't need to read any data at all in this case.
#[derive(Debug, Default)]
pub struct SimpleAllNullScheduler {}

impl StructuralPageScheduler for SimpleAllNullScheduler {
    fn initialize<'a>(
        &'a mut self,
        _io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
    }

    fn load(&mut self, _cache: &Arc<dyn CachedPageData>) {}

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        _io: &Arc<dyn EncodingsIo>,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        Ok(std::future::ready(Ok(
            Box::new(SimpleAllNullPageDecoder { num_rows }) as Box<dyn StructuralPageDecoder>
        ))
        .boxed())
    }
}

/// A page decode task for all-null data without any
/// repetition and only a single level of definition
#[derive(Debug)]
struct SimpleAllNullDecodePageTask {
    num_values: u64,
}
impl DecodePageTask for SimpleAllNullDecodePageTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let unraveler = RepDefUnraveler::new(
            None,
            Some(vec![1; self.num_values as usize]),
            Arc::new([DefinitionInterpretation::NullableItem]),
        );
        Ok(DecodedPage {
            data: DataBlock::AllNull(AllNullDataBlock {
                num_values: self.num_values,
            }),
            repdef: unraveler,
        })
    }
}

#[derive(Debug)]
pub struct SimpleAllNullPageDecoder {
    num_rows: u64,
}

impl StructuralPageDecoder for SimpleAllNullPageDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        Ok(Box::new(SimpleAllNullDecodePageTask {
            num_values: num_rows,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

#[derive(Debug, Clone)]
struct MiniBlockSchedulerDictionary {
    // These come from the protobuf
    dictionary_decompressor: Arc<dyn BlockDecompressor>,
    dictionary_buf_position_and_size: (u64, u64),
    dictionary_data_alignment: u64,
    num_dictionary_items: u64,
}

#[derive(Debug)]
struct RepIndexBlock {
    // The index of the first row that starts after the beginning of this block.  If the block
    // has a preamble this will be the row after the preamble.  If the block is entirely preamble
    // then this will be a row that starts in some future block.
    first_row: u64,
    // The number of rows in the block, including the trailer but not the preamble.
    // Can be 0 if the block is entirely preamble
    starts_including_trailer: u64,
    // Whether the block has a preamble
    has_preamble: bool,
    // Whether the block has a trailer
    has_trailer: bool,
}

impl DeepSizeOf for RepIndexBlock {
    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
        0
    }
}

#[derive(Debug)]
struct RepetitionIndex {
    blocks: Vec<RepIndexBlock>,
}

impl DeepSizeOf for RepetitionIndex {
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.blocks.deep_size_of_children(context)
    }
}

impl RepetitionIndex {
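    /// Builds the index from the raw per-chunk repetition index, where each entry begins
    /// with `[ends_count, partial_count, ...]`: the number of rows that end in the chunk
    /// and the number of items in a partial (trailer) row, if any.  A chunk's trailer
    /// becomes the next chunk's preamble.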
    fn decode(rep_index: &[Vec<u64>]) -> Self {
        let mut chunk_has_preamble = false;
        let mut offset = 0;
        let mut blocks = Vec::with_capacity(rep_index.len());
        for chunk_rep in rep_index {
            let ends_count = chunk_rep[0];
            let partial_count = chunk_rep[1];

            let chunk_has_trailer = partial_count > 0;
            let mut starts_including_trailer = ends_count;
            if chunk_has_trailer {
                starts_including_trailer += 1;
            }
            if chunk_has_preamble {
                starts_including_trailer -= 1;
            }

            blocks.push(RepIndexBlock {
                first_row: offset,
                starts_including_trailer,
                has_preamble: chunk_has_preamble,
                has_trailer: chunk_has_trailer,
            });

            chunk_has_preamble = chunk_has_trailer;
            offset += starts_including_trailer;
        }

        Self { blocks }
    }
}

/// State that is loaded once and cached for future lookups
#[derive(Debug)]
struct MiniBlockCacheableState {
    /// Metadata that describes each chunk in the page
    chunk_meta: Vec<ChunkMeta>,
    /// The decoded repetition index
    rep_index: RepetitionIndex,
    /// The dictionary for the page, if any
    dictionary: Option<Arc<DataBlock>>,
}

impl DeepSizeOf for MiniBlockCacheableState {
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.rep_index.deep_size_of_children(context)
            + self
                .dictionary
                .as_ref()
                .map(|dict| dict.data_size() as usize)
                .unwrap_or(0)
    }
}

impl CachedPageData for MiniBlockCacheableState {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}

/// A scheduler for a page that has been encoded with the mini-block layout
///
/// Scheduling mini-block encoded data is simple in concept and somewhat complex
/// in practice.
///
/// First, during initialization, we load the chunk metadata, the repetition index,
/// and the dictionary (these last two may not be present)
///
/// Then, during scheduling, we use the user's requested row ranges and the repetition
/// index to determine which chunks we need and which rows we need from those chunks.
///
/// For example, if the repetition index is: [50, 3], [50, 0], [10, 0] and the range
/// from the user is 40..60 then we need to:
///
///  - Read the first chunk and skip the first 40 rows, then read 10 full rows, and
///    then read 3 items for the 11th row of our range.
///  - Read the second chunk and read the remaining items in our 11th row and then read
///    the remaining 9 full rows.
///
/// Then, if we are going to decode that in batches of 5, we need to make decode tasks.
/// The first two decode tasks will just need the first chunk.  The third decode task will
/// need the first chunk (for the trailer which has the 11th row in our range) and the second
/// chunk.  The final decode task will just need the second chunk.
///
/// The above prose descriptions are what are represented by [`ChunkInstructions`] and
/// [`ChunkDrainInstructions`].
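///
/// As a rough illustrative sketch (not produced by running the code), the example above
/// would schedule chunk instructions along these lines:
///
/// ```text
/// ChunkInstructions { chunk_idx: 0, preamble: Absent, rows_to_skip: 40, rows_to_take: 10, take_trailer: true }
/// ChunkInstructions { chunk_idx: 1, preamble: Take, rows_to_skip: 0, rows_to_take: 9, take_trailer: false }
/// ```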
#[derive(Debug)]
pub struct MiniBlockScheduler {
    // These come from the protobuf
    buffer_offsets_and_sizes: Vec<(u64, u64)>,
    priority: u64,
    items_in_page: u64,
    repetition_index_depth: u16,
    num_buffers: u64,
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    dictionary: Option<MiniBlockSchedulerDictionary>,
    // This is set after initialization
    page_meta: Option<Arc<MiniBlockCacheableState>>,
}

impl MiniBlockScheduler {
    fn try_new(
        buffer_offsets_and_sizes: &[(u64, u64)],
        priority: u64,
        items_in_page: u64,
        layout: &pb::MiniBlockLayout,
        decompressors: &dyn DecompressorStrategy,
    ) -> Result<Self> {
        let rep_decompressor = layout
            .rep_compression
            .as_ref()
            .map(|rep_compression| {
                decompressors
                    .create_block_decompressor(rep_compression)
                    .map(Arc::from)
            })
            .transpose()?;
        let def_decompressor = layout
            .def_compression
            .as_ref()
            .map(|def_compression| {
                decompressors
                    .create_block_decompressor(def_compression)
                    .map(Arc::from)
            })
            .transpose()?;
        let def_meaning = layout
            .layers
            .iter()
            .map(|l| ProtobufUtils::repdef_layer_to_def_interp(*l))
            .collect::<Vec<_>>();
        let value_decompressor = decompressors
            .create_miniblock_decompressor(layout.value_compression.as_ref().unwrap())?;
        let dictionary = if let Some(dictionary_encoding) = layout.dictionary.as_ref() {
            let num_dictionary_items = layout.num_dictionary_items;
            match dictionary_encoding.array_encoding.as_ref().unwrap() {
                pb::array_encoding::ArrayEncoding::Variable(_) => {
                    Some(MiniBlockSchedulerDictionary {
                        dictionary_decompressor: decompressors
                            .create_block_decompressor(dictionary_encoding)?
                            .into(),
                        dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
                        dictionary_data_alignment: 4,
                        num_dictionary_items,
                    })
                }
                pb::array_encoding::ArrayEncoding::Flat(_) => Some(MiniBlockSchedulerDictionary {
                    dictionary_decompressor: decompressors
                        .create_block_decompressor(dictionary_encoding)?
                        .into(),
                    dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
                    dictionary_data_alignment: 16,
                    num_dictionary_items,
                }),
                _ => {
                    unreachable!("Currently only encodings `BinaryBlock` and `Flat` used for encoding MiniBlock dictionary.")
                }
            }
        } else {
            None
        };

        Ok(Self {
            buffer_offsets_and_sizes: buffer_offsets_and_sizes.to_vec(),
            rep_decompressor,
            def_decompressor,
            value_decompressor: value_decompressor.into(),
            repetition_index_depth: layout.repetition_index_depth as u16,
            num_buffers: layout.num_buffers,
            priority,
            items_in_page,
            dictionary,
            def_meaning: def_meaning.into(),
            page_meta: None,
        })
    }

    fn lookup_chunks(&self, chunk_indices: &[usize]) -> Vec<LoadedChunk> {
        let page_meta = self.page_meta.as_ref().unwrap();
        chunk_indices
            .iter()
            .map(|&chunk_idx| {
                let chunk_meta = &page_meta.chunk_meta[chunk_idx];
                let bytes_start = chunk_meta.offset_bytes;
                let bytes_end = bytes_start + chunk_meta.chunk_size_bytes;
                LoadedChunk {
                    byte_range: bytes_start..bytes_end,
                    items_in_chunk: chunk_meta.num_values,
                    chunk_idx,
                    data: LanceBuffer::empty(),
                }
            })
            .collect()
    }
}

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum PreambleAction {
    Take,
    Skip,
    Absent,
}

// TODO: Add test cases for the all-preamble and all-trailer cases

// When we schedule a chunk we use the repetition index (or, if none exists, just the # of items
// in each chunk) to map a user requested range into a set of ChunkInstruction objects which tell
// us how exactly to read from the chunk.
#[derive(Clone, Debug, PartialEq, Eq)]
struct ChunkInstructions {
    // The index of the chunk to read
    chunk_idx: usize,
    // A "preamble" is when a chunk begins with a continuation of the previous chunk's list.  If there
    // is no repetition index there is never a preamble.
    //
1412    // It's possible for a chunk to be entirely premable.  For example, if there is a really large list
1413    // that spans several chunks.
1414    preamble: PreambleAction,
1415    // How many complete rows (not including the preamble or trailer) to skip
1416    //
1417    // If this is non-zero then preamble must not be Take
1418    rows_to_skip: u64,
1419    // How many complete (non-preamble / non-trailer) rows to take
1420    rows_to_take: u64,
1421    // A "trailer" is when a chunk ends with a partial list.  If there is no repetition index there is
1422    // never a trailer.
1423    //
1424    // It's possible for a chunk to be entirely trailer.  This would mean the chunk starts with the beginning
1425    // of a list and that list is continued in the next chunk.
1426    //
1427    // If this is true then we want to include the trailer
1428    take_trailer: bool,
1429}
1430
1431// First, we schedule a bunch of [`ChunkInstructions`] based on the user's ranges.  Then we
1432// start decoding them, based on a batch size, which might not align with what we scheduled.
1433//
1434// This results in `ChunkDrainInstructions` which targets a contiguous slice of a `ChunkInstructions`
1435//
1436// So if `ChunkInstructions` is "skip preamble, skip 10, take 50, take trailer" and we are decoding in
1437// batches of size 10 we might have a `ChunkDrainInstructions` that targets that chunk and has its own
1438// skip of 17 and take of 10.  This would mean we decode the chunk, skip the preamble and 27 rows, and
1439// then take 10 rows.
1440//
1441// One very confusing bit is that `rows_to_take` includes the trailer.  So if we have two chunks:
1442//  -no preamble, skip 5, take 10, take trailer
1443//  -take preamble, skip 0, take 50, no trailer
1444//
1445// and we are draining 20 rows then the drain instructions for the first batch will be:
1446//  - no preamble, skip 0 (from chunk 0), take 11 (from chunk 0)
1447//  - take preamble (from chunk 1), skip 0 (from chunk 1), take 9 (from chunk 1)
1448#[derive(Debug, PartialEq, Eq)]
1449struct ChunkDrainInstructions {
1450    chunk_instructions: ChunkInstructions,
1451    rows_to_skip: u64,
1452    rows_to_take: u64,
1453    preamble_action: PreambleAction,
1454}
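
// A minimal, illustrative sketch (not part of the decode path): it replays the worked
// example from the comment above through `drain_from_instruction` so the trailer /
// preamble bookkeeping is easier to follow.  The concrete field values are hypothetical.
#[cfg(test)]
mod chunk_drain_example {
    use super::*;

    #[test]
    fn drain_worked_example() {
        // Chunk 0: no preamble, skip 5, take 10, take trailer
        let chunk0 = ChunkInstructions {
            chunk_idx: 0,
            preamble: PreambleAction::Absent,
            rows_to_skip: 5,
            rows_to_take: 10,
            take_trailer: true,
        };
        // Chunk 1: take preamble, skip 0, take 50, no trailer
        let chunk1 = ChunkInstructions {
            chunk_idx: 1,
            preamble: PreambleAction::Take,
            rows_to_skip: 0,
            rows_to_take: 50,
            take_trailer: false,
        };

        // Drain a batch of 20 rows across the two chunks
        let mut rows_desired = 20;
        let mut need_preamble = false;
        let mut skip_in_chunk = 0;

        let (drain0, consumed0) = chunk0.drain_from_instruction(
            &mut rows_desired,
            &mut need_preamble,
            &mut skip_in_chunk,
        );
        // 10 full rows plus the trailer = 11 rows, and chunk 0 is fully consumed
        assert!(consumed0);
        assert_eq!(drain0.rows_to_take, 11);
        assert_eq!(drain0.preamble_action, PreambleAction::Absent);

        let (drain1, consumed1) = chunk1.drain_from_instruction(
            &mut rows_desired,
            &mut need_preamble,
            &mut skip_in_chunk,
        );
        // Because we took the trailer we need chunk 1's preamble, plus 9 more rows
        assert!(!consumed1);
        assert_eq!(drain1.rows_to_take, 9);
        assert_eq!(drain1.preamble_action, PreambleAction::Take);
    }
}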
1455
1456impl ChunkInstructions {
1457    // Given a repetition index and a set of user ranges we need to figure out how to read from the chunks
1458    //
1459    // We assume that `user_ranges` are in sorted order and non-overlapping
1460    //
1461    // The output will be a set of `ChunkInstructions` which tell us how to read from the chunks
1462    fn schedule_instructions(rep_index: &RepetitionIndex, user_ranges: &[Range<u64>]) -> Vec<Self> {
1463        // This is an inexact capacity guess, but a pretty good one.  The actual capacity can be
1464        // smaller if instructions are merged.  It can be larger if there are multiple instructions
1465        // per row which can happen with lists.
1466        let mut chunk_instructions = Vec::with_capacity(user_ranges.len());
1467
1468        for user_range in user_ranges {
1469            let mut rows_needed = user_range.end - user_range.start;
1470            let mut need_preamble = false;
1471
1472            // Need to find the first chunk with a first row >= user_range.start.  If there are
1473            // multiple chunks with the same first row we need to take the first one.
1474            let mut block_index = match rep_index
1475                .blocks
1476                .binary_search_by_key(&user_range.start, |block| block.first_row)
1477            {
1478                Ok(idx) => {
1479                    // Slightly tricky case, we may need to walk backwards a bit to make sure we
1480                    // are grabbing the first eligible chunk
1481                    let mut idx = idx;
1482                    while idx > 0 && rep_index.blocks[idx - 1].first_row == user_range.start {
1483                        idx -= 1;
1484                    }
1485                    idx
1486                }
1487                // Easy case.  idx is greater, and idx - 1 is smaller, so idx - 1 contains the start
1488                Err(idx) => idx - 1,
1489            };
1490
1491            let mut to_skip = user_range.start - rep_index.blocks[block_index].first_row;
1492
1493            while rows_needed > 0 || need_preamble {
1494                let chunk = &rep_index.blocks[block_index];
1495                let rows_avail = chunk.starts_including_trailer - to_skip;
1496                debug_assert!(rows_avail > 0);
1497
1498                let rows_to_take = rows_avail.min(rows_needed);
1499                rows_needed -= rows_to_take;
1500
1501                let mut take_trailer = false;
1502                let preamble = if chunk.has_preamble {
1503                    if need_preamble {
1504                        PreambleAction::Take
1505                    } else {
1506                        PreambleAction::Skip
1507                    }
1508                } else {
1509                    PreambleAction::Absent
1510                };
1511                let mut rows_to_take_no_trailer = rows_to_take;
1512
1513                // Are we taking the trailer?  If so, make sure we mark that we need the preamble
1514                if rows_to_take == rows_avail && chunk.has_trailer {
1515                    take_trailer = true;
1516                    need_preamble = true;
1517                    rows_to_take_no_trailer -= 1;
1518                } else {
1519                    need_preamble = false;
1520                };
1521
1522                chunk_instructions.push(Self {
1523                    preamble,
1524                    chunk_idx: block_index,
1525                    rows_to_skip: to_skip,
1526                    rows_to_take: rows_to_take_no_trailer,
1527                    take_trailer,
1528                });
1529
1530                to_skip = 0;
1531                block_index += 1;
1532            }
1533        }
1534
1535        // If there were multiple ranges we may have multiple instructions for a single chunk.  Merge them now if they
1536        // are _adjacent_ (i.e. don't merge "take first row of chunk 0" and "take third row of chunk 0" into "take 2
1537        // rows of chunk 0 starting at 0")
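        // Illustrative example (hypothetical values): if one range ended with "chunk 2: skip 0,
        // take 3" and the next range produced "chunk 2: skip 3, take 4" then the two are
        // adjacent and merge into "chunk 2: skip 0, take 7".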
1538        if user_ranges.len() > 1 {
1539            // TODO: Could probably optimize this allocation away
1540            let mut merged_instructions = Vec::with_capacity(chunk_instructions.len());
1541            let mut instructions_iter = chunk_instructions.into_iter();
1542            merged_instructions.push(instructions_iter.next().unwrap());
1543            for instruction in instructions_iter {
1544                let last = merged_instructions.last_mut().unwrap();
1545                if last.chunk_idx == instruction.chunk_idx
1546                    && last.rows_to_take + last.rows_to_skip == instruction.rows_to_skip
1547                {
1548                    last.rows_to_take += instruction.rows_to_take;
1549                    last.take_trailer |= instruction.take_trailer;
1550                } else {
1551                    merged_instructions.push(instruction);
1552                }
1553            }
1554            merged_instructions
1555        } else {
1556            chunk_instructions
1557        }
1558    }
1559
1560    fn drain_from_instruction(
1561        &self,
1562        rows_desired: &mut u64,
1563        need_preamble: &mut bool,
1564        skip_in_chunk: &mut u64,
1565    ) -> (ChunkDrainInstructions, bool) {
1566        // If we need the preamble then we shouldn't be skipping anything
1567        debug_assert!(!*need_preamble || *skip_in_chunk == 0);
1568        let mut rows_avail = self.rows_to_take - *skip_in_chunk;
1569        let has_preamble = self.preamble != PreambleAction::Absent;
1570        let preamble_action = match (*need_preamble, has_preamble) {
1571            (true, true) => PreambleAction::Take,
1572            (true, false) => panic!("Need preamble but there isn't one"),
1573            (false, true) => PreambleAction::Skip,
1574            (false, false) => PreambleAction::Absent,
1575        };
1576
1577        // Did the scheduled chunk have a trailer?  If so, we have one extra row available
1578        if self.take_trailer {
1579            rows_avail += 1;
1580        }
1581
1582        // How many rows are we actually taking in this take step (including the preamble
1583        // and trailer both as individual rows)
1584        let rows_taking = if *rows_desired >= rows_avail {
1585            // We want all the rows.  If there is a trailer we are grabbing it and will need
1586            // the preamble of the next chunk
1587            *need_preamble = self.take_trailer;
1588            rows_avail
1589        } else {
1590            // We aren't taking all the rows.  Even if there is a trailer we aren't taking
1591            // it so we will not need the preamble
1592            *need_preamble = false;
1593            *rows_desired
1594        };
1595        let rows_skipped = *skip_in_chunk;
1596
1597        // Update the state for the next iteration
1598        let consumed_chunk = if *rows_desired >= rows_avail {
1599            *rows_desired -= rows_avail;
1600            *skip_in_chunk = 0;
1601            true
1602        } else {
1603            *skip_in_chunk += *rows_desired;
1604            *rows_desired = 0;
1605            false
1606        };
1607
1608        (
1609            ChunkDrainInstructions {
1610                chunk_instructions: self.clone(),
1611                rows_to_skip: rows_skipped,
1612                rows_to_take: rows_taking,
1613                preamble_action,
1614            },
1615            consumed_chunk,
1616        )
1617    }
1618}
1619
1620impl StructuralPageScheduler for MiniBlockScheduler {
1621    fn initialize<'a>(
1622        &'a mut self,
1623        io: &Arc<dyn EncodingsIo>,
1624    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
1625        // We always need to fetch chunk metadata.  We may also need to fetch a dictionary and
1626        // we may also need to fetch the repetition index.  Here, we gather what buffers we
1627        // need.
1628        let (meta_buf_position, meta_buf_size) = self.buffer_offsets_and_sizes[0];
1629        let value_buf_position = self.buffer_offsets_and_sizes[1].0;
1630        let mut bufs_needed = 1;
1631        if self.dictionary.is_some() {
1632            bufs_needed += 1;
1633        }
1634        if self.repetition_index_depth > 0 {
1635            bufs_needed += 1;
1636        }
1637        let mut required_ranges = Vec::with_capacity(bufs_needed);
1638        required_ranges.push(meta_buf_position..meta_buf_position + meta_buf_size);
1639        if let Some(ref dictionary) = self.dictionary {
1640            required_ranges.push(
1641                dictionary.dictionary_buf_position_and_size.0
1642                    ..dictionary.dictionary_buf_position_and_size.0
1643                        + dictionary.dictionary_buf_position_and_size.1,
1644            );
1645        }
1646        if self.repetition_index_depth > 0 {
1647            let (rep_index_pos, rep_index_size) = self.buffer_offsets_and_sizes.last().unwrap();
1648            required_ranges.push(*rep_index_pos..*rep_index_pos + *rep_index_size);
1649        }
1650        let io_req = io.submit_request(required_ranges, 0);
1651
1652        async move {
1653            let mut buffers = io_req.await?.into_iter().fuse();
1654            let meta_bytes = buffers.next().unwrap();
1655            let dictionary_bytes = self.dictionary.as_ref().and_then(|_| buffers.next());
1656            let rep_index_bytes = buffers.next();
1657
1658            // Parse the metadata and build the chunk meta
1659            assert!(meta_bytes.len() % 2 == 0);
1660            let mut bytes = LanceBuffer::from_bytes(meta_bytes, 2);
1661            let words = bytes.borrow_to_typed_slice::<u16>();
1662            let words = words.as_ref();
1663
1664            let mut chunk_meta = Vec::with_capacity(words.len());
1665
1666            let mut rows_counter = 0;
1667            let mut offset_bytes = value_buf_position;
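            // Descriptive note on the metadata word format (mirrors the parsing below):
            // the low 4 bits are log2 of the number of values in the chunk (except for the
            // last chunk) and the remaining bits are (chunk_size / MINIBLOCK_ALIGNMENT) - 1.
            // For example, a word of 0x001A has log_num_values = 0xA (1024 values) and
            // divided_bytes = 1, i.e. a chunk of 2 * MINIBLOCK_ALIGNMENT bytes.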
1668            for (word_idx, word) in words.iter().enumerate() {
1669                let log_num_values = word & 0x0F;
1670                let divided_bytes = word >> 4;
1671                let num_bytes = (divided_bytes as usize + 1) * MINIBLOCK_ALIGNMENT;
1672                debug_assert!(num_bytes > 0);
1673                let num_values = if word_idx < words.len() - 1 {
1674                    debug_assert!(log_num_values > 0);
1675                    1 << log_num_values
1676                } else {
1677                    debug_assert!(
1678                        log_num_values == 0
1679                            || (1 << log_num_values) == (self.items_in_page - rows_counter)
1680                    );
1681                    self.items_in_page - rows_counter
1682                };
1683                rows_counter += num_values;
1684
1685                chunk_meta.push(ChunkMeta {
1686                    num_values,
1687                    chunk_size_bytes: num_bytes as u64,
1688                    offset_bytes,
1689                });
1690                offset_bytes += num_bytes as u64;
1691            }
1692
1693            // Build the repetition index
1694            let rep_index = if let Some(rep_index_data) = rep_index_bytes {
1695                // If we have a repetition index then we use that
1696                // TODO: Compress the repetition index :)
1697                assert!(rep_index_data.len() % 8 == 0);
1698                let mut repetition_index_vals = LanceBuffer::from_bytes(rep_index_data, 8);
1699                let repetition_index_vals = repetition_index_vals.borrow_to_typed_slice::<u64>();
1700                // Unflatten
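                // (Each chunk stores `repetition_index_depth + 1` u64 values back-to-back,
                // so we regroup the flat buffer into one row per chunk)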
1701                repetition_index_vals
1702                    .as_ref()
1703                    .chunks_exact(self.repetition_index_depth as usize + 1)
1704                    .map(|c| c.to_vec())
1705                    .collect::<Vec<_>>()
1706            } else {
1707                // Default rep index is just the number of items in each chunk
1708                // with 0 partials/leftovers
1709                chunk_meta
1710                    .iter()
1711                    .map(|c| vec![c.num_values, 0])
1712                    .collect::<Vec<_>>()
1713            };
1714
1715            let mut page_meta = MiniBlockCacheableState {
1716                chunk_meta,
1717                rep_index: RepetitionIndex::decode(&rep_index),
1718                dictionary: None,
1719            };
1720
1721            // decode dictionary
1722            if let Some(ref mut dictionary) = self.dictionary {
1723                let dictionary_data = dictionary_bytes.unwrap();
1724                page_meta.dictionary =
1725                    Some(Arc::new(dictionary.dictionary_decompressor.decompress(
1726                        LanceBuffer::from_bytes(
1727                            dictionary_data,
1728                            dictionary.dictionary_data_alignment,
1729                        ),
1730                        dictionary.num_dictionary_items,
1731                    )?));
1732            };
1733            let page_meta = Arc::new(page_meta);
1734            self.page_meta = Some(page_meta.clone());
1735            Ok(page_meta as Arc<dyn CachedPageData>)
1736        }
1737        .boxed()
1738    }
1739
1740    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
1741        self.page_meta = Some(
1742            data.clone()
1743                .as_arc_any()
1744                .downcast::<MiniBlockCacheableState>()
1745                .unwrap(),
1746        );
1747    }
1748
1749    fn schedule_ranges(
1750        &self,
1751        ranges: &[Range<u64>],
1752        io: &Arc<dyn EncodingsIo>,
1753    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
1754        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
1755
1756        let page_meta = self.page_meta.as_ref().unwrap();
1757
1758        let chunk_instructions =
1759            ChunkInstructions::schedule_instructions(&page_meta.rep_index, ranges);
1760
1761        debug_assert_eq!(
1762            num_rows,
1763            chunk_instructions
1764                .iter()
1765                .map(|ci| {
1766                    let taken = ci.rows_to_take;
1767                    if ci.take_trailer {
1768                        taken + 1
1769                    } else {
1770                        taken
1771                    }
1772                })
1773                .sum::<u64>()
1774        );
1775
1776        let chunks_needed = chunk_instructions
1777            .iter()
1778            .map(|ci| ci.chunk_idx)
1779            .unique()
1780            .collect::<Vec<_>>();
1781        let mut loaded_chunks = self.lookup_chunks(&chunks_needed);
1782        let chunk_ranges = loaded_chunks
1783            .iter()
1784            .map(|c| c.byte_range.clone())
1785            .collect::<Vec<_>>();
1786        let loaded_chunk_data = io.submit_request(chunk_ranges, self.priority);
1787
1788        let rep_decompressor = self.rep_decompressor.clone();
1789        let def_decompressor = self.def_decompressor.clone();
1790        let value_decompressor = self.value_decompressor.clone();
1791        let num_buffers = self.num_buffers;
1792        let dictionary = page_meta.dictionary.clone();
1796        let def_meaning = self.def_meaning.clone();
1797
1798        let res = async move {
1799            let loaded_chunk_data = loaded_chunk_data.await?;
1800            for (loaded_chunk, chunk_data) in loaded_chunks.iter_mut().zip(loaded_chunk_data) {
1801                loaded_chunk.data = LanceBuffer::from_bytes(chunk_data, 1);
1802            }
1803
1804            Ok(Box::new(MiniBlockDecoder {
1805                rep_decompressor,
1806                def_decompressor,
1807                value_decompressor,
1808                def_meaning,
1809                loaded_chunks: VecDeque::from_iter(loaded_chunks),
1810                instructions: VecDeque::from(chunk_instructions),
1811                offset_in_current_chunk: 0,
1812                dictionary,
1813                num_rows,
1814                num_buffers,
1815            }) as Box<dyn StructuralPageDecoder>)
1816        }
1817        .boxed();
1818        Ok(res)
1819    }
1820}
1821
1822#[derive(Debug)]
1823struct FullZipRepIndexDetails {
1824    buf_position: u64,
1825    bytes_per_value: u64, // Will be 1, 2, 4, or 8
1826}
1827
1828#[derive(Debug)]
1829enum PerValueDecompressor {
1830    Fixed(Arc<dyn FixedPerValueDecompressor>),
1831    Variable(Arc<dyn VariablePerValueDecompressor>),
1832}
1833
1834#[derive(Debug)]
1835struct FullZipDecodeDetails {
1836    value_decompressor: PerValueDecompressor,
1837    def_meaning: Arc<[DefinitionInterpretation]>,
1838    ctrl_word_parser: ControlWordParser,
1839    max_rep: u16,
1840    max_visible_def: u16,
1841}
1842
1843/// A scheduler for full-zip encoded data
1844///
1845/// When the data type has a fixed-width then we simply need to map from
1846/// row ranges to byte ranges using the fixed-width of the data type.
1847///
1848/// When the data type is variable-width or has any repetition then a
1849/// repetition index is required.
1850#[derive(Debug)]
1851pub struct FullZipScheduler {
1852    data_buf_position: u64,
1853    rep_index: Option<FullZipRepIndexDetails>,
1854    priority: u64,
1855    rows_in_page: u64,
1856    bits_per_offset: u8,
1857    details: Arc<FullZipDecodeDetails>,
1858}
1859
1860impl FullZipScheduler {
1861    fn try_new(
1862        buffer_offsets_and_sizes: &[(u64, u64)],
1863        priority: u64,
1864        rows_in_page: u64,
1865        layout: &pb::FullZipLayout,
1866        decompressors: &dyn DecompressorStrategy,
1867        bits_per_offset: u8,
1868    ) -> Result<Self> {
1869        // We don't need the data_buf_size because either the data type is
1870        // fixed-width (and we can tell size from rows_in_page) or it is not
1871        // and we have a repetition index.
1872        let (data_buf_position, _) = buffer_offsets_and_sizes[0];
1873        let rep_index = buffer_offsets_and_sizes.get(1).map(|(pos, len)| {
1874            let num_reps = rows_in_page + 1;
1875            let bytes_per_rep = len / num_reps;
1876            debug_assert_eq!(len % num_reps, 0);
1877            debug_assert!(
1878                bytes_per_rep == 1
1879                    || bytes_per_rep == 2
1880                    || bytes_per_rep == 4
1881                    || bytes_per_rep == 8
1882            );
1883            FullZipRepIndexDetails {
1884                buf_position: *pos,
1885                bytes_per_value: bytes_per_rep,
1886            }
1887        });
1888
1889        let value_decompressor = match layout.details {
1890            Some(pb::full_zip_layout::Details::BitsPerValue(_)) => {
1891                let decompressor = decompressors.create_fixed_per_value_decompressor(
1892                    layout.value_compression.as_ref().unwrap(),
1893                )?;
1894                PerValueDecompressor::Fixed(decompressor.into())
1895            }
1896            Some(pb::full_zip_layout::Details::BitsPerOffset(_)) => {
1897                let decompressor = decompressors.create_variable_per_value_decompressor(
1898                    layout.value_compression.as_ref().unwrap(),
1899                )?;
1900                PerValueDecompressor::Variable(decompressor.into())
1901            }
1902            None => {
1903                panic!("Full-zip layout must have a `details` field");
1904            }
1905        };
1906        let ctrl_word_parser = ControlWordParser::new(
1907            layout.bits_rep.try_into().unwrap(),
1908            layout.bits_def.try_into().unwrap(),
1909        );
1910        let def_meaning = layout
1911            .layers
1912            .iter()
1913            .map(|l| ProtobufUtils::repdef_layer_to_def_interp(*l))
1914            .collect::<Vec<_>>();
1915
1916        let max_rep = def_meaning.iter().filter(|d| d.is_list()).count() as u16;
1917        let max_visible_def = def_meaning
1918            .iter()
1919            .filter(|d| !d.is_list())
1920            .map(|d| d.num_def_levels())
1921            .sum();
1922
1923        let details = Arc::new(FullZipDecodeDetails {
1924            value_decompressor,
1925            def_meaning: def_meaning.into(),
1926            ctrl_word_parser,
1927            max_rep,
1928            max_visible_def,
1929        });
1930        Ok(Self {
1931            data_buf_position,
1932            rep_index,
1933            details,
1934            priority,
1935            rows_in_page,
1936            bits_per_offset,
1937        })
1938    }
1939
1940    /// Schedules indirectly by first fetching the data ranges from the
1941    /// repetition index and then fetching the data
1942    ///
1943    /// This approach is needed whenever we have a repetition index and
1944    /// the data has a variable length.
1945    #[allow(clippy::too_many_arguments)]
1946    async fn indirect_schedule_ranges(
1947        data_buffer_pos: u64,
1948        row_ranges: Vec<Range<u64>>,
1949        rep_index_ranges: Vec<Range<u64>>,
1950        bytes_per_rep: u64,
1951        io: Arc<dyn EncodingsIo>,
1952        priority: u64,
1953        bits_per_offset: u8,
1954        details: Arc<FullZipDecodeDetails>,
1955    ) -> Result<Box<dyn StructuralPageDecoder>> {
1956        let byte_ranges = io
1957            .submit_request(rep_index_ranges, priority)
1958            .await?
1959            .into_iter()
1960            .map(|d| LanceBuffer::from_bytes(d, 1))
1961            .collect::<Vec<_>>();
1962        let byte_ranges = LanceBuffer::concat(&byte_ranges);
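        // The repetition index values arrive in (start, end) pairs, one pair per requested
        // row range.  Each value is a byte offset relative to the start of the data buffer,
        // so adding `data_buffer_pos` turns a pair into an absolute byte range we can read.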
1963        let byte_ranges = ByteUnpacker::new(byte_ranges, bytes_per_rep as usize)
1964            .chunks(2)
1965            .into_iter()
1966            .map(|mut c| {
1967                let start = c.next().unwrap() + data_buffer_pos;
1968                let end = c.next().unwrap() + data_buffer_pos;
1969                start..end
1970            })
1971            .collect::<Vec<_>>();
1972
1973        let data = io.submit_request(byte_ranges, priority);
1974
1975        let data = data.await?;
1976        let data = data
1977            .into_iter()
1978            .map(|d| LanceBuffer::from_bytes(d, 1))
1979            .collect();
1980        let num_rows = row_ranges.into_iter().map(|r| r.end - r.start).sum();
1981
1982        match &details.value_decompressor {
1983            PerValueDecompressor::Fixed(decompressor) => {
1984                let bits_per_value = decompressor.bits_per_value();
1985                assert!(bits_per_value > 0);
1986                if bits_per_value % 8 != 0 {
1987                    // Unlikely we will ever want this since full-zip values are so large the few bits we shave off don't
1988                    // make much difference.
1989                    unimplemented!("Bit-packed full-zip");
1990                }
1991                let bytes_per_value = bits_per_value / 8;
1992                let total_bytes_per_value =
1993                    bytes_per_value as usize + details.ctrl_word_parser.bytes_per_word();
1994                Ok(Box::new(FixedFullZipDecoder {
1995                    details,
1996                    data,
1997                    num_rows,
1998                    offset_in_current: 0,
1999                    bytes_per_value: bytes_per_value as usize,
2000                    total_bytes_per_value,
2001                }) as Box<dyn StructuralPageDecoder>)
2002            }
2003            PerValueDecompressor::Variable(_decompressor) => {
2004                // Variable full-zip
2005
2006                Ok(Box::new(VariableFullZipDecoder::new(
2007                    details,
2008                    data,
2009                    num_rows,
2010                    bits_per_offset,
2011                    bits_per_offset,
2012                )))
2013            }
2014        }
2015    }
2016
2017    /// Schedules ranges in the presence of a repetition index
2018    fn schedule_ranges_rep(
2019        &self,
2020        ranges: &[Range<u64>],
2021        io: &Arc<dyn EncodingsIo>,
2022        rep_index: &FullZipRepIndexDetails,
2023    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
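        // For each requested row range we read two repetition index entries: the entry for
        // the first row and the entry one past the last row.  Together these bound the slice
        // of the data buffer occupied by the range (see `indirect_schedule_ranges` above).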
2024        let rep_index_ranges = ranges
2025            .iter()
2026            .flat_map(|r| {
2027                let first_val_start =
2028                    rep_index.buf_position + (r.start * rep_index.bytes_per_value);
2029                let first_val_end = first_val_start + rep_index.bytes_per_value;
2030                let last_val_start = rep_index.buf_position + (r.end * rep_index.bytes_per_value);
2031                let last_val_end = last_val_start + rep_index.bytes_per_value;
2032                [first_val_start..first_val_end, last_val_start..last_val_end]
2033            })
2034            .collect::<Vec<_>>();
2035
2036        // Create the decoder
2037
2038        Ok(Self::indirect_schedule_ranges(
2039            self.data_buf_position,
2040            ranges.to_vec(),
2041            rep_index_ranges,
2042            rep_index.bytes_per_value,
2043            io.clone(),
2044            self.priority,
2045            self.bits_per_offset,
2046            self.details.clone(),
2047        )
2048        .boxed())
2049    }
2050
2051    // In the simple case there is no repetition and we just have large fixed-width
2052    // rows of data.  We can just map row ranges to byte ranges directly using the
2053    // fixed-width of the data type.
2054    fn schedule_ranges_simple(
2055        &self,
2056        ranges: &[Range<u64>],
2057        io: &dyn EncodingsIo,
2058    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
2059        // Convert row ranges to item ranges (i.e. multiply by items per row)
2060        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
2061
2062        let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor else {
2063            unreachable!()
2064        };
2065
2066        // Convert item ranges to byte ranges (i.e. multiply by bytes per item)
2067        let bits_per_value = decompressor.bits_per_value();
2068        assert_eq!(bits_per_value % 8, 0);
2069        let bytes_per_value = bits_per_value / 8;
2070        let bytes_per_cw = self.details.ctrl_word_parser.bytes_per_word();
2071        let total_bytes_per_value = bytes_per_value + bytes_per_cw as u64;
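        // For example (hypothetical numbers): with 4-byte values and a 2-byte control word,
        // total_bytes_per_value is 6, so the row range 10..20 maps to the byte range
        // data_buf_position + 60 .. data_buf_position + 120.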
2072        let byte_ranges = ranges.iter().map(|r| {
2073            debug_assert!(r.end <= self.rows_in_page);
2074            let start = self.data_buf_position + r.start * total_bytes_per_value;
2075            let end = self.data_buf_position + r.end * total_bytes_per_value;
2076            start..end
2077        });
2078
2079        // Request byte ranges
2080        let data = io.submit_request(byte_ranges.collect(), self.priority);
2081
2082        let details = self.details.clone();
2083
2084        Ok(async move {
2085            let data = data.await?;
2086            let data = data
2087                .into_iter()
2088                .map(|d| LanceBuffer::from_bytes(d, 1))
2089                .collect();
2090            Ok(Box::new(FixedFullZipDecoder {
2091                details,
2092                data,
2093                num_rows,
2094                offset_in_current: 0,
2095                bytes_per_value: bytes_per_value as usize,
2096                total_bytes_per_value: total_bytes_per_value as usize,
2097            }) as Box<dyn StructuralPageDecoder>)
2098        }
2099        .boxed())
2100    }
2101}
2102
2103impl StructuralPageScheduler for FullZipScheduler {
2104    // TODO: Add opt-in caching of repetition index
2105    fn initialize<'a>(
2106        &'a mut self,
2107        _io: &Arc<dyn EncodingsIo>,
2108    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
2109        std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
2110    }
2111
2112    fn load(&mut self, _cache: &Arc<dyn CachedPageData>) {}
2113
2114    fn schedule_ranges(
2115        &self,
2116        ranges: &[Range<u64>],
2117        io: &Arc<dyn EncodingsIo>,
2118    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
2119        if let Some(rep_index) = self.rep_index.as_ref() {
2120            self.schedule_ranges_rep(ranges, io, rep_index)
2121        } else {
2122            self.schedule_ranges_simple(ranges, io.as_ref())
2123        }
2124    }
2125}
2126
2127/// A decoder for full-zip encoded data when the data has a fixed-width
2128///
2129/// Here we need to unzip the control words from the values themselves and
2130/// then decompress the requested values.
2131///
2132/// We use a PerValueDecompressor because we will only be decompressing the
2133/// requested data.  This decoder / scheduler does not do any read amplification.
2134#[derive(Debug)]
2135struct FixedFullZipDecoder {
2136    details: Arc<FullZipDecodeDetails>,
2137    data: VecDeque<LanceBuffer>,
2138    offset_in_current: usize,
2139    bytes_per_value: usize,
2140    total_bytes_per_value: usize,
2141    num_rows: u64,
2142}
2143
2144impl FixedFullZipDecoder {
2145    fn slice_next_task(&mut self, num_rows: u64) -> FullZipDecodeTaskItem {
2146        debug_assert!(num_rows > 0);
2147        let cur_buf = self.data.front_mut().unwrap();
2148        let start = self.offset_in_current;
2149        if self.details.ctrl_word_parser.has_rep() {
2150            // This is a slightly slower path.  In order to figure out where to split we need to
2151            // examine the repetition levels in the control words so we can convert a row count into an item count
2152            let mut rows_started = 0;
2153            // We always need at least one item.  Loop through items until we have started `num_rows`
2154            // rows and reached the start of the following row (or the end of the buffer)
2155            let mut num_items = 0;
2156            while self.offset_in_current < cur_buf.len() {
2157                let control = self.details.ctrl_word_parser.parse_desc(
2158                    &cur_buf[self.offset_in_current..],
2159                    self.details.max_rep,
2160                    self.details.max_visible_def,
2161                );
2162                if control.is_new_row {
2163                    if rows_started == num_rows {
2164                        break;
2165                    }
2166                    rows_started += 1;
2167                }
2168                num_items += 1;
2169                if control.is_visible {
2170                    self.offset_in_current += self.total_bytes_per_value;
2171                } else {
2172                    self.offset_in_current += self.details.ctrl_word_parser.bytes_per_word();
2173                }
2174            }
2175
2176            let task_slice = cur_buf.slice_with_length(start, self.offset_in_current - start);
2177            if self.offset_in_current == cur_buf.len() {
2178                self.data.pop_front();
2179                self.offset_in_current = 0;
2180            }
2181
2182            FullZipDecodeTaskItem {
2183                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
2184                    data: task_slice,
2185                    bits_per_value: self.bytes_per_value as u64 * 8,
2186                    num_values: num_items,
2187                    block_info: BlockInfo::new(),
2188                }),
2189                rows_in_buf: rows_started,
2190            }
2191        } else {
2192            // If there's no repetition we can calculate the slicing point by just multiplying
2193            // the number of rows by the total bytes per value
2194            let cur_buf = self.data.front_mut().unwrap();
2195            let bytes_avail = cur_buf.len() - self.offset_in_current;
2196            let offset_in_cur = self.offset_in_current;
2197
2198            let bytes_needed = num_rows as usize * self.total_bytes_per_value;
2199            let mut rows_taken = num_rows;
2200            let task_slice = if bytes_needed >= bytes_avail {
2201                self.offset_in_current = 0;
2202                rows_taken = bytes_avail as u64 / self.total_bytes_per_value as u64;
2203                self.data
2204                    .pop_front()
2205                    .unwrap()
2206                    .slice_with_length(offset_in_cur, bytes_avail)
2207            } else {
2208                self.offset_in_current += bytes_needed;
2209                cur_buf.slice_with_length(offset_in_cur, bytes_needed)
2210            };
2211            FullZipDecodeTaskItem {
2212                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
2213                    data: task_slice,
2214                    bits_per_value: self.bytes_per_value as u64 * 8,
2215                    num_values: rows_taken,
2216                    block_info: BlockInfo::new(),
2217                }),
2218                rows_in_buf: rows_taken,
2219            }
2220        }
2221    }
2222}
2223
2224impl StructuralPageDecoder for FixedFullZipDecoder {
2225    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2226        let mut task_data = Vec::with_capacity(self.data.len());
2227        let mut remaining = num_rows;
2228        while remaining > 0 {
2229            let task_item = self.slice_next_task(remaining);
2230            remaining -= task_item.rows_in_buf;
2231            task_data.push(task_item);
2232        }
2233        Ok(Box::new(FixedFullZipDecodeTask {
2234            details: self.details.clone(),
2235            data: task_data,
2236            bytes_per_value: self.bytes_per_value,
2237            num_rows: num_rows as usize,
2238        }))
2239    }
2240
2241    fn num_rows(&self) -> u64 {
2242        self.num_rows
2243    }
2244}
2245
2246/// A decoder for full-zip encoded data when the data has a variable-width
2247///
2248/// Here we need to unzip the control words AND lengths from the values and
2249/// then decompress the requested values.
2250#[derive(Debug)]
2251struct VariableFullZipDecoder {
2252    details: Arc<FullZipDecodeDetails>,
2253    decompressor: Arc<dyn VariablePerValueDecompressor>,
2254    data: LanceBuffer,
2255    offsets: LanceBuffer,
2256    rep: ScalarBuffer<u16>,
2257    def: ScalarBuffer<u16>,
2258    repdef_starts: Vec<usize>,
2259    data_starts: Vec<usize>,
2260    offset_starts: Vec<usize>,
2261    visible_item_counts: Vec<u64>,
2262    bits_per_offset: u8,
2263    current_idx: usize,
2264    num_rows: u64,
2265}
2266
2267impl VariableFullZipDecoder {
2268    fn new(
2269        details: Arc<FullZipDecodeDetails>,
2270        data: VecDeque<LanceBuffer>,
2271        num_rows: u64,
2272        in_bits_per_length: u8,
2273        out_bits_per_offset: u8,
2274    ) -> Self {
2275        let decompressor = match details.value_decompressor {
2276            PerValueDecompressor::Variable(ref d) => d.clone(),
2277            _ => unreachable!(),
2278        };
2279
2280        assert_eq!(in_bits_per_length % 8, 0);
2281        assert!(out_bits_per_offset == 32 || out_bits_per_offset == 64);
2282
2283        let mut decoder = Self {
2284            details,
2285            decompressor,
2286            data: LanceBuffer::empty(),
2287            offsets: LanceBuffer::empty(),
2288            rep: LanceBuffer::empty().borrow_to_typed_slice(),
2289            def: LanceBuffer::empty().borrow_to_typed_slice(),
2290            bits_per_offset: out_bits_per_offset,
2291            repdef_starts: Vec::with_capacity(num_rows as usize + 1),
2292            data_starts: Vec::with_capacity(num_rows as usize + 1),
2293            offset_starts: Vec::with_capacity(num_rows as usize + 1),
2294            visible_item_counts: Vec::with_capacity(num_rows as usize + 1),
2295            current_idx: 0,
2296            num_rows,
2297        };
2298
2299        // There's no great time to do this and this is the least worst time.  If we don't unzip then
2300        // we can't slice the data during the decode phase.  This is because we need the offsets to be
2301        // unpacked to know where the values start and end.
2302        //
2303        // We don't want to unzip on the decode thread because that is a single-threaded path
2304        // We don't want to unzip on the scheduling thread because that is a single-threaded path
2305        //
2306        // Fortunately, we know variable length data will always be read indirectly and so we can do it
2307        // here, which should be on the indirect thread.  The primary disadvantage to doing it here is that
2308        // we load all the data into memory and then throw it away only to load it all into memory again during
2309        // the decode.
2310        //
2311        // There are some alternatives to investigate:
2312        //   - Instead of just reading the beginning and end of the rep index we could read the entire
2313        //     range in between.  This will give us the break points that we need for slicing and won't increase
2314        //     the number of IOPs but it will mean we are doing more total I/O and we need to load the rep index
2315        //     even when doing a full scan.
2316        //   - We could force each decode task to do a full unzip of all the data.  Each decode task now
2317        //     has to do more work but the work is all fused.
2318        //   - We could just try doing this work on the decode thread and see if it is a problem.
2319        decoder.unzip(data, in_bits_per_length, out_bits_per_offset, num_rows);
2320
2321        decoder
2322    }
2323
2324    unsafe fn parse_length(data: &[u8], bits_per_offset: u8) -> u64 {
2325        match bits_per_offset {
2326            8 => *data.get_unchecked(0) as u64,
2327            16 => u16::from_le_bytes([*data.get_unchecked(0), *data.get_unchecked(1)]) as u64,
2328            32 => u32::from_le_bytes([
2329                *data.get_unchecked(0),
2330                *data.get_unchecked(1),
2331                *data.get_unchecked(2),
2332                *data.get_unchecked(3),
2333            ]) as u64,
2334            64 => u64::from_le_bytes([
2335                *data.get_unchecked(0),
2336                *data.get_unchecked(1),
2337                *data.get_unchecked(2),
2338                *data.get_unchecked(3),
2339                *data.get_unchecked(4),
2340                *data.get_unchecked(5),
2341                *data.get_unchecked(6),
2342                *data.get_unchecked(7),
2343            ]),
2344            _ => unreachable!(),
2345        }
2346    }
2347
2348    fn unzip(
2349        &mut self,
2350        data: VecDeque<LanceBuffer>,
2351        in_bits_per_length: u8,
2352        out_bits_per_offset: u8,
2353        num_rows: u64,
2354    ) {
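        // A descriptive note on the zipped stream (inferred from the parsing below): each item
        // starts with an optional control word (rep/def levels).  Visible, non-null items are
        // followed by a length that is `in_bits_per_length` wide and then that many value bytes.
        // We walk the stream once, splitting it into rep/def levels, an offsets buffer that is
        // `out_bits_per_offset` wide, and a contiguous buffer of value bytes.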
2355        // This undercounts if there are lists but, at this point, we don't really know how many items we have
2356        let mut rep = Vec::with_capacity(num_rows as usize);
2357        let mut def = Vec::with_capacity(num_rows as usize);
2358        let bytes_cw = self.details.ctrl_word_parser.bytes_per_word() * num_rows as usize;
2359
2360        // This undercounts if there are lists
2361        // It can also overcount if there are invisible items
2362        let bytes_per_offset = out_bits_per_offset as usize / 8;
2363        let bytes_offsets = bytes_per_offset * (num_rows as usize + 1);
2364        let mut offsets_data = Vec::with_capacity(bytes_offsets);
2365
2366        let bytes_per_length = in_bits_per_length as usize / 8;
2367        let bytes_lengths = bytes_per_length * num_rows as usize;
2368
2369        let bytes_data = data.iter().map(|d| d.len()).sum::<usize>();
2370        // This overcounts since bytes_lengths and bytes_cw are undercounts
2371        // It can also undercount if there are invisible items (hence the saturating_sub)
2372        let mut unzipped_data =
2373            Vec::with_capacity((bytes_data - bytes_cw).saturating_sub(bytes_lengths));
2374
2375        let mut current_offset = 0_u64;
2376        let mut visible_item_count = 0_u64;
2377        for databuf in data.into_iter() {
2378            let mut databuf = databuf.as_ref();
2379            while !databuf.is_empty() {
2380                let data_start = unzipped_data.len();
2381                let offset_start = offsets_data.len();
2382                // We might have only-rep or only-def, neither, or both.  They move at the same
2383                // speed though so we only need one index into it
2384                let repdef_start = rep.len().max(def.len());
2385                // TODO: Kind of inefficient we parse the control word twice here
2386                let ctrl_desc = self.details.ctrl_word_parser.parse_desc(
2387                    databuf,
2388                    self.details.max_rep,
2389                    self.details.max_visible_def,
2390                );
2391                self.details
2392                    .ctrl_word_parser
2393                    .parse(databuf, &mut rep, &mut def);
2394                databuf = &databuf[self.details.ctrl_word_parser.bytes_per_word()..];
2395
2396                if ctrl_desc.is_new_row {
2397                    self.repdef_starts.push(repdef_start);
2398                    self.data_starts.push(data_start);
2399                    self.offset_starts.push(offset_start);
2400                    self.visible_item_counts.push(visible_item_count);
2401                }
2402                if ctrl_desc.is_visible {
2403                    visible_item_count += 1;
2404                    if ctrl_desc.is_valid_item {
2405                        // Safety: Data should have at least bytes_per_length bytes remaining
2406                        debug_assert!(databuf.len() >= bytes_per_length);
2407                        let length = unsafe { Self::parse_length(databuf, in_bits_per_length) };
2408                        match out_bits_per_offset {
2409                            32 => offsets_data
2410                                .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2411                            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2412                            _ => unreachable!(),
2413                        };
2414                        databuf = &databuf[bytes_per_length..];
2415                        unzipped_data.extend_from_slice(&databuf[..length as usize]);
2416                        databuf = &databuf[length as usize..];
2417                        current_offset += length;
2418                    } else {
2419                        // Null items still get an offset
2420                        match out_bits_per_offset {
2421                            32 => offsets_data
2422                                .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2423                            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2424                            _ => unreachable!(),
2425                        }
2426                    }
2427                }
2428            }
2429        }
2430        self.repdef_starts.push(rep.len().max(def.len()));
2431        self.data_starts.push(unzipped_data.len());
2432        self.offset_starts.push(offsets_data.len());
2433        self.visible_item_counts.push(visible_item_count);
2434        match out_bits_per_offset {
2435            32 => offsets_data.extend_from_slice(&(current_offset as u32).to_le_bytes()),
2436            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2437            _ => unreachable!(),
2438        };
2439        self.rep = ScalarBuffer::from(rep);
2440        self.def = ScalarBuffer::from(def);
2441        self.data = LanceBuffer::Owned(unzipped_data);
2442        self.offsets = LanceBuffer::Owned(offsets_data);
2443    }
2444}
2445
2446impl StructuralPageDecoder for VariableFullZipDecoder {
2447    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2448        let start = self.current_idx;
2449        let end = start + num_rows as usize;
2450
2451        // This might seem a little peculiar.  We are returning the entire data buffer for every single
2452        // batch.  This is because the offsets are relative to the start of the data.  In other words
2453        // imagine we have a data buffer that is 100 bytes long and the offsets are [0, 10, 20, 30, 40]
2454        // and we return in batches of two.  The second set of offsets will be [20, 30, 40].
2455        //
2456        // So either we pay for a copy to normalize the offsets or we just return the entire data buffer
2457        // which is slightly cheaper.
2458        let data = self.data.borrow_and_clone();
2459
2460        let offset_start = self.offset_starts[start];
2461        let offset_end = self.offset_starts[end] + (self.bits_per_offset as usize / 8);
2462        let offsets = self
2463            .offsets
2464            .slice_with_length(offset_start, offset_end - offset_start);
2465
2466        let repdef_start = self.repdef_starts[start];
2467        let repdef_end = self.repdef_starts[end];
2468        let rep = if self.rep.is_empty() {
2469            self.rep.clone()
2470        } else {
2471            self.rep.slice(repdef_start, repdef_end - repdef_start)
2472        };
2473        let def = if self.def.is_empty() {
2474            self.def.clone()
2475        } else {
2476            self.def.slice(repdef_start, repdef_end - repdef_start)
2477        };
2478
2479        let visible_item_counts_start = self.visible_item_counts[start];
2480        let visible_item_counts_end = self.visible_item_counts[end];
2481        let num_visible_items = visible_item_counts_end - visible_item_counts_start;
2482
2483        self.current_idx += num_rows as usize;
2484
2485        Ok(Box::new(VariableFullZipDecodeTask {
2486            details: self.details.clone(),
2487            decompressor: self.decompressor.clone(),
2488            data,
2489            offsets,
2490            bits_per_offset: self.bits_per_offset,
2491            num_visible_items,
2492            rep,
2493            def,
2494        }))
2495    }
2496
2497    fn num_rows(&self) -> u64 {
2498        self.num_rows
2499    }
2500}
2501
2502#[derive(Debug)]
2503struct VariableFullZipDecodeTask {
2504    details: Arc<FullZipDecodeDetails>,
2505    decompressor: Arc<dyn VariablePerValueDecompressor>,
2506    data: LanceBuffer,
2507    offsets: LanceBuffer,
2508    bits_per_offset: u8,
2509    num_visible_items: u64,
2510    rep: ScalarBuffer<u16>,
2511    def: ScalarBuffer<u16>,
2512}
2513
2514impl DecodePageTask for VariableFullZipDecodeTask {
2515    fn decode(self: Box<Self>) -> Result<DecodedPage> {
2516        let block = VariableWidthBlock {
2517            data: self.data,
2518            offsets: self.offsets,
2519            bits_per_offset: self.bits_per_offset,
2520            num_values: self.num_visible_items,
2521            block_info: BlockInfo::new(),
2522        };
2523        let decompressed = self.decompressor.decompress(block)?;
2524        let rep = self.rep.to_vec();
2525        let def = self.def.to_vec();
2526        let unraveler =
2527            RepDefUnraveler::new(Some(rep), Some(def), self.details.def_meaning.clone());
2528        Ok(DecodedPage {
2529            data: decompressed,
2530            repdef: unraveler,
2531        })
2532    }
2533}
2534
2535#[derive(Debug)]
2536struct FullZipDecodeTaskItem {
2537    data: PerValueDataBlock,
2538    rows_in_buf: u64,
2539}
2540
2541/// A task to unzip and decompress full-zip encoded data when that data
2542/// has a fixed-width.
2543#[derive(Debug)]
2544struct FixedFullZipDecodeTask {
2545    details: Arc<FullZipDecodeDetails>,
2546    data: Vec<FullZipDecodeTaskItem>,
2547    num_rows: usize,
2548    bytes_per_value: usize,
2549}
2550
2551impl DecodePageTask for FixedFullZipDecodeTask {
2552    fn decode(self: Box<Self>) -> Result<DecodedPage> {
2553        // Multiply by 2 to make a stab at the size of the output buffer (which will be decompressed and thus bigger)
2554        let estimated_size_bytes = self
2555            .data
2556            .iter()
2557            .map(|task_item| task_item.data.data_size() as usize)
2558            .sum::<usize>()
2559            * 2;
2560        let mut data_builder =
2561            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);
2562
2563        if self.details.ctrl_word_parser.bytes_per_word() == 0 {
2564            // Fast path, no need to unzip because there is no rep/def
2565            //
2566            // We decompress each buffer and add it to our output buffer
2567            for task_item in self.data.into_iter() {
2568                let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
2569                    unreachable!()
2570                };
2571                let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
2572                else {
2573                    unreachable!()
2574                };
2575                debug_assert_eq!(fixed_data.num_values, task_item.rows_in_buf);
2576                let decompressed = decompressor.decompress(fixed_data, task_item.rows_in_buf)?;
2577                data_builder.append(&decompressed, 0..task_item.rows_in_buf);
2578            }
2579
2580            let unraveler = RepDefUnraveler::new(None, None, self.details.def_meaning.clone());
2581
2582            Ok(DecodedPage {
2583                data: data_builder.finish(),
2584                repdef: unraveler,
2585            })
2586        } else {
2587            // Slow path, unzipping needed
2588            let mut rep = Vec::with_capacity(self.num_rows);
2589            let mut def = Vec::with_capacity(self.num_rows);
2590
2591            for task_item in self.data.into_iter() {
2592                let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
2593                    unreachable!()
2594                };
2595                let mut buf_slice = fixed_data.data.as_ref();
2596                let num_values = fixed_data.num_values as usize;
2597                // We will be unzipping repdef into `rep` and `def` and the
2598                // values into `values` (which contains the compressed values)
2599                let mut values = Vec::with_capacity(
2600                    fixed_data.data.len()
2601                        - (self.details.ctrl_word_parser.bytes_per_word() * num_values),
2602                );
2603                let mut visible_items = 0;
2604                for _ in 0..num_values {
2605                    // Extract rep/def
2606                    self.details
2607                        .ctrl_word_parser
2608                        .parse(buf_slice, &mut rep, &mut def);
2609                    buf_slice = &buf_slice[self.details.ctrl_word_parser.bytes_per_word()..];
2610
2611                    let is_visible = def
2612                        .last()
2613                        .map(|d| *d <= self.details.max_visible_def)
2614                        .unwrap_or(true);
2615                    if is_visible {
2616                        // Extract value
2617                        values.extend_from_slice(buf_slice[..self.bytes_per_value].as_ref());
2618                        buf_slice = &buf_slice[self.bytes_per_value..];
2619                        visible_items += 1;
2620                    }
2621                }
2622
2623                // Finally, we decompress the values and add them to our output buffer
2624                let values_buf = LanceBuffer::Owned(values);
2625                let fixed_data = FixedWidthDataBlock {
2626                    bits_per_value: self.bytes_per_value as u64 * 8,
2627                    block_info: BlockInfo::new(),
2628                    data: values_buf,
2629                    num_values: visible_items,
2630                };
2631                let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
2632                else {
2633                    unreachable!()
2634                };
2635                let decompressed = decompressor.decompress(fixed_data, visible_items)?;
2636                data_builder.append(&decompressed, 0..visible_items);
2637            }
2638
2639            let repetition = if rep.is_empty() { None } else { Some(rep) };
2640            let definition = if def.is_empty() { None } else { Some(def) };
2641
2642            let unraveler =
2643                RepDefUnraveler::new(repetition, definition, self.details.def_meaning.clone());
2644            let data = data_builder.finish();
2645
2646            Ok(DecodedPage {
2647                data,
2648                repdef: unraveler,
2649            })
2650        }
2651    }
2652}
2653
2654#[derive(Debug)]
2655struct StructuralPrimitiveFieldSchedulingJob<'a> {
2656    scheduler: &'a StructuralPrimitiveFieldScheduler,
2657    ranges: Vec<Range<u64>>,
2658    page_idx: usize,
2659    range_idx: usize,
2660    global_row_offset: u64,
2661}
2662
2663impl<'a> StructuralPrimitiveFieldSchedulingJob<'a> {
2664    pub fn new(scheduler: &'a StructuralPrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
2665        Self {
2666            scheduler,
2667            ranges,
2668            page_idx: 0,
2669            range_idx: 0,
2670            global_row_offset: 0,
2671        }
2672    }
2673}
2674
2675impl StructuralSchedulingJob for StructuralPrimitiveFieldSchedulingJob<'_> {
2676    fn schedule_next(
2677        &mut self,
2678        context: &mut SchedulerContext,
2679    ) -> Result<Option<ScheduledScanLine>> {
2680        if self.range_idx >= self.ranges.len() {
2681            return Ok(None);
2682        }
2683        // Get our current range
2684        let mut range = self.ranges[self.range_idx].clone();
2685        let priority = range.start;
2686
2687        let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
2688        trace!(
2689            "Current range is {:?} and current page has {} rows",
2690            range,
2691            cur_page.num_rows
2692        );
2693        // Skip entire pages until we have some overlap with our next range
2694        while cur_page.num_rows + self.global_row_offset <= range.start {
2695            self.global_row_offset += cur_page.num_rows;
2696            self.page_idx += 1;
2697            trace!("Skipping entire page of {} rows", cur_page.num_rows);
2698            cur_page = &self.scheduler.page_schedulers[self.page_idx];
2699        }
2700
2701        // Now cur_page overlaps range.  Keep consuming ranges until we hit one that
2702        // extends beyond the current page (or we run out of ranges)
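        //
        // Illustrative walk-through (added note): suppose every page has 100 rows and the
        // requested range is 150..260.  The skip loop above advances past page 0 (rows 0..100);
        // the loop below then schedules rows 50..100 of page 1 and, on the next call to
        // `schedule_next`, rows 0..60 of page 2.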
2703
2704        let mut ranges_in_page = Vec::new();
2705        while cur_page.num_rows + self.global_row_offset > range.start {
2706            range.start = range.start.max(self.global_row_offset);
2707            let start_in_page = range.start - self.global_row_offset;
2708            let end_in_page = start_in_page + (range.end - range.start);
2709            let end_in_page = end_in_page.min(cur_page.num_rows);
2710            let last_in_range = (end_in_page + self.global_row_offset) >= range.end;
2711
2712            ranges_in_page.push(start_in_page..end_in_page);
2713            if last_in_range {
2714                self.range_idx += 1;
2715                if self.range_idx == self.ranges.len() {
2716                    break;
2717                }
2718                range = self.ranges[self.range_idx].clone();
2719            } else {
2720                break;
2721            }
2722        }
2723
2724        let num_rows_in_next = ranges_in_page.iter().map(|r| r.end - r.start).sum();
2725        trace!(
2726            "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
2727            num_rows_in_next,
2728            ranges_in_page.len(),
2729            cur_page.num_rows,
2730            priority,
2731            self.scheduler.column_index,
2732            cur_page.page_index,
2733        );
2734
2735        self.global_row_offset += cur_page.num_rows;
2736        self.page_idx += 1;
2737
2738        let page_decoder = cur_page
2739            .scheduler
2740            .schedule_ranges(&ranges_in_page, context.io())?;
2741
2742        let cur_path = context.current_path();
2743        let page_index = cur_page.page_index;
2744        let unloaded_page = async move {
2745            let page_decoder = page_decoder.await?;
2746            Ok(LoadedPage {
2747                decoder: page_decoder,
2748                path: cur_path,
2749                page_index,
2750            })
2751        }
2752        .boxed();
2753
2754        Ok(Some(ScheduledScanLine {
2755            decoders: vec![MessageType::UnloadedPage(UnloadedPage(unloaded_page))],
2756            rows_scheduled: num_rows_in_next,
2757        }))
2758    }
2759}
2760
2761#[derive(Debug)]
2762struct PageInfoAndScheduler {
2763    page_index: usize,
2764    num_rows: u64,
2765    scheduler: Box<dyn StructuralPageScheduler>,
2766}
2767
2768/// A scheduler for a leaf node
2769///
2770/// Here we look at the layout of the various pages and delegate scheduling to a scheduler
2771/// appropriate for the layout of the page.
2772#[derive(Debug)]
2773pub struct StructuralPrimitiveFieldScheduler {
2774    page_schedulers: Vec<PageInfoAndScheduler>,
2775    column_index: u32,
2776}
2777
2778impl StructuralPrimitiveFieldScheduler {
2779    pub fn try_new(
2780        column_info: &ColumnInfo,
2781        decompressors: &dyn DecompressorStrategy,
2782    ) -> Result<Self> {
2783        let page_schedulers = column_info
2784            .page_infos
2785            .iter()
2786            .enumerate()
2787            .map(|(page_index, page_info)| {
2788                Self::page_info_to_scheduler(
2789                    page_info,
2790                    page_index,
2791                    column_info.index as usize,
2792                    decompressors,
2793                )
2794            })
2795            .collect::<Result<Vec<_>>>()?;
2796        Ok(Self {
2797            page_schedulers,
2798            column_index: column_info.index,
2799        })
2800    }
2801
2802    fn page_info_to_scheduler(
2803        page_info: &PageInfo,
2804        page_index: usize,
2805        _column_index: usize,
2806        decompressors: &dyn DecompressorStrategy,
2807    ) -> Result<PageInfoAndScheduler> {
2808        let scheduler: Box<dyn StructuralPageScheduler> =
2809            match page_info.encoding.as_structural().layout.as_ref() {
2810                Some(pb::page_layout::Layout::MiniBlockLayout(mini_block)) => {
2811                    Box::new(MiniBlockScheduler::try_new(
2812                        &page_info.buffer_offsets_and_sizes,
2813                        page_info.priority,
2814                        mini_block.num_items,
2815                        mini_block,
2816                        decompressors,
2817                    )?)
2818                }
2819                Some(pb::page_layout::Layout::FullZipLayout(full_zip)) => {
2820                    Box::new(FullZipScheduler::try_new(
2821                        &page_info.buffer_offsets_and_sizes,
2822                        page_info.priority,
2823                        page_info.num_rows,
2824                        full_zip,
2825                        decompressors,
2826                        /*bits_per_offset=*/ 32,
2827                    )?)
2828                }
2829                Some(pb::page_layout::Layout::AllNullLayout(all_null)) => {
2830                    let def_meaning = all_null
2831                        .layers
2832                        .iter()
2833                        .map(|l| ProtobufUtils::repdef_layer_to_def_interp(*l))
2834                        .collect::<Vec<_>>();
2835                    if def_meaning.len() == 1
2836                        && def_meaning[0] == DefinitionInterpretation::NullableItem
2837                    {
2838                        Box::new(SimpleAllNullScheduler::default())
2839                            as Box<dyn StructuralPageScheduler>
2840                    } else {
2841                        Box::new(ComplexAllNullScheduler::new(
2842                            page_info.buffer_offsets_and_sizes.clone(),
2843                            def_meaning.into(),
2844                        )) as Box<dyn StructuralPageScheduler>
2845                    }
2846                }
2847                _ => todo!(),
2848            };
2849        Ok(PageInfoAndScheduler {
2850            page_index,
2851            num_rows: page_info.num_rows,
2852            scheduler,
2853        })
2854    }
2855}
2856
2857pub trait CachedPageData: Any + Send + Sync + DeepSizeOf + 'static {
2858    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static>;
2859}
2860
2861pub struct NoCachedPageData;
2862
2863impl DeepSizeOf for NoCachedPageData {
2864    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
2865        0
2866    }
2867}
2868impl CachedPageData for NoCachedPageData {
2869    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
2870        self
2871    }
2872}
2873
2874pub struct CachedFieldData {
2875    pages: Vec<Arc<dyn CachedPageData>>,
2876}
2877
2878impl DeepSizeOf for CachedFieldData {
2879    fn deep_size_of_children(&self, ctx: &mut Context) -> usize {
2880        self.pages.deep_size_of_children(ctx)
2881    }
2882}
2883
2884impl StructuralFieldScheduler for StructuralPrimitiveFieldScheduler {
2885    fn initialize<'a>(
2886        &'a mut self,
2887        _filter: &'a FilterExpression,
2888        context: &'a SchedulerContext,
2889    ) -> BoxFuture<'a, Result<()>> {
2890        let cache_key = self.column_index.to_string();
2891        if let Some(cached_data) = context.cache().get_by_str::<CachedFieldData>(&cache_key) {
2892            self.page_schedulers
2893                .iter_mut()
2894                .zip(cached_data.pages.iter())
2895                .for_each(|(page_scheduler, cached_data)| {
2896                    page_scheduler.scheduler.load(cached_data);
2897                });
2898            return std::future::ready(Ok(())).boxed();
2899        };
2900
2901        let cache = context.cache().clone();
2902        let page_data = self
2903            .page_schedulers
2904            .iter_mut()
2905            .map(|s| s.scheduler.initialize(context.io()))
2906            .collect::<FuturesOrdered<_>>();
2907
2908        async move {
2909            let page_data = page_data.try_collect::<Vec<_>>().await?;
2910            let cached_data = Arc::new(CachedFieldData { pages: page_data });
2911            cache.insert_by_str::<CachedFieldData>(&cache_key, cached_data);
2912            Ok(())
2913        }
2914        .boxed()
2915    }
2916
2917    fn schedule_ranges<'a>(
2918        &'a self,
2919        ranges: &[Range<u64>],
2920        _filter: &FilterExpression,
2921    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
2922        let ranges = ranges.to_vec();
2923        Ok(Box::new(StructuralPrimitiveFieldSchedulingJob::new(
2924            self, ranges,
2925        )))
2926    }
2927}
2928
2929pub struct PrimitiveFieldDecoder {
2930    data_type: DataType,
2931    unloaded_physical_decoder: Option<BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>>,
2932    physical_decoder: Option<Arc<dyn PrimitivePageDecoder>>,
2933    should_validate: bool,
2934    num_rows: u64,
2935    rows_drained: u64,
2936    column_index: u32,
2937    page_index: u32,
2938}
2939
2940impl PrimitiveFieldDecoder {
2941    pub fn new_from_data(
2942        physical_decoder: Arc<dyn PrimitivePageDecoder>,
2943        data_type: DataType,
2944        num_rows: u64,
2945        should_validate: bool,
2946    ) -> Self {
2947        Self {
2948            data_type,
2949            unloaded_physical_decoder: None,
2950            physical_decoder: Some(physical_decoder),
2951            should_validate,
2952            num_rows,
2953            rows_drained: 0,
2954            column_index: u32::MAX,
2955            page_index: u32::MAX,
2956        }
2957    }
2958}
2959
2960impl Debug for PrimitiveFieldDecoder {
2961    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2962        f.debug_struct("PrimitiveFieldDecoder")
2963            .field("data_type", &self.data_type)
2964            .field("num_rows", &self.num_rows)
2965            .field("rows_drained", &self.rows_drained)
2966            .finish()
2967    }
2968}
2969
2970struct PrimitiveFieldDecodeTask {
2971    rows_to_skip: u64,
2972    rows_to_take: u64,
2973    should_validate: bool,
2974    physical_decoder: Arc<dyn PrimitivePageDecoder>,
2975    data_type: DataType,
2976}
2977
2978impl DecodeArrayTask for PrimitiveFieldDecodeTask {
2979    fn decode(self: Box<Self>) -> Result<ArrayRef> {
2980        let block = self
2981            .physical_decoder
2982            .decode(self.rows_to_skip, self.rows_to_take)?;
2983
2984        let array = make_array(block.into_arrow(self.data_type.clone(), self.should_validate)?);
2985
2986        // This is a bit of a hack to work around https://github.com/apache/arrow-rs/issues/6302
2987        //
2988        // We change from nulls-in-dictionary (storage format) to nulls-in-indices (arrow-rs preferred
2989        // format)
2990        //
2991        // The calculation of logical_nulls is not free and would be good to avoid in the future
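        //
        // Illustrative example (added note): a dictionary array stored as keys [0, 1, 2] over
        // values ["a", null, "b"] has a logical null at index 1.  After this rewrite the values
        // stay ["a", null, "b"] but the null is recorded on the keys' null buffer, which is the
        // nulls-in-indices form arrow-rs prefers.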
2992        if let DataType::Dictionary(_, _) = self.data_type {
2993            let dict = array.as_any_dictionary();
2994            if let Some(nulls) = array.logical_nulls() {
2995                let new_indices = dict.keys().to_data();
2996                let new_array = make_array(
2997                    new_indices
2998                        .into_builder()
2999                        .nulls(Some(nulls))
3000                        .add_child_data(dict.values().to_data())
3001                        .data_type(dict.data_type().clone())
3002                        .build()?,
3003                );
3004                return Ok(new_array);
3005            }
3006        }
3007        Ok(array)
3008    }
3009}
3010
3011impl LogicalPageDecoder for PrimitiveFieldDecoder {
3012    // TODO: In the future we may consider partially waiting for primitive pages by
3013    // breaking up large I/O into smaller I/O as a way to accelerate the "time-to-first-decode"
3014    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<Result<()>> {
3015        log::trace!(
3016            "primitive wait for more than {} rows on column {} and page {} (page has {} rows)",
3017            loaded_need,
3018            self.column_index,
3019            self.page_index,
3020            self.num_rows
3021        );
3022        async move {
3023            let physical_decoder = self.unloaded_physical_decoder.take().unwrap().await?;
3024            self.physical_decoder = Some(Arc::from(physical_decoder));
3025            Ok(())
3026        }
3027        .boxed()
3028    }
3029
3030    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
3031        if self.physical_decoder.is_none() {
3032            return Err(lance_core::Error::Internal {
3033                message: format!("drain was called on primitive field decoder for data type {} on column {} but the decoder was never awaited", self.data_type, self.column_index),
3034                location: location!(),
3035            });
3036        }
3037
3038        let rows_to_skip = self.rows_drained;
3039        let rows_to_take = num_rows;
3040
3041        self.rows_drained += rows_to_take;
3042
3043        let task = Box::new(PrimitiveFieldDecodeTask {
3044            rows_to_skip,
3045            rows_to_take,
3046            should_validate: self.should_validate,
3047            physical_decoder: self.physical_decoder.as_ref().unwrap().clone(),
3048            data_type: self.data_type.clone(),
3049        });
3050
3051        Ok(NextDecodeTask {
3052            task,
3053            num_rows: rows_to_take,
3054        })
3055    }
3056
3057    fn rows_loaded(&self) -> u64 {
3058        if self.unloaded_physical_decoder.is_some() {
3059            0
3060        } else {
3061            self.num_rows
3062        }
3063    }
3064
3065    fn rows_drained(&self) -> u64 {
3066        if self.unloaded_physical_decoder.is_some() {
3067            0
3068        } else {
3069            self.rows_drained
3070        }
3071    }
3072
3073    fn num_rows(&self) -> u64 {
3074        self.num_rows
3075    }
3076
3077    fn data_type(&self) -> &DataType {
3078        &self.data_type
3079    }
3080}
3081
3082/// Takes the output from several page decoders and
3083/// concatenates them.
3084#[derive(Debug)]
3085pub struct StructuralCompositeDecodeArrayTask {
3086    tasks: Vec<Box<dyn DecodePageTask>>,
3087    should_validate: bool,
3088    data_type: DataType,
3089}
3090
3091impl StructuralCompositeDecodeArrayTask {
3092    fn restore_validity(
3093        array: Arc<dyn Array>,
3094        unraveler: &mut CompositeRepDefUnraveler,
3095    ) -> Arc<dyn Array> {
3096        let validity = unraveler.unravel_validity(array.len());
3097        let Some(validity) = validity else {
3098            return array;
3099        };
3100        if array.data_type() == &DataType::Null {
3101            // The array is a NullArray (already entirely null); arrow-rs doesn't want an explicit validity buffer on it, so return it as-is
3102            return array;
3103        }
3104        assert_eq!(validity.len(), array.len());
3105        // SAFETY: We should have already asserted that the buffers are all valid; we are
3106        // just adding null buffers to the array here
3107        make_array(unsafe {
3108            array
3109                .to_data()
3110                .into_builder()
3111                .nulls(Some(validity))
3112                .build_unchecked()
3113        })
3114    }
3115}
3116
3117impl StructuralDecodeArrayTask for StructuralCompositeDecodeArrayTask {
3118    fn decode(self: Box<Self>) -> Result<DecodedArray> {
3119        let mut arrays = Vec::with_capacity(self.tasks.len());
3120        let mut unravelers = Vec::with_capacity(self.tasks.len());
3121        for task in self.tasks {
3122            let decoded = task.decode()?;
3123            unravelers.push(decoded.repdef);
3124
3125            let array = make_array(
3126                decoded
3127                    .data
3128                    .into_arrow(self.data_type.clone(), self.should_validate)?,
3129            );
3130
3131            arrays.push(array);
3132        }
3133        let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
3134        let array = arrow_select::concat::concat(&array_refs)?;
3135        let mut repdef = CompositeRepDefUnraveler::new(unravelers);
3136
3137        let array = Self::restore_validity(array, &mut repdef);
3138
3139        Ok(DecodedArray { array, repdef })
3140    }
3141}
3142
3143#[derive(Debug)]
3144pub struct StructuralPrimitiveFieldDecoder {
3145    field: Arc<ArrowField>,
3146    page_decoders: VecDeque<Box<dyn StructuralPageDecoder>>,
3147    should_validate: bool,
3148    rows_drained_in_current: u64,
3149}
3150
3151impl StructuralPrimitiveFieldDecoder {
3152    pub fn new(field: &Arc<ArrowField>, should_validate: bool) -> Self {
3153        Self {
3154            field: field.clone(),
3155            page_decoders: VecDeque::new(),
3156            should_validate,
3157            rows_drained_in_current: 0,
3158        }
3159    }
3160}
3161
3162impl StructuralFieldDecoder for StructuralPrimitiveFieldDecoder {
3163    fn accept_page(&mut self, child: LoadedPage) -> Result<()> {
3164        assert!(child.path.is_empty());
3165        self.page_decoders.push_back(child.decoder);
3166        Ok(())
3167    }
3168
3169    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
3170        let mut remaining = num_rows;
3171        let mut tasks = Vec::new();
3172        while remaining > 0 {
3173            let cur_page = self.page_decoders.front_mut().unwrap();
3174            let num_in_page = cur_page.num_rows() - self.rows_drained_in_current;
3175            let to_take = num_in_page.min(remaining);
3176
3177            let task = cur_page.drain(to_take)?;
3178            tasks.push(task);
3179
3180            if to_take == num_in_page {
3181                self.page_decoders.pop_front();
3182                self.rows_drained_in_current = 0;
3183            } else {
3184                self.rows_drained_in_current += to_take;
3185            }
3186
3187            remaining -= to_take;
3188        }
3189        Ok(Box::new(StructuralCompositeDecodeArrayTask {
3190            tasks,
3191            should_validate: self.should_validate,
3192            data_type: self.field.data_type().clone(),
3193        }))
3194    }
3195
3196    fn data_type(&self) -> &DataType {
3197        self.field.data_type()
3198    }
3199}
3200
3201#[derive(Debug)]
3202pub struct AccumulationQueue {
3203    cache_bytes: u64,
3204    keep_original_array: bool,
3205    buffered_arrays: Vec<ArrayRef>,
3206    current_bytes: u64,
3207    // Row number of the first item in buffered_arrays, reset on flush
3208    row_number: u64,
3209    // Number of top level rows represented in buffered_arrays, reset on flush
3210    num_rows: u64,
3211    // This is only for logging / debugging purposes
3212    column_index: u32,
3213}
3214
3215impl AccumulationQueue {
3216    pub fn new(cache_bytes: u64, column_index: u32, keep_original_array: bool) -> Self {
3217        Self {
3218            cache_bytes,
3219            buffered_arrays: Vec::new(),
3220            current_bytes: 0,
3221            column_index,
3222            keep_original_array,
3223            row_number: u64::MAX,
3224            num_rows: 0,
3225        }
3226    }
3227
3228    /// Adds an array to the queue.  If there is enough data then the queue is flushed
3229    /// and returned as (buffered arrays, row number of the first buffered row, total row count)
3230    pub fn insert(
3231        &mut self,
3232        array: ArrayRef,
3233        row_number: u64,
3234        num_rows: u64,
3235    ) -> Option<(Vec<ArrayRef>, u64, u64)> {
3236        if self.row_number == u64::MAX {
3237            self.row_number = row_number;
3238        }
3239        self.num_rows += num_rows;
3240        self.current_bytes += array.get_array_memory_size() as u64;
3241        if self.current_bytes > self.cache_bytes {
3242            debug!(
3243                "Flushing column {} page of size {} bytes (unencoded)",
3244                self.column_index, self.current_bytes
3245            );
3246            // Push into buffered_arrays without copying since we are about to flush anyway
3247            self.buffered_arrays.push(array);
3248            self.current_bytes = 0;
3249            let row_number = self.row_number;
3250            self.row_number = u64::MAX;
3251            let num_rows = self.num_rows;
3252            self.num_rows = 0;
3253            Some((
3254                std::mem::take(&mut self.buffered_arrays),
3255                row_number,
3256                num_rows,
3257            ))
3258        } else {
3259            trace!(
3260                "Accumulating data for column {}.  Now at {} bytes",
3261                self.column_index,
3262                self.current_bytes
3263            );
3264            if self.keep_original_array {
3265                self.buffered_arrays.push(array);
3266            } else {
3267                self.buffered_arrays.push(deep_copy_array(array.as_ref()))
3268            }
3269            None
3270        }
3271    }
3272
3273    pub fn flush(&mut self) -> Option<(Vec<ArrayRef>, u64, u64)> {
3274        if self.buffered_arrays.is_empty() {
3275            trace!(
3276                "No final flush since no data at column {}",
3277                self.column_index
3278            );
3279            None
3280        } else {
3281            trace!(
3282                "Final flush of column {} which has {} bytes",
3283                self.column_index,
3284                self.current_bytes
3285            );
3286            self.current_bytes = 0;
3287            let row_number = self.row_number;
3288            self.row_number = u64::MAX;
3289            let num_rows = self.num_rows;
3290            self.num_rows = 0;
3291            Some((
3292                std::mem::take(&mut self.buffered_arrays),
3293                row_number,
3294                num_rows,
3295            ))
3296        }
3297    }
3298}
3299
3300pub struct PrimitiveFieldEncoder {
3301    accumulation_queue: AccumulationQueue,
3302    array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
3303    column_index: u32,
3304    field: Field,
3305    max_page_bytes: u64,
3306}
3307
3308impl PrimitiveFieldEncoder {
3309    pub fn try_new(
3310        options: &EncodingOptions,
3311        array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
3312        column_index: u32,
3313        field: Field,
3314    ) -> Result<Self> {
3315        Ok(Self {
3316            accumulation_queue: AccumulationQueue::new(
3317                options.cache_bytes_per_column,
3318                column_index,
3319                options.keep_original_array,
3320            ),
3321            column_index,
3322            max_page_bytes: options.max_page_bytes,
3323            array_encoding_strategy,
3324            field,
3325        })
3326    }
3327
3328    fn create_encode_task(&mut self, arrays: Vec<ArrayRef>) -> Result<EncodeTask> {
3329        let encoder = self
3330            .array_encoding_strategy
3331            .create_array_encoder(&arrays, &self.field)?;
3332        let column_idx = self.column_index;
3333        let data_type = self.field.data_type();
3334
3335        Ok(tokio::task::spawn(async move {
3336            let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
3337            let data = DataBlock::from_arrays(&arrays, num_values);
3338            let mut buffer_index = 0;
3339            let array = encoder.encode(data, &data_type, &mut buffer_index)?;
3340            let (data, description) = array.into_buffers();
3341            Ok(EncodedPage {
3342                data,
3343                description: PageEncoding::Legacy(description),
3344                num_rows: num_values,
3345                column_idx,
3346                row_number: 0, // legacy encoders do not use row numbers
3347            })
3348        })
3349        .map(|res_res| res_res.unwrap())
3350        .boxed())
3351    }
3352
3353    // Creates an encode task, consuming all buffered data
3354    fn do_flush(&mut self, arrays: Vec<ArrayRef>) -> Result<Vec<EncodeTask>> {
3355        if arrays.len() == 1 {
3356            let array = arrays.into_iter().next().unwrap();
3357            let size_bytes = array.get_buffer_memory_size();
3358            let num_parts = bit_util::ceil(size_bytes, self.max_page_bytes as usize);
3359            // Can't slice it finer than 1 page per row
3360            let num_parts = num_parts.min(array.len());
3361            if num_parts <= 1 {
3362                // One part and it fits in a page
3363                Ok(vec![self.create_encode_task(vec![array])?])
3364            } else {
3365                // One part and it needs to be sliced into multiple pages
3366
3367                // This isn't perfect (items in the array might not all have the same size)
3368                // but it's a reasonable stab for now
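
                // Illustrative example (added note): an 80 MiB array of 10,000,000 rows with
                // max_page_bytes = 32 MiB gives num_parts = 3 and part_size =
                // ceil(10,000,000 / 3) = 3,333,334, so the slices below cover
                // 3,333,334 + 3,333,334 + 3,333,332 rows.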
3369                let mut tasks = Vec::with_capacity(num_parts);
3370                let mut offset = 0;
3371                let part_size = bit_util::ceil(array.len(), num_parts);
3372                for _ in 0..num_parts {
3373                    let avail = array.len() - offset;
3374                    if avail == 0 {
3375                        break;
3376                    }
3377                    let chunk_size = avail.min(part_size);
3378                    let part = array.slice(offset, chunk_size);
3379                    let task = self.create_encode_task(vec![part])?;
3380                    tasks.push(task);
3381                    offset += chunk_size;
3382                }
3383                Ok(tasks)
3384            }
3385        } else {
3386            // Multiple parts that (presumably) all fit in a page
3387            //
3388            // TODO: Could check here if there are any jumbo parts in the mix that need splitting
3389            Ok(vec![self.create_encode_task(arrays)?])
3390        }
3391    }
3392}
3393
3394impl FieldEncoder for PrimitiveFieldEncoder {
3395    // Buffers data, if there is enough to write a page then we create an encode task
3396    fn maybe_encode(
3397        &mut self,
3398        array: ArrayRef,
3399        _external_buffers: &mut OutOfLineBuffers,
3400        _repdef: RepDefBuilder,
3401        row_number: u64,
3402        num_rows: u64,
3403    ) -> Result<Vec<EncodeTask>> {
3404        if let Some(arrays) = self.accumulation_queue.insert(array, row_number, num_rows) {
3405            Ok(self.do_flush(arrays.0)?)
3406        } else {
3407            Ok(vec![])
3408        }
3409    }
3410
3411    // If there is any data left in the buffer then create an encode task from it
3412    fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
3413        if let Some(arrays) = self.accumulation_queue.flush() {
3414            Ok(self.do_flush(arrays.0)?)
3415        } else {
3416            Ok(vec![])
3417        }
3418    }
3419
3420    fn num_columns(&self) -> u32 {
3421        1
3422    }
3423
3424    fn finish(
3425        &mut self,
3426        _external_buffers: &mut OutOfLineBuffers,
3427    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
3428        std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
3429    }
3430}
3431
3432/// The serialized representation of full-zip data
3433struct SerializedFullZip {
3434    /// The zipped values buffer
3435    values: LanceBuffer,
3436    /// The repetition index (only present if there is repetition)
3437    repetition_index: Option<LanceBuffer>,
3438}
3439
3440// We align and pad mini-blocks to 8 byte boundaries for two reasons.  First,
3441// to allow us to store a chunk size in 12 bits.
3442//
3443// If we directly record the size in bytes with 12 bits we would be limited to
3444// 4KiB which is too small.  Since we know each mini-block consists of 8 byte
3445// words we can store the # of words instead which gives us 32KiB.  We want
3446// at least 24KiB so we can handle even the worst case of
3447// - 4Ki values compressed into an 8186 byte buffer
3448// - 4 bytes to describe rep & def lengths
3449// - 16KiB of rep & def buffer (this will almost never happen but life is easier if we
3450//   plan for it)
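// (Added note: those three pieces total 8186 + 4 + 16384 = 24574 bytes, which fits
// within the 24KiB budget.)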
3451//
3452// Second, each chunk in a mini-block is aligned to 8 bytes.  This allows multi-byte
3453// values like offsets to be stored in a mini-block and safely read back out.  It also
3454// helps ensure zero-copy reads in cases where zero-copy is possible (e.g. no decoding
3455// needed).
3456//
3457// Note: by "aligned to 8 bytes" we mean BOTH "aligned to 8 bytes from the start of
3458// the page" and "aligned to 8 bytes from the start of the file."
3459const MINIBLOCK_ALIGNMENT: usize = 8;
3460
3461/// An encoder for primitive (leaf) arrays
3462///
3463/// This encoder is fairly complicated and follows a number of paths depending
3464/// on the data.
3465///
3466/// First, we convert the validity & offsets information into repetition and
3467/// definition levels.  Then we compress the data itself into a single buffer.
3468///
3469/// If the data is narrow then we encode the data in small chunks (each chunk
3470/// should be a few disk sectors and contains a buffer of repetition, a buffer
3471/// of definition, and a buffer of value data).  This approach is called
3472/// "mini-block".  These mini-blocks are stored into a single data buffer.
3473///
3474/// If the data is wide then we zip together the repetition and definition value
3475/// with the value data into a single buffer.  This approach is called "zipped".
3476///
3477/// If there is any repetition information then we create a repetition index (TODO)
3478///
3479/// In addition, the compression process may create zero or more metadata buffers.
3480/// For example, a dictionary compression will create dictionary metadata.  Any
3481/// mini-block approach has a metadata buffer of block sizes.  This metadata is
3482/// stored in a separate buffer on disk and read at initialization time.
3483///
3484/// TODO: We should concatenate metadata buffers from all pages into a single buffer
3485/// at (roughly) the end of the file so there is, at most, one read per column of
3486/// metadata per file.
3487pub struct PrimitiveStructuralEncoder {
3488    // Accumulates arrays until we have enough data to justify a disk page
3489    accumulation_queue: AccumulationQueue,
3490    accumulated_repdefs: Vec<RepDefBuilder>,
3491    // The compression strategy we will use to compress the data
3492    compression_strategy: Arc<dyn CompressionStrategy>,
3493    column_index: u32,
3494    field: Field,
3495    encoding_metadata: Arc<HashMap<String, String>>,
3496}
3497
3498struct CompressedLevelsChunk {
3499    data: LanceBuffer,
3500    num_levels: u16,
3501}
3502
3503struct CompressedLevels {
3504    data: Vec<CompressedLevelsChunk>,
3505    compression: pb::ArrayEncoding,
3506    rep_index: Option<LanceBuffer>,
3507}
3508
3509struct SerializedMiniBlockPage {
3510    num_buffers: u64,
3511    data: LanceBuffer,
3512    metadata: LanceBuffer,
3513}
3514
3515impl PrimitiveStructuralEncoder {
3516    pub fn try_new(
3517        options: &EncodingOptions,
3518        compression_strategy: Arc<dyn CompressionStrategy>,
3519        column_index: u32,
3520        field: Field,
3521        encoding_metadata: Arc<HashMap<String, String>>,
3522    ) -> Result<Self> {
3523        Ok(Self {
3524            accumulation_queue: AccumulationQueue::new(
3525                options.cache_bytes_per_column,
3526                column_index,
3527                options.keep_original_array,
3528            ),
3529            accumulated_repdefs: Vec::new(),
3530            column_index,
3531            compression_strategy,
3532            field,
3533            encoding_metadata,
3534        })
3535    }
3536
3537    // TODO: This is a heuristic we may need to tune at some point
3538    //
3539    // As data gets narrower the "zipping" process gets too expensive
3540    //   and we prefer mini-block
3541    // As data gets wider the # of values per block shrinks; very wide
3542    //   data doesn't even fit in a mini-block and the block overhead gets
3543    //   too large, so we prefer zipped.
3544    fn is_narrow(data_block: &DataBlock) -> bool {
3545        const MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE: u64 = 256;
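
        // Illustrative example (added note): an Int32 column (at most 4 bytes per value) falls
        // well under this threshold and is treated as narrow, while a binary column whose
        // longest value is, say, 4 KiB is not and will fall back to full-zip.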
3546
3547        if let Some(max_len_array) = data_block.get_stat(Stat::MaxLength) {
3548            let max_len_array = max_len_array
3549                .as_any()
3550                .downcast_ref::<PrimitiveArray<UInt64Type>>()
3551                .unwrap();
3552            if max_len_array.value(0) < MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE {
3553                return true;
3554            }
3555        }
3556        false
3557    }
3558
3559    fn prefers_miniblock(
3560        data_block: &DataBlock,
3561        encoding_metadata: &HashMap<String, String>,
3562    ) -> bool {
3563        // If the user specifically requested miniblock then use it
3564        if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3565            return user_requested.to_lowercase() == STRUCTURAL_ENCODING_MINIBLOCK;
3566        }
3567        // Otherwise only use miniblock if it is narrow
3568        Self::is_narrow(data_block)
3569    }
3570
3571    fn prefers_fullzip(encoding_metadata: &HashMap<String, String>) -> bool {
3572        // Fullzip is the backup option so the only reason we wouldn't use it is if the
3573        // user specifically requested not to use it (in which case we're probably going
3574        // to emit an error)
3575        if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3576            return user_requested.to_lowercase() == STRUCTURAL_ENCODING_FULLZIP;
3577        }
3578        true
3579    }
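
    // Illustrative sketch (added note, not part of the encoder): callers can force one of the
    // two structural layouts by attaching the structural-encoding key to the field metadata,
    // e.g. something along these lines:
    //
    //     let mut metadata = HashMap::new();
    //     metadata.insert(
    //         STRUCTURAL_ENCODING_META_KEY.to_string(),
    //         STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
    //     );
    //     // With this metadata on the field, `prefers_miniblock` returns true regardless of width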
3580
3581    // Converts value data, repetition levels, and definition levels into a single
3582    // buffer of mini-blocks.  In addition, creates a buffer of mini-block metadata
3583    // which tells us the size of each block.  Finally, if repetition is present then
3584    // we also create a buffer for the repetition index.
3585    //
3586    // Each chunk is serialized as:
3587    // | num_levels (2 bytes) | rep_len (2 bytes, if rep) | def_len (2 bytes, if def) | buf_lens (2 bytes per value buffer) | P | rep | P | def | P | buf0 | P | ... | bufN | P |
3588    //
3589    // P - Padding inserted to ensure each buffer is 8-byte aligned and the buffer size is a multiple
3590    //     of 8 bytes (so that the next chunk is 8-byte aligned).
3591    //
3592    // Each block has a u16 word of metadata.  The upper 12 bits contain the size of the
3593    // block in 8-byte words, minus one (if the block is not a multiple of 8 bytes then
3594    // up to 7 bytes of padding are added).  The lower 4 bits describe the log_2
3595    // number of values (e.g. if there are 1024 then the lower 4 bits will be
3596    // 0xA).  All blocks except the last must have a power-of-two number of values.
3597    // This not only makes metadata smaller but it makes decoding easier since
3598    // batch sizes are typically a power of 2.  4 bits would allow us to express
3599    // up to 16Ki values but we restrict this further to 4Ki values.
3600    //
3601    // This means blocks can have 1 to 4Ki values and 8 - 32Ki bytes.
3602    //
3603    // All metadata words are serialized (as little endian) into a single buffer
3604    // of metadata values.
3605    //
3606    // If there is repetition then we also create a repetition index.  This is a
3607    // single buffer of integer vectors (stored in row major order).  There is one
3608    // entry for each chunk.  The size of the vector is based on the depth of random
3609    // access we want to support.
3610    //
3611    // A vector of size 2 is the minimum and will support row-based random access (e.g.
3612    // "take the 57th row").  A vector of size 3 will support 1 level of nested access
3613    // (e.g. "take the 3rd item in the 57th row").  A vector of size 4 will support 2
3614    // levels of nested access and so on.
3615    //
3616    // The first number in the vector is the number of top-level rows that complete in
3617    // the chunk.  The second number is the number of second-level rows that complete
3618    // after the final top-level row completed (or beginning of the chunk if no top-level
3619    // row completes in the chunk).  And so on.  The final number in the vector is always
3620    // the number of leftover items not covered by earlier entries in the vector.
3621    //
3622    // Currently we are limited to 0 levels of nested access but that will change in the
3623    // future.
3624    //
3625    // The repetition index and the chunk metadata are read at initialization time and
3626    // cached in memory.
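    //
    // Worked example (added note): a chunk holding 1024 values in 4096 bytes of chunk data
    // would be described by the metadata word
    //
    //     let words = 4096 / MINIBLOCK_ALIGNMENT;        // 512 eight-byte words
    //     let word = ((words as u64 - 1) << 4) | 10;     // log2(1024) == 10 (0xA)
    //     assert_eq!(word, 0x1FFA);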
3627    fn serialize_miniblocks(
3628        miniblocks: MiniBlockCompressed,
3629        rep: Option<Vec<CompressedLevelsChunk>>,
3630        def: Option<Vec<CompressedLevelsChunk>>,
3631    ) -> SerializedMiniBlockPage {
3632        let bytes_rep = rep
3633            .as_ref()
3634            .map(|rep| rep.iter().map(|r| r.data.len()).sum::<usize>())
3635            .unwrap_or(0);
3636        let bytes_def = def
3637            .as_ref()
3638            .map(|def| def.iter().map(|d| d.data.len()).sum::<usize>())
3639            .unwrap_or(0);
3640        let bytes_data = miniblocks.data.iter().map(|d| d.len()).sum::<usize>();
3641        let mut num_buffers = miniblocks.data.len();
3642        if rep.is_some() {
3643            num_buffers += 1;
3644        }
3645        if def.is_some() {
3646            num_buffers += 1;
3647        }
3648        // 2 bytes for the length of each buffer and up to 7 bytes of padding per buffer
3649        let max_extra = 9 * num_buffers;
3650        let mut data_buffer = Vec::with_capacity(bytes_rep + bytes_def + bytes_data + max_extra);
3651        let mut meta_buffer = Vec::with_capacity(miniblocks.chunks.len() * 2);
3652
3653        let mut rep_iter = rep.map(|r| r.into_iter());
3654        let mut def_iter = def.map(|d| d.into_iter());
3655
3656        let mut buffer_offsets = vec![0; miniblocks.data.len()];
3657        for chunk in miniblocks.chunks {
3658            let start_pos = data_buffer.len();
3659            // Start of chunk should be aligned
3660            debug_assert_eq!(start_pos % MINIBLOCK_ALIGNMENT, 0);
3661
3662            let rep = rep_iter.as_mut().map(|r| r.next().unwrap());
3663            let def = def_iter.as_mut().map(|d| d.next().unwrap());
3664
3665            // Write the number of levels, or 0 if there is no rep/def
3666            let num_levels = rep
3667                .as_ref()
3668                .map(|r| r.num_levels)
3669                .unwrap_or(def.as_ref().map(|d| d.num_levels).unwrap_or(0));
3670            data_buffer.extend_from_slice(&num_levels.to_le_bytes());
3671
3672            // Write the buffer lengths
3673            if let Some(rep) = rep.as_ref() {
3674                let bytes_rep = u16::try_from(rep.data.len()).unwrap();
3675                data_buffer.extend_from_slice(&bytes_rep.to_le_bytes());
3676            }
3677            if let Some(def) = def.as_ref() {
3678                let bytes_def = u16::try_from(def.data.len()).unwrap();
3679                data_buffer.extend_from_slice(&bytes_def.to_le_bytes());
3680            }
3681
3682            for buffer_size in &chunk.buffer_sizes {
3683                let bytes = *buffer_size;
3684                data_buffer.extend_from_slice(&bytes.to_le_bytes());
3685            }
3686
3687            // Pad
3688            let add_padding = |data_buffer: &mut Vec<u8>| {
3689                let pad = pad_bytes::<MINIBLOCK_ALIGNMENT>(data_buffer.len());
3690                data_buffer.extend(iter::repeat_n(FILL_BYTE, pad));
3691            };
3692            add_padding(&mut data_buffer);
3693
3694            // Write the buffers themselves
3695            if let Some(rep) = rep.as_ref() {
3696                data_buffer.extend_from_slice(&rep.data);
3697                add_padding(&mut data_buffer);
3698            }
3699            if let Some(def) = def.as_ref() {
3700                data_buffer.extend_from_slice(&def.data);
3701                add_padding(&mut data_buffer);
3702            }
3703            for (buffer_size, (buffer, buffer_offset)) in chunk
3704                .buffer_sizes
3705                .iter()
3706                .zip(miniblocks.data.iter().zip(buffer_offsets.iter_mut()))
3707            {
3708                let start = *buffer_offset;
3709                let end = start + *buffer_size as usize;
3710                *buffer_offset += *buffer_size as usize;
3711                data_buffer.extend_from_slice(&buffer[start..end]);
3712                add_padding(&mut data_buffer);
3713            }
3714
3715            let chunk_bytes = data_buffer.len() - start_pos;
3716            assert!(chunk_bytes <= 16 * 1024);
3717            assert!(chunk_bytes > 0);
3718            assert_eq!(chunk_bytes % 8, 0);
3719            // We subtract 1 from the word count because we want to be able to express
3720            // a size of 32KiB and not (32Ki - 8)B, which is what we'd get otherwise with
3721            // 0xFFF
3722            let divided_bytes = chunk_bytes / MINIBLOCK_ALIGNMENT;
3723            let divided_bytes_minus_one = (divided_bytes - 1) as u64;
3724
3725            let metadata = ((divided_bytes_minus_one << 4) | chunk.log_num_values as u64) as u16;
3726            meta_buffer.extend_from_slice(&metadata.to_le_bytes());
3727        }
3728
3729        let data_buffer = LanceBuffer::Owned(data_buffer);
3730        let metadata_buffer = LanceBuffer::Owned(meta_buffer);
3731
3732        SerializedMiniBlockPage {
3733            num_buffers: miniblocks.data.len() as u64,
3734            data: data_buffer,
3735            metadata: metadata_buffer,
3736        }
3737    }
3738
3739    /// Compresses a buffer of levels into chunks
3740    ///
3741    /// If these are repetition levels then we also calculate the repetition index here (that
3742    /// is the third return value)
3743    fn compress_levels(
3744        mut levels: RepDefSlicer<'_>,
3745        num_elements: u64,
3746        compression_strategy: &dyn CompressionStrategy,
3747        chunks: &[MiniBlockChunk],
3748        // This will be 0 if we are compressing def levels
3749        max_rep: u16,
3750    ) -> Result<CompressedLevels> {
3751        let mut rep_index = if max_rep > 0 {
3752            Vec::with_capacity(chunks.len())
3753        } else {
3754            vec![]
3755        };
3756        // Make the levels into a FixedWidth data block
3757        let num_levels = levels.num_levels() as u64;
3758        let mut levels_buf = levels.all_levels().try_clone().unwrap();
3759        let levels_block = DataBlock::FixedWidth(FixedWidthDataBlock {
3760            data: levels_buf.borrow_and_clone(),
3761            bits_per_value: 16,
3762            num_values: num_levels,
3763            block_info: BlockInfo::new(),
3764        });
3765        let levels_field = Field::new_arrow("", DataType::UInt16, false)?;
3766        // Pick a block compressor
3767        let (compressor, compressor_desc) =
3768            compression_strategy.create_block_compressor(&levels_field, &levels_block)?;
3769        // Compress blocks of levels (sized according to the chunks)
3770        let mut level_chunks = Vec::with_capacity(chunks.len());
3771        let mut values_counter = 0;
3772        for (chunk_idx, chunk) in chunks.iter().enumerate() {
3773            let chunk_num_values = chunk.num_values(values_counter, num_elements);
3774            values_counter += chunk_num_values;
3775            let mut chunk_levels = if chunk_idx < chunks.len() - 1 {
3776                levels.slice_next(chunk_num_values as usize)
3777            } else {
3778                levels.slice_rest()
3779            };
3780            let num_chunk_levels = (chunk_levels.len() / 2) as u64;
3781            if max_rep > 0 {
3782                // If max_rep > 0 then we are working with rep levels and we need
3783                // to calculate the repetition index.  The repetition index for a
3784                // chunk is currently 2 values (in the future it may be more).
3785                //
3786                // The first value is the number of rows that _finish_ in the
3787                // chunk.
3788                //
3789                // The second value is the number of "leftovers" after the last
3790                // finished row in the chunk.
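                //
                // Worked example (added note): with max_rep == 1 and chunk rep levels
                // [1, 0, 0, 1, 0, 1, 0, 0] (with more chunks to follow), the levels at
                // positions 3 and 5 each start a new row, so two earlier rows finish in this
                // chunk, and the 3 values from position 5 onward belong to an unfinished row.
                // The chunk therefore contributes (2, 3) to the repetition index.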
3791                let rep_values = chunk_levels.borrow_to_typed_slice::<u16>();
3792                let rep_values = rep_values.as_ref();
3793
3794                // We skip 1 here because a max_rep at spot 0 doesn't count as a finished list (we
3795                // will count it in the previous chunk)
3796                let mut num_rows = rep_values.iter().skip(1).filter(|v| **v == max_rep).count();
3797                let num_leftovers = if chunk_idx < chunks.len() - 1 {
3798                    rep_values
3799                        .iter()
3800                        .rev()
3801                        .position(|v| *v == max_rep)
3802                        // # of leftovers includes the max_rep spot
3803                        .map(|pos| pos + 1)
3804                        .unwrap_or(rep_values.len())
3805                } else {
3806                    // Last chunk can't have leftovers
3807                    0
3808                };
3809
3810                if chunk_idx != 0 && rep_values[0] == max_rep {
3811                    // This chunk starts with a new row and so, if we thought we had leftovers
3812                    // in the previous chunk, we were mistaken
3813                    // TODO: Can use unchecked here
3814                    let rep_len = rep_index.len();
3815                    if rep_index[rep_len - 1] != 0 {
3816                        // We thought we had leftovers but that was actually a full row
3817                        rep_index[rep_len - 2] += 1;
3818                        rep_index[rep_len - 1] = 0;
3819                    }
3820                }
3821
3822                if chunk_idx == chunks.len() - 1 {
3823                    // The final list
3824                    num_rows += 1;
3825                }
3826                rep_index.push(num_rows as u64);
3827                rep_index.push(num_leftovers as u64);
3828            }
3829            let chunk_levels_block = DataBlock::FixedWidth(FixedWidthDataBlock {
3830                data: chunk_levels,
3831                bits_per_value: 16,
3832                num_values: num_chunk_levels,
3833                block_info: BlockInfo::new(),
3834            });
3835            let compressed_levels = compressor.compress(chunk_levels_block)?;
3836            level_chunks.push(CompressedLevelsChunk {
3837                data: compressed_levels,
3838                num_levels: num_chunk_levels as u16,
3839            });
3840        }
3841        debug_assert_eq!(levels.num_levels_remaining(), 0);
3842        let rep_index = if rep_index.is_empty() {
3843            None
3844        } else {
3845            Some(LanceBuffer::reinterpret_vec(rep_index))
3846        };
3847        Ok(CompressedLevels {
3848            data: level_chunks,
3849            compression: compressor_desc,
3850            rep_index,
3851        })
3852    }
3853
3854    fn encode_simple_all_null(
3855        column_idx: u32,
3856        num_rows: u64,
3857        row_number: u64,
3858    ) -> Result<EncodedPage> {
3859        let description = ProtobufUtils::simple_all_null_layout();
3860        Ok(EncodedPage {
3861            column_idx,
3862            data: vec![],
3863            description: PageEncoding::Structural(description),
3864            num_rows,
3865            row_number,
3866        })
3867    }
3868
3869    // Encodes a page where all values are null but we have rep/def
3870    // information that we need to store (e.g. to distinguish between
3871    // different kinds of null)
3872    fn encode_complex_all_null(
3873        column_idx: u32,
3874        repdefs: Vec<RepDefBuilder>,
3875        row_number: u64,
3876        num_rows: u64,
3877    ) -> Result<EncodedPage> {
3878        let repdef = RepDefBuilder::serialize(repdefs);
3879
3880        // TODO: Actually compress repdef
3881        let rep_bytes = if let Some(rep) = repdef.repetition_levels.as_ref() {
3882            LanceBuffer::reinterpret_slice(rep.clone())
3883        } else {
3884            LanceBuffer::empty()
3885        };
3886
3887        let def_bytes = if let Some(def) = repdef.definition_levels.as_ref() {
3888            LanceBuffer::reinterpret_slice(def.clone())
3889        } else {
3890            LanceBuffer::empty()
3891        };
3892
3893        let description = ProtobufUtils::all_null_layout(&repdef.def_meaning);
3894        Ok(EncodedPage {
3895            column_idx,
3896            data: vec![rep_bytes, def_bytes],
3897            description: PageEncoding::Structural(description),
3898            num_rows,
3899            row_number,
3900        })
3901    }
3902
3903    #[allow(clippy::too_many_arguments)]
3904    fn encode_miniblock(
3905        column_idx: u32,
3906        field: &Field,
3907        compression_strategy: &dyn CompressionStrategy,
3908        data: DataBlock,
3909        repdefs: Vec<RepDefBuilder>,
3910        row_number: u64,
3911        dictionary_data: Option<DataBlock>,
3912        num_rows: u64,
3913    ) -> Result<EncodedPage> {
3914        let repdef = RepDefBuilder::serialize(repdefs);
3915
3916        if let DataBlock::AllNull(_null_block) = data {
3917            // If we got here then all the data is null but we have rep/def information that
3918            // we need to store.
3919            todo!()
3920        }
3921
3922        // The top-level validity is encoded in repdef so we can remove it.  There may be inner
3923        // validities if we have FSL fields but those are not included in the repdef and need to
3924        // be encoded.
3925        let data = data.remove_outer_validity();
3926
3927        let num_items = data.num_values();
3928
3929        let compressor = compression_strategy.create_miniblock_compressor(field, &data)?;
3930        let (compressed_data, value_encoding) = compressor.compress(data)?;
3931
3932        let max_rep = repdef.def_meaning.iter().filter(|l| l.is_list()).count() as u16;
3933
3934        let mut compressed_rep = repdef
3935            .rep_slicer()
3936            .map(|rep_slicer| {
3937                Self::compress_levels(
3938                    rep_slicer,
3939                    num_items,
3940                    compression_strategy,
3941                    &compressed_data.chunks,
3942                    max_rep,
3943                )
3944            })
3945            .transpose()?;
3946
3947        let (rep_index, rep_index_depth) =
3948            match compressed_rep.as_mut().and_then(|cr| cr.rep_index.as_mut()) {
3949                Some(rep_index) => (Some(rep_index.borrow_and_clone()), 1),
3950                None => (None, 0),
3951            };
3952
3953        let mut compressed_def = repdef
3954            .def_slicer()
3955            .map(|def_slicer| {
3956                Self::compress_levels(
3957                    def_slicer,
3958                    num_items,
3959                    compression_strategy,
3960                    &compressed_data.chunks,
3961                    /*max_rep=*/ 0,
3962                )
3963            })
3964            .transpose()?;
3965
3966        // TODO: Parquet sparsely encodes values here.  We could do the same but
3967        // then we won't have log2 values per chunk.  This means more metadata
3968        // then we won't have a power-of-two number of values per chunk.  This means more metadata
3969        // investigating at some point
3970
3971        let rep_data = compressed_rep
3972            .as_mut()
3973            .map(|cr| std::mem::take(&mut cr.data));
3974        let def_data = compressed_def
3975            .as_mut()
3976            .map(|cd| std::mem::take(&mut cd.data));
3977
3978        let serialized = Self::serialize_miniblocks(compressed_data, rep_data, def_data);
3979
3980        // Metadata, Data, Dictionary, (maybe) Repetition Index
3981        let mut data = Vec::with_capacity(4);
3982        data.push(serialized.metadata);
3983        data.push(serialized.data);
3984
3985        if let Some(dictionary_data) = dictionary_data {
3986            let num_dictionary_items = dictionary_data.num_values();
3987            // field in `create_block_compressor` is not used currently.
3988            let dummy_dictionary_field = Field::new_arrow("", DataType::UInt16, false)?;
3989
3990            let (compressor, dictionary_encoding) = compression_strategy
3991                .create_block_compressor(&dummy_dictionary_field, &dictionary_data)?;
3992            let dictionary_buffer = compressor.compress(dictionary_data)?;
3993
3994            data.push(dictionary_buffer);
3995            if let Some(rep_index) = rep_index {
3996                data.push(rep_index);
3997            }
3998
3999            let description = ProtobufUtils::miniblock_layout(
4000                compressed_rep.map(|cr| cr.compression),
4001                compressed_def.map(|cd| cd.compression),
4002                value_encoding,
4003                rep_index_depth,
4004                serialized.num_buffers,
4005                Some((dictionary_encoding, num_dictionary_items)),
4006                &repdef.def_meaning,
4007                num_items,
4008            );
4009            Ok(EncodedPage {
4010                num_rows,
4011                column_idx,
4012                data,
4013                description: PageEncoding::Structural(description),
4014                row_number,
4015            })
4016        } else {
4017            let description = ProtobufUtils::miniblock_layout(
4018                compressed_rep.map(|cr| cr.compression),
4019                compressed_def.map(|cd| cd.compression),
4020                value_encoding,
4021                rep_index_depth,
4022                serialized.num_buffers,
4023                None,
4024                &repdef.def_meaning,
4025                num_items,
4026            );
4027
4028            if let Some(mut rep_index) = rep_index {
4029                let view = rep_index.borrow_to_typed_slice::<u64>();
4030                let total = view.chunks_exact(2).map(|c| c[0]).sum::<u64>();
4031                debug_assert_eq!(total, num_rows);
4032
4033                data.push(rep_index);
4034            }
4035
4036            Ok(EncodedPage {
4037                num_rows,
4038                column_idx,
4039                data,
4040                description: PageEncoding::Structural(description),
4041                row_number,
4042            })
4043        }
4044    }
4045
4046    // For fixed-size data we encode < control word | data > for each value
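    //
    // Illustrative layout (added note): assuming 2-byte control words and 4-byte values, three
    // visible values are serialized as
    //
    //     | cw0 | v0 | cw1 | v1 | cw2 | v2 |
    //
    // and, when repetition is present, the repetition index records the byte offset at which
    // each new row begins plus a final entry for the total length.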
4047    fn serialize_full_zip_fixed(
4048        fixed: FixedWidthDataBlock,
4049        mut repdef: ControlWordIterator,
4050        num_values: u64,
4051    ) -> SerializedFullZip {
4052        let len = fixed.data.len() + repdef.bytes_per_word() * num_values as usize;
4053        let mut zipped_data = Vec::with_capacity(len);
4054
4055        let max_rep_index_val = if repdef.has_repetition() {
4056            len as u64
4057        } else {
4058            // Setting this to 0 means we won't write a repetition index
4059            0
4060        };
4061        let mut rep_index_builder =
4062            BytepackedIntegerEncoder::with_capacity(num_values as usize + 1, max_rep_index_val);
4063
4064        // We could pad to the nearest byte, but it is unlikely that compression of large values
4065        // will yield a result that is not byte aligned, so we don't worry about it for now
4066        assert_eq!(
4067            fixed.bits_per_value % 8,
4068            0,
4069            "Non-byte aligned full-zip compression not yet supported"
4070        );
4071
4072        let bytes_per_value = fixed.bits_per_value as usize / 8;
4073
4074        let mut data_iter = fixed.data.chunks_exact(bytes_per_value);
4075        let mut offset = 0;
4076        while let Some(control) = repdef.append_next(&mut zipped_data) {
4077            if control.is_new_row {
4078                // We have finished a row
4079                debug_assert!(offset <= len);
4080                // SAFETY: We know that `offset <= len`
4081                unsafe { rep_index_builder.append(offset as u64) };
4082            }
4083            if control.is_visible {
4084                let value = data_iter.next().unwrap();
4085                zipped_data.extend_from_slice(value);
4086            }
4087            offset = zipped_data.len();
4088        }
4089
4090        debug_assert_eq!(zipped_data.len(), len);
4091        // Put the final value in the rep index
4092        // SAFETY: `zipped_data.len() == len`
4093        unsafe {
4094            rep_index_builder.append(zipped_data.len() as u64);
4095        }
4096
4097        let zipped_data = LanceBuffer::Owned(zipped_data);
4098        let rep_index = rep_index_builder.into_data();
4099        let rep_index = if rep_index.is_empty() {
4100            None
4101        } else {
4102            Some(LanceBuffer::Owned(rep_index))
4103        };
4104        SerializedFullZip {
4105            values: zipped_data,
4106            repetition_index: rep_index,
4107        }
4108    }
4109
4110    // For variable-size data we encode < control word | length | data > for each value
4111    //
4112    // In addition, we create a second buffer, the repetition index
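    //
    // Illustrative sketch (hypothetical values, assuming 32-bit offsets and a 1-byte
    // control word): two rows [["hi"], ["a", null]] would be zipped roughly as
    //
    //   [ctrl]["hi".len()]["hi"][ctrl]["a".len()]["a"][ctrl]
    //
    // (the null item gets a control word but no length or data) while the repetition index
    // records the byte offset where each row starts, plus one final end offset.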
4113    fn serialize_full_zip_variable(
4114        mut variable: VariableWidthBlock,
4115        mut repdef: ControlWordIterator,
4116        num_items: u64,
4117    ) -> SerializedFullZip {
4118        let bytes_per_offset = variable.bits_per_offset as usize / 8;
4119        assert_eq!(
4120            variable.bits_per_offset % 8,
4121            0,
4122            "Only byte-aligned offsets supported"
4123        );
4124        let len = variable.data.len()
4125            + repdef.bytes_per_word() * num_items as usize
4126            + bytes_per_offset * variable.num_values as usize;
4127        let mut buf = Vec::with_capacity(len);
4128
4129        let max_rep_index_val = len as u64;
4130        let mut rep_index_builder =
4131            BytepackedIntegerEncoder::with_capacity(num_items as usize + 1, max_rep_index_val);
4132
4133        // TODO: byte pack the item lengths with varint encoding
4134        match bytes_per_offset {
4135            4 => {
4136                let offs = variable.offsets.borrow_to_typed_slice::<u32>();
4137                let mut rep_offset = 0;
4138                let mut windows_iter = offs.as_ref().windows(2);
4139                while let Some(control) = repdef.append_next(&mut buf) {
4140                    if control.is_new_row {
4141                        // We have finished a row
4142                        debug_assert!(rep_offset <= len);
4143                        // SAFETY: We know that `buf.len() <= len`
4144                        unsafe { rep_index_builder.append(rep_offset as u64) };
4145                    }
4146                    if control.is_visible {
4147                        let window = windows_iter.next().unwrap();
4148                        if control.is_valid_item {
4149                            buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
4150                            buf.extend_from_slice(
4151                                &variable.data[window[0] as usize..window[1] as usize],
4152                            );
4153                        }
4154                    }
4155                    rep_offset = buf.len();
4156                }
4157            }
4158            8 => {
4159                let offs = variable.offsets.borrow_to_typed_slice::<u64>();
4160                let mut rep_offset = 0;
4161                let mut windows_iter = offs.as_ref().windows(2);
4162                while let Some(control) = repdef.append_next(&mut buf) {
4163                    if control.is_new_row {
4164                        // We have finished a row
4165                        debug_assert!(rep_offset <= len);
4166                        // SAFETY: We know that `rep_offset <= len`
4167                        unsafe { rep_index_builder.append(rep_offset as u64) };
4168                    }
4169                    if control.is_visible {
4170                        let window = windows_iter.next().unwrap();
4171                        if control.is_valid_item {
4172                            buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
4173                            buf.extend_from_slice(
4174                                &variable.data[window[0] as usize..window[1] as usize],
4175                            );
4176                        }
4177                    }
4178                    rep_offset = buf.len();
4179                }
4180            }
4181            _ => panic!("Unsupported offset size"),
4182        }
4183
4184        // We may have used fewer bytes than `len` because null items get a control word but no
4185        // length or data.  However, if we are over `len` then we have a bug.
4186        debug_assert!(buf.len() <= len);
4187        // Put the final value in the rep index
4188        // SAFETY: `buf.len() <= len`
4189        unsafe {
4190            rep_index_builder.append(buf.len() as u64);
4191        }
4192
4193        let zipped_data = LanceBuffer::Owned(buf);
4194        let rep_index = rep_index_builder.into_data();
4195        debug_assert!(!rep_index.is_empty());
4196        let rep_index = Some(LanceBuffer::Owned(rep_index));
4197        SerializedFullZip {
4198            values: zipped_data,
4199            repetition_index: rep_index,
4200        }
4201    }
4202
4203    /// Serializes data according to the full-zip format, which zips the repetition,
4204    /// definition, and value data together into a single buffer.
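    ///
    /// The returned [`SerializedFullZip`] carries the zipped value buffer and, when a
    /// repetition index was built, that index as a second buffer; `encode_full_zip` below
    /// appends it to the page data after the values.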
4205    fn serialize_full_zip(
4206        compressed_data: PerValueDataBlock,
4207        repdef: ControlWordIterator,
4208        num_items: u64,
4209    ) -> SerializedFullZip {
4210        match compressed_data {
4211            PerValueDataBlock::Fixed(fixed) => {
4212                Self::serialize_full_zip_fixed(fixed, repdef, num_items)
4213            }
4214            PerValueDataBlock::Variable(var) => {
4215                Self::serialize_full_zip_variable(var, repdef, num_items)
4216            }
4217        }
4218    }
4219
4220    fn encode_full_zip(
4221        column_idx: u32,
4222        field: &Field,
4223        compression_strategy: &dyn CompressionStrategy,
4224        data: DataBlock,
4225        repdefs: Vec<RepDefBuilder>,
4226        row_number: u64,
4227        num_lists: u64,
4228    ) -> Result<EncodedPage> {
4229        let repdef = RepDefBuilder::serialize(repdefs);
4230        let max_rep = repdef
4231            .repetition_levels
4232            .as_ref()
4233            .map_or(0, |r| r.iter().max().copied().unwrap_or(0));
4234        let max_def = repdef
4235            .definition_levels
4236            .as_ref()
4237            .map_or(0, |d| d.iter().max().copied().unwrap_or(0));
4238
4239        // The top-level validity is encoded in repdef so we can remove it
4240        let data = data.remove_outer_validity();
4241
4242        // To handle FSL we just flatten
4243        // let data = data.flatten();
4244
4245        let (num_items, num_visible_items) =
4246            if let Some(rep_levels) = repdef.repetition_levels.as_ref() {
4247                // If there are rep levels there may be "invisible" items, so we need to encode
4248                // rep_levels.len() values, which may be more than data.num_values()
4249                (rep_levels.len() as u64, data.num_values())
4250            } else {
4251                // If there are no rep levels then we encode data.num_values() things
4252                (data.num_values(), data.num_values())
4253            };
4254
4255        let max_visible_def = repdef.max_visible_level.unwrap_or(u16::MAX);
4256
4257        let repdef_iter = build_control_word_iterator(
4258            repdef.repetition_levels.as_deref(),
4259            max_rep,
4260            repdef.definition_levels.as_deref(),
4261            max_def,
4262            max_visible_def,
4263            num_items as usize,
4264        );
4265        let bits_rep = repdef_iter.bits_rep();
4266        let bits_def = repdef_iter.bits_def();
4267
4268        let compressor = compression_strategy.create_per_value(field, &data)?;
4269        let (compressed_data, value_encoding) = compressor.compress(data)?;
4270
4271        let description = match &compressed_data {
4272            PerValueDataBlock::Fixed(fixed) => ProtobufUtils::fixed_full_zip_layout(
4273                bits_rep,
4274                bits_def,
4275                fixed.bits_per_value as u32,
4276                value_encoding,
4277                &repdef.def_meaning,
4278                num_items as u32,
4279                num_visible_items as u32,
4280            ),
4281            PerValueDataBlock::Variable(variable) => ProtobufUtils::variable_full_zip_layout(
4282                bits_rep,
4283                bits_def,
4284                variable.bits_per_offset as u32,
4285                value_encoding,
4286                &repdef.def_meaning,
4287                num_items as u32,
4288                num_visible_items as u32,
4289            ),
4290        };
4291
4292        let zipped = Self::serialize_full_zip(compressed_data, repdef_iter, num_items);
4293
4294        let data = if let Some(repindex) = zipped.repetition_index {
4295            vec![zipped.values, repindex]
4296        } else {
4297            vec![zipped.values]
4298        };
4299
4300        Ok(EncodedPage {
4301            num_rows: num_lists,
4302            column_idx,
4303            data,
4304            description: PageEncoding::Structural(description),
4305            row_number,
4306        })
4307    }
4308
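    // Dictionary encoding replaces each value with a small index into a dictionary of the
    // distinct values.  Illustrative sketch (hypothetical values): the input
    // ["foo", "bar", "foo", "foo"] becomes
    //
    //   indices    = [0, 1, 0, 0]      (one u8 per value)
    //   dictionary = ["foo", "bar"]    (each distinct value stored once)
    //
    // which is the (indices, dictionary) pair of data blocks returned below.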
4309    fn dictionary_encode(mut data_block: DataBlock, cardinality: u64) -> (DataBlock, DataBlock) {
4310        match data_block {
4311            DataBlock::FixedWidth(ref mut fixed_width_data_block) => {
4312                // Currently, only FixedWidth DataBlocks with bits_per_value == 128 reach this path with a cardinality statistic
4313                // TODO: a follow-up PR to support `FixedWidth DataBlock` with bits_per_value == 256.
4314                let mut map = HashMap::new();
4315                let u128_slice = fixed_width_data_block.data.borrow_to_typed_slice::<u128>();
4316                let u128_slice = u128_slice.as_ref();
4317                let mut dictionary_buffer = Vec::with_capacity(cardinality as usize);
4318                let mut indices_buffer =
4319                    Vec::with_capacity(fixed_width_data_block.num_values as usize);
4320                let mut curr_idx: u8 = 0;
4321                u128_slice.iter().for_each(|&value| {
4322                    let idx = *map.entry(value).or_insert_with(|| {
4323                        dictionary_buffer.push(value);
4324                        curr_idx += 1;
4325                        curr_idx - 1
4326                    });
4327                    indices_buffer.push(idx);
4328                });
4329                let dictionary_data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
4330                    data: LanceBuffer::reinterpret_vec(dictionary_buffer),
4331                    bits_per_value: 128,
4332                    num_values: curr_idx as u64,
4333                    block_info: BlockInfo::default(),
4334                });
4335                let mut indices_data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
4336                    data: LanceBuffer::reinterpret_vec(indices_buffer),
4337                    bits_per_value: 8,
4338                    num_values: fixed_width_data_block.num_values,
4339                    block_info: BlockInfo::default(),
4340                });
4341                // TODO: if we decide to compute statistics eagerly, move statistics computation
4342                // into the DataBlock constructor.
4343                indices_data_block.compute_stat();
4344
4345                (indices_data_block, dictionary_data_block)
4346            }
4347            DataBlock::VariableWidth(ref mut variable_width_data_block) => {
4348                match variable_width_data_block.bits_per_offset {
4349                    32 => {
4350                        let mut map: HashMap<U8SliceKey, u8> = HashMap::new();
4351                        let offsets = variable_width_data_block
4352                            .offsets
4353                            .borrow_to_typed_slice::<u32>();
4354                        let offsets = offsets.as_ref();
4355
4356                        let max_len = variable_width_data_block.get_stat(Stat::MaxLength).expect(
4357                            "VariableWidth DataBlock should have valid `Stat::MaxLength` statistics",
4358                        );
4359                        let max_len = max_len.as_primitive::<UInt64Type>().value(0);
4360
4361                        let mut dictionary_buffer: Vec<u8> =
4362                            Vec::with_capacity((max_len * cardinality) as usize);
4363                        let mut dictionary_offsets_buffer = vec![0];
4364                        let mut curr_idx = 0;
4365                        let mut indices_buffer =
4366                            Vec::with_capacity(variable_width_data_block.num_values as usize);
4367
4368                        offsets
4369                            .iter()
4370                            .zip(offsets.iter().skip(1))
4371                            .for_each(|(&start, &end)| {
4372                                let key =
4373                                    &variable_width_data_block.data[start as usize..end as usize];
4374                                let idx = *map.entry(U8SliceKey(key)).or_insert_with(|| {
4375                                    dictionary_buffer.extend_from_slice(key);
4376                                    dictionary_offsets_buffer.push(dictionary_buffer.len() as u32);
4377                                    curr_idx += 1;
4378                                    curr_idx - 1
4379                                });
4380                                indices_buffer.push(idx);
4381                            });
4382
4383                        let dictionary_data_block = DataBlock::VariableWidth(VariableWidthBlock {
4384                            data: LanceBuffer::reinterpret_vec(dictionary_buffer),
4385                            offsets: LanceBuffer::reinterpret_vec(dictionary_offsets_buffer),
4386                            bits_per_offset: 32,
4387                            num_values: curr_idx as u64,
4388                            block_info: BlockInfo::default(),
4389                        });
4390
4391                        let mut indices_data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
4392                            data: LanceBuffer::Owned(indices_buffer),
4393                            bits_per_value: 8,
4394                            num_values: variable_width_data_block.num_values,
4395                            block_info: BlockInfo::default(),
4396                        });
4397                        // TODO: if we decide to compute statistics eagerly, move statistics computation
4398                        // into the DataBlock constructor.
4399                        indices_data_block.compute_stat();
4400
4401                        (indices_data_block, dictionary_data_block)
4402                    }
4403                    64 => {
4404                        todo!("A follow up PR to support dictionary encoding with dictionary type `VariableWidth DataBlock` with bits_per_offset 64");
4405                    }
4406                    _ => {
4407                        unreachable!()
4408                    }
4409                }
4410            }
4411            _ => {
4412                unreachable!("dictionary encode called with data block {:?}", data_block)
4413            }
4414        }
4415    }
4416
4417    // Creates an encode task, consuming all buffered data
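    //
    // Roughly, the flush picks a layout for the buffered data as follows (see the body
    // below for the exact conditions):
    //   * no values, or every value null  -> an all-null layout (no value data encoded)
    //   * low cardinality                 -> dictionary encoding + mini-block layout
    //   * data that prefers mini-block    -> mini-block layout
    //   * data that prefers full-zip      -> full-zip layout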
4418    fn do_flush(
4419        &mut self,
4420        arrays: Vec<ArrayRef>,
4421        repdefs: Vec<RepDefBuilder>,
4422        row_number: u64,
4423        num_rows: u64,
4424    ) -> Result<Vec<EncodeTask>> {
4425        let column_idx = self.column_index;
4426        let compression_strategy = self.compression_strategy.clone();
4427        let field = self.field.clone();
4428        let encoding_metadata = self.encoding_metadata.clone();
4429        let task = spawn_cpu(move || {
4430            let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
4431            if num_values == 0 {
4432                // We should not encode empty arrays.  So if we get here that should mean that we
4433                // either have all empty lists or all null lists (or a mix).  We still need to encode
4434                // the rep/def information but we can skip the data encoding.
4435                return Self::encode_complex_all_null(column_idx, repdefs, row_number, num_rows);
4436            }
4437            let num_nulls = arrays
4438                .iter()
4439                .map(|arr| arr.logical_nulls().map(|n| n.null_count()).unwrap_or(0) as u64)
4440                .sum::<u64>();
4441
4442            if num_values == num_nulls {
4443                if repdefs.iter().all(|rd| rd.is_simple_validity()) {
4444                    log::debug!(
4445                        "Encoding column {} with {} items using simple-null layout",
4446                        column_idx,
4447                        num_values
4448                    );
4449                    // Simple case, no rep/def and all nulls, we don't need to encode any data
4450                    Self::encode_simple_all_null(column_idx, num_values, row_number)
4451                } else {
4452                    // If we get here then we have definition levels (presumably due to FSL) and
4453                    // we need to store those
4454                    Self::encode_complex_all_null(column_idx, repdefs, row_number, num_rows)
4455                }
4456            } else {
4457                let data_block = DataBlock::from_arrays(&arrays, num_values);
4458
4459                // If the `data_block` is a `StructDataBlock`, then this is a struct with packed-struct encoding.
4460                if let DataBlock::Struct(ref struct_data_block) = data_block {
4461                    if struct_data_block
4462                        .children
4463                        .iter()
4464                        .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
4465                    {
4466                        panic!("packed struct encoding currently only supports fixed-width fields.")
4467                    }
4468                }
4469
4470                let dictionary_encoding_threshold: u64 = 100.max(data_block.num_values() / 4);
4471                let cardinality =
4472                    if let Some(cardinality_array) = data_block.get_stat(Stat::Cardinality) {
4473                        cardinality_array.as_primitive::<UInt64Type>().value(0)
4474                    } else {
4475                        u64::MAX
4476                    };
4477
4478                // The triggering threshold for dictionary encoding can be further tuned.
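                // Illustrative sketch (hypothetical numbers): with 100_000 values the
                // threshold is max(100, 100_000 / 4) = 25_000, so a cardinality of 1_000
                // (with 100_000 >= 10 * 1_000) selects dictionary encoding.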
4479                if cardinality <= dictionary_encoding_threshold
4480                    && data_block.num_values() >= 10 * cardinality
4481                {
4482                    let (indices_data_block, dictionary_data_block) =
4483                        Self::dictionary_encode(data_block, cardinality);
4484                    Self::encode_miniblock(
4485                        column_idx,
4486                        &field,
4487                        compression_strategy.as_ref(),
4488                        indices_data_block,
4489                        repdefs,
4490                        row_number,
4491                        Some(dictionary_data_block),
4492                        num_rows,
4493                    )
4494                } else if Self::prefers_miniblock(&data_block, encoding_metadata.as_ref()) {
4495                    log::debug!(
4496                        "Encoding column {} with {} items using mini-block layout",
4497                        column_idx,
4498                        num_values
4499                    );
4500                    Self::encode_miniblock(
4501                        column_idx,
4502                        &field,
4503                        compression_strategy.as_ref(),
4504                        data_block,
4505                        repdefs,
4506                        row_number,
4507                        None,
4508                        num_rows,
4509                    )
4510                } else if Self::prefers_fullzip(encoding_metadata.as_ref()) {
4511                    log::debug!(
4512                        "Encoding column {} with {} items using full-zip layout",
4513                        column_idx,
4514                        num_values
4515                    );
4516                    Self::encode_full_zip(
4517                        column_idx,
4518                        &field,
4519                        compression_strategy.as_ref(),
4520                        data_block,
4521                        repdefs,
4522                        row_number,
4523                        num_rows,
4524                    )
4525                } else {
4526                    Err(Error::InvalidInput { source: format!("Cannot determine structural encoding for field {}.  This typically indicates an invalid value of the field metadata key {}", field.name, STRUCTURAL_ENCODING_META_KEY).into(), location: location!() })
4527                }
4528            }
4529        })
4530        .boxed();
4531        Ok(vec![task])
4532    }
4533
4534    fn extract_validity_buf(array: &dyn Array, repdef: &mut RepDefBuilder) {
4535        if let Some(validity) = array.nulls() {
4536            repdef.add_validity_bitmap(validity.clone());
4537        } else {
4538            repdef.add_no_null(array.len());
4539        }
4540    }
4541
4542    fn extract_validity(array: &dyn Array, repdef: &mut RepDefBuilder) {
4543        match array.data_type() {
4544            DataType::Null => {
4545                repdef.add_validity_bitmap(NullBuffer::new(BooleanBuffer::new_unset(array.len())));
4546            }
4547            DataType::Dictionary(_, _) => {
4548                unreachable!()
4549            }
4550            // Extract our validity buf but NOT any child validity bufs (they will be encoded
4551            // as part of the values).  Note: for FSL we do not use repdef.add_fsl because we do
4552            // NOT want to increase the repdef depth.
4553            //
4554            // This would be quite catastrophic for something like vector embeddings.  Imagine we
4555            // had thousands of vectors and some were null but no vector contained null items.  If
4556            // we treated the vectors (primitive FSL) like we treat structural FSL we would end up
4557            // with a rep/def value for every single item in the vector.
4558            _ => Self::extract_validity_buf(array, repdef),
4559        }
4560    }
4561}
4562
4563impl FieldEncoder for PrimitiveStructuralEncoder {
4564    // Buffers data; if there is enough to write a page then we create an encode task
4565    fn maybe_encode(
4566        &mut self,
4567        array: ArrayRef,
4568        _external_buffers: &mut OutOfLineBuffers,
4569        mut repdef: RepDefBuilder,
4570        row_number: u64,
4571        num_rows: u64,
4572    ) -> Result<Vec<EncodeTask>> {
4573        Self::extract_validity(array.as_ref(), &mut repdef);
4574        self.accumulated_repdefs.push(repdef);
4575
4576        if let Some((arrays, row_number, num_rows)) =
4577            self.accumulation_queue.insert(array, row_number, num_rows)
4578        {
4579            let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
4580            Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
4581        } else {
4582            Ok(vec![])
4583        }
4584    }
4585
4586    // If there is any data left in the buffer then create an encode task from it
4587    fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
4588        if let Some((arrays, row_number, num_rows)) = self.accumulation_queue.flush() {
4589            let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
4590            Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
4591        } else {
4592            Ok(vec![])
4593        }
4594    }
4595
4596    fn num_columns(&self) -> u32 {
4597        1
4598    }
4599
4600    fn finish(
4601        &mut self,
4602        _external_buffers: &mut OutOfLineBuffers,
4603    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
4604        std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
4605    }
4606}
4607
4608#[cfg(test)]
4609#[allow(clippy::single_range_in_vec_init)]
4610mod tests {
4611    use std::{collections::VecDeque, sync::Arc};
4612
4613    use arrow_array::{ArrayRef, Int8Array, StringArray};
4614
4615    use crate::encodings::logical::primitive::{
4616        ChunkDrainInstructions, PrimitiveStructuralEncoder,
4617    };
4618
4619    use super::{
4620        ChunkInstructions, DataBlock, DecodeMiniBlockTask, PreambleAction, RepetitionIndex,
4621    };
4622
4623    #[test]
4624    fn test_is_narrow() {
4625        let int8_array = Int8Array::from(vec![1, 2, 3]);
4626        let array_ref: ArrayRef = Arc::new(int8_array);
4627        let block = DataBlock::from_array(array_ref);
4628
4629        assert!(PrimitiveStructuralEncoder::is_narrow(&block));
4630
4631        let string_array = StringArray::from(vec![Some("hello"), Some("world")]);
4632        let block = DataBlock::from_array(string_array);
4633        assert!(PrimitiveStructuralEncoder::is_narrow(&block));
4634
4635        let string_array = StringArray::from(vec![
4636            Some("hello world".repeat(100)),
4637            Some("world".to_string()),
4638        ]);
4639        let block = DataBlock::from_array(string_array);
4640        assert!(!PrimitiveStructuralEncoder::is_narrow(&block));
4641    }
4642
4643    #[test]
4644    fn test_map_range() {
4645        // Null in the middle
4646        // [[A, B, C], [D, E], NULL, [F, G, H]]
4647        let rep = Some(vec![1, 0, 0, 1, 0, 1, 1, 0, 0]);
4648        let def = Some(vec![0, 0, 0, 0, 0, 1, 0, 0, 0]);
4649        let max_visible_def = 0;
4650        let total_items = 8;
4651        let max_rep = 1;
4652
4653        let check = |range, expected_item_range, expected_level_range| {
4654            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4655                range,
4656                rep.as_ref(),
4657                def.as_ref(),
4658                max_rep,
4659                max_visible_def,
4660                total_items,
4661                PreambleAction::Absent,
4662            );
4663            assert_eq!(item_range, expected_item_range);
4664            assert_eq!(level_range, expected_level_range);
4665        };
4666
4667        check(0..1, 0..3, 0..3);
4668        check(1..2, 3..5, 3..5);
4669        check(2..3, 5..5, 5..6);
4670        check(3..4, 5..8, 6..9);
4671        check(0..2, 0..5, 0..5);
4672        check(1..3, 3..5, 3..6);
4673        check(2..4, 5..8, 5..9);
4674        check(0..3, 0..5, 0..6);
4675        check(1..4, 3..8, 3..9);
4676        check(0..4, 0..8, 0..9);
4677
4678        // Null at start
4679        // [NULL, [A, B], [C]]
4680        let rep = Some(vec![1, 1, 0, 1]);
4681        let def = Some(vec![1, 0, 0, 0]);
4682        let max_visible_def = 0;
4683        let total_items = 3;
4684
4685        let check = |range, expected_item_range, expected_level_range| {
4686            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4687                range,
4688                rep.as_ref(),
4689                def.as_ref(),
4690                max_rep,
4691                max_visible_def,
4692                total_items,
4693                PreambleAction::Absent,
4694            );
4695            assert_eq!(item_range, expected_item_range);
4696            assert_eq!(level_range, expected_level_range);
4697        };
4698
4699        check(0..1, 0..0, 0..1);
4700        check(1..2, 0..2, 1..3);
4701        check(2..3, 2..3, 3..4);
4702        check(0..2, 0..2, 0..3);
4703        check(1..3, 0..3, 1..4);
4704        check(0..3, 0..3, 0..4);
4705
4706        // Null at end
4707        // [[A], [B, C], NULL]
4708        let rep = Some(vec![1, 1, 0, 1]);
4709        let def = Some(vec![0, 0, 0, 1]);
4710        let max_visible_def = 0;
4711        let total_items = 3;
4712
4713        let check = |range, expected_item_range, expected_level_range| {
4714            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4715                range,
4716                rep.as_ref(),
4717                def.as_ref(),
4718                max_rep,
4719                max_visible_def,
4720                total_items,
4721                PreambleAction::Absent,
4722            );
4723            assert_eq!(item_range, expected_item_range);
4724            assert_eq!(level_range, expected_level_range);
4725        };
4726
4727        check(0..1, 0..1, 0..1);
4728        check(1..2, 1..3, 1..3);
4729        check(2..3, 3..3, 3..4);
4730        check(0..2, 0..3, 0..3);
4731        check(1..3, 1..3, 1..4);
4732        check(0..3, 0..3, 0..4);
4733
4734        // No nulls, with repetition
4735        // [[A, B], [C, D], [E, F]]
4736        let rep = Some(vec![1, 0, 1, 0, 1, 0]);
4737        let def: Option<&[u16]> = None;
4738        let max_visible_def = 0;
4739        let total_items = 6;
4740
4741        let check = |range, expected_item_range, expected_level_range| {
4742            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4743                range,
4744                rep.as_ref(),
4745                def.as_ref(),
4746                max_rep,
4747                max_visible_def,
4748                total_items,
4749                PreambleAction::Absent,
4750            );
4751            assert_eq!(item_range, expected_item_range);
4752            assert_eq!(level_range, expected_level_range);
4753        };
4754
4755        check(0..1, 0..2, 0..2);
4756        check(1..2, 2..4, 2..4);
4757        check(2..3, 4..6, 4..6);
4758        check(0..2, 0..4, 0..4);
4759        check(1..3, 2..6, 2..6);
4760        check(0..3, 0..6, 0..6);
4761
4762        // No repetition, with nulls (this case is trivial)
4763        // [A, B, NULL, C]
4764        let rep: Option<&[u16]> = None;
4765        let def = Some(vec![0, 0, 1, 0]);
4766        let max_visible_def = 1;
4767        let total_items = 4;
4768
4769        let check = |range, expected_item_range, expected_level_range| {
4770            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4771                range,
4772                rep.as_ref(),
4773                def.as_ref(),
4774                max_rep,
4775                max_visible_def,
4776                total_items,
4777                PreambleAction::Absent,
4778            );
4779            assert_eq!(item_range, expected_item_range);
4780            assert_eq!(level_range, expected_level_range);
4781        };
4782
4783        check(0..1, 0..1, 0..1);
4784        check(1..2, 1..2, 1..2);
4785        check(2..3, 2..3, 2..3);
4786        check(0..2, 0..2, 0..2);
4787        check(1..3, 1..3, 1..3);
4788        check(0..3, 0..3, 0..3);
4789
4790        // Tricky case: this chunk is a continuation and starts with a repetition level of 0
4791        // [[..., A], [B, C], NULL]
4792        //
4793        // What we do will depend on the preamble action
4794        let rep = Some(vec![0, 1, 0, 1]);
4795        let def = Some(vec![0, 0, 0, 1]);
4796        let max_visible_def = 0;
4797        let total_items = 3;
4798
4799        let check = |range, expected_item_range, expected_level_range| {
4800            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4801                range,
4802                rep.as_ref(),
4803                def.as_ref(),
4804                max_rep,
4805                max_visible_def,
4806                total_items,
4807                PreambleAction::Take,
4808            );
4809            assert_eq!(item_range, expected_item_range);
4810            assert_eq!(level_range, expected_level_range);
4811        };
4812
4813        // If we are taking the preamble then the range must start at 0
4814        check(0..1, 0..3, 0..3);
4815        check(0..2, 0..3, 0..4);
4816
4817        let check = |range, expected_item_range, expected_level_range| {
4818            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4819                range,
4820                rep.as_ref(),
4821                def.as_ref(),
4822                max_rep,
4823                max_visible_def,
4824                total_items,
4825                PreambleAction::Skip,
4826            );
4827            assert_eq!(item_range, expected_item_range);
4828            assert_eq!(level_range, expected_level_range);
4829        };
4830
4831        check(0..1, 1..3, 1..3);
4832        check(1..2, 3..3, 3..4);
4833        check(0..2, 1..3, 1..4);
4834
4835        // Another preamble case but now it doesn't end with a new list
4836        // [[..., A], NULL, [D, E]]
4837        //
4838        // What we do will depend on the preamble action
4839        let rep = Some(vec![0, 1, 1, 0]);
4840        let def = Some(vec![0, 1, 0, 0]);
4841        let max_visible_def = 0;
4842        let total_items = 4;
4843
4844        let check = |range, expected_item_range, expected_level_range| {
4845            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4846                range,
4847                rep.as_ref(),
4848                def.as_ref(),
4849                max_rep,
4850                max_visible_def,
4851                total_items,
4852                PreambleAction::Take,
4853            );
4854            assert_eq!(item_range, expected_item_range);
4855            assert_eq!(level_range, expected_level_range);
4856        };
4857
4858        // If we are taking the preamble then the range must start at 0
4859        check(0..1, 0..1, 0..2);
4860        check(0..2, 0..3, 0..4);
4861
4862        let check = |range, expected_item_range, expected_level_range| {
4863            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4864                range,
4865                rep.as_ref(),
4866                def.as_ref(),
4867                max_rep,
4868                max_visible_def,
4869                total_items,
4870                PreambleAction::Skip,
4871            );
4872            assert_eq!(item_range, expected_item_range);
4873            assert_eq!(level_range, expected_level_range);
4874        };
4875
4876        // If we are skipping the preamble then its item/levels are excluded from the ranges
4877        check(0..1, 1..1, 1..2);
4878        check(1..2, 1..3, 2..4);
4879        check(0..2, 1..3, 1..4);
4880
4881        // Now a preamble case without any definition levels
4882        // [[..., A], [B, C], [D]]
4883        let rep = Some(vec![0, 1, 0, 1]);
4884        let def: Option<Vec<u16>> = None;
4885        let max_visible_def = 0;
4886        let total_items = 4;
4887
4888        let check = |range, expected_item_range, expected_level_range| {
4889            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4890                range,
4891                rep.as_ref(),
4892                def.as_ref(),
4893                max_rep,
4894                max_visible_def,
4895                total_items,
4896                PreambleAction::Take,
4897            );
4898            assert_eq!(item_range, expected_item_range);
4899            assert_eq!(level_range, expected_level_range);
4900        };
4901
4902        // If we are taking the preamble then the range must start at 0
4903        check(0..1, 0..3, 0..3);
4904        check(0..2, 0..4, 0..4);
4905
4906        let check = |range, expected_item_range, expected_level_range| {
4907            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4908                range,
4909                rep.as_ref(),
4910                def.as_ref(),
4911                max_rep,
4912                max_visible_def,
4913                total_items,
4914                PreambleAction::Skip,
4915            );
4916            assert_eq!(item_range, expected_item_range);
4917            assert_eq!(level_range, expected_level_range);
4918        };
4919
4920        check(0..1, 1..3, 1..3);
4921        check(1..2, 3..4, 3..4);
4922        check(0..2, 1..4, 1..4);
4923    }
4924
4925    #[test]
4926    fn test_schedule_instructions() {
4927        let repetition_index = vec![vec![5, 2], vec![3, 0], vec![4, 7], vec![2, 0]];
4928        let repetition_index = RepetitionIndex::decode(&repetition_index);
4929
4930        let check = |user_ranges, expected_instructions| {
4931            let instructions =
4932                ChunkInstructions::schedule_instructions(&repetition_index, user_ranges);
4933            assert_eq!(instructions, expected_instructions);
4934        };
4935
4936        // The instructions we expect if we're grabbing the whole range
4937        let expected_take_all = vec![
4938            ChunkInstructions {
4939                chunk_idx: 0,
4940                preamble: PreambleAction::Absent,
4941                rows_to_skip: 0,
4942                rows_to_take: 5,
4943                take_trailer: true,
4944            },
4945            ChunkInstructions {
4946                chunk_idx: 1,
4947                preamble: PreambleAction::Take,
4948                rows_to_skip: 0,
4949                rows_to_take: 2,
4950                take_trailer: false,
4951            },
4952            ChunkInstructions {
4953                chunk_idx: 2,
4954                preamble: PreambleAction::Absent,
4955                rows_to_skip: 0,
4956                rows_to_take: 4,
4957                take_trailer: true,
4958            },
4959            ChunkInstructions {
4960                chunk_idx: 3,
4961                preamble: PreambleAction::Take,
4962                rows_to_skip: 0,
4963                rows_to_take: 1,
4964                take_trailer: false,
4965            },
4966        ];
4967
4968        // Take all as 1 range
4969        check(&[0..14], expected_take_all.clone());
4970
4971        // Take all as individual rows
4972        check(
4973            &[
4974                0..1,
4975                1..2,
4976                2..3,
4977                3..4,
4978                4..5,
4979                5..6,
4980                6..7,
4981                7..8,
4982                8..9,
4983                9..10,
4984                10..11,
4985                11..12,
4986                12..13,
4987                13..14,
4988            ],
4989            expected_take_all,
4990        );
4991
4992        // Test some partial takes
4993
4994        // 2 rows in the same chunk but not contiguous
4995        check(
4996            &[0..1, 3..4],
4997            vec![
4998                ChunkInstructions {
4999                    chunk_idx: 0,
5000                    preamble: PreambleAction::Absent,
5001                    rows_to_skip: 0,
5002                    rows_to_take: 1,
5003                    take_trailer: false,
5004                },
5005                ChunkInstructions {
5006                    chunk_idx: 0,
5007                    preamble: PreambleAction::Absent,
5008                    rows_to_skip: 3,
5009                    rows_to_take: 1,
5010                    take_trailer: false,
5011                },
5012            ],
5013        );
5014
5015        // Taking just a trailer/preamble
5016        check(
5017            &[5..6],
5018            vec![
5019                ChunkInstructions {
5020                    chunk_idx: 0,
5021                    preamble: PreambleAction::Absent,
5022                    rows_to_skip: 5,
5023                    rows_to_take: 0,
5024                    take_trailer: true,
5025                },
5026                ChunkInstructions {
5027                    chunk_idx: 1,
5028                    preamble: PreambleAction::Take,
5029                    rows_to_skip: 0,
5030                    rows_to_take: 0,
5031                    take_trailer: false,
5032                },
5033            ],
5034        );
5035
5036        // Skipping an entire chunk
5037        check(
5038            &[7..10],
5039            vec![
5040                ChunkInstructions {
5041                    chunk_idx: 1,
5042                    preamble: PreambleAction::Skip,
5043                    rows_to_skip: 1,
5044                    rows_to_take: 1,
5045                    take_trailer: false,
5046                },
5047                ChunkInstructions {
5048                    chunk_idx: 2,
5049                    preamble: PreambleAction::Absent,
5050                    rows_to_skip: 0,
5051                    rows_to_take: 2,
5052                    take_trailer: false,
5053                },
5054            ],
5055        );
5056    }
5057
5058    #[test]
5059    fn test_drain_instructions() {
5060        fn drain_from_instructions(
5061            instructions: &mut VecDeque<ChunkInstructions>,
5062            mut rows_desired: u64,
5063            need_preamble: &mut bool,
5064            skip_in_chunk: &mut u64,
5065        ) -> Vec<ChunkDrainInstructions> {
5066            // Note: instructions.len() is an upper bound; we typically need far fewer
5067            let mut drain_instructions = Vec::with_capacity(instructions.len());
5068            while rows_desired > 0 || *need_preamble {
5069                let (next_instructions, consumed_chunk) = instructions
5070                    .front()
5071                    .unwrap()
5072                    .drain_from_instruction(&mut rows_desired, need_preamble, skip_in_chunk);
5073                if consumed_chunk {
5074                    instructions.pop_front();
5075                }
5076                drain_instructions.push(next_instructions);
5077            }
5078            drain_instructions
5079        }
5080
5081        let repetition_index = vec![vec![5, 2], vec![3, 0], vec![4, 7], vec![2, 0]];
5082        let repetition_index = RepetitionIndex::decode(&repetition_index);
5083        let user_ranges = vec![1..7, 10..14];
5084
5085        // First, schedule the ranges
5086        let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);
5087
5088        let mut to_drain = VecDeque::from(scheduled.clone());
5089
5090        // Now we drain in batches of 4
5091
5092        let mut need_preamble = false;
5093        let mut skip_in_chunk = 0;
5094
5095        let next_batch =
5096            drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);
5097
5098        assert!(!need_preamble);
5099        assert_eq!(skip_in_chunk, 4);
5100        assert_eq!(
5101            next_batch,
5102            vec![ChunkDrainInstructions {
5103                chunk_instructions: scheduled[0].clone(),
5104                rows_to_take: 4,
5105                rows_to_skip: 0,
5106                preamble_action: PreambleAction::Absent,
5107            }]
5108        );
5109
5110        let next_batch =
5111            drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);
5112
5113        assert!(!need_preamble);
5114        assert_eq!(skip_in_chunk, 2);
5115
5116        assert_eq!(
5117            next_batch,
5118            vec![
5119                ChunkDrainInstructions {
5120                    chunk_instructions: scheduled[0].clone(),
5121                    rows_to_take: 1,
5122                    rows_to_skip: 4,
5123                    preamble_action: PreambleAction::Absent,
5124                },
5125                ChunkDrainInstructions {
5126                    chunk_instructions: scheduled[1].clone(),
5127                    rows_to_take: 1,
5128                    rows_to_skip: 0,
5129                    preamble_action: PreambleAction::Take,
5130                },
5131                ChunkDrainInstructions {
5132                    chunk_instructions: scheduled[2].clone(),
5133                    rows_to_take: 2,
5134                    rows_to_skip: 0,
5135                    preamble_action: PreambleAction::Absent,
5136                }
5137            ]
5138        );
5139
5140        let next_batch =
5141            drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);
5142
5143        assert!(!need_preamble);
5144        assert_eq!(skip_in_chunk, 0);
5145
5146        assert_eq!(
5147            next_batch,
5148            vec![
5149                ChunkDrainInstructions {
5150                    chunk_instructions: scheduled[2].clone(),
5151                    rows_to_take: 1,
5152                    rows_to_skip: 2,
5153                    preamble_action: PreambleAction::Absent,
5154                },
5155                ChunkDrainInstructions {
5156                    chunk_instructions: scheduled[3].clone(),
5157                    rows_to_take: 1,
5158                    rows_to_skip: 0,
5159                    preamble_action: PreambleAction::Take,
5160                },
5161            ]
5162        );
5163
5164        // Regression case.  Need a chunk with preamble, rows, and trailer (the middle chunk here)
5165        let repetition_index = vec![vec![5, 2], vec![3, 3], vec![20, 0]];
5166        let repetition_index = RepetitionIndex::decode(&repetition_index);
5167        let user_ranges = vec![0..28];
5168
5169        // First, schedule the ranges
5170        let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);
5171
5172        let mut to_drain = VecDeque::from(scheduled.clone());
5173
5174        // Drain first chunk and some of second chunk
5175
5176        let mut need_preamble = false;
5177        let mut skip_in_chunk = 0;
5178
5179        let next_batch =
5180            drain_from_instructions(&mut to_drain, 7, &mut need_preamble, &mut skip_in_chunk);
5181
5182        assert_eq!(
5183            next_batch,
5184            vec![
5185                ChunkDrainInstructions {
5186                    chunk_instructions: scheduled[0].clone(),
5187                    rows_to_take: 6,
5188                    rows_to_skip: 0,
5189                    preamble_action: PreambleAction::Absent,
5190                },
5191                ChunkDrainInstructions {
5192                    chunk_instructions: scheduled[1].clone(),
5193                    rows_to_take: 1,
5194                    rows_to_skip: 0,
5195                    preamble_action: PreambleAction::Take,
5196                },
5197            ]
5198        );
5199
5200        assert!(!need_preamble);
5201        assert_eq!(skip_in_chunk, 1);
5202
5203        // Now, the tricky part.  We drain the second chunk, including the trailer, and need to make sure
5204        // we get a drain task to take the preamble of the third chunk (and nothing else)
5205        let next_batch =
5206            drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);
5207
5208        assert_eq!(
5209            next_batch,
5210            vec![
5211                ChunkDrainInstructions {
5212                    chunk_instructions: scheduled[1].clone(),
5213                    rows_to_take: 2,
5214                    rows_to_skip: 1,
5215                    preamble_action: PreambleAction::Skip,
5216                },
5217                ChunkDrainInstructions {
5218                    chunk_instructions: scheduled[2].clone(),
5219                    rows_to_take: 0,
5220                    rows_to_skip: 0,
5221                    preamble_action: PreambleAction::Take,
5222                },
5223            ]
5224        );
5225
5226        assert!(!need_preamble);
5227        assert_eq!(skip_in_chunk, 0);
5228    }
5229}