lance_encoding/decoder.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Utilities and traits for scheduling & decoding data
//!
//! Reading data involves two steps: scheduling and decoding.  The
//! scheduling step is responsible for figuring out what data is needed
//! and issuing the appropriate I/O requests.  The decoding step is
//! responsible for taking the loaded data and turning it into Arrow
//! arrays.
//!
//! # Scheduling
//!
//! Scheduling is split into [`self::FieldScheduler`] and [`self::PageScheduler`].
//! There is one field scheduler for each output field, which may map to many
//! columns of actual data.  A field scheduler is responsible for figuring out
//! the order in which pages should be scheduled.  Field schedulers then delegate
//! to page schedulers to figure out the I/O requests that need to be made for
//! the page.
//!
//! Page schedulers also create the decoders that will be used to decode the
//! scheduled data.
//!
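//! As a rough sketch, the relationship looks like the following (hypothetical,
//! simplified trait shapes for illustration only; the real traits in this crate
//! carry more parameters such as filters and priorities):
//!
//! ```rust,ignore
//! trait FieldScheduler {
//!     // Walks the pages of one output field in the desired order, delegating
//!     // the per-page I/O planning to page schedulers.
//!     fn schedule_ranges(&self, ranges: &[Range<u64>]) -> Result<Box<dyn SchedulingJob>>;
//! }
//!
//! trait PageScheduler {
//!     // Issues the I/O for one page and returns (a future that resolves to)
//!     // the decoder for that page's data.
//!     fn schedule_page(&self, io: &dyn EncodingsIo) -> BoxFuture<'static, Result<PageDecoder>>;
//! }
//! ```
//!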
//! # Decoding
//!
//! Decoders are split into [`self::PhysicalPageDecoder`] and
//! [`self::LogicalPageDecoder`].  Note that both physical and logical decoding
//! happens on a per-page basis.  There is no concept of a "field decoder" or
//! "column decoder".
//!
//! The physical decoders handle lower level encodings.  They have a few advantages:
//!
//!  * They do not need to decode into an Arrow array and so they don't need
//!    to be enveloped into the Arrow type system (e.g. Arrow doesn't have a
//!    bit-packed type.  We can use variable-length binary but that is kind
//!    of awkward)
//!  * They can decode into an existing allocation.  This can allow for "page
//!    bridging".  If we are trying to decode into a batch of 1024 rows and
//!    the rows 0..1024 are spread across two pages then we can avoid a memory
//!    copy by allocating once and decoding each page into the outer allocation.
//!    (note: page bridging is not actually implemented yet)
//!
//! However, there are some limitations for physical decoders:
//!
//!  * They are constrained to a single column
//!  * The API is more complex
//!
//! The logical decoders are designed to map one or more columns of Lance
//! data into an Arrow array.
//!
//! Typically, a "logical encoding" will have both a logical decoder and a field scheduler.
//! Meanwhile, a "physical encoding" will have a physical decoder but no corresponding field
//! scheduler.
//!
//! # General notes
//!
//! Encodings are typically nested into each other to form a tree.  The top of the tree is
//! the user requested schema.  Each field in that schema is assigned to one top-level logical
//! encoding.  That encoding can then contain other logical encodings or physical encodings.
//! Physical encodings can also contain other physical encodings.
//!
//! So, for example, a single field in the Arrow schema might have the type `List<UInt32>`.
//!
//! The encoding tree could then be:
//!
//! ```text
//! root: List (logical encoding)
//!  - indices: Primitive (logical encoding)
//!    - column: Basic (physical encoding)
//!      - validity: Bitmap (physical encoding)
//!      - values: RLE (physical encoding)
//!        - runs: Value (physical encoding)
//!        - values: Value (physical encoding)
//!  - items: Primitive (logical encoding)
//!    - column: Basic (physical encoding)
//!      - values: Value (physical encoding)
//! ```
//!
//! Note that, in this example, root.items.column does not have a validity because there were
//! no nulls in the page.
//!
//! ## Multiple buffers or multiple columns?
//!
//! Note that there are many different ways we can write encodings.  For example, we might
//! store primitive fields in a single column with two buffers (one for validity and one for
//! values).
//!
//! On the other hand, we could also store a primitive field as two different columns.  One
//! that yields a non-nullable boolean array and one that yields a non-nullable array of items.
//! Then we could combine these two arrays into a single array where the boolean array is the
//! bitmap (a sketch of this recombination follows the list below).  There are a few subtle
//! differences between the approaches:
//!
//! * Storing things as multiple buffers within the same column is generally more efficient and
//!   easier to schedule.  For example, in-batch coalescing is very easy but can only be done
//!   on data that is in the same page.
//! * When things are stored in multiple columns you have to worry about their pages not being
//!   in sync.  In our previous validity / values example this means we might have to do some
//!   memory copies to get the validity and values arrays to be the same length at decode
//!   time.
//! * When things are stored in a single column, projection is impossible.  For example, if we
//!   tried to store all the struct fields in a single column with lots of buffers then we wouldn't
//!   be able to read back individual fields of the struct.
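//!
//! For the two-column approach, here is a minimal sketch of the recombination step
//! using arrow-rs directly (illustrative only; this is not code from this crate):
//!
//! ```rust,ignore
//! use arrow_array::{Array, BooleanArray, Int32Array};
//! use arrow_buffer::NullBuffer;
//!
//! // Column 1 decodes to a non-nullable boolean array (the validity)
//! // and column 2 decodes to a non-nullable array of items.
//! let validity = BooleanArray::from(vec![true, false, true]);
//! let values = Int32Array::from(vec![1, 2, 3]);
//! // Combine: the boolean array becomes the null bitmap of the result.
//! let combined = Int32Array::new(
//!     values.values().clone(),
//!     Some(NullBuffer::new(validity.values().clone())),
//! );
//! assert_eq!(combined.null_count(), 1);
//! ```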
//!
//! The fixed size list decoding is an interesting example because it is actually both a physical
//! encoding and a logical encoding.  A fixed size list of a physical encoding is, itself, a physical
//! encoding (e.g. a fixed size list of doubles).  However, a fixed size list of a logical encoding
//! is a logical encoding (e.g. a fixed size list of structs).
//!
//! # The scheduling loop
//!
//! Reading a Lance file involves both scheduling and decoding.  It's generally expected that these
//! will run as two separate threads.
//!
//! ```text
//!
//!                                    I/O PARALLELISM
//!                       Issues
//!                       Requests   ┌─────────────────┐
//!                                  │                 │        Wait for
//!                       ┌──────────►   I/O Service   ├─────►  Enough I/O ◄─┐
//!                       │          │                 │        For batch    │
//!                       │          └─────────────────┘             │3      │
//!                       │                                          │       │
//!                       │                                          │       │2
//! ┌─────────────────────┴─┐                              ┌─────────▼───────┴┐
//! │                       │                              │                  │Poll
//! │       Batch Decode    │ Decode tasks sent via channel│   Batch Decode   │1
//! │       Scheduler       ├─────────────────────────────►│   Stream         ◄─────
//! │                       │                              │                  │
//! └─────▲─────────────┬───┘                              └─────────┬────────┘
//!       │             │                                            │4
//!       │             │                                            │
//!       └─────────────┘                                   ┌────────┴────────┐
//!  Caller of schedule_range                Buffer polling │                 │
//!  will be scheduler thread                to achieve CPU │ Decode Batch    ├────►
//!  and schedule one decode                 parallelism    │ Task            │
//!  task (and all needed I/O)               (thread per    │                 │
//!  per logical page                         batch)        └─────────────────┘
//! ```
//!
//! The scheduling thread will work through the file from the
//! start to the end as quickly as possible.  Data is scheduled one page at a time in a row-major
//! fashion.  For example, imagine we have a file with the following page structure:
//!
//! ```text
//! Score (Float32)     | C0P0 |
//! Id (16-byte UUID)   | C1P0 | C1P1 | C1P2 | C1P3 |
//! Vector (4096 bytes) | C2P0 | C2P1 | C2P2 | C2P3 | .. | C2P1023 |
//! ```
//!
//! This would be quite common as each of these pages has the same number of bytes.  Let's pretend
//! each page is 1MiB and so there are 256Ki rows of data.  Each page of `Score` has 256Ki rows.
//! Each page of `Id` has 64Ki rows.  Each page of `Vector` has 256 rows.  The scheduler would then
//! schedule in the following order:
//!
//! ```text
//! C0 P0
//! C1 P0
//! C2 P0
//! C2 P1
//! ... (253 pages omitted)
//! C2 P255
//! C1 P1
//! C2 P256
//! ... (254 pages omitted)
//! C2 P511
//! C1 P2
//! C2 P512
//! ... (254 pages omitted)
//! C2 P767
//! C1 P3
//! C2 P768
//! ... (254 pages omitted)
//! C2 P1023
//! ```
//!
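//! As a quick sanity check of those row counts (the 1MiB page size is the example's
//! assumption, not a constant of the format):
//!
//! ```rust,ignore
//! const PAGE_SIZE: u64 = 1024 * 1024; // 1MiB, as in the example above
//! // rows per page = page size / bytes per row
//! assert_eq!(PAGE_SIZE / 4, 256 * 1024); // Score: Float32 -> 256Ki rows
//! assert_eq!(PAGE_SIZE / 16, 64 * 1024); // Id: 16-byte UUID -> 64Ki rows
//! assert_eq!(PAGE_SIZE / 4096, 256); // Vector: 4096 bytes -> 256 rows
//! // 256Ki total rows / 256 rows per vector page = 1024 vector pages (P0..P1023)
//! assert_eq!((256 * 1024) / 256, 1024);
//! ```
//!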
//! This is the ideal scheduling order because it means we can decode complete rows as quickly as possible.
//! Note that the scheduler thread does not need to wait for I/O to happen at any point.  As soon as it starts
//! it will start scheduling one page of I/O after another until it has scheduled the entire file's worth of
//! I/O.  This is slightly different than other file readers which have "row group parallelism" and will
//! typically only schedule X row groups worth of reads at a time.
//!
//! In the near future there will be a backpressure mechanism and so it may need to stop/pause if the compute
//! falls behind.
//!
//! ## Indirect I/O
//!
//! Regrettably, there are times where we cannot know exactly what data we need until we have partially decoded
//! the file.  This happens when we have variable sized list data.  In that case the scheduling task for that
//! page will only schedule the first part of the read (loading the list offsets).  It will then immediately
//! spawn a new tokio task to wait for that I/O and decode the list offsets.  That follow-up task is not part
//! of the scheduling loop or the decode loop.  It is a free task.  Once the list offsets are decoded we submit
//! a follow-up I/O task.  This task is scheduled at a high priority because the decoder is going to need it soon.
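//!
//! In pseudocode the shape of that follow-up ("free") task is roughly (all names
//! here are illustrative, not APIs of this crate):
//!
//! ```rust,ignore
//! // Schedule only the offsets read now; the items read must wait.
//! let offsets_io = schedule_offsets_read();
//! tokio::spawn(async move {
//!     // Free task: wait for the offsets I/O and decode the offsets.
//!     let offsets = decode_offsets(offsets_io.await);
//!     // Now we know exactly which item ranges are needed.  Submit that read
//!     // at high priority because the decode stream will block on it soon.
//!     submit_items_read(item_ranges(&offsets), Priority::High);
//! });
//! ```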
//!
//! # The decode loop
//!
//! As soon as the scheduler starts we can start decoding.  Each time we schedule a page we
//! push a decoder for that page's data into a channel.  The decode loop
//! ([`BatchDecodeStream`]) reads from that channel.  Each time it receives a decoder it
//! waits until the decoder has all of its data.  Then it grabs the next decoder.  Once it has
//! enough loaded decoders to complete a batch worth of rows it will spawn a "decode batch task".
//!
//! These batch decode tasks perform the actual CPU work of decoding the loaded data into Arrow
//! arrays.  This may involve significant CPU processing like decompression or arithmetic in order
//! to restore the data to its correct in-memory representation.
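//!
//! Wiring the two loops together looks roughly like this (a sketch; obtaining the
//! `DecodeBatchScheduler`, the root decoder, and the I/O object is elided):
//!
//! ```rust,ignore
//! use tokio::sync::mpsc;
//!
//! let (sink, scheduled) = mpsc::unbounded_channel();
//! // Scheduling side: pushes DecoderMessages into the channel as pages are scheduled.
//! scheduler.schedule_range(0..num_rows, &FilterExpression::no_filter(), sink, io);
//! // Decoding side: pulls decoders out and emits batches of `rows_per_batch` rows.
//! let stream = BatchDecodeStream::new(scheduled, rows_per_batch, num_rows, root_decoder);
//! ```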
//!
//! ## Batch size
//!
//! The `BatchDecodeStream` is configured with a batch size.  This does not need to have any
//! relation to the page size(s) used to write the data.  This keeps our compute work completely
//! independent of our I/O work.  We suggest using small batch sizes:
//!
//!  * Batches should fit in CPU cache (at least L3)
//!  * More batches means more opportunity for parallelism
//!  * The "batch overhead" is very small in Lance compared to other formats because it has no
//!    relation to the way the data is stored.
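//!
//! For example, a rough way to pick a batch row count that fits in cache (the cache
//! budget here is an assumption for illustration, using the row width from the
//! earlier example):
//!
//! ```rust,ignore
//! let cache_budget_bytes: u64 = 2 * 1024 * 1024; // a conservative slice of L3
//! let row_width_bytes: u64 = 4 /* score */ + 16 /* id */ + 4096 /* vector */;
//! // Round down to a power of two for a tidy batch size.
//! let rows_per_batch = (cache_budget_bytes / row_width_bytes).next_power_of_two() / 2;
//! assert_eq!(rows_per_batch, 256);
//! ```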
use std::collections::VecDeque;
use std::sync::{LazyLock, Once};
use std::{ops::Range, sync::Arc};

use arrow_array::cast::AsArray;
use arrow_array::{ArrayRef, RecordBatch, RecordBatchIterator, RecordBatchReader};
use arrow_schema::{ArrowError, DataType, Field as ArrowField, Fields, Schema as ArrowSchema};
use bytes::Bytes;
use futures::future::{maybe_done, BoxFuture, MaybeDone};
use futures::stream::{self, BoxStream};
use futures::{FutureExt, StreamExt};
use lance_arrow::DataTypeExt;
use lance_core::cache::LanceCache;
use lance_core::datatypes::{Field, Schema, BLOB_DESC_LANCE_FIELD};
use log::{debug, trace, warn};
use snafu::location;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::{self, unbounded_channel};

use lance_core::{ArrowResult, Error, Result};
use tracing::instrument;

use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy};
use crate::data::DataBlock;
use crate::encoder::EncodedBatch;
use crate::encodings::logical::list::StructuralListScheduler;
use crate::encodings::logical::primitive::StructuralPrimitiveFieldScheduler;
use crate::encodings::logical::r#struct::{StructuralStructDecoder, StructuralStructScheduler};
use crate::format::pb::{self, column_encoding};
use crate::format::pb21;
use crate::previous::decoder::LogicalPageDecoder;
use crate::previous::encodings::logical::list::OffsetPageInfo;
use crate::previous::encodings::logical::r#struct::{SimpleStructDecoder, SimpleStructScheduler};
use crate::previous::encodings::logical::{
    binary::BinaryFieldScheduler, blob::BlobFieldScheduler, list::ListFieldScheduler,
    primitive::PrimitiveFieldScheduler,
};
use crate::repdef::{CompositeRepDefUnraveler, RepDefUnraveler};
use crate::version::LanceFileVersion;
use crate::{BufferScheduler, EncodingsIo};
// If users are getting batches over 10MiB in size then it's time to reduce the batch size
const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024;

/// Top-level encoding message for a page.  Wraps both the
/// legacy pb::ArrayEncoding and the newer pb::PageLayout
///
/// A file should only use one or the other and never both.
/// 2.0 decoders can always assume this is pb::ArrayEncoding
/// and 2.1+ decoders can always assume this is pb::PageLayout
#[derive(Debug)]
pub enum PageEncoding {
    Legacy(pb::ArrayEncoding),
    Structural(pb21::PageLayout),
}

impl PageEncoding {
    pub fn as_legacy(&self) -> &pb::ArrayEncoding {
        match self {
            Self::Legacy(enc) => enc,
            Self::Structural(_) => panic!("Expected a legacy encoding"),
        }
    }

    pub fn as_structural(&self) -> &pb21::PageLayout {
        match self {
            Self::Structural(enc) => enc,
            Self::Legacy(_) => panic!("Expected a structural encoding"),
        }
    }

    pub fn is_structural(&self) -> bool {
        matches!(self, Self::Structural(_))
    }
}
/// Metadata describing a page in a file
///
/// This is typically created by reading the metadata section of a Lance file
#[derive(Debug)]
pub struct PageInfo {
    /// The number of rows in the page
    pub num_rows: u64,
    /// The priority (top level row number) of the page
    ///
    /// This is only set in 2.1 files and will be 0 for 2.0 files
    pub priority: u64,
    /// The encoding that explains the buffers in the page
    pub encoding: PageEncoding,
    /// The offsets and sizes of the buffers in the file
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
}

/// Metadata describing a column in a file
///
/// This is typically created by reading the metadata section of a Lance file
#[derive(Debug, Clone)]
pub struct ColumnInfo {
    /// The index of the column in the file
    pub index: u32,
    /// The metadata for each page in the column
    pub page_infos: Arc<[PageInfo]>,
    /// File positions and their sizes of the column-level buffers
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    pub encoding: pb::ColumnEncoding,
}

impl ColumnInfo {
    /// Create a new instance
    pub fn new(
        index: u32,
        page_infos: Arc<[PageInfo]>,
        buffer_offsets_and_sizes: Vec<(u64, u64)>,
        encoding: pb::ColumnEncoding,
    ) -> Self {
        Self {
            index,
            page_infos,
            buffer_offsets_and_sizes: buffer_offsets_and_sizes.into_boxed_slice().into(),
            encoding,
        }
    }

    pub fn is_structural(&self) -> bool {
        self.page_infos
            // Can just look at the first since all should be the same
            .first()
            .map(|page| page.encoding.is_structural())
            .unwrap_or(false)
    }
}

enum RootScheduler {
    Structural(Box<dyn StructuralFieldScheduler>),
    Legacy(Arc<dyn crate::previous::decoder::FieldScheduler>),
}

impl RootScheduler {
    fn as_legacy(&self) -> &Arc<dyn crate::previous::decoder::FieldScheduler> {
        match self {
            Self::Structural(_) => panic!("Expected a legacy scheduler"),
            Self::Legacy(s) => s,
        }
    }

    fn as_structural(&self) -> &dyn StructuralFieldScheduler {
        match self {
            Self::Structural(s) => s.as_ref(),
            Self::Legacy(_) => panic!("Expected a structural scheduler"),
        }
    }
}
/// The scheduler for decoding batches
///
/// Lance decoding is done in two steps: scheduling and decoding.  The
/// scheduling tends to be lightweight and should quickly figure out what data
/// is needed from the disk and issue the appropriate I/O requests.  A decode task is
/// created to eventually decode the data (once it is loaded) and scheduling
/// moves on to scheduling the next page.
///
/// Meanwhile, it's expected that a decode stream will be set up to run at the
/// same time.  Decode tasks take the data that is loaded and turn it into
/// Arrow arrays.
///
/// This approach allows us to keep our I/O parallelism and CPU parallelism
/// completely separate since those are often two very different values.
///
/// Backpressure should be achieved via the I/O service.  Requests that are
/// issued will pile up if the decode stream is not polling quickly enough.
/// The [`crate::EncodingsIo::submit_request`] function should return a pending
/// future once there are too many I/O requests in flight.
///
/// TODO: Implement backpressure
pub struct DecodeBatchScheduler {
    root_scheduler: RootScheduler,
    pub root_fields: Fields,
    cache: Arc<LanceCache>,
}
pub struct ColumnInfoIter<'a> {
    column_infos: Vec<Arc<ColumnInfo>>,
    column_indices: &'a [u32],
    column_info_pos: usize,
    column_indices_pos: usize,
}

impl<'a> ColumnInfoIter<'a> {
    pub fn new(column_infos: Vec<Arc<ColumnInfo>>, column_indices: &'a [u32]) -> Self {
        let initial_pos = column_indices.first().copied().unwrap_or(0) as usize;
        Self {
            column_infos,
            column_indices,
            column_info_pos: initial_pos,
            column_indices_pos: 0,
        }
    }

    pub fn peek(&self) -> &Arc<ColumnInfo> {
        &self.column_infos[self.column_info_pos]
    }

    pub fn peek_transform(&mut self, transform: impl FnOnce(Arc<ColumnInfo>) -> Arc<ColumnInfo>) {
        let column_info = self.column_infos[self.column_info_pos].clone();
        let transformed = transform(column_info);
        self.column_infos[self.column_info_pos] = transformed;
    }

    pub fn expect_next(&mut self) -> Result<&Arc<ColumnInfo>> {
        self.next().ok_or_else(|| {
            Error::invalid_input(
                "there were more fields in the schema than provided column indices / infos",
                location!(),
            )
        })
    }

    fn next(&mut self) -> Option<&Arc<ColumnInfo>> {
        if self.column_info_pos < self.column_infos.len() {
            let info = &self.column_infos[self.column_info_pos];
            self.column_info_pos += 1;
            Some(info)
        } else {
            None
        }
    }
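    // Jump to the column of the next selected top-level field (the entries of
    // `column_indices` are the file column indices of those fields).  Any
    // remaining inner columns of the current field are skipped over, which is
    // what keeps a projected (subset) read aligned with the schema.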
    pub(crate) fn next_top_level(&mut self) {
        self.column_indices_pos += 1;
        if self.column_indices_pos < self.column_indices.len() {
            self.column_info_pos = self.column_indices[self.column_indices_pos] as usize;
        } else {
            self.column_info_pos = self.column_infos.len();
        }
    }
}
/// These contain the file buffers shared across the entire file
#[derive(Clone, Copy, Debug)]
pub struct FileBuffers<'a> {
    pub positions_and_sizes: &'a [(u64, u64)],
}

/// These contain the file buffers and also buffers specific to a column
#[derive(Clone, Copy, Debug)]
pub struct ColumnBuffers<'a, 'b> {
    pub file_buffers: FileBuffers<'a>,
    pub positions_and_sizes: &'b [(u64, u64)],
}

/// These contain the file & column buffers and also buffers specific to a page
#[derive(Clone, Copy, Debug)]
pub struct PageBuffers<'a, 'b, 'c> {
    pub column_buffers: ColumnBuffers<'a, 'b>,
    pub positions_and_sizes: &'c [(u64, u64)],
}

/// The core decoder strategy handles all the various Arrow types
#[derive(Debug)]
pub struct CoreFieldDecoderStrategy {
    pub validate_data: bool,
    pub decompressor_strategy: Arc<dyn DecompressionStrategy>,
    pub cache_repetition_index: bool,
}

impl Default for CoreFieldDecoderStrategy {
    fn default() -> Self {
        Self {
            validate_data: false,
            decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
            cache_repetition_index: false,
        }
    }
}
impl CoreFieldDecoderStrategy {
    /// Set whether the repetition index should be cached
    pub fn with_cache_repetition_index(mut self, cache_repetition_index: bool) -> Self {
        self.cache_repetition_index = cache_repetition_index;
        self
    }

    /// Create a new strategy from decoder config
    pub fn from_decoder_config(config: &DecoderConfig) -> Self {
        Self {
            validate_data: config.validate_on_decode,
            decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
            cache_repetition_index: config.cache_repetition_index,
        }
    }
    /// This is just a sanity check to ensure there are no "wrapped encodings"
    /// that haven't been handled.
    fn ensure_values_encoded(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
        let column_encoding = column_info
            .encoding
            .column_encoding
            .as_ref()
            .ok_or_else(|| {
                Error::invalid_input(
                    format!(
                        "the column at index {} was missing a ColumnEncoding",
                        column_info.index
                    ),
                    location!(),
                )
            })?;
        if matches!(
            column_encoding,
            pb::column_encoding::ColumnEncoding::Values(_)
        ) {
            Ok(())
        } else {
            Err(Error::invalid_input(
                format!(
                    "the column at index {} mapping to the input field {} has column encoding {:?} and no decoder is registered to handle it",
                    column_info.index, field_name, column_encoding
                ),
                location!(),
            ))
        }
    }
    fn is_structural_primitive(data_type: &DataType) -> bool {
        if data_type.is_primitive() {
            true
        } else {
            match data_type {
                // DataType::is_primitive doesn't consider these primitive but we do
                DataType::Dictionary(_, value_type) => Self::is_structural_primitive(value_type),
                DataType::Boolean
                | DataType::Null
                | DataType::FixedSizeBinary(_)
                | DataType::Binary
                | DataType::LargeBinary
                | DataType::Utf8
                | DataType::LargeUtf8 => true,
                DataType::FixedSizeList(inner, _) => {
                    Self::is_structural_primitive(inner.data_type())
                }
                _ => false,
            }
        }
    }

    fn is_primitive_legacy(data_type: &DataType) -> bool {
        if data_type.is_primitive() {
            true
        } else {
            match data_type {
                // DataType::is_primitive doesn't consider these primitive but we do
                DataType::Boolean | DataType::Null | DataType::FixedSizeBinary(_) => true,
                DataType::FixedSizeList(inner, _) => Self::is_primitive_legacy(inner.data_type()),
                _ => false,
            }
        }
    }
    fn create_primitive_scheduler(
        &self,
        field: &Field,
        column: &ColumnInfo,
        buffers: FileBuffers,
    ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
        Self::ensure_values_encoded(column, &field.name)?;
        // Primitive fields map to a single column
        let column_buffers = ColumnBuffers {
            file_buffers: buffers,
            positions_and_sizes: &column.buffer_offsets_and_sizes,
        };
        Ok(Box::new(PrimitiveFieldScheduler::new(
            column.index,
            field.data_type(),
            column.page_infos.clone(),
            column_buffers,
            self.validate_data,
        )))
    }

    /// Helper method to verify the page encoding of a struct header column
    fn check_simple_struct(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
        Self::ensure_values_encoded(column_info, field_name)?;
        if column_info.page_infos.len() != 1 {
            return Err(Error::InvalidInput {
                source: format!(
                    "Due to schema we expected a struct column but we received a column with {} pages and right now we only support struct columns with 1 page",
                    column_info.page_infos.len()
                )
                .into(),
                location: location!(),
            });
        }
        let encoding = &column_info.page_infos[0].encoding;
        match encoding.as_legacy().array_encoding.as_ref().unwrap() {
            pb::array_encoding::ArrayEncoding::Struct(_) => Ok(()),
            _ => Err(Error::InvalidInput {
                source: format!(
                    "Expected a struct encoding because we have a struct field in the schema but got the encoding {:?}",
                    encoding
                )
                .into(),
                location: location!(),
            }),
        }
    }

    fn check_packed_struct(column_info: &ColumnInfo) -> bool {
        let encoding = &column_info.page_infos[0].encoding;
        matches!(
            encoding.as_legacy().array_encoding.as_ref().unwrap(),
            pb::array_encoding::ArrayEncoding::PackedStruct(_)
        )
    }
    fn create_list_scheduler(
        &self,
        list_field: &Field,
        column_infos: &mut ColumnInfoIter,
        buffers: FileBuffers,
        offsets_column: &ColumnInfo,
    ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
        Self::ensure_values_encoded(offsets_column, &list_field.name)?;
        let offsets_column_buffers = ColumnBuffers {
            file_buffers: buffers,
            positions_and_sizes: &offsets_column.buffer_offsets_and_sizes,
        };
        let items_scheduler =
            self.create_legacy_field_scheduler(&list_field.children[0], column_infos, buffers)?;

        let (inner_infos, null_offset_adjustments): (Vec<_>, Vec<_>) = offsets_column
            .page_infos
            .iter()
            .filter(|offsets_page| offsets_page.num_rows > 0)
            .map(|offsets_page| {
                if let Some(pb::array_encoding::ArrayEncoding::List(list_encoding)) =
                    &offsets_page.encoding.as_legacy().array_encoding
                {
                    let inner = PageInfo {
                        buffer_offsets_and_sizes: offsets_page.buffer_offsets_and_sizes.clone(),
                        encoding: PageEncoding::Legacy(
                            list_encoding.offsets.as_ref().unwrap().as_ref().clone(),
                        ),
                        num_rows: offsets_page.num_rows,
                        priority: 0,
                    };
                    (
                        inner,
                        OffsetPageInfo {
                            offsets_in_page: offsets_page.num_rows,
                            null_offset_adjustment: list_encoding.null_offset_adjustment,
                            num_items_referenced_by_page: list_encoding.num_items,
                        },
                    )
                } else {
                    // TODO: Should probably return Err here
                    panic!("Expected a list column");
                }
            })
            .unzip();
        let inner = Arc::new(PrimitiveFieldScheduler::new(
            offsets_column.index,
            DataType::UInt64,
            Arc::from(inner_infos.into_boxed_slice()),
            offsets_column_buffers,
            self.validate_data,
        )) as Arc<dyn crate::previous::decoder::FieldScheduler>;
        let items_field = match list_field.data_type() {
            DataType::List(inner) => inner,
            DataType::LargeList(inner) => inner,
            _ => unreachable!(),
        };
        let offset_type = if matches!(list_field.data_type(), DataType::List(_)) {
            DataType::Int32
        } else {
            DataType::Int64
        };
        Ok(Box::new(ListFieldScheduler::new(
            inner,
            items_scheduler.into(),
            items_field,
            offset_type,
            null_offset_adjustments,
        )))
    }

    fn unwrap_blob(column_info: &ColumnInfo) -> Option<ColumnInfo> {
        if let column_encoding::ColumnEncoding::Blob(blob) =
            column_info.encoding.column_encoding.as_ref().unwrap()
        {
            let mut column_info = column_info.clone();
            column_info.encoding = blob.inner.as_ref().unwrap().as_ref().clone();
            Some(column_info)
        } else {
            None
        }
    }
    fn create_structural_field_scheduler(
        &self,
        field: &Field,
        column_infos: &mut ColumnInfoIter,
    ) -> Result<Box<dyn StructuralFieldScheduler>> {
        let data_type = field.data_type();
        if Self::is_structural_primitive(&data_type) {
            let column_info = column_infos.expect_next()?;
            let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
                column_info.as_ref(),
                self.decompressor_strategy.as_ref(),
                self.cache_repetition_index,
                field,
            )?);

            // advance to the next top level column
            column_infos.next_top_level();

            return Ok(scheduler);
        }
        match &data_type {
            DataType::Struct(fields) => {
                if field.is_packed_struct() {
                    // Packed struct
                    let column_info = column_infos.expect_next()?;
                    let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
                        column_info.as_ref(),
                        self.decompressor_strategy.as_ref(),
                        self.cache_repetition_index,
                        field,
                    )?);

                    // advance to the next top level column
                    column_infos.next_top_level();

                    return Ok(scheduler);
                }
                // Maybe a blob descriptions struct?
                if field.is_blob() {
                    let column_info = column_infos.peek();
                    if column_info.page_infos.iter().any(|page| {
                        matches!(
                            page.encoding,
                            PageEncoding::Structural(pb21::PageLayout {
                                layout: Some(pb21::page_layout::Layout::BlobLayout(_))
                            })
                        )
                    }) {
                        let column_info = column_infos.expect_next()?;
                        let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
                            column_info.as_ref(),
                            self.decompressor_strategy.as_ref(),
                            self.cache_repetition_index,
                            field,
                        )?);
                        column_infos.next_top_level();
                        return Ok(scheduler);
                    }
                }

                let mut child_schedulers = Vec::with_capacity(field.children.len());
                for field in field.children.iter() {
                    let field_scheduler =
                        self.create_structural_field_scheduler(field, column_infos)?;
                    child_schedulers.push(field_scheduler);
                }

                let fields = fields.clone();
                Ok(
                    Box::new(StructuralStructScheduler::new(child_schedulers, fields))
                        as Box<dyn StructuralFieldScheduler>,
                )
            }
            DataType::List(_) | DataType::LargeList(_) => {
                let child = field
                    .children
                    .first()
                    .expect("List field must have a child");
                let child_scheduler =
                    self.create_structural_field_scheduler(child, column_infos)?;
                Ok(Box::new(StructuralListScheduler::new(child_scheduler))
                    as Box<dyn StructuralFieldScheduler>)
            }
            _ => todo!("create_structural_field_scheduler for {}", data_type),
        }
    }
    fn create_legacy_field_scheduler(
        &self,
        field: &Field,
        column_infos: &mut ColumnInfoIter,
        buffers: FileBuffers,
    ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
        let data_type = field.data_type();
        if Self::is_primitive_legacy(&data_type) {
            let column_info = column_infos.expect_next()?;
            let scheduler = self.create_primitive_scheduler(field, column_info, buffers)?;
            return Ok(scheduler);
        } else if data_type.is_binary_like() {
            let column_info = column_infos.next().unwrap().clone();
            // Column is blob and user is asking for binary data
            if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
                let desc_scheduler =
                    self.create_primitive_scheduler(&BLOB_DESC_LANCE_FIELD, &blob_col, buffers)?;
                let blob_scheduler = Box::new(BlobFieldScheduler::new(desc_scheduler.into()));
                return Ok(blob_scheduler);
            }
            if let Some(page_info) = column_info.page_infos.first() {
                if matches!(
                    page_info.encoding.as_legacy(),
                    pb::ArrayEncoding {
                        array_encoding: Some(pb::array_encoding::ArrayEncoding::List(..))
                    }
                ) {
                    let list_type = if matches!(data_type, DataType::Utf8 | DataType::Binary) {
                        DataType::List(Arc::new(ArrowField::new("item", DataType::UInt8, false)))
                    } else {
                        DataType::LargeList(Arc::new(ArrowField::new(
                            "item",
                            DataType::UInt8,
                            false,
                        )))
                    };
                    let list_field = Field::try_from(ArrowField::new(
                        field.name.clone(),
                        list_type,
                        field.nullable,
                    ))
                    .unwrap();
                    let list_scheduler = self.create_list_scheduler(
                        &list_field,
                        column_infos,
                        buffers,
                        &column_info,
                    )?;
                    let binary_scheduler = Box::new(BinaryFieldScheduler::new(
                        list_scheduler.into(),
                        field.data_type(),
                    ));
                    return Ok(binary_scheduler);
                } else {
                    let scheduler =
                        self.create_primitive_scheduler(field, &column_info, buffers)?;
                    return Ok(scheduler);
                }
            } else {
                return self.create_primitive_scheduler(field, &column_info, buffers);
            }
        }
        match &data_type {
            DataType::FixedSizeList(inner, _dimension) => {
                // A fixed size list column could either be a physical or a logical decoder
                // depending on the child data type.
                if Self::is_primitive_legacy(inner.data_type()) {
                    let primitive_col = column_infos.expect_next()?;
                    let scheduler =
                        self.create_primitive_scheduler(field, primitive_col, buffers)?;
                    Ok(scheduler)
                } else {
                    todo!()
                }
            }
            DataType::Dictionary(_key_type, value_type) => {
                if Self::is_primitive_legacy(value_type) || value_type.is_binary_like() {
                    let primitive_col = column_infos.expect_next()?;
                    let scheduler =
                        self.create_primitive_scheduler(field, primitive_col, buffers)?;
                    Ok(scheduler)
                } else {
                    Err(Error::NotSupported {
                        source: format!(
                            "No way to decode into a dictionary field of type {}",
                            value_type
                        )
                        .into(),
                        location: location!(),
                    })
                }
            }
            DataType::List(_) | DataType::LargeList(_) => {
                let offsets_column = column_infos.expect_next()?.clone();
                column_infos.next_top_level();
                self.create_list_scheduler(field, column_infos, buffers, &offsets_column)
            }
            DataType::Struct(fields) => {
                let column_info = column_infos.expect_next()?;

                // Column is blob and user is asking for descriptions
                if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
                    // Can use primitive scheduler here since descriptions are always packed struct
                    return self.create_primitive_scheduler(field, &blob_col, buffers);
                }

                if Self::check_packed_struct(column_info) {
                    // use packed struct encoding
                    self.create_primitive_scheduler(field, column_info, buffers)
                } else {
                    // use default struct encoding
                    Self::check_simple_struct(column_info, &field.name).unwrap();
                    let num_rows = column_info
                        .page_infos
                        .iter()
                        .map(|page| page.num_rows)
                        .sum();
                    let mut child_schedulers = Vec::with_capacity(field.children.len());
                    for field in &field.children {
                        column_infos.next_top_level();
                        let field_scheduler =
                            self.create_legacy_field_scheduler(field, column_infos, buffers)?;
                        child_schedulers.push(Arc::from(field_scheduler));
                    }

                    let fields = fields.clone();
                    Ok(Box::new(SimpleStructScheduler::new(
                        child_schedulers,
                        fields,
                        num_rows,
                    )))
                }
            }
            // TODO: Still need support for RLE
            _ => todo!(),
        }
    }
}
/// Creates a dummy ColumnInfo for the root column
fn root_column(num_rows: u64) -> ColumnInfo {
    let num_root_pages = num_rows.div_ceil(u32::MAX as u64);
    // The final page holds whatever rows the earlier (full) pages don't cover
    let final_page_num_rows = num_rows - (num_root_pages - 1) * (u32::MAX as u64);
    let root_pages = (0..num_root_pages)
        .map(|i| PageInfo {
            num_rows: if i == num_root_pages - 1 {
                final_page_num_rows
            } else {
                u32::MAX as u64
            },
            encoding: PageEncoding::Legacy(pb::ArrayEncoding {
                array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
                    pb::SimpleStruct {},
                )),
            }),
            priority: 0, // not used in legacy scheduler
            buffer_offsets_and_sizes: Arc::new([]),
        })
        .collect::<Vec<_>>();
    ColumnInfo {
        buffer_offsets_and_sizes: Arc::new([]),
        encoding: pb::ColumnEncoding {
            column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
        },
        index: u32::MAX,
        page_infos: Arc::from(root_pages),
    }
}
pub enum RootDecoder {
    Structural(StructuralStructDecoder),
    Legacy(SimpleStructDecoder),
}

impl RootDecoder {
    pub fn into_structural(self) -> StructuralStructDecoder {
        match self {
            Self::Structural(decoder) => decoder,
            Self::Legacy(_) => panic!("Expected a structural decoder"),
        }
    }

    pub fn into_legacy(self) -> SimpleStructDecoder {
        match self {
            Self::Legacy(decoder) => decoder,
            Self::Structural(_) => panic!("Expected a legacy decoder"),
        }
    }
}
impl DecodeBatchScheduler {
    /// Creates a new decode scheduler with the expected schema and the column
    /// metadata of the file.
    #[allow(clippy::too_many_arguments)]
    pub async fn try_new<'a>(
        schema: &'a Schema,
        column_indices: &[u32],
        column_infos: &[Arc<ColumnInfo>],
        file_buffer_positions_and_sizes: &'a Vec<(u64, u64)>,
        num_rows: u64,
        _decoder_plugins: Arc<DecoderPlugins>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<LanceCache>,
        filter: &FilterExpression,
        decoder_config: &DecoderConfig,
    ) -> Result<Self> {
        assert!(num_rows > 0);
        let buffers = FileBuffers {
            positions_and_sizes: file_buffer_positions_and_sizes,
        };
        let arrow_schema = ArrowSchema::from(schema);
        let root_fields = arrow_schema.fields().clone();
        let root_type = DataType::Struct(root_fields.clone());
        let mut root_field = Field::try_from(&ArrowField::new("root", root_type, false))?;
        // root_field.children and schema.fields should be identical at this point but the latter
        // has field ids and the former does not.  This line restores that.
        // TODO:  Is there another way to create the root field without forcing a trip through arrow?
        root_field.children.clone_from(&schema.fields);
        root_field
            .metadata
            .insert("__lance_decoder_root".to_string(), "true".to_string());

        if column_infos.is_empty() || column_infos[0].is_structural() {
            let mut column_iter = ColumnInfoIter::new(column_infos.to_vec(), column_indices);

            let strategy = CoreFieldDecoderStrategy::from_decoder_config(decoder_config);
            let mut root_scheduler =
                strategy.create_structural_field_scheduler(&root_field, &mut column_iter)?;

            let context = SchedulerContext::new(io, cache.clone());
            root_scheduler.initialize(filter, &context).await?;

            Ok(Self {
                root_scheduler: RootScheduler::Structural(root_scheduler),
                root_fields,
                cache,
            })
        } else {
            // The old encoding style expected a header column for structs and so we
            // need a header column for the top-level struct
            let mut columns = Vec::with_capacity(column_infos.len() + 1);
            columns.push(Arc::new(root_column(num_rows)));
            columns.extend(column_infos.iter().cloned());

            let adjusted_column_indices = [0_u32]
                .into_iter()
                .chain(column_indices.iter().map(|i| i.saturating_add(1)))
                .collect::<Vec<_>>();
            let mut column_iter = ColumnInfoIter::new(columns, &adjusted_column_indices);
            let strategy = CoreFieldDecoderStrategy::from_decoder_config(decoder_config);
            let root_scheduler =
                strategy.create_legacy_field_scheduler(&root_field, &mut column_iter, buffers)?;

            let context = SchedulerContext::new(io, cache.clone());
            root_scheduler.initialize(filter, &context).await?;

            Ok(Self {
                root_scheduler: RootScheduler::Legacy(root_scheduler.into()),
                root_fields,
                cache,
            })
        }
    }
    #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
    pub fn from_scheduler(
        root_scheduler: Arc<dyn crate::previous::decoder::FieldScheduler>,
        root_fields: Fields,
        cache: Arc<LanceCache>,
    ) -> Self {
        Self {
            root_scheduler: RootScheduler::Legacy(root_scheduler),
            root_fields,
            cache,
        }
    }
    fn do_schedule_ranges_structural(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
    ) {
        let root_scheduler = self.root_scheduler.as_structural();
        let mut context = SchedulerContext::new(io, self.cache.clone());
        let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
        if let Err(schedule_ranges_err) = maybe_root_job {
            schedule_action(Err(schedule_ranges_err));
            return;
        }
        let mut root_job = maybe_root_job.unwrap();
        let mut num_rows_scheduled = 0;
        loop {
            let maybe_next_scan_lines = root_job.schedule_next(&mut context);
            if let Err(err) = maybe_next_scan_lines {
                schedule_action(Err(err));
                return;
            }
            let next_scan_lines = maybe_next_scan_lines.unwrap();
            if next_scan_lines.is_empty() {
                return;
            }
            for next_scan_line in next_scan_lines {
                trace!(
                    "Scheduled scan line of {} rows and {} decoders",
                    next_scan_line.rows_scheduled,
                    next_scan_line.decoders.len()
                );
                num_rows_scheduled += next_scan_line.rows_scheduled;
                if !schedule_action(Ok(DecoderMessage {
                    scheduled_so_far: num_rows_scheduled,
                    decoders: next_scan_line.decoders,
                })) {
                    // Decoder has disconnected
                    return;
                }
            }
        }
    }
    fn do_schedule_ranges_legacy(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
        // If specified, this will be used as the top_level_row for all scheduling
        // tasks.  This is used by list scheduling to ensure all items scheduling
        // tasks are scheduled at the same top level row.
        priority: Option<Box<dyn PriorityRange>>,
    ) {
        let root_scheduler = self.root_scheduler.as_legacy();
        let rows_requested = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        trace!(
            "Scheduling {} ranges across {}..{} ({} rows){}",
            ranges.len(),
            ranges.first().unwrap().start,
            ranges.last().unwrap().end,
            rows_requested,
            priority
                .as_ref()
                .map(|p| format!(" (priority={:?})", p))
                .unwrap_or_default()
        );

        let mut context = SchedulerContext::new(io, self.cache.clone());
        let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
        if let Err(schedule_ranges_err) = maybe_root_job {
            schedule_action(Err(schedule_ranges_err));
            return;
        }
        let mut root_job = maybe_root_job.unwrap();
        let mut num_rows_scheduled = 0;
        let mut rows_to_schedule = root_job.num_rows();
        let mut priority = priority.unwrap_or(Box::new(SimplePriorityRange::new(0)));
        trace!("Scheduled ranges refined to {} rows", rows_to_schedule);
        while rows_to_schedule > 0 {
            let maybe_next_scan_line = root_job.schedule_next(&mut context, priority.as_ref());
            if let Err(schedule_next_err) = maybe_next_scan_line {
                schedule_action(Err(schedule_next_err));
                return;
            }
            let next_scan_line = maybe_next_scan_line.unwrap();
            priority.advance(next_scan_line.rows_scheduled);
            num_rows_scheduled += next_scan_line.rows_scheduled;
            rows_to_schedule -= next_scan_line.rows_scheduled;
            trace!(
                "Scheduled scan line of {} rows and {} decoders",
                next_scan_line.rows_scheduled,
                next_scan_line.decoders.len()
            );
            if !schedule_action(Ok(DecoderMessage {
                scheduled_so_far: num_rows_scheduled,
                decoders: next_scan_line.decoders,
            })) {
                // Decoder has disconnected
                return;
            }
        }

        trace!("Finished scheduling {} ranges", ranges.len());
    }
    fn do_schedule_ranges(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
        // If specified, this will be used as the top_level_row for all scheduling
        // tasks.  This is used by list scheduling to ensure all items scheduling
        // tasks are scheduled at the same top level row.
        priority: Option<Box<dyn PriorityRange>>,
    ) {
        match &self.root_scheduler {
            RootScheduler::Legacy(_) => {
                self.do_schedule_ranges_legacy(ranges, filter, io, schedule_action, priority)
            }
            RootScheduler::Structural(_) => {
                self.do_schedule_ranges_structural(ranges, filter, io, schedule_action)
            }
        }
    }
    // This method is similar to schedule_ranges but instead of
    // sending the decoders to a channel it collects them all into a vector
    pub fn schedule_ranges_to_vec(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        priority: Option<Box<dyn PriorityRange>>,
    ) -> Result<Vec<DecoderMessage>> {
        let mut decode_messages = Vec::new();
        self.do_schedule_ranges(
            ranges,
            filter,
            io,
            |msg| {
                decode_messages.push(msg);
                true
            },
            priority,
        );
        decode_messages.into_iter().collect::<Result<Vec<_>>>()
    }
    /// Schedules the load of multiple ranges of rows
    ///
    /// Ranges must be non-overlapping and in sorted order
    ///
    /// # Arguments
    ///
    /// * `ranges` - The ranges of rows to load
    /// * `sink` - A channel to send the decode tasks
    /// * `scheduler` - An I/O scheduler to issue I/O requests
    #[instrument(skip_all)]
    pub fn schedule_ranges(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
        scheduler: Arc<dyn EncodingsIo>,
    ) {
        self.do_schedule_ranges(
            ranges,
            filter,
            scheduler,
            |msg| {
                match sink.send(msg) {
                    Ok(_) => true,
                    Err(SendError { .. }) => {
                        // The receiver has gone away.  We can't do anything about it
                        // so just ignore the error.
                        debug!(
                            "schedule_ranges aborting early since decoder appears to have been dropped"
                        );
                        false
                    }
                }
            },
            None,
        )
    }
    /// Schedules the load of a range of rows
    ///
    /// # Arguments
    ///
    /// * `range` - The range of rows to load
    /// * `sink` - A channel to send the decode tasks
    /// * `scheduler` - An I/O scheduler to issue I/O requests
    #[instrument(skip_all)]
    pub fn schedule_range(
        &mut self,
        range: Range<u64>,
        filter: &FilterExpression,
        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
        scheduler: Arc<dyn EncodingsIo>,
    ) {
        self.schedule_ranges(&[range], filter, sink, scheduler)
    }
    /// Schedules the load of selected rows
    ///
    /// # Arguments
    ///
    /// * `indices` - The row indices to load (these must be in ascending order!)
    /// * `sink` - A channel to send the decode tasks
    /// * `scheduler` - An I/O scheduler to issue I/O requests
    pub fn schedule_take(
        &mut self,
        indices: &[u64],
        filter: &FilterExpression,
        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
        scheduler: Arc<dyn EncodingsIo>,
    ) {
        debug_assert!(indices.windows(2).all(|w| w[0] <= w[1]));
        if indices.is_empty() {
            return;
        }
        trace!("Scheduling take of {} rows", indices.len());
        let ranges = Self::indices_to_ranges(indices);
        self.schedule_ranges(&ranges, filter, sink, scheduler)
    }

    // coalesce continuous indices if possible (the input indices must be sorted and non-empty)
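    // e.g. [1, 2, 3, 7, 8] becomes [1..4, 7..9]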
    fn indices_to_ranges(indices: &[u64]) -> Vec<Range<u64>> {
        let mut ranges = Vec::new();
        let mut start = indices[0];

        for window in indices.windows(2) {
            if window[1] != window[0] + 1 {
                ranges.push(start..window[0] + 1);
                start = window[1];
            }
        }

        ranges.push(start..*indices.last().unwrap() + 1);
        ranges
    }
}
1302
1303pub struct ReadBatchTask {
1304    pub task: BoxFuture<'static, Result<RecordBatch>>,
1305    pub num_rows: u32,
1306}
1307
1308/// A stream that takes scheduled jobs and generates decode tasks from them.
1309pub struct BatchDecodeStream {
1310    context: DecoderContext,
1311    root_decoder: SimpleStructDecoder,
1312    rows_remaining: u64,
1313    rows_per_batch: u32,
1314    rows_scheduled: u64,
1315    rows_drained: u64,
1316    scheduler_exhausted: bool,
1317    emitted_batch_size_warning: Arc<Once>,
1318}
1319
1320impl BatchDecodeStream {
1321    /// Create a new instance of a batch decode stream
1322    ///
1323    /// # Arguments
1324    ///
1325    /// * `scheduled` - an incoming stream of decode tasks from a
1326    ///   [`crate::decode::DecodeBatchScheduler`]
    /// * `rows_per_batch` - the number of rows to include in each batch
    /// * `num_rows` - the total number of rows scheduled
    /// * `root_decoder` - the root decoder that incoming messages are fed into
1331    pub fn new(
1332        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1333        rows_per_batch: u32,
1334        num_rows: u64,
1335        root_decoder: SimpleStructDecoder,
1336    ) -> Self {
1337        Self {
1338            context: DecoderContext::new(scheduled),
1339            root_decoder,
1340            rows_remaining: num_rows,
1341            rows_per_batch,
1342            rows_scheduled: 0,
1343            rows_drained: 0,
1344            scheduler_exhausted: false,
1345            emitted_batch_size_warning: Arc::new(Once::new()),
1346        }
1347    }
1348
1349    fn accept_decoder(&mut self, decoder: crate::previous::decoder::DecoderReady) -> Result<()> {
1350        if decoder.path.is_empty() {
1351            // The root decoder we can ignore
1352            Ok(())
1353        } else {
1354            self.root_decoder.accept_child(decoder)
1355        }
1356    }
1357
1358    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
1359        if self.scheduler_exhausted {
1360            return Ok(self.rows_scheduled);
1361        }
1362        while self.rows_scheduled < scheduled_need {
1363            let next_message = self.context.source.recv().await;
1364            match next_message {
1365                Some(scan_line) => {
1366                    let scan_line = scan_line?;
1367                    self.rows_scheduled = scan_line.scheduled_so_far;
1368                    for message in scan_line.decoders {
1369                        self.accept_decoder(message.into_legacy())?;
1370                    }
1371                }
1372                None => {
1373                    // Schedule ended before we got all the data we expected.  This probably
1374                    // means some kind of pushdown filter was applied and we didn't load as
1375                    // much data as we thought we would.
1376                    self.scheduler_exhausted = true;
1377                    return Ok(self.rows_scheduled);
1378                }
1379            }
1380        }
1381        Ok(scheduled_need)
1382    }
1383
1384    #[instrument(level = "debug", skip_all)]
1385    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
1386        trace!(
1387            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1388            self.rows_remaining,
1389            self.rows_drained,
1390            self.rows_scheduled,
1391        );
1392        if self.rows_remaining == 0 {
1393            return Ok(None);
1394        }
1395
1396        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1397        self.rows_remaining -= to_take;
1398
1399        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1400        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1401        if scheduled_need > 0 {
1402            let desired_scheduled = scheduled_need + self.rows_scheduled;
1403            trace!(
1404                "Draining from scheduler (desire at least {} scheduled rows)",
1405                desired_scheduled
1406            );
1407            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
1408            if actually_scheduled < desired_scheduled {
1409                let under_scheduled = desired_scheduled - actually_scheduled;
1410                to_take -= under_scheduled;
1411            }
1412        }
1413
1414        if to_take == 0 {
1415            return Ok(None);
1416        }
1417
        // wait_for_loaded waits until *strictly more than* `loaded_need` rows are
        // loaded (> rather than >=), so we subtract 1 here
1419        let loaded_need = self.rows_drained + to_take - 1;
1420        trace!(
1421            "Waiting for I/O (desire at least {} fully loaded rows)",
1422            loaded_need
1423        );
1424        self.root_decoder.wait_for_loaded(loaded_need).await?;
1425
1426        let next_task = self.root_decoder.drain(to_take)?;
1427        self.rows_drained += to_take;
1428        Ok(Some(next_task))
1429    }
1430
1431    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
1432        let stream = futures::stream::unfold(self, |mut slf| async move {
1433            let next_task = slf.next_batch_task().await;
1434            let next_task = next_task.transpose().map(|next_task| {
1435                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
1436                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
1437                let task = async move {
1438                    let next_task = next_task?;
1439                    next_task.into_batch(emitted_batch_size_warning)
1440                };
1441                (task, num_rows)
1442            });
1443            next_task.map(|(task, num_rows)| {
1444                // This should be true since batch size is u32
1445                debug_assert!(num_rows <= u32::MAX as u64);
1446                let next_task = ReadBatchTask {
1447                    task: task.boxed(),
1448                    num_rows: num_rows as u32,
1449                };
1450                (next_task, slf)
1451            })
1452        });
1453        stream.boxed()
1454    }
1455}
1456
1457// Utility types to smooth out the differences between the 2.0 and 2.1 decoders so that
1458// we can have a single implementation of the batch decode iterator
1459enum RootDecoderMessage {
1460    LoadedPage(LoadedPageShard),
1461    LegacyPage(crate::previous::decoder::DecoderReady),
1462}
1463trait RootDecoderType {
1464    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()>;
1465    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
1466    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()>;
1467}
1468impl RootDecoderType for StructuralStructDecoder {
1469    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1470        let RootDecoderMessage::LoadedPage(loaded_page) = message else {
1471            unreachable!()
1472        };
1473        self.accept_page(loaded_page)
1474    }
1475    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1476        self.drain_batch_task(num_rows)
1477    }
1478    fn wait(&mut self, _: u64, _: &tokio::runtime::Runtime) -> Result<()> {
1479        // Waiting happens elsewhere (not as part of the decoder)
1480        Ok(())
1481    }
1482}
1483impl RootDecoderType for SimpleStructDecoder {
1484    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1485        let RootDecoderMessage::LegacyPage(legacy_page) = message else {
1486            unreachable!()
1487        };
1488        self.accept_child(legacy_page)
1489    }
1490    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1491        self.drain(num_rows)
1492    }
1493    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()> {
1494        runtime.block_on(self.wait_for_loaded(loaded_need))
1495    }
1496}
1497
1498/// A blocking batch decoder that performs synchronous decoding
1499struct BatchDecodeIterator<T: RootDecoderType> {
1500    messages: VecDeque<Result<DecoderMessage>>,
1501    root_decoder: T,
1502    rows_remaining: u64,
1503    rows_per_batch: u32,
1504    rows_scheduled: u64,
1505    rows_drained: u64,
1506    emitted_batch_size_warning: Arc<Once>,
1507    // Note: this is not the runtime on which I/O happens.
1508    // That's always in the scheduler.  This is just a runtime we use to
1509    // sleep the current thread if I/O is unready
1510    wait_for_io_runtime: tokio::runtime::Runtime,
1511    schema: Arc<ArrowSchema>,
1512}
1513
1514impl<T: RootDecoderType> BatchDecodeIterator<T> {
1515    /// Create a new instance of a batch decode iterator
1516    pub fn new(
1517        messages: VecDeque<Result<DecoderMessage>>,
1518        rows_per_batch: u32,
1519        num_rows: u64,
1520        root_decoder: T,
1521        schema: Arc<ArrowSchema>,
1522    ) -> Self {
1523        Self {
1524            messages,
1525            root_decoder,
1526            rows_remaining: num_rows,
1527            rows_per_batch,
1528            rows_scheduled: 0,
1529            rows_drained: 0,
1530            wait_for_io_runtime: tokio::runtime::Builder::new_current_thread()
1531                .build()
1532                .unwrap(),
1533            emitted_batch_size_warning: Arc::new(Once::new()),
1534            schema,
1535        }
1536    }
1537
1538    /// Wait for a single page of data to finish loading
1539    ///
1540    /// If the data is not available this will perform a *blocking* wait (put
1541    /// the current thread to sleep)
1542    fn wait_for_page(&self, unloaded_page: UnloadedPageShard) -> Result<LoadedPageShard> {
1543        match maybe_done(unloaded_page.0) {
1544            // Fast path, avoid all runtime shenanigans if the data is ready
1545            MaybeDone::Done(loaded_page) => loaded_page,
1546            // Slow path, we need to wait on I/O, enter the runtime
1547            MaybeDone::Future(fut) => self.wait_for_io_runtime.block_on(fut),
1548            MaybeDone::Gone => unreachable!(),
1549        }
1550    }
1551
1552    /// Waits for I/O until `scheduled_need` rows have been loaded
1553    ///
1554    /// Note that `scheduled_need` is cumulative.  E.g. this method
1555    /// should be called with 5, 10, 15 and not 5, 5, 5
1556    #[instrument(skip_all)]
1557    fn wait_for_io(&mut self, scheduled_need: u64, to_take: u64) -> Result<u64> {
1558        while self.rows_scheduled < scheduled_need && !self.messages.is_empty() {
1559            let message = self.messages.pop_front().unwrap()?;
1560            self.rows_scheduled = message.scheduled_so_far;
1561            for decoder_message in message.decoders {
1562                match decoder_message {
1563                    MessageType::UnloadedPage(unloaded_page) => {
1564                        let loaded_page = self.wait_for_page(unloaded_page)?;
1565                        self.root_decoder
1566                            .accept_message(RootDecoderMessage::LoadedPage(loaded_page))?;
1567                    }
1568                    MessageType::DecoderReady(decoder_ready) => {
1569                        // The root decoder we can ignore
1570                        if !decoder_ready.path.is_empty() {
1571                            self.root_decoder
1572                                .accept_message(RootDecoderMessage::LegacyPage(decoder_ready))?;
1573                        }
1574                    }
1575                }
1576            }
1577        }
1578
1579        let loaded_need = self.rows_drained + to_take.min(self.rows_per_batch as u64) - 1;
1580
1581        self.root_decoder
1582            .wait(loaded_need, &self.wait_for_io_runtime)?;
1583        Ok(self.rows_scheduled)
1584    }
1585
1586    #[instrument(level = "debug", skip_all)]
1587    fn next_batch_task(&mut self) -> Result<Option<RecordBatch>> {
1588        trace!(
1589            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1590            self.rows_remaining,
1591            self.rows_drained,
1592            self.rows_scheduled,
1593        );
1594        if self.rows_remaining == 0 {
1595            return Ok(None);
1596        }
1597
1598        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1599        self.rows_remaining -= to_take;
1600
1601        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1602        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1603        if scheduled_need > 0 {
1604            let desired_scheduled = scheduled_need + self.rows_scheduled;
1605            trace!(
1606                "Draining from scheduler (desire at least {} scheduled rows)",
1607                desired_scheduled
1608            );
1609            let actually_scheduled = self.wait_for_io(desired_scheduled, to_take)?;
1610            if actually_scheduled < desired_scheduled {
1611                let under_scheduled = desired_scheduled - actually_scheduled;
1612                to_take -= under_scheduled;
1613            }
1614        }
1615
1616        if to_take == 0 {
1617            return Ok(None);
1618        }
1619
1620        let next_task = self.root_decoder.drain_batch(to_take)?;
1621
1622        self.rows_drained += to_take;
1623
1624        let batch = next_task.into_batch(self.emitted_batch_size_warning.clone())?;
1625
1626        Ok(Some(batch))
1627    }
1628}
1629
1630impl<T: RootDecoderType> Iterator for BatchDecodeIterator<T> {
1631    type Item = ArrowResult<RecordBatch>;
1632
1633    fn next(&mut self) -> Option<Self::Item> {
1634        self.next_batch_task()
1635            .transpose()
1636            .map(|r| r.map_err(ArrowError::from))
1637    }
1638}
1639
1640impl<T: RootDecoderType> RecordBatchReader for BatchDecodeIterator<T> {
1641    fn schema(&self) -> Arc<ArrowSchema> {
1642        self.schema.clone()
1643    }
1644}
1645
1646/// A stream that takes scheduled jobs and generates decode tasks from them.
1647pub struct StructuralBatchDecodeStream {
1648    context: DecoderContext,
1649    root_decoder: StructuralStructDecoder,
1650    rows_remaining: u64,
1651    rows_per_batch: u32,
1652    rows_scheduled: u64,
1653    rows_drained: u64,
1654    scheduler_exhausted: bool,
1655    emitted_batch_size_warning: Arc<Once>,
1656}
1657
1658impl StructuralBatchDecodeStream {
1659    /// Create a new instance of a batch decode stream
1660    ///
1661    /// # Arguments
1662    ///
1663    /// * `scheduled` - an incoming stream of decode tasks from a
1664    ///   [`crate::decode::DecodeBatchScheduler`]
    /// * `rows_per_batch` - the number of rows to include in each batch
    /// * `num_rows` - the total number of rows scheduled
    /// * `root_decoder` - the root decoder that incoming messages are fed into
1669    pub fn new(
1670        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1671        rows_per_batch: u32,
1672        num_rows: u64,
1673        root_decoder: StructuralStructDecoder,
1674    ) -> Self {
1675        Self {
1676            context: DecoderContext::new(scheduled),
1677            root_decoder,
1678            rows_remaining: num_rows,
1679            rows_per_batch,
1680            rows_scheduled: 0,
1681            rows_drained: 0,
1682            scheduler_exhausted: false,
1683            emitted_batch_size_warning: Arc::new(Once::new()),
1684        }
1685    }
1686
1687    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
1688        if self.scheduler_exhausted {
1689            return Ok(self.rows_scheduled);
1690        }
1691        while self.rows_scheduled < scheduled_need {
1692            let next_message = self.context.source.recv().await;
1693            match next_message {
1694                Some(scan_line) => {
1695                    let scan_line = scan_line?;
1696                    self.rows_scheduled = scan_line.scheduled_so_far;
1697                    for message in scan_line.decoders {
1698                        let unloaded_page = message.into_structural();
1699                        let loaded_page = unloaded_page.0.await?;
1700                        self.root_decoder.accept_page(loaded_page)?;
1701                    }
1702                }
1703                None => {
1704                    // Schedule ended before we got all the data we expected.  This probably
1705                    // means some kind of pushdown filter was applied and we didn't load as
1706                    // much data as we thought we would.
1707                    self.scheduler_exhausted = true;
1708                    return Ok(self.rows_scheduled);
1709                }
1710            }
1711        }
1712        Ok(scheduled_need)
1713    }
1714
1715    #[instrument(level = "debug", skip_all)]
1716    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
1717        trace!(
1718            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1719            self.rows_remaining,
1720            self.rows_drained,
1721            self.rows_scheduled,
1722        );
1723        if self.rows_remaining == 0 {
1724            return Ok(None);
1725        }
1726
1727        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1728        self.rows_remaining -= to_take;
1729
1730        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1731        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1732        if scheduled_need > 0 {
1733            let desired_scheduled = scheduled_need + self.rows_scheduled;
1734            trace!(
1735                "Draining from scheduler (desire at least {} scheduled rows)",
1736                desired_scheduled
1737            );
1738            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
1739            if actually_scheduled < desired_scheduled {
1740                let under_scheduled = desired_scheduled - actually_scheduled;
1741                to_take -= under_scheduled;
1742            }
1743        }
1744
1745        if to_take == 0 {
1746            return Ok(None);
1747        }
1748
1749        let next_task = self.root_decoder.drain_batch_task(to_take)?;
1750        self.rows_drained += to_take;
1751        Ok(Some(next_task))
1752    }
1753
1754    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
1755        let stream = futures::stream::unfold(self, |mut slf| async move {
1756            let next_task = slf.next_batch_task().await;
1757            let next_task = next_task.transpose().map(|next_task| {
1758                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
1759                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
1760                let task = async move {
1761                    let next_task = next_task?;
1762                    next_task.into_batch(emitted_batch_size_warning)
1763                };
1764                (task, num_rows)
1765            });
1766            next_task.map(|(task, num_rows)| {
1767                // This should be true since batch size is u32
1768                debug_assert!(num_rows <= u32::MAX as u64);
1769                let next_task = ReadBatchTask {
1770                    task: task.boxed(),
1771                    num_rows: num_rows as u32,
1772                };
1773                (next_task, slf)
1774            })
1775        });
1776        stream.boxed()
1777    }
1778}
1779
1780#[derive(Debug)]
1781pub enum RequestedRows {
1782    Ranges(Vec<Range<u64>>),
1783    Indices(Vec<u64>),
1784}
1785
1786impl RequestedRows {
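    /// Returns the total number of rows requested
    ///
    /// A small illustrative example:
    ///
    /// ```ignore
    /// let rows = RequestedRows::Ranges(vec![0..10, 20..25]);
    /// assert_eq!(rows.num_rows(), 15);
    /// ```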
1787    pub fn num_rows(&self) -> u64 {
1788        match self {
1789            Self::Ranges(ranges) => ranges.iter().map(|r| r.end - r.start).sum(),
1790            Self::Indices(indices) => indices.len() as u64,
1791        }
1792    }
1793
1794    pub fn trim_empty_ranges(mut self) -> Self {
1795        if let Self::Ranges(ranges) = &mut self {
1796            ranges.retain(|r| !r.is_empty());
1797        }
1798        self
1799    }
1800}
1801
1802/// Configuration for decoder behavior
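///
/// Since the struct derives [`Default`], a config can be built with struct update
/// syntax (an illustrative sketch):
///
/// ```ignore
/// let config = DecoderConfig {
///     validate_on_decode: true,
///     ..Default::default()
/// };
/// ```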
1803#[derive(Debug, Clone, Default)]
1804pub struct DecoderConfig {
1805    /// Whether to cache repetition indices for better performance
1806    pub cache_repetition_index: bool,
1807    /// Whether to validate decoded data
1808    pub validate_on_decode: bool,
1809}
1810
1811#[derive(Debug, Clone)]
1812pub struct SchedulerDecoderConfig {
1813    pub decoder_plugins: Arc<DecoderPlugins>,
1814    pub batch_size: u32,
1815    pub io: Arc<dyn EncodingsIo>,
1816    pub cache: Arc<LanceCache>,
1817    /// Decoder configuration
1818    pub decoder_config: DecoderConfig,
1819}
1820
1821fn check_scheduler_on_drop(
1822    stream: BoxStream<'static, ReadBatchTask>,
1823    scheduler_handle: tokio::task::JoinHandle<()>,
1824) -> BoxStream<'static, ReadBatchTask> {
1825    // This is a bit weird but we create an "empty stream" that unwraps the scheduler handle (which
    // will panic if the scheduler panicked).  This lets us check if the scheduler panicked
1827    // when the stream finishes.
1828    let mut scheduler_handle = Some(scheduler_handle);
1829    let check_scheduler = stream::unfold((), move |_| {
1830        let handle = scheduler_handle.take();
1831        async move {
1832            if let Some(handle) = handle {
1833                handle.await.unwrap();
1834            }
1835            None
1836        }
1837    });
1838    stream.chain(check_scheduler).boxed()
1839}
1840
1841pub fn create_decode_stream(
1842    schema: &Schema,
1843    num_rows: u64,
1844    batch_size: u32,
1845    is_structural: bool,
1846    should_validate: bool,
1847    rx: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1848) -> BoxStream<'static, ReadBatchTask> {
1849    if is_structural {
1850        let arrow_schema = ArrowSchema::from(schema);
1851        let structural_decoder = StructuralStructDecoder::new(
1852            arrow_schema.fields,
1853            should_validate,
1854            /*is_root=*/ true,
1855        );
1856        StructuralBatchDecodeStream::new(rx, batch_size, num_rows, structural_decoder).into_stream()
1857    } else {
1858        let arrow_schema = ArrowSchema::from(schema);
1859        let root_fields = arrow_schema.fields;
1860
1861        let simple_struct_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1862        BatchDecodeStream::new(rx, batch_size, num_rows, simple_struct_decoder).into_stream()
1863    }
1864}
1865
/// Creates an iterator that decodes a set of messages in a blocking fashion
1867///
1868/// See [`schedule_and_decode_blocking`] for more information.
1869pub fn create_decode_iterator(
1870    schema: &Schema,
1871    num_rows: u64,
1872    batch_size: u32,
1873    should_validate: bool,
1874    is_structural: bool,
1875    messages: VecDeque<Result<DecoderMessage>>,
1876) -> Box<dyn RecordBatchReader + Send + 'static> {
1877    let arrow_schema = Arc::new(ArrowSchema::from(schema));
1878    let root_fields = arrow_schema.fields.clone();
1879    if is_structural {
        let structural_decoder =
            StructuralStructDecoder::new(root_fields, should_validate, /*is_root=*/ true);
1882        Box::new(BatchDecodeIterator::new(
1883            messages,
1884            batch_size,
1885            num_rows,
            structural_decoder,
1887            arrow_schema,
1888        ))
1889    } else {
1890        let root_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1891        Box::new(BatchDecodeIterator::new(
1892            messages,
1893            batch_size,
1894            num_rows,
1895            root_decoder,
1896            arrow_schema,
1897        ))
1898    }
1899}
1900
1901fn create_scheduler_decoder(
1902    column_infos: Vec<Arc<ColumnInfo>>,
1903    requested_rows: RequestedRows,
1904    filter: FilterExpression,
1905    column_indices: Vec<u32>,
1906    target_schema: Arc<Schema>,
1907    config: SchedulerDecoderConfig,
1908) -> Result<BoxStream<'static, ReadBatchTask>> {
1909    let num_rows = requested_rows.num_rows();
1910
1911    let is_structural = column_infos[0].is_structural();
1912
1913    let (tx, rx) = mpsc::unbounded_channel();
1914
1915    let decode_stream = create_decode_stream(
1916        &target_schema,
1917        num_rows,
1918        config.batch_size,
1919        is_structural,
1920        config.decoder_config.validate_on_decode,
1921        rx,
1922    );
1923
1924    let scheduler_handle = tokio::task::spawn(async move {
1925        let mut decode_scheduler = match DecodeBatchScheduler::try_new(
1926            target_schema.as_ref(),
1927            &column_indices,
1928            &column_infos,
1929            &vec![],
1930            num_rows,
1931            config.decoder_plugins,
1932            config.io.clone(),
1933            config.cache,
1934            &filter,
1935            &config.decoder_config,
1936        )
1937        .await
1938        {
1939            Ok(scheduler) => scheduler,
1940            Err(e) => {
1941                let _ = tx.send(Err(e));
1942                return;
1943            }
1944        };
1945
1946        match requested_rows {
1947            RequestedRows::Ranges(ranges) => {
1948                decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
1949            }
1950            RequestedRows::Indices(indices) => {
1951                decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
1952            }
1953        }
1954    });
1955
1956    Ok(check_scheduler_on_drop(decode_stream, scheduler_handle))
1957}
1958
/// Launches a scheduler on a dedicated (spawned) task, creates a decoder to
/// decode the scheduled data, and returns the result as a stream of record batch tasks.
1961///
1962/// This is a convenience function that creates both the scheduler and the decoder
1963/// which can be a little tricky to get right.
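///
/// A minimal usage sketch; `column_infos`, `column_indices`, `schema`, and `config`
/// are assumed to come from the file reader:
///
/// ```ignore
/// let stream = schedule_and_decode(
///     column_infos,
///     RequestedRows::Ranges(vec![0..1024]),
///     FilterExpression::no_filter(),
///     column_indices,
///     schema,
///     config,
/// );
/// ```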
1964pub fn schedule_and_decode(
1965    column_infos: Vec<Arc<ColumnInfo>>,
1966    requested_rows: RequestedRows,
1967    filter: FilterExpression,
1968    column_indices: Vec<u32>,
1969    target_schema: Arc<Schema>,
1970    config: SchedulerDecoderConfig,
1971) -> BoxStream<'static, ReadBatchTask> {
1972    if requested_rows.num_rows() == 0 {
1973        return stream::empty().boxed();
1974    }
1975
1976    // If the user requested any ranges that are empty, ignore them.  They are pointless and
1977    // trying to read them has caused bugs in the past.
1978    let requested_rows = requested_rows.trim_empty_ranges();
1979
    // For convenience we really want this method to be a synchronous method where all
1981    // errors happen on the stream.  There is some async initialization that must happen
1982    // when creating a scheduler.  We wrap that all up in the very first task.
1983    match create_scheduler_decoder(
1984        column_infos,
1985        requested_rows,
1986        filter,
1987        column_indices,
1988        target_schema,
1989        config,
1990    ) {
        Ok(stream) => stream,
        // If the initialization failed, make it look like a failed task
        Err(e) => stream::once(std::future::ready(ReadBatchTask {
1994            num_rows: 0,
1995            task: std::future::ready(Err(e)).boxed(),
1996        }))
1997        .boxed(),
1998    }
1999}
2000
2001pub static WAITER_RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
2002    tokio::runtime::Builder::new_current_thread()
2003        .build()
2004        .unwrap()
2005});
2006
2007/// Schedules and decodes the requested data in a blocking fashion
2008///
2009/// This function is a blocking version of [`schedule_and_decode`]. It schedules the requested data
2010/// and decodes it in the current thread.
2011///
2012/// This can be useful when the disk is fast (or the data is in memory) and the amount
2013/// of data is relatively small.  For example, when doing a take against NVMe or in-memory data.
2014///
2015/// This should NOT be used for full scans.  Even if the data is in memory this function will
2016/// not parallelize the decode and will be slower than the async version.  Full scans typically
2017/// make relatively few IOPs and so the asynchronous overhead is much smaller.
2018///
2019/// This method will first completely run the scheduling process.  Then it will run the
2020/// decode process.
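///
/// A usage sketch, assuming the same inputs as [`schedule_and_decode`]:
///
/// ```ignore
/// let reader = schedule_and_decode_blocking(
///     column_infos,
///     RequestedRows::Indices(vec![17, 22, 1024]),
///     FilterExpression::no_filter(),
///     column_indices,
///     schema,
///     config,
/// )?;
/// for batch in reader {
///     let batch = batch?;
///     // process the record batch
/// }
/// ```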
2021pub fn schedule_and_decode_blocking(
2022    column_infos: Vec<Arc<ColumnInfo>>,
2023    requested_rows: RequestedRows,
2024    filter: FilterExpression,
2025    column_indices: Vec<u32>,
2026    target_schema: Arc<Schema>,
2027    config: SchedulerDecoderConfig,
2028) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
2029    if requested_rows.num_rows() == 0 {
2030        let arrow_schema = Arc::new(ArrowSchema::from(target_schema.as_ref()));
2031        return Ok(Box::new(RecordBatchIterator::new(vec![], arrow_schema)));
2032    }
2033
2034    let num_rows = requested_rows.num_rows();
2035    let is_structural = column_infos[0].is_structural();
2036
2037    let (tx, mut rx) = mpsc::unbounded_channel();
2038
2039    // Initialize the scheduler.  This is still "asynchronous" but we run it with a current-thread
2040    // runtime.
2041    let mut decode_scheduler = WAITER_RT.block_on(DecodeBatchScheduler::try_new(
2042        target_schema.as_ref(),
2043        &column_indices,
2044        &column_infos,
2045        &vec![],
2046        num_rows,
2047        config.decoder_plugins,
2048        config.io.clone(),
2049        config.cache,
2050        &filter,
2051        &config.decoder_config,
2052    ))?;
2053
2054    // Schedule the requested rows
2055    match requested_rows {
2056        RequestedRows::Ranges(ranges) => {
2057            decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
2058        }
2059        RequestedRows::Indices(indices) => {
2060            decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
2061        }
2062    }
2063
2064    // Drain the scheduler queue into a vec of decode messages
2065    let mut messages = Vec::new();
2066    while rx
2067        .recv_many(&mut messages, usize::MAX)
2068        .now_or_never()
2069        .unwrap()
2070        != 0
2071    {}
2072
2073    // Create a decoder to decode the messages
2074    let decode_iterator = create_decode_iterator(
2075        &target_schema,
2076        num_rows,
2077        config.batch_size,
2078        config.decoder_config.validate_on_decode,
2079        is_structural,
2080        messages.into(),
2081    );
2082
2083    Ok(decode_iterator)
2084}
2085
2086/// A decoder for single-column encodings of primitive data (this includes fixed size
2087/// lists of primitive data)
2088///
2089/// Physical decoders are able to decode into existing buffers for zero-copy operation.
2090///
2091/// Instances should be stateless and `Send` / `Sync`.  This is because multiple decode
2092/// tasks could reference the same page.  For example, imagine a page covers rows 0-2000
2093/// and the decoder stream has a batch size of 1024.  The decoder will be needed by both
2094/// the decode task for batch 0 and the decode task for batch 1.
2095///
2096/// See [`crate::decoder`] for more information
2097pub trait PrimitivePageDecoder: Send + Sync {
2098    /// Decode data into buffers
2099    ///
2100    /// This may be a simple zero-copy from a disk buffer or could involve complex decoding
2101    /// such as decompressing from some compressed representation.
2102    ///
2103    /// Capacity is stored as a tuple of (num_bytes: u64, is_needed: bool).  The `is_needed`
2104    /// portion only needs to be updated if the encoding has some concept of an "optional"
2105    /// buffer.
2106    ///
2107    /// Encodings can have any number of input or output buffers.  For example, a dictionary
2108    /// decoding will convert two buffers (indices + dictionary) into a single buffer
2109    ///
2110    /// Binary decodings have two output buffers (one for values, one for offsets)
2111    ///
2112    /// Other decodings could even expand the # of output buffers.  For example, we could decode
2113    /// fixed size strings into variable length strings going from one input buffer to multiple output
2114    /// buffers.
2115    ///
2116    /// Each Arrow data type typically has a fixed structure of buffers and the encoding chain will
2117    /// generally end at one of these structures.  However, intermediate structures may exist which
2118    /// do not correspond to any Arrow type at all.  For example, a bitpacking encoding will deal
2119    /// with buffers that have bits-per-value that is not a multiple of 8.
2120    ///
2121    /// The `primitive_array_from_buffers` method has an expected buffer layout for each arrow
2122    /// type (order matters) and encodings that aim to decode into arrow types should respect
2123    /// this layout.
2124    /// # Arguments
2125    ///
2126    /// * `rows_to_skip` - how many rows to skip (within the page) before decoding
2127    /// * `num_rows` - how many rows to decode
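    ///
    /// For example, a decoder might be asked to skip the first 10 rows of its page
    /// and decode the next 100 (an illustrative sketch):
    ///
    /// ```ignore
    /// let block: DataBlock = decoder.decode(/*rows_to_skip=*/ 10, /*num_rows=*/ 100)?;
    /// ```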
2129    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock>;
2130}
2131
2132/// A scheduler for single-column encodings of primitive data
2133///
2134/// The scheduler is responsible for calculating what I/O is needed for the requested rows
2135///
2136/// Instances should be stateless and `Send` and `Sync`.  This is because instances can
2137/// be shared in follow-up I/O tasks.
2138///
2139/// See [`crate::decoder`] for more information
2140pub trait PageScheduler: Send + Sync + std::fmt::Debug {
2141    /// Schedules a batch of I/O to load the data needed for the requested ranges
2142    ///
2143    /// Returns a future that will yield a decoder once the data has been loaded
2144    ///
2145    /// # Arguments
2146    ///
    /// * `ranges` - the ranges of row offsets (relative to the start of the page)
    ///   requested; these must be ordered and must not overlap
2149    /// * `scheduler` - a scheduler to submit the I/O request to
2150    /// * `top_level_row` - the row offset of the top level field currently being
2151    ///   scheduled.  This can be used to assign priority to I/O requests
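    ///
    /// A call sketch (illustrative; `io` is any [`EncodingsIo`] implementation):
    ///
    /// ```ignore
    /// let decoder_fut = page_scheduler.schedule_ranges(&[0..100], &io, /*top_level_row=*/ 0);
    /// let decoder = decoder_fut.await?;
    /// ```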
2152    fn schedule_ranges(
2153        &self,
2154        ranges: &[Range<u64>],
2155        scheduler: &Arc<dyn EncodingsIo>,
2156        top_level_row: u64,
2157    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>;
2158}
2159
2160/// A trait to control the priority of I/O
2161pub trait PriorityRange: std::fmt::Debug + Send + Sync {
2162    fn advance(&mut self, num_rows: u64);
2163    fn current_priority(&self) -> u64;
2164    fn box_clone(&self) -> Box<dyn PriorityRange>;
2165}
2166
2167/// A simple priority scheme for top-level fields with no parent
2168/// repetition
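///
/// For example (illustrative), the priority simply advances with the rows:
///
/// ```ignore
/// let mut priority = SimplePriorityRange::new(0);
/// priority.advance(100);
/// assert_eq!(priority.current_priority(), 100);
/// ```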
2169#[derive(Debug)]
2170pub struct SimplePriorityRange {
2171    priority: u64,
2172}
2173
2174impl SimplePriorityRange {
2175    fn new(priority: u64) -> Self {
2176        Self { priority }
2177    }
2178}
2179
2180impl PriorityRange for SimplePriorityRange {
2181    fn advance(&mut self, num_rows: u64) {
2182        self.priority += num_rows;
2183    }
2184
2185    fn current_priority(&self) -> u64 {
2186        self.priority
2187    }
2188
2189    fn box_clone(&self) -> Box<dyn PriorityRange> {
2190        Box::new(Self {
2191            priority: self.priority,
2192        })
2193    }
2194}
2195
2196/// Determining the priority of a list request is tricky.  We want
2197/// the priority to be the top-level row.  So if we have a
/// list<list<int>> where each outer list has 10 inner lists and each
/// inner list has 5 items, then the priority of the 100th item is 1
/// because it is the 5th item in the 10th inner list of the *second*
/// top-level row.
2201///
2202/// This structure allows us to keep track of this complicated priority
2203/// relationship.
2204///
2205/// There's a fair amount of bookkeeping involved here.
2206///
2207/// A better approach (using repetition levels) is coming in the future.
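///
/// As an illustrative sketch, consider a list column whose offsets are
/// `[0, 5, 10]` (every list has 5 items).  After scheduling 7 items we are
/// inside the second list, so the priority is 1:
///
/// ```ignore
/// let offsets: Arc<[u64]> = vec![0, 5, 10].into();
/// let mut priority = ListPriorityRange::new(Box::new(SimplePriorityRange::new(0)), offsets);
/// priority.advance(7);
/// assert_eq!(priority.current_priority(), 1);
/// ```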
2208pub struct ListPriorityRange {
2209    base: Box<dyn PriorityRange>,
2210    offsets: Arc<[u64]>,
2211    cur_index_into_offsets: usize,
2212    cur_position: u64,
2213}
2214
2215impl ListPriorityRange {
2216    pub(crate) fn new(base: Box<dyn PriorityRange>, offsets: Arc<[u64]>) -> Self {
2217        Self {
2218            base,
2219            offsets,
2220            cur_index_into_offsets: 0,
2221            cur_position: 0,
2222        }
2223    }
2224}
2225
2226impl std::fmt::Debug for ListPriorityRange {
2227    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2228        f.debug_struct("ListPriorityRange")
2229            .field("base", &self.base)
2230            .field("offsets.len()", &self.offsets.len())
2231            .field("cur_index_into_offsets", &self.cur_index_into_offsets)
2232            .field("cur_position", &self.cur_position)
2233            .finish()
2234    }
2235}
2236
2237impl PriorityRange for ListPriorityRange {
2238    fn advance(&mut self, num_rows: u64) {
2239        // We've scheduled X items.  Now walk through the offsets to
2240        // determine how many rows we've scheduled.
2241        self.cur_position += num_rows;
2242        let mut idx_into_offsets = self.cur_index_into_offsets;
2243        while idx_into_offsets + 1 < self.offsets.len()
2244            && self.offsets[idx_into_offsets + 1] <= self.cur_position
2245        {
2246            idx_into_offsets += 1;
2247        }
2248        let base_rows_advanced = idx_into_offsets - self.cur_index_into_offsets;
2249        self.cur_index_into_offsets = idx_into_offsets;
2250        self.base.advance(base_rows_advanced as u64);
2251    }
2252
2253    fn current_priority(&self) -> u64 {
2254        self.base.current_priority()
2255    }
2256
2257    fn box_clone(&self) -> Box<dyn PriorityRange> {
2258        Box::new(Self {
2259            base: self.base.box_clone(),
2260            offsets: self.offsets.clone(),
2261            cur_index_into_offsets: self.cur_index_into_offsets,
2262            cur_position: self.cur_position,
2263        })
2264    }
2265}
2266
2267/// Contains the context for a scheduler
2268pub struct SchedulerContext {
2269    recv: Option<mpsc::UnboundedReceiver<DecoderMessage>>,
2270    io: Arc<dyn EncodingsIo>,
2271    cache: Arc<LanceCache>,
2272    name: String,
2273    path: Vec<u32>,
2274    path_names: Vec<String>,
2275}
2276
2277pub struct ScopedSchedulerContext<'a> {
2278    pub context: &'a mut SchedulerContext,
2279}
2280
2281impl<'a> ScopedSchedulerContext<'a> {
2282    pub fn pop(self) -> &'a mut SchedulerContext {
2283        self.context.pop();
2284        self.context
2285    }
2286}
2287
2288impl SchedulerContext {
2289    pub fn new(io: Arc<dyn EncodingsIo>, cache: Arc<LanceCache>) -> Self {
2290        Self {
2291            io,
2292            cache,
2293            recv: None,
2294            name: "".to_string(),
2295            path: Vec::new(),
2296            path_names: Vec::new(),
2297        }
2298    }
2299
2300    pub fn io(&self) -> &Arc<dyn EncodingsIo> {
2301        &self.io
2302    }
2303
2304    pub fn cache(&self) -> &Arc<LanceCache> {
2305        &self.cache
2306    }
2307
2308    pub fn push(&'_ mut self, name: &str, index: u32) -> ScopedSchedulerContext<'_> {
2309        self.path.push(index);
2310        self.path_names.push(name.to_string());
2311        ScopedSchedulerContext { context: self }
2312    }
2313
2314    pub fn pop(&mut self) {
2315        self.path.pop();
2316        self.path_names.pop();
2317    }
2318
2319    pub fn path_name(&self) -> String {
2320        let path = self.path_names.join("/");
2321        if self.recv.is_some() {
2322            format!("TEMP({}){}", self.name, path)
2323        } else {
2324            format!("ROOT{}", path)
2325        }
2326    }
2327
2328    pub fn current_path(&self) -> VecDeque<u32> {
2329        VecDeque::from_iter(self.path.iter().copied())
2330    }
2331
2332    #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
2333    pub fn locate_decoder(
2334        &mut self,
2335        decoder: Box<dyn crate::previous::decoder::LogicalPageDecoder>,
2336    ) -> crate::previous::decoder::DecoderReady {
2337        trace!(
2338            "Scheduling decoder of type {:?} for {:?}",
2339            decoder.data_type(),
2340            self.path,
2341        );
2342        crate::previous::decoder::DecoderReady {
2343            decoder,
2344            path: self.current_path(),
2345        }
2346    }
2347}
2348
2349pub struct UnloadedPageShard(pub BoxFuture<'static, Result<LoadedPageShard>>);
2350
2351impl std::fmt::Debug for UnloadedPageShard {
2352    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("UnloadedPageShard").finish()
2354    }
2355}
2356
2357#[derive(Debug)]
2358pub struct ScheduledScanLine {
2359    pub rows_scheduled: u64,
2360    pub decoders: Vec<MessageType>,
2361}
2362
2363pub trait StructuralSchedulingJob: std::fmt::Debug {
2364    /// Schedule the next batch of data
2365    ///
2366    /// Normally this equates to scheduling the next page of data into one task.  Very large pages
2367    /// might be split into multiple scan lines.  Each scan line has one or more rows.
2368    ///
2369    /// If a scheduler ends early it may return an empty vector.
2370    fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>>;
2371}
2372
2373/// A filter expression to apply to the data
2374///
2375/// The core decoders do not currently take advantage of filtering in
2376/// any way.  In order to maintain the abstraction we represent filters
2377/// as an arbitrary byte sequence.
2378///
2379/// We recommend that encodings use Substrait for filters.
2380pub struct FilterExpression(pub Bytes);
2381
2382impl FilterExpression {
2383    /// Create a filter expression that does not filter any data
2384    ///
2385    /// This is currently represented by an empty byte array.  Encoders
2386    /// that are "filter aware" should make sure they handle this case.
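    ///
    /// For example:
    ///
    /// ```ignore
    /// let filter = FilterExpression::no_filter();
    /// assert!(filter.is_noop());
    /// ```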
2387    pub fn no_filter() -> Self {
2388        Self(Bytes::new())
2389    }
2390
2391    /// Returns true if the filter is the same as the [`Self::no_filter`] filter
2392    pub fn is_noop(&self) -> bool {
2393        self.0.is_empty()
2394    }
2395}
2396
2397pub trait StructuralFieldScheduler: Send + std::fmt::Debug {
2398    fn initialize<'a>(
2399        &'a mut self,
2400        filter: &'a FilterExpression,
2401        context: &'a SchedulerContext,
2402    ) -> BoxFuture<'a, Result<()>>;
2403    fn schedule_ranges<'a>(
2404        &'a self,
2405        ranges: &[Range<u64>],
2406        filter: &FilterExpression,
2407    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>>;
2408}
2409
2410/// A trait for tasks that decode data into an Arrow array
2411pub trait DecodeArrayTask: Send {
2412    /// Decodes the data into an Arrow array
2413    fn decode(self: Box<Self>) -> Result<ArrayRef>;
2414}
2415
2416impl DecodeArrayTask for Box<dyn StructuralDecodeArrayTask> {
2417    fn decode(self: Box<Self>) -> Result<ArrayRef> {
2418        StructuralDecodeArrayTask::decode(*self).map(|decoded_array| decoded_array.array)
2419    }
2420}
2421
2422/// A task to decode data into an Arrow record batch
2423///
2424/// It has a child `task` which decodes a struct array with no nulls.
2425/// This is then converted into a record batch.
2426pub struct NextDecodeTask {
2427    /// The decode task itself
2428    pub task: Box<dyn DecodeArrayTask>,
2429    /// The number of rows that will be created
2430    pub num_rows: u64,
2431}
2432
2433impl NextDecodeTask {
2434    // Run the task and produce a record batch
2435    //
2436    // If the batch is very large this function will log a warning message
2437    // suggesting the user try a smaller batch size.
2438    #[instrument(name = "task_to_batch", level = "debug", skip_all)]
2439    fn into_batch(self, emitted_batch_size_warning: Arc<Once>) -> Result<RecordBatch> {
2440        let struct_arr = self.task.decode();
2441        match struct_arr {
2442            Ok(struct_arr) => {
2443                let batch = RecordBatch::from(struct_arr.as_struct());
2444                let size_bytes = batch.get_array_memory_size() as u64;
2445                if size_bytes > BATCH_SIZE_BYTES_WARNING {
2446                    emitted_batch_size_warning.call_once(|| {
2447                        let size_mb = size_bytes / 1024 / 1024;
2448                        debug!("Lance read in a single batch that contained more than {}MiB of data.  You may want to consider reducing the batch size.", size_mb);
2449                    });
2450                }
2451                Ok(batch)
2452            }
2453            Err(e) => {
2454                let e = Error::Internal {
2455                    message: format!("Error decoding batch: {}", e),
2456                    location: location!(),
2457                };
2458                Err(e)
2459            }
2460        }
2461    }
2462}
2463
2464// An envelope to wrap both 2.0 style messages and 2.1 style messages so we can
2465// share some code paths between the two.  Decoders can safely unwrap into whatever
2466// style they expect since a file will be either all-2.0 or all-2.1
2467#[derive(Debug)]
2468pub enum MessageType {
2469    // The older v2.0 scheduler/decoder used a scheme where the message was the
2470    // decoder itself.  The messages were not sent in priority order and the decoder
2471    // had to wait for I/O, figuring out the correct priority.  This was a lot of
2472    // complexity.
2473    DecoderReady(crate::previous::decoder::DecoderReady),
2474    // Starting in 2.1 we use a simpler scheme where the scheduling happens in priority
2475    // order and the message is an unloaded decoder.  These can be awaited, in order, and
2476    // the decoder does not have to worry about waiting for I/O.
2477    UnloadedPage(UnloadedPageShard),
2478}
2479
2480impl MessageType {
2481    pub fn into_legacy(self) -> crate::previous::decoder::DecoderReady {
2482        match self {
2483            Self::DecoderReady(decoder) => decoder,
2484            Self::UnloadedPage(_) => {
2485                panic!("Expected DecoderReady but got UnloadedPage")
2486            }
2487        }
2488    }
2489
2490    pub fn into_structural(self) -> UnloadedPageShard {
2491        match self {
2492            Self::UnloadedPage(unloaded) => unloaded,
2493            Self::DecoderReady(_) => {
2494                panic!("Expected UnloadedPage but got DecoderReady")
2495            }
2496        }
2497    }
2498}
2499
2500pub struct DecoderMessage {
2501    pub scheduled_so_far: u64,
2502    pub decoders: Vec<MessageType>,
2503}
2504
2505pub struct DecoderContext {
2506    source: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
2507}
2508
2509impl DecoderContext {
2510    pub fn new(source: mpsc::UnboundedReceiver<Result<DecoderMessage>>) -> Self {
2511        Self { source }
2512    }
2513}
2514
2515pub struct DecodedPage {
2516    pub data: DataBlock,
2517    pub repdef: RepDefUnraveler,
2518}
2519
2520pub trait DecodePageTask: Send + std::fmt::Debug {
2521    /// Decodes the data into an Arrow array
2522    fn decode(self: Box<Self>) -> Result<DecodedPage>;
2523}
2524
2525pub trait StructuralPageDecoder: std::fmt::Debug + Send {
2526    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>>;
2527    fn num_rows(&self) -> u64;
2528}
2529
2530#[derive(Debug)]
2531pub struct LoadedPageShard {
2532    // The decoder that is ready to be decoded
2533    pub decoder: Box<dyn StructuralPageDecoder>,
2534    // The path to the decoder, the first value is the column index
2535    // following values, if present, are nested child indices
2536    //
2537    // For example, a path of [1, 1, 0] would mean to grab the second
2538    // column, then the second child, and then the first child.
2539    //
2540    // It could represent x in the following schema:
2541    //
2542    // score: float64
2543    // points: struct
2544    //   color: string
2545    //   location: struct
2546    //     x: float64
2547    //
2548    // Currently, only struct decoders have "children" although other
2549    // decoders may at some point as well.  List children are only
2550    // handled through indirect I/O at the moment and so they don't
2551    // need to be represented (yet)
2552    pub path: VecDeque<u32>,
2553}
2554
2555pub struct DecodedArray {
2556    pub array: ArrayRef,
2557    pub repdef: CompositeRepDefUnraveler,
2558}
2559
2560pub trait StructuralDecodeArrayTask: std::fmt::Debug + Send {
2561    fn decode(self: Box<Self>) -> Result<DecodedArray>;
2562}
2563
2564pub trait StructuralFieldDecoder: std::fmt::Debug + Send {
2565    /// Add a newly scheduled child decoder
2566    ///
2567    /// The default implementation does not expect children and returns
2568    /// an error.
2569    fn accept_page(&mut self, _child: LoadedPageShard) -> Result<()>;
2570    /// Creates a task to decode `num_rows` of data into an array
2571    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>>;
2572    /// The data type of the decoded data
2573    fn data_type(&self) -> &DataType;
2574}
2575
2576#[derive(Debug, Default)]
2577pub struct DecoderPlugins {}
2578
2579/// Decodes a batch of data from an in-memory structure created by [`crate::encoder::encode_batch`]
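///
/// A usage sketch, assuming `encoded` is an [`EncodedBatch`] produced by the encoder:
///
/// ```ignore
/// let batch = decode_batch(
///     &encoded,
///     &FilterExpression::no_filter(),
///     Arc::new(DecoderPlugins::default()),
///     /*should_validate=*/ false,
///     LanceFileVersion::V2_1,
///     None,
/// )
/// .await?;
/// ```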
2580pub async fn decode_batch(
2581    batch: &EncodedBatch,
2582    filter: &FilterExpression,
2583    decoder_plugins: Arc<DecoderPlugins>,
2584    should_validate: bool,
2585    version: LanceFileVersion,
2586    cache: Option<Arc<LanceCache>>,
2587) -> Result<RecordBatch> {
    // The I/O is synchronous, so it shouldn't be possible for any async work to still be
    // in progress.  Still, if we just used now_or_never we would hit misfires because some
    // futures (channels) need to be polled twice.
2591
2592    let io_scheduler = Arc::new(BufferScheduler::new(batch.data.clone())) as Arc<dyn EncodingsIo>;
2593    let cache = if let Some(cache) = cache {
2594        cache
2595    } else {
2596        Arc::new(lance_core::cache::LanceCache::with_capacity(
2597            128 * 1024 * 1024,
2598        ))
2599    };
2600    let mut decode_scheduler = DecodeBatchScheduler::try_new(
2601        batch.schema.as_ref(),
2602        &batch.top_level_columns,
2603        &batch.page_table,
2604        &vec![],
2605        batch.num_rows,
2606        decoder_plugins,
2607        io_scheduler.clone(),
2608        cache,
2609        filter,
2610        &DecoderConfig::default(),
2611    )
2612    .await?;
2613    let (tx, rx) = unbounded_channel();
2614    decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler);
2615    let is_structural = version >= LanceFileVersion::V2_1;
2616    let mut decode_stream = create_decode_stream(
2617        &batch.schema,
2618        batch.num_rows,
2619        batch.num_rows as u32,
2620        is_structural,
2621        should_validate,
2622        rx,
2623    );
2624    decode_stream.next().await.unwrap().task.await
2625}
2626
2627#[cfg(test)]
2628// test coalesce indices to ranges
2629mod tests {
2630    use super::*;
2631
2632    #[test]
2633    fn test_coalesce_indices_to_ranges_with_single_index() {
2634        let indices = vec![1];
2635        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
2636        assert_eq!(ranges, vec![1..2]);
2637    }
2638
2639    #[test]
2640    fn test_coalesce_indices_to_ranges() {
2641        let indices = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
2642        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
2643        assert_eq!(ranges, vec![1..10]);
2644    }
2645
2646    #[test]
2647    fn test_coalesce_indices_to_ranges_with_gaps() {
2648        let indices = vec![1, 2, 3, 5, 6, 7, 9];
2649        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
2650        assert_eq!(ranges, vec![1..4, 5..8, 9..10]);
2651    }
2652}