lance_encoding/decoder.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Utilities and traits for scheduling & decoding data
//!
//! Reading data involves two steps: scheduling and decoding.  The
//! scheduling step is responsible for figuring out what data is needed
//! and issuing the appropriate I/O requests.  The decoding step is
//! responsible for taking the loaded data and turning it into Arrow
//! arrays.
//!
//! # Scheduling
//!
//! Scheduling is split into [`self::FieldScheduler`] and [`self::PageScheduler`].
//! There is one field scheduler for each output field, which may map to many
//! columns of actual data.  A field scheduler is responsible for figuring out
//! the order in which pages should be scheduled.  Field schedulers then delegate
//! to page schedulers to figure out the I/O requests that need to be made for
//! the page.
//!
//! Page schedulers also create the decoders that will be used to decode the
//! scheduled data.
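//!
//! For example, a list field is handled by a single field scheduler even though
//! it maps to two columns of actual data (offsets and items); the field scheduler
//! delegates to page schedulers for the pages of both columns.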
//!
//! # Decoding
//!
//! Decoders are split into [`self::PhysicalPageDecoder`] and
//! [`self::LogicalPageDecoder`].  Note that both physical and logical decoding
//! happens on a per-page basis.  There is no concept of a "field decoder" or
//! "column decoder".
//!
//! The physical decoders handle lower level encodings.  They have a few advantages:
//!
//!  * They do not need to decode into an Arrow array and so they don't need
//!    to be enveloped into the Arrow type system (e.g. Arrow doesn't have a
//!    bit-packed type.  We can use variable-length binary but that is kind
//!    of awkward)
//!  * They can decode into an existing allocation.  This can allow for "page
//!    bridging".  If we are trying to decode into a batch of 1024 rows and
//!    the rows 0..1024 are spread across two pages then we can avoid a memory
//!    copy by allocating once and decoding each page into the outer allocation.
//!    (note: page bridging is not actually implemented yet)
//!
//! However, there are some limitations for physical decoders:
//!
//!  * They are constrained to a single column
//!  * The API is more complex
//!
//! The logical decoders are designed to map one or more columns of Lance
//! data into an Arrow array.
//!
//! Typically, a "logical encoding" will have both a logical decoder and a field scheduler.
//! Meanwhile, a "physical encoding" will have a physical decoder but no corresponding field
//! scheduler.
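//!
//! As a rough illustration of the split (hypothetical traits shown here, not
//! the real API), the two levels differ mainly in their output: a physical
//! decoder writes into caller-provided memory while a logical decoder produces
//! an Arrow array.
//!
//! ```ignore
//! // Illustrative sketch only; the real traits carry scheduling context.
//! trait HypotheticalPhysicalDecoder {
//!     /// Decode rows into a caller-provided buffer.  Decoding into existing
//!     /// memory is what would make the "page bridging" above possible.
//!     fn decode_into(&self, rows_to_skip: u64, num_rows: u64, dest: &mut [u8]);
//! }
//! trait HypotheticalLogicalDecoder {
//!     /// Produce an Arrow array for the next `num_rows` rows.
//!     fn take(&mut self, num_rows: u64) -> Result<ArrayRef>;
//! }
//! ```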
//!
//! # General notes
//!
//! Encodings are typically nested into each other to form a tree.  The top of the tree is
//! the user requested schema.  Each field in that schema is assigned to one top-level logical
//! encoding.  That encoding can then contain other logical encodings or physical encodings.
//! Physical encodings can also contain other physical encodings.
//!
//! So, for example, a single field in the Arrow schema might have the type `List<UInt32>`
//!
//! The encoding tree could then be:
//!
//! ```text
//! root: List (logical encoding)
//!  - indices: Primitive (logical encoding)
//!    - column: Basic (physical encoding)
//!      - validity: Bitmap (physical encoding)
//!      - values: RLE (physical encoding)
//!        - runs: Value (physical encoding)
//!        - values: Value (physical encoding)
//!  - items: Primitive (logical encoding)
//!    - column: Basic (physical encoding)
//!      - values: Value (physical encoding)
//! ```
//!
//! Note that, in this example, root.items.column does not have a validity because there were
//! no nulls in the page.
//!
//! ## Multiple buffers or multiple columns?
//!
//! Note that there are many different ways we can write encodings.  For example, we might
//! store primitive fields in a single column with two buffers (one for validity and one for
//! values)
//!
//! On the other hand, we could also store a primitive field as two different columns.  One
//! that yields a non-nullable boolean array and one that yields a non-nullable array of items.
//! Then we could combine these two arrays into a single array where the boolean array is the
//! bitmap.  There are a few subtle differences between the approaches:
//!
//! * Storing things as multiple buffers within the same column is generally more efficient and
//!   easier to schedule.  For example, in-batch coalescing is very easy but can only be done
//!   on data that is in the same page.
//! * When things are stored in multiple columns you have to worry about their pages not being
//!   in sync.  In our previous validity / values example this means we might have to do some
//!   memory copies to get the validity array and values arrays to be the same length at
//!   decode time.
//! * When things are stored in a single column, projection is impossible.  For example, if we
//!   tried to store all the struct fields in a single column with lots of buffers then we wouldn't
//!   be able to read back individual fields of the struct.
//!
//! The fixed size list decoding is an interesting example because it is actually both a physical
//! encoding and a logical encoding.  A fixed size list of a physical encoding is, itself, a physical
//! encoding (e.g. a fixed size list of doubles).  However, a fixed size list of a logical encoding
//! is a logical encoding (e.g. a fixed size list of structs).
//!
//! # The scheduling loop
//!
//! Reading a Lance file involves both scheduling and decoding.  It's generally expected that these
//! will run as two separate threads.
//!
//! ```text
//!
//!                                    I/O PARALLELISM
//!                       Issues
//!                       Requests   ┌─────────────────┐
//!                                  │                 │        Wait for
//!                       ┌──────────►   I/O Service   ├─────►  Enough I/O ◄─┐
//!                       │          │                 │        For batch    │
//!                       │          └─────────────────┘             │3      │
//!                       │                                          │       │
//!                       │                                          │       │2
//! ┌─────────────────────┴─┐                              ┌─────────▼───────┴┐
//! │                       │                              │                  │Poll
//! │       Batch Decode    │ Decode tasks sent via channel│   Batch Decode   │1
//! │       Scheduler       ├─────────────────────────────►│   Stream         ◄─────
//! │                       │                              │                  │
//! └─────▲─────────────┬───┘                              └─────────┬────────┘
//!       │             │                                            │4
//!       │             │                                            │
//!       └─────────────┘                                   ┌────────┴────────┐
//!  Caller of schedule_range                Buffer polling │                 │
//!  will be scheduler thread                to achieve CPU │ Decode Batch    ├────►
//!  and schedule one decode                 parallelism    │ Task            │
//!  task (and all needed I/O)               (thread per    │                 │
//!  per logical page                         batch)        └─────────────────┘
//! ```
//!
//! The scheduling thread will work through the file from the
//! start to the end as quickly as possible.  Data is scheduled one page at a time in a row-major
//! fashion.  For example, imagine we have a file with the following page structure:
//!
//! ```text
//! Score (Float32)     | C0P0 |
//! Id (16-byte UUID)   | C1P0 | C1P1 | C1P2 | C1P3 |
//! Vector (4096 bytes) | C2P0 | C2P1 | C2P2 | C2P3 | .. | C2P1023 |
//! ```
//!
//! This would be quite common as each of these pages has the same number of bytes.  Let's pretend
//! each page is 1MiB and so there are 256Ki rows of data.  Each page of `Score` has 256Ki rows.
//! Each page of `Id` has 64Ki rows.  Each page of `Vector` has 256 rows.  The scheduler would then
//! schedule in the following order:
//!
//! ```text
//! C0 P0
//! C1 P0
//! C2 P0
//! C2 P1
//! ... (253 pages omitted)
//! C2 P255
//! C1 P1
//! C2 P256
//! ... (254 pages omitted)
//! C2 P511
//! C1 P2
//! C2 P512
//! ... (254 pages omitted)
//! C2 P767
//! C1 P3
//! C2 P768
//! ... (254 pages omitted)
//! C2 P1023
//! ```
//!
//! This is the ideal scheduling order because it means we can decode complete rows as quickly as possible.
//! Note that the scheduler thread does not need to wait for I/O to happen at any point.  As soon as it starts
//! it will start scheduling one page of I/O after another until it has scheduled the entire file's worth of
//! I/O.  This is slightly different from other file readers which have "row group parallelism" and will
//! typically only schedule X row groups worth of reads at a time.
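//!
//! To make that ordering concrete, here is a small sketch that derives it
//! (illustrative only; `schedule_order` and `rows_per_page` are hypothetical,
//! not part of this crate):
//!
//! ```ignore
//! /// Emit (column, page) pairs in the row-major order shown above, assuming
//! /// each column uses a fixed number of rows per page.
//! fn schedule_order(rows_per_page: &[u64], total_rows: u64) -> Vec<(usize, usize)> {
//!     // next_row[c] is the first row of column c that has not been scheduled yet
//!     let mut next_row = vec![0u64; rows_per_page.len()];
//!     let mut order = Vec::new();
//!     loop {
//!         // Schedule the column that is furthest behind; ties go to the
//!         // earlier column, which reproduces C0 P0, C1 P0, C2 P0, ... above.
//!         let (col, row) = next_row
//!             .iter()
//!             .copied()
//!             .enumerate()
//!             .min_by_key(|&(c, r)| (r, c))
//!             .unwrap();
//!         if row >= total_rows {
//!             return order;
//!         }
//!         order.push((col, (row / rows_per_page[col]) as usize));
//!         next_row[col] += rows_per_page[col];
//!     }
//! }
//! ```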
//!
//! In the near future there will be a backpressure mechanism and so it may need to stop/pause if the compute
//! falls behind.
//!
//! ## Indirect I/O
//!
//! Regrettably, there are times where we cannot know exactly what data we need until we have partially decoded
//! the file.  This happens when we have variable sized list data.  In that case the scheduling task for that
//! page will only schedule the first part of the read (loading the list offsets).  It will then immediately
//! spawn a new tokio task to wait for that I/O and decode the list offsets.  That follow-up task is not part
//! of the scheduling loop or the decode loop.  It is a free task.  Once the list offsets are decoded we submit
//! a follow-up I/O task.  This task is scheduled at a high priority because the decoder is going to need it soon.
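//!
//! A sketch of that two-phase pattern (the helper names here are hypothetical;
//! [`crate::EncodingsIo::submit_request`] is the real entry point):
//!
//! ```ignore
//! // Phase 1: the scheduling loop only schedules the offsets read.
//! let offsets_fut = io.submit_request(offset_ranges, priority);
//! // Phase 2: a free-floating task decodes the offsets and issues the
//! // follow-up items read at high priority.
//! tokio::spawn(async move {
//!     let offset_bytes = offsets_fut.await?;
//!     let item_ranges = decode_offsets(offset_bytes)?; // hypothetical helper
//!     io.submit_request(item_ranges, high_priority);
//!     Ok::<(), Error>(())
//! });
//! ```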
//!
//! # The decode loop
//!
//! As soon as the scheduler starts we can start decoding.  Each time we schedule a page we
//! push a decoder for that page's data into a channel.  The decode loop
//! ([`BatchDecodeStream`]) reads from that channel.  Each time it receives a decoder it
//! waits until the decoder has all of its data.  Then it grabs the next decoder.  Once it has
//! enough loaded decoders to complete a batch worth of rows it will spawn a "decode batch task".
//!
//! These batch decode tasks perform the actual CPU work of decoding the loaded data into Arrow
//! arrays.  This may involve significant CPU processing like decompression or arithmetic in order
//! to restore the data to its correct in-memory representation.
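//!
//! A minimal sketch of the wiring between the two loops, assuming a built
//! [`DecodeBatchScheduler`], a legacy `root_decoder`, an `io` handle, and a
//! `filter` are already in hand (error handling elided):
//!
//! ```ignore
//! let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
//! // Scheduling side: walks the file and pushes decoders into `tx`.
//! scheduler.schedule_range(0..num_rows, &filter, tx, io);
//! // Decode side: assembles batches of `rows_per_batch` rows from `rx`.
//! let stream = BatchDecodeStream::new(rx, /* rows_per_batch */ 1024, num_rows, root_decoder);
//! ```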
//!
//! ## Batch size
//!
//! The `BatchDecodeStream` is configured with a batch size.  This does not need to have any
//! relation to the page size(s) used to write the data.  This keeps our compute work completely
//! independent of our I/O work.  We suggest using small batch sizes:
//!
//!  * Batches should fit in CPU cache (at least L3)
//!  * More batches means more opportunity for parallelism
//!  * The "batch overhead" is very small in Lance compared to other formats because it has no
//!    relation to the way the data is stored.
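//!
//! For example, with the 4096-byte vectors above, a batch size of 1024 rows
//! yields roughly 4MiB per batch, comfortably under the 10MiB warning
//! threshold defined below.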

use std::collections::VecDeque;
use std::sync::{LazyLock, Once};
use std::{ops::Range, sync::Arc};

use arrow_array::cast::AsArray;
use arrow_array::{ArrayRef, RecordBatch, RecordBatchIterator, RecordBatchReader};
use arrow_schema::{ArrowError, DataType, Field as ArrowField, Fields, Schema as ArrowSchema};
use bytes::Bytes;
use futures::future::{maybe_done, BoxFuture, MaybeDone};
use futures::stream::{self, BoxStream};
use futures::{FutureExt, StreamExt};
use lance_arrow::DataTypeExt;
use lance_core::cache::LanceCache;
use lance_core::datatypes::{Field, Schema, BLOB_DESC_LANCE_FIELD};
use lance_core::utils::futures::FinallyStreamExt;
use log::{debug, trace, warn};
use snafu::location;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::{self, unbounded_channel};

use lance_core::{ArrowResult, Error, Result};
use tracing::instrument;

use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy};
use crate::data::DataBlock;
use crate::encoder::EncodedBatch;
use crate::encodings::logical::list::StructuralListScheduler;
use crate::encodings::logical::primitive::StructuralPrimitiveFieldScheduler;
use crate::encodings::logical::r#struct::{StructuralStructDecoder, StructuralStructScheduler};
use crate::format::pb::{self, column_encoding};
use crate::format::pb21;
use crate::previous::decoder::LogicalPageDecoder;
use crate::previous::encodings::logical::list::OffsetPageInfo;
use crate::previous::encodings::logical::r#struct::{SimpleStructDecoder, SimpleStructScheduler};
use crate::previous::encodings::logical::{
    binary::BinaryFieldScheduler, blob::BlobFieldScheduler, list::ListFieldScheduler,
    primitive::PrimitiveFieldScheduler,
};
use crate::repdef::{CompositeRepDefUnraveler, RepDefUnraveler};
use crate::version::LanceFileVersion;
use crate::{BufferScheduler, EncodingsIo};

// If users are getting batches larger than 10MiB then it's time to reduce the batch size
const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024;

/// Top-level encoding message for a page.  Wraps both the
/// legacy pb::ArrayEncoding and the newer pb::PageLayout
///
/// A file should only use one or the other and never both.
/// 2.0 decoders can always assume this is pb::ArrayEncoding
/// and 2.1+ decoders can always assume this is pb::PageLayout
#[derive(Debug)]
pub enum PageEncoding {
    Legacy(pb::ArrayEncoding),
    Structural(pb21::PageLayout),
}

impl PageEncoding {
    pub fn as_legacy(&self) -> &pb::ArrayEncoding {
        match self {
            Self::Legacy(enc) => enc,
            Self::Structural(_) => panic!("Expected a legacy encoding"),
        }
    }

    pub fn as_structural(&self) -> &pb21::PageLayout {
        match self {
            Self::Structural(enc) => enc,
            Self::Legacy(_) => panic!("Expected a structural encoding"),
        }
    }

    pub fn is_structural(&self) -> bool {
        matches!(self, Self::Structural(_))
    }
}

/// Metadata describing a page in a file
///
/// This is typically created by reading the metadata section of a Lance file
#[derive(Debug)]
pub struct PageInfo {
    /// The number of rows in the page
    pub num_rows: u64,
    /// The priority (top level row number) of the page
    ///
    /// This is only set in 2.1 files and will be 0 for 2.0 files
    pub priority: u64,
    /// The encoding that explains the buffers in the page
    pub encoding: PageEncoding,
    /// The offsets and sizes of the buffers in the file
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
}

/// Metadata describing a column in a file
///
/// This is typically created by reading the metadata section of a Lance file
#[derive(Debug, Clone)]
pub struct ColumnInfo {
    /// The index of the column in the file
    pub index: u32,
    /// The metadata for each page in the column
    pub page_infos: Arc<[PageInfo]>,
    /// File positions and sizes of the column-level buffers
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    pub encoding: pb::ColumnEncoding,
}

impl ColumnInfo {
    /// Create a new instance
    pub fn new(
        index: u32,
        page_infos: Arc<[PageInfo]>,
        buffer_offsets_and_sizes: Vec<(u64, u64)>,
        encoding: pb::ColumnEncoding,
    ) -> Self {
        Self {
            index,
            page_infos,
            buffer_offsets_and_sizes: buffer_offsets_and_sizes.into_boxed_slice().into(),
            encoding,
        }
    }

    pub fn is_structural(&self) -> bool {
        self.page_infos
            // Can just look at the first since all should be the same
            .first()
            .map(|page| page.encoding.is_structural())
            .unwrap_or(false)
    }
}

enum RootScheduler {
    Structural(Box<dyn StructuralFieldScheduler>),
    Legacy(Arc<dyn crate::previous::decoder::FieldScheduler>),
}

impl RootScheduler {
    fn as_legacy(&self) -> &Arc<dyn crate::previous::decoder::FieldScheduler> {
        match self {
            Self::Structural(_) => panic!("Expected a legacy scheduler"),
            Self::Legacy(s) => s,
        }
    }

    fn as_structural(&self) -> &dyn StructuralFieldScheduler {
        match self {
            Self::Structural(s) => s.as_ref(),
            Self::Legacy(_) => panic!("Expected a structural scheduler"),
        }
    }
}

/// The scheduler for decoding batches
///
/// Lance decoding is done in two steps: scheduling and decoding.  The
/// scheduling tends to be lightweight and should quickly figure out what data
/// is needed from the disk and issue the appropriate I/O requests.  A decode task is
/// created to eventually decode the data (once it is loaded) and scheduling
/// moves on to scheduling the next page.
///
/// Meanwhile, it's expected that a decode stream will be set up to run at the
/// same time.  Decode tasks take the data that is loaded and turn it into
/// Arrow arrays.
///
/// This approach allows us to keep our I/O parallelism and CPU parallelism
/// completely separate since those are often two very different values.
///
/// Backpressure should be achieved via the I/O service.  Requests that are
/// issued will pile up if the decode stream is not polling quickly enough.
/// The [`crate::EncodingsIo::submit_request`] function should return a pending
/// future once there are too many I/O requests in flight.
///
/// TODO: Implement backpressure
pub struct DecodeBatchScheduler {
    root_scheduler: RootScheduler,
    pub root_fields: Fields,
    cache: Arc<LanceCache>,
}

pub struct ColumnInfoIter<'a> {
    column_infos: Vec<Arc<ColumnInfo>>,
    column_indices: &'a [u32],
    column_info_pos: usize,
    column_indices_pos: usize,
}

impl<'a> ColumnInfoIter<'a> {
    pub fn new(column_infos: Vec<Arc<ColumnInfo>>, column_indices: &'a [u32]) -> Self {
        let initial_pos = column_indices.first().copied().unwrap_or(0) as usize;
        Self {
            column_infos,
            column_indices,
            column_info_pos: initial_pos,
            column_indices_pos: 0,
        }
    }

    pub fn peek(&self) -> &Arc<ColumnInfo> {
        &self.column_infos[self.column_info_pos]
    }

    pub fn peek_transform(&mut self, transform: impl FnOnce(Arc<ColumnInfo>) -> Arc<ColumnInfo>) {
        let column_info = self.column_infos[self.column_info_pos].clone();
        let transformed = transform(column_info);
        self.column_infos[self.column_info_pos] = transformed;
    }

    pub fn expect_next(&mut self) -> Result<&Arc<ColumnInfo>> {
        self.next().ok_or_else(|| {
            Error::invalid_input(
                "there were more fields in the schema than provided column indices / infos",
                location!(),
            )
        })
    }

    fn next(&mut self) -> Option<&Arc<ColumnInfo>> {
        if self.column_info_pos < self.column_infos.len() {
            let info = &self.column_infos[self.column_info_pos];
            self.column_info_pos += 1;
            Some(info)
        } else {
            None
        }
    }

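    /// Advance to the next top-level column, skipping over any remaining
    /// child columns of the current top-level column.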
    pub(crate) fn next_top_level(&mut self) {
        self.column_indices_pos += 1;
        if self.column_indices_pos < self.column_indices.len() {
            self.column_info_pos = self.column_indices[self.column_indices_pos] as usize;
        } else {
            self.column_info_pos = self.column_infos.len();
        }
    }
}

/// These contain the file buffers shared across the entire file
#[derive(Clone, Copy, Debug)]
pub struct FileBuffers<'a> {
    pub positions_and_sizes: &'a [(u64, u64)],
}

/// These contain the file buffers and also buffers specific to a column
#[derive(Clone, Copy, Debug)]
pub struct ColumnBuffers<'a, 'b> {
    pub file_buffers: FileBuffers<'a>,
    pub positions_and_sizes: &'b [(u64, u64)],
}

/// These contain the file & column buffers and also buffers specific to a page
#[derive(Clone, Copy, Debug)]
pub struct PageBuffers<'a, 'b, 'c> {
    pub column_buffers: ColumnBuffers<'a, 'b>,
    pub positions_and_sizes: &'c [(u64, u64)],
}

/// The core decoder strategy handles all the various Arrow types
#[derive(Debug)]
pub struct CoreFieldDecoderStrategy {
    pub validate_data: bool,
    pub decompressor_strategy: Arc<dyn DecompressionStrategy>,
    pub cache_repetition_index: bool,
}

impl Default for CoreFieldDecoderStrategy {
    fn default() -> Self {
        Self {
            validate_data: false,
            decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
            cache_repetition_index: false,
        }
    }
}

impl CoreFieldDecoderStrategy {
    /// Set whether the repetition index should be cached (builder style)
    pub fn with_cache_repetition_index(mut self, cache_repetition_index: bool) -> Self {
        self.cache_repetition_index = cache_repetition_index;
        self
    }

    /// Create a new strategy from decoder config
    pub fn from_decoder_config(config: &DecoderConfig) -> Self {
        Self {
            validate_data: config.validate_on_decode,
            decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
            cache_repetition_index: config.cache_repetition_index,
        }
    }

    /// This is just a sanity check to ensure there are no "wrapped encodings"
    /// that haven't been handled.
    fn ensure_values_encoded(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
        let column_encoding = column_info
            .encoding
            .column_encoding
            .as_ref()
            .ok_or_else(|| {
                Error::invalid_input(
                    format!(
                        "the column at index {} was missing a ColumnEncoding",
                        column_info.index
                    ),
                    location!(),
                )
            })?;
        if matches!(
            column_encoding,
            pb::column_encoding::ColumnEncoding::Values(_)
        ) {
            Ok(())
        } else {
            Err(Error::invalid_input(format!("the column at index {} mapping to the input field {} has column encoding {:?} and no decoder is registered to handle it", column_info.index, field_name, column_encoding), location!()))
        }
    }

    fn is_structural_primitive(data_type: &DataType) -> bool {
        if data_type.is_primitive() {
            true
        } else {
            match data_type {
                // DataType::is_primitive doesn't consider these primitive but we do
                DataType::Dictionary(_, value_type) => Self::is_structural_primitive(value_type),
                DataType::Boolean
                | DataType::Null
                | DataType::FixedSizeBinary(_)
                | DataType::Binary
                | DataType::LargeBinary
                | DataType::Utf8
                | DataType::LargeUtf8 => true,
                DataType::FixedSizeList(inner, _) => {
                    Self::is_structural_primitive(inner.data_type())
                }
                _ => false,
            }
        }
    }

    fn is_primitive_legacy(data_type: &DataType) -> bool {
        if data_type.is_primitive() {
            true
        } else {
            match data_type {
                // DataType::is_primitive doesn't consider these primitive but we do
                DataType::Boolean | DataType::Null | DataType::FixedSizeBinary(_) => true,
                DataType::FixedSizeList(inner, _) => Self::is_primitive_legacy(inner.data_type()),
                _ => false,
            }
        }
    }

    fn create_primitive_scheduler(
        &self,
        field: &Field,
        column: &ColumnInfo,
        buffers: FileBuffers,
    ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
        Self::ensure_values_encoded(column, &field.name)?;
        // Primitive fields map to a single column
        let column_buffers = ColumnBuffers {
            file_buffers: buffers,
            positions_and_sizes: &column.buffer_offsets_and_sizes,
        };
        Ok(Box::new(PrimitiveFieldScheduler::new(
            column.index,
            field.data_type(),
            column.page_infos.clone(),
            column_buffers,
            self.validate_data,
        )))
    }

    /// Helper method to verify the page encoding of a struct header column
    fn check_simple_struct(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
        Self::ensure_values_encoded(column_info, field_name)?;
        if column_info.page_infos.len() != 1 {
            return Err(Error::InvalidInput { source: format!("Due to schema we expected a struct column but we received a column with {} pages and right now we only support struct columns with 1 page", column_info.page_infos.len()).into(), location: location!() });
        }
        let encoding = &column_info.page_infos[0].encoding;
        match encoding.as_legacy().array_encoding.as_ref().unwrap() {
            pb::array_encoding::ArrayEncoding::Struct(_) => Ok(()),
            _ => Err(Error::InvalidInput { source: format!("Expected a struct encoding because we have a struct field in the schema but got the encoding {:?}", encoding).into(), location: location!() }),
        }
    }

    fn check_packed_struct(column_info: &ColumnInfo) -> bool {
        let encoding = &column_info.page_infos[0].encoding;
        matches!(
            encoding.as_legacy().array_encoding.as_ref().unwrap(),
            pb::array_encoding::ArrayEncoding::PackedStruct(_)
        )
    }

    fn create_list_scheduler(
        &self,
        list_field: &Field,
        column_infos: &mut ColumnInfoIter,
        buffers: FileBuffers,
        offsets_column: &ColumnInfo,
    ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
        Self::ensure_values_encoded(offsets_column, &list_field.name)?;
        let offsets_column_buffers = ColumnBuffers {
            file_buffers: buffers,
            positions_and_sizes: &offsets_column.buffer_offsets_and_sizes,
        };
        let items_scheduler =
            self.create_legacy_field_scheduler(&list_field.children[0], column_infos, buffers)?;

        let (inner_infos, null_offset_adjustments): (Vec<_>, Vec<_>) = offsets_column
            .page_infos
            .iter()
            .filter(|offsets_page| offsets_page.num_rows > 0)
            .map(|offsets_page| {
                if let Some(pb::array_encoding::ArrayEncoding::List(list_encoding)) =
                    &offsets_page.encoding.as_legacy().array_encoding
                {
                    let inner = PageInfo {
                        buffer_offsets_and_sizes: offsets_page.buffer_offsets_and_sizes.clone(),
                        encoding: PageEncoding::Legacy(
                            list_encoding.offsets.as_ref().unwrap().as_ref().clone(),
                        ),
                        num_rows: offsets_page.num_rows,
                        priority: 0,
                    };
                    (
                        inner,
                        OffsetPageInfo {
                            offsets_in_page: offsets_page.num_rows,
                            null_offset_adjustment: list_encoding.null_offset_adjustment,
                            num_items_referenced_by_page: list_encoding.num_items,
                        },
                    )
                } else {
                    // TODO: Should probably return Err here
                    panic!("Expected a list column");
                }
            })
            .unzip();
        let inner = Arc::new(PrimitiveFieldScheduler::new(
            offsets_column.index,
            DataType::UInt64,
            Arc::from(inner_infos.into_boxed_slice()),
            offsets_column_buffers,
            self.validate_data,
        )) as Arc<dyn crate::previous::decoder::FieldScheduler>;
        let items_field = match list_field.data_type() {
            DataType::List(inner) => inner,
            DataType::LargeList(inner) => inner,
            _ => unreachable!(),
        };
        let offset_type = if matches!(list_field.data_type(), DataType::List(_)) {
            DataType::Int32
        } else {
            DataType::Int64
        };
        Ok(Box::new(ListFieldScheduler::new(
            inner,
            items_scheduler.into(),
            items_field,
            offset_type,
            null_offset_adjustments,
        )))
    }

    fn unwrap_blob(column_info: &ColumnInfo) -> Option<ColumnInfo> {
        if let column_encoding::ColumnEncoding::Blob(blob) =
            column_info.encoding.column_encoding.as_ref().unwrap()
        {
            let mut column_info = column_info.clone();
            column_info.encoding = blob.inner.as_ref().unwrap().as_ref().clone();
            Some(column_info)
        } else {
            None
        }
    }

    fn create_structural_field_scheduler(
        &self,
        field: &Field,
        column_infos: &mut ColumnInfoIter,
    ) -> Result<Box<dyn StructuralFieldScheduler>> {
        let data_type = field.data_type();
        if Self::is_structural_primitive(&data_type) {
            let column_info = column_infos.expect_next()?;
            let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
                column_info.as_ref(),
                self.decompressor_strategy.as_ref(),
                self.cache_repetition_index,
                field,
            )?);

            // advance to the next top level column
            column_infos.next_top_level();

            return Ok(scheduler);
        }
        match &data_type {
            DataType::Struct(fields) => {
                if field.is_packed_struct() {
                    // Packed struct
                    let column_info = column_infos.expect_next()?;
                    let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
                        column_info.as_ref(),
                        self.decompressor_strategy.as_ref(),
                        self.cache_repetition_index,
                        field,
                    )?);

                    // advance to the next top level column
                    column_infos.next_top_level();

                    return Ok(scheduler);
                }
                // Maybe a blob descriptions struct?
                if field.is_blob() {
                    let column_info = column_infos.peek();
                    if column_info.page_infos.iter().any(|page| {
                        matches!(
                            page.encoding,
                            PageEncoding::Structural(pb21::PageLayout {
                                layout: Some(pb21::page_layout::Layout::BlobLayout(_))
                            })
                        )
                    }) {
                        let column_info = column_infos.expect_next()?;
                        let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
                            column_info.as_ref(),
                            self.decompressor_strategy.as_ref(),
                            self.cache_repetition_index,
                            field,
                        )?);
                        column_infos.next_top_level();
                        return Ok(scheduler);
                    }
                }

                let mut child_schedulers = Vec::with_capacity(field.children.len());
                for field in field.children.iter() {
                    let field_scheduler =
                        self.create_structural_field_scheduler(field, column_infos)?;
                    child_schedulers.push(field_scheduler);
                }

                let fields = fields.clone();
                Ok(
                    Box::new(StructuralStructScheduler::new(child_schedulers, fields))
                        as Box<dyn StructuralFieldScheduler>,
                )
            }
            DataType::List(_) | DataType::LargeList(_) => {
                let child = field
                    .children
                    .first()
                    .expect("List field must have a child");
                let child_scheduler =
                    self.create_structural_field_scheduler(child, column_infos)?;
                Ok(Box::new(StructuralListScheduler::new(child_scheduler))
                    as Box<dyn StructuralFieldScheduler>)
            }
            _ => todo!("create_structural_field_scheduler for {}", data_type),
        }
    }

    fn create_legacy_field_scheduler(
        &self,
        field: &Field,
        column_infos: &mut ColumnInfoIter,
        buffers: FileBuffers,
    ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
        let data_type = field.data_type();
        if Self::is_primitive_legacy(&data_type) {
            let column_info = column_infos.expect_next()?;
            let scheduler = self.create_primitive_scheduler(field, column_info, buffers)?;
            return Ok(scheduler);
        } else if data_type.is_binary_like() {
            let column_info = column_infos.next().unwrap().clone();
            // Column is blob and user is asking for binary data
            if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
                let desc_scheduler =
                    self.create_primitive_scheduler(&BLOB_DESC_LANCE_FIELD, &blob_col, buffers)?;
                let blob_scheduler = Box::new(BlobFieldScheduler::new(desc_scheduler.into()));
                return Ok(blob_scheduler);
            }
            if let Some(page_info) = column_info.page_infos.first() {
                if matches!(
                    page_info.encoding.as_legacy(),
                    pb::ArrayEncoding {
                        array_encoding: Some(pb::array_encoding::ArrayEncoding::List(..))
                    }
                ) {
                    let list_type = if matches!(data_type, DataType::Utf8 | DataType::Binary) {
                        DataType::List(Arc::new(ArrowField::new("item", DataType::UInt8, false)))
                    } else {
                        DataType::LargeList(Arc::new(ArrowField::new(
                            "item",
                            DataType::UInt8,
                            false,
                        )))
                    };
                    let list_field = Field::try_from(ArrowField::new(
                        field.name.clone(),
                        list_type,
                        field.nullable,
                    ))
                    .unwrap();
                    let list_scheduler = self.create_list_scheduler(
                        &list_field,
                        column_infos,
                        buffers,
                        &column_info,
                    )?;
                    let binary_scheduler = Box::new(BinaryFieldScheduler::new(
                        list_scheduler.into(),
                        field.data_type(),
                    ));
                    return Ok(binary_scheduler);
                } else {
                    let scheduler =
                        self.create_primitive_scheduler(field, &column_info, buffers)?;
                    return Ok(scheduler);
                }
            } else {
                return self.create_primitive_scheduler(field, &column_info, buffers);
            }
        }
        match &data_type {
            DataType::FixedSizeList(inner, _dimension) => {
                // A fixed size list column could either be a physical or a logical decoder
                // depending on the child data type.
                if Self::is_primitive_legacy(inner.data_type()) {
                    let primitive_col = column_infos.expect_next()?;
                    let scheduler =
                        self.create_primitive_scheduler(field, primitive_col, buffers)?;
                    Ok(scheduler)
                } else {
                    todo!()
                }
            }
            DataType::Dictionary(_key_type, value_type) => {
                if Self::is_primitive_legacy(value_type) || value_type.is_binary_like() {
                    let primitive_col = column_infos.expect_next()?;
                    let scheduler =
                        self.create_primitive_scheduler(field, primitive_col, buffers)?;
                    Ok(scheduler)
                } else {
                    Err(Error::NotSupported {
                        source: format!(
                            "No way to decode into a dictionary field of type {}",
                            value_type
                        )
                        .into(),
                        location: location!(),
                    })
                }
            }
            DataType::List(_) | DataType::LargeList(_) => {
                let offsets_column = column_infos.expect_next()?.clone();
                column_infos.next_top_level();
                self.create_list_scheduler(field, column_infos, buffers, &offsets_column)
            }
            DataType::Struct(fields) => {
                let column_info = column_infos.expect_next()?;

                // Column is blob and user is asking for descriptions
                if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
                    // Can use primitive scheduler here since descriptions are always packed struct
                    return self.create_primitive_scheduler(field, &blob_col, buffers);
                }

                if Self::check_packed_struct(column_info) {
                    // use packed struct encoding
                    self.create_primitive_scheduler(field, column_info, buffers)
                } else {
                    // use default struct encoding
                    Self::check_simple_struct(column_info, &field.name)?;
                    let num_rows = column_info
                        .page_infos
                        .iter()
                        .map(|page| page.num_rows)
                        .sum();
                    let mut child_schedulers = Vec::with_capacity(field.children.len());
                    for field in &field.children {
                        column_infos.next_top_level();
                        let field_scheduler =
                            self.create_legacy_field_scheduler(field, column_infos, buffers)?;
                        child_schedulers.push(Arc::from(field_scheduler));
                    }

                    let fields = fields.clone();
                    Ok(Box::new(SimpleStructScheduler::new(
                        child_schedulers,
                        fields,
                        num_rows,
                    )))
                }
            }
            // TODO: Still need support for RLE
            _ => todo!(),
        }
    }
}

/// Creates a dummy ColumnInfo for the root column
fn root_column(num_rows: u64) -> ColumnInfo {
    let num_root_pages = num_rows.div_ceil(u32::MAX as u64);
    // Every root page covers u32::MAX rows except the final page, which covers
    // the remainder (a full u32::MAX rows when num_rows divides evenly)
    let final_page_num_rows = num_rows - (num_root_pages - 1) * (u32::MAX as u64);
    let root_pages = (0..num_root_pages)
        .map(|i| PageInfo {
            num_rows: if i == num_root_pages - 1 {
                final_page_num_rows
            } else {
                u32::MAX as u64
            },
            encoding: PageEncoding::Legacy(pb::ArrayEncoding {
                array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
                    pb::SimpleStruct {},
                )),
            }),
            priority: 0, // not used in legacy scheduler
            buffer_offsets_and_sizes: Arc::new([]),
        })
        .collect::<Vec<_>>();
    ColumnInfo {
        buffer_offsets_and_sizes: Arc::new([]),
        encoding: pb::ColumnEncoding {
            column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
        },
        index: u32::MAX,
        page_infos: Arc::from(root_pages),
    }
}

pub enum RootDecoder {
    Structural(StructuralStructDecoder),
    Legacy(SimpleStructDecoder),
}

impl RootDecoder {
    pub fn into_structural(self) -> StructuralStructDecoder {
        match self {
            Self::Structural(decoder) => decoder,
            Self::Legacy(_) => panic!("Expected a structural decoder"),
        }
    }

    pub fn into_legacy(self) -> SimpleStructDecoder {
        match self {
            Self::Legacy(decoder) => decoder,
            Self::Structural(_) => panic!("Expected a legacy decoder"),
        }
    }
}

impl DecodeBatchScheduler {
    /// Creates a new decode scheduler with the expected schema and the column
    /// metadata of the file.
    #[allow(clippy::too_many_arguments)]
    pub async fn try_new<'a>(
        schema: &'a Schema,
        column_indices: &[u32],
        column_infos: &[Arc<ColumnInfo>],
        file_buffer_positions_and_sizes: &'a Vec<(u64, u64)>,
        num_rows: u64,
        _decoder_plugins: Arc<DecoderPlugins>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<LanceCache>,
        filter: &FilterExpression,
        decoder_config: &DecoderConfig,
    ) -> Result<Self> {
        assert!(num_rows > 0);
        let buffers = FileBuffers {
            positions_and_sizes: file_buffer_positions_and_sizes,
        };
        let arrow_schema = ArrowSchema::from(schema);
        let root_fields = arrow_schema.fields().clone();
        let root_type = DataType::Struct(root_fields.clone());
        let mut root_field = Field::try_from(&ArrowField::new("root", root_type, false))?;
        // root_field.children and schema.fields should be identical at this point but the latter
        // has field ids and the former does not.  This line restores that.
        // TODO: Is there another way to create the root field without forcing a trip through arrow?
        root_field.children.clone_from(&schema.fields);
        root_field
            .metadata
            .insert("__lance_decoder_root".to_string(), "true".to_string());

        if column_infos.is_empty() || column_infos[0].is_structural() {
            let mut column_iter = ColumnInfoIter::new(column_infos.to_vec(), column_indices);

            let strategy = CoreFieldDecoderStrategy::from_decoder_config(decoder_config);
            let mut root_scheduler =
                strategy.create_structural_field_scheduler(&root_field, &mut column_iter)?;

            let context = SchedulerContext::new(io, cache.clone());
            root_scheduler.initialize(filter, &context).await?;

            Ok(Self {
                root_scheduler: RootScheduler::Structural(root_scheduler),
                root_fields,
                cache,
            })
        } else {
            // The old encoding style expected a header column for structs and so we
            // need a header column for the top-level struct
            let mut columns = Vec::with_capacity(column_infos.len() + 1);
            columns.push(Arc::new(root_column(num_rows)));
            columns.extend(column_infos.iter().cloned());

            let adjusted_column_indices = [0_u32]
                .into_iter()
                .chain(column_indices.iter().map(|i| i.saturating_add(1)))
                .collect::<Vec<_>>();
            let mut column_iter = ColumnInfoIter::new(columns, &adjusted_column_indices);
            let strategy = CoreFieldDecoderStrategy::from_decoder_config(decoder_config);
            let root_scheduler =
                strategy.create_legacy_field_scheduler(&root_field, &mut column_iter, buffers)?;

            let context = SchedulerContext::new(io, cache.clone());
            root_scheduler.initialize(filter, &context).await?;

            Ok(Self {
                root_scheduler: RootScheduler::Legacy(root_scheduler.into()),
                root_fields,
                cache,
            })
        }
    }

    #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
    pub fn from_scheduler(
        root_scheduler: Arc<dyn crate::previous::decoder::FieldScheduler>,
        root_fields: Fields,
        cache: Arc<LanceCache>,
    ) -> Self {
        Self {
            root_scheduler: RootScheduler::Legacy(root_scheduler),
            root_fields,
            cache,
        }
    }

    fn do_schedule_ranges_structural(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
    ) {
        let root_scheduler = self.root_scheduler.as_structural();
        let mut context = SchedulerContext::new(io, self.cache.clone());
        let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
        if let Err(schedule_ranges_err) = maybe_root_job {
            schedule_action(Err(schedule_ranges_err));
            return;
        }
        let mut root_job = maybe_root_job.unwrap();
        let mut num_rows_scheduled = 0;
        loop {
            let maybe_next_scan_lines = root_job.schedule_next(&mut context);
            if let Err(err) = maybe_next_scan_lines {
                schedule_action(Err(err));
                return;
            }
            let next_scan_lines = maybe_next_scan_lines.unwrap();
            if next_scan_lines.is_empty() {
                return;
            }
            for next_scan_line in next_scan_lines {
                trace!(
                    "Scheduled scan line of {} rows and {} decoders",
                    next_scan_line.rows_scheduled,
                    next_scan_line.decoders.len()
                );
                num_rows_scheduled += next_scan_line.rows_scheduled;
                if !schedule_action(Ok(DecoderMessage {
                    scheduled_so_far: num_rows_scheduled,
                    decoders: next_scan_line.decoders,
                })) {
                    // Decoder has disconnected
                    return;
                }
            }
        }
    }

    fn do_schedule_ranges_legacy(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
        // If specified, this will be used as the top_level_row for all scheduling
        // tasks.  This is used by list scheduling to ensure all items scheduling
        // tasks are scheduled at the same top level row.
        priority: Option<Box<dyn PriorityRange>>,
    ) {
        let root_scheduler = self.root_scheduler.as_legacy();
        let rows_requested = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        trace!(
            "Scheduling {} ranges across {}..{} ({} rows){}",
            ranges.len(),
            ranges.first().unwrap().start,
            ranges.last().unwrap().end,
            rows_requested,
            priority
                .as_ref()
                .map(|p| format!(" (priority={:?})", p))
                .unwrap_or_default()
        );

        let mut context = SchedulerContext::new(io, self.cache.clone());
        let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
        if let Err(schedule_ranges_err) = maybe_root_job {
            schedule_action(Err(schedule_ranges_err));
            return;
        }
        let mut root_job = maybe_root_job.unwrap();
        let mut num_rows_scheduled = 0;
        let mut rows_to_schedule = root_job.num_rows();
        let mut priority = priority.unwrap_or(Box::new(SimplePriorityRange::new(0)));
        trace!("Scheduled ranges refined to {} rows", rows_to_schedule);
        while rows_to_schedule > 0 {
            let maybe_next_scan_line = root_job.schedule_next(&mut context, priority.as_ref());
            if let Err(schedule_next_err) = maybe_next_scan_line {
                schedule_action(Err(schedule_next_err));
                return;
            }
            let next_scan_line = maybe_next_scan_line.unwrap();
            priority.advance(next_scan_line.rows_scheduled);
            num_rows_scheduled += next_scan_line.rows_scheduled;
            rows_to_schedule -= next_scan_line.rows_scheduled;
            trace!(
                "Scheduled scan line of {} rows and {} decoders",
                next_scan_line.rows_scheduled,
                next_scan_line.decoders.len()
            );
            if !schedule_action(Ok(DecoderMessage {
                scheduled_so_far: num_rows_scheduled,
                decoders: next_scan_line.decoders,
            })) {
                // Decoder has disconnected
                return;
            }
        }

        trace!("Finished scheduling {} ranges", ranges.len());
    }

    fn do_schedule_ranges(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
        // If specified, this will be used as the top_level_row for all scheduling
        // tasks.  This is used by list scheduling to ensure all items scheduling
        // tasks are scheduled at the same top level row.
        priority: Option<Box<dyn PriorityRange>>,
    ) {
        match &self.root_scheduler {
            RootScheduler::Legacy(_) => {
                self.do_schedule_ranges_legacy(ranges, filter, io, schedule_action, priority)
            }
            RootScheduler::Structural(_) => {
                self.do_schedule_ranges_structural(ranges, filter, io, schedule_action)
            }
        }
    }

    // This method is similar to schedule_ranges but instead of
    // sending the decoders to a channel it collects them all into a vector
    pub fn schedule_ranges_to_vec(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        priority: Option<Box<dyn PriorityRange>>,
    ) -> Result<Vec<DecoderMessage>> {
        let mut decode_messages = Vec::new();
        self.do_schedule_ranges(
            ranges,
            filter,
            io,
            |msg| {
                decode_messages.push(msg);
                true
            },
            priority,
        );
        decode_messages.into_iter().collect::<Result<Vec<_>>>()
    }

    /// Schedules the load of multiple ranges of rows
    ///
    /// Ranges must be non-overlapping and in sorted order
    ///
    /// # Arguments
    ///
    /// * `ranges` - The ranges of rows to load
    /// * `sink` - A channel to send the decode tasks
    /// * `scheduler` - An I/O scheduler to issue I/O requests
    #[instrument(skip_all)]
    pub fn schedule_ranges(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
        scheduler: Arc<dyn EncodingsIo>,
    ) {
        self.do_schedule_ranges(
            ranges,
            filter,
            scheduler,
            |msg| {
                match sink.send(msg) {
                    Ok(_) => true,
                    Err(SendError { .. }) => {
                        // The receiver has gone away.  We can't do anything about it
                        // so just ignore the error.
                        debug!(
                            "schedule_ranges aborting early since decoder appears to have been dropped"
                        );
                        false
                    }
                }
            },
            None,
        )
    }

    /// Schedules the load of a range of rows
    ///
    /// # Arguments
    ///
    /// * `range` - The range of rows to load
    /// * `sink` - A channel to send the decode tasks
    /// * `scheduler` - An I/O scheduler to issue I/O requests
    #[instrument(skip_all)]
    pub fn schedule_range(
        &mut self,
        range: Range<u64>,
        filter: &FilterExpression,
        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
        scheduler: Arc<dyn EncodingsIo>,
    ) {
        self.schedule_ranges(&[range], filter, sink, scheduler)
    }

    /// Schedules the load of selected rows
    ///
    /// # Arguments
    ///
    /// * `indices` - The row indices to load (these must be in ascending order!)
    /// * `sink` - A channel to send the decode tasks
    /// * `scheduler` - An I/O scheduler to issue I/O requests
    pub fn schedule_take(
        &mut self,
        indices: &[u64],
        filter: &FilterExpression,
        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
        scheduler: Arc<dyn EncodingsIo>,
    ) {
        debug_assert!(indices.windows(2).all(|w| w[0] <= w[1]));
        if indices.is_empty() {
            return;
        }
        trace!("Scheduling take of {} rows", indices.len());
        let ranges = Self::indices_to_ranges(indices);
        self.schedule_ranges(&ranges, filter, sink, scheduler)
    }

    // Coalesce contiguous indices into ranges (the input indices must be sorted and non-empty)
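    // e.g. [1, 2, 3, 7, 8, 10] -> [1..4, 7..9, 10..11]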
    fn indices_to_ranges(indices: &[u64]) -> Vec<Range<u64>> {
        let mut ranges = Vec::new();
        let mut start = indices[0];

        for window in indices.windows(2) {
            if window[1] != window[0] + 1 {
                ranges.push(start..window[0] + 1);
                start = window[1];
            }
        }

        ranges.push(start..*indices.last().unwrap() + 1);
        ranges
    }
}

pub struct ReadBatchTask {
    pub task: BoxFuture<'static, Result<RecordBatch>>,
    pub num_rows: u32,
}

/// A stream that takes scheduled jobs and generates decode tasks from them.
pub struct BatchDecodeStream {
    context: DecoderContext,
    root_decoder: SimpleStructDecoder,
    rows_remaining: u64,
    rows_per_batch: u32,
    rows_scheduled: u64,
    rows_drained: u64,
    scheduler_exhausted: bool,
    emitted_batch_size_warning: Arc<Once>,
}

1321impl BatchDecodeStream {
1322    /// Create a new instance of a batch decode stream
1323    ///
1324    /// # Arguments
1325    ///
    /// * `scheduled` - an incoming stream of decode messages from a
    ///   [`DecodeBatchScheduler`]
    /// * `rows_per_batch` - the number of rows to include in each batch
    /// * `num_rows` - the total number of rows scheduled
    /// * `root_decoder` - the root decoder that scheduled child decoders will be
    ///   attached to
1332    pub fn new(
1333        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1334        rows_per_batch: u32,
1335        num_rows: u64,
1336        root_decoder: SimpleStructDecoder,
1337    ) -> Self {
1338        Self {
1339            context: DecoderContext::new(scheduled),
1340            root_decoder,
1341            rows_remaining: num_rows,
1342            rows_per_batch,
1343            rows_scheduled: 0,
1344            rows_drained: 0,
1345            scheduler_exhausted: false,
1346            emitted_batch_size_warning: Arc::new(Once::new()),
1347        }
1348    }
1349
1350    fn accept_decoder(&mut self, decoder: crate::previous::decoder::DecoderReady) -> Result<()> {
1351        if decoder.path.is_empty() {
            // We can ignore the root decoder itself
1353            Ok(())
1354        } else {
1355            self.root_decoder.accept_child(decoder)
1356        }
1357    }
1358
1359    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
1360        if self.scheduler_exhausted {
1361            return Ok(self.rows_scheduled);
1362        }
1363        while self.rows_scheduled < scheduled_need {
1364            let next_message = self.context.source.recv().await;
1365            match next_message {
1366                Some(scan_line) => {
1367                    let scan_line = scan_line?;
1368                    self.rows_scheduled = scan_line.scheduled_so_far;
1369                    for message in scan_line.decoders {
1370                        self.accept_decoder(message.into_legacy())?;
1371                    }
1372                }
1373                None => {
1374                    // Schedule ended before we got all the data we expected.  This probably
1375                    // means some kind of pushdown filter was applied and we didn't load as
1376                    // much data as we thought we would.
1377                    self.scheduler_exhausted = true;
1378                    return Ok(self.rows_scheduled);
1379                }
1380            }
1381        }
1382        Ok(scheduled_need)
1383    }
1384
1385    #[instrument(level = "debug", skip_all)]
1386    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
1387        trace!(
1388            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1389            self.rows_remaining,
1390            self.rows_drained,
1391            self.rows_scheduled,
1392        );
1393        if self.rows_remaining == 0 {
1394            return Ok(None);
1395        }
1396
1397        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1398        self.rows_remaining -= to_take;
1399
1400        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1401        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1402        if scheduled_need > 0 {
1403            let desired_scheduled = scheduled_need + self.rows_scheduled;
1404            trace!(
1405                "Draining from scheduler (desire at least {} scheduled rows)",
1406                desired_scheduled
1407            );
1408            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
1409            if actually_scheduled < desired_scheduled {
1410                let under_scheduled = desired_scheduled - actually_scheduled;
1411                to_take -= under_scheduled;
1412            }
1413        }
1414
1415        if to_take == 0 {
1416            return Ok(None);
1417        }
1418
1419        // wait_for_loaded waits for *>* loaded_need (not >=) so we do a -1 here
1420        let loaded_need = self.rows_drained + to_take - 1;
1421        trace!(
1422            "Waiting for I/O (desire at least {} fully loaded rows)",
1423            loaded_need
1424        );
1425        self.root_decoder.wait_for_loaded(loaded_need).await?;
1426
1427        let next_task = self.root_decoder.drain(to_take)?;
1428        self.rows_drained += to_take;
1429        Ok(Some(next_task))
1430    }
1431
1432    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
1433        let stream = futures::stream::unfold(self, |mut slf| async move {
1434            let next_task = slf.next_batch_task().await;
1435            let next_task = next_task.transpose().map(|next_task| {
1436                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
1437                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
1438                let task = async move {
1439                    let next_task = next_task?;
1440                    // Real decode work happens inside into_batch, which can block the current
1441                    // thread for a long time. By spawning it as a new task, we allow Tokio's
1442                    // worker threads to keep making progress.
1443                    tokio::spawn(async move { next_task.into_batch(emitted_batch_size_warning) })
1444                        .await
1445                        .map_err(|err| Error::Wrapped {
1446                            error: err.into(),
1447                            location: location!(),
1448                        })?
1449                };
1450                (task, num_rows)
1451            });
1452            next_task.map(|(task, num_rows)| {
1453                // This should be true since batch size is u32
1454                debug_assert!(num_rows <= u32::MAX as u64);
1455                let next_task = ReadBatchTask {
1456                    task: task.boxed(),
1457                    num_rows: num_rows as u32,
1458                };
1459                (next_task, slf)
1460            })
1461        });
1462        stream.boxed()
1463    }
1464}
1465
1466// Utility types to smooth out the differences between the 2.0 and 2.1 decoders so that
1467// we can have a single implementation of the batch decode iterator
1468enum RootDecoderMessage {
1469    LoadedPage(LoadedPageShard),
1470    LegacyPage(crate::previous::decoder::DecoderReady),
1471}
1472trait RootDecoderType {
1473    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()>;
1474    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
1475    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()>;
1476}
1477impl RootDecoderType for StructuralStructDecoder {
1478    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1479        let RootDecoderMessage::LoadedPage(loaded_page) = message else {
1480            unreachable!()
1481        };
1482        self.accept_page(loaded_page)
1483    }
1484    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1485        self.drain_batch_task(num_rows)
1486    }
1487    fn wait(&mut self, _: u64, _: &tokio::runtime::Runtime) -> Result<()> {
1488        // Waiting happens elsewhere (not as part of the decoder)
1489        Ok(())
1490    }
1491}
1492impl RootDecoderType for SimpleStructDecoder {
1493    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1494        let RootDecoderMessage::LegacyPage(legacy_page) = message else {
1495            unreachable!()
1496        };
1497        self.accept_child(legacy_page)
1498    }
1499    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1500        self.drain(num_rows)
1501    }
1502    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()> {
1503        runtime.block_on(self.wait_for_loaded(loaded_need))
1504    }
1505}
1506
1507/// A blocking batch decoder that performs synchronous decoding
1508struct BatchDecodeIterator<T: RootDecoderType> {
1509    messages: VecDeque<Result<DecoderMessage>>,
1510    root_decoder: T,
1511    rows_remaining: u64,
1512    rows_per_batch: u32,
1513    rows_scheduled: u64,
1514    rows_drained: u64,
1515    emitted_batch_size_warning: Arc<Once>,
    // Note: this is not the runtime on which I/O happens.  That's always
    // in the scheduler.  This is just a runtime we use to put the current
    // thread to sleep while waiting for unready I/O
1519    wait_for_io_runtime: tokio::runtime::Runtime,
1520    schema: Arc<ArrowSchema>,
1521}
1522
1523impl<T: RootDecoderType> BatchDecodeIterator<T> {
1524    /// Create a new instance of a batch decode iterator
1525    pub fn new(
1526        messages: VecDeque<Result<DecoderMessage>>,
1527        rows_per_batch: u32,
1528        num_rows: u64,
1529        root_decoder: T,
1530        schema: Arc<ArrowSchema>,
1531    ) -> Self {
1532        Self {
1533            messages,
1534            root_decoder,
1535            rows_remaining: num_rows,
1536            rows_per_batch,
1537            rows_scheduled: 0,
1538            rows_drained: 0,
1539            wait_for_io_runtime: tokio::runtime::Builder::new_current_thread()
1540                .build()
1541                .unwrap(),
1542            emitted_batch_size_warning: Arc::new(Once::new()),
1543            schema,
1544        }
1545    }
1546
1547    /// Wait for a single page of data to finish loading
1548    ///
1549    /// If the data is not available this will perform a *blocking* wait (put
1550    /// the current thread to sleep)
1551    fn wait_for_page(&self, unloaded_page: UnloadedPageShard) -> Result<LoadedPageShard> {
1552        match maybe_done(unloaded_page.0) {
1553            // Fast path, avoid all runtime shenanigans if the data is ready
1554            MaybeDone::Done(loaded_page) => loaded_page,
1555            // Slow path, we need to wait on I/O, enter the runtime
1556            MaybeDone::Future(fut) => self.wait_for_io_runtime.block_on(fut),
1557            MaybeDone::Gone => unreachable!(),
1558        }
1559    }
1560
    /// Processes scheduler messages until `scheduled_need` rows have been scheduled,
    /// then blocks until the rows needed for the next batch are fully loaded
1562    ///
1563    /// Note that `scheduled_need` is cumulative.  E.g. this method
1564    /// should be called with 5, 10, 15 and not 5, 5, 5
1565    #[instrument(skip_all)]
1566    fn wait_for_io(&mut self, scheduled_need: u64, to_take: u64) -> Result<u64> {
1567        while self.rows_scheduled < scheduled_need && !self.messages.is_empty() {
1568            let message = self.messages.pop_front().unwrap()?;
1569            self.rows_scheduled = message.scheduled_so_far;
1570            for decoder_message in message.decoders {
1571                match decoder_message {
1572                    MessageType::UnloadedPage(unloaded_page) => {
1573                        let loaded_page = self.wait_for_page(unloaded_page)?;
1574                        self.root_decoder
1575                            .accept_message(RootDecoderMessage::LoadedPage(loaded_page))?;
1576                    }
1577                    MessageType::DecoderReady(decoder_ready) => {
                        // We can ignore the root decoder itself
1579                        if !decoder_ready.path.is_empty() {
1580                            self.root_decoder
1581                                .accept_message(RootDecoderMessage::LegacyPage(decoder_ready))?;
1582                        }
1583                    }
1584                }
1585            }
1586        }
1587
1588        let loaded_need = self.rows_drained + to_take.min(self.rows_per_batch as u64) - 1;
1589
1590        self.root_decoder
1591            .wait(loaded_need, &self.wait_for_io_runtime)?;
1592        Ok(self.rows_scheduled)
1593    }
1594
1595    #[instrument(level = "debug", skip_all)]
1596    fn next_batch_task(&mut self) -> Result<Option<RecordBatch>> {
1597        trace!(
1598            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1599            self.rows_remaining,
1600            self.rows_drained,
1601            self.rows_scheduled,
1602        );
1603        if self.rows_remaining == 0 {
1604            return Ok(None);
1605        }
1606
1607        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1608        self.rows_remaining -= to_take;
1609
1610        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1611        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1612        if scheduled_need > 0 {
1613            let desired_scheduled = scheduled_need + self.rows_scheduled;
1614            trace!(
1615                "Draining from scheduler (desire at least {} scheduled rows)",
1616                desired_scheduled
1617            );
1618            let actually_scheduled = self.wait_for_io(desired_scheduled, to_take)?;
1619            if actually_scheduled < desired_scheduled {
1620                let under_scheduled = desired_scheduled - actually_scheduled;
1621                to_take -= under_scheduled;
1622            }
1623        }
1624
1625        if to_take == 0 {
1626            return Ok(None);
1627        }
1628
1629        let next_task = self.root_decoder.drain_batch(to_take)?;
1630
1631        self.rows_drained += to_take;
1632
1633        let batch = next_task.into_batch(self.emitted_batch_size_warning.clone())?;
1634
1635        Ok(Some(batch))
1636    }
1637}
1638
1639impl<T: RootDecoderType> Iterator for BatchDecodeIterator<T> {
1640    type Item = ArrowResult<RecordBatch>;
1641
1642    fn next(&mut self) -> Option<Self::Item> {
1643        self.next_batch_task()
1644            .transpose()
1645            .map(|r| r.map_err(ArrowError::from))
1646    }
1647}
1648
1649impl<T: RootDecoderType> RecordBatchReader for BatchDecodeIterator<T> {
1650    fn schema(&self) -> Arc<ArrowSchema> {
1651        self.schema.clone()
1652    }
1653}
1654
1655/// A stream that takes scheduled jobs and generates decode tasks from them.
1656pub struct StructuralBatchDecodeStream {
1657    context: DecoderContext,
1658    root_decoder: StructuralStructDecoder,
1659    rows_remaining: u64,
1660    rows_per_batch: u32,
1661    rows_scheduled: u64,
1662    rows_drained: u64,
1663    scheduler_exhausted: bool,
1664    emitted_batch_size_warning: Arc<Once>,
1665}
1666
1667impl StructuralBatchDecodeStream {
1668    /// Create a new instance of a batch decode stream
1669    ///
1670    /// # Arguments
1671    ///
    /// * `scheduled` - an incoming stream of decode messages from a
    ///   [`DecodeBatchScheduler`]
    /// * `rows_per_batch` - the number of rows to include in each batch
    /// * `num_rows` - the total number of rows scheduled
    /// * `root_decoder` - the root decoder that loaded pages will be delivered to
1678    pub fn new(
1679        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1680        rows_per_batch: u32,
1681        num_rows: u64,
1682        root_decoder: StructuralStructDecoder,
1683    ) -> Self {
1684        Self {
1685            context: DecoderContext::new(scheduled),
1686            root_decoder,
1687            rows_remaining: num_rows,
1688            rows_per_batch,
1689            rows_scheduled: 0,
1690            rows_drained: 0,
1691            scheduler_exhausted: false,
1692            emitted_batch_size_warning: Arc::new(Once::new()),
1693        }
1694    }
1695
1696    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
1697        if self.scheduler_exhausted {
1698            return Ok(self.rows_scheduled);
1699        }
1700        while self.rows_scheduled < scheduled_need {
1701            let next_message = self.context.source.recv().await;
1702            match next_message {
1703                Some(scan_line) => {
1704                    let scan_line = scan_line?;
1705                    self.rows_scheduled = scan_line.scheduled_so_far;
1706                    for message in scan_line.decoders {
1707                        let unloaded_page = message.into_structural();
1708                        let loaded_page = unloaded_page.0.await?;
1709                        self.root_decoder.accept_page(loaded_page)?;
1710                    }
1711                }
1712                None => {
1713                    // Schedule ended before we got all the data we expected.  This probably
1714                    // means some kind of pushdown filter was applied and we didn't load as
1715                    // much data as we thought we would.
1716                    self.scheduler_exhausted = true;
1717                    return Ok(self.rows_scheduled);
1718                }
1719            }
1720        }
1721        Ok(scheduled_need)
1722    }
1723
1724    #[instrument(level = "debug", skip_all)]
1725    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
1726        trace!(
1727            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1728            self.rows_remaining,
1729            self.rows_drained,
1730            self.rows_scheduled,
1731        );
1732        if self.rows_remaining == 0 {
1733            return Ok(None);
1734        }
1735
1736        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1737        self.rows_remaining -= to_take;
1738
1739        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1740        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1741        if scheduled_need > 0 {
1742            let desired_scheduled = scheduled_need + self.rows_scheduled;
1743            trace!(
1744                "Draining from scheduler (desire at least {} scheduled rows)",
1745                desired_scheduled
1746            );
1747            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
1748            if actually_scheduled < desired_scheduled {
1749                let under_scheduled = desired_scheduled - actually_scheduled;
1750                to_take -= under_scheduled;
1751            }
1752        }
1753
1754        if to_take == 0 {
1755            return Ok(None);
1756        }
1757
1758        let next_task = self.root_decoder.drain_batch_task(to_take)?;
1759        self.rows_drained += to_take;
1760        Ok(Some(next_task))
1761    }
1762
1763    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
1764        let stream = futures::stream::unfold(self, |mut slf| async move {
1765            let next_task = slf.next_batch_task().await;
1766            let next_task = next_task.transpose().map(|next_task| {
1767                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
1768                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
1769                let task = async move {
1770                    let next_task = next_task?;
1771                    // Real decode work happens inside into_batch, which can block the current
1772                    // thread for a long time. By spawning it as a new task, we allow Tokio's
1773                    // worker threads to keep making progress.
1774                    tokio::spawn(async move { next_task.into_batch(emitted_batch_size_warning) })
1775                        .await
1776                        .map_err(|err| Error::Wrapped {
1777                            error: err.into(),
1778                            location: location!(),
1779                        })?
1780                };
1781                (task, num_rows)
1782            });
1783            next_task.map(|(task, num_rows)| {
1784                // This should be true since batch size is u32
1785                debug_assert!(num_rows <= u32::MAX as u64);
1786                let next_task = ReadBatchTask {
1787                    task: task.boxed(),
1788                    num_rows: num_rows as u32,
1789                };
1790                (next_task, slf)
1791            })
1792        });
1793        stream.boxed()
1794    }
1795}
1796
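/// A request for a set of rows, either as ranges of rows or as individual row indices
///
/// A small sketch of the helper methods below:
///
/// ```ignore
/// // The empty range 10..10 is dropped, leaving 15 requested rows
/// let rows = RequestedRows::Ranges(vec![0..10, 10..10, 25..30]).trim_empty_ranges();
/// assert_eq!(rows.num_rows(), 15);
/// ```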
1797#[derive(Debug)]
1798pub enum RequestedRows {
1799    Ranges(Vec<Range<u64>>),
1800    Indices(Vec<u64>),
1801}
1802
1803impl RequestedRows {
1804    pub fn num_rows(&self) -> u64 {
1805        match self {
1806            Self::Ranges(ranges) => ranges.iter().map(|r| r.end - r.start).sum(),
1807            Self::Indices(indices) => indices.len() as u64,
1808        }
1809    }
1810
1811    pub fn trim_empty_ranges(mut self) -> Self {
1812        if let Self::Ranges(ranges) = &mut self {
1813            ranges.retain(|r| !r.is_empty());
1814        }
1815        self
1816    }
1817}
1818
1819/// Configuration for decoder behavior
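///
/// A minimal sketch of overriding one option while keeping the defaults:
///
/// ```ignore
/// let decoder_config = DecoderConfig {
///     cache_repetition_index: true,
///     ..Default::default()
/// };
/// ```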
1820#[derive(Debug, Clone, Default)]
1821pub struct DecoderConfig {
1822    /// Whether to cache repetition indices for better performance
1823    pub cache_repetition_index: bool,
1824    /// Whether to validate decoded data
1825    pub validate_on_decode: bool,
1826}
1827
1828#[derive(Debug, Clone)]
1829pub struct SchedulerDecoderConfig {
1830    pub decoder_plugins: Arc<DecoderPlugins>,
1831    pub batch_size: u32,
1832    pub io: Arc<dyn EncodingsIo>,
1833    pub cache: Arc<LanceCache>,
1834    /// Decoder configuration
1835    pub decoder_config: DecoderConfig,
1836}
1837
1838fn check_scheduler_on_drop(
1839    stream: BoxStream<'static, ReadBatchTask>,
1840    scheduler_handle: tokio::task::JoinHandle<()>,
1841) -> BoxStream<'static, ReadBatchTask> {
    // This is a bit weird but we create an "empty stream" that unwraps the scheduler handle
    // (which will panic if the scheduler panicked).  This lets us check whether the scheduler
    // panicked when the stream finishes.
1845    let mut scheduler_handle = Some(scheduler_handle);
1846    let check_scheduler = stream::unfold((), move |_| {
1847        let handle = scheduler_handle.take();
1848        async move {
1849            if let Some(handle) = handle {
1850                handle.await.unwrap();
1851            }
1852            None
1853        }
1854    });
1855    stream.chain(check_scheduler).boxed()
1856}
1857
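/// Creates a stream of record batch tasks from a channel of decoder messages
///
/// The `is_structural` flag selects between the 2.1 (structural) decode path and the
/// legacy 2.0 path.  `rx` should be the receiving half of the channel whose sending
/// half was given to one of the `schedule_*` methods.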
1858pub fn create_decode_stream(
1859    schema: &Schema,
1860    num_rows: u64,
1861    batch_size: u32,
1862    is_structural: bool,
1863    should_validate: bool,
1864    rx: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1865) -> BoxStream<'static, ReadBatchTask> {
1866    if is_structural {
1867        let arrow_schema = ArrowSchema::from(schema);
1868        let structural_decoder = StructuralStructDecoder::new(
1869            arrow_schema.fields,
1870            should_validate,
1871            /*is_root=*/ true,
1872        );
1873        StructuralBatchDecodeStream::new(rx, batch_size, num_rows, structural_decoder).into_stream()
1874    } else {
1875        let arrow_schema = ArrowSchema::from(schema);
1876        let root_fields = arrow_schema.fields;
1877
1878        let simple_struct_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1879        BatchDecodeStream::new(rx, batch_size, num_rows, simple_struct_decoder).into_stream()
1880    }
1881}
1882
/// Creates an iterator that decodes a set of messages in a blocking fashion
1884///
1885/// See [`schedule_and_decode_blocking`] for more information.
1886pub fn create_decode_iterator(
1887    schema: &Schema,
1888    num_rows: u64,
1889    batch_size: u32,
1890    should_validate: bool,
1891    is_structural: bool,
1892    messages: VecDeque<Result<DecoderMessage>>,
1893) -> Box<dyn RecordBatchReader + Send + 'static> {
1894    let arrow_schema = Arc::new(ArrowSchema::from(schema));
1895    let root_fields = arrow_schema.fields.clone();
1896    if is_structural {
        let structural_decoder =
            StructuralStructDecoder::new(root_fields, should_validate, /*is_root=*/ true);
        Box::new(BatchDecodeIterator::new(
            messages,
            batch_size,
            num_rows,
            structural_decoder,
            arrow_schema,
        ))
1906    } else {
1907        let root_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1908        Box::new(BatchDecodeIterator::new(
1909            messages,
1910            batch_size,
1911            num_rows,
1912            root_decoder,
1913            arrow_schema,
1914        ))
1915    }
1916}
1917
1918fn create_scheduler_decoder(
1919    column_infos: Vec<Arc<ColumnInfo>>,
1920    requested_rows: RequestedRows,
1921    filter: FilterExpression,
1922    column_indices: Vec<u32>,
1923    target_schema: Arc<Schema>,
1924    config: SchedulerDecoderConfig,
1925) -> Result<BoxStream<'static, ReadBatchTask>> {
1926    let num_rows = requested_rows.num_rows();
1927
1928    let is_structural = column_infos[0].is_structural();
1929
1930    let (tx, rx) = mpsc::unbounded_channel();
1931
1932    let decode_stream = create_decode_stream(
1933        &target_schema,
1934        num_rows,
1935        config.batch_size,
1936        is_structural,
1937        config.decoder_config.validate_on_decode,
1938        rx,
1939    );
1940
1941    let scheduler_handle = tokio::task::spawn(async move {
1942        let mut decode_scheduler = match DecodeBatchScheduler::try_new(
1943            target_schema.as_ref(),
1944            &column_indices,
1945            &column_infos,
1946            &vec![],
1947            num_rows,
1948            config.decoder_plugins,
1949            config.io.clone(),
1950            config.cache,
1951            &filter,
1952            &config.decoder_config,
1953        )
1954        .await
1955        {
1956            Ok(scheduler) => scheduler,
1957            Err(e) => {
1958                let _ = tx.send(Err(e));
1959                return;
1960            }
1961        };
1962
1963        match requested_rows {
1964            RequestedRows::Ranges(ranges) => {
1965                decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
1966            }
1967            RequestedRows::Indices(indices) => {
1968                decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
1969            }
1970        }
1971    });
1972
1973    Ok(check_scheduler_on_drop(decode_stream, scheduler_handle))
1974}
1975
1976/// Launches a scheduler on a dedicated (spawned) task and creates a decoder to
1977/// decode the scheduled data and returns the decoder as a stream of record batches.
1978///
1979/// This is a convenience function that creates both the scheduler and the decoder
1980/// which can be a little tricky to get right.
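///
/// A minimal consumption sketch (`column_infos`, `column_indices`, `target_schema`,
/// and `config` are assumed to come from the surrounding file reader):
///
/// ```ignore
/// use futures::StreamExt;
///
/// let mut stream = schedule_and_decode(
///     column_infos,
///     RequestedRows::Ranges(vec![0..1024]),
///     FilterExpression::no_filter(),
///     column_indices,
///     target_schema,
///     config,
/// );
/// while let Some(batch_task) = stream.next().await {
///     let batch = batch_task.task.await?;
///     // ... process the RecordBatch ...
/// }
/// ```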
1981pub fn schedule_and_decode(
1982    column_infos: Vec<Arc<ColumnInfo>>,
1983    requested_rows: RequestedRows,
1984    filter: FilterExpression,
1985    column_indices: Vec<u32>,
1986    target_schema: Arc<Schema>,
1987    config: SchedulerDecoderConfig,
1988) -> BoxStream<'static, ReadBatchTask> {
1989    if requested_rows.num_rows() == 0 {
1990        return stream::empty().boxed();
1991    }
1992
1993    // If the user requested any ranges that are empty, ignore them.  They are pointless and
1994    // trying to read them has caused bugs in the past.
1995    let requested_rows = requested_rows.trim_empty_ranges();
1996
1997    let io = config.io.clone();
1998
    // For convenience we really want this method to be a synchronous method where all
    // errors happen on the stream.  There is some async initialization that must happen
    // when creating a scheduler.  We wrap that all up in the very first task.
2002    match create_scheduler_decoder(
2003        column_infos,
2004        requested_rows,
2005        filter,
2006        column_indices,
2007        target_schema,
2008        config,
2009    ) {
2010        // Keep the io alive until the stream is dropped or finishes.  Otherwise the
2011        // I/O drops as soon as the scheduling is finished and the I/O loop terminates.
2012        Ok(stream) => stream.finally(move || drop(io)).boxed(),
2013        // If the initialization failed make it look like a failed task
2014        Err(e) => stream::once(std::future::ready(ReadBatchTask {
2015            num_rows: 0,
2016            task: std::future::ready(Err(e)).boxed(),
2017        }))
2018        .boxed(),
2019    }
2020}
2021
2022pub static WAITER_RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
2023    tokio::runtime::Builder::new_current_thread()
2024        .build()
2025        .unwrap()
2026});
2027
2028/// Schedules and decodes the requested data in a blocking fashion
2029///
2030/// This function is a blocking version of [`schedule_and_decode`]. It schedules the requested data
2031/// and decodes it in the current thread.
2032///
2033/// This can be useful when the disk is fast (or the data is in memory) and the amount
2034/// of data is relatively small.  For example, when doing a take against NVMe or in-memory data.
2035///
2036/// This should NOT be used for full scans.  Even if the data is in memory this function will
2037/// not parallelize the decode and will be slower than the async version.  Full scans typically
2038/// make relatively few IOPs and so the asynchronous overhead is much smaller.
2039///
2040/// This method will first completely run the scheduling process.  Then it will run the
2041/// decode process.
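///
/// A minimal sketch (inputs assumed to come from the surrounding reader):
///
/// ```ignore
/// let reader = schedule_and_decode_blocking(
///     column_infos,
///     RequestedRows::Indices(vec![17, 42, 43]),
///     FilterExpression::no_filter(),
///     column_indices,
///     target_schema,
///     config,
/// )?;
/// for batch in reader {
///     let batch = batch?;
///     // ... process the RecordBatch ...
/// }
/// ```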
2042pub fn schedule_and_decode_blocking(
2043    column_infos: Vec<Arc<ColumnInfo>>,
2044    requested_rows: RequestedRows,
2045    filter: FilterExpression,
2046    column_indices: Vec<u32>,
2047    target_schema: Arc<Schema>,
2048    config: SchedulerDecoderConfig,
2049) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
2050    if requested_rows.num_rows() == 0 {
2051        let arrow_schema = Arc::new(ArrowSchema::from(target_schema.as_ref()));
2052        return Ok(Box::new(RecordBatchIterator::new(vec![], arrow_schema)));
2053    }
2054
2055    let num_rows = requested_rows.num_rows();
2056    let is_structural = column_infos[0].is_structural();
2057
2058    let (tx, mut rx) = mpsc::unbounded_channel();
2059
2060    // Initialize the scheduler.  This is still "asynchronous" but we run it with a current-thread
2061    // runtime.
2062    let mut decode_scheduler = WAITER_RT.block_on(DecodeBatchScheduler::try_new(
2063        target_schema.as_ref(),
2064        &column_indices,
2065        &column_infos,
2066        &vec![],
2067        num_rows,
2068        config.decoder_plugins,
2069        config.io.clone(),
2070        config.cache,
2071        &filter,
2072        &config.decoder_config,
2073    ))?;
2074
2075    // Schedule the requested rows
2076    match requested_rows {
2077        RequestedRows::Ranges(ranges) => {
2078            decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
2079        }
2080        RequestedRows::Indices(indices) => {
2081            decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
2082        }
2083    }
2084
2085    // Drain the scheduler queue into a vec of decode messages
2086    let mut messages = Vec::new();
2087    while rx
2088        .recv_many(&mut messages, usize::MAX)
2089        .now_or_never()
2090        .unwrap()
2091        != 0
2092    {}
2093
2094    // Create a decoder to decode the messages
2095    let decode_iterator = create_decode_iterator(
2096        &target_schema,
2097        num_rows,
2098        config.batch_size,
2099        config.decoder_config.validate_on_decode,
2100        is_structural,
2101        messages.into(),
2102    );
2103
2104    Ok(decode_iterator)
2105}
2106
2107/// A decoder for single-column encodings of primitive data (this includes fixed size
2108/// lists of primitive data)
2109///
2110/// Physical decoders are able to decode into existing buffers for zero-copy operation.
2111///
2112/// Instances should be stateless and `Send` / `Sync`.  This is because multiple decode
2113/// tasks could reference the same page.  For example, imagine a page covers rows 0-2000
2114/// and the decoder stream has a batch size of 1024.  The decoder will be needed by both
2115/// the decode task for batch 0 and the decode task for batch 1.
2116///
2117/// See [`crate::decoder`] for more information
2118pub trait PrimitivePageDecoder: Send + Sync {
2119    /// Decode data into buffers
2120    ///
2121    /// This may be a simple zero-copy from a disk buffer or could involve complex decoding
2122    /// such as decompressing from some compressed representation.
2123    ///
2128    /// Encodings can have any number of input or output buffers.  For example, a dictionary
2129    /// decoding will convert two buffers (indices + dictionary) into a single buffer
2130    ///
2131    /// Binary decodings have two output buffers (one for values, one for offsets)
2132    ///
2133    /// Other decodings could even expand the # of output buffers.  For example, we could decode
2134    /// fixed size strings into variable length strings going from one input buffer to multiple output
2135    /// buffers.
2136    ///
2137    /// Each Arrow data type typically has a fixed structure of buffers and the encoding chain will
2138    /// generally end at one of these structures.  However, intermediate structures may exist which
2139    /// do not correspond to any Arrow type at all.  For example, a bitpacking encoding will deal
2140    /// with buffers that have bits-per-value that is not a multiple of 8.
2141    ///
    /// The `primitive_array_from_buffers` method has an expected buffer layout for each Arrow
    /// type (order matters) and encodings that aim to decode into Arrow types should respect
    /// this layout.
    ///
    /// # Arguments
    ///
    /// * `rows_to_skip` - how many rows to skip (within the page) before decoding
    /// * `num_rows` - how many rows to decode
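    ///
    /// A minimal call sketch (`page_decoder` being any implementation of this trait):
    ///
    /// ```ignore
    /// // Decode rows 10..20 of the page (skip 10, take 10)
    /// let block = page_decoder.decode(10, 10)?;
    /// ```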
2150    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock>;
2151}
2152
2153/// A scheduler for single-column encodings of primitive data
2154///
2155/// The scheduler is responsible for calculating what I/O is needed for the requested rows
2156///
2157/// Instances should be stateless and `Send` and `Sync`.  This is because instances can
2158/// be shared in follow-up I/O tasks.
2159///
2160/// See [`crate::decoder`] for more information
2161pub trait PageScheduler: Send + Sync + std::fmt::Debug {
2162    /// Schedules a batch of I/O to load the data needed for the requested ranges
2163    ///
2164    /// Returns a future that will yield a decoder once the data has been loaded
2165    ///
2166    /// # Arguments
2167    ///
    /// * `ranges` - the ranges of row offsets (relative to start of page) requested;
    ///   these must be ordered and must not overlap
2170    /// * `scheduler` - a scheduler to submit the I/O request to
2171    /// * `top_level_row` - the row offset of the top level field currently being
2172    ///   scheduled.  This can be used to assign priority to I/O requests
2173    fn schedule_ranges(
2174        &self,
2175        ranges: &[Range<u64>],
2176        scheduler: &Arc<dyn EncodingsIo>,
2177        top_level_row: u64,
2178    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>;
2179}
2180
2181/// A trait to control the priority of I/O
2182pub trait PriorityRange: std::fmt::Debug + Send + Sync {
2183    fn advance(&mut self, num_rows: u64);
2184    fn current_priority(&self) -> u64;
2185    fn box_clone(&self) -> Box<dyn PriorityRange>;
2186}
2187
2188/// A simple priority scheme for top-level fields with no parent
2189/// repetition
2190#[derive(Debug)]
2191pub struct SimplePriorityRange {
2192    priority: u64,
2193}
2194
2195impl SimplePriorityRange {
2196    fn new(priority: u64) -> Self {
2197        Self { priority }
2198    }
2199}
2200
2201impl PriorityRange for SimplePriorityRange {
2202    fn advance(&mut self, num_rows: u64) {
2203        self.priority += num_rows;
2204    }
2205
2206    fn current_priority(&self) -> u64 {
2207        self.priority
2208    }
2209
2210    fn box_clone(&self) -> Box<dyn PriorityRange> {
2211        Box::new(Self {
2212            priority: self.priority,
2213        })
2214    }
2215}
2216
/// Determining the priority of a list request is tricky.  We want
/// the priority to be the top-level row.  So if we have a
/// list<list<int>> where each outer list contains 10 inner lists and
/// each inner list contains 5 items then the priority of the 100th
/// item is 1 because it is the 5th item of the 10th inner list of the
/// *second* top-level row.
2222///
2223/// This structure allows us to keep track of this complicated priority
2224/// relationship.
2225///
2226/// There's a fair amount of bookkeeping involved here.
2227///
2228/// A better approach (using repetition levels) is coming in the future.
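///
/// A worked sketch of the bookkeeping (a hypothetical list column whose rows each
/// contain 5 items, with Arrow-style offsets that start at 0):
///
/// ```ignore
/// let mut priority = ListPriorityRange::new(
///     Box::new(SimplePriorityRange::new(0)),
///     Arc::from(vec![0u64, 5, 10, 15]),
/// );
/// // Scheduling 7 items covers all of row 0 plus 2 items of row 1 ...
/// priority.advance(7);
/// // ... so the next I/O is prioritized by top-level row 1
/// assert_eq!(priority.current_priority(), 1);
/// ```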
2229pub struct ListPriorityRange {
2230    base: Box<dyn PriorityRange>,
2231    offsets: Arc<[u64]>,
2232    cur_index_into_offsets: usize,
2233    cur_position: u64,
2234}
2235
2236impl ListPriorityRange {
2237    pub(crate) fn new(base: Box<dyn PriorityRange>, offsets: Arc<[u64]>) -> Self {
2238        Self {
2239            base,
2240            offsets,
2241            cur_index_into_offsets: 0,
2242            cur_position: 0,
2243        }
2244    }
2245}
2246
2247impl std::fmt::Debug for ListPriorityRange {
2248    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2249        f.debug_struct("ListPriorityRange")
2250            .field("base", &self.base)
2251            .field("offsets.len()", &self.offsets.len())
2252            .field("cur_index_into_offsets", &self.cur_index_into_offsets)
2253            .field("cur_position", &self.cur_position)
2254            .finish()
2255    }
2256}
2257
2258impl PriorityRange for ListPriorityRange {
2259    fn advance(&mut self, num_rows: u64) {
2260        // We've scheduled X items.  Now walk through the offsets to
2261        // determine how many rows we've scheduled.
2262        self.cur_position += num_rows;
2263        let mut idx_into_offsets = self.cur_index_into_offsets;
2264        while idx_into_offsets + 1 < self.offsets.len()
2265            && self.offsets[idx_into_offsets + 1] <= self.cur_position
2266        {
2267            idx_into_offsets += 1;
2268        }
2269        let base_rows_advanced = idx_into_offsets - self.cur_index_into_offsets;
2270        self.cur_index_into_offsets = idx_into_offsets;
2271        self.base.advance(base_rows_advanced as u64);
2272    }
2273
2274    fn current_priority(&self) -> u64 {
2275        self.base.current_priority()
2276    }
2277
2278    fn box_clone(&self) -> Box<dyn PriorityRange> {
2279        Box::new(Self {
2280            base: self.base.box_clone(),
2281            offsets: self.offsets.clone(),
2282            cur_index_into_offsets: self.cur_index_into_offsets,
2283            cur_position: self.cur_position,
2284        })
2285    }
2286}
2287
2288/// Contains the context for a scheduler
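///
/// A sketch of how a scheduling job might descend into a child field (the names
/// here are illustrative):
///
/// ```ignore
/// let scoped = context.push("items", 0);
/// // ... schedule the child field using scoped.context ...
/// let context = scoped.pop();
/// // back at the parent; the path no longer includes "items"
/// ```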
2289pub struct SchedulerContext {
2290    recv: Option<mpsc::UnboundedReceiver<DecoderMessage>>,
2291    io: Arc<dyn EncodingsIo>,
2292    cache: Arc<LanceCache>,
2293    name: String,
2294    path: Vec<u32>,
2295    path_names: Vec<String>,
2296}
2297
2298pub struct ScopedSchedulerContext<'a> {
2299    pub context: &'a mut SchedulerContext,
2300}
2301
2302impl<'a> ScopedSchedulerContext<'a> {
2303    pub fn pop(self) -> &'a mut SchedulerContext {
2304        self.context.pop();
2305        self.context
2306    }
2307}
2308
2309impl SchedulerContext {
2310    pub fn new(io: Arc<dyn EncodingsIo>, cache: Arc<LanceCache>) -> Self {
2311        Self {
2312            io,
2313            cache,
2314            recv: None,
2315            name: "".to_string(),
2316            path: Vec::new(),
2317            path_names: Vec::new(),
2318        }
2319    }
2320
2321    pub fn io(&self) -> &Arc<dyn EncodingsIo> {
2322        &self.io
2323    }
2324
2325    pub fn cache(&self) -> &Arc<LanceCache> {
2326        &self.cache
2327    }
2328
2329    pub fn push(&'_ mut self, name: &str, index: u32) -> ScopedSchedulerContext<'_> {
2330        self.path.push(index);
2331        self.path_names.push(name.to_string());
2332        ScopedSchedulerContext { context: self }
2333    }
2334
2335    pub fn pop(&mut self) {
2336        self.path.pop();
2337        self.path_names.pop();
2338    }
2339
2340    pub fn path_name(&self) -> String {
2341        let path = self.path_names.join("/");
2342        if self.recv.is_some() {
2343            format!("TEMP({}){}", self.name, path)
2344        } else {
2345            format!("ROOT{}", path)
2346        }
2347    }
2348
2349    pub fn current_path(&self) -> VecDeque<u32> {
2350        VecDeque::from_iter(self.path.iter().copied())
2351    }
2352
2353    #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
2354    pub fn locate_decoder(
2355        &mut self,
2356        decoder: Box<dyn crate::previous::decoder::LogicalPageDecoder>,
2357    ) -> crate::previous::decoder::DecoderReady {
2358        trace!(
2359            "Scheduling decoder of type {:?} for {:?}",
2360            decoder.data_type(),
2361            self.path,
2362        );
2363        crate::previous::decoder::DecoderReady {
2364            decoder,
2365            path: self.current_path(),
2366        }
2367    }
2368}
2369
2370pub struct UnloadedPageShard(pub BoxFuture<'static, Result<LoadedPageShard>>);
2371
2372impl std::fmt::Debug for UnloadedPageShard {
2373    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2374        f.debug_struct("UnloadedPage").finish()
2375    }
2376}
2377
2378#[derive(Debug)]
2379pub struct ScheduledScanLine {
2380    pub rows_scheduled: u64,
2381    pub decoders: Vec<MessageType>,
2382}
2383
2384pub trait StructuralSchedulingJob: std::fmt::Debug {
2385    /// Schedule the next batch of data
2386    ///
2387    /// Normally this equates to scheduling the next page of data into one task.  Very large pages
2388    /// might be split into multiple scan lines.  Each scan line has one or more rows.
2389    ///
2390    /// If a scheduler ends early it may return an empty vector.
2391    fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>>;
2392}
2393
2394/// A filter expression to apply to the data
2395///
2396/// The core decoders do not currently take advantage of filtering in
2397/// any way.  In order to maintain the abstraction we represent filters
2398/// as an arbitrary byte sequence.
2399///
2400/// We recommend that encodings use Substrait for filters.
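///
/// ```ignore
/// // A pass-through filter for scans that should read every requested row
/// let filter = FilterExpression::no_filter();
/// assert!(filter.is_noop());
/// ```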
2401pub struct FilterExpression(pub Bytes);
2402
2403impl FilterExpression {
2404    /// Create a filter expression that does not filter any data
2405    ///
2406    /// This is currently represented by an empty byte array.  Encoders
2407    /// that are "filter aware" should make sure they handle this case.
2408    pub fn no_filter() -> Self {
2409        Self(Bytes::new())
2410    }
2411
2412    /// Returns true if the filter is the same as the [`Self::no_filter`] filter
2413    pub fn is_noop(&self) -> bool {
2414        self.0.is_empty()
2415    }
2416}
2417
2418pub trait StructuralFieldScheduler: Send + std::fmt::Debug {
2419    fn initialize<'a>(
2420        &'a mut self,
2421        filter: &'a FilterExpression,
2422        context: &'a SchedulerContext,
2423    ) -> BoxFuture<'a, Result<()>>;
2424    fn schedule_ranges<'a>(
2425        &'a self,
2426        ranges: &[Range<u64>],
2427        filter: &FilterExpression,
2428    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>>;
2429}
2430
2431/// A trait for tasks that decode data into an Arrow array
2432pub trait DecodeArrayTask: Send {
2433    /// Decodes the data into an Arrow array
2434    fn decode(self: Box<Self>) -> Result<ArrayRef>;
2435}
2436
2437impl DecodeArrayTask for Box<dyn StructuralDecodeArrayTask> {
2438    fn decode(self: Box<Self>) -> Result<ArrayRef> {
2439        StructuralDecodeArrayTask::decode(*self).map(|decoded_array| decoded_array.array)
2440    }
2441}
2442
2443/// A task to decode data into an Arrow record batch
2444///
2445/// It has a child `task` which decodes a struct array with no nulls.
2446/// This is then converted into a record batch.
2447pub struct NextDecodeTask {
2448    /// The decode task itself
2449    pub task: Box<dyn DecodeArrayTask>,
2450    /// The number of rows that will be created
2451    pub num_rows: u64,
2452}
2453
2454impl NextDecodeTask {
2455    // Run the task and produce a record batch
2456    //
2457    // If the batch is very large this function will log a warning message
2458    // suggesting the user try a smaller batch size.
2459    #[instrument(name = "task_to_batch", level = "debug", skip_all)]
2460    fn into_batch(self, emitted_batch_size_warning: Arc<Once>) -> Result<RecordBatch> {
2461        let struct_arr = self.task.decode();
2462        match struct_arr {
2463            Ok(struct_arr) => {
2464                let batch = RecordBatch::from(struct_arr.as_struct());
2465                let size_bytes = batch.get_array_memory_size() as u64;
2466                if size_bytes > BATCH_SIZE_BYTES_WARNING {
2467                    emitted_batch_size_warning.call_once(|| {
2468                        let size_mb = size_bytes / 1024 / 1024;
2469                        debug!("Lance read in a single batch that contained more than {}MiB of data.  You may want to consider reducing the batch size.", size_mb);
2470                    });
2471                }
2472                Ok(batch)
2473            }
2474            Err(e) => {
2475                let e = Error::Internal {
2476                    message: format!("Error decoding batch: {}", e),
2477                    location: location!(),
2478                };
2479                Err(e)
2480            }
2481        }
2482    }
2483}
2484
2485// An envelope to wrap both 2.0 style messages and 2.1 style messages so we can
2486// share some code paths between the two.  Decoders can safely unwrap into whatever
2487// style they expect since a file will be either all-2.0 or all-2.1
2488#[derive(Debug)]
2489pub enum MessageType {
2490    // The older v2.0 scheduler/decoder used a scheme where the message was the
2491    // decoder itself.  The messages were not sent in priority order and the decoder
2492    // had to wait for I/O, figuring out the correct priority.  This was a lot of
2493    // complexity.
2494    DecoderReady(crate::previous::decoder::DecoderReady),
2495    // Starting in 2.1 we use a simpler scheme where the scheduling happens in priority
2496    // order and the message is an unloaded decoder.  These can be awaited, in order, and
2497    // the decoder does not have to worry about waiting for I/O.
2498    UnloadedPage(UnloadedPageShard),
2499}
2500
2501impl MessageType {
2502    pub fn into_legacy(self) -> crate::previous::decoder::DecoderReady {
2503        match self {
2504            Self::DecoderReady(decoder) => decoder,
2505            Self::UnloadedPage(_) => {
2506                panic!("Expected DecoderReady but got UnloadedPage")
2507            }
2508        }
2509    }
2510
2511    pub fn into_structural(self) -> UnloadedPageShard {
2512        match self {
2513            Self::UnloadedPage(unloaded) => unloaded,
2514            Self::DecoderReady(_) => {
2515                panic!("Expected UnloadedPage but got DecoderReady")
2516            }
2517        }
2518    }
2519}
2520
2521pub struct DecoderMessage {
2522    pub scheduled_so_far: u64,
2523    pub decoders: Vec<MessageType>,
2524}
2525
2526pub struct DecoderContext {
2527    source: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
2528}
2529
2530impl DecoderContext {
2531    pub fn new(source: mpsc::UnboundedReceiver<Result<DecoderMessage>>) -> Self {
2532        Self { source }
2533    }
2534}
2535
2536pub struct DecodedPage {
2537    pub data: DataBlock,
2538    pub repdef: RepDefUnraveler,
2539}
2540
2541pub trait DecodePageTask: Send + std::fmt::Debug {
2542    /// Decodes the data into an Arrow array
2543    fn decode(self: Box<Self>) -> Result<DecodedPage>;
2544}
2545
2546pub trait StructuralPageDecoder: std::fmt::Debug + Send {
2547    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>>;
2548    fn num_rows(&self) -> u64;
2549}
2550
2551#[derive(Debug)]
2552pub struct LoadedPageShard {
2553    // The decoder that is ready to be decoded
2554    pub decoder: Box<dyn StructuralPageDecoder>,
2555    // The path to the decoder, the first value is the column index
2556    // following values, if present, are nested child indices
2557    //
2558    // For example, a path of [1, 1, 0] would mean to grab the second
2559    // column, then the second child, and then the first child.
2560    //
2561    // It could represent x in the following schema:
2562    //
2563    // score: float64
2564    // points: struct
2565    //   color: string
2566    //   location: struct
2567    //     x: float64
2568    //
2569    // Currently, only struct decoders have "children" although other
2570    // decoders may at some point as well.  List children are only
2571    // handled through indirect I/O at the moment and so they don't
2572    // need to be represented (yet)
2573    pub path: VecDeque<u32>,
2574}
2575
2576pub struct DecodedArray {
2577    pub array: ArrayRef,
2578    pub repdef: CompositeRepDefUnraveler,
2579}
2580
2581pub trait StructuralDecodeArrayTask: std::fmt::Debug + Send {
2582    fn decode(self: Box<Self>) -> Result<DecodedArray>;
2583}
2584
2585pub trait StructuralFieldDecoder: std::fmt::Debug + Send {
    /// Accept a newly loaded page of child data
2590    fn accept_page(&mut self, _child: LoadedPageShard) -> Result<()>;
2591    /// Creates a task to decode `num_rows` of data into an array
2592    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>>;
2593    /// The data type of the decoded data
2594    fn data_type(&self) -> &DataType;
2595}
2596
2597#[derive(Debug, Default)]
2598pub struct DecoderPlugins {}
2599
2600/// Decodes a batch of data from an in-memory structure created by [`crate::encoder::encode_batch`]
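///
/// A minimal sketch (`encoded` is an [`EncodedBatch`] produced by the encoder):
///
/// ```ignore
/// let batch = decode_batch(
///     &encoded,
///     &FilterExpression::no_filter(),
///     Arc::new(DecoderPlugins::default()),
///     /*should_validate=*/ false,
///     LanceFileVersion::V2_1,
///     /*cache=*/ None,
/// ).await?;
/// ```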
2601pub async fn decode_batch(
2602    batch: &EncodedBatch,
2603    filter: &FilterExpression,
2604    decoder_plugins: Arc<DecoderPlugins>,
2605    should_validate: bool,
2606    version: LanceFileVersion,
2607    cache: Option<Arc<LanceCache>>,
2608) -> Result<RecordBatch> {
    // The I/O is synchronous so it shouldn't be possible for any async work to still be in
    // progress.  Still, if we just use now_or_never we hit misfires because some futures
    // (channels) need to be polled twice.
2612
2613    let io_scheduler = Arc::new(BufferScheduler::new(batch.data.clone())) as Arc<dyn EncodingsIo>;
2614    let cache = if let Some(cache) = cache {
2615        cache
2616    } else {
2617        Arc::new(lance_core::cache::LanceCache::with_capacity(
2618            128 * 1024 * 1024,
2619        ))
2620    };
2621    let mut decode_scheduler = DecodeBatchScheduler::try_new(
2622        batch.schema.as_ref(),
2623        &batch.top_level_columns,
2624        &batch.page_table,
2625        &vec![],
2626        batch.num_rows,
2627        decoder_plugins,
2628        io_scheduler.clone(),
2629        cache,
2630        filter,
2631        &DecoderConfig::default(),
2632    )
2633    .await?;
2634    let (tx, rx) = unbounded_channel();
2635    decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler);
2636    let is_structural = version >= LanceFileVersion::V2_1;
2637    let mut decode_stream = create_decode_stream(
2638        &batch.schema,
2639        batch.num_rows,
2640        batch.num_rows as u32,
2641        is_structural,
2642        should_validate,
2643        rx,
2644    );
2645    decode_stream.next().await.unwrap().task.await
2646}
2647
// Tests for coalescing indices into ranges
#[cfg(test)]
mod tests {
2651    use super::*;
2652
2653    #[test]
2654    fn test_coalesce_indices_to_ranges_with_single_index() {
2655        let indices = vec![1];
2656        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
2657        assert_eq!(ranges, vec![1..2]);
2658    }
2659
2660    #[test]
2661    fn test_coalesce_indices_to_ranges() {
2662        let indices = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
2663        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
2664        assert_eq!(ranges, vec![1..10]);
2665    }
2666
2667    #[test]
2668    fn test_coalesce_indices_to_ranges_with_gaps() {
2669        let indices = vec![1, 2, 3, 5, 6, 7, 9];
2670        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
2671        assert_eq!(ranges, vec![1..4, 5..8, 9..10]);
2672    }
2673}