lance_encoding/decoder.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Utilities and traits for scheduling & decoding data
//!
//! Reading data involves two steps: scheduling and decoding.  The
//! scheduling step is responsible for figuring out what data is needed
//! and issuing the appropriate I/O requests.  The decoding step is
//! responsible for taking the loaded data and turning it into Arrow
//! arrays.
//!
//! # Scheduling
//!
//! Scheduling is split into [`self::FieldScheduler`] and [`self::PageScheduler`].
//! There is one field scheduler for each output field, which may map to many
//! columns of actual data.  A field scheduler is responsible for figuring out
//! the order in which pages should be scheduled.  Field schedulers then delegate
//! to page schedulers to figure out the I/O requests that need to be made for
//! the page.
//!
//! Page schedulers also create the decoders that will be used to decode the
//! scheduled data.
//!
//! # Decoding
//!
//! Decoders are split into [`self::PhysicalPageDecoder`] and
//! [`self::LogicalPageDecoder`].  Note that both physical and logical decoding
//! happens on a per-page basis.  There is no concept of a "field decoder" or
//! "column decoder".
//!
//! The physical decoders handle lower level encodings.  They have a few advantages:
//!
//!  * They do not need to decode into an Arrow array and so they don't need
//!    to be enveloped into the Arrow type system (e.g. Arrow doesn't have a
//!    bit-packed type.  We can use variable-length binary but that is kind
//!    of awkward)
//!  * They can decode into an existing allocation.  This can allow for "page
//!    bridging".  If we are trying to decode into a batch of 1024 rows and
//!    the rows 0..1024 are spread across two pages then we can avoid a memory
//!    copy by allocating once and decoding each page into the outer allocation.
//!    (note: page bridging is not actually implemented yet)
//!
//! However, there are some limitations for physical decoders:
//!
//!  * They are constrained to a single column
//!  * The API is more complex
//!
//! The logical decoders are designed to map one or more columns of Lance
//! data into an Arrow array.
//!
//! Typically, a "logical encoding" will have both a logical decoder and a field scheduler.
//! Meanwhile, a "physical encoding" will have a physical decoder but no corresponding field
//! scheduler.
//!
//! # General notes
//!
//! Encodings are typically nested into each other to form a tree.  The top of the tree is
//! the user requested schema.  Each field in that schema is assigned to one top-level logical
//! encoding.  That encoding can then contain other logical encodings or physical encodings.
//! Physical encodings can also contain other physical encodings.
//!
//! So, for example, a single field in the Arrow schema might have the type List<UInt32>
//!
//! The encoding tree could then be:
//!
//! root: List (logical encoding)
//!  - indices: Primitive (logical encoding)
//!    - column: Basic (physical encoding)
//!      - validity: Bitmap (physical encoding)
//!      - values: RLE (physical encoding)
//!        - runs: Value (physical encoding)
//!        - values: Value (physical encoding)
//!  - items: Primitive (logical encoding)
//!    - column: Basic (physical encoding)
//!      - values: Value (physical encoding)
//!
//! Note that, in this example, root.items.column does not have a validity because there were
//! no nulls in the page.
//!
//! ## Multiple buffers or multiple columns?
//!
//! Note that there are many different ways we can write encodings.  For example, we might
//! store primitive fields in a single column with two buffers (one for validity and one for
//! values)
//!
//! On the other hand, we could also store a primitive field as two different columns.  One
//! that yields a non-nullable boolean array and one that yields a non-nullable array of items.
//! Then we could combine these two arrays into a single array where the boolean array is the
//! bitmap.  There are a few subtle differences between the approaches:
//!
//! * Storing things as multiple buffers within the same column is generally more efficient and
//!   easier to schedule.  For example, in-batch coalescing is very easy but can only be done
//!   on data that is in the same page.
//! * When things are stored in multiple columns you have to worry about their pages not being
//!   in sync.  In our previous validity / values example this means we might have to do some
//!   memory copies to get the validity array and values arrays to be the same length at decode
//!   time (see the sketch after this list).
//! * When things are stored in a single column, projection is impossible.  For example, if we
//!   tried to store all the struct fields in a single column with lots of buffers then we wouldn't
//!   be able to read back individual fields of the struct.
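//!
//! As a rough illustration of the two-column approach described above, the following
//! sketch (not code used by Lance itself) recombines a non-nullable validity column and
//! a non-nullable values column into a single nullable array using the arrow-rs API:
//!
//! ```ignore
//! use arrow_array::{BooleanArray, Int32Array};
//! use arrow_buffer::NullBuffer;
//!
//! /// Combine a validity column and a values column into one nullable array
//! fn combine(validity: &BooleanArray, values: &Int32Array) -> Int32Array {
//!     // The boolean column becomes the null bitmap of the combined array
//!     let nulls = NullBuffer::new(validity.values().clone());
//!     Int32Array::new(values.values().clone(), Some(nulls))
//! }
//! ```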
//!
//! The fixed size list decoding is an interesting example because it is actually both a physical
//! encoding and a logical encoding.  A fixed size list of a physical encoding is, itself, a physical
//! encoding (e.g. a fixed size list of doubles).  However, a fixed size list of a logical encoding
//! is a logical encoding (e.g. a fixed size list of structs).
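//!
//! For example:
//!
//! ```text
//! FixedSizeList<Float64>     -> physical encoding (fixed-stride layout over a physical child)
//! FixedSizeList<Struct<...>> -> logical encoding  (defers to the struct's logical decoding)
//! ```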
//!
//! # The scheduling loop
//!
//! Reading a Lance file involves both scheduling and decoding.  It's generally expected that these
//! will run as two separate threads.
//!
//! ```text
//!
//!                                    I/O PARALLELISM
//!                       Issues
//!                       Requests   ┌─────────────────┐
//!                                  │                 │        Wait for
//!                       ┌──────────►   I/O Service   ├─────►  Enough I/O ◄─┐
//!                       │          │                 │        For batch    │
//!                       │          └─────────────────┘             │3      │
//!                       │                                          │       │
//!                       │                                          │       │2
//! ┌─────────────────────┴─┐                              ┌─────────▼───────┴┐
//! │                       │                              │                  │Poll
//! │       Batch Decode    │ Decode tasks sent via channel│   Batch Decode   │1
//! │       Scheduler       ├─────────────────────────────►│   Stream         ◄─────
//! │                       │                              │                  │
//! └─────▲─────────────┬───┘                              └─────────┬────────┘
//!       │             │                                            │4
//!       │             │                                            │
//!       └─────────────┘                                   ┌────────┴────────┐
//!  Caller of schedule_range                Buffer polling │                 │
//!  will be scheduler thread                to achieve CPU │ Decode Batch    ├────►
//!  and schedule one decode                 parallelism    │ Task            │
//!  task (and all needed I/O)               (thread per    │                 │
//!  per logical page                         batch)        └─────────────────┘
//! ```
//!
//! The scheduling thread will work through the file from the
//! start to the end as quickly as possible.  Data is scheduled one page at a time in a row-major
//! fashion.  For example, imagine we have a file with the following page structure:
//!
//! ```text
//! Score (Float32)     | C0P0 |
//! Id (16-byte UUID)   | C1P0 | C1P1 | C1P2 | C1P3 |
//! Vector (4096 bytes) | C2P0 | C2P1 | C2P2 | C2P3 | .. | C2P1023 |
//! ```
//!
//! This would be quite common as each of these pages has the same number of bytes.  Let's pretend
//! each page is 1MiB and so there are 256Ki rows of data.  Each page of `Score` has 256Ki rows.
//! Each page of `Id` has 64Ki rows.  Each page of `Vector` has 256 rows.  The scheduler would then
//! schedule in the following order:
//!
//! C0 P0
//! C1 P0
//! C2 P0
//! C2 P1
//! ... (253 pages omitted)
//! C2 P255
//! C1 P1
//! C2 P256
//! ... (254 pages omitted)
//! C2 P511
//! C1 P2
//! C2 P512
//! ... (254 pages omitted)
//! C2 P767
//! C1 P3
//! C2 P768
//! ... (254 pages omitted)
//! C2 P1023
//!
//! This is the ideal scheduling order because it means we can decode complete rows as quickly as possible.
//! Note that the scheduler thread does not need to wait for I/O to happen at any point.  As soon as it starts
//! it will start scheduling one page of I/O after another until it has scheduled the entire file's worth of
//! I/O.  This is slightly different from other file readers, which have "row group parallelism" and will
//! typically only schedule X row groups worth of reads at a time.
//!
//! In the near future there will be a backpressure mechanism, and so the scheduler may need to stop or
//! pause if the compute falls behind.
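//!
//! As a concrete (and hedged) illustration, the standalone sketch below reproduces the
//! page order shown above.  It is not the actual scheduler code; the per-column
//! rows-per-page values are taken from the example and the logic is simplified to
//! "schedule, for every column, whichever pages are needed to cover the next row":
//!
//! ```ignore
//! /// Returns (column, page) pairs in the order they would be scheduled
//! fn schedule_order(rows_per_page: &[u64], total_rows: u64) -> Vec<(usize, u64)> {
//!     let mut next_row = vec![0u64; rows_per_page.len()];
//!     let mut order = Vec::new();
//!     let mut covered = 0u64; // rows covered by every column so far
//!     while covered < total_rows {
//!         for (col, &rpp) in rows_per_page.iter().enumerate() {
//!             // Schedule pages from this column until it covers row `covered`
//!             while next_row[col] <= covered {
//!                 order.push((col, next_row[col] / rpp));
//!                 next_row[col] += rpp;
//!             }
//!         }
//!         covered = *next_row.iter().min().unwrap();
//!     }
//!     order
//! }
//!
//! fn main() {
//!     // 1MiB pages: Score (4 bytes) = 256Ki rows, Id (16 bytes) = 64Ki rows,
//!     // Vector (4096 bytes) = 256 rows; the file has 256Ki rows total
//!     for (col, page) in schedule_order(&[262_144, 65_536, 256], 262_144).iter().take(5) {
//!         println!("C{} P{}", col, page);
//!     }
//! }
//! ```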
//!
//! ## Indirect I/O
//!
//! Regrettably, there are times when we cannot know exactly what data we need until we have partially decoded
//! the file.  This happens when we have variable sized list data.  In that case the scheduling task for that
//! page will only schedule the first part of the read (loading the list offsets).  It will then immediately
//! spawn a new tokio task to wait for that I/O and decode the list offsets.  That follow-up task is not part
//! of the scheduling loop or the decode loop.  It is a free task.  Once the list offsets are decoded we submit
//! a follow-up I/O task.  This task is scheduled at a high priority because the decoder is going to need it soon.
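//!
//! The shape of that pattern, with hypothetical stand-in types (the real `EncodingsIo`
//! interface differs), looks roughly like this:
//!
//! ```ignore
//! use std::ops::Range;
//! use tokio::sync::oneshot;
//!
//! /// Stand-in for decoding list offsets into the byte range of the items
//! fn decode_offsets(bytes: &[u8]) -> Range<u64> {
//!     0..bytes.len() as u64 // a real decoder would parse the offsets
//! }
//!
//! fn schedule_list_page(
//!     // Stand-in for the I/O service: (byte range, priority) -> future bytes
//!     read: impl Fn(Range<u64>, u64) -> oneshot::Receiver<Vec<u8>> + Send + 'static,
//!     offsets_range: Range<u64>,
//! ) {
//!     // The scheduling loop only issues the offsets read...
//!     let offsets_io = read(offsets_range, 1);
//!     // ...and a detached ("free") task waits for it, decodes the offsets,
//!     // and submits the follow-up read at high priority (0 here).
//!     tokio::spawn(async move {
//!         let offset_bytes = offsets_io.await.expect("I/O service dropped");
//!         let items_range = decode_offsets(&offset_bytes);
//!         let _items_io = read(items_range, 0);
//!     });
//! }
//! ```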
//!
//! # The decode loop
//!
//! As soon as the scheduler starts we can start decoding.  Each time we schedule a page we
//! push a decoder for that page's data into a channel.  The decode loop
//! ([`BatchDecodeStream`]) reads from that channel.  Each time it receives a decoder it
//! waits until the decoder has all of its data.  Then it grabs the next decoder.  Once it has
//! enough loaded decoders to complete a batch worth of rows it will spawn a "decode batch task".
//!
//! These batch decode tasks perform the actual CPU work of decoding the loaded data into Arrow
//! arrays.  This may involve significant CPU processing like decompression or arithmetic in order
//! to restore the data to its correct in-memory representation.
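//!
//! In outline (again with hypothetical stand-in types rather than the real decoder
//! machinery), the accumulation pattern is:
//!
//! ```ignore
//! use tokio::sync::mpsc;
//!
//! /// Stand-in for a page decoder whose data has been scheduled
//! struct PageDecoder { num_rows: u64 /* plus loaded buffers, etc. */ }
//! impl PageDecoder {
//!     fn decode(&self) { /* CPU work: decompression, arithmetic, ... */ }
//! }
//!
//! async fn decode_loop(mut rx: mpsc::UnboundedReceiver<PageDecoder>, rows_per_batch: u64) {
//!     let mut pending = Vec::new();
//!     let mut pending_rows = 0;
//!     while let Some(decoder) = rx.recv().await {
//!         pending_rows += decoder.num_rows;
//!         pending.push(decoder);
//!         // Once enough pages have arrived to cover a batch, hand the CPU work
//!         // to its own task so the loop can keep draining the channel.
//!         if pending_rows >= rows_per_batch {
//!             let batch = std::mem::take(&mut pending);
//!             pending_rows = 0;
//!             tokio::task::spawn_blocking(move || {
//!                 for decoder in &batch {
//!                     decoder.decode();
//!                 }
//!             });
//!         }
//!     }
//! }
//! ```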
//!
//! ## Batch size
//!
//! The `BatchDecodeStream` is configured with a batch size.  This does not need to have any
//! relation to the page size(s) used to write the data.  This keeps our compute work completely
//! independent of our I/O work.  We suggest using small batch sizes:
//!
//!  * Batches should fit in CPU cache (at least L3)
//!  * More batches means more opportunity for parallelism
//!  * The "batch overhead" is very small in Lance compared to other formats because it has no
//!    relation to the way the data is stored.

use std::collections::VecDeque;
use std::sync::Once;
use std::{ops::Range, sync::Arc};

use arrow_array::cast::AsArray;
use arrow_array::{ArrayRef, RecordBatch, RecordBatchIterator, RecordBatchReader};
use arrow_schema::{ArrowError, DataType, Field as ArrowField, Fields, Schema as ArrowSchema};
use bytes::Bytes;
use futures::future::{maybe_done, BoxFuture, MaybeDone};
use futures::stream::{self, BoxStream};
use futures::{FutureExt, StreamExt};
use lance_arrow::DataTypeExt;
use lance_core::cache::{CapacityMode, FileMetadataCache};
use lance_core::datatypes::{Field, Schema, BLOB_DESC_LANCE_FIELD};
use log::{debug, trace, warn};
use snafu::location;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::{self, unbounded_channel};

use lance_core::{ArrowResult, Error, Result};
use tracing::{instrument, Instrument};

use crate::buffer::LanceBuffer;
use crate::data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock};
use crate::encoder::{values_column_encoding, EncodedBatch};
use crate::encodings::logical::binary::BinaryFieldScheduler;
use crate::encodings::logical::blob::BlobFieldScheduler;
use crate::encodings::logical::list::{
    ListFieldScheduler, OffsetPageInfo, StructuralListScheduler,
};
use crate::encodings::logical::primitive::{
    PrimitiveFieldScheduler, StructuralPrimitiveFieldScheduler,
};
use crate::encodings::logical::r#struct::{
    SimpleStructDecoder, SimpleStructScheduler, StructuralStructDecoder, StructuralStructScheduler,
};
use crate::encodings::physical::binary::{
    BinaryBlockDecompressor, BinaryMiniBlockDecompressor, VariableDecoder,
};
use crate::encodings::physical::bitpack_fastlanes::InlineBitpacking;
use crate::encodings::physical::block_compress::CompressedBufferEncoder;
use crate::encodings::physical::fsst::{FsstMiniBlockDecompressor, FsstPerValueDecompressor};
use crate::encodings::physical::struct_encoding::PackedStructFixedWidthMiniBlockDecompressor;
use crate::encodings::physical::value::{ConstantDecompressor, ValueDecompressor};
use crate::encodings::physical::{ColumnBuffers, FileBuffers};
use crate::format::pb::{self, column_encoding};
use crate::repdef::{CompositeRepDefUnraveler, RepDefUnraveler};
use crate::version::LanceFileVersion;
use crate::{BufferScheduler, EncodingsIo};

// If users are getting batches larger than 10MiB then it's time to reduce the batch size
const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024;

/// Top-level encoding message for a page.  Wraps both the
/// legacy pb::ArrayEncoding and the newer pb::PageLayout
///
/// A file should only use one or the other and never both.
/// 2.0 decoders can always assume this is pb::ArrayEncoding
/// and 2.1+ decoders can always assume this is pb::PageLayout
#[derive(Debug)]
pub enum PageEncoding {
    Legacy(pb::ArrayEncoding),
    Structural(pb::PageLayout),
}

impl PageEncoding {
    pub fn as_legacy(&self) -> &pb::ArrayEncoding {
        match self {
            Self::Legacy(enc) => enc,
            Self::Structural(_) => panic!("Expected a legacy encoding"),
        }
    }

    pub fn as_structural(&self) -> &pb::PageLayout {
        match self {
            Self::Structural(enc) => enc,
            Self::Legacy(_) => panic!("Expected a structural encoding"),
        }
    }

    pub fn is_structural(&self) -> bool {
        matches!(self, Self::Structural(_))
    }
}

/// Metadata describing a page in a file
///
/// This is typically created by reading the metadata section of a Lance file
#[derive(Debug)]
pub struct PageInfo {
    /// The number of rows in the page
    pub num_rows: u64,
    /// The priority (top level row number) of the page
    ///
    /// This is only set in 2.1 files and will be 0 for 2.0 files
    pub priority: u64,
    /// The encoding that explains the buffers in the page
    pub encoding: PageEncoding,
    /// The offsets and sizes of the buffers in the file
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
}

/// Metadata describing a column in a file
///
/// This is typically created by reading the metadata section of a Lance file
#[derive(Debug, Clone)]
pub struct ColumnInfo {
    /// The index of the column in the file
    pub index: u32,
    /// The metadata for each page in the column
    pub page_infos: Arc<[PageInfo]>,
    /// File positions and their sizes of the column-level buffers
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    pub encoding: pb::ColumnEncoding,
}

impl ColumnInfo {
    /// Create a new instance
    pub fn new(
        index: u32,
        page_infos: Arc<[PageInfo]>,
        buffer_offsets_and_sizes: Vec<(u64, u64)>,
        encoding: pb::ColumnEncoding,
    ) -> Self {
        Self {
            index,
            page_infos,
            buffer_offsets_and_sizes: buffer_offsets_and_sizes.into_boxed_slice().into(),
            encoding,
        }
    }

    pub fn is_structural(&self) -> bool {
        self.page_infos
            // Can just look at the first since all should be the same
            .first()
            .map(|page| page.encoding.is_structural())
            .unwrap_or(false)
    }
}

enum RootScheduler {
    Structural(Box<dyn StructuralFieldScheduler>),
    Legacy(Arc<dyn FieldScheduler>),
}

impl RootScheduler {
    fn as_legacy(&self) -> &Arc<dyn FieldScheduler> {
        match self {
            Self::Structural(_) => panic!("Expected a legacy scheduler"),
            Self::Legacy(s) => s,
        }
    }

    fn as_structural(&self) -> &dyn StructuralFieldScheduler {
        match self {
            Self::Structural(s) => s.as_ref(),
            Self::Legacy(_) => panic!("Expected a structural scheduler"),
        }
    }
}

/// The scheduler for decoding batches
///
/// Lance decoding is done in two steps: scheduling and decoding.  The
/// scheduling tends to be lightweight and should quickly figure out what data
/// is needed from the disk and issue the appropriate I/O requests.  A decode task is
/// created to eventually decode the data (once it is loaded) and scheduling
/// moves on to scheduling the next page.
///
/// Meanwhile, it's expected that a decode stream will be set up to run at the
/// same time.  Decode tasks take the data that is loaded and turn it into
/// Arrow arrays.
///
/// This approach allows us to keep our I/O parallelism and CPU parallelism
/// completely separate since those are often two very different values.
///
/// Backpressure should be achieved via the I/O service.  Requests that are
/// issued will pile up if the decode stream is not polling quickly enough.
/// The [`crate::EncodingsIo::submit_request`] function should return a pending
/// future once there are too many I/O requests in flight.
///
/// TODO: Implement backpressure
pub struct DecodeBatchScheduler {
    root_scheduler: RootScheduler,
    pub root_fields: Fields,
    cache: Arc<FileMetadataCache>,
}

pub struct ColumnInfoIter<'a> {
    column_infos: Vec<Arc<ColumnInfo>>,
    column_indices: &'a [u32],
    column_info_pos: usize,
    column_indices_pos: usize,
}

impl<'a> ColumnInfoIter<'a> {
    pub fn new(column_infos: Vec<Arc<ColumnInfo>>, column_indices: &'a [u32]) -> Self {
        let initial_pos = column_indices[0] as usize;
        Self {
            column_infos,
            column_indices,
            column_info_pos: initial_pos,
            column_indices_pos: 0,
        }
    }

    pub fn peek(&self) -> &Arc<ColumnInfo> {
        &self.column_infos[self.column_info_pos]
    }

    pub fn peek_transform(&mut self, transform: impl FnOnce(Arc<ColumnInfo>) -> Arc<ColumnInfo>) {
        let column_info = self.column_infos[self.column_info_pos].clone();
        let transformed = transform(column_info);
        self.column_infos[self.column_info_pos] = transformed;
    }

    pub fn expect_next(&mut self) -> Result<&Arc<ColumnInfo>> {
        self.next().ok_or_else(|| {
            Error::invalid_input(
                "there were more fields in the schema than provided column indices / infos",
                location!(),
            )
        })
    }

    fn next(&mut self) -> Option<&Arc<ColumnInfo>> {
        if self.column_info_pos < self.column_infos.len() {
            let info = &self.column_infos[self.column_info_pos];
            self.column_info_pos += 1;
            Some(info)
        } else {
            None
        }
    }

    pub(crate) fn next_top_level(&mut self) {
        self.column_indices_pos += 1;
        if self.column_indices_pos < self.column_indices.len() {
            self.column_info_pos = self.column_indices[self.column_indices_pos] as usize;
        } else {
            self.column_info_pos = self.column_infos.len();
        }
    }
}

pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock>;
}

pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompress one or more values
    fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock>;
    /// The number of bits in each value
    ///
    /// Currently (and probably long term) this must be a multiple of 8
    fn bits_per_value(&self) -> u64;
}

pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompress one or more values
    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
}

pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
}

pub trait DecompressorStrategy: std::fmt::Debug + Send + Sync {
    fn create_miniblock_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn MiniBlockDecompressor>>;

    fn create_fixed_per_value_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn FixedPerValueDecompressor>>;

    fn create_variable_per_value_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn VariablePerValueDecompressor>>;

    fn create_block_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn BlockDecompressor>>;
}

#[derive(Debug, Default)]
pub struct CoreDecompressorStrategy {}

impl DecompressorStrategy for CoreDecompressorStrategy {
    fn create_miniblock_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn MiniBlockDecompressor>> {
        match description.array_encoding.as_ref().unwrap() {
            pb::array_encoding::ArrayEncoding::Flat(flat) => {
                Ok(Box::new(ValueDecompressor::from_flat(flat)))
            }
            pb::array_encoding::ArrayEncoding::InlineBitpacking(description) => {
                Ok(Box::new(InlineBitpacking::from_description(description)))
            }
            pb::array_encoding::ArrayEncoding::Variable(_) => {
                Ok(Box::new(BinaryMiniBlockDecompressor::default()))
            }
            pb::array_encoding::ArrayEncoding::Fsst(description) => {
                Ok(Box::new(FsstMiniBlockDecompressor::new(description)))
            }
            pb::array_encoding::ArrayEncoding::PackedStructFixedWidthMiniBlock(description) => {
                Ok(Box::new(PackedStructFixedWidthMiniBlockDecompressor::new(
                    description,
                )))
            }
            pb::array_encoding::ArrayEncoding::FixedSizeList(fsl) => {
                // In the future, we might need to do something more complex here if FSL supports
                // compression.
                Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
            }
            _ => todo!(),
        }
    }

    fn create_fixed_per_value_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn FixedPerValueDecompressor>> {
        match description.array_encoding.as_ref().unwrap() {
            pb::array_encoding::ArrayEncoding::Flat(flat) => {
                Ok(Box::new(ValueDecompressor::from_flat(flat)))
            }
            pb::array_encoding::ArrayEncoding::FixedSizeList(fsl) => {
                Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
            }
            _ => todo!("fixed-per-value decompressor for {:?}", description),
        }
    }

    fn create_variable_per_value_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn VariablePerValueDecompressor>> {
        match *description.array_encoding.as_ref().unwrap() {
            pb::array_encoding::ArrayEncoding::Variable(variable) => {
                assert!(variable.bits_per_offset < u8::MAX as u32);
                Ok(Box::new(VariableDecoder::default()))
            }
            pb::array_encoding::ArrayEncoding::Fsst(ref fsst) => {
                Ok(Box::new(FsstPerValueDecompressor::new(
                    LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
                    Box::new(VariableDecoder::default()),
                )))
            }
            pb::array_encoding::ArrayEncoding::Block(ref block) => Ok(Box::new(
                CompressedBufferEncoder::from_scheme(&block.scheme)?,
            )),
            _ => todo!("variable-per-value decompressor for {:?}", description),
        }
    }

    fn create_block_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn BlockDecompressor>> {
        match description.array_encoding.as_ref().unwrap() {
            pb::array_encoding::ArrayEncoding::Flat(flat) => {
                Ok(Box::new(ValueDecompressor::from_flat(flat)))
            }
            pb::array_encoding::ArrayEncoding::Constant(constant) => {
                let scalar = LanceBuffer::from_bytes(constant.value.clone(), 1);
                Ok(Box::new(ConstantDecompressor::new(scalar)))
            }
            pb::array_encoding::ArrayEncoding::Variable(_) => {
                Ok(Box::new(BinaryBlockDecompressor::default()))
            }
            _ => todo!(),
        }
    }
}

/// The core decoder strategy handles all the various Arrow types
#[derive(Debug)]
pub struct CoreFieldDecoderStrategy {
    pub validate_data: bool,
    pub decompressor_strategy: Arc<dyn DecompressorStrategy>,
}

impl Default for CoreFieldDecoderStrategy {
    fn default() -> Self {
        Self {
            validate_data: false,
            decompressor_strategy: Arc::new(CoreDecompressorStrategy {}),
        }
    }
}

impl CoreFieldDecoderStrategy {
    /// This is just a sanity check to ensure there are no "wrapped encodings"
    /// that haven't been handled.
    fn ensure_values_encoded(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
        let column_encoding = column_info
            .encoding
            .column_encoding
            .as_ref()
            .ok_or_else(|| {
                Error::invalid_input(
                    format!(
                        "the column at index {} was missing a ColumnEncoding",
                        column_info.index
                    ),
                    location!(),
                )
            })?;
        if matches!(
            column_encoding,
            pb::column_encoding::ColumnEncoding::Values(_)
        ) {
            Ok(())
        } else {
            Err(Error::invalid_input(format!("the column at index {} mapping to the input field {} has column encoding {:?} and no decoder is registered to handle it", column_info.index, field_name, column_encoding), location!()))
        }
    }

    fn is_primitive(data_type: &DataType) -> bool {
        if data_type.is_primitive() {
            true
        } else {
            match data_type {
                // DataType::is_primitive doesn't consider these primitive but we do
                DataType::Boolean | DataType::Null | DataType::FixedSizeBinary(_) => true,
                DataType::FixedSizeList(inner, _) => Self::is_primitive(inner.data_type()),
                _ => false,
            }
        }
    }

    fn create_primitive_scheduler(
        &self,
        field: &Field,
        column: &ColumnInfo,
        buffers: FileBuffers,
    ) -> Result<Box<dyn FieldScheduler>> {
        Self::ensure_values_encoded(column, &field.name)?;
        // Primitive fields map to a single column
        let column_buffers = ColumnBuffers {
            file_buffers: buffers,
            positions_and_sizes: &column.buffer_offsets_and_sizes,
        };
        Ok(Box::new(PrimitiveFieldScheduler::new(
            column.index,
            field.data_type(),
            column.page_infos.clone(),
            column_buffers,
            self.validate_data,
        )))
    }

    /// Helper method to verify the page encoding of a struct header column
    fn check_simple_struct(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
        Self::ensure_values_encoded(column_info, field_name)?;
        if column_info.page_infos.len() != 1 {
            return Err(Error::InvalidInput { source: format!("Due to schema we expected a struct column but we received a column with {} pages and right now we only support struct columns with 1 page", column_info.page_infos.len()).into(), location: location!() });
        }
        let encoding = &column_info.page_infos[0].encoding;
        match encoding.as_legacy().array_encoding.as_ref().unwrap() {
            pb::array_encoding::ArrayEncoding::Struct(_) => Ok(()),
            _ => Err(Error::InvalidInput { source: format!("Expected a struct encoding because we have a struct field in the schema but got the encoding {:?}", encoding).into(), location: location!() }),
        }
    }

    fn check_packed_struct(column_info: &ColumnInfo) -> bool {
        let encoding = &column_info.page_infos[0].encoding;
        matches!(
            encoding.as_legacy().array_encoding.as_ref().unwrap(),
            pb::array_encoding::ArrayEncoding::PackedStruct(_)
        )
    }

    fn create_list_scheduler(
        &self,
        list_field: &Field,
        column_infos: &mut ColumnInfoIter,
        buffers: FileBuffers,
        offsets_column: &ColumnInfo,
    ) -> Result<Box<dyn FieldScheduler>> {
        Self::ensure_values_encoded(offsets_column, &list_field.name)?;
        let offsets_column_buffers = ColumnBuffers {
            file_buffers: buffers,
            positions_and_sizes: &offsets_column.buffer_offsets_and_sizes,
        };
        let items_scheduler =
            self.create_legacy_field_scheduler(&list_field.children[0], column_infos, buffers)?;

        let (inner_infos, null_offset_adjustments): (Vec<_>, Vec<_>) = offsets_column
            .page_infos
            .iter()
            .filter(|offsets_page| offsets_page.num_rows > 0)
            .map(|offsets_page| {
                if let Some(pb::array_encoding::ArrayEncoding::List(list_encoding)) =
                    &offsets_page.encoding.as_legacy().array_encoding
                {
                    let inner = PageInfo {
                        buffer_offsets_and_sizes: offsets_page.buffer_offsets_and_sizes.clone(),
                        encoding: PageEncoding::Legacy(
                            list_encoding.offsets.as_ref().unwrap().as_ref().clone(),
                        ),
                        num_rows: offsets_page.num_rows,
                        priority: 0,
                    };
                    (
                        inner,
                        OffsetPageInfo {
                            offsets_in_page: offsets_page.num_rows,
                            null_offset_adjustment: list_encoding.null_offset_adjustment,
                            num_items_referenced_by_page: list_encoding.num_items,
                        },
                    )
                } else {
                    // TODO: Should probably return Err here
                    panic!("Expected a list column");
                }
            })
            .unzip();
        let inner = Arc::new(PrimitiveFieldScheduler::new(
            offsets_column.index,
            DataType::UInt64,
            Arc::from(inner_infos.into_boxed_slice()),
            offsets_column_buffers,
            self.validate_data,
        )) as Arc<dyn FieldScheduler>;
        let items_field = match list_field.data_type() {
            DataType::List(inner) => inner,
            DataType::LargeList(inner) => inner,
            _ => unreachable!(),
        };
        let offset_type = if matches!(list_field.data_type(), DataType::List(_)) {
            DataType::Int32
        } else {
            DataType::Int64
        };
        Ok(Box::new(ListFieldScheduler::new(
            inner,
            items_scheduler.into(),
            items_field,
            offset_type,
            null_offset_adjustments,
        )))
    }

    fn unwrap_blob(column_info: &ColumnInfo) -> Option<ColumnInfo> {
        if let column_encoding::ColumnEncoding::Blob(blob) =
            column_info.encoding.column_encoding.as_ref().unwrap()
        {
            let mut column_info = column_info.clone();
            column_info.encoding = blob.inner.as_ref().unwrap().as_ref().clone();
            Some(column_info)
        } else {
            None
        }
    }

    fn create_structural_field_scheduler(
        &self,
        field: &Field,
        column_infos: &mut ColumnInfoIter,
    ) -> Result<Box<dyn StructuralFieldScheduler>> {
        let data_type = field.data_type();
        if Self::is_primitive(&data_type) {
            let column_info = column_infos.expect_next()?;
            let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
                column_info.as_ref(),
                self.decompressor_strategy.as_ref(),
            )?);

            // advance to the next top level column
            column_infos.next_top_level();

            return Ok(scheduler);
        }
        match &data_type {
            DataType::Struct(fields) => {
                if field.is_packed_struct() {
                    let column_info = column_infos.expect_next()?;
                    let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
                        column_info.as_ref(),
                        self.decompressor_strategy.as_ref(),
                    )?);

                    // advance to the next top level column
                    column_infos.next_top_level();

                    return Ok(scheduler);
                }
                let mut child_schedulers = Vec::with_capacity(field.children.len());
                for field in field.children.iter() {
                    let field_scheduler =
                        self.create_structural_field_scheduler(field, column_infos)?;
                    child_schedulers.push(field_scheduler);
                }

                let fields = fields.clone();
                Ok(
                    Box::new(StructuralStructScheduler::new(child_schedulers, fields))
                        as Box<dyn StructuralFieldScheduler>,
                )
            }
            DataType::Binary | DataType::Utf8 => {
                let column_info = column_infos.expect_next()?;
                let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
                    column_info.as_ref(),
                    self.decompressor_strategy.as_ref(),
                )?);
                column_infos.next_top_level();
                Ok(scheduler)
            }
            DataType::List(_) | DataType::LargeList(_) => {
                let child = field
                    .children
                    .first()
                    .expect("List field must have a child");
                let child_scheduler =
                    self.create_structural_field_scheduler(child, column_infos)?;
                Ok(Box::new(StructuralListScheduler::new(child_scheduler))
                    as Box<dyn StructuralFieldScheduler>)
            }
            _ => todo!(),
        }
    }

    fn create_legacy_field_scheduler(
        &self,
        field: &Field,
        column_infos: &mut ColumnInfoIter,
        buffers: FileBuffers,
    ) -> Result<Box<dyn FieldScheduler>> {
        let data_type = field.data_type();
        if Self::is_primitive(&data_type) {
            let column_info = column_infos.expect_next()?;
            let scheduler = self.create_primitive_scheduler(field, column_info, buffers)?;
            return Ok(scheduler);
        } else if data_type.is_binary_like() {
            let column_info = column_infos.next().unwrap().clone();
            // Column is blob and user is asking for binary data
            if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
                let desc_scheduler =
                    self.create_primitive_scheduler(&BLOB_DESC_LANCE_FIELD, &blob_col, buffers)?;
                let blob_scheduler = Box::new(BlobFieldScheduler::new(desc_scheduler.into()));
                return Ok(blob_scheduler);
            }
            if let Some(page_info) = column_info.page_infos.first() {
                if matches!(
                    page_info.encoding.as_legacy(),
                    pb::ArrayEncoding {
                        array_encoding: Some(pb::array_encoding::ArrayEncoding::List(..))
                    }
                ) {
                    let list_type = if matches!(data_type, DataType::Utf8 | DataType::Binary) {
                        DataType::List(Arc::new(ArrowField::new("item", DataType::UInt8, false)))
                    } else {
                        DataType::LargeList(Arc::new(ArrowField::new(
                            "item",
                            DataType::UInt8,
                            false,
                        )))
                    };
                    let list_field = Field::try_from(ArrowField::new(
                        field.name.clone(),
                        list_type,
                        field.nullable,
                    ))
                    .unwrap();
                    let list_scheduler = self.create_list_scheduler(
                        &list_field,
                        column_infos,
                        buffers,
                        &column_info,
                    )?;
                    let binary_scheduler = Box::new(BinaryFieldScheduler::new(
                        list_scheduler.into(),
                        field.data_type(),
                    ));
                    return Ok(binary_scheduler);
                } else {
                    let scheduler =
                        self.create_primitive_scheduler(field, &column_info, buffers)?;
                    return Ok(scheduler);
                }
            } else {
                return self.create_primitive_scheduler(field, &column_info, buffers);
            }
        }
        match &data_type {
            DataType::FixedSizeList(inner, _dimension) => {
                // A fixed size list column could either be a physical or a logical decoder
                // depending on the child data type.
                if Self::is_primitive(inner.data_type()) {
                    let primitive_col = column_infos.expect_next()?;
                    let scheduler =
                        self.create_primitive_scheduler(field, primitive_col, buffers)?;
                    Ok(scheduler)
                } else {
                    todo!()
                }
            }
            DataType::Dictionary(_key_type, value_type) => {
                if Self::is_primitive(value_type) || value_type.is_binary_like() {
                    let primitive_col = column_infos.expect_next()?;
                    let scheduler =
                        self.create_primitive_scheduler(field, primitive_col, buffers)?;
                    Ok(scheduler)
                } else {
                    Err(Error::NotSupported {
                        source: format!(
                            "No way to decode into a dictionary field of type {}",
                            value_type
                        )
                        .into(),
                        location: location!(),
                    })
                }
            }
            DataType::List(_) | DataType::LargeList(_) => {
                let offsets_column = column_infos.expect_next()?.clone();
                column_infos.next_top_level();
                self.create_list_scheduler(field, column_infos, buffers, &offsets_column)
            }
            DataType::Struct(fields) => {
                let column_info = column_infos.expect_next()?;

                // Column is blob and user is asking for descriptions
                if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
                    // Can use primitive scheduler here since descriptions are always packed struct
                    return self.create_primitive_scheduler(field, &blob_col, buffers);
                }

                if Self::check_packed_struct(column_info) {
                    // use packed struct encoding
                    self.create_primitive_scheduler(field, column_info, buffers)
                } else {
                    // use default struct encoding
                    Self::check_simple_struct(column_info, &field.name).unwrap();
                    let num_rows = column_info
                        .page_infos
                        .iter()
                        .map(|page| page.num_rows)
                        .sum();
                    let mut child_schedulers = Vec::with_capacity(field.children.len());
                    for field in &field.children {
                        column_infos.next_top_level();
                        let field_scheduler =
                            self.create_legacy_field_scheduler(field, column_infos, buffers)?;
                        child_schedulers.push(Arc::from(field_scheduler));
                    }

                    let fields = fields.clone();
                    Ok(Box::new(SimpleStructScheduler::new(
                        child_schedulers,
                        fields,
                        num_rows,
                    )))
                }
            }
            // TODO: Still need support for RLE
            _ => todo!(),
        }
    }
}

/// Creates a dummy ColumnInfo for the root column
fn root_column(num_rows: u64) -> ColumnInfo {
    const ROWS_PER_ROOT_PAGE: u64 = u32::MAX as u64;
    let num_root_pages = num_rows.div_ceil(ROWS_PER_ROOT_PAGE);
    // The final page holds whatever rows remain after the full-sized pages
    let final_page_num_rows = num_rows - (num_root_pages - 1) * ROWS_PER_ROOT_PAGE;
    let root_pages = (0..num_root_pages)
        .map(|i| PageInfo {
            num_rows: if i == num_root_pages - 1 {
                final_page_num_rows
            } else {
                ROWS_PER_ROOT_PAGE
            },
            encoding: PageEncoding::Legacy(pb::ArrayEncoding {
                array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
                    pb::SimpleStruct {},
                )),
            }),
            priority: 0, // not used in legacy scheduler
            buffer_offsets_and_sizes: Arc::new([]),
        })
        .collect::<Vec<_>>();
    ColumnInfo {
        buffer_offsets_and_sizes: Arc::new([]),
        encoding: values_column_encoding(),
        index: u32::MAX,
        page_infos: Arc::from(root_pages),
    }
}

pub enum RootDecoder {
    Structural(StructuralStructDecoder),
    Legacy(SimpleStructDecoder),
}

impl RootDecoder {
    pub fn into_structural(self) -> StructuralStructDecoder {
        match self {
            Self::Structural(decoder) => decoder,
            Self::Legacy(_) => panic!("Expected a structural decoder"),
        }
    }

    pub fn into_legacy(self) -> SimpleStructDecoder {
        match self {
            Self::Legacy(decoder) => decoder,
            Self::Structural(_) => panic!("Expected a legacy decoder"),
        }
    }
}

impl DecodeBatchScheduler {
    /// Creates a new decode scheduler with the expected schema and the column
    /// metadata of the file.
    #[allow(clippy::too_many_arguments)]
    pub async fn try_new<'a>(
        schema: &'a Schema,
        column_indices: &[u32],
        column_infos: &[Arc<ColumnInfo>],
        file_buffer_positions_and_sizes: &'a Vec<(u64, u64)>,
        num_rows: u64,
        _decoder_plugins: Arc<DecoderPlugins>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<FileMetadataCache>,
        filter: &FilterExpression,
    ) -> Result<Self> {
        assert!(num_rows > 0);
        let buffers = FileBuffers {
            positions_and_sizes: file_buffer_positions_and_sizes,
        };
        let arrow_schema = ArrowSchema::from(schema);
        let root_fields = arrow_schema.fields().clone();
        let root_type = DataType::Struct(root_fields.clone());
        let mut root_field = Field::try_from(&ArrowField::new("root", root_type, false))?;
        // root_field.children and schema.fields should be identical at this point but the latter
        // has field ids and the former does not.  This line restores that.
        // TODO:  Is there another way to create the root field without forcing a trip through arrow?
        root_field.children.clone_from(&schema.fields);
        root_field
            .metadata
            .insert("__lance_decoder_root".to_string(), "true".to_string());

        if column_infos[0].is_structural() {
            let mut column_iter = ColumnInfoIter::new(column_infos.to_vec(), column_indices);

            let mut root_scheduler = CoreFieldDecoderStrategy::default()
                .create_structural_field_scheduler(&root_field, &mut column_iter)?;

            let context = SchedulerContext::new(io, cache.clone());
            root_scheduler.initialize(filter, &context).await?;

            Ok(Self {
                root_scheduler: RootScheduler::Structural(root_scheduler),
                root_fields,
                cache,
            })
        } else {
            // The old encoding style expected a header column for structs and so we
            // need a header column for the top-level struct
            let mut columns = Vec::with_capacity(column_infos.len() + 1);
            columns.push(Arc::new(root_column(num_rows)));
            columns.extend(column_infos.iter().cloned());

            let adjusted_column_indices = [0_u32]
                .into_iter()
                .chain(column_indices.iter().map(|i| i.saturating_add(1)))
                .collect::<Vec<_>>();
            let mut column_iter = ColumnInfoIter::new(columns, &adjusted_column_indices);
            let root_scheduler = CoreFieldDecoderStrategy::default()
                .create_legacy_field_scheduler(&root_field, &mut column_iter, buffers)?;

            let context = SchedulerContext::new(io, cache.clone());
            root_scheduler.initialize(filter, &context).await?;

            Ok(Self {
                root_scheduler: RootScheduler::Legacy(root_scheduler.into()),
                root_fields,
                cache,
            })
        }
    }

    pub fn from_scheduler(
        root_scheduler: Arc<dyn FieldScheduler>,
        root_fields: Fields,
        cache: Arc<FileMetadataCache>,
    ) -> Self {
        Self {
            root_scheduler: RootScheduler::Legacy(root_scheduler),
            root_fields,
            cache,
        }
    }

    fn do_schedule_ranges_structural(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
    ) {
        let root_scheduler = self.root_scheduler.as_structural();
        let mut context = SchedulerContext::new(io, self.cache.clone());
        let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
        if let Err(schedule_ranges_err) = maybe_root_job {
            schedule_action(Err(schedule_ranges_err));
            return;
        }
        let mut root_job = maybe_root_job.unwrap();
        let mut num_rows_scheduled = 0;
        loop {
            let maybe_next_scan_line = root_job.schedule_next(&mut context);
            if let Err(err) = maybe_next_scan_line {
                schedule_action(Err(err));
                return;
            }
            let next_scan_line = maybe_next_scan_line.unwrap();
            match next_scan_line {
                Some(next_scan_line) => {
                    trace!(
                        "Scheduled scan line of {} rows and {} decoders",
                        next_scan_line.rows_scheduled,
                        next_scan_line.decoders.len()
                    );
                    num_rows_scheduled += next_scan_line.rows_scheduled;
                    if !schedule_action(Ok(DecoderMessage {
                        scheduled_so_far: num_rows_scheduled,
                        decoders: next_scan_line.decoders,
                    })) {
                        // Decoder has disconnected
                        return;
                    }
                }
                None => return,
            }
        }
    }

    fn do_schedule_ranges_legacy(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
        // If specified, this will be used as the top_level_row for all scheduling
        // tasks.  This is used by list scheduling to ensure all items scheduling
        // tasks are scheduled at the same top level row.
        priority: Option<Box<dyn PriorityRange>>,
    ) {
        let root_scheduler = self.root_scheduler.as_legacy();
        let rows_requested = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        trace!(
            "Scheduling {} ranges across {}..{} ({} rows){}",
            ranges.len(),
            ranges.first().unwrap().start,
            ranges.last().unwrap().end,
            rows_requested,
            priority
                .as_ref()
                .map(|p| format!(" (priority={:?})", p))
                .unwrap_or_default()
        );

        let mut context = SchedulerContext::new(io, self.cache.clone());
        let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
        if let Err(schedule_ranges_err) = maybe_root_job {
            schedule_action(Err(schedule_ranges_err));
            return;
        }
        let mut root_job = maybe_root_job.unwrap();
        let mut num_rows_scheduled = 0;
        let mut rows_to_schedule = root_job.num_rows();
        let mut priority = priority.unwrap_or(Box::new(SimplePriorityRange::new(0)));
        trace!("Scheduled ranges refined to {} rows", rows_to_schedule);
        while rows_to_schedule > 0 {
            let maybe_next_scan_line = root_job.schedule_next(&mut context, priority.as_ref());
            if let Err(schedule_next_err) = maybe_next_scan_line {
                schedule_action(Err(schedule_next_err));
                return;
            }
            let next_scan_line = maybe_next_scan_line.unwrap();
            priority.advance(next_scan_line.rows_scheduled);
            num_rows_scheduled += next_scan_line.rows_scheduled;
            rows_to_schedule -= next_scan_line.rows_scheduled;
            trace!(
                "Scheduled scan line of {} rows and {} decoders",
                next_scan_line.rows_scheduled,
                next_scan_line.decoders.len()
            );
            if !schedule_action(Ok(DecoderMessage {
                scheduled_so_far: num_rows_scheduled,
                decoders: next_scan_line.decoders,
            })) {
                // Decoder has disconnected
                return;
            }
        }

        // Log once after the loop rather than once per scan line
        trace!("Finished scheduling {} ranges", ranges.len());
    }

    fn do_schedule_ranges(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
        // If specified, this will be used as the top_level_row for all scheduling
        // tasks.  This is used by list scheduling to ensure all items scheduling
        // tasks are scheduled at the same top level row.
        priority: Option<Box<dyn PriorityRange>>,
    ) {
        match &self.root_scheduler {
            RootScheduler::Legacy(_) => {
                self.do_schedule_ranges_legacy(ranges, filter, io, schedule_action, priority)
            }
            RootScheduler::Structural(_) => {
                self.do_schedule_ranges_structural(ranges, filter, io, schedule_action)
            }
        }
    }

    // This method is similar to schedule_ranges but instead of
    // sending the decoders to a channel it collects them all into a vector
    pub fn schedule_ranges_to_vec(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        priority: Option<Box<dyn PriorityRange>>,
    ) -> Result<Vec<DecoderMessage>> {
        let mut decode_messages = Vec::new();
        self.do_schedule_ranges(
            ranges,
            filter,
            io,
            |msg| {
                decode_messages.push(msg);
                true
            },
            priority,
        );
        decode_messages.into_iter().collect::<Result<Vec<_>>>()
    }
1267
1268    /// Schedules the load of multiple ranges of rows
1269    ///
1270    /// Ranges must be non-overlapping and in sorted order
1271    ///
1272    /// # Arguments
1273    ///
1274    /// * `ranges` - The ranges of rows to load
1275    /// * `sink` - A channel to send the decode tasks
1276    /// * `scheduler` An I/O scheduler to issue I/O requests
1277    #[instrument(skip_all)]
1278    pub fn schedule_ranges(
1279        &mut self,
1280        ranges: &[Range<u64>],
1281        filter: &FilterExpression,
1282        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1283        scheduler: Arc<dyn EncodingsIo>,
1284    ) {
1285        self.do_schedule_ranges(
1286            ranges,
1287            filter,
1288            scheduler,
1289            |msg| {
1290                match sink.send(msg) {
1291                    Ok(_) => true,
1292                    Err(SendError { .. }) => {
1293                        // The receiver has gone away.  We can't do anything about it
1294                        // so just ignore the error.
                        debug!(
                            "schedule_ranges aborting early since decoder appears to have been dropped"
                        );
1298                        false
1299                    }
1300                }
1301            },
1302            None,
1303        )
1304    }
1305
1306    /// Schedules the load of a range of rows
1307    ///
1308    /// # Arguments
1309    ///
1310    /// * `range` - The range of rows to load
1311    /// * `sink` - A channel to send the decode tasks
    /// * `scheduler` - An I/O scheduler to issue I/O requests
1313    #[instrument(skip_all)]
1314    pub fn schedule_range(
1315        &mut self,
1316        range: Range<u64>,
1317        filter: &FilterExpression,
1318        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1319        scheduler: Arc<dyn EncodingsIo>,
1320    ) {
1321        self.schedule_ranges(&[range], filter, sink, scheduler)
1322    }
1323
1324    /// Schedules the load of selected rows
1325    ///
1326    /// # Arguments
1327    ///
1328    /// * `indices` - The row indices to load (these must be in ascending order!)
1329    /// * `sink` - A channel to send the decode tasks
    /// * `scheduler` - An I/O scheduler to issue I/O requests
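    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest), with `scheduler` and `io`
    /// assumed to be obtained as in [`Self::schedule_ranges`]:
    ///
    /// ```ignore
    /// let (tx, mut rx) = mpsc::unbounded_channel();
    /// scheduler.schedule_take(&[1, 5, 9], &FilterExpression::no_filter(), tx, io);
    /// ```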
1331    pub fn schedule_take(
1332        &mut self,
1333        indices: &[u64],
1334        filter: &FilterExpression,
1335        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1336        scheduler: Arc<dyn EncodingsIo>,
1337    ) {
1338        debug_assert!(indices.windows(2).all(|w| w[0] <= w[1]));
1339        if indices.is_empty() {
1340            return;
1341        }
1342        trace!("Scheduling take of {} rows", indices.len());
1343        let ranges = Self::indices_to_ranges(indices);
1344        self.schedule_ranges(&ranges, filter, sink, scheduler)
1345    }
1346
    // coalesce contiguous indices into ranges where possible (the input indices must be sorted and non-empty)
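    // e.g. [1, 2, 3, 5, 6, 9] -> [1..4, 5..7, 9..10]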
1348    fn indices_to_ranges(indices: &[u64]) -> Vec<Range<u64>> {
1349        let mut ranges = Vec::new();
1350        let mut start = indices[0];
1351
1352        for window in indices.windows(2) {
1353            if window[1] != window[0] + 1 {
1354                ranges.push(start..window[0] + 1);
1355                start = window[1];
1356            }
1357        }
1358
1359        ranges.push(start..*indices.last().unwrap() + 1);
1360        ranges
1361    }
1362}
1363
1364pub struct ReadBatchTask {
1365    pub task: BoxFuture<'static, Result<RecordBatch>>,
1366    pub num_rows: u32,
1367}
1368
1369/// A stream that takes scheduled jobs and generates decode tasks from them.
1370pub struct BatchDecodeStream {
1371    context: DecoderContext,
1372    root_decoder: SimpleStructDecoder,
1373    rows_remaining: u64,
1374    rows_per_batch: u32,
1375    rows_scheduled: u64,
1376    rows_drained: u64,
1377    scheduler_exhausted: bool,
1378    emitted_batch_size_warning: Arc<Once>,
1379}
1380
1381impl BatchDecodeStream {
    /// Create a new instance of a batch decode stream
    ///
    /// # Arguments
    ///
    /// * `scheduled` - an incoming stream of decode tasks from a
    ///   [`crate::decode::DecodeBatchScheduler`]
    /// * `rows_per_batch` - the number of rows to emit in each batch
    /// * `num_rows` - the total number of rows scheduled
    /// * `root_decoder` - the root decoder that will receive the scheduled child decoders
1392    pub fn new(
1393        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1394        rows_per_batch: u32,
1395        num_rows: u64,
1396        root_decoder: SimpleStructDecoder,
1397    ) -> Self {
1398        Self {
1399            context: DecoderContext::new(scheduled),
1400            root_decoder,
1401            rows_remaining: num_rows,
1402            rows_per_batch,
1403            rows_scheduled: 0,
1404            rows_drained: 0,
1405            scheduler_exhausted: false,
1406            emitted_batch_size_warning: Arc::new(Once::new()),
1407        }
1408    }
1409
1410    fn accept_decoder(&mut self, decoder: DecoderReady) -> Result<()> {
1411        if decoder.path.is_empty() {
            // We can ignore the root decoder itself
1413            Ok(())
1414        } else {
1415            self.root_decoder.accept_child(decoder)
1416        }
1417    }
1418
1419    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
1420        if self.scheduler_exhausted {
1421            return Ok(self.rows_scheduled);
1422        }
1423        while self.rows_scheduled < scheduled_need {
1424            let next_message = self.context.source.recv().await;
1425            match next_message {
1426                Some(scan_line) => {
1427                    let scan_line = scan_line?;
1428                    self.rows_scheduled = scan_line.scheduled_so_far;
1429                    for message in scan_line.decoders {
1430                        self.accept_decoder(message.into_legacy())?;
1431                    }
1432                }
1433                None => {
1434                    // Schedule ended before we got all the data we expected.  This probably
1435                    // means some kind of pushdown filter was applied and we didn't load as
1436                    // much data as we thought we would.
1437                    self.scheduler_exhausted = true;
1438                    return Ok(self.rows_scheduled);
1439                }
1440            }
1441        }
1442        Ok(scheduled_need)
1443    }
1444
1445    #[instrument(level = "debug", skip_all)]
1446    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
1447        trace!(
1448            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1449            self.rows_remaining,
1450            self.rows_drained,
1451            self.rows_scheduled,
1452        );
1453        if self.rows_remaining == 0 {
1454            return Ok(None);
1455        }
1456
1457        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1458        self.rows_remaining -= to_take;
1459
1460        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
        trace!(
            "scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}",
            scheduled_need,
            self.rows_drained,
            to_take,
            self.rows_scheduled
        );
1462        if scheduled_need > 0 {
1463            let desired_scheduled = scheduled_need + self.rows_scheduled;
1464            trace!(
1465                "Draining from scheduler (desire at least {} scheduled rows)",
1466                desired_scheduled
1467            );
1468            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
1469            if actually_scheduled < desired_scheduled {
1470                let under_scheduled = desired_scheduled - actually_scheduled;
1471                to_take -= under_scheduled;
1472            }
1473        }
1474
1475        if to_take == 0 {
1476            return Ok(None);
1477        }
1478
1479        // wait_for_loaded waits for *>* loaded_need (not >=) so we do a -1 here
1480        let loaded_need = self.rows_drained + to_take - 1;
1481        trace!(
1482            "Waiting for I/O (desire at least {} fully loaded rows)",
1483            loaded_need
1484        );
1485        self.root_decoder.wait_for_loaded(loaded_need).await?;
1486
1487        let next_task = self.root_decoder.drain(to_take)?;
1488        self.rows_drained += to_take;
1489        Ok(Some(next_task))
1490    }
1491
1492    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
1493        let stream = futures::stream::unfold(self, |mut slf| async move {
1494            let next_task = slf.next_batch_task().await;
1495            let next_task = next_task.transpose().map(|next_task| {
1496                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
1497                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
1498                let task = tokio::spawn(
1499                    (async move {
1500                        let next_task = next_task?;
1501                        next_task.into_batch(emitted_batch_size_warning)
1502                    })
1503                    .in_current_span(),
1504                );
1505                (task, num_rows)
1506            });
1507            next_task.map(|(task, num_rows)| {
1508                let task = task.map(|join_wrapper| join_wrapper.unwrap()).boxed();
1509                // This should be true since batch size is u32
1510                debug_assert!(num_rows <= u32::MAX as u64);
1511                let next_task = ReadBatchTask {
1512                    task,
1513                    num_rows: num_rows as u32,
1514                };
1515                (next_task, slf)
1516            })
1517        });
1518        stream.boxed()
1519    }
1520}
1521
1522// Utility types to smooth out the differences between the 2.0 and 2.1 decoders so that
1523// we can have a single implementation of the batch decode iterator
1524enum RootDecoderMessage {
1525    LoadedPage(LoadedPage),
1526    LegacyPage(DecoderReady),
1527}
1528trait RootDecoderType {
1529    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()>;
1530    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
1531    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()>;
1532}
1533impl RootDecoderType for StructuralStructDecoder {
1534    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1535        let RootDecoderMessage::LoadedPage(loaded_page) = message else {
1536            unreachable!()
1537        };
1538        self.accept_page(loaded_page)
1539    }
1540    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1541        self.drain_batch_task(num_rows)
1542    }
1543    fn wait(&mut self, _: u64, _: &tokio::runtime::Runtime) -> Result<()> {
1544        // Waiting happens elsewhere (not as part of the decoder)
1545        Ok(())
1546    }
1547}
1548impl RootDecoderType for SimpleStructDecoder {
1549    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1550        let RootDecoderMessage::LegacyPage(legacy_page) = message else {
1551            unreachable!()
1552        };
1553        self.accept_child(legacy_page)
1554    }
1555    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1556        self.drain(num_rows)
1557    }
1558    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()> {
1559        runtime.block_on(self.wait_for_loaded(loaded_need))
1560    }
1561}
1562
/// A batch decoder that performs decoding synchronously, blocking the current thread on I/O
1564struct BatchDecodeIterator<T: RootDecoderType> {
1565    messages: VecDeque<Result<DecoderMessage>>,
1566    root_decoder: T,
1567    rows_remaining: u64,
1568    rows_per_batch: u32,
1569    rows_scheduled: u64,
1570    rows_drained: u64,
1571    emitted_batch_size_warning: Arc<Once>,
    // Note: this is not the runtime on which I/O happens.
    // That's always in the scheduler.  This is just a runtime we use to
    // block the current thread while waiting for I/O that is not yet ready
1575    wait_for_io_runtime: tokio::runtime::Runtime,
1576    schema: Arc<ArrowSchema>,
1577}
1578
1579impl<T: RootDecoderType> BatchDecodeIterator<T> {
1580    /// Create a new instance of a batch decode iterator
1581    pub fn new(
1582        messages: VecDeque<Result<DecoderMessage>>,
1583        rows_per_batch: u32,
1584        num_rows: u64,
1585        root_decoder: T,
1586        schema: Arc<ArrowSchema>,
1587    ) -> Self {
1588        Self {
1589            messages,
1590            root_decoder,
1591            rows_remaining: num_rows,
1592            rows_per_batch,
1593            rows_scheduled: 0,
1594            rows_drained: 0,
1595            wait_for_io_runtime: tokio::runtime::Builder::new_current_thread()
1596                .build()
1597                .unwrap(),
1598            emitted_batch_size_warning: Arc::new(Once::new()),
1599            schema,
1600        }
1601    }
1602
1603    /// Wait for a single page of data to finish loading
1604    ///
1605    /// If the data is not available this will perform a *blocking* wait (put
1606    /// the current thread to sleep)
1607    fn wait_for_page(&self, unloaded_page: UnloadedPage) -> Result<LoadedPage> {
1608        match maybe_done(unloaded_page.0) {
1609            // Fast path, avoid all runtime shenanigans if the data is ready
1610            MaybeDone::Done(loaded_page) => loaded_page,
1611            // Slow path, we need to wait on I/O, enter the runtime
1612            MaybeDone::Future(fut) => self.wait_for_io_runtime.block_on(fut),
1613            MaybeDone::Gone => unreachable!(),
1614        }
1615    }
1616
    /// Processes scheduler messages (blocking on I/O as needed) until at least
    /// `scheduled_need` rows have been scheduled
    ///
    /// Note that `scheduled_need` is cumulative.  E.g. this method
    /// should be called with 5, 10, 15 and not 5, 5, 5
1621    #[instrument(skip_all)]
1622    fn wait_for_io(&mut self, scheduled_need: u64) -> Result<u64> {
1623        while self.rows_scheduled < scheduled_need && !self.messages.is_empty() {
1624            let message = self.messages.pop_front().unwrap()?;
1625            self.rows_scheduled = message.scheduled_so_far;
1626            for decoder_message in message.decoders {
1627                match decoder_message {
1628                    MessageType::UnloadedPage(unloaded_page) => {
1629                        let loaded_page = self.wait_for_page(unloaded_page)?;
1630                        self.root_decoder
1631                            .accept_message(RootDecoderMessage::LoadedPage(loaded_page))?;
1632                    }
1633                    MessageType::DecoderReady(decoder_ready) => {
                        // We can ignore the root decoder itself
1635                        if !decoder_ready.path.is_empty() {
1636                            self.root_decoder
1637                                .accept_message(RootDecoderMessage::LegacyPage(decoder_ready))?;
1638                        }
1639                    }
1640                }
1641            }
1642        }
1643
1644        let loaded_need = self.rows_drained + self.rows_per_batch as u64 - 1;
1645
1646        self.root_decoder
1647            .wait(loaded_need, &self.wait_for_io_runtime)?;
1648        Ok(self.rows_scheduled)
1649    }
1650
1651    #[instrument(level = "debug", skip_all)]
1652    fn next_batch_task(&mut self) -> Result<Option<RecordBatch>> {
1653        trace!(
1654            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1655            self.rows_remaining,
1656            self.rows_drained,
1657            self.rows_scheduled,
1658        );
1659        if self.rows_remaining == 0 {
1660            return Ok(None);
1661        }
1662
1663        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1664        self.rows_remaining -= to_take;
1665
1666        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
        trace!(
            "scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}",
            scheduled_need,
            self.rows_drained,
            to_take,
            self.rows_scheduled
        );
1668        if scheduled_need > 0 {
1669            let desired_scheduled = scheduled_need + self.rows_scheduled;
1670            trace!(
1671                "Draining from scheduler (desire at least {} scheduled rows)",
1672                desired_scheduled
1673            );
1674            let actually_scheduled = self.wait_for_io(desired_scheduled)?;
1675            if actually_scheduled < desired_scheduled {
1676                let under_scheduled = desired_scheduled - actually_scheduled;
1677                to_take -= under_scheduled;
1678            }
1679        }
1680
1681        if to_take == 0 {
1682            return Ok(None);
1683        }
1684
1685        let next_task = self.root_decoder.drain_batch(to_take)?;
1686
1687        self.rows_drained += to_take;
1688
1689        let batch = next_task.into_batch(self.emitted_batch_size_warning.clone())?;
1690
1691        Ok(Some(batch))
1692    }
1693}
1694
1695impl<T: RootDecoderType> Iterator for BatchDecodeIterator<T> {
1696    type Item = ArrowResult<RecordBatch>;
1697
1698    fn next(&mut self) -> Option<Self::Item> {
1699        self.next_batch_task()
1700            .transpose()
1701            .map(|r| r.map_err(ArrowError::from))
1702    }
1703}
1704
1705impl<T: RootDecoderType> RecordBatchReader for BatchDecodeIterator<T> {
1706    fn schema(&self) -> Arc<ArrowSchema> {
1707        self.schema.clone()
1708    }
1709}
1710
1711/// A stream that takes scheduled jobs and generates decode tasks from them.
1712pub struct StructuralBatchDecodeStream {
1713    context: DecoderContext,
1714    root_decoder: StructuralStructDecoder,
1715    rows_remaining: u64,
1716    rows_per_batch: u32,
1717    rows_scheduled: u64,
1718    rows_drained: u64,
1719    scheduler_exhausted: bool,
1720    emitted_batch_size_warning: Arc<Once>,
1721}
1722
1723impl StructuralBatchDecodeStream {
    /// Create a new instance of a batch decode stream
    ///
    /// # Arguments
    ///
    /// * `scheduled` - an incoming stream of decode tasks from a
    ///   [`crate::decode::DecodeBatchScheduler`]
    /// * `rows_per_batch` - the number of rows to emit in each batch
    /// * `num_rows` - the total number of rows scheduled
    /// * `root_decoder` - the root decoder that will receive the loaded pages
1734    pub fn new(
1735        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1736        rows_per_batch: u32,
1737        num_rows: u64,
1738        root_decoder: StructuralStructDecoder,
1739    ) -> Self {
1740        Self {
1741            context: DecoderContext::new(scheduled),
1742            root_decoder,
1743            rows_remaining: num_rows,
1744            rows_per_batch,
1745            rows_scheduled: 0,
1746            rows_drained: 0,
1747            scheduler_exhausted: false,
1748            emitted_batch_size_warning: Arc::new(Once::new()),
1749        }
1750    }
1751
1752    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
1753        if self.scheduler_exhausted {
1754            return Ok(self.rows_scheduled);
1755        }
1756        while self.rows_scheduled < scheduled_need {
1757            let next_message = self.context.source.recv().await;
1758            match next_message {
1759                Some(scan_line) => {
1760                    let scan_line = scan_line?;
1761                    self.rows_scheduled = scan_line.scheduled_so_far;
1762                    for message in scan_line.decoders {
1763                        let unloaded_page = message.into_structural();
1764                        let loaded_page = unloaded_page.0.await?;
1765                        self.root_decoder.accept_page(loaded_page)?;
1766                    }
1767                }
1768                None => {
1769                    // Schedule ended before we got all the data we expected.  This probably
1770                    // means some kind of pushdown filter was applied and we didn't load as
1771                    // much data as we thought we would.
1772                    self.scheduler_exhausted = true;
1773                    return Ok(self.rows_scheduled);
1774                }
1775            }
1776        }
1777        Ok(scheduled_need)
1778    }
1779
1780    #[instrument(level = "debug", skip_all)]
1781    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
1782        trace!(
1783            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1784            self.rows_remaining,
1785            self.rows_drained,
1786            self.rows_scheduled,
1787        );
1788        if self.rows_remaining == 0 {
1789            return Ok(None);
1790        }
1791
1792        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1793        self.rows_remaining -= to_take;
1794
1795        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
        trace!(
            "scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}",
            scheduled_need,
            self.rows_drained,
            to_take,
            self.rows_scheduled
        );
1797        if scheduled_need > 0 {
1798            let desired_scheduled = scheduled_need + self.rows_scheduled;
1799            trace!(
1800                "Draining from scheduler (desire at least {} scheduled rows)",
1801                desired_scheduled
1802            );
1803            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
1804            if actually_scheduled < desired_scheduled {
1805                let under_scheduled = desired_scheduled - actually_scheduled;
1806                to_take -= under_scheduled;
1807            }
1808        }
1809
1810        if to_take == 0 {
1811            return Ok(None);
1812        }
1813
1814        let next_task = self.root_decoder.drain_batch_task(to_take)?;
1815        self.rows_drained += to_take;
1816        Ok(Some(next_task))
1817    }
1818
1819    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
1820        let stream = futures::stream::unfold(self, |mut slf| async move {
1821            let next_task = slf.next_batch_task().await;
1822            let next_task = next_task.transpose().map(|next_task| {
1823                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
1824                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
1825                let task = tokio::spawn(async move {
1826                    let next_task = next_task?;
1827                    next_task.into_batch(emitted_batch_size_warning)
1828                });
1829                (task, num_rows)
1830            });
1831            next_task.map(|(task, num_rows)| {
1832                let task = task.map(|join_wrapper| join_wrapper.unwrap()).boxed();
1833                // This should be true since batch size is u32
1834                debug_assert!(num_rows <= u32::MAX as u64);
1835                let next_task = ReadBatchTask {
1836                    task,
1837                    num_rows: num_rows as u32,
1838                };
1839                (next_task, slf)
1840            })
1841        });
1842        stream.boxed()
1843    }
1844}
1845
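/// The set of rows requested from a file, either as sorted, non-overlapping
/// ranges or as sorted row indices
///
/// For example (values follow from the definition of `num_rows` below):
///
/// ```ignore
/// assert_eq!(RequestedRows::Ranges(vec![0..10, 20..30]).num_rows(), 20);
/// assert_eq!(RequestedRows::Indices(vec![1, 5, 9]).num_rows(), 3);
/// ```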
1846#[derive(Debug)]
1847pub enum RequestedRows {
1848    Ranges(Vec<Range<u64>>),
1849    Indices(Vec<u64>),
1850}
1851
1852impl RequestedRows {
1853    pub fn num_rows(&self) -> u64 {
1854        match self {
1855            Self::Ranges(ranges) => ranges.iter().map(|r| r.end - r.start).sum(),
1856            Self::Indices(indices) => indices.len() as u64,
1857        }
1858    }
1859}
1860
1861#[derive(Debug, Clone)]
1862pub struct SchedulerDecoderConfig {
1863    pub decoder_plugins: Arc<DecoderPlugins>,
1864    pub batch_size: u32,
1865    pub io: Arc<dyn EncodingsIo>,
1866    pub cache: Arc<FileMetadataCache>,
1867    pub should_validate: bool,
1868}
1869
1870fn check_scheduler_on_drop(
1871    stream: BoxStream<'static, ReadBatchTask>,
1872    scheduler_handle: tokio::task::JoinHandle<()>,
1873) -> BoxStream<'static, ReadBatchTask> {
    // This is a bit weird but we create an "empty stream" that unwraps the scheduler handle (which
    // will panic if the scheduler panicked).  This lets us check if the scheduler panicked
    // when the stream finishes.
1877    let mut scheduler_handle = Some(scheduler_handle);
1878    let check_scheduler = stream::unfold((), move |_| {
1879        let handle = scheduler_handle.take();
1880        async move {
1881            if let Some(handle) = handle {
1882                handle.await.unwrap();
1883            }
1884            None
1885        }
1886    });
1887    stream.chain(check_scheduler).boxed()
1888}
1889
1890pub fn create_decode_stream(
1891    schema: &Schema,
1892    num_rows: u64,
1893    batch_size: u32,
1894    is_structural: bool,
1895    should_validate: bool,
1896    rx: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1897) -> BoxStream<'static, ReadBatchTask> {
1898    if is_structural {
1899        let arrow_schema = ArrowSchema::from(schema);
1900        let structural_decoder = StructuralStructDecoder::new(
1901            arrow_schema.fields,
1902            should_validate,
1903            /*is_root=*/ true,
1904        );
1905        StructuralBatchDecodeStream::new(rx, batch_size, num_rows, structural_decoder).into_stream()
1906    } else {
1907        let arrow_schema = ArrowSchema::from(schema);
1908        let root_fields = arrow_schema.fields;
1909
1910        let simple_struct_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1911        BatchDecodeStream::new(rx, batch_size, num_rows, simple_struct_decoder).into_stream()
1912    }
1913}
1914
/// Creates an iterator that decodes a set of messages in a blocking fashion
1916///
1917/// See [`schedule_and_decode_blocking`] for more information.
1918pub fn create_decode_iterator(
1919    schema: &Schema,
1920    num_rows: u64,
1921    batch_size: u32,
1922    should_validate: bool,
1923    is_structural: bool,
1924    messages: VecDeque<Result<DecoderMessage>>,
1925) -> Box<dyn RecordBatchReader + Send + 'static> {
1926    let arrow_schema = Arc::new(ArrowSchema::from(schema));
1927    let root_fields = arrow_schema.fields.clone();
1928    if is_structural {
        let structural_decoder =
            StructuralStructDecoder::new(root_fields, should_validate, /*is_root=*/ true);
1931        Box::new(BatchDecodeIterator::new(
1932            messages,
1933            batch_size,
1934            num_rows,
            structural_decoder,
1936            arrow_schema,
1937        ))
1938    } else {
1939        let root_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1940        Box::new(BatchDecodeIterator::new(
1941            messages,
1942            batch_size,
1943            num_rows,
1944            root_decoder,
1945            arrow_schema,
1946        ))
1947    }
1948}
1949
1950fn create_scheduler_decoder(
1951    column_infos: Vec<Arc<ColumnInfo>>,
1952    requested_rows: RequestedRows,
1953    filter: FilterExpression,
1954    column_indices: Vec<u32>,
1955    target_schema: Arc<Schema>,
1956    config: SchedulerDecoderConfig,
1957) -> Result<BoxStream<'static, ReadBatchTask>> {
1958    let num_rows = requested_rows.num_rows();
1959
1960    let is_structural = column_infos[0].is_structural();
1961
1962    let (tx, rx) = mpsc::unbounded_channel();
1963
1964    let decode_stream = create_decode_stream(
1965        &target_schema,
1966        num_rows,
1967        config.batch_size,
1968        is_structural,
1969        config.should_validate,
1970        rx,
1971    );
1972
1973    let scheduler_handle = tokio::task::spawn(async move {
1974        let mut decode_scheduler = match DecodeBatchScheduler::try_new(
1975            target_schema.as_ref(),
1976            &column_indices,
1977            &column_infos,
1978            &vec![],
1979            num_rows,
1980            config.decoder_plugins,
1981            config.io.clone(),
1982            config.cache,
1983            &filter,
1984        )
1985        .await
1986        {
1987            Ok(scheduler) => scheduler,
1988            Err(e) => {
1989                let _ = tx.send(Err(e));
1990                return;
1991            }
1992        };
1993
1994        match requested_rows {
1995            RequestedRows::Ranges(ranges) => {
1996                decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
1997            }
1998            RequestedRows::Indices(indices) => {
1999                decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
2000            }
2001        }
2002    });
2003
2004    Ok(check_scheduler_on_drop(decode_stream, scheduler_handle))
2005}
2006
/// Launches a scheduler on a dedicated (spawned) task, creates a decoder to
/// decode the scheduled data, and returns the decoder as a stream of record batches.
///
/// This is a convenience function that creates both the scheduler and the decoder,
/// which can be a little tricky to get right.
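///
/// # Example
///
/// A minimal sketch (not compiled as a doctest); `column_infos`, `column_indices`,
/// `schema`, and `config` are assumed to come from an opened Lance file:
///
/// ```ignore
/// use futures::StreamExt;
///
/// let mut stream = schedule_and_decode(
///     column_infos,
///     RequestedRows::Ranges(vec![0..1024]),
///     FilterExpression::no_filter(),
///     column_indices,
///     schema,
///     config,
/// );
/// while let Some(batch_task) = stream.next().await {
///     let batch = batch_task.task.await?;
///     // ... process the RecordBatch ...
/// }
/// ```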
2012pub fn schedule_and_decode(
2013    column_infos: Vec<Arc<ColumnInfo>>,
2014    requested_rows: RequestedRows,
2015    filter: FilterExpression,
2016    column_indices: Vec<u32>,
2017    target_schema: Arc<Schema>,
2018    config: SchedulerDecoderConfig,
2019) -> BoxStream<'static, ReadBatchTask> {
2020    if requested_rows.num_rows() == 0 {
2021        return stream::empty().boxed();
2022    }
    // For convenience we really want this method to be a synchronous method where all
2024    // errors happen on the stream.  There is some async initialization that must happen
2025    // when creating a scheduler.  We wrap that all up in the very first task.
2026    match create_scheduler_decoder(
2027        column_infos,
2028        requested_rows,
2029        filter,
2030        column_indices,
2031        target_schema,
2032        config,
2033    ) {
        Ok(stream) => stream,
        // If the initialization failed, make it look like a failed task
2036        Err(e) => stream::once(std::future::ready(ReadBatchTask {
2037            num_rows: 0,
2038            task: std::future::ready(Err(e)).boxed(),
2039        }))
2040        .boxed(),
2041    }
2042}
2043
2044lazy_static::lazy_static! {
2045    pub static ref WAITER_RT: tokio::runtime::Runtime = tokio::runtime::Builder::new_current_thread()
2046        .build()
2047        .unwrap();
2048}
2049
2050/// Schedules and decodes the requested data in a blocking fashion
2051///
2052/// This function is a blocking version of [`schedule_and_decode`]. It schedules the requested data
2053/// and decodes it in the current thread.
2054///
2055/// This can be useful when the disk is fast (or the data is in memory) and the amount
2056/// of data is relatively small.  For example, when doing a take against NVMe or in-memory data.
2057///
2058/// This should NOT be used for full scans.  Even if the data is in memory this function will
2059/// not parallelize the decode and will be slower than the async version.  Full scans typically
2060/// make relatively few IOPs and so the asynchronous overhead is much smaller.
2061///
2062/// This method will first completely run the scheduling process.  Then it will run the
2063/// decode process.
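///
/// # Example
///
/// A minimal sketch (not compiled as a doctest), with the same assumed inputs as
/// [`schedule_and_decode`]:
///
/// ```ignore
/// let reader = schedule_and_decode_blocking(
///     column_infos,
///     RequestedRows::Indices(vec![3, 7, 42]),
///     FilterExpression::no_filter(),
///     column_indices,
///     schema,
///     config,
/// )?;
/// for batch in reader {
///     let batch = batch?;
///     // ... process the RecordBatch ...
/// }
/// ```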
2064pub fn schedule_and_decode_blocking(
2065    column_infos: Vec<Arc<ColumnInfo>>,
2066    requested_rows: RequestedRows,
2067    filter: FilterExpression,
2068    column_indices: Vec<u32>,
2069    target_schema: Arc<Schema>,
2070    config: SchedulerDecoderConfig,
2071) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
2072    if requested_rows.num_rows() == 0 {
2073        let arrow_schema = Arc::new(ArrowSchema::from(target_schema.as_ref()));
2074        return Ok(Box::new(RecordBatchIterator::new(vec![], arrow_schema)));
2075    }
2076
2077    let num_rows = requested_rows.num_rows();
2078    let is_structural = column_infos[0].is_structural();
2079
2080    let (tx, mut rx) = mpsc::unbounded_channel();
2081
2082    // Initialize the scheduler.  This is still "asynchronous" but we run it with a current-thread
2083    // runtime.
2084    let mut decode_scheduler = WAITER_RT.block_on(DecodeBatchScheduler::try_new(
2085        target_schema.as_ref(),
2086        &column_indices,
2087        &column_infos,
2088        &vec![],
2089        num_rows,
2090        config.decoder_plugins,
2091        config.io.clone(),
2092        config.cache,
2093        &filter,
2094    ))?;
2095
2096    // Schedule the requested rows
2097    match requested_rows {
2098        RequestedRows::Ranges(ranges) => {
2099            decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
2100        }
2101        RequestedRows::Indices(indices) => {
2102            decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
2103        }
2104    }
2105
2106    // Drain the scheduler queue into a vec of decode messages
2107    let mut messages = Vec::new();
2108    while rx
2109        .recv_many(&mut messages, usize::MAX)
2110        .now_or_never()
2111        .unwrap()
2112        != 0
2113    {}
2114
2115    // Create a decoder to decode the messages
2116    let decode_iterator = create_decode_iterator(
2117        &target_schema,
2118        num_rows,
2119        config.batch_size,
2120        config.should_validate,
2121        is_structural,
2122        messages.into(),
2123    );
2124
2125    Ok(decode_iterator)
2126}
2127
2128/// A decoder for single-column encodings of primitive data (this includes fixed size
2129/// lists of primitive data)
2130///
2131/// Physical decoders are able to decode into existing buffers for zero-copy operation.
2132///
2133/// Instances should be stateless and `Send` / `Sync`.  This is because multiple decode
2134/// tasks could reference the same page.  For example, imagine a page covers rows 0-2000
2135/// and the decoder stream has a batch size of 1024.  The decoder will be needed by both
2136/// the decode task for batch 0 and the decode task for batch 1.
2137///
2138/// See [`crate::decoder`] for more information
2139pub trait PrimitivePageDecoder: Send + Sync {
2140    /// Decode data into buffers
2141    ///
2142    /// This may be a simple zero-copy from a disk buffer or could involve complex decoding
2143    /// such as decompressing from some compressed representation.
2144    ///
2149    /// Encodings can have any number of input or output buffers.  For example, a dictionary
2150    /// decoding will convert two buffers (indices + dictionary) into a single buffer
2151    ///
2152    /// Binary decodings have two output buffers (one for values, one for offsets)
2153    ///
2154    /// Other decodings could even expand the # of output buffers.  For example, we could decode
2155    /// fixed size strings into variable length strings going from one input buffer to multiple output
2156    /// buffers.
2157    ///
2158    /// Each Arrow data type typically has a fixed structure of buffers and the encoding chain will
2159    /// generally end at one of these structures.  However, intermediate structures may exist which
2160    /// do not correspond to any Arrow type at all.  For example, a bitpacking encoding will deal
2161    /// with buffers that have bits-per-value that is not a multiple of 8.
2162    ///
    /// The `primitive_array_from_buffers` method has an expected buffer layout for each Arrow
    /// type (order matters) and encodings that aim to decode into Arrow types should respect
    /// this layout.
    ///
    /// # Arguments
    ///
    /// * `rows_to_skip` - how many rows to skip (within the page) before decoding
    /// * `num_rows` - how many rows to decode
2171    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock>;
2172}
2173
2174/// A scheduler for single-column encodings of primitive data
2175///
2176/// The scheduler is responsible for calculating what I/O is needed for the requested rows
2177///
2178/// Instances should be stateless and `Send` and `Sync`.  This is because instances can
2179/// be shared in follow-up I/O tasks.
2180///
2181/// See [`crate::decoder`] for more information
2182pub trait PageScheduler: Send + Sync + std::fmt::Debug {
2183    /// Schedules a batch of I/O to load the data needed for the requested ranges
2184    ///
2185    /// Returns a future that will yield a decoder once the data has been loaded
2186    ///
2187    /// # Arguments
2188    ///
    /// * `ranges` - the ranges of row offsets (relative to start of page) requested;
    ///   these must be ordered and must not overlap
2191    /// * `scheduler` - a scheduler to submit the I/O request to
2192    /// * `top_level_row` - the row offset of the top level field currently being
2193    ///   scheduled.  This can be used to assign priority to I/O requests
2194    fn schedule_ranges(
2195        &self,
2196        ranges: &[Range<u64>],
2197        scheduler: &Arc<dyn EncodingsIo>,
2198        top_level_row: u64,
2199    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>;
2200}
2201
2202/// A trait to control the priority of I/O
2203pub trait PriorityRange: std::fmt::Debug + Send + Sync {
2204    fn advance(&mut self, num_rows: u64);
2205    fn current_priority(&self) -> u64;
2206    fn box_clone(&self) -> Box<dyn PriorityRange>;
2207}
2208
2209/// A simple priority scheme for top-level fields with no parent
2210/// repetition
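///
/// For example (illustrative only; `new` is private to this module):
///
/// ```ignore
/// let mut priority = SimplePriorityRange::new(0);
/// priority.advance(100);
/// assert_eq!(priority.current_priority(), 100);
/// ```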
2211#[derive(Debug)]
2212pub struct SimplePriorityRange {
2213    priority: u64,
2214}
2215
2216impl SimplePriorityRange {
2217    fn new(priority: u64) -> Self {
2218        Self { priority }
2219    }
2220}
2221
2222impl PriorityRange for SimplePriorityRange {
2223    fn advance(&mut self, num_rows: u64) {
2224        self.priority += num_rows;
2225    }
2226
2227    fn current_priority(&self) -> u64 {
2228        self.priority
2229    }
2230
2231    fn box_clone(&self) -> Box<dyn PriorityRange> {
2232        Box::new(Self {
2233            priority: self.priority,
2234        })
2235    }
2236}
2237
/// Determining the priority of a list request is tricky.  We want
/// the priority to be the top-level row.  So if we have a
/// list<list<int>> where each outer list has 10 inner lists and each
/// inner list has 5 items then the priority of the 100th item is 1
/// because it is the 5th item of the 10th inner list of the *second*
/// top-level row.
2243///
2244/// This structure allows us to keep track of this complicated priority
2245/// relationship.
2246///
2247/// There's a fair amount of bookkeeping involved here.
2248///
2249/// A better approach (using repetition levels) is coming in the future.
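///
/// For example (illustrative only), if `offsets[i]` is the total number of
/// items before top-level row `i`:
///
/// ```ignore
/// let base = Box::new(SimplePriorityRange::new(0));
/// let offsets: Arc<[u64]> = vec![0, 50, 100].into();
/// let mut priority = ListPriorityRange::new(base, offsets);
/// priority.advance(60); // 60 items scheduled puts us inside the *second* row
/// assert_eq!(priority.current_priority(), 1);
/// ```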
2250pub struct ListPriorityRange {
2251    base: Box<dyn PriorityRange>,
2252    offsets: Arc<[u64]>,
2253    cur_index_into_offsets: usize,
2254    cur_position: u64,
2255}
2256
2257impl ListPriorityRange {
2258    pub(crate) fn new(base: Box<dyn PriorityRange>, offsets: Arc<[u64]>) -> Self {
2259        Self {
2260            base,
2261            offsets,
2262            cur_index_into_offsets: 0,
2263            cur_position: 0,
2264        }
2265    }
2266}
2267
2268impl std::fmt::Debug for ListPriorityRange {
2269    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2270        f.debug_struct("ListPriorityRange")
2271            .field("base", &self.base)
2272            .field("offsets.len()", &self.offsets.len())
2273            .field("cur_index_into_offsets", &self.cur_index_into_offsets)
2274            .field("cur_position", &self.cur_position)
2275            .finish()
2276    }
2277}
2278
2279impl PriorityRange for ListPriorityRange {
2280    fn advance(&mut self, num_rows: u64) {
2281        // We've scheduled X items.  Now walk through the offsets to
2282        // determine how many rows we've scheduled.
2283        self.cur_position += num_rows;
2284        let mut idx_into_offsets = self.cur_index_into_offsets;
2285        while idx_into_offsets + 1 < self.offsets.len()
2286            && self.offsets[idx_into_offsets + 1] <= self.cur_position
2287        {
2288            idx_into_offsets += 1;
2289        }
2290        let base_rows_advanced = idx_into_offsets - self.cur_index_into_offsets;
2291        self.cur_index_into_offsets = idx_into_offsets;
2292        self.base.advance(base_rows_advanced as u64);
2293    }
2294
2295    fn current_priority(&self) -> u64 {
2296        self.base.current_priority()
2297    }
2298
2299    fn box_clone(&self) -> Box<dyn PriorityRange> {
2300        Box::new(Self {
2301            base: self.base.box_clone(),
2302            offsets: self.offsets.clone(),
2303            cur_index_into_offsets: self.cur_index_into_offsets,
2304            cur_position: self.cur_position,
2305        })
2306    }
2307}
2308
2309/// Contains the context for a scheduler
2310pub struct SchedulerContext {
2311    recv: Option<mpsc::UnboundedReceiver<DecoderMessage>>,
2312    io: Arc<dyn EncodingsIo>,
2313    cache: Arc<FileMetadataCache>,
2314    name: String,
2315    path: Vec<u32>,
2316    path_names: Vec<String>,
2317}
2318
2319pub struct ScopedSchedulerContext<'a> {
2320    pub context: &'a mut SchedulerContext,
2321}
2322
2323impl<'a> ScopedSchedulerContext<'a> {
2324    pub fn pop(self) -> &'a mut SchedulerContext {
2325        self.context.pop();
2326        self.context
2327    }
2328}
2329
2330impl SchedulerContext {
2331    pub fn new(io: Arc<dyn EncodingsIo>, cache: Arc<FileMetadataCache>) -> Self {
2332        Self {
2333            io,
2334            cache,
2335            recv: None,
2336            name: "".to_string(),
2337            path: Vec::new(),
2338            path_names: Vec::new(),
2339        }
2340    }
2341
2342    pub fn io(&self) -> &Arc<dyn EncodingsIo> {
2343        &self.io
2344    }
2345
2346    pub fn cache(&self) -> &Arc<FileMetadataCache> {
2347        &self.cache
2348    }
2349
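    /// Pushes a named child scope (e.g. a struct field) onto the context's path
    ///
    /// A minimal sketch of the intended pairing with [`ScopedSchedulerContext::pop`]:
    ///
    /// ```ignore
    /// let scoped = context.push("items", 0);
    /// // ... schedule the child field using scoped.context ...
    /// let context = scoped.pop();
    /// ```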
2350    pub fn push(&mut self, name: &str, index: u32) -> ScopedSchedulerContext {
2351        self.path.push(index);
2352        self.path_names.push(name.to_string());
2353        ScopedSchedulerContext { context: self }
2354    }
2355
2356    pub fn pop(&mut self) {
2357        self.path.pop();
2358        self.path_names.pop();
2359    }
2360
2361    pub fn path_name(&self) -> String {
2362        let path = self.path_names.join("/");
2363        if self.recv.is_some() {
2364            format!("TEMP({}){}", self.name, path)
2365        } else {
2366            format!("ROOT{}", path)
2367        }
2368    }
2369
2370    pub fn current_path(&self) -> VecDeque<u32> {
2371        VecDeque::from_iter(self.path.iter().copied())
2372    }
2373
2374    pub fn locate_decoder(&mut self, decoder: Box<dyn LogicalPageDecoder>) -> DecoderReady {
2375        trace!(
2376            "Scheduling decoder of type {:?} for {:?}",
2377            decoder.data_type(),
2378            self.path,
2379        );
2380        DecoderReady {
2381            decoder,
2382            path: self.current_path(),
2383        }
2384    }
2385}
2386
2387pub struct UnloadedPage(pub BoxFuture<'static, Result<LoadedPage>>);
2388
2389impl std::fmt::Debug for UnloadedPage {
2390    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2391        f.debug_struct("UnloadedPage").finish()
2392    }
2393}
2394
2395#[derive(Debug)]
2396pub struct ScheduledScanLine {
2397    pub rows_scheduled: u64,
2398    pub decoders: Vec<MessageType>,
2399}
2400
2401pub trait SchedulingJob: std::fmt::Debug {
2402    fn schedule_next(
2403        &mut self,
2404        context: &mut SchedulerContext,
2405        priority: &dyn PriorityRange,
2406    ) -> Result<ScheduledScanLine>;
2407
2408    fn num_rows(&self) -> u64;
2409}
2410
2411pub trait StructuralSchedulingJob: std::fmt::Debug {
2412    fn schedule_next(
2413        &mut self,
2414        context: &mut SchedulerContext,
2415    ) -> Result<Option<ScheduledScanLine>>;
2416}
2417
2418/// A filter expression to apply to the data
2419///
2420/// The core decoders do not currently take advantage of filtering in
2421/// any way.  In order to maintain the abstraction we represent filters
2422/// as an arbitrary byte sequence.
2423///
2424/// We recommend that encodings use Substrait for filters.
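///
/// For example:
///
/// ```ignore
/// let filter = FilterExpression::no_filter();
/// assert!(filter.is_noop());
/// ```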
2425pub struct FilterExpression(pub Bytes);
2426
2427impl FilterExpression {
2428    /// Create a filter expression that does not filter any data
2429    ///
2430    /// This is currently represented by an empty byte array.  Encoders
2431    /// that are "filter aware" should make sure they handle this case.
2432    pub fn no_filter() -> Self {
2433        Self(Bytes::new())
2434    }
2435
2436    /// Returns true if the filter is the same as the [`Self::no_filter`] filter
2437    pub fn is_noop(&self) -> bool {
2438        self.0.is_empty()
2439    }
2440}
2441
2442/// A scheduler for a field's worth of data
2443///
2444/// Each field in a reader's output schema maps to one field scheduler.  This scheduler may
2445/// map to more than one column.  For example, one field of struct data may
2446/// cover many columns of child data.  In fact, the entire file is treated as one
2447/// top-level struct field.
2448///
2449/// The scheduler is responsible for calculating the necessary I/O.  One schedule_range
2450/// request could trigger multiple batches of I/O across multiple columns.  The scheduler
2451/// should emit decoders into the sink as quickly as possible.
2452///
/// As soon as the scheduler encounters a batch of data that can be decoded the scheduler
/// should emit a decoder in the "unloaded" state.  The decode stream will pull the decoder
/// and start decoding.
2456///
2457/// The order in which decoders are emitted is important.  Pages should be emitted in
2458/// row-major order allowing decode of complete rows as quickly as possible.
2459///
2460/// The `FieldScheduler` should be stateless and `Send` and `Sync`.  This is
2461/// because it might need to be shared.  For example, a list page has a reference to
2462/// the field schedulers for its items column.  This is shared with the follow-up I/O
2463/// task created when the offsets are loaded.
2464///
2465/// See [`crate::decoder`] for more information
2466pub trait FieldScheduler: Send + Sync + std::fmt::Debug {
2467    /// Called at the beginning of scheduling to initialize the scheduler
2468    fn initialize<'a>(
2469        &'a self,
2470        filter: &'a FilterExpression,
2471        context: &'a SchedulerContext,
2472    ) -> BoxFuture<'a, Result<()>>;
2473    /// Schedules I/O for the requested portions of the field.
2474    ///
2475    /// Note: `ranges` must be ordered and non-overlapping
2476    /// TODO: Support unordered or overlapping ranges in file scheduler
2477    fn schedule_ranges<'a>(
2478        &'a self,
2479        ranges: &[Range<u64>],
2480        filter: &FilterExpression,
2481    ) -> Result<Box<dyn SchedulingJob + 'a>>;
2482    /// The number of rows in this field
2483    fn num_rows(&self) -> u64;
2484}
2485
2486pub trait StructuralFieldScheduler: Send + std::fmt::Debug {
2487    fn initialize<'a>(
2488        &'a mut self,
2489        filter: &'a FilterExpression,
2490        context: &'a SchedulerContext,
2491    ) -> BoxFuture<'a, Result<()>>;
2492    fn schedule_ranges<'a>(
2493        &'a self,
2494        ranges: &[Range<u64>],
2495        filter: &FilterExpression,
2496    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>>;
2497}
2498
2499/// A trait for tasks that decode data into an Arrow array
2500pub trait DecodeArrayTask: Send {
2501    /// Decodes the data into an Arrow array
2502    fn decode(self: Box<Self>) -> Result<ArrayRef>;
2503}
2504
2505impl DecodeArrayTask for Box<dyn StructuralDecodeArrayTask> {
2506    fn decode(self: Box<Self>) -> Result<ArrayRef> {
2507        StructuralDecodeArrayTask::decode(*self).map(|decoded_array| decoded_array.array)
2508    }
2509}
2510
2511/// A task to decode data into an Arrow record batch
2512///
2513/// It has a child `task` which decodes a struct array with no nulls.
2514/// This is then converted into a record batch.
2515pub struct NextDecodeTask {
2516    /// The decode task itself
2517    pub task: Box<dyn DecodeArrayTask>,
2518    /// The number of rows that will be created
2519    pub num_rows: u64,
2520}
2521
2522impl NextDecodeTask {
2523    // Run the task and produce a record batch
2524    //
2525    // If the batch is very large this function will log a warning message
2526    // suggesting the user try a smaller batch size.
2527    #[instrument(name = "task_to_batch", level = "debug", skip_all)]
2528    fn into_batch(self, emitted_batch_size_warning: Arc<Once>) -> Result<RecordBatch> {
2529        let struct_arr = self.task.decode();
2530        match struct_arr {
2531            Ok(struct_arr) => {
2532                let batch = RecordBatch::from(struct_arr.as_struct());
2533                let size_bytes = batch.get_array_memory_size() as u64;
2534                if size_bytes > BATCH_SIZE_BYTES_WARNING {
2535                    emitted_batch_size_warning.call_once(|| {
2536                        let size_mb = size_bytes / 1024 / 1024;
2537                        debug!("Lance read in a single batch that contained more than {}MiB of data.  You may want to consider reducing the batch size.", size_mb);
2538                    });
2539                }
2540                Ok(batch)
2541            }
2542            Err(e) => {
2543                let e = Error::Internal {
2544                    message: format!("Error decoding batch: {}", e),
2545                    location: location!(),
2546                };
2547                Err(e)
2548            }
2549        }
2550    }
2551}
2552
2553#[derive(Debug)]
2554pub struct DecoderReady {
2555    // The decoder that is ready to be decoded
2556    pub decoder: Box<dyn LogicalPageDecoder>,
2557    // The path to the decoder, the first value is the column index
2558    // following values, if present, are nested child indices
2559    //
2560    // For example, a path of [1, 1, 0] would mean to grab the second
2561    // column, then the second child, and then the first child.
2562    //
2563    // It could represent x in the following schema:
2564    //
2565    // score: float64
2566    // points: struct
2567    //   color: string
2568    //   location: struct
2569    //     x: float64
2570    //
2571    // Currently, only struct decoders have "children" although other
2572    // decoders may at some point as well.  List children are only
2573    // handled through indirect I/O at the moment and so they don't
2574    // need to be represented (yet)
2575    pub path: VecDeque<u32>,
2576}
2577
2578// An envelope to wrap both 2.0 style messages and 2.1 style messages so we can
2579// share some code paths between the two.  Decoders can safely unwrap into whatever
2580// style they expect since a file will be either all-2.0 or all-2.1
2581#[derive(Debug)]
2582pub enum MessageType {
2583    // The older v2.0 scheduler/decoder used a scheme where the message was the
2584    // decoder itself.  The messages were not sent in priority order and the decoder
2585    // had to wait for I/O, figuring out the correct priority.  This was a lot of
2586    // complexity.
2587    DecoderReady(DecoderReady),
2588    // Starting in 2.1 we use a simpler scheme where the scheduling happens in priority
2589    // order and the message is an unloaded decoder.  These can be awaited, in order, and
2590    // the decoder does not have to worry about waiting for I/O.
2591    UnloadedPage(UnloadedPage),
2592}
2593
2594impl MessageType {
2595    pub fn into_legacy(self) -> DecoderReady {
2596        match self {
2597            Self::DecoderReady(decoder) => decoder,
2598            Self::UnloadedPage(_) => {
2599                panic!("Expected DecoderReady but got UnloadedPage")
2600            }
2601        }
2602    }
2603
2604    pub fn into_structural(self) -> UnloadedPage {
2605        match self {
2606            Self::UnloadedPage(unloaded) => unloaded,
2607            Self::DecoderReady(_) => {
2608                panic!("Expected UnloadedPage but got DecoderReady")
2609            }
2610        }
2611    }
2612}
2613
2614pub struct DecoderMessage {
2615    pub scheduled_so_far: u64,
2616    pub decoders: Vec<MessageType>,
2617}
2618
2619pub struct DecoderContext {
2620    source: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
2621}
2622
2623impl DecoderContext {
2624    pub fn new(source: mpsc::UnboundedReceiver<Result<DecoderMessage>>) -> Self {
2625        Self { source }
2626    }
2627}
2628
2629/// A decoder for a field's worth of data
2630///
/// The decoder is initially "unloaded" (doesn't have all its data).  The
/// [`Self::wait_for_loaded`] method should be called to wait for the needed I/O data
/// before attempting to decode any further.
2634///
2635/// Unlike the other decoder types it is assumed that `LogicalPageDecoder` is stateful
2636/// and only `Send`.  This is why we don't need a `rows_to_skip` argument in [`Self::drain`]
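///
/// A sketch of the typical drive loop, assuming `decoder` is a root decoder whose
/// pages have been accepted and `num_rows` rows are wanted:
///
/// ```ignore
/// // wait_for_loaded waits until *more than* the given row index is loaded
/// decoder.wait_for_loaded(num_rows - 1).await?;
/// let next_task = decoder.drain(num_rows)?;
/// let array = next_task.task.decode()?;
/// ```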
2637pub trait LogicalPageDecoder: std::fmt::Debug + Send {
2638    /// Add a newly scheduled child decoder
2639    ///
2640    /// The default implementation does not expect children and returns
2641    /// an error.
2642    fn accept_child(&mut self, _child: DecoderReady) -> Result<()> {
2643        Err(Error::Internal {
2644            message: format!(
2645                "The decoder {:?} does not expect children but received a child",
2646                self
2647            ),
2648            location: location!(),
2649        })
2650    }
2651    /// Waits until at least `num_rows` have been loaded
2652    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<Result<()>>;
2653    /// The number of rows loaded so far
2654    fn rows_loaded(&self) -> u64;
2655    /// The number of rows that still need loading
2656    fn rows_unloaded(&self) -> u64 {
2657        self.num_rows() - self.rows_loaded()
2658    }
2659    /// The total number of rows in the field
2660    fn num_rows(&self) -> u64;
2661    /// The number of rows that have been drained so far
2662    fn rows_drained(&self) -> u64;
2663    /// The number of rows that are still available to drain
2664    fn rows_left(&self) -> u64 {
2665        self.num_rows() - self.rows_drained()
2666    }
2667    /// Creates a task to decode `num_rows` of data into an array
2668    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
2669    /// The data type of the decoded data
2670    fn data_type(&self) -> &DataType;
2671}
2672
2673pub struct DecodedPage {
2674    pub data: DataBlock,
2675    pub repdef: RepDefUnraveler,
2676}
2677
2678pub trait DecodePageTask: Send + std::fmt::Debug {
2679    /// Decodes the data into an Arrow array
2680    fn decode(self: Box<Self>) -> Result<DecodedPage>;
2681}
2682
2683pub trait StructuralPageDecoder: std::fmt::Debug + Send {
2684    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>>;
2685    fn num_rows(&self) -> u64;
2686}
2687
2688#[derive(Debug)]
2689pub struct LoadedPage {
2690    // The decoder that is ready to be decoded
2691    pub decoder: Box<dyn StructuralPageDecoder>,
2692    // The path to the decoder, the first value is the column index
2693    // following values, if present, are nested child indices
2694    //
2695    // For example, a path of [1, 1, 0] would mean to grab the second
2696    // column, then the second child, and then the first child.
2697    //
2698    // It could represent x in the following schema:
2699    //
2700    // score: float64
2701    // points: struct
2702    //   color: string
2703    //   location: struct
2704    //     x: float64
2705    //
2706    // Currently, only struct decoders have "children" although other
2707    // decoders may at some point as well.  List children are only
2708    // handled through indirect I/O at the moment and so they don't
2709    // need to be represented (yet)
2710    pub path: VecDeque<u32>,
2711    pub page_index: usize,
2712}
2713
2714pub struct DecodedArray {
2715    pub array: ArrayRef,
2716    pub repdef: CompositeRepDefUnraveler,
2717}
2718
2719pub trait StructuralDecodeArrayTask: std::fmt::Debug + Send {
2720    fn decode(self: Box<Self>) -> Result<DecodedArray>;
2721}
2722
2723pub trait StructuralFieldDecoder: std::fmt::Debug + Send {
2724    /// Add a newly scheduled child decoder
2725    ///
2726    /// The default implementation does not expect children and returns
2727    /// an error.
2728    fn accept_page(&mut self, _child: LoadedPage) -> Result<()>;
2729    /// Creates a task to decode `num_rows` of data into an array
2730    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>>;
2731    /// The data type of the decoded data
2732    fn data_type(&self) -> &DataType;
2733}
2734
2735#[derive(Debug, Default)]
2736pub struct DecoderPlugins {}
2737
2738/// Decodes a batch of data from an in-memory structure created by [`crate::encoder::encode_batch`]
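///
/// A minimal sketch (not compiled as a doctest); `encoded` is assumed to be an
/// [`EncodedBatch`] produced elsewhere:
///
/// ```ignore
/// let batch = decode_batch(
///     &encoded,
///     &FilterExpression::no_filter(),
///     Arc::new(DecoderPlugins::default()),
///     /*should_validate=*/ false,
///     LanceFileVersion::V2_1,
///     None,
/// ).await?;
/// ```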
2739pub async fn decode_batch(
2740    batch: &EncodedBatch,
2741    filter: &FilterExpression,
2742    decoder_plugins: Arc<DecoderPlugins>,
2743    should_validate: bool,
2744    version: LanceFileVersion,
2745    cache: Option<Arc<FileMetadataCache>>,
2746) -> Result<RecordBatch> {
    // The I/O is synchronous so it shouldn't be possible for any async work to still be in
    // progress.  Still, if we just used now_or_never we would hit misfires because some futures
    // (channels) need to be polled twice.
2750
2751    let io_scheduler = Arc::new(BufferScheduler::new(batch.data.clone())) as Arc<dyn EncodingsIo>;
2752    let cache = cache.unwrap_or_else(|| {
2753        Arc::new(FileMetadataCache::with_capacity(
2754            128 * 1024 * 1024,
2755            CapacityMode::Bytes,
2756        ))
2757    });
2758    let mut decode_scheduler = DecodeBatchScheduler::try_new(
2759        batch.schema.as_ref(),
2760        &batch.top_level_columns,
2761        &batch.page_table,
2762        &vec![],
2763        batch.num_rows,
2764        decoder_plugins,
2765        io_scheduler.clone(),
2766        cache,
2767        filter,
2768    )
2769    .await?;
2770    let (tx, rx) = unbounded_channel();
2771    decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler);
2772    let is_structural = version >= LanceFileVersion::V2_1;
2773    let mut decode_stream = create_decode_stream(
2774        &batch.schema,
2775        batch.num_rows,
2776        batch.num_rows as u32,
2777        is_structural,
2778        should_validate,
2779        rx,
2780    );
2781    decode_stream.next().await.unwrap().task.await
2782}
2783
2784#[cfg(test)]
2785// test coalesce indices to ranges
2786mod tests {
2787    use super::*;
2788
2789    #[test]
2790    fn test_coalesce_indices_to_ranges_with_single_index() {
2791        let indices = vec![1];
2792        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
2793        assert_eq!(ranges, vec![1..2]);
2794    }
2795
2796    #[test]
2797    fn test_coalesce_indices_to_ranges() {
2798        let indices = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
2799        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
2800        assert_eq!(ranges, vec![1..10]);
2801    }
2802
2803    #[test]
2804    fn test_coalesce_indices_to_ranges_with_gaps() {
2805        let indices = vec![1, 2, 3, 5, 6, 7, 9];
2806        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
2807        assert_eq!(ranges, vec![1..4, 5..8, 9..10]);
2808    }
2809}