lance_encoding/decoder.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Utilities and traits for scheduling & decoding data
//!
//! Reading data involves two steps: scheduling and decoding.  The
//! scheduling step is responsible for figuring out what data is needed
//! and issuing the appropriate I/O requests.  The decoding step is
//! responsible for taking the loaded data and turning it into Arrow
//! arrays.
//!
//! # Scheduling
//!
//! Scheduling is split into [`self::FieldScheduler`] and [`self::PageScheduler`].
//! There is one field scheduler for each output field, which may map to many
//! columns of actual data.  A field scheduler is responsible for figuring out
//! the order in which pages should be scheduled.  Field schedulers then delegate
//! to page schedulers to figure out the I/O requests that need to be made for
//! the page.
//!
//! Page schedulers also create the decoders that will be used to decode the
//! scheduled data.
//!
//! # Decoding
//!
//! Decoders are split into [`self::PhysicalPageDecoder`] and
//! [`self::LogicalPageDecoder`].  Note that both physical and logical decoding
//! happens on a per-page basis.  There is no concept of a "field decoder" or
//! "column decoder".
//!
//! The physical decoders handle lower level encodings.  They have a few advantages:
//!
//!  * They do not need to decode into an Arrow array and so they don't need
//!    to be shoehorned into the Arrow type system (e.g. Arrow doesn't have a
//!    bit-packed type.  We can use variable-length binary but that is kind
//!    of awkward)
//!  * They can decode into an existing allocation.  This can allow for "page
//!    bridging".  If we are trying to decode into a batch of 1024 rows and
//!    the rows 0..1024 are spread across two pages then we can avoid a memory
//!    copy by allocating once and decoding each page into the outer allocation.
//!    (note: page bridging is not actually implemented yet)
//!
//! However, there are some limitations for physical decoders:
//!
//!  * They are constrained to a single column
//!  * The API is more complex
//!
//! The logical decoders are designed to map one or more columns of Lance
//! data into an Arrow array.
//!
//! Typically, a "logical encoding" will have both a logical decoder and a field scheduler.
//! Meanwhile, a "physical encoding" will have a physical decoder but no corresponding field
//! scheduler.
//!
//! # General notes
//!
//! Encodings are typically nested into each other to form a tree.  The top of the tree is
//! the user requested schema.  Each field in that schema is assigned to one top-level logical
//! encoding.  That encoding can then contain other logical encodings or physical encodings.
//! Physical encodings can also contain other physical encodings.
//!
//! So, for example, a single field in the Arrow schema might have the type List<UInt32>
//!
//! The encoding tree could then be:
//!
//! root: List (logical encoding)
//!  - indices: Primitive (logical encoding)
//!    - column: Basic (physical encoding)
//!      - validity: Bitmap (physical encoding)
//!      - values: RLE (physical encoding)
//!        - runs: Value (physical encoding)
//!        - values: Value (physical encoding)
//!  - items: Primitive (logical encoding)
//!    - column: Basic (physical encoding)
//!      - values: Value (physical encoding)
//!
//! Note that, in this example, root.items.column does not have a validity because there were
//! no nulls in the page.
//!
//! ## Multiple buffers or multiple columns?
//!
//! Note that there are many different ways we can write encodings.  For example, we might
//! store primitive fields in a single column with two buffers (one for validity and one for
//! values)
//!
//! On the other hand, we could also store a primitive field as two different columns.  One
//! that yields a non-nullable boolean array and one that yields a non-nullable array of items.
//! Then we could combine these two arrays into a single array where the boolean array is the
//! bitmap.  There are a few subtle differences between the approaches:
//!
//! * Storing things as multiple buffers within the same column is generally more efficient and
//!   easier to schedule.  For example, in-batch coalescing is very easy but can only be done
//!   on data that is in the same page.
//! * When things are stored in multiple columns you have to worry about their pages not being
//!   in sync.  In our previous validity / values example this means we might have to do some
//!   memory copies to get the validity array and values arrays to be the same length at
//!   decode time.
//! * When things are stored in a single column, projection is impossible.  For example, if we
//!   tried to store all the struct fields in a single column with lots of buffers then we wouldn't
//!   be able to read back individual fields of the struct.
//!
//! The fixed size list decoding is an interesting example because it is actually both a physical
//! encoding and a logical encoding.  A fixed size list of a physical encoding is, itself, a physical
//! encoding (e.g. a fixed size list of doubles).  However, a fixed size list of a logical encoding
//! is a logical encoding (e.g. a fixed size list of structs).
//!
//! # The scheduling loop
//!
//! Reading a Lance file involves both scheduling and decoding.  It's generally expected that these
//! will run as two separate threads.
//!
//! ```text
//!
//!                                    I/O PARALLELISM
//!                       Issues
//!                       Requests   ┌─────────────────┐
//!                                  │                 │        Wait for
//!                       ┌──────────►   I/O Service   ├─────►  Enough I/O ◄─┐
//!                       │          │                 │        For batch    │
//!                       │          └─────────────────┘             │3      │
//!                       │                                          │       │
//!                       │                                          │       │2
//! ┌─────────────────────┴─┐                              ┌─────────▼───────┴┐
//! │                       │                              │                  │Poll
//! │       Batch Decode    │ Decode tasks sent via channel│   Batch Decode   │1
//! │       Scheduler       ├─────────────────────────────►│   Stream         ◄─────
//! │                       │                              │                  │
//! └─────▲─────────────┬───┘                              └─────────┬────────┘
//!       │             │                                            │4
//!       │             │                                            │
//!       └─────────────┘                                   ┌────────┴────────┐
//!  Caller of schedule_range                Buffer polling │                 │
//!  will be scheduler thread                to achieve CPU │ Decode Batch    ├────►
//!  and schedule one decode                 parallelism    │ Task            │
//!  task (and all needed I/O)               (thread per    │                 │
//!  per logical page                         batch)        └─────────────────┘
//! ```
//!
//! The scheduling thread will work through the file from the
//! start to the end as quickly as possible.  Data is scheduled one page at a time in a row-major
//! fashion.  For example, imagine we have a file with the following page structure:
//!
//! ```text
//! Score (Float32)     | C0P0 |
//! Id (16-byte UUID)   | C1P0 | C1P1 | C1P2 | C1P3 |
//! Vector (4096 bytes) | C2P0 | C2P1 | C2P2 | C2P3 | .. | C2P1023 |
//! ```
//!
//! This would be quite common as each of these pages has the same number of bytes.  Let's pretend
//! each page is 1MiB and so there are 256Ki rows of data.  Each page of `Score` has 256Ki rows.
//! Each page of `Id` has 64Ki rows.  Each page of `Vector` has 256 rows.  The scheduler would then
//! schedule in the following order:
//!
//! C0 P0
//! C1 P0
//! C2 P0
//! C2 P1
//! ... (253 pages omitted)
//! C2 P255
//! C1 P1
//! C2 P256
//! ... (254 pages omitted)
//! C2 P511
//! C1 P2
//! C2 P512
//! ... (254 pages omitted)
//! C2 P767
//! C1 P3
//! C2 P768
//! ... (254 pages omitted)
//! C2 P1023
//!
//! This is the ideal scheduling order because it means we can decode complete rows as quickly as possible.
//! Note that the scheduler thread does not need to wait for I/O to happen at any point.  As soon as it starts
//! it will start scheduling one page of I/O after another until it has scheduled the entire file's worth of
//! I/O.  This is slightly different from other file readers which have "row group parallelism" and will
//! typically only schedule X row groups worth of reads at a time.
//!
//! In the near future there will be a backpressure mechanism and so the scheduler may need to stop/pause
//! if the compute falls behind.
//!
//! ## Indirect I/O
//!
//! Regrettably, there are times where we cannot know exactly what data we need until we have partially decoded
//! the file.  This happens when we have variable sized list data.  In that case the scheduling task for that
//! page will only schedule the first part of the read (loading the list offsets).  It will then immediately
//! spawn a new tokio task to wait for that I/O and decode the list offsets.  That follow-up task is not part
//! of the scheduling loop or the decode loop.  It is a free task.  Once the list offsets are decoded we submit
//! a follow-up I/O task.  This task is scheduled at a high priority because the decoder is going to need it soon.
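//!
//! Roughly, the sequence for a single page of variable-sized list data looks like the
//! following (a sketch of the description above, not a separate mechanism):
//!
//! ```text
//! schedule list page
//!   └─► I/O: load list offsets
//!         └─► free task: decode offsets
//!               └─► schedule follow-up I/O for the items (high priority)
//! ```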
//!
//! # The decode loop
//!
//! As soon as the scheduler starts we can start decoding.  Each time we schedule a page we
//! push a decoder for that page's data into a channel.  The decode loop
//! ([`BatchDecodeStream`]) reads from that channel.  Each time it receives a decoder it
//! waits until the decoder has all of its data.  Then it grabs the next decoder.  Once it has
//! enough loaded decoders to complete a batch worth of rows it will spawn a "decode batch task".
//!
//! These batch decode tasks perform the actual CPU work of decoding the loaded data into Arrow
//! arrays.  This may involve significant CPU processing like decompression or arithmetic in order
//! to restore the data to its correct in-memory representation.
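//!
//! As a rough sketch (the `decode_scheduler`, `io`, `filter`, and `num_rows` bindings are
//! assumed to exist, and construction of the [`BatchDecodeStream`] itself is elided), the
//! wiring between the two loops looks like:
//!
//! ```rust,ignore
//! let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
//!
//! // Scheduling thread: runs ahead of the decoder, pushing one
//! // DecoderMessage per scheduled scan line into the channel
//! tokio::task::spawn(async move {
//!     decode_scheduler.schedule_range(0..num_rows, &filter, tx, io);
//! });
//!
//! // Decode loop: a BatchDecodeStream wraps `rx` and emits one
//! // ReadBatchTask per `rows_per_batch` rows
//! ```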
//!
//! ## Batch size
//!
//! The `BatchDecodeStream` is configured with a batch size.  This does not need to have any
//! relation to the page size(s) used to write the data.  This keeps our compute work completely
//! independent of our I/O work.  We suggest using small batch sizes:
//!
//!  * Batches should fit in CPU cache (at least L3)
//!  * More batches means more opportunity for parallelism
//!  * The "batch overhead" is very small in Lance compared to other formats because it has no
//!    relation to the way the data is stored.
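//!
//! For example (a back-of-the-envelope illustration, not a measured guideline): a file with
//! 32 columns of 8-byte values has ~256-byte rows, so a batch size of 8Ki rows yields ~2MiB
//! batches, small enough to fit in a typical L3 cache.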

use std::collections::VecDeque;
use std::sync::Once;
use std::{ops::Range, sync::Arc};

use arrow_array::cast::AsArray;
use arrow_array::{ArrayRef, RecordBatch, RecordBatchIterator, RecordBatchReader};
use arrow_schema::{ArrowError, DataType, Field as ArrowField, Fields, Schema as ArrowSchema};
use bytes::Bytes;
use futures::future::{maybe_done, BoxFuture, MaybeDone};
use futures::stream::{self, BoxStream};
use futures::{FutureExt, StreamExt};
use lance_arrow::DataTypeExt;
use lance_core::cache::{CapacityMode, FileMetadataCache};
use lance_core::datatypes::{Field, Schema, BLOB_DESC_LANCE_FIELD};
use log::{debug, trace, warn};
use snafu::location;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::{self, unbounded_channel};

use lance_core::{ArrowResult, Error, Result};
use tracing::instrument;

use crate::buffer::LanceBuffer;
use crate::data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock};
use crate::encoder::{values_column_encoding, EncodedBatch};
use crate::encodings::logical::binary::BinaryFieldScheduler;
use crate::encodings::logical::blob::BlobFieldScheduler;
use crate::encodings::logical::list::{
    ListFieldScheduler, OffsetPageInfo, StructuralListScheduler,
};
use crate::encodings::logical::primitive::{
    PrimitiveFieldScheduler, StructuralPrimitiveFieldScheduler,
};
use crate::encodings::logical::r#struct::{
    SimpleStructDecoder, SimpleStructScheduler, StructuralStructDecoder, StructuralStructScheduler,
};
use crate::encodings::physical::binary::{
    BinaryBlockDecompressor, BinaryMiniBlockDecompressor, VariableDecoder,
};
use crate::encodings::physical::bitpack_fastlanes::BitpackMiniBlockDecompressor;
use crate::encodings::physical::fsst::{FsstMiniBlockDecompressor, FsstPerValueDecompressor};
use crate::encodings::physical::struct_encoding::PackedStructFixedWidthMiniBlockDecompressor;
use crate::encodings::physical::value::{ConstantDecompressor, ValueDecompressor};
use crate::encodings::physical::{ColumnBuffers, FileBuffers};
use crate::format::pb::{self, column_encoding};
use crate::repdef::{CompositeRepDefUnraveler, RepDefUnraveler};
use crate::version::LanceFileVersion;
use crate::{BufferScheduler, EncodingsIo};

// If users are getting batches over 10MiB large then it's time to reduce the batch size
const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024;

/// Top-level encoding message for a page.  Wraps both the
/// legacy pb::ArrayEncoding and the newer pb::PageLayout
///
/// A file should only use one or the other and never both.
/// 2.0 decoders can always assume this is pb::ArrayEncoding
/// and 2.1+ decoders can always assume this is pb::PageLayout
#[derive(Debug)]
pub enum PageEncoding {
    Legacy(pb::ArrayEncoding),
    Structural(pb::PageLayout),
}

impl PageEncoding {
    pub fn as_legacy(&self) -> &pb::ArrayEncoding {
        match self {
            Self::Legacy(enc) => enc,
            Self::Structural(_) => panic!("Expected a legacy encoding"),
        }
    }

    pub fn as_structural(&self) -> &pb::PageLayout {
        match self {
            Self::Structural(enc) => enc,
            Self::Legacy(_) => panic!("Expected a structural encoding"),
        }
    }

    pub fn is_structural(&self) -> bool {
        matches!(self, Self::Structural(_))
    }
}

/// Metadata describing a page in a file
///
/// This is typically created by reading the metadata section of a Lance file
#[derive(Debug)]
pub struct PageInfo {
    /// The number of rows in the page
    pub num_rows: u64,
    /// The priority (top level row number) of the page
    ///
    /// This is only set in 2.1 files and will be 0 for 2.0 files
    pub priority: u64,
    /// The encoding that explains the buffers in the page
    pub encoding: PageEncoding,
    /// The offsets and sizes of the buffers in the file
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
}

/// Metadata describing a column in a file
///
/// This is typically created by reading the metadata section of a Lance file
#[derive(Debug, Clone)]
pub struct ColumnInfo {
    /// The index of the column in the file
    pub index: u32,
    /// The metadata for each page in the column
    pub page_infos: Arc<[PageInfo]>,
    /// File positions and their sizes of the column-level buffers
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    pub encoding: pb::ColumnEncoding,
}

impl ColumnInfo {
    /// Create a new instance
    pub fn new(
        index: u32,
        page_infos: Arc<[PageInfo]>,
        buffer_offsets_and_sizes: Vec<(u64, u64)>,
        encoding: pb::ColumnEncoding,
    ) -> Self {
        Self {
            index,
            page_infos,
            buffer_offsets_and_sizes: buffer_offsets_and_sizes.into_boxed_slice().into(),
            encoding,
        }
    }

    pub fn is_structural(&self) -> bool {
        self.page_infos
            // Can just look at the first since all should be the same
            .first()
            .map(|page| page.encoding.is_structural())
            .unwrap_or(false)
    }
}

enum RootScheduler {
    Structural(Box<dyn StructuralFieldScheduler>),
    Legacy(Arc<dyn FieldScheduler>),
}

impl RootScheduler {
    fn as_legacy(&self) -> &Arc<dyn FieldScheduler> {
        match self {
            Self::Structural(_) => panic!("Expected a legacy scheduler"),
            Self::Legacy(s) => s,
        }
    }

    fn as_structural(&self) -> &dyn StructuralFieldScheduler {
        match self {
            Self::Structural(s) => s.as_ref(),
            Self::Legacy(_) => panic!("Expected a structural scheduler"),
        }
    }
}

/// The scheduler for decoding batches
///
/// Lance decoding is done in two steps: scheduling and decoding.  The
/// scheduling tends to be lightweight and should quickly figure out what data
/// is needed from the disk and issue the appropriate I/O requests.  A decode task is
/// created to eventually decode the data (once it is loaded) and scheduling
/// moves on to scheduling the next page.
///
/// Meanwhile, it's expected that a decode stream will be set up to run at the
/// same time.  Decode tasks take the data that is loaded and turn it into
/// Arrow arrays.
///
/// This approach allows us to keep our I/O parallelism and CPU parallelism
/// completely separate since those are often two very different values.
///
/// Backpressure should be achieved via the I/O service.  Requests that are
/// issued will pile up if the decode stream is not polling quickly enough.
/// The [`crate::EncodingsIo::submit_request`] function should return a pending
/// future once there are too many I/O requests in flight.
///
/// TODO: Implement backpressure
pub struct DecodeBatchScheduler {
    root_scheduler: RootScheduler,
    pub root_fields: Fields,
    cache: Arc<FileMetadataCache>,
}

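/// Iterates over the column infos of a file in schema order
///
/// `column_indices` holds the index of the first column of each top-level
/// field; [`Self::next_top_level`] jumps the iterator ahead to the start of
/// the next top-level field's columns.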
pub struct ColumnInfoIter<'a> {
    column_infos: Vec<Arc<ColumnInfo>>,
    column_indices: &'a [u32],
    column_info_pos: usize,
    column_indices_pos: usize,
}

impl<'a> ColumnInfoIter<'a> {
    pub fn new(column_infos: Vec<Arc<ColumnInfo>>, column_indices: &'a [u32]) -> Self {
        let initial_pos = column_indices[0] as usize;
        Self {
            column_infos,
            column_indices,
            column_info_pos: initial_pos,
            column_indices_pos: 0,
        }
    }

    pub fn peek(&self) -> &Arc<ColumnInfo> {
        &self.column_infos[self.column_info_pos]
    }

    pub fn peek_transform(&mut self, transform: impl FnOnce(Arc<ColumnInfo>) -> Arc<ColumnInfo>) {
        let column_info = self.column_infos[self.column_info_pos].clone();
        let transformed = transform(column_info);
        self.column_infos[self.column_info_pos] = transformed;
    }

    pub fn expect_next(&mut self) -> Result<&Arc<ColumnInfo>> {
        self.next().ok_or_else(|| {
            Error::invalid_input(
                "there were more fields in the schema than provided column indices / infos",
                location!(),
            )
        })
    }

    fn next(&mut self) -> Option<&Arc<ColumnInfo>> {
        if self.column_info_pos < self.column_infos.len() {
            let info = &self.column_infos[self.column_info_pos];
            self.column_info_pos += 1;
            Some(info)
        } else {
            None
        }
    }

    pub(crate) fn next_top_level(&mut self) {
        self.column_indices_pos += 1;
        if self.column_indices_pos < self.column_indices.len() {
            self.column_info_pos = self.column_indices[self.column_indices_pos] as usize;
        } else {
            self.column_info_pos = self.column_infos.len();
        }
    }
}

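/// Decompresses a mini-block of data into a [`DataBlock`] of `num_values` values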
pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
}

pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompress one or more values
    fn decompress(&self, data: FixedWidthDataBlock) -> Result<DataBlock>;
    /// The number of bits in each value
    ///
    /// Currently (and probably long term) this must be a multiple of 8
    fn bits_per_value(&self) -> u64;
}

pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompress one or more values
    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
}

pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
    fn decompress(&self, data: LanceBuffer) -> Result<DataBlock>;
}

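/// Creates decompressors from protobuf encoding descriptions
///
/// A minimal sketch of a custom strategy that simply delegates to
/// [`CoreDecompressorStrategy`] (the `MyStrategy` name is illustrative and not
/// part of this crate; a real strategy would intercept the encodings it knows
/// how to handle before falling back):
///
/// ```rust,ignore
/// #[derive(Debug)]
/// struct MyStrategy {
///     core: CoreDecompressorStrategy,
/// }
///
/// impl DecompressorStrategy for MyStrategy {
///     fn create_miniblock_decompressor(
///         &self,
///         description: &pb::ArrayEncoding,
///     ) -> Result<Box<dyn MiniBlockDecompressor>> {
///         // Handle custom encodings here, then fall back to the core strategy
///         self.core.create_miniblock_decompressor(description)
///     }
///
///     fn create_fixed_per_value_decompressor(
///         &self,
///         description: &pb::ArrayEncoding,
///     ) -> Result<Box<dyn FixedPerValueDecompressor>> {
///         self.core.create_fixed_per_value_decompressor(description)
///     }
///
///     fn create_variable_per_value_decompressor(
///         &self,
///         description: &pb::ArrayEncoding,
///     ) -> Result<Box<dyn VariablePerValueDecompressor>> {
///         self.core.create_variable_per_value_decompressor(description)
///     }
///
///     fn create_block_decompressor(
///         &self,
///         description: &pb::ArrayEncoding,
///     ) -> Result<Box<dyn BlockDecompressor>> {
///         self.core.create_block_decompressor(description)
///     }
/// }
/// ```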
pub trait DecompressorStrategy: std::fmt::Debug + Send + Sync {
    fn create_miniblock_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn MiniBlockDecompressor>>;

    fn create_fixed_per_value_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn FixedPerValueDecompressor>>;

    fn create_variable_per_value_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn VariablePerValueDecompressor>>;

    fn create_block_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn BlockDecompressor>>;
}

#[derive(Debug, Default)]
pub struct CoreDecompressorStrategy {}

impl DecompressorStrategy for CoreDecompressorStrategy {
    fn create_miniblock_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn MiniBlockDecompressor>> {
        match description.array_encoding.as_ref().unwrap() {
            pb::array_encoding::ArrayEncoding::Flat(flat) => {
                Ok(Box::new(ValueDecompressor::new(flat)))
            }
            pb::array_encoding::ArrayEncoding::Bitpack2(description) => {
                Ok(Box::new(BitpackMiniBlockDecompressor::new(description)))
            }
            pb::array_encoding::ArrayEncoding::Variable(_) => {
                Ok(Box::new(BinaryMiniBlockDecompressor::default()))
            }
            pb::array_encoding::ArrayEncoding::Fsst(description) => {
                Ok(Box::new(FsstMiniBlockDecompressor::new(description)))
            }
            pb::array_encoding::ArrayEncoding::PackedStructFixedWidthMiniBlock(description) => {
                Ok(Box::new(PackedStructFixedWidthMiniBlockDecompressor::new(
                    description,
                )))
            }
            _ => todo!(),
        }
    }

    fn create_fixed_per_value_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn FixedPerValueDecompressor>> {
        match description.array_encoding.as_ref().unwrap() {
            pb::array_encoding::ArrayEncoding::Flat(flat) => {
                Ok(Box::new(ValueDecompressor::new(flat)))
            }
            _ => todo!("fixed-per-value decompressor for {:?}", description),
        }
    }

    fn create_variable_per_value_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn VariablePerValueDecompressor>> {
        match *description.array_encoding.as_ref().unwrap() {
            pb::array_encoding::ArrayEncoding::Variable(variable) => {
                assert!(variable.bits_per_offset < u8::MAX as u32);
                Ok(Box::new(VariableDecoder::default()))
            }
            pb::array_encoding::ArrayEncoding::Fsst(ref fsst) => {
                Ok(Box::new(FsstPerValueDecompressor::new(
                    LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
                    Box::new(VariableDecoder::default()),
                )))
            }
            _ => todo!("variable-per-value decompressor for {:?}", description),
        }
    }

    fn create_block_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn BlockDecompressor>> {
        match description.array_encoding.as_ref().unwrap() {
            pb::array_encoding::ArrayEncoding::Flat(flat) => {
                Ok(Box::new(ValueDecompressor::new(flat)))
            }
            pb::array_encoding::ArrayEncoding::Constant(constant) => {
                let scalar = LanceBuffer::from_bytes(constant.value.clone(), 1);
                Ok(Box::new(ConstantDecompressor::new(
                    scalar,
                    constant.num_values,
                )))
            }
            pb::array_encoding::ArrayEncoding::Variable(_) => {
                Ok(Box::new(BinaryBlockDecompressor::default()))
            }
            _ => todo!(),
        }
    }
}

/// The core decoder strategy handles all the various Arrow types
#[derive(Debug)]
pub struct CoreFieldDecoderStrategy {
    pub validate_data: bool,
    pub decompressor_strategy: Arc<dyn DecompressorStrategy>,
}

impl Default for CoreFieldDecoderStrategy {
    fn default() -> Self {
        Self {
            validate_data: false,
            decompressor_strategy: Arc::new(CoreDecompressorStrategy {}),
        }
    }
}

impl CoreFieldDecoderStrategy {
    /// This is just a sanity check to ensure there are no "wrapped encodings"
    /// that haven't been handled.
    fn ensure_values_encoded(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
        let column_encoding = column_info
            .encoding
            .column_encoding
            .as_ref()
            .ok_or_else(|| {
                Error::invalid_input(
                    format!(
                        "the column at index {} was missing a ColumnEncoding",
                        column_info.index
                    ),
                    location!(),
                )
            })?;
        if matches!(
            column_encoding,
            pb::column_encoding::ColumnEncoding::Values(_)
        ) {
            Ok(())
        } else {
            Err(Error::invalid_input(format!("the column at index {} mapping to the input field {} has column encoding {:?} and no decoder is registered to handle it", column_info.index, field_name, column_encoding), location!()))
        }
    }

    fn is_primitive(data_type: &DataType) -> bool {
        if data_type.is_primitive() {
            true
        } else {
            match data_type {
                // DataType::is_primitive doesn't consider these primitive but we do
                DataType::Boolean | DataType::Null | DataType::FixedSizeBinary(_) => true,
                DataType::FixedSizeList(inner, _) => Self::is_primitive(inner.data_type()),
                _ => false,
            }
        }
    }

    fn create_primitive_scheduler(
        &self,
        field: &Field,
        column: &ColumnInfo,
        buffers: FileBuffers,
    ) -> Result<Box<dyn FieldScheduler>> {
        Self::ensure_values_encoded(column, &field.name)?;
        // Primitive fields map to a single column
        let column_buffers = ColumnBuffers {
            file_buffers: buffers,
            positions_and_sizes: &column.buffer_offsets_and_sizes,
        };
        Ok(Box::new(PrimitiveFieldScheduler::new(
            column.index,
            field.data_type(),
            column.page_infos.clone(),
            column_buffers,
            self.validate_data,
        )))
    }

    /// Helper method to verify the page encoding of a struct header column
    fn check_simple_struct(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
        Self::ensure_values_encoded(column_info, field_name)?;
        if column_info.page_infos.len() != 1 {
            return Err(Error::InvalidInput { source: format!("Due to schema we expected a struct column but we received a column with {} pages and right now we only support struct columns with 1 page", column_info.page_infos.len()).into(), location: location!() });
        }
        let encoding = &column_info.page_infos[0].encoding;
        match encoding.as_legacy().array_encoding.as_ref().unwrap() {
            pb::array_encoding::ArrayEncoding::Struct(_) => Ok(()),
            _ => Err(Error::InvalidInput { source: format!("Expected a struct encoding because we have a struct field in the schema but got the encoding {:?}", encoding).into(), location: location!() }),
        }
    }

    fn check_packed_struct(column_info: &ColumnInfo) -> bool {
        let encoding = &column_info.page_infos[0].encoding;
        matches!(
            encoding.as_legacy().array_encoding.as_ref().unwrap(),
            pb::array_encoding::ArrayEncoding::PackedStruct(_)
        )
    }

    fn create_list_scheduler(
        &self,
        list_field: &Field,
        column_infos: &mut ColumnInfoIter,
        buffers: FileBuffers,
        offsets_column: &ColumnInfo,
    ) -> Result<Box<dyn FieldScheduler>> {
        Self::ensure_values_encoded(offsets_column, &list_field.name)?;
        let offsets_column_buffers = ColumnBuffers {
            file_buffers: buffers,
            positions_and_sizes: &offsets_column.buffer_offsets_and_sizes,
        };
        let items_scheduler =
            self.create_legacy_field_scheduler(&list_field.children[0], column_infos, buffers)?;

        let (inner_infos, null_offset_adjustments): (Vec<_>, Vec<_>) = offsets_column
            .page_infos
            .iter()
            .filter(|offsets_page| offsets_page.num_rows > 0)
            .map(|offsets_page| {
                if let Some(pb::array_encoding::ArrayEncoding::List(list_encoding)) =
                    &offsets_page.encoding.as_legacy().array_encoding
                {
                    let inner = PageInfo {
                        buffer_offsets_and_sizes: offsets_page.buffer_offsets_and_sizes.clone(),
                        encoding: PageEncoding::Legacy(
                            list_encoding.offsets.as_ref().unwrap().as_ref().clone(),
                        ),
                        num_rows: offsets_page.num_rows,
                        priority: 0,
                    };
                    (
                        inner,
                        OffsetPageInfo {
                            offsets_in_page: offsets_page.num_rows,
                            null_offset_adjustment: list_encoding.null_offset_adjustment,
                            num_items_referenced_by_page: list_encoding.num_items,
                        },
                    )
                } else {
                    // TODO: Should probably return Err here
                    panic!("Expected a list column");
                }
            })
            .unzip();
        let inner = Arc::new(PrimitiveFieldScheduler::new(
            offsets_column.index,
            DataType::UInt64,
            Arc::from(inner_infos.into_boxed_slice()),
            offsets_column_buffers,
            self.validate_data,
        )) as Arc<dyn FieldScheduler>;
        let items_field = match list_field.data_type() {
            DataType::List(inner) => inner,
            DataType::LargeList(inner) => inner,
            _ => unreachable!(),
        };
        let offset_type = if matches!(list_field.data_type(), DataType::List(_)) {
            DataType::Int32
        } else {
            DataType::Int64
        };
        Ok(Box::new(ListFieldScheduler::new(
            inner,
            items_scheduler.into(),
            items_field,
            offset_type,
            null_offset_adjustments,
        )))
    }

    fn unwrap_blob(column_info: &ColumnInfo) -> Option<ColumnInfo> {
        if let column_encoding::ColumnEncoding::Blob(blob) =
            column_info.encoding.column_encoding.as_ref().unwrap()
        {
            let mut column_info = column_info.clone();
            column_info.encoding = blob.inner.as_ref().unwrap().as_ref().clone();
            Some(column_info)
        } else {
            None
        }
    }

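    /// Number of leaf items stored per top-level row
    ///
    /// For example, a FixedSizeList<FixedSizeList<Float32, 3>, 2> field stores
    /// 3 * 2 = 6 items per row; non-list types store 1 item per row.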
    fn items_per_row(data_type: &DataType) -> u64 {
        match data_type {
            DataType::FixedSizeList(inner, dimension) => {
                Self::items_per_row(inner.data_type()) * *dimension as u64
            }
            _ => 1,
        }
    }

    fn create_structural_field_scheduler(
        &self,
        field: &Field,
        column_infos: &mut ColumnInfoIter,
    ) -> Result<Box<dyn StructuralFieldScheduler>> {
        let data_type = field.data_type();
        if Self::is_primitive(&data_type) {
            let column_info = column_infos.expect_next()?;
            let items_per_row = Self::items_per_row(&data_type);
            let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
                column_info.as_ref(),
                items_per_row,
                self.decompressor_strategy.as_ref(),
            )?);

            // advance to the next top level column
            column_infos.next_top_level();

            return Ok(scheduler);
        }
        match &data_type {
            DataType::Struct(fields) => {
                if field.is_packed_struct() {
                    let column_info = column_infos.expect_next()?;
                    let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
                        column_info.as_ref(),
                        1, // items_per_row is always 1, any FSL will get transposed into 1 row
                        self.decompressor_strategy.as_ref(),
                    )?);

                    // advance to the next top level column
                    column_infos.next_top_level();

                    return Ok(scheduler);
                }
                let mut child_schedulers = Vec::with_capacity(field.children.len());
                for field in field.children.iter() {
                    let field_scheduler =
                        self.create_structural_field_scheduler(field, column_infos)?;
                    child_schedulers.push(field_scheduler);
                }

                let fields = fields.clone();
                Ok(
                    Box::new(StructuralStructScheduler::new(child_schedulers, fields))
                        as Box<dyn StructuralFieldScheduler>,
                )
            }
            DataType::Binary | DataType::Utf8 => {
                let column_info = column_infos.expect_next()?;
                let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
                    column_info.as_ref(),
                    /*items_per_row=*/ 1,
                    self.decompressor_strategy.as_ref(),
                )?);
                column_infos.next_top_level();
                Ok(scheduler)
            }
            DataType::List(_) | DataType::LargeList(_) => {
                let child = field
                    .children
                    .first()
                    .expect("List field must have a child");
                let child_scheduler =
                    self.create_structural_field_scheduler(child, column_infos)?;
                Ok(Box::new(StructuralListScheduler::new(child_scheduler))
                    as Box<dyn StructuralFieldScheduler>)
            }
            _ => todo!(),
        }
    }

    fn create_legacy_field_scheduler(
        &self,
        field: &Field,
        column_infos: &mut ColumnInfoIter,
        buffers: FileBuffers,
    ) -> Result<Box<dyn FieldScheduler>> {
        let data_type = field.data_type();
        if Self::is_primitive(&data_type) {
            let column_info = column_infos.expect_next()?;
            let scheduler = self.create_primitive_scheduler(field, column_info, buffers)?;
            return Ok(scheduler);
        } else if data_type.is_binary_like() {
            let column_info = column_infos.next().unwrap().clone();
            // Column is blob and user is asking for binary data
            if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
                let desc_scheduler =
                    self.create_primitive_scheduler(&BLOB_DESC_LANCE_FIELD, &blob_col, buffers)?;
                let blob_scheduler = Box::new(BlobFieldScheduler::new(desc_scheduler.into()));
                return Ok(blob_scheduler);
            }
            if let Some(page_info) = column_info.page_infos.first() {
                if matches!(
                    page_info.encoding.as_legacy(),
                    pb::ArrayEncoding {
                        array_encoding: Some(pb::array_encoding::ArrayEncoding::List(..))
                    }
                ) {
                    let list_type = if matches!(data_type, DataType::Utf8 | DataType::Binary) {
                        DataType::List(Arc::new(ArrowField::new("item", DataType::UInt8, false)))
                    } else {
                        DataType::LargeList(Arc::new(ArrowField::new(
                            "item",
                            DataType::UInt8,
                            false,
                        )))
                    };
                    let list_field = Field::try_from(ArrowField::new(
                        field.name.clone(),
                        list_type,
                        field.nullable,
                    ))
                    .unwrap();
                    let list_scheduler = self.create_list_scheduler(
                        &list_field,
                        column_infos,
                        buffers,
                        &column_info,
                    )?;
                    let binary_scheduler = Box::new(BinaryFieldScheduler::new(
                        list_scheduler.into(),
                        field.data_type(),
                    ));
                    return Ok(binary_scheduler);
                } else {
                    let scheduler =
                        self.create_primitive_scheduler(field, &column_info, buffers)?;
                    return Ok(scheduler);
                }
            } else {
                return self.create_primitive_scheduler(field, &column_info, buffers);
            }
        }
        match &data_type {
            DataType::FixedSizeList(inner, _dimension) => {
                // A fixed size list column could either be a physical or a logical decoder
                // depending on the child data type.
                if Self::is_primitive(inner.data_type()) {
                    let primitive_col = column_infos.expect_next()?;
                    let scheduler =
                        self.create_primitive_scheduler(field, primitive_col, buffers)?;
                    Ok(scheduler)
                } else {
                    todo!()
                }
            }
            DataType::Dictionary(_key_type, value_type) => {
                if Self::is_primitive(value_type) || value_type.is_binary_like() {
                    let primitive_col = column_infos.expect_next()?;
                    let scheduler =
                        self.create_primitive_scheduler(field, primitive_col, buffers)?;
                    Ok(scheduler)
                } else {
                    Err(Error::NotSupported {
                        source: format!(
                            "No way to decode into a dictionary field of type {}",
                            value_type
                        )
                        .into(),
                        location: location!(),
                    })
                }
            }
            DataType::List(_) | DataType::LargeList(_) => {
                let offsets_column = column_infos.expect_next()?.clone();
                column_infos.next_top_level();
                self.create_list_scheduler(field, column_infos, buffers, &offsets_column)
            }
            DataType::Struct(fields) => {
                let column_info = column_infos.expect_next()?;

                // Column is blob and user is asking for descriptions
                if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
                    // Can use primitive scheduler here since descriptions are always packed struct
                    return self.create_primitive_scheduler(field, &blob_col, buffers);
                }

                if Self::check_packed_struct(column_info) {
                    // use packed struct encoding
                    self.create_primitive_scheduler(field, column_info, buffers)
                } else {
                    // use default struct encoding
                    Self::check_simple_struct(column_info, &field.name).unwrap();
                    let num_rows = column_info
                        .page_infos
                        .iter()
                        .map(|page| page.num_rows)
                        .sum();
                    let mut child_schedulers = Vec::with_capacity(field.children.len());
                    for field in &field.children {
                        column_infos.next_top_level();
                        let field_scheduler =
                            self.create_legacy_field_scheduler(field, column_infos, buffers)?;
                        child_schedulers.push(Arc::from(field_scheduler));
                    }

                    let fields = fields.clone();
                    Ok(Box::new(SimpleStructScheduler::new(
                        child_schedulers,
                        fields,
                        num_rows,
                    )))
                }
            }
            // TODO: Still need support for RLE
            _ => todo!(),
        }
    }
}

/// Creates a dummy ColumnInfo for the root column
fn root_column(num_rows: u64) -> ColumnInfo {
    let num_root_pages = num_rows.div_ceil(u32::MAX as u64);
    let final_page_num_rows = num_rows % (u32::MAX as u64);
    let root_pages = (0..num_root_pages)
        .map(|i| PageInfo {
            num_rows: if i == num_root_pages - 1 {
                final_page_num_rows
            } else {
                // Non-final root pages are "full" pages of u32::MAX rows
                u32::MAX as u64
            },
            encoding: PageEncoding::Legacy(pb::ArrayEncoding {
                array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
                    pb::SimpleStruct {},
                )),
            }),
            priority: 0, // not used in legacy scheduler
            buffer_offsets_and_sizes: Arc::new([]),
        })
        .collect::<Vec<_>>();
    ColumnInfo {
        buffer_offsets_and_sizes: Arc::new([]),
        encoding: values_column_encoding(),
        index: u32::MAX,
        page_infos: Arc::from(root_pages),
    }
}

pub enum RootDecoder {
    Structural(StructuralStructDecoder),
    Legacy(SimpleStructDecoder),
}

impl RootDecoder {
    pub fn into_structural(self) -> StructuralStructDecoder {
        match self {
            Self::Structural(decoder) => decoder,
            Self::Legacy(_) => panic!("Expected a structural decoder"),
        }
    }

    pub fn into_legacy(self) -> SimpleStructDecoder {
        match self {
            Self::Legacy(decoder) => decoder,
            Self::Structural(_) => panic!("Expected a legacy decoder"),
        }
    }
}

impl DecodeBatchScheduler {
    /// Creates a new decode scheduler with the expected schema and the column
    /// metadata of the file.
    #[allow(clippy::too_many_arguments)]
    pub async fn try_new<'a>(
        schema: &'a Schema,
        column_indices: &[u32],
        column_infos: &[Arc<ColumnInfo>],
        file_buffer_positions_and_sizes: &'a Vec<(u64, u64)>,
        num_rows: u64,
        _decoder_plugins: Arc<DecoderPlugins>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<FileMetadataCache>,
        filter: &FilterExpression,
    ) -> Result<Self> {
        assert!(num_rows > 0);
        let buffers = FileBuffers {
            positions_and_sizes: file_buffer_positions_and_sizes,
        };
        let arrow_schema = ArrowSchema::from(schema);
        let root_fields = arrow_schema.fields().clone();
        let root_type = DataType::Struct(root_fields.clone());
        let mut root_field = Field::try_from(&ArrowField::new("root", root_type, false))?;
        // root_field.children and schema.fields should be identical at this point but the latter
        // has field ids and the former does not.  This line restores that.
        // TODO: Is there another way to create the root field without forcing a trip through arrow?
        root_field.children.clone_from(&schema.fields);
        root_field
            .metadata
            .insert("__lance_decoder_root".to_string(), "true".to_string());

        if column_infos[0].is_structural() {
            let mut column_iter = ColumnInfoIter::new(column_infos.to_vec(), column_indices);

            let mut root_scheduler = CoreFieldDecoderStrategy::default()
                .create_structural_field_scheduler(&root_field, &mut column_iter)?;

            let context = SchedulerContext::new(io, cache.clone());
            root_scheduler.initialize(filter, &context).await?;

            Ok(Self {
                root_scheduler: RootScheduler::Structural(root_scheduler),
                root_fields,
                cache,
            })
        } else {
            // The old encoding style expected a header column for structs and so we
            // need a header column for the top-level struct
            let mut columns = Vec::with_capacity(column_infos.len() + 1);
            columns.push(Arc::new(root_column(num_rows)));
            columns.extend(column_infos.iter().cloned());

            let adjusted_column_indices = [0_u32]
                .into_iter()
                .chain(column_indices.iter().map(|i| i.saturating_add(1)))
                .collect::<Vec<_>>();
            let mut column_iter = ColumnInfoIter::new(columns, &adjusted_column_indices);
            let root_scheduler = CoreFieldDecoderStrategy::default()
                .create_legacy_field_scheduler(&root_field, &mut column_iter, buffers)?;

            let context = SchedulerContext::new(io, cache.clone());
            root_scheduler.initialize(filter, &context).await?;

            Ok(Self {
                root_scheduler: RootScheduler::Legacy(root_scheduler.into()),
                root_fields,
                cache,
            })
        }
    }

    pub fn from_scheduler(
        root_scheduler: Arc<dyn FieldScheduler>,
        root_fields: Fields,
        cache: Arc<FileMetadataCache>,
    ) -> Self {
        Self {
            root_scheduler: RootScheduler::Legacy(root_scheduler),
            root_fields,
            cache,
        }
    }

    fn do_schedule_ranges_structural(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
    ) {
        let root_scheduler = self.root_scheduler.as_structural();
        let mut context = SchedulerContext::new(io, self.cache.clone());
        let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
        if let Err(schedule_ranges_err) = maybe_root_job {
            schedule_action(Err(schedule_ranges_err));
            return;
        }
        let mut root_job = maybe_root_job.unwrap();
        let mut num_rows_scheduled = 0;
        loop {
            let maybe_next_scan_line = root_job.schedule_next(&mut context);
            if let Err(err) = maybe_next_scan_line {
                schedule_action(Err(err));
                return;
            }
            let next_scan_line = maybe_next_scan_line.unwrap();
            match next_scan_line {
                Some(next_scan_line) => {
                    trace!(
                        "Scheduled scan line of {} rows and {} decoders",
                        next_scan_line.rows_scheduled,
                        next_scan_line.decoders.len()
                    );
                    num_rows_scheduled += next_scan_line.rows_scheduled;
                    if !schedule_action(Ok(DecoderMessage {
                        scheduled_so_far: num_rows_scheduled,
                        decoders: next_scan_line.decoders,
                    })) {
                        // Decoder has disconnected
                        return;
                    }
                }
                None => return,
            }
        }
    }

    fn do_schedule_ranges_legacy(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
        // If specified, this will be used as the top_level_row for all scheduling
        // tasks.  This is used by list scheduling to ensure all items scheduling
        // tasks are scheduled at the same top level row.
        priority: Option<Box<dyn PriorityRange>>,
    ) {
        let root_scheduler = self.root_scheduler.as_legacy();
        let rows_requested = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        trace!(
            "Scheduling {} ranges across {}..{} ({} rows){}",
            ranges.len(),
            ranges.first().unwrap().start,
            ranges.last().unwrap().end,
            rows_requested,
            priority
                .as_ref()
                .map(|p| format!(" (priority={:?})", p))
                .unwrap_or_default()
        );

        let mut context = SchedulerContext::new(io, self.cache.clone());
        let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
        if let Err(schedule_ranges_err) = maybe_root_job {
            schedule_action(Err(schedule_ranges_err));
            return;
        }
        let mut root_job = maybe_root_job.unwrap();
        let mut num_rows_scheduled = 0;
        let mut rows_to_schedule = root_job.num_rows();
        let mut priority = priority.unwrap_or(Box::new(SimplePriorityRange::new(0)));
        trace!("Scheduled ranges refined to {} rows", rows_to_schedule);
        while rows_to_schedule > 0 {
            let maybe_next_scan_line = root_job.schedule_next(&mut context, priority.as_ref());
            if let Err(schedule_next_err) = maybe_next_scan_line {
                schedule_action(Err(schedule_next_err));
                return;
            }
            let next_scan_line = maybe_next_scan_line.unwrap();
            priority.advance(next_scan_line.rows_scheduled);
            num_rows_scheduled += next_scan_line.rows_scheduled;
            rows_to_schedule -= next_scan_line.rows_scheduled;
            trace!(
                "Scheduled scan line of {} rows and {} decoders",
                next_scan_line.rows_scheduled,
                next_scan_line.decoders.len()
            );
            if !schedule_action(Ok(DecoderMessage {
                scheduled_so_far: num_rows_scheduled,
                decoders: next_scan_line.decoders,
            })) {
                // Decoder has disconnected
                return;
            }

            trace!("Finished scheduling {} ranges", ranges.len());
        }
    }

    fn do_schedule_ranges(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
        // If specified, this will be used as the top_level_row for all scheduling
        // tasks.  This is used by list scheduling to ensure all items scheduling
        // tasks are scheduled at the same top level row.
        priority: Option<Box<dyn PriorityRange>>,
    ) {
        match &self.root_scheduler {
            RootScheduler::Legacy(_) => {
                self.do_schedule_ranges_legacy(ranges, filter, io, schedule_action, priority)
            }
            RootScheduler::Structural(_) => {
                self.do_schedule_ranges_structural(ranges, filter, io, schedule_action)
            }
        }
    }

    // This method is similar to schedule_ranges but instead of
    // sending the decoders to a channel it collects them all into a vector
    pub fn schedule_ranges_to_vec(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        priority: Option<Box<dyn PriorityRange>>,
    ) -> Result<Vec<DecoderMessage>> {
        let mut decode_messages = Vec::new();
        self.do_schedule_ranges(
            ranges,
            filter,
            io,
            |msg| {
                decode_messages.push(msg);
                true
            },
            priority,
        );
        decode_messages.into_iter().collect::<Result<Vec<_>>>()
    }

1272    /// Schedules the load of multiple ranges of rows
1273    ///
1274    /// Ranges must be non-overlapping and in sorted order
1275    ///
1276    /// # Arguments
1277    ///
1278    /// * `ranges` - The ranges of rows to load
1279    /// * `sink` - A channel to send the decode tasks
1280    /// * `scheduler` An I/O scheduler to issue I/O requests
1281    #[instrument(skip_all)]
1282    pub fn schedule_ranges(
1283        &mut self,
1284        ranges: &[Range<u64>],
1285        filter: &FilterExpression,
1286        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1287        scheduler: Arc<dyn EncodingsIo>,
1288    ) {
1289        self.do_schedule_ranges(
1290            ranges,
1291            filter,
1292            scheduler,
1293            |msg| {
1294                match sink.send(msg) {
1295                    Ok(_) => true,
1296                    Err(SendError { .. }) => {
1297                        // The receiver has gone away.  We can't do anything about it
1298                        // so just ignore the error.
                        debug!(
                            "schedule_ranges aborting early since decoder appears to have been dropped"
                        );
1302                        false
1303                    }
1304                }
1305            },
1306            None,
1307        )
1308    }
1309
1310    /// Schedules the load of a range of rows
1311    ///
1312    /// # Arguments
1313    ///
1314    /// * `range` - The range of rows to load
1315    /// * `sink` - A channel to send the decode tasks
    /// * `scheduler` - An I/O scheduler to issue I/O requests
1317    #[instrument(skip_all)]
1318    pub fn schedule_range(
1319        &mut self,
1320        range: Range<u64>,
1321        filter: &FilterExpression,
1322        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1323        scheduler: Arc<dyn EncodingsIo>,
1324    ) {
1325        self.schedule_ranges(&[range], filter, sink, scheduler)
1326    }
1327
1328    /// Schedules the load of selected rows
1329    ///
1330    /// # Arguments
1331    ///
1332    /// * `indices` - The row indices to load (these must be in ascending order!)
1333    /// * `sink` - A channel to send the decode tasks
    /// * `scheduler` - An I/O scheduler to issue I/O requests
1335    pub fn schedule_take(
1336        &mut self,
1337        indices: &[u64],
1338        filter: &FilterExpression,
1339        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1340        scheduler: Arc<dyn EncodingsIo>,
1341    ) {
1342        debug_assert!(indices.windows(2).all(|w| w[0] <= w[1]));
1343        if indices.is_empty() {
1344            return;
1345        }
1346        trace!("Scheduling take of {} rows", indices.len());
1347        let ranges = Self::indices_to_ranges(indices);
1348        self.schedule_ranges(&ranges, filter, sink, scheduler)
1349    }
1350
    // Coalesce contiguous indices into ranges where possible (the input indices must be sorted and non-empty)
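    //
    // For example, [1, 2, 3, 7, 8] coalesces to [1..4, 7..9].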
1352    fn indices_to_ranges(indices: &[u64]) -> Vec<Range<u64>> {
1353        let mut ranges = Vec::new();
1354        let mut start = indices[0];
1355
1356        for window in indices.windows(2) {
1357            if window[1] != window[0] + 1 {
1358                ranges.push(start..window[0] + 1);
1359                start = window[1];
1360            }
1361        }
1362
1363        ranges.push(start..*indices.last().unwrap() + 1);
1364        ranges
1365    }
1366}
1367
1368pub struct ReadBatchTask {
1369    pub task: BoxFuture<'static, Result<RecordBatch>>,
1370    pub num_rows: u32,
1371}
1372
1373/// A stream that takes scheduled jobs and generates decode tasks from them.
1374pub struct BatchDecodeStream {
1375    context: DecoderContext,
1376    root_decoder: SimpleStructDecoder,
1377    rows_remaining: u64,
1378    rows_per_batch: u32,
1379    rows_scheduled: u64,
1380    rows_drained: u64,
1381    scheduler_exhausted: bool,
1382    emitted_batch_size_warning: Arc<Once>,
1383}
1384
1385impl BatchDecodeStream {
1386    /// Create a new instance of a batch decode stream
1387    ///
1388    /// # Arguments
1389    ///
    /// * `scheduled` - an incoming stream of decode tasks from a
    ///   [`crate::decoder::DecodeBatchScheduler`]
    /// * `rows_per_batch` - the number of rows to include in each batch
    /// * `num_rows` - the total number of rows scheduled
    /// * `root_decoder` - the decoder for the root (top-level struct) field
1396    pub fn new(
1397        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1398        rows_per_batch: u32,
1399        num_rows: u64,
1400        root_decoder: SimpleStructDecoder,
1401    ) -> Self {
1402        Self {
1403            context: DecoderContext::new(scheduled),
1404            root_decoder,
1405            rows_remaining: num_rows,
1406            rows_per_batch,
1407            rows_scheduled: 0,
1408            rows_drained: 0,
1409            scheduler_exhausted: false,
1410            emitted_batch_size_warning: Arc::new(Once::new()),
1411        }
1412    }
1413
1414    fn accept_decoder(&mut self, decoder: DecoderReady) -> Result<()> {
1415        if decoder.path.is_empty() {
            // An empty path means this is the root decoder, which we can ignore
1417            Ok(())
1418        } else {
1419            self.root_decoder.accept_child(decoder)
1420        }
1421    }
1422
1423    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
1424        if self.scheduler_exhausted {
1425            return Ok(self.rows_scheduled);
1426        }
1427        while self.rows_scheduled < scheduled_need {
1428            let next_message = self.context.source.recv().await;
1429            match next_message {
1430                Some(scan_line) => {
1431                    let scan_line = scan_line?;
1432                    self.rows_scheduled = scan_line.scheduled_so_far;
1433                    for message in scan_line.decoders {
1434                        self.accept_decoder(message.into_legacy())?;
1435                    }
1436                }
1437                None => {
1438                    // Schedule ended before we got all the data we expected.  This probably
1439                    // means some kind of pushdown filter was applied and we didn't load as
1440                    // much data as we thought we would.
1441                    self.scheduler_exhausted = true;
1442                    return Ok(self.rows_scheduled);
1443                }
1444            }
1445        }
1446        Ok(scheduled_need)
1447    }
1448
1449    #[instrument(level = "debug", skip_all)]
1450    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
1451        trace!(
1452            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1453            self.rows_remaining,
1454            self.rows_drained,
1455            self.rows_scheduled,
1456        );
1457        if self.rows_remaining == 0 {
1458            return Ok(None);
1459        }
1460
1461        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1462        self.rows_remaining -= to_take;
1463
1464        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1465        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1466        if scheduled_need > 0 {
1467            let desired_scheduled = scheduled_need + self.rows_scheduled;
1468            trace!(
1469                "Draining from scheduler (desire at least {} scheduled rows)",
1470                desired_scheduled
1471            );
1472            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
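            // The scheduler can come up short if, e.g., a pushdown filter caused fewer
            // rows to be loaded than expected.  For example, if we desired 1024 scheduled
            // rows but only 600 were actually scheduled then we shrink this batch by the
            // 424-row shortfall.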
1473            if actually_scheduled < desired_scheduled {
1474                let under_scheduled = desired_scheduled - actually_scheduled;
1475                to_take -= under_scheduled;
1476            }
1477        }
1478
1479        if to_take == 0 {
1480            return Ok(None);
1481        }
1482
1483        // wait_for_loaded waits for *>* loaded_need (not >=) so we do a -1 here
1484        let loaded_need = self.rows_drained + to_take - 1;
1485        trace!(
1486            "Waiting for I/O (desire at least {} fully loaded rows)",
1487            loaded_need
1488        );
1489        self.root_decoder.wait_for_loaded(loaded_need).await?;
1490
1491        let next_task = self.root_decoder.drain(to_take)?;
1492        self.rows_drained += to_take;
1493        Ok(Some(next_task))
1494    }
1495
1496    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
1497        let stream = futures::stream::unfold(self, |mut slf| async move {
1498            let next_task = slf.next_batch_task().await;
1499            let next_task = next_task.transpose().map(|next_task| {
1500                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
1501                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
1502                let task = tokio::spawn(async move {
1503                    let next_task = next_task?;
1504                    next_task.into_batch(emitted_batch_size_warning)
1505                });
1506                (task, num_rows)
1507            });
1508            next_task.map(|(task, num_rows)| {
1509                let task = task.map(|join_wrapper| join_wrapper.unwrap()).boxed();
1510                // This should be true since batch size is u32
1511                debug_assert!(num_rows <= u32::MAX as u64);
1512                let next_task = ReadBatchTask {
1513                    task,
1514                    num_rows: num_rows as u32,
1515                };
1516                (next_task, slf)
1517            })
1518        });
1519        stream.boxed()
1520    }
1521}
1522
1523// Utility types to smooth out the differences between the 2.0 and 2.1 decoders so that
1524// we can have a single implementation of the batch decode iterator
1525enum RootDecoderMessage {
1526    LoadedPage(LoadedPage),
1527    LegacyPage(DecoderReady),
1528}
1529trait RootDecoderType {
1530    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()>;
1531    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
1532    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()>;
1533}
1534impl RootDecoderType for StructuralStructDecoder {
1535    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1536        let RootDecoderMessage::LoadedPage(loaded_page) = message else {
1537            unreachable!()
1538        };
1539        self.accept_page(loaded_page)
1540    }
1541    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1542        self.drain_batch_task(num_rows)
1543    }
1544    fn wait(&mut self, _: u64, _: &tokio::runtime::Runtime) -> Result<()> {
1545        // Waiting happens elsewhere (not as part of the decoder)
1546        Ok(())
1547    }
1548}
1549impl RootDecoderType for SimpleStructDecoder {
1550    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1551        let RootDecoderMessage::LegacyPage(legacy_page) = message else {
1552            unreachable!()
1553        };
1554        self.accept_child(legacy_page)
1555    }
1556    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1557        self.drain(num_rows)
1558    }
1559    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()> {
1560        runtime.block_on(self.wait_for_loaded(loaded_need))
1561    }
1562}
1563
/// A batch decoder that decodes synchronously, blocking the current thread
1565struct BatchDecodeIterator<T: RootDecoderType> {
1566    messages: VecDeque<Result<DecoderMessage>>,
1567    root_decoder: T,
1568    rows_remaining: u64,
1569    rows_per_batch: u32,
1570    rows_scheduled: u64,
1571    rows_drained: u64,
1572    emitted_batch_size_warning: Arc<Once>,
    // Note: this is not the runtime on which I/O happens.
    // That's always in the scheduler.  This is just a runtime we use to
    // sleep the current thread while it waits for I/O that is not yet ready
1576    wait_for_io_runtime: tokio::runtime::Runtime,
1577    schema: Arc<ArrowSchema>,
1578}
1579
1580impl<T: RootDecoderType> BatchDecodeIterator<T> {
1581    /// Create a new instance of a batch decode iterator
1582    pub fn new(
1583        messages: VecDeque<Result<DecoderMessage>>,
1584        rows_per_batch: u32,
1585        num_rows: u64,
1586        root_decoder: T,
1587        schema: Arc<ArrowSchema>,
1588    ) -> Self {
1589        Self {
1590            messages,
1591            root_decoder,
1592            rows_remaining: num_rows,
1593            rows_per_batch,
1594            rows_scheduled: 0,
1595            rows_drained: 0,
1596            wait_for_io_runtime: tokio::runtime::Builder::new_current_thread()
1597                .build()
1598                .unwrap(),
1599            emitted_batch_size_warning: Arc::new(Once::new()),
1600            schema,
1601        }
1602    }
1603
1604    /// Wait for a single page of data to finish loading
1605    ///
1606    /// If the data is not available this will perform a *blocking* wait (put
1607    /// the current thread to sleep)
1608    fn wait_for_page(&self, unloaded_page: UnloadedPage) -> Result<LoadedPage> {
1609        match maybe_done(unloaded_page.0) {
1610            // Fast path, avoid all runtime shenanigans if the data is ready
1611            MaybeDone::Done(loaded_page) => loaded_page,
1612            // Slow path, we need to wait on I/O, enter the runtime
1613            MaybeDone::Future(fut) => self.wait_for_io_runtime.block_on(fut),
1614            MaybeDone::Gone => unreachable!(),
1615        }
1616    }
1617
1618    /// Waits for I/O until `scheduled_need` rows have been loaded
1619    ///
1620    /// Note that `scheduled_need` is cumulative.  E.g. this method
1621    /// should be called with 5, 10, 15 and not 5, 5, 5
1622    #[instrument(skip_all)]
1623    fn wait_for_io(&mut self, scheduled_need: u64) -> Result<u64> {
1624        while self.rows_scheduled < scheduled_need && !self.messages.is_empty() {
1625            let message = self.messages.pop_front().unwrap()?;
1626            self.rows_scheduled = message.scheduled_so_far;
1627            for decoder_message in message.decoders {
1628                match decoder_message {
1629                    MessageType::UnloadedPage(unloaded_page) => {
1630                        let loaded_page = self.wait_for_page(unloaded_page)?;
1631                        self.root_decoder
1632                            .accept_message(RootDecoderMessage::LoadedPage(loaded_page))?;
1633                    }
1634                    MessageType::DecoderReady(decoder_ready) => {
                        // The root decoder (empty path) can be ignored
1636                        if !decoder_ready.path.is_empty() {
1637                            self.root_decoder
1638                                .accept_message(RootDecoderMessage::LegacyPage(decoder_ready))?;
1639                        }
1640                    }
1641                }
1642            }
1643        }
1644
1645        let loaded_need = self.rows_drained + self.rows_per_batch as u64 - 1;
1646
1647        self.root_decoder
1648            .wait(loaded_need, &self.wait_for_io_runtime)?;
1649        Ok(self.rows_scheduled)
1650    }
1651
1652    #[instrument(level = "debug", skip_all)]
1653    fn next_batch_task(&mut self) -> Result<Option<RecordBatch>> {
1654        trace!(
1655            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1656            self.rows_remaining,
1657            self.rows_drained,
1658            self.rows_scheduled,
1659        );
1660        if self.rows_remaining == 0 {
1661            return Ok(None);
1662        }
1663
1664        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1665        self.rows_remaining -= to_take;
1666
1667        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1668        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1669        if scheduled_need > 0 {
1670            let desired_scheduled = scheduled_need + self.rows_scheduled;
1671            trace!(
1672                "Draining from scheduler (desire at least {} scheduled rows)",
1673                desired_scheduled
1674            );
1675            let actually_scheduled = self.wait_for_io(desired_scheduled)?;
1676            if actually_scheduled < desired_scheduled {
1677                let under_scheduled = desired_scheduled - actually_scheduled;
1678                to_take -= under_scheduled;
1679            }
1680        }
1681
1682        if to_take == 0 {
1683            return Ok(None);
1684        }
1685
1686        let next_task = self.root_decoder.drain_batch(to_take)?;
1687
1688        self.rows_drained += to_take;
1689
1690        let batch = next_task.into_batch(self.emitted_batch_size_warning.clone())?;
1691
1692        Ok(Some(batch))
1693    }
1694}
1695
1696impl<T: RootDecoderType> Iterator for BatchDecodeIterator<T> {
1697    type Item = ArrowResult<RecordBatch>;
1698
1699    fn next(&mut self) -> Option<Self::Item> {
1700        self.next_batch_task()
1701            .transpose()
1702            .map(|r| r.map_err(ArrowError::from))
1703    }
1704}
1705
1706impl<T: RootDecoderType> RecordBatchReader for BatchDecodeIterator<T> {
1707    fn schema(&self) -> Arc<ArrowSchema> {
1708        self.schema.clone()
1709    }
1710}
1711
1712/// A stream that takes scheduled jobs and generates decode tasks from them.
1713pub struct StructuralBatchDecodeStream {
1714    context: DecoderContext,
1715    root_decoder: StructuralStructDecoder,
1716    rows_remaining: u64,
1717    rows_per_batch: u32,
1718    rows_scheduled: u64,
1719    rows_drained: u64,
1720    scheduler_exhausted: bool,
1721    emitted_batch_size_warning: Arc<Once>,
1722}
1723
1724impl StructuralBatchDecodeStream {
1725    /// Create a new instance of a batch decode stream
1726    ///
1727    /// # Arguments
1728    ///
    /// * `scheduled` - an incoming stream of decode tasks from a
    ///   [`crate::decoder::DecodeBatchScheduler`]
    /// * `rows_per_batch` - the number of rows to include in each batch
    /// * `num_rows` - the total number of rows scheduled
    /// * `root_decoder` - the decoder for the root (top-level struct) field
1735    pub fn new(
1736        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1737        rows_per_batch: u32,
1738        num_rows: u64,
1739        root_decoder: StructuralStructDecoder,
1740    ) -> Self {
1741        Self {
1742            context: DecoderContext::new(scheduled),
1743            root_decoder,
1744            rows_remaining: num_rows,
1745            rows_per_batch,
1746            rows_scheduled: 0,
1747            rows_drained: 0,
1748            scheduler_exhausted: false,
1749            emitted_batch_size_warning: Arc::new(Once::new()),
1750        }
1751    }
1752
1753    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
1754        if self.scheduler_exhausted {
1755            return Ok(self.rows_scheduled);
1756        }
1757        while self.rows_scheduled < scheduled_need {
1758            let next_message = self.context.source.recv().await;
1759            match next_message {
1760                Some(scan_line) => {
1761                    let scan_line = scan_line?;
1762                    self.rows_scheduled = scan_line.scheduled_so_far;
1763                    for message in scan_line.decoders {
1764                        let unloaded_page = message.into_structural();
1765                        let loaded_page = unloaded_page.0.await?;
1766                        self.root_decoder.accept_page(loaded_page)?;
1767                    }
1768                }
1769                None => {
1770                    // Schedule ended before we got all the data we expected.  This probably
1771                    // means some kind of pushdown filter was applied and we didn't load as
1772                    // much data as we thought we would.
1773                    self.scheduler_exhausted = true;
1774                    return Ok(self.rows_scheduled);
1775                }
1776            }
1777        }
1778        Ok(scheduled_need)
1779    }
1780
1781    #[instrument(level = "debug", skip_all)]
1782    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
1783        trace!(
1784            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1785            self.rows_remaining,
1786            self.rows_drained,
1787            self.rows_scheduled,
1788        );
1789        if self.rows_remaining == 0 {
1790            return Ok(None);
1791        }
1792
1793        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1794        self.rows_remaining -= to_take;
1795
1796        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1797        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1798        if scheduled_need > 0 {
1799            let desired_scheduled = scheduled_need + self.rows_scheduled;
1800            trace!(
1801                "Draining from scheduler (desire at least {} scheduled rows)",
1802                desired_scheduled
1803            );
1804            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
1805            if actually_scheduled < desired_scheduled {
1806                let under_scheduled = desired_scheduled - actually_scheduled;
1807                to_take -= under_scheduled;
1808            }
1809        }
1810
1811        if to_take == 0 {
1812            return Ok(None);
1813        }
1814
1815        let next_task = self.root_decoder.drain_batch_task(to_take)?;
1816        self.rows_drained += to_take;
1817        Ok(Some(next_task))
1818    }
1819
1820    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
1821        let stream = futures::stream::unfold(self, |mut slf| async move {
1822            let next_task = slf.next_batch_task().await;
1823            let next_task = next_task.transpose().map(|next_task| {
1824                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
1825                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
1826                let task = tokio::spawn(async move {
1827                    let next_task = next_task?;
1828                    next_task.into_batch(emitted_batch_size_warning)
1829                });
1830                (task, num_rows)
1831            });
1832            next_task.map(|(task, num_rows)| {
1833                let task = task.map(|join_wrapper| join_wrapper.unwrap()).boxed();
1834                // This should be true since batch size is u32
1835                debug_assert!(num_rows <= u32::MAX as u64);
1836                let next_task = ReadBatchTask {
1837                    task,
1838                    num_rows: num_rows as u32,
1839                };
1840                (next_task, slf)
1841            })
1842        });
1843        stream.boxed()
1844    }
1845}
1846
1847#[derive(Debug)]
1848pub enum RequestedRows {
1849    Ranges(Vec<Range<u64>>),
1850    Indices(Vec<u64>),
1851}
1852
1853impl RequestedRows {
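    /// The total number of requested rows
    ///
    /// For example, `RequestedRows::Ranges(vec![0..10, 20..25])` covers 15 rows.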
1854    pub fn num_rows(&self) -> u64 {
1855        match self {
1856            Self::Ranges(ranges) => ranges.iter().map(|r| r.end - r.start).sum(),
1857            Self::Indices(indices) => indices.len() as u64,
1858        }
1859    }
1860}
1861
1862#[derive(Debug, Clone)]
1863pub struct SchedulerDecoderConfig {
1864    pub decoder_plugins: Arc<DecoderPlugins>,
1865    pub batch_size: u32,
1866    pub io: Arc<dyn EncodingsIo>,
1867    pub cache: Arc<FileMetadataCache>,
1868    pub should_validate: bool,
1869}
1870
1871fn check_scheduler_on_drop(
1872    stream: BoxStream<'static, ReadBatchTask>,
1873    scheduler_handle: tokio::task::JoinHandle<()>,
1874) -> BoxStream<'static, ReadBatchTask> {
    // This is a bit weird but we create an "empty stream" that unwraps the scheduler handle (which
    // will panic if the scheduler panicked).  This lets us check if the scheduler panicked
    // when the stream finishes.
1878    let mut scheduler_handle = Some(scheduler_handle);
1879    let check_scheduler = stream::unfold((), move |_| {
1880        let handle = scheduler_handle.take();
1881        async move {
1882            if let Some(handle) = handle {
1883                handle.await.unwrap();
1884            }
1885            None
1886        }
1887    });
1888    stream.chain(check_scheduler).boxed()
1889}
1890
1891pub fn create_decode_stream(
1892    schema: &Schema,
1893    num_rows: u64,
1894    batch_size: u32,
1895    is_structural: bool,
1896    should_validate: bool,
1897    rx: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1898) -> BoxStream<'static, ReadBatchTask> {
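    // 2.1+ files use the structural decoding path while 2.0 files use the legacy
    // path (see `MessageType` for more detail on the difference between the two)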
1899    if is_structural {
1900        let arrow_schema = ArrowSchema::from(schema);
1901        let structural_decoder = StructuralStructDecoder::new(
1902            arrow_schema.fields,
1903            should_validate,
1904            /*is_root=*/ true,
1905        );
1906        StructuralBatchDecodeStream::new(rx, batch_size, num_rows, structural_decoder).into_stream()
1907    } else {
1908        let arrow_schema = ArrowSchema::from(schema);
1909        let root_fields = arrow_schema.fields;
1910
1911        let simple_struct_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1912        BatchDecodeStream::new(rx, batch_size, num_rows, simple_struct_decoder).into_stream()
1913    }
1914}
1915
/// Creates an iterator that decodes a set of messages in a blocking fashion
1917///
1918/// See [`schedule_and_decode_blocking`] for more information.
1919pub fn create_decode_iterator(
1920    schema: &Schema,
1921    num_rows: u64,
1922    batch_size: u32,
1923    should_validate: bool,
1924    is_structural: bool,
1925    messages: VecDeque<Result<DecoderMessage>>,
1926) -> Box<dyn RecordBatchReader + Send + 'static> {
1927    let arrow_schema = Arc::new(ArrowSchema::from(schema));
1928    let root_fields = arrow_schema.fields.clone();
1929    if is_structural {
        let structural_decoder =
            StructuralStructDecoder::new(root_fields, should_validate, /*is_root=*/ true);
1932        Box::new(BatchDecodeIterator::new(
1933            messages,
1934            batch_size,
1935            num_rows,
            structural_decoder,
1937            arrow_schema,
1938        ))
1939    } else {
1940        let root_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1941        Box::new(BatchDecodeIterator::new(
1942            messages,
1943            batch_size,
1944            num_rows,
1945            root_decoder,
1946            arrow_schema,
1947        ))
1948    }
1949}
1950
1951fn create_scheduler_decoder(
1952    column_infos: Vec<Arc<ColumnInfo>>,
1953    requested_rows: RequestedRows,
1954    filter: FilterExpression,
1955    column_indices: Vec<u32>,
1956    target_schema: Arc<Schema>,
1957    config: SchedulerDecoderConfig,
1958) -> Result<BoxStream<'static, ReadBatchTask>> {
1959    let num_rows = requested_rows.num_rows();
1960
1961    let is_structural = column_infos[0].is_structural();
1962
1963    let (tx, rx) = mpsc::unbounded_channel();
1964
1965    let decode_stream = create_decode_stream(
1966        &target_schema,
1967        num_rows,
1968        config.batch_size,
1969        is_structural,
1970        config.should_validate,
1971        rx,
1972    );
1973
1974    let scheduler_handle = tokio::task::spawn(async move {
1975        let mut decode_scheduler = match DecodeBatchScheduler::try_new(
1976            target_schema.as_ref(),
1977            &column_indices,
1978            &column_infos,
1979            &vec![],
1980            num_rows,
1981            config.decoder_plugins,
1982            config.io.clone(),
1983            config.cache,
1984            &filter,
1985        )
1986        .await
1987        {
1988            Ok(scheduler) => scheduler,
1989            Err(e) => {
1990                let _ = tx.send(Err(e));
1991                return;
1992            }
1993        };
1994
1995        match requested_rows {
1996            RequestedRows::Ranges(ranges) => {
1997                decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
1998            }
1999            RequestedRows::Indices(indices) => {
2000                decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
2001            }
2002        }
2003    });
2004
2005    Ok(check_scheduler_on_drop(decode_stream, scheduler_handle))
2006}
2007
2008/// Launches a scheduler on a dedicated (spawned) task and creates a decoder to
/// decode the scheduled data, returning the result as a stream of record batch tasks.
2010///
2011/// This is a convenience function that creates both the scheduler and the decoder
2012/// which can be a little tricky to get right.
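///
/// A minimal sketch of typical usage (assuming `column_infos`, `column_indices`,
/// `target_schema`, and `config` have already been built elsewhere):
///
/// ```ignore
/// let mut stream = schedule_and_decode(
///     column_infos,
///     RequestedRows::Ranges(vec![0..100]),
///     FilterExpression::no_filter(),
///     column_indices,
///     target_schema,
///     config,
/// );
/// while let Some(batch_task) = stream.next().await {
///     let batch = batch_task.task.await?;
///     // ... use batch ...
/// }
/// ```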
2013pub fn schedule_and_decode(
2014    column_infos: Vec<Arc<ColumnInfo>>,
2015    requested_rows: RequestedRows,
2016    filter: FilterExpression,
2017    column_indices: Vec<u32>,
2018    target_schema: Arc<Schema>,
2019    config: SchedulerDecoderConfig,
2020) -> BoxStream<'static, ReadBatchTask> {
2021    if requested_rows.num_rows() == 0 {
2022        return stream::empty().boxed();
2023    }
    // For convenience we really want this method to be a synchronous method where all
2025    // errors happen on the stream.  There is some async initialization that must happen
2026    // when creating a scheduler.  We wrap that all up in the very first task.
2027    match create_scheduler_decoder(
2028        column_infos,
2029        requested_rows,
2030        filter,
2031        column_indices,
2032        target_schema,
2033        config,
2034    ) {
        Ok(stream) => stream,
        // If the initialization failed, make it look like a failed task
        Err(e) => stream::once(std::future::ready(ReadBatchTask {
2038            num_rows: 0,
2039            task: std::future::ready(Err(e)).boxed(),
2040        }))
2041        .boxed(),
2042    }
2043}
2044
2045lazy_static::lazy_static! {
2046    pub static ref WAITER_RT: tokio::runtime::Runtime = tokio::runtime::Builder::new_current_thread()
2047        .build()
2048        .unwrap();
2049}
2050
2051/// Schedules and decodes the requested data in a blocking fashion
2052///
2053/// This function is a blocking version of [`schedule_and_decode`]. It schedules the requested data
2054/// and decodes it in the current thread.
2055///
2056/// This can be useful when the disk is fast (or the data is in memory) and the amount
2057/// of data is relatively small.  For example, when doing a take against NVMe or in-memory data.
2058///
2059/// This should NOT be used for full scans.  Even if the data is in memory this function will
2060/// not parallelize the decode and will be slower than the async version.  Full scans typically
2061/// make relatively few IOPs and so the asynchronous overhead is much smaller.
2062///
2063/// This method will first completely run the scheduling process.  Then it will run the
2064/// decode process.
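///
/// A minimal sketch of typical usage (same assumed inputs as in the sketch for
/// [`schedule_and_decode`]):
///
/// ```ignore
/// let reader = schedule_and_decode_blocking(
///     column_infos,
///     RequestedRows::Indices(vec![3, 7, 8]),
///     FilterExpression::no_filter(),
///     column_indices,
///     target_schema,
///     config,
/// )?;
/// for batch in reader {
///     let batch = batch?;
///     // ... use batch ...
/// }
/// ```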
2065pub fn schedule_and_decode_blocking(
2066    column_infos: Vec<Arc<ColumnInfo>>,
2067    requested_rows: RequestedRows,
2068    filter: FilterExpression,
2069    column_indices: Vec<u32>,
2070    target_schema: Arc<Schema>,
2071    config: SchedulerDecoderConfig,
2072) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
2073    if requested_rows.num_rows() == 0 {
2074        let arrow_schema = Arc::new(ArrowSchema::from(target_schema.as_ref()));
2075        return Ok(Box::new(RecordBatchIterator::new(vec![], arrow_schema)));
2076    }
2077
2078    let num_rows = requested_rows.num_rows();
2079    let is_structural = column_infos[0].is_structural();
2080
2081    let (tx, mut rx) = mpsc::unbounded_channel();
2082
2083    // Initialize the scheduler.  This is still "asynchronous" but we run it with a current-thread
2084    // runtime.
2085    let mut decode_scheduler = WAITER_RT.block_on(DecodeBatchScheduler::try_new(
2086        target_schema.as_ref(),
2087        &column_indices,
2088        &column_infos,
2089        &vec![],
2090        num_rows,
2091        config.decoder_plugins,
2092        config.io.clone(),
2093        config.cache,
2094        &filter,
2095    ))?;
2096
2097    // Schedule the requested rows
2098    match requested_rows {
2099        RequestedRows::Ranges(ranges) => {
2100            decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
2101        }
2102        RequestedRows::Indices(indices) => {
2103            decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
2104        }
2105    }
2106
2107    // Drain the scheduler queue into a vec of decode messages
2108    let mut messages = Vec::new();
2109    while rx
2110        .recv_many(&mut messages, usize::MAX)
2111        .now_or_never()
2112        .unwrap()
2113        != 0
2114    {}
2115
2116    // Create a decoder to decode the messages
2117    let decode_iterator = create_decode_iterator(
2118        &target_schema,
2119        num_rows,
2120        config.batch_size,
2121        config.should_validate,
2122        is_structural,
2123        messages.into(),
2124    );
2125
2126    Ok(decode_iterator)
2127}
2128
2129/// A decoder for single-column encodings of primitive data (this includes fixed size
2130/// lists of primitive data)
2131///
2132/// Physical decoders are able to decode into existing buffers for zero-copy operation.
2133///
2134/// Instances should be stateless and `Send` / `Sync`.  This is because multiple decode
2135/// tasks could reference the same page.  For example, imagine a page covers rows 0-2000
2136/// and the decoder stream has a batch size of 1024.  The decoder will be needed by both
2137/// the decode task for batch 0 and the decode task for batch 1.
2138///
2139/// See [`crate::decoder`] for more information
2140pub trait PrimitivePageDecoder: Send + Sync {
2141    /// Decode data into buffers
2142    ///
2143    /// This may be a simple zero-copy from a disk buffer or could involve complex decoding
2144    /// such as decompressing from some compressed representation.
2145    ///
2146    /// Capacity is stored as a tuple of (num_bytes: u64, is_needed: bool).  The `is_needed`
2147    /// portion only needs to be updated if the encoding has some concept of an "optional"
2148    /// buffer.
2149    ///
2150    /// Encodings can have any number of input or output buffers.  For example, a dictionary
2151    /// decoding will convert two buffers (indices + dictionary) into a single buffer
2152    ///
2153    /// Binary decodings have two output buffers (one for values, one for offsets)
2154    ///
2155    /// Other decodings could even expand the # of output buffers.  For example, we could decode
2156    /// fixed size strings into variable length strings going from one input buffer to multiple output
2157    /// buffers.
2158    ///
2159    /// Each Arrow data type typically has a fixed structure of buffers and the encoding chain will
2160    /// generally end at one of these structures.  However, intermediate structures may exist which
2161    /// do not correspond to any Arrow type at all.  For example, a bitpacking encoding will deal
2162    /// with buffers that have bits-per-value that is not a multiple of 8.
2163    ///
2164    /// The `primitive_array_from_buffers` method has an expected buffer layout for each arrow
2165    /// type (order matters) and encodings that aim to decode into arrow types should respect
2166    /// this layout.
    ///
    /// # Arguments
2168    ///
2169    /// * `rows_to_skip` - how many rows to skip (within the page) before decoding
2170    /// * `num_rows` - how many rows to decode
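    ///
    /// A minimal sketch of a call (`decoder` here is any `PrimitivePageDecoder`):
    ///
    /// ```ignore
    /// // Decode 32 rows of the page, starting at row 10
    /// let block = decoder.decode(/*rows_to_skip=*/ 10, /*num_rows=*/ 32)?;
    /// ```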
2172    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock>;
2173}
2174
2175/// A scheduler for single-column encodings of primitive data
2176///
2177/// The scheduler is responsible for calculating what I/O is needed for the requested rows
2178///
2179/// Instances should be stateless and `Send` and `Sync`.  This is because instances can
2180/// be shared in follow-up I/O tasks.
2181///
2182/// See [`crate::decoder`] for more information
2183pub trait PageScheduler: Send + Sync + std::fmt::Debug {
2184    /// Schedules a batch of I/O to load the data needed for the requested ranges
2185    ///
2186    /// Returns a future that will yield a decoder once the data has been loaded
2187    ///
2188    /// # Arguments
2189    ///
    /// * `ranges` - the ranges of row offsets (relative to the start of the page)
    ///   requested; these must be ordered and must not overlap
2192    /// * `scheduler` - a scheduler to submit the I/O request to
2193    /// * `top_level_row` - the row offset of the top level field currently being
2194    ///   scheduled.  This can be used to assign priority to I/O requests
2195    fn schedule_ranges(
2196        &self,
2197        ranges: &[Range<u64>],
2198        scheduler: &Arc<dyn EncodingsIo>,
2199        top_level_row: u64,
2200    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>;
2201}
2202
2203/// A trait to control the priority of I/O
2204pub trait PriorityRange: std::fmt::Debug + Send + Sync {
2205    fn advance(&mut self, num_rows: u64);
2206    fn current_priority(&self) -> u64;
2207    fn box_clone(&self) -> Box<dyn PriorityRange>;
2208}
2209
2210/// A simple priority scheme for top-level fields with no parent
2211/// repetition
2212#[derive(Debug)]
2213pub struct SimplePriorityRange {
2214    priority: u64,
2215}
2216
2217impl SimplePriorityRange {
2218    fn new(priority: u64) -> Self {
2219        Self { priority }
2220    }
2221}
2222
2223impl PriorityRange for SimplePriorityRange {
2224    fn advance(&mut self, num_rows: u64) {
2225        self.priority += num_rows;
2226    }
2227
2228    fn current_priority(&self) -> u64 {
2229        self.priority
2230    }
2231
2232    fn box_clone(&self) -> Box<dyn PriorityRange> {
2233        Box::new(Self {
2234            priority: self.priority,
2235        })
2236    }
2237}
2238
2239/// Determining the priority of a list request is tricky.  We want
2240/// the priority to be the top-level row.  So if we have a
/// list<list<int>> and each outer list has 10 inner lists and each inner
/// list has 5 items then the priority of the 100th item is 1 because
/// it is the 5th item in the 10th inner list of the *second* row.
2244///
2245/// This structure allows us to keep track of this complicated priority
2246/// relationship.
2247///
2248/// There's a fair amount of bookkeeping involved here.
2249///
2250/// A better approach (using repetition levels) is coming in the future.
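///
/// As an illustration of the bookkeeping: with `offsets = [0, 5, 10, 15]`
/// (three lists of five items each), advancing by 7 items advances the base
/// priority by one row, since items 0..5 complete the first list and items
/// 5..7 only partially cover the second.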
2251pub struct ListPriorityRange {
2252    base: Box<dyn PriorityRange>,
2253    offsets: Arc<[u64]>,
2254    cur_index_into_offsets: usize,
2255    cur_position: u64,
2256}
2257
2258impl ListPriorityRange {
2259    pub(crate) fn new(base: Box<dyn PriorityRange>, offsets: Arc<[u64]>) -> Self {
2260        Self {
2261            base,
2262            offsets,
2263            cur_index_into_offsets: 0,
2264            cur_position: 0,
2265        }
2266    }
2267}
2268
2269impl std::fmt::Debug for ListPriorityRange {
2270    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2271        f.debug_struct("ListPriorityRange")
2272            .field("base", &self.base)
2273            .field("offsets.len()", &self.offsets.len())
2274            .field("cur_index_into_offsets", &self.cur_index_into_offsets)
2275            .field("cur_position", &self.cur_position)
2276            .finish()
2277    }
2278}
2279
2280impl PriorityRange for ListPriorityRange {
2281    fn advance(&mut self, num_rows: u64) {
2282        // We've scheduled X items.  Now walk through the offsets to
2283        // determine how many rows we've scheduled.
2284        self.cur_position += num_rows;
2285        let mut idx_into_offsets = self.cur_index_into_offsets;
2286        while idx_into_offsets + 1 < self.offsets.len()
2287            && self.offsets[idx_into_offsets + 1] <= self.cur_position
2288        {
2289            idx_into_offsets += 1;
2290        }
2291        let base_rows_advanced = idx_into_offsets - self.cur_index_into_offsets;
2292        self.cur_index_into_offsets = idx_into_offsets;
2293        self.base.advance(base_rows_advanced as u64);
2294    }
2295
2296    fn current_priority(&self) -> u64 {
2297        self.base.current_priority()
2298    }
2299
2300    fn box_clone(&self) -> Box<dyn PriorityRange> {
2301        Box::new(Self {
2302            base: self.base.box_clone(),
2303            offsets: self.offsets.clone(),
2304            cur_index_into_offsets: self.cur_index_into_offsets,
2305            cur_position: self.cur_position,
2306        })
2307    }
2308}
2309
2310/// Contains the context for a scheduler
2311pub struct SchedulerContext {
2312    recv: Option<mpsc::UnboundedReceiver<DecoderMessage>>,
2313    io: Arc<dyn EncodingsIo>,
2314    cache: Arc<FileMetadataCache>,
2315    name: String,
2316    path: Vec<u32>,
2317    path_names: Vec<String>,
2318}
2319
2320pub struct ScopedSchedulerContext<'a> {
2321    pub context: &'a mut SchedulerContext,
2322}
2323
2324impl<'a> ScopedSchedulerContext<'a> {
2325    pub fn pop(self) -> &'a mut SchedulerContext {
2326        self.context.pop();
2327        self.context
2328    }
2329}
2330
2331impl SchedulerContext {
2332    pub fn new(io: Arc<dyn EncodingsIo>, cache: Arc<FileMetadataCache>) -> Self {
2333        Self {
2334            io,
2335            cache,
2336            recv: None,
2337            name: "".to_string(),
2338            path: Vec::new(),
2339            path_names: Vec::new(),
2340        }
2341    }
2342
2343    pub fn io(&self) -> &Arc<dyn EncodingsIo> {
2344        &self.io
2345    }
2346
2347    pub fn cache(&self) -> &Arc<FileMetadataCache> {
2348        &self.cache
2349    }
2350
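    /// Pushes a named child scope onto the current path
    ///
    /// A small sketch of the intended pattern (the name and index are arbitrary):
    ///
    /// ```ignore
    /// let scoped = context.push("items", 0);
    /// // ... schedule children using scoped.context ...
    /// let context = scoped.pop();
    /// ```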
2351    pub fn push(&mut self, name: &str, index: u32) -> ScopedSchedulerContext {
2352        self.path.push(index);
2353        self.path_names.push(name.to_string());
2354        ScopedSchedulerContext { context: self }
2355    }
2356
2357    pub fn pop(&mut self) {
2358        self.path.pop();
2359        self.path_names.pop();
2360    }
2361
2362    pub fn path_name(&self) -> String {
2363        let path = self.path_names.join("/");
2364        if self.recv.is_some() {
2365            format!("TEMP({}){}", self.name, path)
2366        } else {
2367            format!("ROOT{}", path)
2368        }
2369    }
2370
2371    pub fn current_path(&self) -> VecDeque<u32> {
2372        VecDeque::from_iter(self.path.iter().copied())
2373    }
2374
2375    pub fn locate_decoder(&mut self, decoder: Box<dyn LogicalPageDecoder>) -> DecoderReady {
2376        trace!(
2377            "Scheduling decoder of type {:?} for {:?}",
2378            decoder.data_type(),
2379            self.path,
2380        );
2381        DecoderReady {
2382            decoder,
2383            path: self.current_path(),
2384        }
2385    }
2386}
2387
2388pub struct UnloadedPage(pub BoxFuture<'static, Result<LoadedPage>>);
2389
2390impl std::fmt::Debug for UnloadedPage {
2391    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2392        f.debug_struct("UnloadedPage").finish()
2393    }
2394}
2395
2396#[derive(Debug)]
2397pub struct ScheduledScanLine {
2398    pub rows_scheduled: u64,
2399    pub decoders: Vec<MessageType>,
2400}
2401
2402pub trait SchedulingJob: std::fmt::Debug {
2403    fn schedule_next(
2404        &mut self,
2405        context: &mut SchedulerContext,
2406        priority: &dyn PriorityRange,
2407    ) -> Result<ScheduledScanLine>;
2408
2409    fn num_rows(&self) -> u64;
2410}
2411
2412pub trait StructuralSchedulingJob: std::fmt::Debug {
2413    fn schedule_next(
2414        &mut self,
2415        context: &mut SchedulerContext,
2416    ) -> Result<Option<ScheduledScanLine>>;
2417}
2418
2419/// A filter expression to apply to the data
2420///
2421/// The core decoders do not currently take advantage of filtering in
2422/// any way.  In order to maintain the abstraction we represent filters
2423/// as an arbitrary byte sequence.
2424///
2425/// We recommend that encodings use Substrait for filters.
2426pub struct FilterExpression(pub Bytes);
2427
2428impl FilterExpression {
2429    /// Create a filter expression that does not filter any data
2430    ///
2431    /// This is currently represented by an empty byte array.  Encoders
2432    /// that are "filter aware" should make sure they handle this case.
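    ///
    /// ```ignore
    /// let filter = FilterExpression::no_filter();
    /// assert!(filter.is_noop());
    /// ```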
2433    pub fn no_filter() -> Self {
2434        Self(Bytes::new())
2435    }
2436
2437    /// Returns true if the filter is the same as the [`Self::no_filter`] filter
2438    pub fn is_noop(&self) -> bool {
2439        self.0.is_empty()
2440    }
2441}
2442
2443/// A scheduler for a field's worth of data
2444///
2445/// Each field in a reader's output schema maps to one field scheduler.  This scheduler may
2446/// map to more than one column.  For example, one field of struct data may
2447/// cover many columns of child data.  In fact, the entire file is treated as one
2448/// top-level struct field.
2449///
2450/// The scheduler is responsible for calculating the necessary I/O.  One schedule_range
2451/// request could trigger multiple batches of I/O across multiple columns.  The scheduler
2452/// should emit decoders into the sink as quickly as possible.
2453///
/// As soon as the scheduler encounters a batch of data that can be decoded, the scheduler
2455/// should emit a decoder in the "unloaded" state.  The decode stream will pull the decoder
2456/// and start decoding.
2457///
2458/// The order in which decoders are emitted is important.  Pages should be emitted in
2459/// row-major order allowing decode of complete rows as quickly as possible.
2460///
2461/// The `FieldScheduler` should be stateless and `Send` and `Sync`.  This is
2462/// because it might need to be shared.  For example, a list page has a reference to
2463/// the field schedulers for its items column.  This is shared with the follow-up I/O
2464/// task created when the offsets are loaded.
2465///
2466/// See [`crate::decoder`] for more information
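///
/// A rough sketch of how a scheduler is driven (`context` and `priority` are
/// assumed to have been set up by the caller):
///
/// ```ignore
/// let mut job = scheduler.schedule_ranges(&[0..100], &FilterExpression::no_filter())?;
/// let scan_line = job.schedule_next(&mut context, priority.as_ref())?;
/// // scan_line.decoders now holds messages to forward to the decode stream
/// ```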
2467pub trait FieldScheduler: Send + Sync + std::fmt::Debug {
2468    /// Called at the beginning of scheduling to initialize the scheduler
2469    fn initialize<'a>(
2470        &'a self,
2471        filter: &'a FilterExpression,
2472        context: &'a SchedulerContext,
2473    ) -> BoxFuture<'a, Result<()>>;
2474    /// Schedules I/O for the requested portions of the field.
2475    ///
2476    /// Note: `ranges` must be ordered and non-overlapping
2477    /// TODO: Support unordered or overlapping ranges in file scheduler
2478    fn schedule_ranges<'a>(
2479        &'a self,
2480        ranges: &[Range<u64>],
2481        filter: &FilterExpression,
2482    ) -> Result<Box<dyn SchedulingJob + 'a>>;
2483    /// The number of rows in this field
2484    fn num_rows(&self) -> u64;
2485}
2486
2487pub trait StructuralFieldScheduler: Send + std::fmt::Debug {
2488    fn initialize<'a>(
2489        &'a mut self,
2490        filter: &'a FilterExpression,
2491        context: &'a SchedulerContext,
2492    ) -> BoxFuture<'a, Result<()>>;
2493    fn schedule_ranges<'a>(
2494        &'a self,
2495        ranges: &[Range<u64>],
2496        filter: &FilterExpression,
2497    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>>;
2498}
2499
2500/// A trait for tasks that decode data into an Arrow array
2501pub trait DecodeArrayTask: Send {
2502    /// Decodes the data into an Arrow array
2503    fn decode(self: Box<Self>) -> Result<ArrayRef>;
2504}
2505
2506impl DecodeArrayTask for Box<dyn StructuralDecodeArrayTask> {
2507    fn decode(self: Box<Self>) -> Result<ArrayRef> {
2508        StructuralDecodeArrayTask::decode(*self).map(|decoded_array| decoded_array.array)
2509    }
2510}
2511
2512/// A task to decode data into an Arrow record batch
2513///
2514/// It has a child `task` which decodes a struct array with no nulls.
2515/// This is then converted into a record batch.
2516pub struct NextDecodeTask {
2517    /// The decode task itself
2518    pub task: Box<dyn DecodeArrayTask>,
2519    /// The number of rows that will be created
2520    pub num_rows: u64,
2521}
2522
2523impl NextDecodeTask {
2524    // Run the task and produce a record batch
2525    //
2526    // If the batch is very large this function will log a warning message
2527    // suggesting the user try a smaller batch size.
2528    #[instrument(name = "task_to_batch", level = "debug", skip_all)]
2529    fn into_batch(self, emitted_batch_size_warning: Arc<Once>) -> Result<RecordBatch> {
2530        let struct_arr = self.task.decode();
2531        match struct_arr {
2532            Ok(struct_arr) => {
2533                let batch = RecordBatch::from(struct_arr.as_struct());
2534                let size_bytes = batch.get_array_memory_size() as u64;
2535                if size_bytes > BATCH_SIZE_BYTES_WARNING {
2536                    emitted_batch_size_warning.call_once(|| {
2537                        let size_mb = size_bytes / 1024 / 1024;
2538                        debug!("Lance read in a single batch that contained more than {}MiB of data.  You may want to consider reducing the batch size.", size_mb);
2539                    });
2540                }
2541                Ok(batch)
2542            }
2543            Err(e) => {
2544                let e = Error::Internal {
2545                    message: format!("Error decoding batch: {}", e),
2546                    location: location!(),
2547                };
2548                Err(e)
2549            }
2550        }
2551    }
2552}
2553
2554#[derive(Debug)]
2555pub struct DecoderReady {
2556    // The decoder that is ready to be decoded
2557    pub decoder: Box<dyn LogicalPageDecoder>,
2558    // The path to the decoder, the first value is the column index
2559    // following values, if present, are nested child indices
2560    //
2561    // For example, a path of [1, 1, 0] would mean to grab the second
2562    // column, then the second child, and then the first child.
2563    //
2564    // It could represent x in the following schema:
2565    //
2566    // score: float64
2567    // points: struct
2568    //   color: string
2569    //   location: struct
2570    //     x: float64
2571    //
2572    // Currently, only struct decoders have "children" although other
2573    // decoders may at some point as well.  List children are only
2574    // handled through indirect I/O at the moment and so they don't
2575    // need to be represented (yet)
2576    pub path: VecDeque<u32>,
2577}
2578
2579// An envelope to wrap both 2.0 style messages and 2.1 style messages so we can
2580// share some code paths between the two.  Decoders can safely unwrap into whatever
2581// style they expect since a file will be either all-2.0 or all-2.1
2582#[derive(Debug)]
2583pub enum MessageType {
2584    // The older v2.0 scheduler/decoder used a scheme where the message was the
2585    // decoder itself.  The messages were not sent in priority order and the decoder
2586    // had to wait for I/O, figuring out the correct priority.  This was a lot of
2587    // complexity.
2588    DecoderReady(DecoderReady),
2589    // Starting in 2.1 we use a simpler scheme where the scheduling happens in priority
2590    // order and the message is an unloaded decoder.  These can be awaited, in order, and
2591    // the decoder does not have to worry about waiting for I/O.
2592    UnloadedPage(UnloadedPage),
2593}
2594
2595impl MessageType {
2596    pub fn into_legacy(self) -> DecoderReady {
2597        match self {
2598            Self::DecoderReady(decoder) => decoder,
2599            Self::UnloadedPage(_) => {
2600                panic!("Expected DecoderReady but got UnloadedPage")
2601            }
2602        }
2603    }
2604
2605    pub fn into_structural(self) -> UnloadedPage {
2606        match self {
2607            Self::UnloadedPage(unloaded) => unloaded,
2608            Self::DecoderReady(_) => {
2609                panic!("Expected UnloadedPage but got DecoderReady")
2610            }
2611        }
2612    }
2613}
2614
2615pub struct DecoderMessage {
2616    pub scheduled_so_far: u64,
2617    pub decoders: Vec<MessageType>,
2618}
2619
2620pub struct DecoderContext {
2621    source: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
2622}
2623
2624impl DecoderContext {
2625    pub fn new(source: mpsc::UnboundedReceiver<Result<DecoderMessage>>) -> Self {
2626        Self { source }
2627    }
2628}
2629
2630/// A decoder for a field's worth of data
2631///
/// The decoder is initially "unloaded" (doesn't have all its data).  The [`Self::wait_for_loaded`]
2633/// method should be called to wait for the needed I/O data before attempting to decode
2634/// any further.
2635///
2636/// Unlike the other decoder types it is assumed that `LogicalPageDecoder` is stateful
2637/// and only `Send`.  This is why we don't need a `rows_to_skip` argument in [`Self::drain`]
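///
/// A rough sketch of how the decode stream drives a decoder:
///
/// ```ignore
/// decoder.wait_for_loaded(loaded_need).await?;
/// let next = decoder.drain(num_rows)?;
/// let array = next.task.decode()?;
/// ```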
2638pub trait LogicalPageDecoder: std::fmt::Debug + Send {
2639    /// Add a newly scheduled child decoder
2640    ///
2641    /// The default implementation does not expect children and returns
2642    /// an error.
2643    fn accept_child(&mut self, _child: DecoderReady) -> Result<()> {
2644        Err(Error::Internal {
2645            message: format!(
2646                "The decoder {:?} does not expect children but received a child",
2647                self
2648            ),
2649            location: location!(),
2650        })
2651    }
    /// Waits until more than `loaded_need` rows have been loaded
2653    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<Result<()>>;
2654    /// The number of rows loaded so far
2655    fn rows_loaded(&self) -> u64;
2656    /// The number of rows that still need loading
2657    fn rows_unloaded(&self) -> u64 {
2658        self.num_rows() - self.rows_loaded()
2659    }
2660    /// The total number of rows in the field
2661    fn num_rows(&self) -> u64;
2662    /// The number of rows that have been drained so far
2663    fn rows_drained(&self) -> u64;
2664    /// The number of rows that are still available to drain
2665    fn rows_left(&self) -> u64 {
2666        self.num_rows() - self.rows_drained()
2667    }
2668    /// Creates a task to decode `num_rows` of data into an array
2669    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
2670    /// The data type of the decoded data
2671    fn data_type(&self) -> &DataType;
2672}
2673
2674pub struct DecodedPage {
2675    pub data: DataBlock,
2676    pub repdef: RepDefUnraveler,
2677}
2678
2679pub trait DecodePageTask: Send + std::fmt::Debug {
2680    /// Decodes the data into an Arrow array
2681    fn decode(self: Box<Self>) -> Result<DecodedPage>;
2682}
2683
2684pub trait StructuralPageDecoder: std::fmt::Debug + Send {
2685    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>>;
2686    fn num_rows(&self) -> u64;
2687}
2688
2689#[derive(Debug)]
2690pub struct LoadedPage {
2691    // The decoder that is ready to be decoded
2692    pub decoder: Box<dyn StructuralPageDecoder>,
2693    // The path to the decoder, the first value is the column index
2694    // following values, if present, are nested child indices
2695    //
2696    // For example, a path of [1, 1, 0] would mean to grab the second
2697    // column, then the second child, and then the first child.
2698    //
2699    // It could represent x in the following schema:
2700    //
2701    // score: float64
2702    // points: struct
2703    //   color: string
2704    //   location: struct
2705    //     x: float64
2706    //
2707    // Currently, only struct decoders have "children" although other
2708    // decoders may at some point as well.  List children are only
2709    // handled through indirect I/O at the moment and so they don't
2710    // need to be represented (yet)
2711    pub path: VecDeque<u32>,
2712    pub page_index: usize,
2713}
2714
2715pub struct DecodedArray {
2716    pub array: ArrayRef,
2717    pub repdef: CompositeRepDefUnraveler,
2718}
2719
2720pub trait StructuralDecodeArrayTask: std::fmt::Debug + Send {
2721    fn decode(self: Box<Self>) -> Result<DecodedArray>;
2722}
2723
2724pub trait StructuralFieldDecoder: std::fmt::Debug + Send {
    /// Add a newly loaded page to the decoder
2729    fn accept_page(&mut self, _child: LoadedPage) -> Result<()>;
2730    /// Creates a task to decode `num_rows` of data into an array
2731    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>>;
2732    /// The data type of the decoded data
2733    fn data_type(&self) -> &DataType;
2734}
2735
2736#[derive(Debug, Default)]
2737pub struct DecoderPlugins {}
2738
2739/// Decodes a batch of data from an in-memory structure created by [`crate::encoder::encode_batch`]
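///
/// A minimal sketch (the version and cache arguments here are illustrative;
/// `encoded` is assumed to have been produced by `encode_batch`):
///
/// ```ignore
/// let batch = decode_batch(
///     &encoded,
///     &FilterExpression::no_filter(),
///     Arc::new(DecoderPlugins::default()),
///     /*should_validate=*/ false,
///     LanceFileVersion::V2_0,
///     /*cache=*/ None,
/// ).await?;
/// ```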
2740pub async fn decode_batch(
2741    batch: &EncodedBatch,
2742    filter: &FilterExpression,
2743    decoder_plugins: Arc<DecoderPlugins>,
2744    should_validate: bool,
2745    version: LanceFileVersion,
2746    cache: Option<Arc<FileMetadataCache>>,
2747) -> Result<RecordBatch> {
    // The I/O is synchronous so it shouldn't be possible for any async work to still be in progress.
2749    // Still, if we just use now_or_never we hit misfires because some futures (channels) need to be
2750    // polled twice.
2751
2752    let io_scheduler = Arc::new(BufferScheduler::new(batch.data.clone())) as Arc<dyn EncodingsIo>;
2753    let cache = cache.unwrap_or_else(|| {
2754        Arc::new(FileMetadataCache::with_capacity(
2755            128 * 1024 * 1024,
2756            CapacityMode::Bytes,
2757        ))
2758    });
2759    let mut decode_scheduler = DecodeBatchScheduler::try_new(
2760        batch.schema.as_ref(),
2761        &batch.top_level_columns,
2762        &batch.page_table,
2763        &vec![],
2764        batch.num_rows,
2765        decoder_plugins,
2766        io_scheduler.clone(),
2767        cache,
2768        filter,
2769    )
2770    .await?;
2771    let (tx, rx) = unbounded_channel();
2772    decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler);
2773    let is_structural = version >= LanceFileVersion::V2_1;
2774    let mut decode_stream = create_decode_stream(
2775        &batch.schema,
2776        batch.num_rows,
2777        batch.num_rows as u32,
2778        is_structural,
2779        should_validate,
2780        rx,
2781    );
2782    decode_stream.next().await.unwrap().task.await
2783}
2784
2785#[cfg(test)]
// Tests for coalescing indices into ranges
2787mod tests {
2788    use super::*;
2789
2790    #[test]
2791    fn test_coalesce_indices_to_ranges_with_single_index() {
2792        let indices = vec![1];
2793        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
2794        assert_eq!(ranges, vec![1..2]);
2795    }
2796
2797    #[test]
2798    fn test_coalesce_indices_to_ranges() {
2799        let indices = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
2800        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
2801        assert_eq!(ranges, vec![1..10]);
2802    }
2803
2804    #[test]
2805    fn test_coalesce_indices_to_ranges_with_gaps() {
2806        let indices = vec![1, 2, 3, 5, 6, 7, 9];
2807        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
2808        assert_eq!(ranges, vec![1..4, 5..8, 9..10]);
2809    }
2810}