// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Utilities and traits for scheduling & decoding data
//!
//! Reading data involves two steps: scheduling and decoding.  The
//! scheduling step is responsible for figuring out what data is needed
//! and issuing the appropriate I/O requests.  The decoding step is
//! responsible for taking the loaded data and turning it into Arrow
//! arrays.
//!
//! # Scheduling
//!
//! Scheduling is split into `FieldScheduler` and `PageScheduler`.
//! There is one field scheduler for each output field, which may map to many
//! columns of actual data.  A field scheduler is responsible for figuring out
//! the order in which pages should be scheduled.  Field schedulers then delegate
//! to page schedulers to figure out the I/O requests that need to be made for
//! the page.
//!
//! Page schedulers also create the decoders that will be used to decode the
//! scheduled data.
//!
//! # Decoding
//!
//! Decoders are split into `PhysicalPageDecoder` and
//! [`LogicalPageDecoder`].  Note that both physical and logical decoding
//! happens on a per-page basis.  There is no concept of a "field decoder" or
//! "column decoder".
//!
//! The physical decoders handle lower level encodings.  They have a few advantages:
//!
//!  * They do not need to decode into an Arrow array and so they don't need
//!    to be shoehorned into the Arrow type system (e.g. Arrow doesn't have a
//!    bit-packed type.  We can use variable-length binary but that is kind
//!    of awkward)
//!  * They can decode into an existing allocation.  This can allow for "page
//!    bridging".  If we are trying to decode into a batch of 1024 rows and
//!    the rows 0..1024 are spread across two pages then we can avoid a memory
//!    copy by allocating once and decoding each page into the outer allocation.
//!    (note: page bridging is not actually implemented yet)
//!
//! However, there are some limitations for physical decoders:
//!
//!  * They are constrained to a single column
//!  * The API is more complex
//!
//! The logical decoders are designed to map one or more columns of Lance
//! data into an Arrow array.
//!
//! Typically, a "logical encoding" will have both a logical decoder and a field scheduler.
//! Meanwhile, a "physical encoding" will have a physical decoder but no corresponding field
//! scheduler.
//!
//! # General notes
//!
//! Encodings are typically nested into each other to form a tree.  The top of the tree is
//! the user requested schema.  Each field in that schema is assigned to one top-level logical
//! encoding.  That encoding can then contain other logical encodings or physical encodings.
//! Physical encodings can also contain other physical encodings.
//!
//! So, for example, a single field in the Arrow schema might have the type `List<UInt32>`.
//!
//! The encoding tree could then be:
//!
//! ```text
//! root: List (logical encoding)
//!  - indices: Primitive (logical encoding)
//!    - column: Basic (physical encoding)
//!      - validity: Bitmap (physical encoding)
//!      - values: RLE (physical encoding)
//!        - runs: Value (physical encoding)
//!        - values: Value (physical encoding)
//!  - items: Primitive (logical encoding)
//!    - column: Basic (physical encoding)
//!      - values: Value (physical encoding)
//! ```
//!
//! Note that, in this example, root.items.column does not have a validity because there were
//! no nulls in the page.
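//!
//! For reference, the Arrow field in this example could be declared with the
//! `arrow_schema` API roughly as follows (a minimal sketch; the field names
//! here are illustrative, not anything the format requires):
//!
//! ```rust
//! use std::sync::Arc;
//! use arrow_schema::{DataType, Field};
//!
//! // A nullable list of nullable u32 items, i.e. `List<UInt32>`
//! let item = Arc::new(Field::new("item", DataType::UInt32, true));
//! let _list_field = Field::new("my_list", DataType::List(item), true);
//! ```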
//!
//! ## Multiple buffers or multiple columns?
//!
//! Note that there are many different ways we can write encodings.  For example, we might
//! store primitive fields in a single column with two buffers (one for validity and one for
//! values).
//!
//! On the other hand, we could also store a primitive field as two different columns.  One
//! that yields a non-nullable boolean array and one that yields a non-nullable array of items.
//! Then we could combine these two arrays into a single array where the boolean array is the
//! bitmap.  There are a few subtle differences between the approaches:
//!
//! * Storing things as multiple buffers within the same column is generally more efficient and
//!   easier to schedule.  For example, in-batch coalescing is very easy but can only be done
//!   on data that is in the same page.
//! * When things are stored in multiple columns you have to worry about their pages not being
//!   in sync.  In our previous validity / values example this means we might have to do some
//!   memory copies to get the validity array and values arrays to be the same length at
//!   decode time.
//! * When things are stored in a single column, projection is impossible.  For example, if we
//!   tried to store all the struct fields in a single column with lots of buffers then we wouldn't
//!   be able to read back individual fields of the struct.
//!
//! The fixed size list decoding is an interesting example because it is actually both a physical
//! encoding and a logical encoding.  A fixed size list of a physical encoding is, itself, a physical
//! encoding (e.g. a fixed size list of doubles).  However, a fixed size list of a logical encoding
//! is a logical encoding (e.g. a fixed size list of structs).
//!
//! # The scheduling loop
//!
//! Reading a Lance file involves both scheduling and decoding.  It's generally expected that these
//! will run as two separate threads.
//!
//! ```text
//!
//!                                    I/O PARALLELISM
//!                       Issues
//!                       Requests   ┌─────────────────┐
//!                                  │                 │        Wait for
//!                       ┌──────────►   I/O Service   ├─────►  Enough I/O ◄─┐
//!                       │          │                 │        For batch    │
//!                       │          └─────────────────┘             │3      │
//!                       │                                          │       │
//!                       │                                          │       │2
//! ┌─────────────────────┴─┐                              ┌─────────▼───────┴┐
//! │                       │                              │                  │Poll
//! │       Batch Decode    │ Decode tasks sent via channel│   Batch Decode   │1
//! │       Scheduler       ├─────────────────────────────►│   Stream         ◄─────
//! │                       │                              │                  │
//! └─────▲─────────────┬───┘                              └─────────┬────────┘
//!       │             │                                            │4
//!       │             │                                            │
//!       └─────────────┘                                   ┌────────┴────────┐
//!  Caller of schedule_range                Buffer polling │                 │
//!  will be scheduler thread                to achieve CPU │ Decode Batch    ├────►
//!  and schedule one decode                 parallelism    │ Task            │
//!  task (and all needed I/O)               (thread per    │                 │
//!  per logical page                         batch)        └─────────────────┘
//! ```
//!
//! The scheduling thread will work through the file from the
//! start to the end as quickly as possible.  Data is scheduled one page at a time in a row-major
//! fashion.  For example, imagine we have a file with the following page structure:
//!
//! ```text
//! Score (Float32)     | C0P0 |
//! Id (16-byte UUID)   | C1P0 | C1P1 | C1P2 | C1P3 |
//! Vector (4096 bytes) | C2P0 | C2P1 | C2P2 | C2P3 | .. | C2P1023 |
//! ```
//!
//! This would be quite common as each of these pages has the same number of bytes.  Let's pretend
//! each page is 1MiB and so there are 256Ki rows of data.  Each page of `Score` has 256Ki rows.
//! Each page of `Id` has 64Ki rows.  Each page of `Vector` has 256 rows.  The scheduler would then
//! schedule in the following order:
//!
//! ```text
//! C0 P0
//! C1 P0
//! C2 P0
//! C2 P1
//! ... (253 pages omitted)
//! C2 P255
//! C1 P1
//! C2 P256
//! ... (254 pages omitted)
//! C2 P511
//! C1 P2
//! C2 P512
//! ... (254 pages omitted)
//! C2 P767
//! C1 P3
//! C2 P768
//! ... (254 pages omitted)
//! C2 P1023
//! ```
//!
//! This is the ideal scheduling order because it means we can decode complete rows as quickly as possible.
//! Note that the scheduler thread does not need to wait for I/O to happen at any point.  As soon as it starts
//! it will start scheduling one page of I/O after another until it has scheduled the entire file's worth of
//! I/O.  This is slightly different from other file readers which have "row group parallelism" and will
//! typically only schedule X row groups' worth of reads at a time.
//!
//! In the near future there will be a backpressure mechanism, and so the scheduler may need to stop/pause
//! if the compute falls behind.
//!
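//! The shape of this loop can be sketched with two tokio tasks joined by an
//! unbounded channel.  This is an illustrative stand-in (toy payloads, no real
//! I/O), not the actual scheduler/decoder interface:
//!
//! ```ignore
//! use tokio::sync::mpsc;
//!
//! #[tokio::main]
//! async fn main() {
//!     // The channel of decoders connecting the two halves
//!     let (tx, mut rx) = mpsc::unbounded_channel::<u64>();
//!
//!     // Scheduling task: runs ahead, issuing I/O for one page after another
//!     // and sending a decode task per page into the channel
//!     let scheduler = tokio::spawn(async move {
//!         for page in 0..4u64 {
//!             // (real code would issue the page's I/O requests here)
//!             tx.send(page).unwrap();
//!         }
//!     });
//!
//!     // Decode loop: waits for each page's data and emits batches
//!     while let Some(page) = rx.recv().await {
//!         println!("decoding page {page}");
//!     }
//!     scheduler.await.unwrap();
//! }
//! ```
//!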
//! ## Indirect I/O
//!
//! Regrettably, there are times when we cannot know exactly what data we need until we have partially decoded
//! the file.  This happens when we have variable sized list data.  In that case the scheduling task for that
//! page will only schedule the first part of the read (loading the list offsets).  It will then immediately
//! spawn a new tokio task to wait for that I/O and decode the list offsets.  That follow-up task is not part
//! of the scheduling loop or the decode loop.  It is a free task.  Once the list offsets are decoded we submit
//! a follow-up I/O task.  This task is scheduled at a high priority because the decoder is going to need it soon.
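//!
//! In pseudocode the indirect pattern looks roughly like the following sketch
//! (the helper names and signatures here are made up for illustration):
//!
//! ```ignore
//! tokio::spawn(async move {
//!     // wait for the first read (the list offsets)
//!     let offsets_data = offsets_io.await?;
//!     // decode the offsets to learn which item ranges we actually need
//!     let item_ranges = decode_offsets(offsets_data);
//!     // submit the follow-up I/O at high priority; the decoder needs it soon
//!     io.submit_request(item_ranges, high_priority);
//! });
//! ```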
//!
//! # The decode loop
//!
//! As soon as the scheduler starts we can start decoding.  Each time we schedule a page we
//! push a decoder for that page's data into a channel.  The decode loop
//! ([`BatchDecodeStream`]) reads from that channel.  Each time it receives a decoder it
//! waits until the decoder has all of its data.  Then it grabs the next decoder.  Once it has
//! enough loaded decoders to complete a batch's worth of rows it will spawn a "decode batch task".
//!
//! These batch decode tasks perform the actual CPU work of decoding the loaded data into Arrow
//! arrays.  This may involve significant CPU processing like decompression or arithmetic in order
//! to restore the data to its correct in-memory representation.
//!
//! ## Batch size
//!
//! The `BatchDecodeStream` is configured with a batch size.  This does not need to have any
//! relation to the page size(s) used to write the data.  This keeps our compute work completely
//! independent of our I/O work.  We suggest using small batch sizes:
//!
//!  * Batches should fit in CPU cache (at least L3)
//!  * More batches means more opportunity for parallelism
//!  * The "batch overhead" is very small in Lance compared to other formats because it has no
//!    relation to the way the data is stored.
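//!
//! As a rule of thumb, a batch size can be derived from an estimated row width
//! and a cache budget.  The helper below is hypothetical (not an API of this
//! crate), purely to illustrate the arithmetic:
//!
//! ```rust
//! // Pick a row count whose decoded batch fits in `cache_budget_bytes`
//! fn rows_per_batch(bytes_per_row: u64, cache_budget_bytes: u64) -> u32 {
//!     (cache_budget_bytes / bytes_per_row.max(1)).clamp(1, u32::MAX as u64) as u32
//! }
//!
//! // e.g. ~1KiB rows against a 16MiB L3 budget => 16Ki rows per batch
//! assert_eq!(rows_per_batch(1024, 16 * 1024 * 1024), 16 * 1024);
//! ```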

use std::collections::VecDeque;
use std::sync::{LazyLock, Once};
use std::{ops::Range, sync::Arc};

use arrow_array::cast::AsArray;
use arrow_array::{ArrayRef, RecordBatch, RecordBatchIterator, RecordBatchReader};
use arrow_schema::{ArrowError, DataType, Field as ArrowField, Fields, Schema as ArrowSchema};
use bytes::Bytes;
use futures::future::{maybe_done, BoxFuture, MaybeDone};
use futures::stream::{self, BoxStream};
use futures::{FutureExt, StreamExt};
use lance_arrow::DataTypeExt;
use lance_core::cache::LanceCache;
use lance_core::datatypes::{Field, Schema, BLOB_DESC_LANCE_FIELD};
use lance_core::utils::futures::FinallyStreamExt;
use log::{debug, trace, warn};
use snafu::location;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::{self, unbounded_channel};

use lance_core::error::LanceOptionExt;
use lance_core::{ArrowResult, Error, Result};
use tracing::instrument;

use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy};
use crate::data::DataBlock;
use crate::encoder::EncodedBatch;
use crate::encodings::logical::fixed_size_list::StructuralFixedSizeListScheduler;
use crate::encodings::logical::list::StructuralListScheduler;
use crate::encodings::logical::map::StructuralMapScheduler;
use crate::encodings::logical::primitive::StructuralPrimitiveFieldScheduler;
use crate::encodings::logical::r#struct::{StructuralStructDecoder, StructuralStructScheduler};
use crate::format::pb::{self, column_encoding};
use crate::format::pb21;
use crate::previous::decoder::LogicalPageDecoder;
use crate::previous::encodings::logical::list::OffsetPageInfo;
use crate::previous::encodings::logical::r#struct::{SimpleStructDecoder, SimpleStructScheduler};
use crate::previous::encodings::logical::{
    binary::BinaryFieldScheduler, blob::BlobFieldScheduler, list::ListFieldScheduler,
    primitive::PrimitiveFieldScheduler,
};
use crate::repdef::{CompositeRepDefUnraveler, RepDefUnraveler};
use crate::version::LanceFileVersion;
use crate::{BufferScheduler, EncodingsIo};

// If users are getting batches larger than 10MiB then it's time to reduce the batch size
const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024;

/// Top-level encoding message for a page.  Wraps both the
/// legacy pb::ArrayEncoding and the newer pb::PageLayout
///
/// A file should only use one or the other and never both.
/// 2.0 decoders can always assume this is pb::ArrayEncoding
/// and 2.1+ decoders can always assume this is pb::PageLayout
#[derive(Debug)]
pub enum PageEncoding {
    Legacy(pb::ArrayEncoding),
    Structural(pb21::PageLayout),
}

impl PageEncoding {
    pub fn as_legacy(&self) -> &pb::ArrayEncoding {
        match self {
            Self::Legacy(enc) => enc,
            Self::Structural(_) => panic!("Expected a legacy encoding"),
        }
    }

    pub fn as_structural(&self) -> &pb21::PageLayout {
        match self {
            Self::Structural(enc) => enc,
            Self::Legacy(_) => panic!("Expected a structural encoding"),
        }
    }

    pub fn is_structural(&self) -> bool {
        matches!(self, Self::Structural(_))
    }
}

/// Metadata describing a page in a file
///
/// This is typically created by reading the metadata section of a Lance file
#[derive(Debug)]
pub struct PageInfo {
    /// The number of rows in the page
    pub num_rows: u64,
    /// The priority (top level row number) of the page
    ///
    /// This is only set in 2.1 files and will be 0 for 2.0 files
    pub priority: u64,
    /// The encoding that explains the buffers in the page
    pub encoding: PageEncoding,
    /// The offsets and sizes of the buffers in the file
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
}

/// Metadata describing a column in a file
///
/// This is typically created by reading the metadata section of a Lance file
#[derive(Debug, Clone)]
pub struct ColumnInfo {
    /// The index of the column in the file
    pub index: u32,
    /// The metadata for each page in the column
    pub page_infos: Arc<[PageInfo]>,
    /// The file positions and sizes of the column-level buffers
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    pub encoding: pb::ColumnEncoding,
}

impl ColumnInfo {
    /// Create a new instance
    pub fn new(
        index: u32,
        page_infos: Arc<[PageInfo]>,
        buffer_offsets_and_sizes: Vec<(u64, u64)>,
        encoding: pb::ColumnEncoding,
    ) -> Self {
        Self {
            index,
            page_infos,
            buffer_offsets_and_sizes: buffer_offsets_and_sizes.into_boxed_slice().into(),
            encoding,
        }
    }

    pub fn is_structural(&self) -> bool {
        self.page_infos
            // Can just look at the first since all should be the same
            .first()
            .map(|page| page.encoding.is_structural())
            .unwrap_or(false)
    }
}

enum RootScheduler {
    Structural(Box<dyn StructuralFieldScheduler>),
    Legacy(Arc<dyn crate::previous::decoder::FieldScheduler>),
}

impl RootScheduler {
    fn as_legacy(&self) -> &Arc<dyn crate::previous::decoder::FieldScheduler> {
        match self {
            Self::Structural(_) => panic!("Expected a legacy scheduler"),
            Self::Legacy(s) => s,
        }
    }

    fn as_structural(&self) -> &dyn StructuralFieldScheduler {
        match self {
            Self::Structural(s) => s.as_ref(),
            Self::Legacy(_) => panic!("Expected a structural scheduler"),
        }
    }
}

/// The scheduler for decoding batches
///
/// Lance decoding is done in two steps: scheduling and decoding.  The
/// scheduling tends to be lightweight; it should quickly figure out what data
/// is needed from the disk and issue the appropriate I/O requests.  A decode task is
/// created to eventually decode the data (once it is loaded) and scheduling
/// moves on to scheduling the next page.
///
/// Meanwhile, it's expected that a decode stream will be set up to run at the
/// same time.  Decode tasks take the data that is loaded and turn it into
/// Arrow arrays.
///
/// This approach allows us to keep our I/O parallelism and CPU parallelism
/// completely separate since those are often two very different values.
///
/// Backpressure should be achieved via the I/O service.  Requests that are
/// issued will pile up if the decode stream is not polling quickly enough.
/// The [`crate::EncodingsIo::submit_request`] function should return a pending
/// future once there are too many I/O requests in flight.
///
/// TODO: Implement backpressure
pub struct DecodeBatchScheduler {
    root_scheduler: RootScheduler,
    pub root_fields: Fields,
    cache: Arc<LanceCache>,
}

pub struct ColumnInfoIter<'a> {
    column_infos: Vec<Arc<ColumnInfo>>,
    column_indices: &'a [u32],
    column_info_pos: usize,
    column_indices_pos: usize,
}

impl<'a> ColumnInfoIter<'a> {
    pub fn new(column_infos: Vec<Arc<ColumnInfo>>, column_indices: &'a [u32]) -> Self {
        let initial_pos = column_indices.first().copied().unwrap_or(0) as usize;
        Self {
            column_infos,
            column_indices,
            column_info_pos: initial_pos,
            column_indices_pos: 0,
        }
    }

    pub fn peek(&self) -> &Arc<ColumnInfo> {
        &self.column_infos[self.column_info_pos]
    }

    pub fn peek_transform(&mut self, transform: impl FnOnce(Arc<ColumnInfo>) -> Arc<ColumnInfo>) {
        let column_info = self.column_infos[self.column_info_pos].clone();
        let transformed = transform(column_info);
        self.column_infos[self.column_info_pos] = transformed;
    }

    pub fn expect_next(&mut self) -> Result<&Arc<ColumnInfo>> {
        self.next().ok_or_else(|| {
            Error::invalid_input(
                "there were more fields in the schema than provided column indices / infos",
                location!(),
            )
        })
    }

    fn next(&mut self) -> Option<&Arc<ColumnInfo>> {
        if self.column_info_pos < self.column_infos.len() {
            let info = &self.column_infos[self.column_info_pos];
            self.column_info_pos += 1;
            Some(info)
        } else {
            None
        }
    }

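    /// Advance to the next top-level column listed in `column_indices`
    ///
    /// For example (illustrative): with `column_indices == [0, 3]`, once the
    /// field rooted at column 0 has consumed its columns this jumps the
    /// iterator straight to column 3, skipping any unrequested columns in
    /// between.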
    pub(crate) fn next_top_level(&mut self) {
        self.column_indices_pos += 1;
        if self.column_indices_pos < self.column_indices.len() {
            self.column_info_pos = self.column_indices[self.column_indices_pos] as usize;
        } else {
            self.column_info_pos = self.column_infos.len();
        }
    }
}

/// These contain the file buffers shared across the entire file
#[derive(Clone, Copy, Debug)]
pub struct FileBuffers<'a> {
    pub positions_and_sizes: &'a [(u64, u64)],
}

/// These contain the file buffers and also buffers specific to a column
#[derive(Clone, Copy, Debug)]
pub struct ColumnBuffers<'a, 'b> {
    pub file_buffers: FileBuffers<'a>,
    pub positions_and_sizes: &'b [(u64, u64)],
}

/// These contain the file & column buffers and also buffers specific to a page
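///
/// For example (illustrative values only; offsets and sizes are made up):
///
/// ```rust
/// use lance_encoding::decoder::{ColumnBuffers, FileBuffers, PageBuffers};
///
/// let file = FileBuffers { positions_and_sizes: &[(0, 128)] };
/// let column = ColumnBuffers { file_buffers: file, positions_and_sizes: &[(128, 64)] };
/// let _page = PageBuffers { column_buffers: column, positions_and_sizes: &[(192, 32)] };
/// ```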
#[derive(Clone, Copy, Debug)]
pub struct PageBuffers<'a, 'b, 'c> {
    pub column_buffers: ColumnBuffers<'a, 'b>,
    pub positions_and_sizes: &'c [(u64, u64)],
}

/// The core decoder strategy handles all the various Arrow types
#[derive(Debug)]
pub struct CoreFieldDecoderStrategy {
    pub validate_data: bool,
    pub decompressor_strategy: Arc<dyn DecompressionStrategy>,
    pub cache_repetition_index: bool,
}

impl Default for CoreFieldDecoderStrategy {
    fn default() -> Self {
        Self {
            validate_data: false,
            decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
            cache_repetition_index: false,
        }
    }
}

impl CoreFieldDecoderStrategy {
    /// Set whether the repetition index should be cached (builder style)
    pub fn with_cache_repetition_index(mut self, cache_repetition_index: bool) -> Self {
        self.cache_repetition_index = cache_repetition_index;
        self
    }

    /// Create a new strategy from a decoder config
    pub fn from_decoder_config(config: &DecoderConfig) -> Self {
        Self {
            validate_data: config.validate_on_decode,
            decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
            cache_repetition_index: config.cache_repetition_index,
        }
    }

    /// This is just a sanity check to ensure there are no "wrapped encodings"
    /// that haven't been handled.
    fn ensure_values_encoded(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
        let column_encoding = column_info
            .encoding
            .column_encoding
            .as_ref()
            .ok_or_else(|| {
                Error::invalid_input(
                    format!(
                        "the column at index {} was missing a ColumnEncoding",
                        column_info.index
                    ),
                    location!(),
                )
            })?;
        if matches!(
            column_encoding,
            pb::column_encoding::ColumnEncoding::Values(_)
        ) {
            Ok(())
        } else {
            Err(Error::invalid_input(format!("the column at index {} mapping to the input field {} has column encoding {:?} and no decoder is registered to handle it", column_info.index, field_name, column_encoding), location!()))
        }
    }

    fn is_structural_primitive(data_type: &DataType) -> bool {
        if data_type.is_primitive() {
            true
        } else {
            match data_type {
                // DataType::is_primitive doesn't consider these primitive but we do
                DataType::Dictionary(_, value_type) => Self::is_structural_primitive(value_type),
                DataType::Boolean
                | DataType::Null
                | DataType::FixedSizeBinary(_)
                | DataType::Binary
                | DataType::LargeBinary
                | DataType::Utf8
                | DataType::LargeUtf8 => true,
                DataType::FixedSizeList(inner, _) => {
                    Self::is_structural_primitive(inner.data_type())
                }
                _ => false,
            }
        }
    }

    fn is_primitive_legacy(data_type: &DataType) -> bool {
        if data_type.is_primitive() {
            true
        } else {
            match data_type {
                // DataType::is_primitive doesn't consider these primitive but we do
                DataType::Boolean | DataType::Null | DataType::FixedSizeBinary(_) => true,
                DataType::FixedSizeList(inner, _) => Self::is_primitive_legacy(inner.data_type()),
                _ => false,
            }
        }
    }

    fn create_primitive_scheduler(
        &self,
        field: &Field,
        column: &ColumnInfo,
        buffers: FileBuffers,
    ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
        Self::ensure_values_encoded(column, &field.name)?;
        // Primitive fields map to a single column
        let column_buffers = ColumnBuffers {
            file_buffers: buffers,
            positions_and_sizes: &column.buffer_offsets_and_sizes,
        };
        Ok(Box::new(PrimitiveFieldScheduler::new(
            column.index,
            field.data_type(),
            column.page_infos.clone(),
            column_buffers,
            self.validate_data,
        )))
    }

    /// Helper method to verify the page encoding of a struct header column
    fn check_simple_struct(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
        Self::ensure_values_encoded(column_info, field_name)?;
        if column_info.page_infos.len() != 1 {
            return Err(Error::InvalidInput { source: format!("Due to schema we expected a struct column but we received a column with {} pages and right now we only support struct columns with 1 page", column_info.page_infos.len()).into(), location: location!() });
        }
        let encoding = &column_info.page_infos[0].encoding;
        match encoding.as_legacy().array_encoding.as_ref().unwrap() {
            pb::array_encoding::ArrayEncoding::Struct(_) => Ok(()),
            _ => Err(Error::InvalidInput { source: format!("Expected a struct encoding because we have a struct field in the schema but got the encoding {:?}", encoding).into(), location: location!() }),
        }
    }

    fn check_packed_struct(column_info: &ColumnInfo) -> bool {
        let encoding = &column_info.page_infos[0].encoding;
        matches!(
            encoding.as_legacy().array_encoding.as_ref().unwrap(),
            pb::array_encoding::ArrayEncoding::PackedStruct(_)
        )
    }

    fn create_list_scheduler(
        &self,
        list_field: &Field,
        column_infos: &mut ColumnInfoIter,
        buffers: FileBuffers,
        offsets_column: &ColumnInfo,
    ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
        Self::ensure_values_encoded(offsets_column, &list_field.name)?;
        let offsets_column_buffers = ColumnBuffers {
            file_buffers: buffers,
            positions_and_sizes: &offsets_column.buffer_offsets_and_sizes,
        };
        let items_scheduler =
            self.create_legacy_field_scheduler(&list_field.children[0], column_infos, buffers)?;

        let (inner_infos, null_offset_adjustments): (Vec<_>, Vec<_>) = offsets_column
            .page_infos
            .iter()
            .filter(|offsets_page| offsets_page.num_rows > 0)
            .map(|offsets_page| {
                if let Some(pb::array_encoding::ArrayEncoding::List(list_encoding)) =
                    &offsets_page.encoding.as_legacy().array_encoding
                {
                    let inner = PageInfo {
                        buffer_offsets_and_sizes: offsets_page.buffer_offsets_and_sizes.clone(),
                        encoding: PageEncoding::Legacy(
                            list_encoding.offsets.as_ref().unwrap().as_ref().clone(),
                        ),
                        num_rows: offsets_page.num_rows,
                        priority: 0,
                    };
                    (
                        inner,
                        OffsetPageInfo {
                            offsets_in_page: offsets_page.num_rows,
                            null_offset_adjustment: list_encoding.null_offset_adjustment,
                            num_items_referenced_by_page: list_encoding.num_items,
                        },
                    )
                } else {
                    // TODO: Should probably return Err here
                    panic!("Expected a list column");
                }
            })
            .unzip();
        let inner = Arc::new(PrimitiveFieldScheduler::new(
            offsets_column.index,
            DataType::UInt64,
            Arc::from(inner_infos.into_boxed_slice()),
            offsets_column_buffers,
            self.validate_data,
        )) as Arc<dyn crate::previous::decoder::FieldScheduler>;
        let items_field = match list_field.data_type() {
            DataType::List(inner) => inner,
            DataType::LargeList(inner) => inner,
            _ => unreachable!(),
        };
        let offset_type = if matches!(list_field.data_type(), DataType::List(_)) {
            DataType::Int32
        } else {
            DataType::Int64
        };
        Ok(Box::new(ListFieldScheduler::new(
            inner,
            items_scheduler.into(),
            items_field,
            offset_type,
            null_offset_adjustments,
        )))
    }

    fn unwrap_blob(column_info: &ColumnInfo) -> Option<ColumnInfo> {
        if let column_encoding::ColumnEncoding::Blob(blob) =
            column_info.encoding.column_encoding.as_ref().unwrap()
        {
            let mut column_info = column_info.clone();
            column_info.encoding = blob.inner.as_ref().unwrap().as_ref().clone();
            Some(column_info)
        } else {
            None
        }
    }

    fn create_structural_field_scheduler(
        &self,
        field: &Field,
        column_infos: &mut ColumnInfoIter,
    ) -> Result<Box<dyn StructuralFieldScheduler>> {
        let data_type = field.data_type();
        if Self::is_structural_primitive(&data_type) {
            let column_info = column_infos.expect_next()?;
            let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
                column_info.as_ref(),
                self.decompressor_strategy.as_ref(),
                self.cache_repetition_index,
                field,
            )?);

            // advance to the next top level column
            column_infos.next_top_level();

            return Ok(scheduler);
        }
        match &data_type {
            DataType::Struct(fields) => {
                if field.is_packed_struct() {
                    // Packed struct
                    let column_info = column_infos.expect_next()?;
                    let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
                        column_info.as_ref(),
                        self.decompressor_strategy.as_ref(),
                        self.cache_repetition_index,
                        field,
                    )?);

                    // advance to the next top level column
                    column_infos.next_top_level();

                    return Ok(scheduler);
                }
                // Maybe a blob descriptions struct?
                if field.is_blob() {
                    let column_info = column_infos.peek();
                    if column_info.page_infos.iter().any(|page| {
                        matches!(
                            page.encoding,
                            PageEncoding::Structural(pb21::PageLayout {
                                layout: Some(pb21::page_layout::Layout::BlobLayout(_))
                            })
                        )
                    }) {
                        let column_info = column_infos.expect_next()?;
                        let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
                            column_info.as_ref(),
                            self.decompressor_strategy.as_ref(),
                            self.cache_repetition_index,
                            field,
                        )?);
                        column_infos.next_top_level();
                        return Ok(scheduler);
                    }
                }

                let mut child_schedulers = Vec::with_capacity(field.children.len());
                for field in field.children.iter() {
                    let field_scheduler =
                        self.create_structural_field_scheduler(field, column_infos)?;
                    child_schedulers.push(field_scheduler);
                }

                let fields = fields.clone();
                Ok(
                    Box::new(StructuralStructScheduler::new(child_schedulers, fields))
                        as Box<dyn StructuralFieldScheduler>,
                )
            }
            DataType::List(_) | DataType::LargeList(_) => {
                let child = field.children.first().expect_ok()?;
                let child_scheduler =
                    self.create_structural_field_scheduler(child, column_infos)?;
                Ok(Box::new(StructuralListScheduler::new(child_scheduler))
                    as Box<dyn StructuralFieldScheduler>)
            }
            DataType::FixedSizeList(inner, dimension)
                if matches!(inner.data_type(), DataType::Struct(_)) =>
            {
                let child = field.children.first().expect_ok()?;
                let child_scheduler =
                    self.create_structural_field_scheduler(child, column_infos)?;
                Ok(Box::new(StructuralFixedSizeListScheduler::new(
                    child_scheduler,
                    *dimension,
                )) as Box<dyn StructuralFieldScheduler>)
            }
            DataType::Map(_, keys_sorted) => {
                // TODO: We only support keys_sorted=false for now because
                //  converting a Rust Arrow map field to a Python Arrow field
                //  loses the keys_sorted property.
                if *keys_sorted {
                    return Err(Error::NotSupported {
                        source: format!("Map data type is not yet supported with keys_sorted=true (current value is {})", *keys_sorted).into(),
                        location: location!(),
                    });
                }
                let entries_child = field.children.first().expect_ok()?;
                let child_scheduler =
                    self.create_structural_field_scheduler(entries_child, column_infos)?;
                Ok(Box::new(StructuralMapScheduler::new(child_scheduler))
                    as Box<dyn StructuralFieldScheduler>)
            }
            _ => todo!("create_structural_field_scheduler for {}", data_type),
        }
    }

    fn create_legacy_field_scheduler(
        &self,
        field: &Field,
        column_infos: &mut ColumnInfoIter,
        buffers: FileBuffers,
    ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
        let data_type = field.data_type();
        if Self::is_primitive_legacy(&data_type) {
            let column_info = column_infos.expect_next()?;
            let scheduler = self.create_primitive_scheduler(field, column_info, buffers)?;
            return Ok(scheduler);
        } else if data_type.is_binary_like() {
            let column_info = column_infos.expect_next()?.clone();
            // Column is blob and user is asking for binary data
            if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
                let desc_scheduler =
                    self.create_primitive_scheduler(&BLOB_DESC_LANCE_FIELD, &blob_col, buffers)?;
                let blob_scheduler = Box::new(BlobFieldScheduler::new(desc_scheduler.into()));
                return Ok(blob_scheduler);
            }
            if let Some(page_info) = column_info.page_infos.first() {
                if matches!(
                    page_info.encoding.as_legacy(),
                    pb::ArrayEncoding {
                        array_encoding: Some(pb::array_encoding::ArrayEncoding::List(..))
                    }
                ) {
                    let list_type = if matches!(data_type, DataType::Utf8 | DataType::Binary) {
                        DataType::List(Arc::new(ArrowField::new("item", DataType::UInt8, false)))
                    } else {
                        DataType::LargeList(Arc::new(ArrowField::new(
                            "item",
                            DataType::UInt8,
                            false,
                        )))
                    };
                    let list_field = Field::try_from(ArrowField::new(
                        field.name.clone(),
                        list_type,
                        field.nullable,
                    ))
                    .unwrap();
                    let list_scheduler = self.create_list_scheduler(
                        &list_field,
                        column_infos,
                        buffers,
                        &column_info,
                    )?;
                    let binary_scheduler = Box::new(BinaryFieldScheduler::new(
                        list_scheduler.into(),
                        field.data_type(),
                    ));
                    return Ok(binary_scheduler);
                } else {
                    let scheduler =
                        self.create_primitive_scheduler(field, &column_info, buffers)?;
                    return Ok(scheduler);
                }
            } else {
                return self.create_primitive_scheduler(field, &column_info, buffers);
            }
        }
        match &data_type {
            DataType::FixedSizeList(inner, _dimension) => {
                // A fixed size list column could either be a physical or a logical decoder
                // depending on the child data type.
                if Self::is_primitive_legacy(inner.data_type()) {
                    let primitive_col = column_infos.expect_next()?;
                    let scheduler =
                        self.create_primitive_scheduler(field, primitive_col, buffers)?;
                    Ok(scheduler)
                } else {
                    todo!()
                }
            }
            DataType::Dictionary(_key_type, value_type) => {
                if Self::is_primitive_legacy(value_type) || value_type.is_binary_like() {
                    let primitive_col = column_infos.expect_next()?;
                    let scheduler =
                        self.create_primitive_scheduler(field, primitive_col, buffers)?;
                    Ok(scheduler)
                } else {
                    Err(Error::NotSupported {
                        source: format!(
                            "No way to decode into a dictionary field of type {}",
                            value_type
                        )
                        .into(),
                        location: location!(),
                    })
                }
            }
            DataType::List(_) | DataType::LargeList(_) => {
                let offsets_column = column_infos.expect_next()?.clone();
                column_infos.next_top_level();
                self.create_list_scheduler(field, column_infos, buffers, &offsets_column)
            }
            DataType::Struct(fields) => {
                let column_info = column_infos.expect_next()?;

                // Column is blob and user is asking for descriptions
                if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
                    // Can use primitive scheduler here since descriptions are always packed struct
                    return self.create_primitive_scheduler(field, &blob_col, buffers);
                }

                if Self::check_packed_struct(column_info) {
                    // use packed struct encoding
                    self.create_primitive_scheduler(field, column_info, buffers)
                } else {
                    // use default struct encoding
                    Self::check_simple_struct(column_info, &field.name)?;
                    let num_rows = column_info
                        .page_infos
                        .iter()
                        .map(|page| page.num_rows)
                        .sum();
                    let mut child_schedulers = Vec::with_capacity(field.children.len());
                    for field in &field.children {
                        column_infos.next_top_level();
                        let field_scheduler =
                            self.create_legacy_field_scheduler(field, column_infos, buffers)?;
                        child_schedulers.push(Arc::from(field_scheduler));
                    }

                    let fields = fields.clone();
                    Ok(Box::new(SimpleStructScheduler::new(
                        child_schedulers,
                        fields,
                        num_rows,
                    )))
                }
            }
            // TODO: Still need support for RLE
            _ => todo!(),
        }
    }
}

/// Creates a dummy ColumnInfo for the root column
fn root_column(num_rows: u64) -> ColumnInfo {
    let num_root_pages = num_rows.div_ceil(u32::MAX as u64);
    // Rows not covered by the earlier (full) pages land in the final page.
    // This also handles the case where num_rows is an exact multiple of u32::MAX.
    let final_page_num_rows = num_rows - (num_root_pages - 1) * (u32::MAX as u64);
    let root_pages = (0..num_root_pages)
        .map(|i| PageInfo {
            num_rows: if i == num_root_pages - 1 {
                final_page_num_rows
            } else {
                u32::MAX as u64
            },
            encoding: PageEncoding::Legacy(pb::ArrayEncoding {
                array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
                    pb::SimpleStruct {},
                )),
            }),
            priority: 0, // not used in legacy scheduler
            buffer_offsets_and_sizes: Arc::new([]),
        })
        .collect::<Vec<_>>();
    ColumnInfo {
        buffer_offsets_and_sizes: Arc::new([]),
        encoding: pb::ColumnEncoding {
            column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
        },
        index: u32::MAX,
        page_infos: Arc::from(root_pages),
    }
}

pub enum RootDecoder {
    Structural(StructuralStructDecoder),
    Legacy(SimpleStructDecoder),
}

impl RootDecoder {
    pub fn into_structural(self) -> StructuralStructDecoder {
        match self {
            Self::Structural(decoder) => decoder,
            Self::Legacy(_) => panic!("Expected a structural decoder"),
        }
    }

    pub fn into_legacy(self) -> SimpleStructDecoder {
        match self {
            Self::Legacy(decoder) => decoder,
            Self::Structural(_) => panic!("Expected a legacy decoder"),
        }
    }
}

impl DecodeBatchScheduler {
    /// Creates a new decode scheduler with the expected schema and the column
    /// metadata of the file.
    #[allow(clippy::too_many_arguments)]
    pub async fn try_new<'a>(
        schema: &'a Schema,
        column_indices: &[u32],
        column_infos: &[Arc<ColumnInfo>],
        file_buffer_positions_and_sizes: &'a Vec<(u64, u64)>,
        num_rows: u64,
        _decoder_plugins: Arc<DecoderPlugins>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<LanceCache>,
        filter: &FilterExpression,
        decoder_config: &DecoderConfig,
    ) -> Result<Self> {
        assert!(num_rows > 0);
        let buffers = FileBuffers {
            positions_and_sizes: file_buffer_positions_and_sizes,
        };
        let arrow_schema = ArrowSchema::from(schema);
        let root_fields = arrow_schema.fields().clone();
        let root_type = DataType::Struct(root_fields.clone());
        let mut root_field = Field::try_from(&ArrowField::new("root", root_type, false))?;
        // root_field.children and schema.fields should be identical at this point but the latter
        // has field ids and the former does not.  This line restores that.
        // TODO:  Is there another way to create the root field without forcing a trip through arrow?
        root_field.children.clone_from(&schema.fields);
        root_field
            .metadata
            .insert("__lance_decoder_root".to_string(), "true".to_string());

        if column_infos.is_empty() || column_infos[0].is_structural() {
            let mut column_iter = ColumnInfoIter::new(column_infos.to_vec(), column_indices);

            let strategy = CoreFieldDecoderStrategy::from_decoder_config(decoder_config);
            let mut root_scheduler =
                strategy.create_structural_field_scheduler(&root_field, &mut column_iter)?;

            let context = SchedulerContext::new(io, cache.clone());
            root_scheduler.initialize(filter, &context).await?;

            Ok(Self {
                root_scheduler: RootScheduler::Structural(root_scheduler),
                root_fields,
                cache,
            })
        } else {
            // The old encoding style expected a header column for structs and so we
            // need a header column for the top-level struct
            let mut columns = Vec::with_capacity(column_infos.len() + 1);
            columns.push(Arc::new(root_column(num_rows)));
            columns.extend(column_infos.iter().cloned());

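            // Shift the requested indices to account for the synthetic root
            // header column at index 0 (e.g. a request for columns [0, 2]
            // becomes [0, 1, 3])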
            let adjusted_column_indices = [0_u32]
                .into_iter()
                .chain(column_indices.iter().map(|i| i.saturating_add(1)))
                .collect::<Vec<_>>();
            let mut column_iter = ColumnInfoIter::new(columns, &adjusted_column_indices);
            let strategy = CoreFieldDecoderStrategy::from_decoder_config(decoder_config);
            let root_scheduler =
                strategy.create_legacy_field_scheduler(&root_field, &mut column_iter, buffers)?;

            let context = SchedulerContext::new(io, cache.clone());
            root_scheduler.initialize(filter, &context).await?;

            Ok(Self {
                root_scheduler: RootScheduler::Legacy(root_scheduler.into()),
                root_fields,
                cache,
            })
        }
    }

    #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
    pub fn from_scheduler(
        root_scheduler: Arc<dyn crate::previous::decoder::FieldScheduler>,
        root_fields: Fields,
        cache: Arc<LanceCache>,
    ) -> Self {
        Self {
            root_scheduler: RootScheduler::Legacy(root_scheduler),
            root_fields,
            cache,
        }
    }

    fn do_schedule_ranges_structural(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
    ) {
        let root_scheduler = self.root_scheduler.as_structural();
        let mut context = SchedulerContext::new(io, self.cache.clone());
        let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
        if let Err(schedule_ranges_err) = maybe_root_job {
            schedule_action(Err(schedule_ranges_err));
            return;
        }
        let mut root_job = maybe_root_job.unwrap();
        let mut num_rows_scheduled = 0;
        loop {
            let maybe_next_scan_lines = root_job.schedule_next(&mut context);
            if let Err(err) = maybe_next_scan_lines {
                schedule_action(Err(err));
                return;
            }
            let next_scan_lines = maybe_next_scan_lines.unwrap();
            if next_scan_lines.is_empty() {
                return;
            }
            for next_scan_line in next_scan_lines {
                trace!(
                    "Scheduled scan line of {} rows and {} decoders",
                    next_scan_line.rows_scheduled,
                    next_scan_line.decoders.len()
                );
                num_rows_scheduled += next_scan_line.rows_scheduled;
                if !schedule_action(Ok(DecoderMessage {
                    scheduled_so_far: num_rows_scheduled,
                    decoders: next_scan_line.decoders,
                })) {
                    // Decoder has disconnected
                    return;
                }
            }
        }
    }

    fn do_schedule_ranges_legacy(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
        // If specified, this will be used as the top_level_row for all scheduling
        // tasks.  This is used by list scheduling to ensure all items scheduling
        // tasks are scheduled at the same top level row.
        priority: Option<Box<dyn PriorityRange>>,
    ) {
        let root_scheduler = self.root_scheduler.as_legacy();
        let rows_requested = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        trace!(
            "Scheduling {} ranges across {}..{} ({} rows){}",
            ranges.len(),
            ranges.first().unwrap().start,
            ranges.last().unwrap().end,
            rows_requested,
            priority
                .as_ref()
                .map(|p| format!(" (priority={:?})", p))
                .unwrap_or_default()
        );

        let mut context = SchedulerContext::new(io, self.cache.clone());
        let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
        if let Err(schedule_ranges_err) = maybe_root_job {
            schedule_action(Err(schedule_ranges_err));
            return;
        }
        let mut root_job = maybe_root_job.unwrap();
        let mut num_rows_scheduled = 0;
        let mut rows_to_schedule = root_job.num_rows();
        let mut priority = priority.unwrap_or(Box::new(SimplePriorityRange::new(0)));
        trace!("Scheduled ranges refined to {} rows", rows_to_schedule);
        while rows_to_schedule > 0 {
            let maybe_next_scan_line = root_job.schedule_next(&mut context, priority.as_ref());
            if let Err(schedule_next_err) = maybe_next_scan_line {
                schedule_action(Err(schedule_next_err));
                return;
            }
            let next_scan_line = maybe_next_scan_line.unwrap();
            priority.advance(next_scan_line.rows_scheduled);
            num_rows_scheduled += next_scan_line.rows_scheduled;
            rows_to_schedule -= next_scan_line.rows_scheduled;
            trace!(
                "Scheduled scan line of {} rows and {} decoders",
                next_scan_line.rows_scheduled,
                next_scan_line.decoders.len()
            );
            if !schedule_action(Ok(DecoderMessage {
                scheduled_so_far: num_rows_scheduled,
                decoders: next_scan_line.decoders,
            })) {
                // Decoder has disconnected
                return;
            }
        }

        trace!("Finished scheduling {} ranges", ranges.len());
    }

    fn do_schedule_ranges(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
        // If specified, this will be used as the top_level_row for all scheduling
        // tasks.  This is used by list scheduling to ensure all items scheduling
        // tasks are scheduled at the same top level row.
        priority: Option<Box<dyn PriorityRange>>,
    ) {
        match &self.root_scheduler {
            RootScheduler::Legacy(_) => {
                self.do_schedule_ranges_legacy(ranges, filter, io, schedule_action, priority)
            }
            RootScheduler::Structural(_) => {
                self.do_schedule_ranges_structural(ranges, filter, io, schedule_action)
            }
        }
    }

    // This method is similar to schedule_ranges but instead of
    // sending the decoders to a channel it collects them all into a vector
    pub fn schedule_ranges_to_vec(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        priority: Option<Box<dyn PriorityRange>>,
    ) -> Result<Vec<DecoderMessage>> {
        let mut decode_messages = Vec::new();
        self.do_schedule_ranges(
            ranges,
            filter,
            io,
            |msg| {
                decode_messages.push(msg);
                true
            },
            priority,
        );
        decode_messages.into_iter().collect::<Result<Vec<_>>>()
    }

    /// Schedules the load of multiple ranges of rows
    ///
    /// Ranges must be non-overlapping and in sorted order
    ///
    /// # Arguments
    ///
    /// * `ranges` - The ranges of rows to load
    /// * `sink` - A channel to send the decode tasks
    /// * `scheduler` - An I/O scheduler to issue I/O requests
    #[instrument(skip_all)]
    pub fn schedule_ranges(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
        scheduler: Arc<dyn EncodingsIo>,
    ) {
        self.do_schedule_ranges(
            ranges,
            filter,
            scheduler,
            |msg| {
                match sink.send(msg) {
                    Ok(_) => true,
                    Err(SendError { .. }) => {
                        // The receiver has gone away.  We can't do anything about it
                        // so just ignore the error.
                        debug!(
                            "schedule_ranges aborting early since decoder appears to have been dropped"
                        );
                        false
                    }
                }
            },
            None,
        )
    }

    /// Schedules the load of a range of rows
    ///
    /// # Arguments
    ///
    /// * `range` - The range of rows to load
    /// * `sink` - A channel to send the decode tasks
    /// * `scheduler` - An I/O scheduler to issue I/O requests
    #[instrument(skip_all)]
    pub fn schedule_range(
        &mut self,
        range: Range<u64>,
        filter: &FilterExpression,
        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
        scheduler: Arc<dyn EncodingsIo>,
    ) {
        self.schedule_ranges(&[range], filter, sink, scheduler)
    }

1291    /// Schedules the load of selected rows
1292    ///
1293    /// # Arguments
1294    ///
1295    /// * `indices` - The row indices to load (these must be in ascending order!)
1296    /// * `sink` - A channel to send the decode tasks
1297    /// * `scheduler` - An I/O scheduler to issue I/O requests
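    ///
    /// # Example
    ///
    /// A minimal sketch (not a tested doctest); `scheduler`, `filter`, and `io` are
    /// assumed to already exist:
    ///
    /// ```ignore
    /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
    /// // Indices must be sorted; contiguous runs are coalesced into ranges internally
    /// scheduler.schedule_take(&[1, 2, 3, 10, 11], &filter, tx, io);
    /// ```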
1298    pub fn schedule_take(
1299        &mut self,
1300        indices: &[u64],
1301        filter: &FilterExpression,
1302        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1303        scheduler: Arc<dyn EncodingsIo>,
1304    ) {
1305        debug_assert!(indices.windows(2).all(|w| w[0] <= w[1]));
1306        if indices.is_empty() {
1307            return;
1308        }
1309        trace!("Scheduling take of {} rows", indices.len());
1310        let ranges = Self::indices_to_ranges(indices);
1311        self.schedule_ranges(&ranges, filter, sink, scheduler)
1312    }
1313
1314    // Coalesce contiguous indices into ranges (the input indices must be sorted and non-empty)
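    // e.g. [1, 2, 3, 7, 8, 12] becomes [1..4, 7..9, 12..13]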
1315    fn indices_to_ranges(indices: &[u64]) -> Vec<Range<u64>> {
1316        let mut ranges = Vec::new();
1317        let mut start = indices[0];
1318
1319        for window in indices.windows(2) {
1320            if window[1] != window[0] + 1 {
1321                ranges.push(start..window[0] + 1);
1322                start = window[1];
1323            }
1324        }
1325
1326        ranges.push(start..*indices.last().unwrap() + 1);
1327        ranges
1328    }
1329}
1330
1331pub struct ReadBatchTask {
1332    pub task: BoxFuture<'static, Result<RecordBatch>>,
1333    pub num_rows: u32,
1334}
1335
1336/// A stream that takes scheduled jobs and generates decode tasks from them.
1337pub struct BatchDecodeStream {
1338    context: DecoderContext,
1339    root_decoder: SimpleStructDecoder,
1340    rows_remaining: u64,
1341    rows_per_batch: u32,
1342    rows_scheduled: u64,
1343    rows_drained: u64,
1344    scheduler_exhausted: bool,
1345    emitted_batch_size_warning: Arc<Once>,
1346}
1347
1348impl BatchDecodeStream {
1349    /// Create a new instance of a batch decode stream
1350    ///
1351    /// # Arguments
1352    ///
1353    /// * `scheduled` - an incoming stream of decode tasks from a `DecodeBatchScheduler`
1354    /// * `rows_per_batch` - the number of rows to create before making a batch
1355    /// * `num_rows` - the total number of rows scheduled
1356    /// * `root_decoder` - the root [`SimpleStructDecoder`] that child decoders are
1357    ///   attached to and batches are drained from
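    ///
    /// A hedged sketch (not a tested doctest) of typical construction; `rx` is the
    /// receiver half of the channel handed to the scheduler:
    ///
    /// ```ignore
    /// let batch_stream = BatchDecodeStream::new(rx, /*rows_per_batch=*/ 1024, num_rows, root_decoder)
    ///     .into_stream();
    /// ```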
1358    pub fn new(
1359        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1360        rows_per_batch: u32,
1361        num_rows: u64,
1362        root_decoder: SimpleStructDecoder,
1363    ) -> Self {
1364        Self {
1365            context: DecoderContext::new(scheduled),
1366            root_decoder,
1367            rows_remaining: num_rows,
1368            rows_per_batch,
1369            rows_scheduled: 0,
1370            rows_drained: 0,
1371            scheduler_exhausted: false,
1372            emitted_batch_size_warning: Arc::new(Once::new()),
1373        }
1374    }
1375
1376    fn accept_decoder(&mut self, decoder: crate::previous::decoder::DecoderReady) -> Result<()> {
1377        if decoder.path.is_empty() {
1378            // The root decoder we can ignore
1379            Ok(())
1380        } else {
1381            self.root_decoder.accept_child(decoder)
1382        }
1383    }
1384
1385    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
1386        if self.scheduler_exhausted {
1387            return Ok(self.rows_scheduled);
1388        }
1389        while self.rows_scheduled < scheduled_need {
1390            let next_message = self.context.source.recv().await;
1391            match next_message {
1392                Some(scan_line) => {
1393                    let scan_line = scan_line?;
1394                    self.rows_scheduled = scan_line.scheduled_so_far;
1395                    for message in scan_line.decoders {
1396                        self.accept_decoder(message.into_legacy())?;
1397                    }
1398                }
1399                None => {
1400                    // Schedule ended before we got all the data we expected.  This probably
1401                    // means some kind of pushdown filter was applied and we didn't load as
1402                    // much data as we thought we would.
1403                    self.scheduler_exhausted = true;
1404                    return Ok(self.rows_scheduled);
1405                }
1406            }
1407        }
1408        Ok(scheduled_need)
1409    }
1410
1411    #[instrument(level = "debug", skip_all)]
1412    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
1413        trace!(
1414            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1415            self.rows_remaining,
1416            self.rows_drained,
1417            self.rows_scheduled,
1418        );
1419        if self.rows_remaining == 0 {
1420            return Ok(None);
1421        }
1422
1423        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1424        self.rows_remaining -= to_take;
1425
1426        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1427        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1428        if scheduled_need > 0 {
1429            let desired_scheduled = scheduled_need + self.rows_scheduled;
1430            trace!(
1431                "Draining from scheduler (desire at least {} scheduled rows)",
1432                desired_scheduled
1433            );
1434            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
1435            if actually_scheduled < desired_scheduled {
1436                let under_scheduled = desired_scheduled - actually_scheduled;
1437                to_take -= under_scheduled;
1438            }
1439        }
1440
1441        if to_take == 0 {
1442            return Ok(None);
1443        }
1444
1445        // wait_for_loaded waits until *more than* loaded_need rows are loaded (not >=) so we subtract 1 here
1446        let loaded_need = self.rows_drained + to_take - 1;
1447        trace!(
1448            "Waiting for I/O (desire at least {} fully loaded rows)",
1449            loaded_need
1450        );
1451        self.root_decoder.wait_for_loaded(loaded_need).await?;
1452
1453        let next_task = self.root_decoder.drain(to_take)?;
1454        self.rows_drained += to_take;
1455        Ok(Some(next_task))
1456    }
1457
1458    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
1459        let stream = futures::stream::unfold(self, |mut slf| async move {
1460            let next_task = slf.next_batch_task().await;
1461            let next_task = next_task.transpose().map(|next_task| {
1462                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
1463                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
1464                let task = async move {
1465                    let next_task = next_task?;
1466                    // Real decode work happens inside into_batch, which can block the current
1467                    // thread for a long time. By spawning it as a new task, we allow Tokio's
1468                    // worker threads to keep making progress.
1469                    tokio::spawn(async move { next_task.into_batch(emitted_batch_size_warning) })
1470                        .await
1471                        .map_err(|err| Error::Wrapped {
1472                            error: err.into(),
1473                            location: location!(),
1474                        })?
1475                };
1476                (task, num_rows)
1477            });
1478            next_task.map(|(task, num_rows)| {
1479                // This should be true since batch size is u32
1480                debug_assert!(num_rows <= u32::MAX as u64);
1481                let next_task = ReadBatchTask {
1482                    task: task.boxed(),
1483                    num_rows: num_rows as u32,
1484                };
1485                (next_task, slf)
1486            })
1487        });
1488        stream.boxed()
1489    }
1490}
1491
1492// Utility types to smooth out the differences between the 2.0 and 2.1 decoders so that
1493// we can have a single implementation of the batch decode iterator
1494enum RootDecoderMessage {
1495    LoadedPage(LoadedPageShard),
1496    LegacyPage(crate::previous::decoder::DecoderReady),
1497}
1498trait RootDecoderType {
1499    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()>;
1500    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
1501    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()>;
1502}
1503impl RootDecoderType for StructuralStructDecoder {
1504    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1505        let RootDecoderMessage::LoadedPage(loaded_page) = message else {
1506            unreachable!()
1507        };
1508        self.accept_page(loaded_page)
1509    }
1510    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1511        self.drain_batch_task(num_rows)
1512    }
1513    fn wait(&mut self, _: u64, _: &tokio::runtime::Runtime) -> Result<()> {
1514        // Waiting happens elsewhere (not as part of the decoder)
1515        Ok(())
1516    }
1517}
1518impl RootDecoderType for SimpleStructDecoder {
1519    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1520        let RootDecoderMessage::LegacyPage(legacy_page) = message else {
1521            unreachable!()
1522        };
1523        self.accept_child(legacy_page)
1524    }
1525    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1526        self.drain(num_rows)
1527    }
1528    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()> {
1529        runtime.block_on(self.wait_for_loaded(loaded_need))
1530    }
1531}
1532
1533/// A blocking batch decoder that performs synchronous decoding
1534struct BatchDecodeIterator<T: RootDecoderType> {
1535    messages: VecDeque<Result<DecoderMessage>>,
1536    root_decoder: T,
1537    rows_remaining: u64,
1538    rows_per_batch: u32,
1539    rows_scheduled: u64,
1540    rows_drained: u64,
1541    emitted_batch_size_warning: Arc<Once>,
1542    // Note: this is not the runtime on which I/O happens.
1543    // That's always in the scheduler.  This is just a runtime we use to
1544    // put the current thread to sleep while we wait for I/O that isn't ready yet
1545    wait_for_io_runtime: tokio::runtime::Runtime,
1546    schema: Arc<ArrowSchema>,
1547}
1548
1549impl<T: RootDecoderType> BatchDecodeIterator<T> {
1550    /// Create a new instance of a batch decode iterator
1551    pub fn new(
1552        messages: VecDeque<Result<DecoderMessage>>,
1553        rows_per_batch: u32,
1554        num_rows: u64,
1555        root_decoder: T,
1556        schema: Arc<ArrowSchema>,
1557    ) -> Self {
1558        Self {
1559            messages,
1560            root_decoder,
1561            rows_remaining: num_rows,
1562            rows_per_batch,
1563            rows_scheduled: 0,
1564            rows_drained: 0,
1565            wait_for_io_runtime: tokio::runtime::Builder::new_current_thread()
1566                .build()
1567                .unwrap(),
1568            emitted_batch_size_warning: Arc::new(Once::new()),
1569            schema,
1570        }
1571    }
1572
1573    /// Wait for a single page of data to finish loading
1574    ///
1575    /// If the data is not available this will perform a *blocking* wait (put
1576    /// the current thread to sleep)
1577    fn wait_for_page(&self, unloaded_page: UnloadedPageShard) -> Result<LoadedPageShard> {
1578        match maybe_done(unloaded_page.0) {
1579            // Fast path, avoid all runtime shenanigans if the data is ready
1580            MaybeDone::Done(loaded_page) => loaded_page,
1581            // Slow path, we need to wait on I/O, enter the runtime
1582            MaybeDone::Future(fut) => self.wait_for_io_runtime.block_on(fut),
1583            MaybeDone::Gone => unreachable!(),
1584        }
1585    }
1586
1587    /// Waits for I/O until `scheduled_need` rows have been loaded
1588    ///
1589    /// Note that `scheduled_need` is cumulative.  E.g. this method
1590    /// should be called with 5, 10, 15 and not 5, 5, 5
1591    #[instrument(skip_all)]
1592    fn wait_for_io(&mut self, scheduled_need: u64, to_take: u64) -> Result<u64> {
1593        while self.rows_scheduled < scheduled_need && !self.messages.is_empty() {
1594            let message = self.messages.pop_front().unwrap()?;
1595            self.rows_scheduled = message.scheduled_so_far;
1596            for decoder_message in message.decoders {
1597                match decoder_message {
1598                    MessageType::UnloadedPage(unloaded_page) => {
1599                        let loaded_page = self.wait_for_page(unloaded_page)?;
1600                        self.root_decoder
1601                            .accept_message(RootDecoderMessage::LoadedPage(loaded_page))?;
1602                    }
1603                    MessageType::DecoderReady(decoder_ready) => {
1604                        // The root decoder we can ignore
1605                        if !decoder_ready.path.is_empty() {
1606                            self.root_decoder
1607                                .accept_message(RootDecoderMessage::LegacyPage(decoder_ready))?;
1608                        }
1609                    }
1610                }
1611            }
1612        }
1613
1614        let loaded_need = self.rows_drained + to_take.min(self.rows_per_batch as u64) - 1;
1615
1616        self.root_decoder
1617            .wait(loaded_need, &self.wait_for_io_runtime)?;
1618        Ok(self.rows_scheduled)
1619    }
1620
1621    #[instrument(level = "debug", skip_all)]
1622    fn next_batch_task(&mut self) -> Result<Option<RecordBatch>> {
1623        trace!(
1624            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1625            self.rows_remaining,
1626            self.rows_drained,
1627            self.rows_scheduled,
1628        );
1629        if self.rows_remaining == 0 {
1630            return Ok(None);
1631        }
1632
1633        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1634        self.rows_remaining -= to_take;
1635
1636        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1637        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1638        if scheduled_need > 0 {
1639            let desired_scheduled = scheduled_need + self.rows_scheduled;
1640            trace!(
1641                "Draining from scheduler (desire at least {} scheduled rows)",
1642                desired_scheduled
1643            );
1644            let actually_scheduled = self.wait_for_io(desired_scheduled, to_take)?;
1645            if actually_scheduled < desired_scheduled {
1646                let under_scheduled = desired_scheduled - actually_scheduled;
1647                to_take -= under_scheduled;
1648            }
1649        }
1650
1651        if to_take == 0 {
1652            return Ok(None);
1653        }
1654
1655        let next_task = self.root_decoder.drain_batch(to_take)?;
1656
1657        self.rows_drained += to_take;
1658
1659        let batch = next_task.into_batch(self.emitted_batch_size_warning.clone())?;
1660
1661        Ok(Some(batch))
1662    }
1663}
1664
1665impl<T: RootDecoderType> Iterator for BatchDecodeIterator<T> {
1666    type Item = ArrowResult<RecordBatch>;
1667
1668    fn next(&mut self) -> Option<Self::Item> {
1669        self.next_batch_task()
1670            .transpose()
1671            .map(|r| r.map_err(ArrowError::from))
1672    }
1673}
1674
1675impl<T: RootDecoderType> RecordBatchReader for BatchDecodeIterator<T> {
1676    fn schema(&self) -> Arc<ArrowSchema> {
1677        self.schema.clone()
1678    }
1679}
1680
1681/// A stream that takes scheduled jobs and generates decode tasks from them.
1682pub struct StructuralBatchDecodeStream {
1683    context: DecoderContext,
1684    root_decoder: StructuralStructDecoder,
1685    rows_remaining: u64,
1686    rows_per_batch: u32,
1687    rows_scheduled: u64,
1688    rows_drained: u64,
1689    scheduler_exhausted: bool,
1690    emitted_batch_size_warning: Arc<Once>,
1691}
1692
1693impl StructuralBatchDecodeStream {
1694    /// Create a new instance of a batch decode stream
1695    ///
1696    /// # Arguments
1697    ///
1698    /// * `scheduled` - an incoming stream of decode tasks from a `DecodeBatchScheduler`
1699    /// * `rows_per_batch` - the number of rows to create before making a batch
1700    /// * `num_rows` - the total number of rows scheduled
1701    /// * `root_decoder` - the root [`StructuralStructDecoder`] that loaded pages are
1702    ///   fed to and batches are drained from
1703    pub fn new(
1704        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1705        rows_per_batch: u32,
1706        num_rows: u64,
1707        root_decoder: StructuralStructDecoder,
1708    ) -> Self {
1709        Self {
1710            context: DecoderContext::new(scheduled),
1711            root_decoder,
1712            rows_remaining: num_rows,
1713            rows_per_batch,
1714            rows_scheduled: 0,
1715            rows_drained: 0,
1716            scheduler_exhausted: false,
1717            emitted_batch_size_warning: Arc::new(Once::new()),
1718        }
1719    }
1720
1721    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
1722        if self.scheduler_exhausted {
1723            return Ok(self.rows_scheduled);
1724        }
1725        while self.rows_scheduled < scheduled_need {
1726            let next_message = self.context.source.recv().await;
1727            match next_message {
1728                Some(scan_line) => {
1729                    let scan_line = scan_line?;
1730                    self.rows_scheduled = scan_line.scheduled_so_far;
1731                    for message in scan_line.decoders {
1732                        let unloaded_page = message.into_structural();
1733                        let loaded_page = unloaded_page.0.await?;
1734                        self.root_decoder.accept_page(loaded_page)?;
1735                    }
1736                }
1737                None => {
1738                    // Schedule ended before we got all the data we expected.  This probably
1739                    // means some kind of pushdown filter was applied and we didn't load as
1740                    // much data as we thought we would.
1741                    self.scheduler_exhausted = true;
1742                    return Ok(self.rows_scheduled);
1743                }
1744            }
1745        }
1746        Ok(scheduled_need)
1747    }
1748
1749    #[instrument(level = "debug", skip_all)]
1750    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
1751        trace!(
1752            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1753            self.rows_remaining,
1754            self.rows_drained,
1755            self.rows_scheduled,
1756        );
1757        if self.rows_remaining == 0 {
1758            return Ok(None);
1759        }
1760
1761        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1762        self.rows_remaining -= to_take;
1763
1764        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1765        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1766        if scheduled_need > 0 {
1767            let desired_scheduled = scheduled_need + self.rows_scheduled;
1768            trace!(
1769                "Draining from scheduler (desire at least {} scheduled rows)",
1770                desired_scheduled
1771            );
1772            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
1773            if actually_scheduled < desired_scheduled {
1774                let under_scheduled = desired_scheduled - actually_scheduled;
1775                to_take -= under_scheduled;
1776            }
1777        }
1778
1779        if to_take == 0 {
1780            return Ok(None);
1781        }
1782
1783        let next_task = self.root_decoder.drain_batch_task(to_take)?;
1784        self.rows_drained += to_take;
1785        Ok(Some(next_task))
1786    }
1787
1788    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
1789        let stream = futures::stream::unfold(self, |mut slf| async move {
1790            let next_task = slf.next_batch_task().await;
1791            let next_task = next_task.transpose().map(|next_task| {
1792                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
1793                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
1794                let task = async move {
1795                    let next_task = next_task?;
1796                    // Real decode work happens inside into_batch, which can block the current
1797                    // thread for a long time. By spawning it as a new task, we allow Tokio's
1798                    // worker threads to keep making progress.
1799                    tokio::spawn(async move { next_task.into_batch(emitted_batch_size_warning) })
1800                        .await
1801                        .map_err(|err| Error::Wrapped {
1802                            error: err.into(),
1803                            location: location!(),
1804                        })?
1805                };
1806                (task, num_rows)
1807            });
1808            next_task.map(|(task, num_rows)| {
1809                // This should be true since batch size is u32
1810                debug_assert!(num_rows <= u32::MAX as u64);
1811                let next_task = ReadBatchTask {
1812                    task: task.boxed(),
1813                    num_rows: num_rows as u32,
1814                };
1815                (next_task, slf)
1816            })
1817        });
1818        stream.boxed()
1819    }
1820}
1821
1822#[derive(Debug)]
1823pub enum RequestedRows {
1824    Ranges(Vec<Range<u64>>),
1825    Indices(Vec<u64>),
1826}
1827
1828impl RequestedRows {
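    /// Returns the total number of rows requested
    ///
    /// For example, `RequestedRows::Ranges(vec![0..10, 20..30]).num_rows()` is 20.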
1829    pub fn num_rows(&self) -> u64 {
1830        match self {
1831            Self::Ranges(ranges) => ranges.iter().map(|r| r.end - r.start).sum(),
1832            Self::Indices(indices) => indices.len() as u64,
1833        }
1834    }
1835
1836    pub fn trim_empty_ranges(mut self) -> Self {
1837        if let Self::Ranges(ranges) = &mut self {
1838            ranges.retain(|r| !r.is_empty());
1839        }
1840        self
1841    }
1842}
1843
1844/// Configuration for decoder behavior
1845#[derive(Debug, Clone, Default)]
1846pub struct DecoderConfig {
1847    /// Whether to cache repetition indices for better performance
1848    pub cache_repetition_index: bool,
1849    /// Whether to validate decoded data
1850    pub validate_on_decode: bool,
1851}
1852
1853#[derive(Debug, Clone)]
1854pub struct SchedulerDecoderConfig {
1855    pub decoder_plugins: Arc<DecoderPlugins>,
1856    pub batch_size: u32,
1857    pub io: Arc<dyn EncodingsIo>,
1858    pub cache: Arc<LanceCache>,
1859    /// Decoder configuration
1860    pub decoder_config: DecoderConfig,
1861}
1862
1863fn check_scheduler_on_drop(
1864    stream: BoxStream<'static, ReadBatchTask>,
1865    scheduler_handle: tokio::task::JoinHandle<()>,
1866) -> BoxStream<'static, ReadBatchTask> {
1867    // This is a bit unusual but we create an "empty stream" that awaits the scheduler handle
1868    // (which will panic if the scheduler panicked).  This lets us check whether the scheduler
1869    // panicked once the stream finishes.
1870    let mut scheduler_handle = Some(scheduler_handle);
1871    let check_scheduler = stream::unfold((), move |_| {
1872        let handle = scheduler_handle.take();
1873        async move {
1874            if let Some(handle) = handle {
1875                handle.await.unwrap();
1876            }
1877            None
1878        }
1879    });
1880    stream.chain(check_scheduler).boxed()
1881}
1882
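/// Creates a stream of [`ReadBatchTask`] from a channel of decoder messages
///
/// Selects the 2.1 (structural) or 2.0 (legacy) decode path based on `is_structural`.
///
/// A hedged usage sketch (not a tested doctest), assuming `rx` is the receiver half of
/// the channel handed to one of the `schedule_*` methods:
///
/// ```ignore
/// let batch_stream = create_decode_stream(&schema, num_rows, 1024, true, false, rx)?;
/// ```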
1883pub fn create_decode_stream(
1884    schema: &Schema,
1885    num_rows: u64,
1886    batch_size: u32,
1887    is_structural: bool,
1888    should_validate: bool,
1889    rx: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1890) -> Result<BoxStream<'static, ReadBatchTask>> {
1891    if is_structural {
1892        let arrow_schema = ArrowSchema::from(schema);
1893        let structural_decoder = StructuralStructDecoder::new(
1894            arrow_schema.fields,
1895            should_validate,
1896            /*is_root=*/ true,
1897        )?;
1898        Ok(
1899            StructuralBatchDecodeStream::new(rx, batch_size, num_rows, structural_decoder)
1900                .into_stream(),
1901        )
1902    } else {
1903        let arrow_schema = ArrowSchema::from(schema);
1904        let root_fields = arrow_schema.fields;
1905
1906        let simple_struct_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1907        Ok(BatchDecodeStream::new(rx, batch_size, num_rows, simple_struct_decoder).into_stream())
1908    }
1909}
1910
1911/// Creates an iterator that decodes a set of messages in a blocking fashion
1912///
1913/// See [`schedule_and_decode_blocking`] for more information.
1914pub fn create_decode_iterator(
1915    schema: &Schema,
1916    num_rows: u64,
1917    batch_size: u32,
1918    should_validate: bool,
1919    is_structural: bool,
1920    messages: VecDeque<Result<DecoderMessage>>,
1921) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1922    let arrow_schema = Arc::new(ArrowSchema::from(schema));
1923    let root_fields = arrow_schema.fields.clone();
1924    if is_structural {
1925        let simple_struct_decoder =
1926            StructuralStructDecoder::new(root_fields, should_validate, /*is_root=*/ true)?;
1927        Ok(Box::new(BatchDecodeIterator::new(
1928            messages,
1929            batch_size,
1930            num_rows,
1931            simple_struct_decoder,
1932            arrow_schema,
1933        )))
1934    } else {
1935        let root_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1936        Ok(Box::new(BatchDecodeIterator::new(
1937            messages,
1938            batch_size,
1939            num_rows,
1940            root_decoder,
1941            arrow_schema,
1942        )))
1943    }
1944}
1945
1946fn create_scheduler_decoder(
1947    column_infos: Vec<Arc<ColumnInfo>>,
1948    requested_rows: RequestedRows,
1949    filter: FilterExpression,
1950    column_indices: Vec<u32>,
1951    target_schema: Arc<Schema>,
1952    config: SchedulerDecoderConfig,
1953) -> Result<BoxStream<'static, ReadBatchTask>> {
1954    let num_rows = requested_rows.num_rows();
1955
1956    let is_structural = column_infos[0].is_structural();
1957
1958    let (tx, rx) = mpsc::unbounded_channel();
1959
1960    let decode_stream = create_decode_stream(
1961        &target_schema,
1962        num_rows,
1963        config.batch_size,
1964        is_structural,
1965        config.decoder_config.validate_on_decode,
1966        rx,
1967    )?;
1968
1969    let scheduler_handle = tokio::task::spawn(async move {
1970        let mut decode_scheduler = match DecodeBatchScheduler::try_new(
1971            target_schema.as_ref(),
1972            &column_indices,
1973            &column_infos,
1974            &vec![],
1975            num_rows,
1976            config.decoder_plugins,
1977            config.io.clone(),
1978            config.cache,
1979            &filter,
1980            &config.decoder_config,
1981        )
1982        .await
1983        {
1984            Ok(scheduler) => scheduler,
1985            Err(e) => {
1986                let _ = tx.send(Err(e));
1987                return;
1988            }
1989        };
1990
1991        match requested_rows {
1992            RequestedRows::Ranges(ranges) => {
1993                decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
1994            }
1995            RequestedRows::Indices(indices) => {
1996                decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
1997            }
1998        }
1999    });
2000
2001    Ok(check_scheduler_on_drop(decode_stream, scheduler_handle))
2002}
2003
2004/// Launches a scheduler on a dedicated (spawned) task and creates a decoder to
2005/// decode the scheduled data, returning the decoder as a stream of record batches.
2006///
2007/// This is a convenience function that creates both the scheduler and the decoder
2008/// which can be a little tricky to get right.
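///
/// # Example
///
/// A hedged sketch (not a tested doctest); `column_infos`, `schema`, and `config` are
/// assumed to already exist:
///
/// ```ignore
/// let stream = schedule_and_decode(
///     column_infos,
///     RequestedRows::Ranges(vec![0..1000]),
///     FilterExpression::no_filter(),
///     /*column_indices=*/ vec![0],
///     schema,
///     config,
/// );
/// ```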
2009pub fn schedule_and_decode(
2010    column_infos: Vec<Arc<ColumnInfo>>,
2011    requested_rows: RequestedRows,
2012    filter: FilterExpression,
2013    column_indices: Vec<u32>,
2014    target_schema: Arc<Schema>,
2015    config: SchedulerDecoderConfig,
2016) -> BoxStream<'static, ReadBatchTask> {
2017    if requested_rows.num_rows() == 0 {
2018        return stream::empty().boxed();
2019    }
2020
2021    // If the user requested any ranges that are empty, ignore them.  They are pointless and
2022    // trying to read them has caused bugs in the past.
2023    let requested_rows = requested_rows.trim_empty_ranges();
2024
2025    let io = config.io.clone();
2026
2027    // For convenience we really want this method to be a synchronous method where all
2028    // errors happen on the stream.  There is some async initialization that must happen
2029    // when creating a scheduler.  We wrap that all up in the very first task.
2030    match create_scheduler_decoder(
2031        column_infos,
2032        requested_rows,
2033        filter,
2034        column_indices,
2035        target_schema,
2036        config,
2037    ) {
2038        // Keep the I/O scheduler alive until the stream is dropped or finishes.  Otherwise it
2039        // would be dropped as soon as scheduling finished and the I/O loop would terminate.
2040        Ok(stream) => stream.finally(move || drop(io)).boxed(),
2041        // If the initialization failed make it look like a failed task
2042        Err(e) => stream::once(std::future::ready(ReadBatchTask {
2043            num_rows: 0,
2044            task: std::future::ready(Err(e)).boxed(),
2045        }))
2046        .boxed(),
2047    }
2048}
2049
2050pub static WAITER_RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
2051    tokio::runtime::Builder::new_current_thread()
2052        .build()
2053        .unwrap()
2054});
2055
2056/// Schedules and decodes the requested data in a blocking fashion
2057///
2058/// This function is a blocking version of [`schedule_and_decode`]. It schedules the requested data
2059/// and decodes it in the current thread.
2060///
2061/// This can be useful when the disk is fast (or the data is in memory) and the amount
2062/// of data is relatively small.  For example, when doing a take against NVMe or in-memory data.
2063///
2064/// This should NOT be used for full scans.  Even if the data is in memory this function will
2065/// not parallelize the decode and will be slower than the async version.  Full scans typically
2066/// make relatively few IOPs and so the asynchronous overhead is much smaller.
2067///
2068/// This method will first completely run the scheduling process.  Then it will run the
2069/// decode process.
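///
/// A hedged sketch (not a tested doctest) of a small blocking take; arguments mirror
/// [`schedule_and_decode`]:
///
/// ```ignore
/// let reader = schedule_and_decode_blocking(
///     column_infos,
///     RequestedRows::Indices(vec![17, 42]),
///     FilterExpression::no_filter(),
///     /*column_indices=*/ vec![0],
///     schema,
///     config,
/// )?;
/// for batch in reader {
///     let batch = batch?;
///     // ... use the record batch ...
/// }
/// ```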
2070pub fn schedule_and_decode_blocking(
2071    column_infos: Vec<Arc<ColumnInfo>>,
2072    requested_rows: RequestedRows,
2073    filter: FilterExpression,
2074    column_indices: Vec<u32>,
2075    target_schema: Arc<Schema>,
2076    config: SchedulerDecoderConfig,
2077) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
2078    if requested_rows.num_rows() == 0 {
2079        let arrow_schema = Arc::new(ArrowSchema::from(target_schema.as_ref()));
2080        return Ok(Box::new(RecordBatchIterator::new(vec![], arrow_schema)));
2081    }
2082
2083    let num_rows = requested_rows.num_rows();
2084    let is_structural = column_infos[0].is_structural();
2085
2086    let (tx, mut rx) = mpsc::unbounded_channel();
2087
2088    // Initialize the scheduler.  This is still "asynchronous" but we run it with a current-thread
2089    // runtime.
2090    let mut decode_scheduler = WAITER_RT.block_on(DecodeBatchScheduler::try_new(
2091        target_schema.as_ref(),
2092        &column_indices,
2093        &column_infos,
2094        &vec![],
2095        num_rows,
2096        config.decoder_plugins,
2097        config.io.clone(),
2098        config.cache,
2099        &filter,
2100        &config.decoder_config,
2101    ))?;
2102
2103    // Schedule the requested rows
2104    match requested_rows {
2105        RequestedRows::Ranges(ranges) => {
2106            decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
2107        }
2108        RequestedRows::Indices(indices) => {
2109            decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
2110        }
2111    }
2112
2113    // Drain the scheduler queue into a vec of decode messages
2114    let mut messages = Vec::new();
2115    while rx
2116        .recv_many(&mut messages, usize::MAX)
2117        .now_or_never()
2118        .unwrap()
2119        != 0
2120    {}
2121
2122    // Create a decoder to decode the messages
2123    let decode_iterator = create_decode_iterator(
2124        &target_schema,
2125        num_rows,
2126        config.batch_size,
2127        config.decoder_config.validate_on_decode,
2128        is_structural,
2129        messages.into(),
2130    )?;
2131
2132    Ok(decode_iterator)
2133}
2134
2135/// A decoder for single-column encodings of primitive data (this includes fixed size
2136/// lists of primitive data)
2137///
2138/// Physical decoders are able to decode into existing buffers for zero-copy operation.
2139///
2140/// Instances should be stateless and `Send` / `Sync`.  This is because multiple decode
2141/// tasks could reference the same page.  For example, imagine a page covers rows 0-2000
2142/// and the decoder stream has a batch size of 1024.  The decoder will be needed by both
2143/// the decode task for batch 0 and the decode task for batch 1.
2144///
2145/// See [`crate::decoder`] for more information
2146pub trait PrimitivePageDecoder: Send + Sync {
2147    /// Decode data into buffers
2148    ///
2149    /// This may be a simple zero-copy from a disk buffer or could involve complex decoding
2150    /// such as decompressing from some compressed representation.
2151    ///
2152    /// Capacity is stored as a tuple of (num_bytes: u64, is_needed: bool).  The `is_needed`
2153    /// portion only needs to be updated if the encoding has some concept of an "optional"
2154    /// buffer.
2155    ///
2156    /// Encodings can have any number of input or output buffers.  For example, a dictionary
2157    /// decoding will convert two buffers (indices + dictionary) into a single buffer
2158    ///
2159    /// Binary decodings have two output buffers (one for values, one for offsets)
2160    ///
2161    /// Other decodings could even expand the # of output buffers.  For example, we could decode
2162    /// fixed size strings into variable length strings going from one input buffer to multiple output
2163    /// buffers.
2164    ///
2165    /// Each Arrow data type typically has a fixed structure of buffers and the encoding chain will
2166    /// generally end at one of these structures.  However, intermediate structures may exist which
2167    /// do not correspond to any Arrow type at all.  For example, a bitpacking encoding will deal
2168    /// with buffers that have bits-per-value that is not a multiple of 8.
2169    ///
2170    /// The `primitive_array_from_buffers` method has an expected buffer layout for each arrow
2171    /// type (order matters) and encodings that aim to decode into arrow types should respect
2172    /// this layout.
2173    /// # Arguments
2174    ///
2175    /// * `rows_to_skip` - how many rows to skip (within the page) before decoding
2176    /// * `num_rows` - how many rows to decode
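    ///
    /// A minimal sketch (not a tested doctest) of an implementor; `ConstantDecoder`
    /// is hypothetical:
    ///
    /// ```ignore
    /// #[derive(Debug)]
    /// struct ConstantDecoder { value: u64 }
    ///
    /// impl PrimitivePageDecoder for ConstantDecoder {
    ///     fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
    ///         // A real implementation would materialize `num_rows` copies of
    ///         // `self.value` into a fixed-width DataBlock; the skipped prefix
    ///         // never needs to be decoded.
    ///         todo!()
    ///     }
    /// }
    /// ```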
2178    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock>;
2179}
2180
2181/// A scheduler for single-column encodings of primitive data
2182///
2183/// The scheduler is responsible for calculating what I/O is needed for the requested rows
2184///
2185/// Instances should be stateless and `Send` and `Sync`.  This is because instances can
2186/// be shared in follow-up I/O tasks.
2187///
2188/// See [`crate::decoder`] for more information
2189pub trait PageScheduler: Send + Sync + std::fmt::Debug {
2190    /// Schedules a batch of I/O to load the data needed for the requested ranges
2191    ///
2192    /// Returns a future that will yield a decoder once the data has been loaded
2193    ///
2194    /// # Arguments
2195    ///
2196    /// * `ranges` - the ranges of row offsets (relative to the start of the page) requested;
2197    ///   these must be ordered and must not overlap
2198    /// * `scheduler` - a scheduler to submit the I/O request to
2199    /// * `top_level_row` - the row offset of the top level field currently being
2200    ///   scheduled.  This can be used to assign priority to I/O requests
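    ///
    /// A minimal sketch (not a tested doctest) of an implementor; `MyDecoder` and
    /// `rows_to_byte_ranges` are hypothetical, and the use of
    /// `EncodingsIo::submit_request` here is an assumption about the I/O trait:
    ///
    /// ```ignore
    /// fn schedule_ranges(
    ///     &self,
    ///     ranges: &[Range<u64>],
    ///     scheduler: &Arc<dyn EncodingsIo>,
    ///     top_level_row: u64,
    /// ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
    ///     // Translate row ranges into byte ranges and submit the I/O request
    ///     let byte_ranges = self.rows_to_byte_ranges(ranges);
    ///     let io = scheduler.submit_request(byte_ranges, top_level_row);
    ///     async move {
    ///         let bytes = io.await?;
    ///         Ok(Box::new(MyDecoder::new(bytes)) as Box<dyn PrimitivePageDecoder>)
    ///     }
    ///     .boxed()
    /// }
    /// ```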
2201    fn schedule_ranges(
2202        &self,
2203        ranges: &[Range<u64>],
2204        scheduler: &Arc<dyn EncodingsIo>,
2205        top_level_row: u64,
2206    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>;
2207}
2208
2209/// A trait to control the priority of I/O
2210pub trait PriorityRange: std::fmt::Debug + Send + Sync {
2211    fn advance(&mut self, num_rows: u64);
2212    fn current_priority(&self) -> u64;
2213    fn box_clone(&self) -> Box<dyn PriorityRange>;
2214}
2215
2216/// A simple priority scheme for top-level fields with no parent
2217/// repetition
2218#[derive(Debug)]
2219pub struct SimplePriorityRange {
2220    priority: u64,
2221}
2222
2223impl SimplePriorityRange {
2224    fn new(priority: u64) -> Self {
2225        Self { priority }
2226    }
2227}
2228
2229impl PriorityRange for SimplePriorityRange {
2230    fn advance(&mut self, num_rows: u64) {
2231        self.priority += num_rows;
2232    }
2233
2234    fn current_priority(&self) -> u64 {
2235        self.priority
2236    }
2237
2238    fn box_clone(&self) -> Box<dyn PriorityRange> {
2239        Box::new(Self {
2240            priority: self.priority,
2241        })
2242    }
2243}
2244
2245/// Determining the priority of a list request is tricky.  We want
2246/// the priority to be the top-level row.  So if we have a
2247    /// `list<list<int>>` where each outer list has 10 inner lists and each inner
2248    /// list has 5 items then the priority of the 100th item is 1 because
2249    /// it is the 5th item in the 10th inner list of the *second* row.
2250///
2251/// This structure allows us to keep track of this complicated priority
2252/// relationship.
2253///
2254/// There's a fair amount of bookkeeping involved here.
2255///
2256/// A better approach (using repetition levels) is coming in the future.
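///
/// For example, with `offsets = [0, 50, 100]` (two top-level rows of 50 items each),
/// `advance(75)` moves the current position past the first row's 50 items, so the
/// base range advances by one row and `current_priority` then reports the second row.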
2257pub struct ListPriorityRange {
2258    base: Box<dyn PriorityRange>,
2259    offsets: Arc<[u64]>,
2260    cur_index_into_offsets: usize,
2261    cur_position: u64,
2262}
2263
2264impl ListPriorityRange {
2265    pub(crate) fn new(base: Box<dyn PriorityRange>, offsets: Arc<[u64]>) -> Self {
2266        Self {
2267            base,
2268            offsets,
2269            cur_index_into_offsets: 0,
2270            cur_position: 0,
2271        }
2272    }
2273}
2274
2275impl std::fmt::Debug for ListPriorityRange {
2276    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2277        f.debug_struct("ListPriorityRange")
2278            .field("base", &self.base)
2279            .field("offsets.len()", &self.offsets.len())
2280            .field("cur_index_into_offsets", &self.cur_index_into_offsets)
2281            .field("cur_position", &self.cur_position)
2282            .finish()
2283    }
2284}
2285
2286impl PriorityRange for ListPriorityRange {
2287    fn advance(&mut self, num_rows: u64) {
2288        // We've scheduled X items.  Now walk through the offsets to
2289        // determine how many rows we've scheduled.
2290        self.cur_position += num_rows;
2291        let mut idx_into_offsets = self.cur_index_into_offsets;
2292        while idx_into_offsets + 1 < self.offsets.len()
2293            && self.offsets[idx_into_offsets + 1] <= self.cur_position
2294        {
2295            idx_into_offsets += 1;
2296        }
2297        let base_rows_advanced = idx_into_offsets - self.cur_index_into_offsets;
2298        self.cur_index_into_offsets = idx_into_offsets;
2299        self.base.advance(base_rows_advanced as u64);
2300    }
2301
2302    fn current_priority(&self) -> u64 {
2303        self.base.current_priority()
2304    }
2305
2306    fn box_clone(&self) -> Box<dyn PriorityRange> {
2307        Box::new(Self {
2308            base: self.base.box_clone(),
2309            offsets: self.offsets.clone(),
2310            cur_index_into_offsets: self.cur_index_into_offsets,
2311            cur_position: self.cur_position,
2312        })
2313    }
2314}
2315
2316/// Contains the context for a scheduler
2317pub struct SchedulerContext {
2318    recv: Option<mpsc::UnboundedReceiver<DecoderMessage>>,
2319    io: Arc<dyn EncodingsIo>,
2320    cache: Arc<LanceCache>,
2321    name: String,
2322    path: Vec<u32>,
2323    path_names: Vec<String>,
2324}
2325
2326pub struct ScopedSchedulerContext<'a> {
2327    pub context: &'a mut SchedulerContext,
2328}
2329
2330impl<'a> ScopedSchedulerContext<'a> {
2331    pub fn pop(self) -> &'a mut SchedulerContext {
2332        self.context.pop();
2333        self.context
2334    }
2335}
2336
2337impl SchedulerContext {
2338    pub fn new(io: Arc<dyn EncodingsIo>, cache: Arc<LanceCache>) -> Self {
2339        Self {
2340            io,
2341            cache,
2342            recv: None,
2343            name: "".to_string(),
2344            path: Vec::new(),
2345            path_names: Vec::new(),
2346        }
2347    }
2348
2349    pub fn io(&self) -> &Arc<dyn EncodingsIo> {
2350        &self.io
2351    }
2352
2353    pub fn cache(&self) -> &Arc<LanceCache> {
2354        &self.cache
2355    }
2356
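    /// Pushes a named child scope onto the context's current path
    ///
    /// A small sketch (not a tested doctest) of the intended pairing with
    /// [`ScopedSchedulerContext::pop`]:
    ///
    /// ```ignore
    /// let scoped = context.push("items", 0);
    /// // ... schedule the child field through scoped.context ...
    /// let context = scoped.pop();
    /// ```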
2357    pub fn push(&'_ mut self, name: &str, index: u32) -> ScopedSchedulerContext<'_> {
2358        self.path.push(index);
2359        self.path_names.push(name.to_string());
2360        ScopedSchedulerContext { context: self }
2361    }
2362
2363    pub fn pop(&mut self) {
2364        self.path.pop();
2365        self.path_names.pop();
2366    }
2367
2368    pub fn path_name(&self) -> String {
2369        let path = self.path_names.join("/");
2370        if self.recv.is_some() {
2371            format!("TEMP({}){}", self.name, path)
2372        } else {
2373            format!("ROOT{}", path)
2374        }
2375    }
2376
2377    pub fn current_path(&self) -> VecDeque<u32> {
2378        VecDeque::from_iter(self.path.iter().copied())
2379    }
2380
2381    #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
2382    pub fn locate_decoder(
2383        &mut self,
2384        decoder: Box<dyn crate::previous::decoder::LogicalPageDecoder>,
2385    ) -> crate::previous::decoder::DecoderReady {
2386        trace!(
2387            "Scheduling decoder of type {:?} for {:?}",
2388            decoder.data_type(),
2389            self.path,
2390        );
2391        crate::previous::decoder::DecoderReady {
2392            decoder,
2393            path: self.current_path(),
2394        }
2395    }
2396}
2397
2398pub struct UnloadedPageShard(pub BoxFuture<'static, Result<LoadedPageShard>>);
2399
2400impl std::fmt::Debug for UnloadedPageShard {
2401    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2402        f.debug_struct("UnloadedPageShard").finish()
2403    }
2404}
2405
2406#[derive(Debug)]
2407pub struct ScheduledScanLine {
2408    pub rows_scheduled: u64,
2409    pub decoders: Vec<MessageType>,
2410}
2411
2412pub trait StructuralSchedulingJob: std::fmt::Debug {
2413    /// Schedule the next batch of data
2414    ///
2415    /// Normally this equates to scheduling the next page of data into one task.  Very large pages
2416    /// might be split into multiple scan lines.  Each scan line has one or more rows.
2417    ///
2418    /// If a scheduler ends early it may return an empty vector.
2419    fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>>;
2420}
2421
2422/// A filter expression to apply to the data
2423///
2424/// The core decoders do not currently take advantage of filtering in
2425/// any way.  In order to maintain the abstraction we represent filters
2426/// as an arbitrary byte sequence.
2427///
2428/// We recommend that encodings use Substrait for filters.
2429pub struct FilterExpression(pub Bytes);
2430
2431impl FilterExpression {
2432    /// Create a filter expression that does not filter any data
2433    ///
2434    /// This is currently represented by an empty byte array.  Encoders
2435    /// that are "filter aware" should make sure they handle this case.
2436    pub fn no_filter() -> Self {
2437        Self(Bytes::new())
2438    }
2439
2440    /// Returns true if the filter is the same as the [`Self::no_filter`] filter
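    ///
    /// For example, `FilterExpression::no_filter().is_noop()` returns true.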
2441    pub fn is_noop(&self) -> bool {
2442        self.0.is_empty()
2443    }
2444}
2445
2446pub trait StructuralFieldScheduler: Send + std::fmt::Debug {
2447    fn initialize<'a>(
2448        &'a mut self,
2449        filter: &'a FilterExpression,
2450        context: &'a SchedulerContext,
2451    ) -> BoxFuture<'a, Result<()>>;
2452    fn schedule_ranges<'a>(
2453        &'a self,
2454        ranges: &[Range<u64>],
2455        filter: &FilterExpression,
2456    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>>;
2457}
2458
2459/// A trait for tasks that decode data into an Arrow array
2460pub trait DecodeArrayTask: Send {
2461    /// Decodes the data into an Arrow array
2462    fn decode(self: Box<Self>) -> Result<ArrayRef>;
2463}
2464
2465impl DecodeArrayTask for Box<dyn StructuralDecodeArrayTask> {
2466    fn decode(self: Box<Self>) -> Result<ArrayRef> {
2467        StructuralDecodeArrayTask::decode(*self).map(|decoded_array| decoded_array.array)
2468    }
2469}
2470
2471/// A task to decode data into an Arrow record batch
2472///
2473/// It has a child `task` which decodes a struct array with no nulls.
2474/// This is then converted into a record batch.
2475pub struct NextDecodeTask {
2476    /// The decode task itself
2477    pub task: Box<dyn DecodeArrayTask>,
2478    /// The number of rows that will be created
2479    pub num_rows: u64,
2480}
2481
2482impl NextDecodeTask {
2483    // Run the task and produce a record batch
2484    //
2485    // If the batch is very large this function will log a warning message
2486    // suggesting the user try a smaller batch size.
2487    #[instrument(name = "task_to_batch", level = "debug", skip_all)]
2488    fn into_batch(self, emitted_batch_size_warning: Arc<Once>) -> Result<RecordBatch> {
2489        let struct_arr = self.task.decode();
2490        match struct_arr {
2491            Ok(struct_arr) => {
2492                let batch = RecordBatch::from(struct_arr.as_struct());
2493                let size_bytes = batch.get_array_memory_size() as u64;
2494                if size_bytes > BATCH_SIZE_BYTES_WARNING {
2495                    emitted_batch_size_warning.call_once(|| {
2496                        let size_mb = size_bytes / 1024 / 1024;
2497                        debug!("Lance read in a single batch that contained more than {}MiB of data.  You may want to consider reducing the batch size.", size_mb);
2498                    });
2499                }
2500                Ok(batch)
2501            }
2502            Err(e) => {
2503                let e = Error::Internal {
2504                    message: format!("Error decoding batch: {}", e),
2505                    location: location!(),
2506                };
2507                Err(e)
2508            }
2509        }
2510    }
2511}
2512
2513// An envelope to wrap both 2.0 style messages and 2.1 style messages so we can
2514// share some code paths between the two.  Decoders can safely unwrap into whatever
2515// style they expect since a file will be either all-2.0 or all-2.1
2516#[derive(Debug)]
2517pub enum MessageType {
2518    // The older v2.0 scheduler/decoder used a scheme where the message was the
2519    // decoder itself.  The messages were not sent in priority order and the decoder
2520    // had to wait for I/O, figuring out the correct priority.  This was a lot of
2521    // complexity.
2522    DecoderReady(crate::previous::decoder::DecoderReady),
2523    // Starting in 2.1 we use a simpler scheme where the scheduling happens in priority
2524    // order and the message is an unloaded decoder.  These can be awaited, in order, and
2525    // the decoder does not have to worry about waiting for I/O.
2526    UnloadedPage(UnloadedPageShard),
2527}
2528
2529impl MessageType {
2530    pub fn into_legacy(self) -> crate::previous::decoder::DecoderReady {
2531        match self {
2532            Self::DecoderReady(decoder) => decoder,
2533            Self::UnloadedPage(_) => {
2534                panic!("Expected DecoderReady but got UnloadedPage")
2535            }
2536        }
2537    }
2538
2539    pub fn into_structural(self) -> UnloadedPageShard {
2540        match self {
2541            Self::UnloadedPage(unloaded) => unloaded,
2542            Self::DecoderReady(_) => {
2543                panic!("Expected UnloadedPage but got DecoderReady")
2544            }
2545        }
2546    }
2547}
2548
2549pub struct DecoderMessage {
2550    pub scheduled_so_far: u64,
2551    pub decoders: Vec<MessageType>,
2552}
2553
2554pub struct DecoderContext {
2555    source: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
2556}
2557
2558impl DecoderContext {
2559    pub fn new(source: mpsc::UnboundedReceiver<Result<DecoderMessage>>) -> Self {
2560        Self { source }
2561    }
2562}
2563
2564pub struct DecodedPage {
2565    pub data: DataBlock,
2566    pub repdef: RepDefUnraveler,
2567}
2568
2569pub trait DecodePageTask: Send + std::fmt::Debug {
2570    /// Decodes the data into an Arrow array
2571    fn decode(self: Box<Self>) -> Result<DecodedPage>;
2572}
2573
2574pub trait StructuralPageDecoder: std::fmt::Debug + Send {
2575    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>>;
2576    fn num_rows(&self) -> u64;
2577}
2578
2579#[derive(Debug)]
2580pub struct LoadedPageShard {
2581    // The decoder that is ready to be decoded
2582    pub decoder: Box<dyn StructuralPageDecoder>,
2583    // The path to the decoder, the first value is the column index
2584    // following values, if present, are nested child indices
2585    //
2586    // For example, a path of [1, 1, 0] would mean to grab the second
2587    // column, then the second child, and then the first child.
2588    //
2589    // It could represent x in the following schema:
2590    //
2591    // score: float64
2592    // points: struct
2593    //   color: string
2594    //   location: struct
2595    //     x: float64
2596    //
2597    // Currently, only struct decoders have "children" although other
2598    // decoders may at some point as well.  List children are only
2599    // handled through indirect I/O at the moment and so they don't
2600    // need to be represented (yet)
2601    pub path: VecDeque<u32>,
2602}
2603
2604pub struct DecodedArray {
2605    pub array: ArrayRef,
2606    pub repdef: CompositeRepDefUnraveler,
2607}
2608
2609pub trait StructuralDecodeArrayTask: std::fmt::Debug + Send {
2610    fn decode(self: Box<Self>) -> Result<DecodedArray>;
2611}
2612
2613pub trait StructuralFieldDecoder: std::fmt::Debug + Send {
2614    /// Accept a newly loaded page
2615    ///
2616    /// Decoders that do not expect child pages should return
2617    /// an error.
2618    fn accept_page(&mut self, _child: LoadedPageShard) -> Result<()>;
2619    /// Creates a task to decode `num_rows` of data into an array
2620    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>>;
2621    /// The data type of the decoded data
2622    fn data_type(&self) -> &DataType;
2623}
2624
2625#[derive(Debug, Default)]
2626pub struct DecoderPlugins {}
2627
2628/// Decodes a batch of data from an in-memory structure created by [`crate::encoder::encode_batch`]
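///
/// A hedged sketch (not a tested doctest); `encoded` is assumed to be an
/// [`EncodedBatch`] produced by the encoder:
///
/// ```ignore
/// let batch = decode_batch(
///     &encoded,
///     &FilterExpression::no_filter(),
///     Arc::new(DecoderPlugins::default()),
///     /*should_validate=*/ false,
///     LanceFileVersion::V2_1,
///     None,
/// )
/// .await?;
/// ```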
2629pub async fn decode_batch(
2630    batch: &EncodedBatch,
2631    filter: &FilterExpression,
2632    decoder_plugins: Arc<DecoderPlugins>,
2633    should_validate: bool,
2634    version: LanceFileVersion,
2635    cache: Option<Arc<LanceCache>>,
2636) -> Result<RecordBatch> {
2637    // The io is synchronous so it shouldn't be possible for any async stuff to still be in progress
2638    // Still, if we just use now_or_never we hit misfires because some futures (channels) need to be
2639    // polled twice.
2640
2641    let io_scheduler = Arc::new(BufferScheduler::new(batch.data.clone())) as Arc<dyn EncodingsIo>;
2642    let cache = if let Some(cache) = cache {
2643        cache
2644    } else {
2645        Arc::new(lance_core::cache::LanceCache::with_capacity(
2646            128 * 1024 * 1024,
2647        ))
2648    };
2649    let mut decode_scheduler = DecodeBatchScheduler::try_new(
2650        batch.schema.as_ref(),
2651        &batch.top_level_columns,
2652        &batch.page_table,
2653        &vec![],
2654        batch.num_rows,
2655        decoder_plugins,
2656        io_scheduler.clone(),
2657        cache,
2658        filter,
2659        &DecoderConfig::default(),
2660    )
2661    .await?;
2662    let (tx, rx) = unbounded_channel();
2663    decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler);
2664    let is_structural = version >= LanceFileVersion::V2_1;
2665    let mut decode_stream = create_decode_stream(
2666        &batch.schema,
2667        batch.num_rows,
2668        batch.num_rows as u32,
2669        is_structural,
2670        should_validate,
2671        rx,
2672    )?;
2673    decode_stream.next().await.unwrap().task.await
2674}
2675
2676#[cfg(test)]
2677// Tests for coalescing indices into ranges
2678mod tests {
2679    use super::*;
2680
2681    #[test]
2682    fn test_coalesce_indices_to_ranges_with_single_index() {
2683        let indices = vec![1];
2684        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
2685        assert_eq!(ranges, vec![1..2]);
2686    }
2687
2688    #[test]
2689    fn test_coalesce_indices_to_ranges() {
2690        let indices = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
2691        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
2692        assert_eq!(ranges, vec![1..10]);
2693    }
2694
2695    #[test]
2696    fn test_coalesce_indices_to_ranges_with_gaps() {
2697        let indices = vec![1, 2, 3, 5, 6, 7, 9];
2698        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
2699        assert_eq!(ranges, vec![1..4, 5..8, 9..10]);
2700    }
2701}