lance_encoding/decoder.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Utilities and traits for scheduling & decoding data
//!
//! Reading data involves two steps: scheduling and decoding.  The
//! scheduling step is responsible for figuring out what data is needed
//! and issuing the appropriate I/O requests.  The decoding step is
//! responsible for taking the loaded data and turning it into Arrow
//! arrays.
//!
//! # Scheduling
//!
//! Scheduling is split into [`self::FieldScheduler`] and [`self::PageScheduler`].
//! There is one field scheduler for each output field, which may map to many
//! columns of actual data.  A field scheduler is responsible for figuring out
//! the order in which pages should be scheduled.  Field schedulers then delegate
//! to page schedulers to figure out the I/O requests that need to be made for
//! the page.
//!
//! Page schedulers also create the decoders that will be used to decode the
//! scheduled data.
//!
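//! As a rough sketch (the signatures here are hypothetical; the real traits
//! carry more context), the division of labor looks like:
//!
//! ```ignore
//! trait FieldScheduler {
//!     /// Walks the field's pages in the order they should be read and
//!     /// delegates each page's I/O to a page scheduler.
//!     fn schedule_ranges(&self, ranges: &[std::ops::Range<u64>]);
//! }
//! trait PageScheduler {
//!     /// Issues the I/O requests for one page and hands back a decoder
//!     /// for that page's data.
//!     fn schedule_page(&self, range: std::ops::Range<u64>);
//! }
//! ```
//!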
//! # Decoding
//!
//! Decoders are split into [`self::PhysicalPageDecoder`] and
//! [`self::LogicalPageDecoder`].  Note that both physical and logical decoding
//! happens on a per-page basis.  There is no concept of a "field decoder" or
//! "column decoder".
//!
//! The physical decoders handle lower level encodings.  They have a few advantages:
//!
//!  * They do not need to decode into an Arrow array and so they don't need
//!    to fit into the Arrow type system (e.g. Arrow doesn't have a
//!    bit-packed type.  We can use variable-length binary but that is kind
//!    of awkward)
//!  * They can decode into an existing allocation.  This can allow for "page
//!    bridging".  If we are trying to decode into a batch of 1024 rows and
//!    the rows 0..1024 are spread across two pages then we can avoid a memory
//!    copy by allocating once and decoding each page into the outer allocation.
//!    (note: page bridging is not actually implemented yet)
//!
//! However, there are some limitations for physical decoders:
//!
//!  * They are constrained to a single column
//!  * The API is more complex
//!
//! The logical decoders are designed to map one or more columns of Lance
//! data into an Arrow array.
//!
//! Typically, a "logical encoding" will have both a logical decoder and a field scheduler.
//! Meanwhile, a "physical encoding" will have a physical decoder but no corresponding field
//! scheduler.
//!
//! # General notes
//!
//! Encodings are typically nested into each other to form a tree.  The top of the tree is
//! the user requested schema.  Each field in that schema is assigned to one top-level logical
//! encoding.  That encoding can then contain other logical encodings or physical encodings.
//! Physical encodings can also contain other physical encodings.
//!
//! So, for example, a single field in the Arrow schema might have the type List<UInt32>
//!
//! The encoding tree could then be:
//!
//! root: List (logical encoding)
//!  - indices: Primitive (logical encoding)
//!    - column: Basic (physical encoding)
//!      - validity: Bitmap (physical encoding)
//!      - values: RLE (physical encoding)
//!        - runs: Value (physical encoding)
//!        - values: Value (physical encoding)
//!  - items: Primitive (logical encoding)
//!    - column: Basic (physical encoding)
//!      - values: Value (physical encoding)
//!
//! Note that, in this example, root.items.column does not have a validity because there were
//! no nulls in the page.
//!
//! ## Multiple buffers or multiple columns?
//!
//! Note that there are many different ways we can write encodings.  For example, we might
//! store primitive fields in a single column with two buffers (one for validity and one for
//! values)
//!
//! On the other hand, we could also store a primitive field as two different columns.  One
//! that yields a non-nullable boolean array and one that yields a non-nullable array of items.
//! Then we could combine these two arrays into a single array where the boolean array is the
//! bitmap.  There are a few subtle differences between the approaches (a small recombination
//! sketch follows this list):
//!
//! * Storing things as multiple buffers within the same column is generally more efficient and
//!   easier to schedule.  For example, in-batch coalescing is very easy but can only be done
//!   on data that is in the same page.
//! * When things are stored in multiple columns you have to worry about their pages not being
//!   in sync.  In our previous validity / values example this means we might have to do some
//!   memory copies to get the validity and values arrays to be the same length at decode
//!   time.
//! * When things are stored in a single column, projection is impossible.  For example, if we
//!   tried to store all the struct fields in a single column with lots of buffers then we wouldn't
//!   be able to read back individual fields of the struct.
//!
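//! As a concrete sketch of that two-column recombination (using arrow-rs
//! types; the arrays here are illustrative):
//!
//! ```ignore
//! use arrow_array::{Array, BooleanArray, Int32Array};
//! use arrow_buffer::NullBuffer;
//!
//! // One column decoded as a non-nullable boolean array...
//! let validity = BooleanArray::from(vec![true, false, true]);
//! // ...and one decoded as a non-nullable values array.
//! let values = Int32Array::from(vec![1, 0, 3]);
//! // Combine them, using the booleans as the validity bitmap.
//! let nulls = NullBuffer::new(validity.values().clone());
//! let combined = Int32Array::new(values.values().clone(), Some(nulls));
//! assert_eq!(combined.null_count(), 1);
//! ```
//!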
//! The fixed size list decoding is an interesting example because it is actually both a physical
//! encoding and a logical encoding.  A fixed size list of a physical encoding is, itself, a physical
//! encoding (e.g. a fixed size list of doubles).  However, a fixed size list of a logical encoding
//! is a logical encoding (e.g. a fixed size list of structs).
//!
//! # The scheduling loop
//!
//! Reading a Lance file involves both scheduling and decoding.  It's generally expected that these
//! will run as two separate threads.
//!
//! ```text
//!
//!                                    I/O PARALLELISM
//!                       Issues
//!                       Requests   ┌─────────────────┐
//!                                  │                 │        Wait for
//!                       ┌──────────►   I/O Service   ├─────►  Enough I/O ◄─┐
//!                       │          │                 │        For batch    │
//!                       │          └─────────────────┘             │3      │
//!                       │                                          │       │
//!                       │                                          │       │2
//! ┌─────────────────────┴─┐                              ┌─────────▼───────┴┐
//! │                       │                              │                  │Poll
//! │       Batch Decode    │ Decode tasks sent via channel│   Batch Decode   │1
//! │       Scheduler       ├─────────────────────────────►│   Stream         ◄─────
//! │                       │                              │                  │
//! └─────▲─────────────┬───┘                              └─────────┬────────┘
//!       │             │                                            │4
//!       │             │                                            │
//!       └─────────────┘                                   ┌────────┴────────┐
//!  Caller of schedule_range                Buffer polling │                 │
//!  will be scheduler thread                to achieve CPU │ Decode Batch    ├────►
//!  and schedule one decode                 parallelism    │ Task            │
//!  task (and all needed I/O)               (thread per    │                 │
//!  per logical page                         batch)        └─────────────────┘
//! ```
//!
//! The scheduling thread will work through the file from the
//! start to the end as quickly as possible.  Data is scheduled one page at a time in a row-major
//! fashion.  For example, imagine we have a file with the following page structure:
//!
//! ```text
//! Score (Float32)     | C0P0 |
//! Id (16-byte UUID)   | C1P0 | C1P1 | C1P2 | C1P3 |
//! Vector (4096 bytes) | C2P0 | C2P1 | C2P2 | C2P3 | .. | C2P1023 |
//! ```
//!
//! This would be quite common as each of these pages has the same number of bytes.  Let's pretend
//! each page is 1MiB and so there are 256Ki rows of data.  Each page of `Score` has 256Ki rows.
//! Each page of `Id` has 64Ki rows.  Each page of `Vector` has 256 rows.  The scheduler would then
//! schedule in the following order:
//!
//! C0 P0
//! C1 P0
//! C2 P0
//! C2 P1
//! ... (253 pages omitted)
//! C2 P255
//! C1 P1
//! C2 P256
//! ... (254 pages omitted)
//! C2 P511
//! C1 P2
//! C2 P512
//! ... (254 pages omitted)
//! C2 P767
//! C1 P3
//! C2 P768
//! ... (254 pages omitted)
//! C2 P1023
//!
//! This is the ideal scheduling order because it means we can decode complete rows as quickly as possible.
//! Note that the scheduler thread does not need to wait for I/O to happen at any point.  As soon as it starts
//! it will start scheduling one page of I/O after another until it has scheduled the entire file's worth of
//! I/O.  This is slightly different from other file readers, which have "row group parallelism" and will
//! typically only schedule X row groups worth of reads at a time.
//!
//! In the near future there will be a backpressure mechanism and so it may need to stop/pause if the compute
//! falls behind.
//!
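//! A rough sketch of this wiring (it assumes a `DecodeBatchScheduler`, a root
//! [`SimpleStructDecoder`], an `io` handle, and `num_rows` have already been
//! constructed; error handling is elided):
//!
//! ```ignore
//! let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
//! // Scheduling side: walk the file and push decoders into the channel.
//! tokio::task::spawn(async move {
//!     let mut scheduler = scheduler;
//!     scheduler.schedule_range(0..num_rows, &FilterExpression::no_filter(), tx, io);
//! });
//! // Decoding side: pull decoders, wait for their I/O, and emit batch tasks.
//! let stream = BatchDecodeStream::new(rx, /*rows_per_batch=*/ 1024, num_rows, root_decoder);
//! ```
//!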
//! ## Indirect I/O
//!
//! Regrettably, there are times where we cannot know exactly what data we need until we have partially decoded
//! the file.  This happens when we have variable sized list data.  In that case the scheduling task for that
//! page will only schedule the first part of the read (loading the list offsets).  It will then immediately
//! spawn a new tokio task to wait for that I/O and decode the list offsets.  That follow-up task is not part
//! of the scheduling loop or the decode loop.  It is a free task.  Once the list offsets are decoded we submit
//! a follow-up I/O task.  This task is scheduled at a high priority because the decoder is going to need it soon.
//!
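//! In sketch form (the helper names and priority constant here are
//! hypothetical; the real logic lives in the list scheduler):
//!
//! ```ignore
//! // Phase 1: schedule the offsets read as part of the normal scheduling loop.
//! let offsets_fut = io.submit_request(offset_ranges, priority);
//! // Phase 2: a free-standing task decodes the offsets and issues the items
//! // read at high priority, because the decoder will need it soon.
//! tokio::task::spawn(async move {
//!     let offset_bytes = offsets_fut.await?;
//!     let item_ranges = decode_offsets(offset_bytes); // hypothetical helper
//!     io.submit_request(item_ranges, HIGH_PRIORITY);  // follow-up I/O
//!     Ok::<(), Error>(())
//! });
//! ```
//!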
//! # The decode loop
//!
//! As soon as the scheduler starts we can start decoding.  Each time we schedule a page we
//! push a decoder for that page's data into a channel.  The decode loop
//! ([`BatchDecodeStream`]) reads from that channel.  Each time it receives a decoder it
//! waits until the decoder has all of its data.  Then it grabs the next decoder.  Once it has
//! enough loaded decoders to complete a batch worth of rows it will spawn a "decode batch task".
//!
//! These batch decode tasks perform the actual CPU work of decoding the loaded data into Arrow
//! arrays.  This may involve significant CPU processing like decompression or arithmetic in order
//! to restore the data to its correct in-memory representation.
//!
//! ## Batch size
//!
//! The `BatchDecodeStream` is configured with a batch size.  This does not need to have any
//! relation to the page size(s) used to write the data.  This keeps our compute work completely
//! independent of our I/O work.  We suggest using small batch sizes (a rough sizing sketch
//! follows this list):
//!
//!  * Batches should fit in CPU cache (at least L3)
//!  * More batches means more opportunity for parallelism
//!  * The "batch overhead" is very small in Lance compared to other formats because it has no
//!    relation to the way the data is stored.
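//!
//! For instance, a back-of-the-envelope sizing using the example file above
//! (illustrative only):
//!
//! ```ignore
//! let row_width_bytes = 4 + 16 + 4096;      // Score + Id + Vector
//! let target_batch_bytes = 2 * 1024 * 1024; // ~2MiB, comfortably inside L3
//! let rows_per_batch = (target_batch_bytes / row_width_bytes).max(1);
//! assert_eq!(rows_per_batch, 509);
//! ```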

use std::collections::VecDeque;
use std::sync::{LazyLock, Once};
use std::{ops::Range, sync::Arc};

use arrow_array::cast::AsArray;
use arrow_array::{ArrayRef, RecordBatch, RecordBatchIterator, RecordBatchReader};
use arrow_schema::{ArrowError, DataType, Field as ArrowField, Fields, Schema as ArrowSchema};
use bytes::Bytes;
use futures::future::{maybe_done, BoxFuture, MaybeDone};
use futures::stream::{self, BoxStream};
use futures::{FutureExt, StreamExt};
use lance_arrow::DataTypeExt;
use lance_core::cache::LanceCache;
use lance_core::datatypes::{Field, Schema, BLOB_DESC_LANCE_FIELD};
use log::{debug, trace, warn};
use snafu::location;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::{self, unbounded_channel};

use lance_core::{ArrowResult, Error, Result};
use tracing::instrument;

use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy};
use crate::data::DataBlock;
use crate::encoder::EncodedBatch;
use crate::encodings::logical::list::StructuralListScheduler;
use crate::encodings::logical::primitive::StructuralPrimitiveFieldScheduler;
use crate::encodings::logical::r#struct::{StructuralStructDecoder, StructuralStructScheduler};
use crate::format::pb::{self, column_encoding};
use crate::format::pb21;
use crate::previous::decoder::LogicalPageDecoder;
use crate::previous::encodings::logical::list::OffsetPageInfo;
use crate::previous::encodings::logical::r#struct::{SimpleStructDecoder, SimpleStructScheduler};
use crate::previous::encodings::logical::{
    binary::BinaryFieldScheduler, blob::BlobFieldScheduler, list::ListFieldScheduler,
    primitive::PrimitiveFieldScheduler,
};
use crate::repdef::{CompositeRepDefUnraveler, RepDefUnraveler};
use crate::version::LanceFileVersion;
use crate::{BufferScheduler, EncodingsIo};

// If users are getting batches larger than 10MiB then it's time to reduce the batch size
const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024;

/// Top-level encoding message for a page.  Wraps both the
/// legacy pb::ArrayEncoding and the newer pb::PageLayout
///
/// A file should only use one or the other and never both.
/// 2.0 decoders can always assume this is pb::ArrayEncoding
/// and 2.1+ decoders can always assume this is pb::PageLayout
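///
/// For example, a version-aware reader might dispatch on the variant like
/// this (illustrative sketch; `page` is a hypothetical [`PageInfo`]):
///
/// ```ignore
/// match &page.encoding {
///     PageEncoding::Legacy(enc) => { /* 2.0 decode path */ }
///     PageEncoding::Structural(layout) => { /* 2.1+ decode path */ }
/// }
/// ```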
#[derive(Debug)]
pub enum PageEncoding {
    Legacy(pb::ArrayEncoding),
    Structural(pb21::PageLayout),
}

impl PageEncoding {
    pub fn as_legacy(&self) -> &pb::ArrayEncoding {
        match self {
            Self::Legacy(enc) => enc,
            Self::Structural(_) => panic!("Expected a legacy encoding"),
        }
    }

    pub fn as_structural(&self) -> &pb21::PageLayout {
        match self {
            Self::Structural(enc) => enc,
            Self::Legacy(_) => panic!("Expected a structural encoding"),
        }
    }

    pub fn is_structural(&self) -> bool {
        matches!(self, Self::Structural(_))
    }
}

/// Metadata describing a page in a file
///
/// This is typically created by reading the metadata section of a Lance file
#[derive(Debug)]
pub struct PageInfo {
    /// The number of rows in the page
    pub num_rows: u64,
    /// The priority (top level row number) of the page
    ///
    /// This is only set in 2.1 files and will be 0 for 2.0 files
    pub priority: u64,
    /// The encoding that explains the buffers in the page
    pub encoding: PageEncoding,
    /// The offsets and sizes of the buffers in the file
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
}

/// Metadata describing a column in a file
///
/// This is typically created by reading the metadata section of a Lance file
#[derive(Debug, Clone)]
pub struct ColumnInfo {
    /// The index of the column in the file
    pub index: u32,
    /// The metadata for each page in the column
    pub page_infos: Arc<[PageInfo]>,
    /// File positions and their sizes of the column-level buffers
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    pub encoding: pb::ColumnEncoding,
}

impl ColumnInfo {
    /// Create a new instance
    pub fn new(
        index: u32,
        page_infos: Arc<[PageInfo]>,
        buffer_offsets_and_sizes: Vec<(u64, u64)>,
        encoding: pb::ColumnEncoding,
    ) -> Self {
        Self {
            index,
            page_infos,
            buffer_offsets_and_sizes: buffer_offsets_and_sizes.into_boxed_slice().into(),
            encoding,
        }
    }

    pub fn is_structural(&self) -> bool {
        self.page_infos
            // Can just look at the first since all should be the same
            .first()
            .map(|page| page.encoding.is_structural())
            .unwrap_or(false)
    }
}

enum RootScheduler {
    Structural(Box<dyn StructuralFieldScheduler>),
    Legacy(Arc<dyn crate::previous::decoder::FieldScheduler>),
}

impl RootScheduler {
    fn as_legacy(&self) -> &Arc<dyn crate::previous::decoder::FieldScheduler> {
        match self {
            Self::Structural(_) => panic!("Expected a legacy scheduler"),
            Self::Legacy(s) => s,
        }
    }

    fn as_structural(&self) -> &dyn StructuralFieldScheduler {
        match self {
            Self::Structural(s) => s.as_ref(),
            Self::Legacy(_) => panic!("Expected a structural scheduler"),
        }
    }
}

/// The scheduler for decoding batches
///
/// Lance decoding is done in two steps: scheduling and decoding.  The
/// scheduling tends to be lightweight and should quickly figure out what data
/// is needed from the disk and issue the appropriate I/O requests.  A decode task is
/// created to eventually decode the data (once it is loaded) and scheduling
/// moves on to scheduling the next page.
///
/// Meanwhile, it's expected that a decode stream will be set up to run at the
/// same time.  Decode tasks take the data that is loaded and turn it into
/// Arrow arrays.
///
/// This approach allows us to keep our I/O parallelism and CPU parallelism
/// completely separate since those are often two very different values.
///
/// Backpressure should be achieved via the I/O service.  Requests that are
/// issued will pile up if the decode stream is not polling quickly enough.
/// The [`crate::EncodingsIo::submit_request`] function should return a pending
/// future once there are too many I/O requests in flight.
///
/// TODO: Implement backpressure
pub struct DecodeBatchScheduler {
    root_scheduler: RootScheduler,
    pub root_fields: Fields,
    cache: Arc<LanceCache>,
}

pub struct ColumnInfoIter<'a> {
    column_infos: Vec<Arc<ColumnInfo>>,
    column_indices: &'a [u32],
    column_info_pos: usize,
    column_indices_pos: usize,
}

impl<'a> ColumnInfoIter<'a> {
    pub fn new(column_infos: Vec<Arc<ColumnInfo>>, column_indices: &'a [u32]) -> Self {
        let initial_pos = column_indices.first().copied().unwrap_or(0) as usize;
        Self {
            column_infos,
            column_indices,
            column_info_pos: initial_pos,
            column_indices_pos: 0,
        }
    }

    pub fn peek(&self) -> &Arc<ColumnInfo> {
        &self.column_infos[self.column_info_pos]
    }

    pub fn peek_transform(&mut self, transform: impl FnOnce(Arc<ColumnInfo>) -> Arc<ColumnInfo>) {
        let column_info = self.column_infos[self.column_info_pos].clone();
        let transformed = transform(column_info);
        self.column_infos[self.column_info_pos] = transformed;
    }

    pub fn expect_next(&mut self) -> Result<&Arc<ColumnInfo>> {
        self.next().ok_or_else(|| {
            Error::invalid_input(
                "there were more fields in the schema than provided column indices / infos",
                location!(),
            )
        })
    }

    fn next(&mut self) -> Option<&Arc<ColumnInfo>> {
        if self.column_info_pos < self.column_infos.len() {
            let info = &self.column_infos[self.column_info_pos];
            self.column_info_pos += 1;
            Some(info)
        } else {
            None
        }
    }

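    /// Advance to the column that begins the next top-level field.
    ///
    /// `column_indices` records the starting column of each top-level field,
    /// so bumping `column_indices_pos` lets us jump `column_info_pos` past any
    /// inner columns that belonged to the field we just finished.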
    pub(crate) fn next_top_level(&mut self) {
        self.column_indices_pos += 1;
        if self.column_indices_pos < self.column_indices.len() {
            self.column_info_pos = self.column_indices[self.column_indices_pos] as usize;
        } else {
            self.column_info_pos = self.column_infos.len();
        }
    }
}

/// These contain the file buffers shared across the entire file
#[derive(Clone, Copy, Debug)]
pub struct FileBuffers<'a> {
    pub positions_and_sizes: &'a [(u64, u64)],
}

/// These contain the file buffers and also buffers specific to a column
#[derive(Clone, Copy, Debug)]
pub struct ColumnBuffers<'a, 'b> {
    pub file_buffers: FileBuffers<'a>,
    pub positions_and_sizes: &'b [(u64, u64)],
}

/// These contain the file & column buffers and also buffers specific to a page
#[derive(Clone, Copy, Debug)]
pub struct PageBuffers<'a, 'b, 'c> {
    pub column_buffers: ColumnBuffers<'a, 'b>,
    pub positions_and_sizes: &'c [(u64, u64)],
}

/// The core decoder strategy handles all the various Arrow types
#[derive(Debug)]
pub struct CoreFieldDecoderStrategy {
    pub validate_data: bool,
    pub decompressor_strategy: Arc<dyn DecompressionStrategy>,
    pub cache_repetition_index: bool,
}

impl Default for CoreFieldDecoderStrategy {
    fn default() -> Self {
        Self {
            validate_data: false,
            decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
            cache_repetition_index: false,
        }
    }
}

impl CoreFieldDecoderStrategy {
    /// Create a new strategy with cache_repetition_index enabled
    pub fn with_cache_repetition_index(mut self, cache_repetition_index: bool) -> Self {
        self.cache_repetition_index = cache_repetition_index;
        self
    }

    /// Create a new strategy from decoder config
    pub fn from_decoder_config(config: &DecoderConfig) -> Self {
        Self {
            validate_data: config.validate_on_decode,
            decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
            cache_repetition_index: config.cache_repetition_index,
        }
    }

    /// This is just a sanity check to ensure there are no "wrapped encodings"
    /// that haven't been handled.
    fn ensure_values_encoded(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
        let column_encoding = column_info
            .encoding
            .column_encoding
            .as_ref()
            .ok_or_else(|| {
                Error::invalid_input(
                    format!(
                        "the column at index {} was missing a ColumnEncoding",
                        column_info.index
                    ),
                    location!(),
                )
            })?;
        if matches!(
            column_encoding,
            pb::column_encoding::ColumnEncoding::Values(_)
        ) {
            Ok(())
        } else {
            Err(Error::invalid_input(format!("the column at index {} mapping to the input field {} has column encoding {:?} and no decoder is registered to handle it", column_info.index, field_name, column_encoding), location!()))
        }
    }

    fn is_primitive(data_type: &DataType) -> bool {
        if data_type.is_primitive() {
            true
        } else {
            match data_type {
                // DataType::is_primitive doesn't consider these primitive but we do
                DataType::Boolean | DataType::Null | DataType::FixedSizeBinary(_) => true,
                DataType::FixedSizeList(inner, _) => Self::is_primitive(inner.data_type()),
                _ => false,
            }
        }
    }

    fn create_primitive_scheduler(
        &self,
        field: &Field,
        column: &ColumnInfo,
        buffers: FileBuffers,
    ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
        Self::ensure_values_encoded(column, &field.name)?;
        // Primitive fields map to a single column
        let column_buffers = ColumnBuffers {
            file_buffers: buffers,
            positions_and_sizes: &column.buffer_offsets_and_sizes,
        };
        Ok(Box::new(PrimitiveFieldScheduler::new(
            column.index,
            field.data_type(),
            column.page_infos.clone(),
            column_buffers,
            self.validate_data,
        )))
    }

    /// Helper method to verify the page encoding of a struct header column
    fn check_simple_struct(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
        Self::ensure_values_encoded(column_info, field_name)?;
        if column_info.page_infos.len() != 1 {
            return Err(Error::InvalidInput { source: format!("Due to schema we expected a struct column but we received a column with {} pages and right now we only support struct columns with 1 page", column_info.page_infos.len()).into(), location: location!() });
        }
        let encoding = &column_info.page_infos[0].encoding;
        match encoding.as_legacy().array_encoding.as_ref().unwrap() {
            pb::array_encoding::ArrayEncoding::Struct(_) => Ok(()),
            _ => Err(Error::InvalidInput { source: format!("Expected a struct encoding because we have a struct field in the schema but got the encoding {:?}", encoding).into(), location: location!() }),
        }
    }

    fn check_packed_struct(column_info: &ColumnInfo) -> bool {
        let encoding = &column_info.page_infos[0].encoding;
        matches!(
            encoding.as_legacy().array_encoding.as_ref().unwrap(),
            pb::array_encoding::ArrayEncoding::PackedStruct(_)
        )
    }

    fn create_list_scheduler(
        &self,
        list_field: &Field,
        column_infos: &mut ColumnInfoIter,
        buffers: FileBuffers,
        offsets_column: &ColumnInfo,
    ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
        Self::ensure_values_encoded(offsets_column, &list_field.name)?;
        let offsets_column_buffers = ColumnBuffers {
            file_buffers: buffers,
            positions_and_sizes: &offsets_column.buffer_offsets_and_sizes,
        };
        let items_scheduler =
            self.create_legacy_field_scheduler(&list_field.children[0], column_infos, buffers)?;

        let (inner_infos, null_offset_adjustments): (Vec<_>, Vec<_>) = offsets_column
            .page_infos
            .iter()
            .filter(|offsets_page| offsets_page.num_rows > 0)
            .map(|offsets_page| {
                if let Some(pb::array_encoding::ArrayEncoding::List(list_encoding)) =
                    &offsets_page.encoding.as_legacy().array_encoding
                {
                    let inner = PageInfo {
                        buffer_offsets_and_sizes: offsets_page.buffer_offsets_and_sizes.clone(),
                        encoding: PageEncoding::Legacy(
                            list_encoding.offsets.as_ref().unwrap().as_ref().clone(),
                        ),
                        num_rows: offsets_page.num_rows,
                        priority: 0,
                    };
                    (
                        inner,
                        OffsetPageInfo {
                            offsets_in_page: offsets_page.num_rows,
                            null_offset_adjustment: list_encoding.null_offset_adjustment,
                            num_items_referenced_by_page: list_encoding.num_items,
                        },
                    )
                } else {
                    // TODO: Should probably return Err here
                    panic!("Expected a list column");
                }
            })
            .unzip();
        let inner = Arc::new(PrimitiveFieldScheduler::new(
            offsets_column.index,
            DataType::UInt64,
            Arc::from(inner_infos.into_boxed_slice()),
            offsets_column_buffers,
            self.validate_data,
        )) as Arc<dyn crate::previous::decoder::FieldScheduler>;
        let items_field = match list_field.data_type() {
            DataType::List(inner) => inner,
            DataType::LargeList(inner) => inner,
            _ => unreachable!(),
        };
        let offset_type = if matches!(list_field.data_type(), DataType::List(_)) {
            DataType::Int32
        } else {
            DataType::Int64
        };
        Ok(Box::new(ListFieldScheduler::new(
            inner,
            items_scheduler.into(),
            items_field,
            offset_type,
            null_offset_adjustments,
        )))
    }

    fn unwrap_blob(column_info: &ColumnInfo) -> Option<ColumnInfo> {
        if let column_encoding::ColumnEncoding::Blob(blob) =
            column_info.encoding.column_encoding.as_ref().unwrap()
        {
            let mut column_info = column_info.clone();
            column_info.encoding = blob.inner.as_ref().unwrap().as_ref().clone();
            Some(column_info)
        } else {
            None
        }
    }

    fn create_structural_field_scheduler(
        &self,
        field: &Field,
        column_infos: &mut ColumnInfoIter,
    ) -> Result<Box<dyn StructuralFieldScheduler>> {
        let data_type = field.data_type();
        if Self::is_primitive(&data_type) {
            let column_info = column_infos.expect_next()?;
            let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
                column_info.as_ref(),
                self.decompressor_strategy.as_ref(),
                self.cache_repetition_index,
            )?);

            // advance to the next top level column
            column_infos.next_top_level();

            return Ok(scheduler);
        }
        match &data_type {
            DataType::Struct(fields) => {
                if field.is_packed_struct() {
                    let column_info = column_infos.expect_next()?;
                    let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
                        column_info.as_ref(),
                        self.decompressor_strategy.as_ref(),
                        self.cache_repetition_index,
                    )?);

                    // advance to the next top level column
                    column_infos.next_top_level();

                    return Ok(scheduler);
                }
                let mut child_schedulers = Vec::with_capacity(field.children.len());
                for field in field.children.iter() {
                    let field_scheduler =
                        self.create_structural_field_scheduler(field, column_infos)?;
                    child_schedulers.push(field_scheduler);
                }

                let fields = fields.clone();
                Ok(
                    Box::new(StructuralStructScheduler::new(child_schedulers, fields))
                        as Box<dyn StructuralFieldScheduler>,
                )
            }
            DataType::Binary | DataType::Utf8 | DataType::LargeBinary | DataType::LargeUtf8 => {
                let column_info = column_infos.expect_next()?;
                let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
                    column_info.as_ref(),
                    self.decompressor_strategy.as_ref(),
                    self.cache_repetition_index,
                )?);
                column_infos.next_top_level();
                Ok(scheduler)
            }
            DataType::List(_) | DataType::LargeList(_) => {
                let child = field
                    .children
                    .first()
                    .expect("List field must have a child");
                let child_scheduler =
                    self.create_structural_field_scheduler(child, column_infos)?;
                Ok(Box::new(StructuralListScheduler::new(child_scheduler))
                    as Box<dyn StructuralFieldScheduler>)
            }
            _ => todo!(),
        }
    }

    fn create_legacy_field_scheduler(
        &self,
        field: &Field,
        column_infos: &mut ColumnInfoIter,
        buffers: FileBuffers,
    ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
        let data_type = field.data_type();
        if Self::is_primitive(&data_type) {
            let column_info = column_infos.expect_next()?;
            let scheduler = self.create_primitive_scheduler(field, column_info, buffers)?;
            return Ok(scheduler);
        } else if data_type.is_binary_like() {
            let column_info = column_infos.next().unwrap().clone();
            // Column is blob and user is asking for binary data
            if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
                let desc_scheduler =
                    self.create_primitive_scheduler(&BLOB_DESC_LANCE_FIELD, &blob_col, buffers)?;
                let blob_scheduler = Box::new(BlobFieldScheduler::new(desc_scheduler.into()));
                return Ok(blob_scheduler);
            }
            if let Some(page_info) = column_info.page_infos.first() {
                if matches!(
                    page_info.encoding.as_legacy(),
                    pb::ArrayEncoding {
                        array_encoding: Some(pb::array_encoding::ArrayEncoding::List(..))
                    }
                ) {
                    let list_type = if matches!(data_type, DataType::Utf8 | DataType::Binary) {
                        DataType::List(Arc::new(ArrowField::new("item", DataType::UInt8, false)))
                    } else {
                        DataType::LargeList(Arc::new(ArrowField::new(
                            "item",
                            DataType::UInt8,
                            false,
                        )))
                    };
                    let list_field = Field::try_from(ArrowField::new(
                        field.name.clone(),
                        list_type,
                        field.nullable,
                    ))
                    .unwrap();
                    let list_scheduler = self.create_list_scheduler(
                        &list_field,
                        column_infos,
                        buffers,
                        &column_info,
                    )?;
                    let binary_scheduler = Box::new(BinaryFieldScheduler::new(
                        list_scheduler.into(),
                        field.data_type(),
                    ));
                    return Ok(binary_scheduler);
                } else {
                    let scheduler =
                        self.create_primitive_scheduler(field, &column_info, buffers)?;
                    return Ok(scheduler);
                }
            } else {
                return self.create_primitive_scheduler(field, &column_info, buffers);
            }
        }
        match &data_type {
            DataType::FixedSizeList(inner, _dimension) => {
                // A fixed size list column could either be a physical or a logical decoder
                // depending on the child data type.
                if Self::is_primitive(inner.data_type()) {
                    let primitive_col = column_infos.expect_next()?;
                    let scheduler =
                        self.create_primitive_scheduler(field, primitive_col, buffers)?;
                    Ok(scheduler)
                } else {
                    todo!()
                }
            }
            DataType::Dictionary(_key_type, value_type) => {
                if Self::is_primitive(value_type) || value_type.is_binary_like() {
                    let primitive_col = column_infos.expect_next()?;
                    let scheduler =
                        self.create_primitive_scheduler(field, primitive_col, buffers)?;
                    Ok(scheduler)
                } else {
                    Err(Error::NotSupported {
                        source: format!(
                            "No way to decode into a dictionary field of type {}",
                            value_type
                        )
                        .into(),
                        location: location!(),
                    })
                }
            }
            DataType::List(_) | DataType::LargeList(_) => {
                let offsets_column = column_infos.expect_next()?.clone();
                column_infos.next_top_level();
                self.create_list_scheduler(field, column_infos, buffers, &offsets_column)
            }
            DataType::Struct(fields) => {
                let column_info = column_infos.expect_next()?;

                // Column is blob and user is asking for descriptions
                if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
                    // Can use primitive scheduler here since descriptions are always packed struct
                    return self.create_primitive_scheduler(field, &blob_col, buffers);
                }

                if Self::check_packed_struct(column_info) {
                    // use packed struct encoding
                    self.create_primitive_scheduler(field, column_info, buffers)
                } else {
                    // use default struct encoding
                    Self::check_simple_struct(column_info, &field.name).unwrap();
                    let num_rows = column_info
                        .page_infos
                        .iter()
                        .map(|page| page.num_rows)
                        .sum();
                    let mut child_schedulers = Vec::with_capacity(field.children.len());
                    for field in &field.children {
                        column_infos.next_top_level();
                        let field_scheduler =
                            self.create_legacy_field_scheduler(field, column_infos, buffers)?;
                        child_schedulers.push(Arc::from(field_scheduler));
                    }

                    let fields = fields.clone();
                    Ok(Box::new(SimpleStructScheduler::new(
                        child_schedulers,
                        fields,
                        num_rows,
                    )))
                }
            }
            // TODO: Still need support for RLE
            _ => todo!(),
        }
    }
}

/// Creates a dummy ColumnInfo for the root column
fn root_column(num_rows: u64) -> ColumnInfo {
    let num_root_pages = num_rows.div_ceil(u32::MAX as u64);
    let final_page_num_rows = num_rows % (u32::MAX as u64);
    let root_pages = (0..num_root_pages)
        .map(|i| PageInfo {
            num_rows: if i == num_root_pages - 1 {
                final_page_num_rows
            } else {
                u64::MAX
            },
            encoding: PageEncoding::Legacy(pb::ArrayEncoding {
                array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
                    pb::SimpleStruct {},
                )),
            }),
            priority: 0, // not used in legacy scheduler
            buffer_offsets_and_sizes: Arc::new([]),
        })
        .collect::<Vec<_>>();
    ColumnInfo {
        buffer_offsets_and_sizes: Arc::new([]),
        encoding: pb::ColumnEncoding {
            column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
        },
        index: u32::MAX,
        page_infos: Arc::from(root_pages),
    }
}

pub enum RootDecoder {
    Structural(StructuralStructDecoder),
    Legacy(SimpleStructDecoder),
}

impl RootDecoder {
    pub fn into_structural(self) -> StructuralStructDecoder {
        match self {
            Self::Structural(decoder) => decoder,
            Self::Legacy(_) => panic!("Expected a structural decoder"),
        }
    }

    pub fn into_legacy(self) -> SimpleStructDecoder {
        match self {
            Self::Legacy(decoder) => decoder,
            Self::Structural(_) => panic!("Expected a legacy decoder"),
        }
    }
}

impl DecodeBatchScheduler {
    /// Creates a new decode scheduler with the expected schema and the column
    /// metadata of the file.
    #[allow(clippy::too_many_arguments)]
    pub async fn try_new<'a>(
        schema: &'a Schema,
        column_indices: &[u32],
        column_infos: &[Arc<ColumnInfo>],
        file_buffer_positions_and_sizes: &'a Vec<(u64, u64)>,
        num_rows: u64,
        _decoder_plugins: Arc<DecoderPlugins>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<LanceCache>,
        filter: &FilterExpression,
        decoder_config: &DecoderConfig,
    ) -> Result<Self> {
        assert!(num_rows > 0);
        let buffers = FileBuffers {
            positions_and_sizes: file_buffer_positions_and_sizes,
        };
        let arrow_schema = ArrowSchema::from(schema);
        let root_fields = arrow_schema.fields().clone();
        let root_type = DataType::Struct(root_fields.clone());
        let mut root_field = Field::try_from(&ArrowField::new("root", root_type, false))?;
        // root_field.children and schema.fields should be identical at this point but the latter
        // has field ids and the former does not.  This line restores that.
        // TODO:  Is there another way to create the root field without forcing a trip through arrow?
        root_field.children.clone_from(&schema.fields);
        root_field
            .metadata
            .insert("__lance_decoder_root".to_string(), "true".to_string());

        if column_infos.is_empty() || column_infos[0].is_structural() {
            let mut column_iter = ColumnInfoIter::new(column_infos.to_vec(), column_indices);

            let strategy = CoreFieldDecoderStrategy::from_decoder_config(decoder_config);
            let mut root_scheduler =
                strategy.create_structural_field_scheduler(&root_field, &mut column_iter)?;

            let context = SchedulerContext::new(io, cache.clone());
            root_scheduler.initialize(filter, &context).await?;

            Ok(Self {
                root_scheduler: RootScheduler::Structural(root_scheduler),
                root_fields,
                cache,
            })
        } else {
            // The old encoding style expected a header column for structs and so we
            // need a header column for the top-level struct
            let mut columns = Vec::with_capacity(column_infos.len() + 1);
            columns.push(Arc::new(root_column(num_rows)));
            columns.extend(column_infos.iter().cloned());

            let adjusted_column_indices = [0_u32]
                .into_iter()
                .chain(column_indices.iter().map(|i| i.saturating_add(1)))
                .collect::<Vec<_>>();
            let mut column_iter = ColumnInfoIter::new(columns, &adjusted_column_indices);
            let strategy = CoreFieldDecoderStrategy::from_decoder_config(decoder_config);
            let root_scheduler =
                strategy.create_legacy_field_scheduler(&root_field, &mut column_iter, buffers)?;

            let context = SchedulerContext::new(io, cache.clone());
            root_scheduler.initialize(filter, &context).await?;

            Ok(Self {
                root_scheduler: RootScheduler::Legacy(root_scheduler.into()),
                root_fields,
                cache,
            })
        }
    }

    #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
    pub fn from_scheduler(
        root_scheduler: Arc<dyn crate::previous::decoder::FieldScheduler>,
        root_fields: Fields,
        cache: Arc<LanceCache>,
    ) -> Self {
        Self {
            root_scheduler: RootScheduler::Legacy(root_scheduler),
            root_fields,
            cache,
        }
    }

    fn do_schedule_ranges_structural(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
    ) {
        let root_scheduler = self.root_scheduler.as_structural();
        let mut context = SchedulerContext::new(io, self.cache.clone());
        let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
        if let Err(schedule_ranges_err) = maybe_root_job {
            schedule_action(Err(schedule_ranges_err));
            return;
        }
        let mut root_job = maybe_root_job.unwrap();
        let mut num_rows_scheduled = 0;
        loop {
            let maybe_next_scan_line = root_job.schedule_next(&mut context);
            if let Err(err) = maybe_next_scan_line {
                schedule_action(Err(err));
                return;
            }
            let next_scan_line = maybe_next_scan_line.unwrap();
            match next_scan_line {
                Some(next_scan_line) => {
                    trace!(
                        "Scheduled scan line of {} rows and {} decoders",
                        next_scan_line.rows_scheduled,
                        next_scan_line.decoders.len()
                    );
                    num_rows_scheduled += next_scan_line.rows_scheduled;
                    if !schedule_action(Ok(DecoderMessage {
                        scheduled_so_far: num_rows_scheduled,
                        decoders: next_scan_line.decoders,
                    })) {
                        // Decoder has disconnected
                        return;
                    }
                }
                None => return,
            }
        }
    }

    fn do_schedule_ranges_legacy(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
        // If specified, this will be used as the top_level_row for all scheduling
        // tasks.  This is used by list scheduling to ensure all items scheduling
        // tasks are scheduled at the same top level row.
        priority: Option<Box<dyn PriorityRange>>,
    ) {
        let root_scheduler = self.root_scheduler.as_legacy();
        let rows_requested = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        trace!(
            "Scheduling {} ranges across {}..{} ({} rows){}",
            ranges.len(),
            ranges.first().unwrap().start,
            ranges.last().unwrap().end,
            rows_requested,
            priority
                .as_ref()
                .map(|p| format!(" (priority={:?})", p))
                .unwrap_or_default()
        );

        let mut context = SchedulerContext::new(io, self.cache.clone());
        let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
        if let Err(schedule_ranges_err) = maybe_root_job {
            schedule_action(Err(schedule_ranges_err));
            return;
        }
        let mut root_job = maybe_root_job.unwrap();
        let mut num_rows_scheduled = 0;
        let mut rows_to_schedule = root_job.num_rows();
        let mut priority = priority.unwrap_or(Box::new(SimplePriorityRange::new(0)));
        trace!("Scheduled ranges refined to {} rows", rows_to_schedule);
        while rows_to_schedule > 0 {
            let maybe_next_scan_line = root_job.schedule_next(&mut context, priority.as_ref());
            if let Err(schedule_next_err) = maybe_next_scan_line {
                schedule_action(Err(schedule_next_err));
                return;
            }
            let next_scan_line = maybe_next_scan_line.unwrap();
            priority.advance(next_scan_line.rows_scheduled);
            num_rows_scheduled += next_scan_line.rows_scheduled;
            rows_to_schedule -= next_scan_line.rows_scheduled;
            trace!(
                "Scheduled scan line of {} rows and {} decoders",
                next_scan_line.rows_scheduled,
                next_scan_line.decoders.len()
            );
            if !schedule_action(Ok(DecoderMessage {
                scheduled_so_far: num_rows_scheduled,
                decoders: next_scan_line.decoders,
            })) {
                // Decoder has disconnected
                return;
            }

            trace!("Finished scheduling {} ranges", ranges.len());
        }
    }

    fn do_schedule_ranges(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
        // If specified, this will be used as the top_level_row for all scheduling
        // tasks.  This is used by list scheduling to ensure all items scheduling
        // tasks are scheduled at the same top level row.
        priority: Option<Box<dyn PriorityRange>>,
    ) {
        match &self.root_scheduler {
            RootScheduler::Legacy(_) => {
                self.do_schedule_ranges_legacy(ranges, filter, io, schedule_action, priority)
            }
            RootScheduler::Structural(_) => {
                self.do_schedule_ranges_structural(ranges, filter, io, schedule_action)
            }
        }
    }

    // This method is similar to schedule_ranges but instead of
    // sending the decoders to a channel it collects them all into a vector
    pub fn schedule_ranges_to_vec(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        priority: Option<Box<dyn PriorityRange>>,
    ) -> Result<Vec<DecoderMessage>> {
        let mut decode_messages = Vec::new();
        self.do_schedule_ranges(
            ranges,
            filter,
            io,
            |msg| {
                decode_messages.push(msg);
                true
            },
            priority,
        );
        decode_messages.into_iter().collect::<Result<Vec<_>>>()
    }

    /// Schedules the load of multiple ranges of rows
    ///
    /// Ranges must be non-overlapping and in sorted order
    ///
    /// # Arguments
    ///
    /// * `ranges` - The ranges of rows to load
    /// * `sink` - A channel to send the decode tasks
    /// * `scheduler` - An I/O scheduler to issue I/O requests
    #[instrument(skip_all)]
    pub fn schedule_ranges(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
        scheduler: Arc<dyn EncodingsIo>,
    ) {
        self.do_schedule_ranges(
            ranges,
            filter,
            scheduler,
            |msg| {
                match sink.send(msg) {
                    Ok(_) => true,
                    Err(SendError { .. }) => {
                        // The receiver has gone away.  We can't do anything about it
                        // so just ignore the error.
                        debug!(
                            "schedule_ranges aborting early since decoder appears to have been dropped"
                        );
                        false
                    }
                }
            },
            None,
        )
    }

    /// Schedules the load of a range of rows
    ///
    /// # Arguments
    ///
    /// * `range` - The range of rows to load
    /// * `sink` - A channel to send the decode tasks
    /// * `scheduler` - An I/O scheduler to issue I/O requests
    #[instrument(skip_all)]
    pub fn schedule_range(
        &mut self,
        range: Range<u64>,
        filter: &FilterExpression,
        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
        scheduler: Arc<dyn EncodingsIo>,
    ) {
        self.schedule_ranges(&[range], filter, sink, scheduler)
    }

    /// Schedules the load of selected rows
    ///
    /// # Arguments
    ///
    /// * `indices` - The row indices to load (these must be in ascending order!)
    /// * `sink` - A channel to send the decode tasks
    /// * `scheduler` - An I/O scheduler to issue I/O requests
    pub fn schedule_take(
        &mut self,
        indices: &[u64],
        filter: &FilterExpression,
        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
        scheduler: Arc<dyn EncodingsIo>,
    ) {
        debug_assert!(indices.windows(2).all(|w| w[0] <= w[1]));
        if indices.is_empty() {
            return;
        }
        trace!("Scheduling take of {} rows", indices.len());
        let ranges = Self::indices_to_ranges(indices);
        self.schedule_ranges(&ranges, filter, sink, scheduler)
    }

    // coalesce continuous indices if possible (the input indices must be sorted and non-empty)
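    // e.g. indices [1, 2, 3, 7, 8] become ranges [1..4, 7..9]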
    fn indices_to_ranges(indices: &[u64]) -> Vec<Range<u64>> {
        let mut ranges = Vec::new();
        let mut start = indices[0];

        for window in indices.windows(2) {
            if window[1] != window[0] + 1 {
                ranges.push(start..window[0] + 1);
                start = window[1];
            }
        }

        ranges.push(start..*indices.last().unwrap() + 1);
        ranges
    }
}

pub struct ReadBatchTask {
    pub task: BoxFuture<'static, Result<RecordBatch>>,
    pub num_rows: u32,
}

/// A stream that takes scheduled jobs and generates decode tasks from them.
pub struct BatchDecodeStream {
    context: DecoderContext,
    root_decoder: SimpleStructDecoder,
    rows_remaining: u64,
    rows_per_batch: u32,
    rows_scheduled: u64,
    rows_drained: u64,
    scheduler_exhausted: bool,
    emitted_batch_size_warning: Arc<Once>,
}

impl BatchDecodeStream {
    /// Create a new instance of a batch decode stream
    ///
    /// # Arguments
    ///
    /// * `scheduled` - an incoming stream of decode tasks from a
    ///   [`crate::decoder::DecodeBatchScheduler`]
    /// * `rows_per_batch` - the number of rows to accumulate before emitting a batch
    /// * `num_rows` - the total number of rows scheduled
    /// * `root_decoder` - the decoder for the root struct wrapping the schema's fields
1293    pub fn new(
1294        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1295        rows_per_batch: u32,
1296        num_rows: u64,
1297        root_decoder: SimpleStructDecoder,
1298    ) -> Self {
1299        Self {
1300            context: DecoderContext::new(scheduled),
1301            root_decoder,
1302            rows_remaining: num_rows,
1303            rows_per_batch,
1304            rows_scheduled: 0,
1305            rows_drained: 0,
1306            scheduler_exhausted: false,
1307            emitted_batch_size_warning: Arc::new(Once::new()),
1308        }
1309    }
1310
1311    fn accept_decoder(&mut self, decoder: crate::previous::decoder::DecoderReady) -> Result<()> {
1312        if decoder.path.is_empty() {
1313            // The root decoder can be ignored
1314            Ok(())
1315        } else {
1316            self.root_decoder.accept_child(decoder)
1317        }
1318    }
1319
1320    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
1321        if self.scheduler_exhausted {
1322            return Ok(self.rows_scheduled);
1323        }
1324        while self.rows_scheduled < scheduled_need {
1325            let next_message = self.context.source.recv().await;
1326            match next_message {
1327                Some(scan_line) => {
1328                    let scan_line = scan_line?;
1329                    self.rows_scheduled = scan_line.scheduled_so_far;
1330                    for message in scan_line.decoders {
1331                        self.accept_decoder(message.into_legacy())?;
1332                    }
1333                }
1334                None => {
1335                    // Schedule ended before we got all the data we expected.  This probably
1336                    // means some kind of pushdown filter was applied and we didn't load as
1337                    // much data as we thought we would.
1338                    self.scheduler_exhausted = true;
1339                    return Ok(self.rows_scheduled);
1340                }
1341            }
1342        }
1343        Ok(scheduled_need)
1344    }
1345
1346    #[instrument(level = "debug", skip_all)]
1347    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
1348        trace!(
1349            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1350            self.rows_remaining,
1351            self.rows_drained,
1352            self.rows_scheduled,
1353        );
1354        if self.rows_remaining == 0 {
1355            return Ok(None);
1356        }
1357
1358        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1359        self.rows_remaining -= to_take;
1360
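        // Worked example (hypothetical numbers): if rows_drained = 0, to_take = 1024,
        // and rows_scheduled = 512, then scheduled_need = (0 + 1024) - 512 = 512 more
        // rows must be scheduled before this batch can be drained.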
1361        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1362        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1363        if scheduled_need > 0 {
1364            let desired_scheduled = scheduled_need + self.rows_scheduled;
1365            trace!(
1366                "Draining from scheduler (desire at least {} scheduled rows)",
1367                desired_scheduled
1368            );
1369            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
1370            if actually_scheduled < desired_scheduled {
1371                let under_scheduled = desired_scheduled - actually_scheduled;
1372                to_take -= under_scheduled;
1373            }
1374        }
1375
1376        if to_take == 0 {
1377            return Ok(None);
1378        }
1379
1380        // wait_for_loaded waits for *>* loaded_need (not >=) so we do a -1 here
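        // e.g. if rows_drained = 0 and to_take = 1024 then loaded_need = 1023, the
        // zero-based index of the last row this batch requires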
1381        let loaded_need = self.rows_drained + to_take - 1;
1382        trace!(
1383            "Waiting for I/O (desire at least {} fully loaded rows)",
1384            loaded_need
1385        );
1386        self.root_decoder.wait_for_loaded(loaded_need).await?;
1387
1388        let next_task = self.root_decoder.drain(to_take)?;
1389        self.rows_drained += to_take;
1390        Ok(Some(next_task))
1391    }
1392
1393    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
1394        let stream = futures::stream::unfold(self, |mut slf| async move {
1395            let next_task = slf.next_batch_task().await;
1396            let next_task = next_task.transpose().map(|next_task| {
1397                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
1398                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
1399                let task = async move {
1400                    let next_task = next_task?;
1401                    next_task.into_batch(emitted_batch_size_warning)
1402                };
1403                (task, num_rows)
1404            });
1405            next_task.map(|(task, num_rows)| {
1406                // This should be true since batch size is u32
1407                debug_assert!(num_rows <= u32::MAX as u64);
1408                let next_task = ReadBatchTask {
1409                    task: task.boxed(),
1410                    num_rows: num_rows as u32,
1411                };
1412                (next_task, slf)
1413            })
1414        });
1415        stream.boxed()
1416    }
1417}
1418
1419// Utility types to smooth out the differences between the 2.0 and 2.1 decoders so that
1420// we can have a single implementation of the batch decode iterator
1421enum RootDecoderMessage {
1422    LoadedPage(LoadedPage),
1423    LegacyPage(crate::previous::decoder::DecoderReady),
1424}
1425trait RootDecoderType {
1426    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()>;
1427    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
1428    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()>;
1429}
1430impl RootDecoderType for StructuralStructDecoder {
1431    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1432        let RootDecoderMessage::LoadedPage(loaded_page) = message else {
1433            unreachable!()
1434        };
1435        self.accept_page(loaded_page)
1436    }
1437    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1438        self.drain_batch_task(num_rows)
1439    }
1440    fn wait(&mut self, _: u64, _: &tokio::runtime::Runtime) -> Result<()> {
1441        // Waiting happens elsewhere (not as part of the decoder)
1442        Ok(())
1443    }
1444}
1445impl RootDecoderType for SimpleStructDecoder {
1446    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1447        let RootDecoderMessage::LegacyPage(legacy_page) = message else {
1448            unreachable!()
1449        };
1450        self.accept_child(legacy_page)
1451    }
1452    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1453        self.drain(num_rows)
1454    }
1455    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()> {
1456        runtime.block_on(self.wait_for_loaded(loaded_need))
1457    }
1458}
1459
1460/// A blocking batch decoder that performs synchronous decoding
1461struct BatchDecodeIterator<T: RootDecoderType> {
1462    messages: VecDeque<Result<DecoderMessage>>,
1463    root_decoder: T,
1464    rows_remaining: u64,
1465    rows_per_batch: u32,
1466    rows_scheduled: u64,
1467    rows_drained: u64,
1468    emitted_batch_size_warning: Arc<Once>,
1469    // Note: this is not the runtime on which I/O happens.
1470    // That's always in the scheduler.  This is just a runtime we use to
1471    // sleep the current thread if I/O is unready
1472    wait_for_io_runtime: tokio::runtime::Runtime,
1473    schema: Arc<ArrowSchema>,
1474}
1475
1476impl<T: RootDecoderType> BatchDecodeIterator<T> {
1477    /// Create a new instance of a batch decode iterator
1478    pub fn new(
1479        messages: VecDeque<Result<DecoderMessage>>,
1480        rows_per_batch: u32,
1481        num_rows: u64,
1482        root_decoder: T,
1483        schema: Arc<ArrowSchema>,
1484    ) -> Self {
1485        Self {
1486            messages,
1487            root_decoder,
1488            rows_remaining: num_rows,
1489            rows_per_batch,
1490            rows_scheduled: 0,
1491            rows_drained: 0,
1492            wait_for_io_runtime: tokio::runtime::Builder::new_current_thread()
1493                .build()
1494                .unwrap(),
1495            emitted_batch_size_warning: Arc::new(Once::new()),
1496            schema,
1497        }
1498    }
1499
1500    /// Wait for a single page of data to finish loading
1501    ///
1502    /// If the data is not available this will perform a *blocking* wait (put
1503    /// the current thread to sleep)
1504    fn wait_for_page(&self, unloaded_page: UnloadedPage) -> Result<LoadedPage> {
1505        match maybe_done(unloaded_page.0) {
1506            // Fast path, avoid all runtime shenanigans if the data is ready
1507            MaybeDone::Done(loaded_page) => loaded_page,
1508            // Slow path, we need to wait on I/O, enter the runtime
1509            MaybeDone::Future(fut) => self.wait_for_io_runtime.block_on(fut),
1510            MaybeDone::Gone => unreachable!(),
1511        }
1512    }
1513
1514    /// Waits for I/O until `scheduled_need` rows have been loaded
1515    ///
1516    /// Note that `scheduled_need` is cumulative.  E.g. this method
1517    /// should be called with 5, 10, 15 and not 5, 5, 5
1518    #[instrument(skip_all)]
1519    fn wait_for_io(&mut self, scheduled_need: u64) -> Result<u64> {
1520        while self.rows_scheduled < scheduled_need && !self.messages.is_empty() {
1521            let message = self.messages.pop_front().unwrap()?;
1522            self.rows_scheduled = message.scheduled_so_far;
1523            for decoder_message in message.decoders {
1524                match decoder_message {
1525                    MessageType::UnloadedPage(unloaded_page) => {
1526                        let loaded_page = self.wait_for_page(unloaded_page)?;
1527                        self.root_decoder
1528                            .accept_message(RootDecoderMessage::LoadedPage(loaded_page))?;
1529                    }
1530                    MessageType::DecoderReady(decoder_ready) => {
1531                        // The root decoder can be ignored
1532                        if !decoder_ready.path.is_empty() {
1533                            self.root_decoder
1534                                .accept_message(RootDecoderMessage::LegacyPage(decoder_ready))?;
1535                        }
1536                    }
1537                }
1538            }
1539        }
1540
1541        let loaded_need = self.rows_drained + self.rows_per_batch as u64 - 1;
1542
1543        self.root_decoder
1544            .wait(loaded_need, &self.wait_for_io_runtime)?;
1545        Ok(self.rows_scheduled)
1546    }
1547
1548    #[instrument(level = "debug", skip_all)]
1549    fn next_batch_task(&mut self) -> Result<Option<RecordBatch>> {
1550        trace!(
1551            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1552            self.rows_remaining,
1553            self.rows_drained,
1554            self.rows_scheduled,
1555        );
1556        if self.rows_remaining == 0 {
1557            return Ok(None);
1558        }
1559
1560        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1561        self.rows_remaining -= to_take;
1562
1563        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1564        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1565        if scheduled_need > 0 {
1566            let desired_scheduled = scheduled_need + self.rows_scheduled;
1567            trace!(
1568                "Draining from scheduler (desire at least {} scheduled rows)",
1569                desired_scheduled
1570            );
1571            let actually_scheduled = self.wait_for_io(desired_scheduled)?;
1572            if actually_scheduled < desired_scheduled {
1573                let under_scheduled = desired_scheduled - actually_scheduled;
1574                to_take -= under_scheduled;
1575            }
1576        }
1577
1578        if to_take == 0 {
1579            return Ok(None);
1580        }
1581
1582        let next_task = self.root_decoder.drain_batch(to_take)?;
1583
1584        self.rows_drained += to_take;
1585
1586        let batch = next_task.into_batch(self.emitted_batch_size_warning.clone())?;
1587
1588        Ok(Some(batch))
1589    }
1590}
1591
1592impl<T: RootDecoderType> Iterator for BatchDecodeIterator<T> {
1593    type Item = ArrowResult<RecordBatch>;
1594
1595    fn next(&mut self) -> Option<Self::Item> {
1596        self.next_batch_task()
1597            .transpose()
1598            .map(|r| r.map_err(ArrowError::from))
1599    }
1600}
1601
1602impl<T: RootDecoderType> RecordBatchReader for BatchDecodeIterator<T> {
1603    fn schema(&self) -> Arc<ArrowSchema> {
1604        self.schema.clone()
1605    }
1606}
1607
1608/// A stream that takes scheduled jobs and generates decode tasks from them.
1609pub struct StructuralBatchDecodeStream {
1610    context: DecoderContext,
1611    root_decoder: StructuralStructDecoder,
1612    rows_remaining: u64,
1613    rows_per_batch: u32,
1614    rows_scheduled: u64,
1615    rows_drained: u64,
1616    scheduler_exhausted: bool,
1617    emitted_batch_size_warning: Arc<Once>,
1618}
1619
1620impl StructuralBatchDecodeStream {
1621    /// Create a new instance of a batch decode stream
1622    ///
1623    /// # Arguments
1624    ///
1625    /// * `scheduled` - an incoming stream of decode tasks from a
1626    ///   [`crate::decode::DecodeBatchScheduler`]
1627    /// * `rows_per_batch` - the number of rows to create before making a batch
1628    /// * `num_rows` - the total number of rows scheduled
1629    /// * `root_decoder` - the root struct decoder that accepts the loaded pages
1631    pub fn new(
1632        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1633        rows_per_batch: u32,
1634        num_rows: u64,
1635        root_decoder: StructuralStructDecoder,
1636    ) -> Self {
1637        Self {
1638            context: DecoderContext::new(scheduled),
1639            root_decoder,
1640            rows_remaining: num_rows,
1641            rows_per_batch,
1642            rows_scheduled: 0,
1643            rows_drained: 0,
1644            scheduler_exhausted: false,
1645            emitted_batch_size_warning: Arc::new(Once::new()),
1646        }
1647    }
1648
1649    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
1650        if self.scheduler_exhausted {
1651            return Ok(self.rows_scheduled);
1652        }
1653        while self.rows_scheduled < scheduled_need {
1654            let next_message = self.context.source.recv().await;
1655            match next_message {
1656                Some(scan_line) => {
1657                    let scan_line = scan_line?;
1658                    self.rows_scheduled = scan_line.scheduled_so_far;
1659                    for message in scan_line.decoders {
1660                        let unloaded_page = message.into_structural();
1661                        let loaded_page = unloaded_page.0.await?;
1662                        self.root_decoder.accept_page(loaded_page)?;
1663                    }
1664                }
1665                None => {
1666                    // Schedule ended before we got all the data we expected.  This probably
1667                    // means some kind of pushdown filter was applied and we didn't load as
1668                    // much data as we thought we would.
1669                    self.scheduler_exhausted = true;
1670                    return Ok(self.rows_scheduled);
1671                }
1672            }
1673        }
1674        Ok(scheduled_need)
1675    }
1676
1677    #[instrument(level = "debug", skip_all)]
1678    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
1679        trace!(
1680            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1681            self.rows_remaining,
1682            self.rows_drained,
1683            self.rows_scheduled,
1684        );
1685        if self.rows_remaining == 0 {
1686            return Ok(None);
1687        }
1688
1689        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1690        self.rows_remaining -= to_take;
1691
1692        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1693        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1694        if scheduled_need > 0 {
1695            let desired_scheduled = scheduled_need + self.rows_scheduled;
1696            trace!(
1697                "Draining from scheduler (desire at least {} scheduled rows)",
1698                desired_scheduled
1699            );
1700            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
1701            if actually_scheduled < desired_scheduled {
1702                let under_scheduled = desired_scheduled - actually_scheduled;
1703                to_take -= under_scheduled;
1704            }
1705        }
1706
1707        if to_take == 0 {
1708            return Ok(None);
1709        }
1710
1711        let next_task = self.root_decoder.drain_batch_task(to_take)?;
1712        self.rows_drained += to_take;
1713        Ok(Some(next_task))
1714    }
1715
1716    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
1717        let stream = futures::stream::unfold(self, |mut slf| async move {
1718            let next_task = slf.next_batch_task().await;
1719            let next_task = next_task.transpose().map(|next_task| {
1720                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
1721                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
1722                let task = async move {
1723                    let next_task = next_task?;
1724                    next_task.into_batch(emitted_batch_size_warning)
1725                };
1726                (task, num_rows)
1727            });
1728            next_task.map(|(task, num_rows)| {
1729                // This should be true since batch size is u32
1730                debug_assert!(num_rows <= u32::MAX as u64);
1731                let next_task = ReadBatchTask {
1732                    task: task.boxed(),
1733                    num_rows: num_rows as u32,
1734                };
1735                (next_task, slf)
1736            })
1737        });
1738        stream.boxed()
1739    }
1740}
1741
1742#[derive(Debug)]
1743pub enum RequestedRows {
1744    Ranges(Vec<Range<u64>>),
1745    Indices(Vec<u64>),
1746}
1747
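// A quick illustration of `num_rows` (hypothetical values):
//   RequestedRows::Ranges(vec![0..10, 20..25]).num_rows() == 15
//   RequestedRows::Indices(vec![1, 5, 9]).num_rows() == 3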
1748impl RequestedRows {
1749    pub fn num_rows(&self) -> u64 {
1750        match self {
1751            Self::Ranges(ranges) => ranges.iter().map(|r| r.end - r.start).sum(),
1752            Self::Indices(indices) => indices.len() as u64,
1753        }
1754    }
1755
1756    pub fn trim_empty_ranges(mut self) -> Self {
1757        if let Self::Ranges(ranges) = &mut self {
1758            ranges.retain(|r| !r.is_empty());
1759        }
1760        self
1761    }
1762}
1763
1764/// Configuration for decoder behavior
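///
/// A construction sketch (field values are illustrative, not recommendations):
/// ```ignore
/// let config = DecoderConfig { cache_repetition_index: true, ..Default::default() };
/// ```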
1765#[derive(Debug, Clone, Default)]
1766pub struct DecoderConfig {
1767    /// Whether to cache repetition indices for better performance
1768    pub cache_repetition_index: bool,
1769    /// Whether to validate decoded data
1770    pub validate_on_decode: bool,
1771}
1772
1773#[derive(Debug, Clone)]
1774pub struct SchedulerDecoderConfig {
1775    pub decoder_plugins: Arc<DecoderPlugins>,
1776    pub batch_size: u32,
1777    pub io: Arc<dyn EncodingsIo>,
1778    pub cache: Arc<LanceCache>,
1779    /// Decoder configuration
1780    pub decoder_config: DecoderConfig,
1781}
1782
1783fn check_scheduler_on_drop(
1784    stream: BoxStream<'static, ReadBatchTask>,
1785    scheduler_handle: tokio::task::JoinHandle<()>,
1786) -> BoxStream<'static, ReadBatchTask> {
1787    // This is a bit weird but we create an "empty stream" that unwraps the scheduler handle (which
1788    // will panic if the scheduler panicked).  This lets us check if the scheduler panicked
1789    // when the stream finishes.
1790    let mut scheduler_handle = Some(scheduler_handle);
1791    let check_scheduler = stream::unfold((), move |_| {
1792        let handle = scheduler_handle.take();
1793        async move {
1794            if let Some(handle) = handle {
1795                handle.await.unwrap();
1796            }
1797            None
1798        }
1799    });
1800    stream.chain(check_scheduler).boxed()
1801}
1802
1803pub fn create_decode_stream(
1804    schema: &Schema,
1805    num_rows: u64,
1806    batch_size: u32,
1807    is_structural: bool,
1808    should_validate: bool,
1809    rx: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1810) -> BoxStream<'static, ReadBatchTask> {
1811    if is_structural {
1812        let arrow_schema = ArrowSchema::from(schema);
1813        let structural_decoder = StructuralStructDecoder::new(
1814            arrow_schema.fields,
1815            should_validate,
1816            /*is_root=*/ true,
1817        );
1818        StructuralBatchDecodeStream::new(rx, batch_size, num_rows, structural_decoder).into_stream()
1819    } else {
1820        let arrow_schema = ArrowSchema::from(schema);
1821        let root_fields = arrow_schema.fields;
1822
1823        let simple_struct_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1824        BatchDecodeStream::new(rx, batch_size, num_rows, simple_struct_decoder).into_stream()
1825    }
1826}
1827
1828/// Creates an iterator that decodes a set of messages in a blocking fashion
1829///
1830/// See [`schedule_and_decode_blocking`] for more information.
1831pub fn create_decode_iterator(
1832    schema: &Schema,
1833    num_rows: u64,
1834    batch_size: u32,
1835    should_validate: bool,
1836    is_structural: bool,
1837    messages: VecDeque<Result<DecoderMessage>>,
1838) -> Box<dyn RecordBatchReader + Send + 'static> {
1839    let arrow_schema = Arc::new(ArrowSchema::from(schema));
1840    let root_fields = arrow_schema.fields.clone();
1841    if is_structural {
1842        let structural_decoder =
1843            StructuralStructDecoder::new(root_fields, should_validate, /*is_root=*/ true);
1844        Box::new(BatchDecodeIterator::new(
1845            messages,
1846            batch_size,
1847            num_rows,
1848            structural_decoder,
1849            arrow_schema,
1850        ))
1851    } else {
1852        let root_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1853        Box::new(BatchDecodeIterator::new(
1854            messages,
1855            batch_size,
1856            num_rows,
1857            root_decoder,
1858            arrow_schema,
1859        ))
1860    }
1861}
1862
1863fn create_scheduler_decoder(
1864    column_infos: Vec<Arc<ColumnInfo>>,
1865    requested_rows: RequestedRows,
1866    filter: FilterExpression,
1867    column_indices: Vec<u32>,
1868    target_schema: Arc<Schema>,
1869    config: SchedulerDecoderConfig,
1870) -> Result<BoxStream<'static, ReadBatchTask>> {
1871    let num_rows = requested_rows.num_rows();
1872
1873    let is_structural = column_infos[0].is_structural();
1874
1875    let (tx, rx) = mpsc::unbounded_channel();
1876
1877    let decode_stream = create_decode_stream(
1878        &target_schema,
1879        num_rows,
1880        config.batch_size,
1881        is_structural,
1882        config.decoder_config.validate_on_decode,
1883        rx,
1884    );
1885
1886    let scheduler_handle = tokio::task::spawn(async move {
1887        let mut decode_scheduler = match DecodeBatchScheduler::try_new(
1888            target_schema.as_ref(),
1889            &column_indices,
1890            &column_infos,
1891            &vec![],
1892            num_rows,
1893            config.decoder_plugins,
1894            config.io.clone(),
1895            config.cache,
1896            &filter,
1897            &config.decoder_config,
1898        )
1899        .await
1900        {
1901            Ok(scheduler) => scheduler,
1902            Err(e) => {
1903                let _ = tx.send(Err(e));
1904                return;
1905            }
1906        };
1907
1908        match requested_rows {
1909            RequestedRows::Ranges(ranges) => {
1910                decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
1911            }
1912            RequestedRows::Indices(indices) => {
1913                decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
1914            }
1915        }
1916    });
1917
1918    Ok(check_scheduler_on_drop(decode_stream, scheduler_handle))
1919}
1920
1921/// Launches a scheduler on a dedicated (spawned) task and creates a decoder to
1922/// decode the scheduled data and returns the decoder as a stream of record batches.
1923///
1924/// This is a convenience function that creates both the scheduler and the decoder
1925/// which can be a little tricky to get right.
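///
/// A usage sketch (the bindings shown are assumed to come from the caller):
/// ```ignore
/// let stream = schedule_and_decode(
///     column_infos,
///     RequestedRows::Ranges(vec![0..1024]),
///     FilterExpression::no_filter(),
///     column_indices,
///     target_schema,
///     config,
/// );
/// // each item is a ReadBatchTask whose `task` future yields a RecordBatch
/// ```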
1926pub fn schedule_and_decode(
1927    column_infos: Vec<Arc<ColumnInfo>>,
1928    requested_rows: RequestedRows,
1929    filter: FilterExpression,
1930    column_indices: Vec<u32>,
1931    target_schema: Arc<Schema>,
1932    config: SchedulerDecoderConfig,
1933) -> BoxStream<'static, ReadBatchTask> {
1934    if requested_rows.num_rows() == 0 {
1935        return stream::empty().boxed();
1936    }
1937
1938    // If the user requested any ranges that are empty, ignore them.  They are pointless and
1939    // trying to read them has caused bugs in the past.
1940    let requested_rows = requested_rows.trim_empty_ranges();
1941
1942    // For convenience we really want this method to be a synchronous method where all
1943    // errors happen on the stream.  There is some async initialization that must happen
1944    // when creating a scheduler.  We wrap that all up in the very first task.
1945    match create_scheduler_decoder(
1946        column_infos,
1947        requested_rows,
1948        filter,
1949        column_indices,
1950        target_schema,
1951        config,
1952    ) {
1953        Ok(stream) => stream,
1954        // If the initialization failed, make it look like a failed task
1955        Err(e) => stream::once(std::future::ready(ReadBatchTask {
1956            num_rows: 0,
1957            task: std::future::ready(Err(e)).boxed(),
1958        }))
1959        .boxed(),
1960    }
1961}
1962
1963pub static WAITER_RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
1964    tokio::runtime::Builder::new_current_thread()
1965        .build()
1966        .unwrap()
1967});
1968
1969/// Schedules and decodes the requested data in a blocking fashion
1970///
1971/// This function is a blocking version of [`schedule_and_decode`]. It schedules the requested data
1972/// and decodes it in the current thread.
1973///
1974/// This can be useful when the disk is fast (or the data is in memory) and the amount
1975/// of data is relatively small.  For example, when doing a take against NVMe or in-memory data.
1976///
1977/// This should NOT be used for full scans.  Even if the data is in memory this function will
1978/// not parallelize the decode and will be slower than the async version.  Full scans typically
1979/// make relatively few IOPs and so the asynchronous overhead is much smaller.
1980///
1981/// This method will first completely run the scheduling process.  Then it will run the
1982/// decode process.
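///
/// A usage sketch (inputs assumed to come from the caller; take indices must be ascending):
/// ```ignore
/// let reader = schedule_and_decode_blocking(
///     column_infos,
///     RequestedRows::Indices(vec![17, 4095]),
///     FilterExpression::no_filter(),
///     column_indices,
///     target_schema,
///     config,
/// )?;
/// for batch in reader {
///     let batch = batch?;
///     // process the RecordBatch
/// }
/// ```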
1983pub fn schedule_and_decode_blocking(
1984    column_infos: Vec<Arc<ColumnInfo>>,
1985    requested_rows: RequestedRows,
1986    filter: FilterExpression,
1987    column_indices: Vec<u32>,
1988    target_schema: Arc<Schema>,
1989    config: SchedulerDecoderConfig,
1990) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1991    if requested_rows.num_rows() == 0 {
1992        let arrow_schema = Arc::new(ArrowSchema::from(target_schema.as_ref()));
1993        return Ok(Box::new(RecordBatchIterator::new(vec![], arrow_schema)));
1994    }
1995
1996    let num_rows = requested_rows.num_rows();
1997    let is_structural = column_infos[0].is_structural();
1998
1999    let (tx, mut rx) = mpsc::unbounded_channel();
2000
2001    // Initialize the scheduler.  This is still "asynchronous" but we run it with a current-thread
2002    // runtime.
2003    let mut decode_scheduler = WAITER_RT.block_on(DecodeBatchScheduler::try_new(
2004        target_schema.as_ref(),
2005        &column_indices,
2006        &column_infos,
2007        &vec![],
2008        num_rows,
2009        config.decoder_plugins,
2010        config.io.clone(),
2011        config.cache,
2012        &filter,
2013        &config.decoder_config,
2014    ))?;
2015
2016    // Schedule the requested rows
2017    match requested_rows {
2018        RequestedRows::Ranges(ranges) => {
2019            decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
2020        }
2021        RequestedRows::Indices(indices) => {
2022            decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
2023        }
2024    }
2025
2026    // Drain the scheduler queue into a vec of decode messages
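    // (Scheduling above ran to completion and `tx` was consumed by the schedule_*
    // call, so the channel is already closed; `now_or_never` cannot misfire here.)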
2027    let mut messages = Vec::new();
2028    while rx
2029        .recv_many(&mut messages, usize::MAX)
2030        .now_or_never()
2031        .unwrap()
2032        != 0
2033    {}
2034
2035    // Create a decoder to decode the messages
2036    let decode_iterator = create_decode_iterator(
2037        &target_schema,
2038        num_rows,
2039        config.batch_size,
2040        config.decoder_config.validate_on_decode,
2041        is_structural,
2042        messages.into(),
2043    );
2044
2045    Ok(decode_iterator)
2046}
2047
2048/// A decoder for single-column encodings of primitive data (this includes fixed size
2049/// lists of primitive data)
2050///
2051/// Physical decoders are able to decode into existing buffers for zero-copy operation.
2052///
2053/// Instances should be stateless and `Send` / `Sync`.  This is because multiple decode
2054/// tasks could reference the same page.  For example, imagine a page covers rows 0-2000
2055/// and the decoder stream has a batch size of 1024.  The decoder will be needed by both
2056/// the decode task for batch 0 and the decode task for batch 1.
2057///
2058/// See [`crate::decoder`] for more information
2059pub trait PrimitivePageDecoder: Send + Sync {
2060    /// Decode data into buffers
2061    ///
2062    /// This may be a simple zero-copy from a disk buffer or could involve complex decoding
2063    /// such as decompressing from some compressed representation.
2064    ///
2069    /// Encodings can have any number of input or output buffers.  For example, a dictionary
2070    /// decoding will convert two buffers (indices + dictionary) into a single buffer
2071    ///
2072    /// Binary decodings have two output buffers (one for values, one for offsets)
2073    ///
2074    /// Other decodings could even expand the # of output buffers.  For example, we could decode
2075    /// fixed size strings into variable length strings going from one input buffer to multiple output
2076    /// buffers.
2077    ///
2078    /// Each Arrow data type typically has a fixed structure of buffers and the encoding chain will
2079    /// generally end at one of these structures.  However, intermediate structures may exist which
2080    /// do not correspond to any Arrow type at all.  For example, a bitpacking encoding will deal
2081    /// with buffers that have bits-per-value that is not a multiple of 8.
2082    ///
2083    /// The `primitive_array_from_buffers` method has an expected buffer layout for each Arrow
2084    /// type (order matters) and encodings that aim to decode into Arrow types should respect
2085    /// this layout.
2086    /// # Arguments
2087    ///
2088    /// * `rows_to_skip` - how many rows to skip (within the page) before decoding
2089    /// * `num_rows` - how many rows to decode
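    ///
    /// A call sketch (assuming `decoder` implements this trait):
    /// ```ignore
    /// // decode 500 rows, starting 1000 rows into the page
    /// let block = decoder.decode(1000, 500)?;
    /// ```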
2091    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock>;
2092}
2093
2094/// A scheduler for single-column encodings of primitive data
2095///
2096/// The scheduler is responsible for calculating what I/O is needed for the requested rows
2097///
2098/// Instances should be stateless and `Send` and `Sync`.  This is because instances can
2099/// be shared in follow-up I/O tasks.
2100///
2101/// See [`crate::decoder`] for more information
2102pub trait PageScheduler: Send + Sync + std::fmt::Debug {
2103    /// Schedules a batch of I/O to load the data needed for the requested ranges
2104    ///
2105    /// Returns a future that will yield a decoder once the data has been loaded
2106    ///
2107    /// # Arguments
2108    ///
2109    /// * `ranges` - the ranges of row offsets (relative to the start of the page) requested;
2110    ///   these must be ordered and must not overlap
2111    /// * `scheduler` - a scheduler to submit the I/O request to
2112    /// * `top_level_row` - the row offset of the top level field currently being
2113    ///   scheduled.  This can be used to assign priority to I/O requests
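    ///
    /// A call sketch (assuming `io` is the caller's `Arc<dyn EncodingsIo>`):
    /// ```ignore
    /// let decoder_fut = page_scheduler.schedule_ranges(&[0..100, 200..300], &io, 0);
    /// let decoder = decoder_fut.await?;
    /// ```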
2114    fn schedule_ranges(
2115        &self,
2116        ranges: &[Range<u64>],
2117        scheduler: &Arc<dyn EncodingsIo>,
2118        top_level_row: u64,
2119    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>;
2120}
2121
2122/// A trait to control the priority of I/O
2123pub trait PriorityRange: std::fmt::Debug + Send + Sync {
2124    fn advance(&mut self, num_rows: u64);
2125    fn current_priority(&self) -> u64;
2126    fn box_clone(&self) -> Box<dyn PriorityRange>;
2127}
2128
2129/// A simple priority scheme for top-level fields with no parent
2130/// repetition
2131#[derive(Debug)]
2132pub struct SimplePriorityRange {
2133    priority: u64,
2134}
2135
2136impl SimplePriorityRange {
2137    fn new(priority: u64) -> Self {
2138        Self { priority }
2139    }
2140}
2141
2142impl PriorityRange for SimplePriorityRange {
2143    fn advance(&mut self, num_rows: u64) {
2144        self.priority += num_rows;
2145    }
2146
2147    fn current_priority(&self) -> u64 {
2148        self.priority
2149    }
2150
2151    fn box_clone(&self) -> Box<dyn PriorityRange> {
2152        Box::new(Self {
2153            priority: self.priority,
2154        })
2155    }
2156}
2157
2158/// Determining the priority of a list request is tricky.  We want
2159/// the priority to be the top-level row.  So if we have a
2160/// list<list<int>> where each outer list has 10 inner lists and each
2161/// inner list has 5 items then the priority of the 100th item is 1
2162/// because it is the 5th item in the 10th inner list of the *second* row.
2163///
2164/// This structure allows us to keep track of this complicated priority
2165/// relationship.
2166///
2167/// There's a fair amount of bookkeeping involved here.
2168///
2169/// A better approach (using repetition levels) is coming in the future.
2170pub struct ListPriorityRange {
2171    base: Box<dyn PriorityRange>,
2172    offsets: Arc<[u64]>,
2173    cur_index_into_offsets: usize,
2174    cur_position: u64,
2175}
2176
2177impl ListPriorityRange {
2178    pub(crate) fn new(base: Box<dyn PriorityRange>, offsets: Arc<[u64]>) -> Self {
2179        Self {
2180            base,
2181            offsets,
2182            cur_index_into_offsets: 0,
2183            cur_position: 0,
2184        }
2185    }
2186}
2187
2188impl std::fmt::Debug for ListPriorityRange {
2189    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2190        f.debug_struct("ListPriorityRange")
2191            .field("base", &self.base)
2192            .field("offsets.len()", &self.offsets.len())
2193            .field("cur_index_into_offsets", &self.cur_index_into_offsets)
2194            .field("cur_position", &self.cur_position)
2195            .finish()
2196    }
2197}
2198
2199impl PriorityRange for ListPriorityRange {
2200    fn advance(&mut self, num_rows: u64) {
2201        // We've scheduled X items.  Now walk through the offsets to
2202        // determine how many rows we've scheduled.
2203        self.cur_position += num_rows;
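        // Worked example (hypothetical): with offsets = [0, 5, 10], advancing the
        // position to 7 steps past offsets[1] = 5, so one base row is fully
        // covered and the base priority advances by 1.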
2204        let mut idx_into_offsets = self.cur_index_into_offsets;
2205        while idx_into_offsets + 1 < self.offsets.len()
2206            && self.offsets[idx_into_offsets + 1] <= self.cur_position
2207        {
2208            idx_into_offsets += 1;
2209        }
2210        let base_rows_advanced = idx_into_offsets - self.cur_index_into_offsets;
2211        self.cur_index_into_offsets = idx_into_offsets;
2212        self.base.advance(base_rows_advanced as u64);
2213    }
2214
2215    fn current_priority(&self) -> u64 {
2216        self.base.current_priority()
2217    }
2218
2219    fn box_clone(&self) -> Box<dyn PriorityRange> {
2220        Box::new(Self {
2221            base: self.base.box_clone(),
2222            offsets: self.offsets.clone(),
2223            cur_index_into_offsets: self.cur_index_into_offsets,
2224            cur_position: self.cur_position,
2225        })
2226    }
2227}
2228
2229/// Contains the context for a scheduler
2230pub struct SchedulerContext {
2231    recv: Option<mpsc::UnboundedReceiver<DecoderMessage>>,
2232    io: Arc<dyn EncodingsIo>,
2233    cache: Arc<LanceCache>,
2234    name: String,
2235    path: Vec<u32>,
2236    path_names: Vec<String>,
2237}
2238
2239pub struct ScopedSchedulerContext<'a> {
2240    pub context: &'a mut SchedulerContext,
2241}
2242
2243impl<'a> ScopedSchedulerContext<'a> {
2244    pub fn pop(self) -> &'a mut SchedulerContext {
2245        self.context.pop();
2246        self.context
2247    }
2248}
2249
2250impl SchedulerContext {
2251    pub fn new(io: Arc<dyn EncodingsIo>, cache: Arc<LanceCache>) -> Self {
2252        Self {
2253            io,
2254            cache,
2255            recv: None,
2256            name: "".to_string(),
2257            path: Vec::new(),
2258            path_names: Vec::new(),
2259        }
2260    }
2261
2262    pub fn io(&self) -> &Arc<dyn EncodingsIo> {
2263        &self.io
2264    }
2265
2266    pub fn cache(&self) -> &Arc<LanceCache> {
2267        &self.cache
2268    }
2269
2270    pub fn push(&'_ mut self, name: &str, index: u32) -> ScopedSchedulerContext<'_> {
2271        self.path.push(index);
2272        self.path_names.push(name.to_string());
2273        ScopedSchedulerContext { context: self }
2274    }
2275
2276    pub fn pop(&mut self) {
2277        self.path.pop();
2278        self.path_names.pop();
2279    }
2280
2281    pub fn path_name(&self) -> String {
2282        let path = self.path_names.join("/");
2283        if self.recv.is_some() {
2284            format!("TEMP({}){}", self.name, path)
2285        } else {
2286            format!("ROOT{}", path)
2287        }
2288    }
2289
2290    pub fn current_path(&self) -> VecDeque<u32> {
2291        VecDeque::from_iter(self.path.iter().copied())
2292    }
2293
2294    #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
2295    pub fn locate_decoder(
2296        &mut self,
2297        decoder: Box<dyn crate::previous::decoder::LogicalPageDecoder>,
2298    ) -> crate::previous::decoder::DecoderReady {
2299        trace!(
2300            "Scheduling decoder of type {:?} for {:?}",
2301            decoder.data_type(),
2302            self.path,
2303        );
2304        crate::previous::decoder::DecoderReady {
2305            decoder,
2306            path: self.current_path(),
2307        }
2308    }
2309}
2310
2311pub struct UnloadedPage(pub BoxFuture<'static, Result<LoadedPage>>);
2312
2313impl std::fmt::Debug for UnloadedPage {
2314    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2315        f.debug_struct("UnloadedPage").finish()
2316    }
2317}
2318
2319#[derive(Debug)]
2320pub struct ScheduledScanLine {
2321    pub rows_scheduled: u64,
2322    pub decoders: Vec<MessageType>,
2323}
2324
2325pub trait StructuralSchedulingJob: std::fmt::Debug {
2326    fn schedule_next(
2327        &mut self,
2328        context: &mut SchedulerContext,
2329    ) -> Result<Option<ScheduledScanLine>>;
2330}
2331
2332/// A filter expression to apply to the data
2333///
2334/// The core decoders do not currently take advantage of filtering in
2335/// any way.  In order to maintain the abstraction we represent filters
2336/// as an arbitrary byte sequence.
2337///
2338/// We recommend that encodings use Substrait for filters.
2339pub struct FilterExpression(pub Bytes);
2340
2341impl FilterExpression {
2342    /// Create a filter expression that does not filter any data
2343    ///
2344    /// This is currently represented by an empty byte array.  Encoders
2345    /// that are "filter aware" should make sure they handle this case.
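    ///
    /// ```ignore
    /// assert!(FilterExpression::no_filter().is_noop());
    /// ```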
2346    pub fn no_filter() -> Self {
2347        Self(Bytes::new())
2348    }
2349
2350    /// Returns true if the filter is the same as the [`Self::no_filter`] filter
2351    pub fn is_noop(&self) -> bool {
2352        self.0.is_empty()
2353    }
2354}
2355
2356pub trait StructuralFieldScheduler: Send + std::fmt::Debug {
2357    fn initialize<'a>(
2358        &'a mut self,
2359        filter: &'a FilterExpression,
2360        context: &'a SchedulerContext,
2361    ) -> BoxFuture<'a, Result<()>>;
2362    fn schedule_ranges<'a>(
2363        &'a self,
2364        ranges: &[Range<u64>],
2365        filter: &FilterExpression,
2366    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>>;
2367}
2368
2369/// A trait for tasks that decode data into an Arrow array
2370pub trait DecodeArrayTask: Send {
2371    /// Decodes the data into an Arrow array
2372    fn decode(self: Box<Self>) -> Result<ArrayRef>;
2373}
2374
2375impl DecodeArrayTask for Box<dyn StructuralDecodeArrayTask> {
2376    fn decode(self: Box<Self>) -> Result<ArrayRef> {
2377        StructuralDecodeArrayTask::decode(*self).map(|decoded_array| decoded_array.array)
2378    }
2379}
2380
2381/// A task to decode data into an Arrow record batch
2382///
2383/// It has a child `task` which decodes a struct array with no nulls.
2384/// This is then converted into a record batch.
2385pub struct NextDecodeTask {
2386    /// The decode task itself
2387    pub task: Box<dyn DecodeArrayTask>,
2388    /// The number of rows that will be created
2389    pub num_rows: u64,
2390}
2391
2392impl NextDecodeTask {
2393    // Run the task and produce a record batch
2394    //
2395    // If the batch is very large this function will log a warning message
2396    // suggesting the user try a smaller batch size.
2397    #[instrument(name = "task_to_batch", level = "debug", skip_all)]
2398    fn into_batch(self, emitted_batch_size_warning: Arc<Once>) -> Result<RecordBatch> {
2399        let struct_arr = self.task.decode();
2400        match struct_arr {
2401            Ok(struct_arr) => {
2402                let batch = RecordBatch::from(struct_arr.as_struct());
2403                let size_bytes = batch.get_array_memory_size() as u64;
2404                if size_bytes > BATCH_SIZE_BYTES_WARNING {
2405                    emitted_batch_size_warning.call_once(|| {
2406                        let size_mb = size_bytes / 1024 / 1024;
2407                        debug!("Lance read in a single batch that contained more than {}MiB of data.  You may want to consider reducing the batch size.", size_mb);
2408                    });
2409                }
2410                Ok(batch)
2411            }
2412            Err(e) => {
2413                let e = Error::Internal {
2414                    message: format!("Error decoding batch: {}", e),
2415                    location: location!(),
2416                };
2417                Err(e)
2418            }
2419        }
2420    }
2421}
2422
2423// An envelope to wrap both 2.0 style messages and 2.1 style messages so we can
2424// share some code paths between the two.  Decoders can safely unwrap into whatever
2425// style they expect since a file will be either all-2.0 or all-2.1
2426#[derive(Debug)]
2427pub enum MessageType {
2428    // The older v2.0 scheduler/decoder used a scheme where the message was the
2429    // decoder itself.  The messages were not sent in priority order and the decoder
2430    // had to wait for I/O, figuring out the correct priority.  This was a lot of
2431    // complexity.
2432    DecoderReady(crate::previous::decoder::DecoderReady),
2433    // Starting in 2.1 we use a simpler scheme where the scheduling happens in priority
2434    // order and the message is an unloaded decoder.  These can be awaited, in order, and
2435    // the decoder does not have to worry about waiting for I/O.
2436    UnloadedPage(UnloadedPage),
2437}
2438
2439impl MessageType {
2440    pub fn into_legacy(self) -> crate::previous::decoder::DecoderReady {
2441        match self {
2442            Self::DecoderReady(decoder) => decoder,
2443            Self::UnloadedPage(_) => {
2444                panic!("Expected DecoderReady but got UnloadedPage")
2445            }
2446        }
2447    }
2448
2449    pub fn into_structural(self) -> UnloadedPage {
2450        match self {
2451            Self::UnloadedPage(unloaded) => unloaded,
2452            Self::DecoderReady(_) => {
2453                panic!("Expected UnloadedPage but got DecoderReady")
2454            }
2455        }
2456    }
2457}
2458
2459pub struct DecoderMessage {
2460    pub scheduled_so_far: u64,
2461    pub decoders: Vec<MessageType>,
2462}
2463
2464pub struct DecoderContext {
2465    source: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
2466}
2467
2468impl DecoderContext {
2469    pub fn new(source: mpsc::UnboundedReceiver<Result<DecoderMessage>>) -> Self {
2470        Self { source }
2471    }
2472}
2473
2474pub struct DecodedPage {
2475    pub data: DataBlock,
2476    pub repdef: RepDefUnraveler,
2477}
2478
2479pub trait DecodePageTask: Send + std::fmt::Debug {
2480    /// Decodes the data into an Arrow array
2481    fn decode(self: Box<Self>) -> Result<DecodedPage>;
2482}
2483
2484pub trait StructuralPageDecoder: std::fmt::Debug + Send {
2485    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>>;
2486    fn num_rows(&self) -> u64;
2487}
2488
2489#[derive(Debug)]
2490pub struct LoadedPage {
2491    // The decoder that is ready to be decoded
2492    pub decoder: Box<dyn StructuralPageDecoder>,
2493    // The path to the decoder, the first value is the column index
2494    // following values, if present, are nested child indices
2495    //
2496    // For example, a path of [1, 1, 0] would mean to grab the second
2497    // column, then the second child, and then the first child.
2498    //
2499    // It could represent x in the following schema:
2500    //
2501    // score: float64
2502    // points: struct
2503    //   color: string
2504    //   location: struct
2505    //     x: float64
2506    //
2507    // Currently, only struct decoders have "children" although other
2508    // decoders may have them at some point as well.  List children are only
2509    // handled through indirect I/O at the moment and so they don't
2510    // need to be represented (yet)
2511    pub path: VecDeque<u32>,
2512    pub page_index: usize,
2513}
2514
2515pub struct DecodedArray {
2516    pub array: ArrayRef,
2517    pub repdef: CompositeRepDefUnraveler,
2518}
2519
2520pub trait StructuralDecodeArrayTask: std::fmt::Debug + Send {
2521    fn decode(self: Box<Self>) -> Result<DecodedArray>;
2522}
2523
2524pub trait StructuralFieldDecoder: std::fmt::Debug + Send {
2525    /// Add a newly scheduled child decoder
2526    ///
2527    /// Implementations that do not expect children should return
2528    /// an error.
2529    fn accept_page(&mut self, _child: LoadedPage) -> Result<()>;
2530    /// Creates a task to decode `num_rows` of data into an array
2531    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>>;
2532    /// The data type of the decoded data
2533    fn data_type(&self) -> &DataType;
2534}
2535
2536#[derive(Debug, Default)]
2537pub struct DecoderPlugins {}
2538
2539/// Decodes a batch of data from an in-memory structure created by [`crate::encoder::encode_batch`]
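/// A call sketch (assuming `encoded` is an [`EncodedBatch`] produced by the encoder):
/// ```ignore
/// let batch = decode_batch(
///     &encoded,
///     &FilterExpression::no_filter(),
///     Arc::new(DecoderPlugins::default()),
///     /*should_validate=*/ false,
///     LanceFileVersion::V2_1,
///     /*cache=*/ None,
/// )
/// .await?;
/// ```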
2540pub async fn decode_batch(
2541    batch: &EncodedBatch,
2542    filter: &FilterExpression,
2543    decoder_plugins: Arc<DecoderPlugins>,
2544    should_validate: bool,
2545    version: LanceFileVersion,
2546    cache: Option<Arc<LanceCache>>,
2547) -> Result<RecordBatch> {
2548    // The I/O is synchronous so it shouldn't be possible for any async work to still be in progress.
2549    // Still, if we just used now_or_never we would hit misfires because some futures (channels) need
2550    // to be polled twice.
2551
2552    let io_scheduler = Arc::new(BufferScheduler::new(batch.data.clone())) as Arc<dyn EncodingsIo>;
2553    let cache = if let Some(cache) = cache {
2554        cache
2555    } else {
2556        Arc::new(lance_core::cache::LanceCache::with_capacity(
2557            128 * 1024 * 1024,
2558        ))
2559    };
2560    let mut decode_scheduler = DecodeBatchScheduler::try_new(
2561        batch.schema.as_ref(),
2562        &batch.top_level_columns,
2563        &batch.page_table,
2564        &vec![],
2565        batch.num_rows,
2566        decoder_plugins,
2567        io_scheduler.clone(),
2568        cache,
2569        filter,
2570        &DecoderConfig::default(),
2571    )
2572    .await?;
2573    let (tx, rx) = unbounded_channel();
2574    decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler);
2575    let is_structural = version >= LanceFileVersion::V2_1;
2576    let mut decode_stream = create_decode_stream(
2577        &batch.schema,
2578        batch.num_rows,
2579        batch.num_rows as u32,
2580        is_structural,
2581        should_validate,
2582        rx,
2583    );
2584    decode_stream.next().await.unwrap().task.await
2585}
2586
2587#[cfg(test)]
2588// Tests for coalescing indices into ranges
2589mod tests {
2590    use super::*;
2591
2592    #[test]
2593    fn test_coalesce_indices_to_ranges_with_single_index() {
2594        let indices = vec![1];
2595        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
2596        assert_eq!(ranges, vec![1..2]);
2597    }
2598
2599    #[test]
2600    fn test_coalesce_indices_to_ranges() {
2601        let indices = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
2602        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
2603        assert_eq!(ranges, vec![1..10]);
2604    }
2605
2606    #[test]
2607    fn test_coalesce_indices_to_ranges_with_gaps() {
2608        let indices = vec![1, 2, 3, 5, 6, 7, 9];
2609        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
2610        assert_eq!(ranges, vec![1..4, 5..8, 9..10]);
2611    }
2612}