lance_encoding/
decoder.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Utilities and traits for scheduling & decoding data
5//!
6//! Reading data involves two steps: scheduling and decoding.  The
7//! scheduling step is responsible for figuring out what data is needed
8//! and issuing the appropriate I/O requests.  The decoding step is
9//! responsible for taking the loaded data and turning it into Arrow
10//! arrays.
11//!
12//! # Scheduling
13//!
14//! Scheduling is split into [`self::FieldScheduler`] and [`self::PageScheduler`].
15//! There is one field scheduler for each output field, which may map to many
16//! columns of actual data.  A field scheduler is responsible for figuring out
17//! the order in which pages should be scheduled.  Field schedulers then delegate
18//! to page schedulers to figure out the I/O requests that need to be made for
19//! the page.
20//!
21//! Page schedulers also create the decoders that will be used to decode the
22//! scheduled data.
23//!
24//! # Decoding
25//!
26//! Decoders are split into [`self::PhysicalPageDecoder`] and
27//! [`self::LogicalPageDecoder`].  Note that both physical and logical decoding
28//! happens on a per-page basis.  There is no concept of a "field decoder" or
29//! "column decoder".
30//!
31//! The physical decoders handle lower level encodings.  They have a few advantages:
32//!
33//!  * They do not need to decode into an Arrow array and so they don't need
34//!    to be enveloped into the Arrow filesystem (e.g. Arrow doesn't have a
35//!    bit-packed type.  We can use variable-length binary but that is kind
36//!    of awkward)
37//!  * They can decode into an existing allocation.  This can allow for "page
38//!    bridging".  If we are trying to decode into a batch of 1024 rows and
39//!    the rows 0..1024 are spread across two pages then we can avoid a memory
40//!    copy by allocating once and decoding each page into the outer allocation.
41//!    (note: page bridging is not actually implemented yet)
42//!
43//! However, there are some limitations for physical decoders:
44//!
45//!  * They are constrained to a single column
46//!  * The API is more complex
47//!
48//! The logical decoders are designed to map one or more columns of Lance
49//! data into an Arrow array.
50//!
51//! Typically, a "logical encoding" will have both a logical decoder and a field scheduler.
52//! Meanwhile, a "physical encoding" will have a physical decoder but no corresponding field
53//! scheduler.git add --all
54//!
55//!
56//! # General notes
57//!
58//! Encodings are typically nested into each other to form a tree.  The top of the tree is
59//! the user requested schema.  Each field in that schema is assigned to one top-level logical
60//! encoding.  That encoding can then contain other logical encodings or physical encodings.
61//! Physical encodings can also contain other physical encodings.
62//!
63//! So, for example, a single field in the Arrow schema might have the type List<UInt32>
64//!
65//! The encoding tree could then be:
66//!
67//! root: List (logical encoding)
68//!  - indices: Primitive (logical encoding)
69//!    - column: Basic (physical encoding)
70//!      - validity: Bitmap (physical encoding)
71//!      - values: RLE (physical encoding)
72//!        - runs: Value (physical encoding)
73//!        - values: Value (physical encoding)
74//!  - items: Primitive (logical encoding)
75//!    - column: Basic (physical encoding)
76//!      - values: Value (physical encoding)
77//!
78//! Note that, in this example, root.items.column does not have a validity because there were
79//! no nulls in the page.
80//!
81//! ## Multiple buffers or multiple columns?
82//!
83//! Note that there are many different ways we can write encodings.  For example, we might
84//! store primitive fields in a single column with two buffers (one for validity and one for
85//! values)
86//!
87//! On the other hand, we could also store a primitive field as two different columns.  One
88//! that yields a non-nullable boolean array and one that yields a non-nullable array of items.
89//! Then we could combine these two arrays into a single array where the boolean array is the
90//! bitmap.  There are a few subtle differences between the approaches:
91//!
92//! * Storing things as multiple buffers within the same column is generally more efficient and
93//!   easier to schedule.  For example, in-batch coalescing is very easy but can only be done
94//!   on data that is in the same page.
95//! * When things are stored in multiple columns you have to worry about their pages not being
96//!   in sync.  In our previous validity / values example this means we might have to do some
97//!   memory copies to get the validity array and values arrays to be the same length as
98//!   decode.
99//! * When things are stored in a single column, projection is impossible.  For example, if we
100//!   tried to store all the struct fields in a single column with lots of buffers then we wouldn't
101//!   be able to read back individual fields of the struct.
102//!
103//! The fixed size list decoding is an interesting example because it is actually both a physical
104//! encoding and a logical encoding.  A fixed size list of a physical encoding is, itself, a physical
105//! encoding (e.g. a fixed size list of doubles).  However, a fixed size list of a logical encoding
106//! is a logical encoding (e.g. a fixed size list of structs).
107//!
108//! # The scheduling loop
109//!
110//! Reading a Lance file involves both scheduling and decoding.  Its generally expected that these
111//! will run as two separate threads.
112//!
113//! ```text
114//!
115//!                                    I/O PARALLELISM
116//!                       Issues
117//!                       Requests   ┌─────────────────┐
118//!                                  │                 │        Wait for
119//!                       ┌──────────►   I/O Service   ├─────►  Enough I/O ◄─┐
120//!                       │          │                 │        For batch    │
121//!                       │          └─────────────────┘             │3      │
122//!                       │                                          │       │
123//!                       │                                          │       │2
124//! ┌─────────────────────┴─┐                              ┌─────────▼───────┴┐
125//! │                       │                              │                  │Poll
126//! │       Batch Decode    │ Decode tasks sent via channel│   Batch Decode   │1
127//! │       Scheduler       ├─────────────────────────────►│   Stream         ◄─────
128//! │                       │                              │                  │
129//! └─────▲─────────────┬───┘                              └─────────┬────────┘
130//!       │             │                                            │4
131//!       │             │                                            │
132//!       └─────────────┘                                   ┌────────┴────────┐
133//!  Caller of schedule_range                Buffer polling │                 │
134//!  will be scheduler thread                to achieve CPU │ Decode Batch    ├────►
135//!  and schedule one decode                 parallelism    │ Task            │
136//!  task (and all needed I/O)               (thread per    │                 │
137//!  per logical page                         batch)        └─────────────────┘
138//! ```
139//!
140//! The scheduling thread will work through the file from the
141//! start to the end as quickly as possible.  Data is scheduled one page at a time in a row-major
142//! fashion.  For example, imagine we have a file with the following page structure:
143//!
144//! ```text
145//! Score (Float32)     | C0P0 |
146//! Id (16-byte UUID)   | C1P0 | C1P1 | C1P2 | C1P3 |
147//! Vector (4096 bytes) | C2P0 | C2P1 | C2P2 | C2P3 | .. | C2P1024 |
148//! ```
149//!
150//! This would be quite common as each of these pages has the same number of bytes.  Let's pretend
151//! each page is 1MiB and so there are 256Ki rows of data.  Each page of `Score` has 256Ki rows.
152//! Each page of `Id` has 64Ki rows.  Each page of `Vector` has 256 rows.  The scheduler would then
153//! schedule in the following order:
154//!
155//! C0 P0
156//! C1 P0
157//! C2 P0
158//! C2 P1
159//! ... (254 pages omitted)
160//! C2 P255
161//! C1 P1
162//! C2 P256
163//! ... (254 pages omitted)
164//! C2 P511
165//! C1 P2
166//! C2 P512
167//! ... (254 pages omitted)
168//! C2 P767
169//! C1 P3
170//! C2 P768
171//! ... (254 pages omitted)
172//! C2 P1024
173//!
174//! This is the ideal scheduling order because it means we can decode complete rows as quickly as possible.
175//! Note that the scheduler thread does not need to wait for I/O to happen at any point.  As soon as it starts
176//! it will start scheduling one page of I/O after another until it has scheduled the entire file's worth of
177//! I/O.  This is slightly different than other file readers which have "row group parallelism" and will
178//! typically only schedule X row groups worth of reads at a time.
179//!
180//! In the near future there will be a backpressure mechanism and so it may need to stop/pause if the compute
181//! falls behind.
182//!
183//! ## Indirect I/O
184//!
185//! Regrettably, there are times where we cannot know exactly what data we need until we have partially decoded
186//! the file.  This happens when we have variable sized list data.  In that case the scheduling task for that
187//! page will only schedule the first part of the read (loading the list offsets).  It will then immediately
188//! spawn a new tokio task to wait for that I/O and decode the list offsets.  That follow-up task is not part
189//! of the scheduling loop or the decode loop.  It is a free task.  Once the list offsets are decoded we submit
190//! a follow-up I/O task.  This task is scheduled at a high priority because the decoder is going to need it soon.
191//!
192//! # The decode loop
193//!
194//! As soon as the scheduler starts we can start decoding.  Each time we schedule a page we
195//! push a decoder for that page's data into a channel.  The decode loop
196//! ([`BatchDecodeStream`]) reads from that channel.  Each time it receives a decoder it
197//! waits until the decoder has all of its data.  Then it grabs the next decoder.  Once it has
198//! enough loaded decoders to complete a batch worth of rows it will spawn a "decode batch task".
199//!
200//! These batch decode tasks perform the actual CPU work of decoding the loaded data into Arrow
201//! arrays.  This may involve signifciant CPU processing like decompression or arithmetic in order
202//! to restore the data to its correct in-memory representation.
203//!
204//! ## Batch size
205//!
206//! The `BatchDecodeStream` is configured with a batch size.  This does not need to have any
207//! relation to the page size(s) used to write the data.  This keeps our compute work completely
208//! independent of our I/O work.  We suggest using small batch sizes:
209//!
210//!  * Batches should fit in CPU cache (at least L3)
211//!  * More batches means more opportunity for parallelism
212//!  * The "batch overhead" is very small in Lance compared to other formats because it has no
213//!    relation to the way the data is stored.
214
215use std::collections::VecDeque;
216use std::sync::{LazyLock, Once};
217use std::{ops::Range, sync::Arc};
218
219use arrow_array::cast::AsArray;
220use arrow_array::{ArrayRef, RecordBatch, RecordBatchIterator, RecordBatchReader};
221use arrow_schema::{ArrowError, DataType, Field as ArrowField, Fields, Schema as ArrowSchema};
222use bytes::Bytes;
223use futures::future::{maybe_done, BoxFuture, MaybeDone};
224use futures::stream::{self, BoxStream};
225use futures::{FutureExt, StreamExt};
226use lance_arrow::DataTypeExt;
227use lance_core::cache::LanceCache;
228use lance_core::datatypes::{Field, Schema, BLOB_DESC_LANCE_FIELD};
229use log::{debug, trace, warn};
230use snafu::location;
231use tokio::sync::mpsc::error::SendError;
232use tokio::sync::mpsc::{self, unbounded_channel};
233
234use lance_core::{ArrowResult, Error, Result};
235use tracing::instrument;
236
237use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy};
238use crate::data::DataBlock;
239use crate::encoder::EncodedBatch;
240use crate::encodings::logical::list::StructuralListScheduler;
241use crate::encodings::logical::primitive::StructuralPrimitiveFieldScheduler;
242use crate::encodings::logical::r#struct::{StructuralStructDecoder, StructuralStructScheduler};
243use crate::format::pb::{self, column_encoding};
244use crate::repdef::{CompositeRepDefUnraveler, RepDefUnraveler};
245use crate::v2::decoder::LogicalPageDecoder;
246use crate::v2::encodings::logical::list::OffsetPageInfo;
247use crate::v2::encodings::logical::r#struct::{SimpleStructDecoder, SimpleStructScheduler};
248use crate::v2::encodings::logical::{
249    binary::BinaryFieldScheduler, blob::BlobFieldScheduler, list::ListFieldScheduler,
250    primitive::PrimitiveFieldScheduler,
251};
252use crate::version::LanceFileVersion;
253use crate::{BufferScheduler, EncodingsIo};
254
255// If users are getting batches over 10MiB large then it's time to reduce the batch size
256const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024;
257
258/// Top-level encoding message for a page.  Wraps both the
259/// legacy pb::ArrayEncoding and the newer pb::PageLayout
260///
261/// A file should only use one or the other and never both.
262/// 2.0 decoders can always assume this is pb::ArrayEncoding
263/// and 2.1+ decoders can always assume this is pb::PageLayout
264#[derive(Debug)]
265pub enum PageEncoding {
266    Legacy(pb::ArrayEncoding),
267    Structural(pb::PageLayout),
268}
269
270impl PageEncoding {
271    pub fn as_legacy(&self) -> &pb::ArrayEncoding {
272        match self {
273            Self::Legacy(enc) => enc,
274            Self::Structural(_) => panic!("Expected a legacy encoding"),
275        }
276    }
277
278    pub fn as_structural(&self) -> &pb::PageLayout {
279        match self {
280            Self::Structural(enc) => enc,
281            Self::Legacy(_) => panic!("Expected a structural encoding"),
282        }
283    }
284
285    pub fn is_structural(&self) -> bool {
286        matches!(self, Self::Structural(_))
287    }
288}
289
290/// Metadata describing a page in a file
291///
292/// This is typically created by reading the metadata section of a Lance file
293#[derive(Debug)]
294pub struct PageInfo {
295    /// The number of rows in the page
296    pub num_rows: u64,
297    /// The priority (top level row number) of the page
298    ///
299    /// This is only set in 2.1 files and will be 0 for 2.0 files
300    pub priority: u64,
301    /// The encoding that explains the buffers in the page
302    pub encoding: PageEncoding,
303    /// The offsets and sizes of the buffers in the file
304    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
305}
306
307/// Metadata describing a column in a file
308///
309/// This is typically created by reading the metadata section of a Lance file
310#[derive(Debug, Clone)]
311pub struct ColumnInfo {
312    /// The index of the column in the file
313    pub index: u32,
314    /// The metadata for each page in the column
315    pub page_infos: Arc<[PageInfo]>,
316    /// File positions and their sizes of the column-level buffers
317    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
318    pub encoding: pb::ColumnEncoding,
319}
320
321impl ColumnInfo {
322    /// Create a new instance
323    pub fn new(
324        index: u32,
325        page_infos: Arc<[PageInfo]>,
326        buffer_offsets_and_sizes: Vec<(u64, u64)>,
327        encoding: pb::ColumnEncoding,
328    ) -> Self {
329        Self {
330            index,
331            page_infos,
332            buffer_offsets_and_sizes: buffer_offsets_and_sizes.into_boxed_slice().into(),
333            encoding,
334        }
335    }
336
337    pub fn is_structural(&self) -> bool {
338        self.page_infos
339            // Can just look at the first since all should be the same
340            .first()
341            .map(|page| page.encoding.is_structural())
342            .unwrap_or(false)
343    }
344}
345
346enum RootScheduler {
347    Structural(Box<dyn StructuralFieldScheduler>),
348    Legacy(Arc<dyn crate::v2::decoder::FieldScheduler>),
349}
350
351impl RootScheduler {
352    fn as_legacy(&self) -> &Arc<dyn crate::v2::decoder::FieldScheduler> {
353        match self {
354            Self::Structural(_) => panic!("Expected a legacy scheduler"),
355            Self::Legacy(s) => s,
356        }
357    }
358
359    fn as_structural(&self) -> &dyn StructuralFieldScheduler {
360        match self {
361            Self::Structural(s) => s.as_ref(),
362            Self::Legacy(_) => panic!("Expected a structural scheduler"),
363        }
364    }
365}
366
367/// The scheduler for decoding batches
368///
369/// Lance decoding is done in two steps, scheduling, and decoding.  The
370/// scheduling tends to be lightweight and should quickly figure what data
371/// is needed from the disk issue the appropriate I/O requests.  A decode task is
372/// created to eventually decode the data (once it is loaded) and scheduling
373/// moves on to scheduling the next page.
374///
375/// Meanwhile, it's expected that a decode stream will be setup to run at the
376/// same time.  Decode tasks take the data that is loaded and turn it into
377/// Arrow arrays.
378///
379/// This approach allows us to keep our I/O parallelism and CPU parallelism
380/// completely separate since those are often two very different values.
381///
382/// Backpressure should be achieved via the I/O service.  Requests that are
383/// issued will pile up if the decode stream is not polling quickly enough.
384/// The [`crate::EncodingsIo::submit_request`] function should return a pending
385/// future once there are too many I/O requests in flight.
386///
387/// TODO: Implement backpressure
388pub struct DecodeBatchScheduler {
389    root_scheduler: RootScheduler,
390    pub root_fields: Fields,
391    cache: Arc<LanceCache>,
392}
393
394pub struct ColumnInfoIter<'a> {
395    column_infos: Vec<Arc<ColumnInfo>>,
396    column_indices: &'a [u32],
397    column_info_pos: usize,
398    column_indices_pos: usize,
399}
400
401impl<'a> ColumnInfoIter<'a> {
402    pub fn new(column_infos: Vec<Arc<ColumnInfo>>, column_indices: &'a [u32]) -> Self {
403        let initial_pos = column_indices[0] as usize;
404        Self {
405            column_infos,
406            column_indices,
407            column_info_pos: initial_pos,
408            column_indices_pos: 0,
409        }
410    }
411
412    pub fn peek(&self) -> &Arc<ColumnInfo> {
413        &self.column_infos[self.column_info_pos]
414    }
415
416    pub fn peek_transform(&mut self, transform: impl FnOnce(Arc<ColumnInfo>) -> Arc<ColumnInfo>) {
417        let column_info = self.column_infos[self.column_info_pos].clone();
418        let transformed = transform(column_info);
419        self.column_infos[self.column_info_pos] = transformed;
420    }
421
422    pub fn expect_next(&mut self) -> Result<&Arc<ColumnInfo>> {
423        self.next().ok_or_else(|| {
424            Error::invalid_input(
425                "there were more fields in the schema than provided column indices / infos",
426                location!(),
427            )
428        })
429    }
430
431    fn next(&mut self) -> Option<&Arc<ColumnInfo>> {
432        if self.column_info_pos < self.column_infos.len() {
433            let info = &self.column_infos[self.column_info_pos];
434            self.column_info_pos += 1;
435            Some(info)
436        } else {
437            None
438        }
439    }
440
441    pub(crate) fn next_top_level(&mut self) {
442        self.column_indices_pos += 1;
443        if self.column_indices_pos < self.column_indices.len() {
444            self.column_info_pos = self.column_indices[self.column_indices_pos] as usize;
445        } else {
446            self.column_info_pos = self.column_infos.len();
447        }
448    }
449}
450
451/// These contain the file buffers shared across the entire file
452#[derive(Clone, Copy, Debug)]
453pub struct FileBuffers<'a> {
454    pub positions_and_sizes: &'a [(u64, u64)],
455}
456
457/// These contain the file buffers and also buffers specific to a column
458#[derive(Clone, Copy, Debug)]
459pub struct ColumnBuffers<'a, 'b> {
460    pub file_buffers: FileBuffers<'a>,
461    pub positions_and_sizes: &'b [(u64, u64)],
462}
463
464/// These contain the file & column buffers and also buffers specific to a page
465#[derive(Clone, Copy, Debug)]
466pub struct PageBuffers<'a, 'b, 'c> {
467    pub column_buffers: ColumnBuffers<'a, 'b>,
468    pub positions_and_sizes: &'c [(u64, u64)],
469}
470
471/// The core decoder strategy handles all the various Arrow types
472#[derive(Debug)]
473pub struct CoreFieldDecoderStrategy {
474    pub validate_data: bool,
475    pub decompressor_strategy: Arc<dyn DecompressionStrategy>,
476    pub cache_repetition_index: bool,
477}
478
479impl Default for CoreFieldDecoderStrategy {
480    fn default() -> Self {
481        Self {
482            validate_data: false,
483            decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
484            cache_repetition_index: false,
485        }
486    }
487}
488
489impl CoreFieldDecoderStrategy {
490    /// Create a new strategy with cache_repetition_index enabled
491    pub fn with_cache_repetition_index(mut self, cache_repetition_index: bool) -> Self {
492        self.cache_repetition_index = cache_repetition_index;
493        self
494    }
495
496    /// This is just a sanity check to ensure there is no "wrapped encodings"
497    /// that haven't been handled.
498    fn ensure_values_encoded(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
499        let column_encoding = column_info
500            .encoding
501            .column_encoding
502            .as_ref()
503            .ok_or_else(|| {
504                Error::invalid_input(
505                    format!(
506                        "the column at index {} was missing a ColumnEncoding",
507                        column_info.index
508                    ),
509                    location!(),
510                )
511            })?;
512        if matches!(
513            column_encoding,
514            pb::column_encoding::ColumnEncoding::Values(_)
515        ) {
516            Ok(())
517        } else {
518            Err(Error::invalid_input(format!("the column at index {} mapping to the input field {} has column encoding {:?} and no decoder is registered to handle it", column_info.index, field_name, column_encoding), location!()))
519        }
520    }
521
522    fn is_primitive(data_type: &DataType) -> bool {
523        if data_type.is_primitive() {
524            true
525        } else {
526            match data_type {
527                // DataType::is_primitive doesn't consider these primitive but we do
528                DataType::Boolean | DataType::Null | DataType::FixedSizeBinary(_) => true,
529                DataType::FixedSizeList(inner, _) => Self::is_primitive(inner.data_type()),
530                _ => false,
531            }
532        }
533    }
534
535    fn create_primitive_scheduler(
536        &self,
537        field: &Field,
538        column: &ColumnInfo,
539        buffers: FileBuffers,
540    ) -> Result<Box<dyn crate::v2::decoder::FieldScheduler>> {
541        Self::ensure_values_encoded(column, &field.name)?;
542        // Primitive fields map to a single column
543        let column_buffers = ColumnBuffers {
544            file_buffers: buffers,
545            positions_and_sizes: &column.buffer_offsets_and_sizes,
546        };
547        Ok(Box::new(PrimitiveFieldScheduler::new(
548            column.index,
549            field.data_type(),
550            column.page_infos.clone(),
551            column_buffers,
552            self.validate_data,
553        )))
554    }
555
556    /// Helper method to verify the page encoding of a struct header column
557    fn check_simple_struct(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
558        Self::ensure_values_encoded(column_info, field_name)?;
559        if column_info.page_infos.len() != 1 {
560            return Err(Error::InvalidInput { source: format!("Due to schema we expected a struct column but we received a column with {} pages and right now we only support struct columns with 1 page", column_info.page_infos.len()).into(), location: location!() });
561        }
562        let encoding = &column_info.page_infos[0].encoding;
563        match encoding.as_legacy().array_encoding.as_ref().unwrap() {
564            pb::array_encoding::ArrayEncoding::Struct(_) => Ok(()),
565            _ => Err(Error::InvalidInput { source: format!("Expected a struct encoding because we have a struct field in the schema but got the encoding {:?}", encoding).into(), location: location!() }),
566        }
567    }
568
569    fn check_packed_struct(column_info: &ColumnInfo) -> bool {
570        let encoding = &column_info.page_infos[0].encoding;
571        matches!(
572            encoding.as_legacy().array_encoding.as_ref().unwrap(),
573            pb::array_encoding::ArrayEncoding::PackedStruct(_)
574        )
575    }
576
577    fn create_list_scheduler(
578        &self,
579        list_field: &Field,
580        column_infos: &mut ColumnInfoIter,
581        buffers: FileBuffers,
582        offsets_column: &ColumnInfo,
583    ) -> Result<Box<dyn crate::v2::decoder::FieldScheduler>> {
584        Self::ensure_values_encoded(offsets_column, &list_field.name)?;
585        let offsets_column_buffers = ColumnBuffers {
586            file_buffers: buffers,
587            positions_and_sizes: &offsets_column.buffer_offsets_and_sizes,
588        };
589        let items_scheduler =
590            self.create_legacy_field_scheduler(&list_field.children[0], column_infos, buffers)?;
591
592        let (inner_infos, null_offset_adjustments): (Vec<_>, Vec<_>) = offsets_column
593            .page_infos
594            .iter()
595            .filter(|offsets_page| offsets_page.num_rows > 0)
596            .map(|offsets_page| {
597                if let Some(pb::array_encoding::ArrayEncoding::List(list_encoding)) =
598                    &offsets_page.encoding.as_legacy().array_encoding
599                {
600                    let inner = PageInfo {
601                        buffer_offsets_and_sizes: offsets_page.buffer_offsets_and_sizes.clone(),
602                        encoding: PageEncoding::Legacy(
603                            list_encoding.offsets.as_ref().unwrap().as_ref().clone(),
604                        ),
605                        num_rows: offsets_page.num_rows,
606                        priority: 0,
607                    };
608                    (
609                        inner,
610                        OffsetPageInfo {
611                            offsets_in_page: offsets_page.num_rows,
612                            null_offset_adjustment: list_encoding.null_offset_adjustment,
613                            num_items_referenced_by_page: list_encoding.num_items,
614                        },
615                    )
616                } else {
617                    // TODO: Should probably return Err here
618                    panic!("Expected a list column");
619                }
620            })
621            .unzip();
622        let inner = Arc::new(PrimitiveFieldScheduler::new(
623            offsets_column.index,
624            DataType::UInt64,
625            Arc::from(inner_infos.into_boxed_slice()),
626            offsets_column_buffers,
627            self.validate_data,
628        )) as Arc<dyn crate::v2::decoder::FieldScheduler>;
629        let items_field = match list_field.data_type() {
630            DataType::List(inner) => inner,
631            DataType::LargeList(inner) => inner,
632            _ => unreachable!(),
633        };
634        let offset_type = if matches!(list_field.data_type(), DataType::List(_)) {
635            DataType::Int32
636        } else {
637            DataType::Int64
638        };
639        Ok(Box::new(ListFieldScheduler::new(
640            inner,
641            items_scheduler.into(),
642            items_field,
643            offset_type,
644            null_offset_adjustments,
645        )))
646    }
647
648    fn unwrap_blob(column_info: &ColumnInfo) -> Option<ColumnInfo> {
649        if let column_encoding::ColumnEncoding::Blob(blob) =
650            column_info.encoding.column_encoding.as_ref().unwrap()
651        {
652            let mut column_info = column_info.clone();
653            column_info.encoding = blob.inner.as_ref().unwrap().as_ref().clone();
654            Some(column_info)
655        } else {
656            None
657        }
658    }
659
660    fn create_structural_field_scheduler(
661        &self,
662        field: &Field,
663        column_infos: &mut ColumnInfoIter,
664    ) -> Result<Box<dyn StructuralFieldScheduler>> {
665        let data_type = field.data_type();
666        if Self::is_primitive(&data_type) {
667            let column_info = column_infos.expect_next()?;
668            let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
669                column_info.as_ref(),
670                self.decompressor_strategy.as_ref(),
671                self.cache_repetition_index,
672            )?);
673
674            // advance to the next top level column
675            column_infos.next_top_level();
676
677            return Ok(scheduler);
678        }
679        match &data_type {
680            DataType::Struct(fields) => {
681                if field.is_packed_struct() {
682                    let column_info = column_infos.expect_next()?;
683                    let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
684                        column_info.as_ref(),
685                        self.decompressor_strategy.as_ref(),
686                        self.cache_repetition_index,
687                    )?);
688
689                    // advance to the next top level column
690                    column_infos.next_top_level();
691
692                    return Ok(scheduler);
693                }
694                let mut child_schedulers = Vec::with_capacity(field.children.len());
695                for field in field.children.iter() {
696                    let field_scheduler =
697                        self.create_structural_field_scheduler(field, column_infos)?;
698                    child_schedulers.push(field_scheduler);
699                }
700
701                let fields = fields.clone();
702                Ok(
703                    Box::new(StructuralStructScheduler::new(child_schedulers, fields))
704                        as Box<dyn StructuralFieldScheduler>,
705                )
706            }
707            DataType::Binary | DataType::Utf8 | DataType::LargeBinary | DataType::LargeUtf8 => {
708                let column_info = column_infos.expect_next()?;
709                let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
710                    column_info.as_ref(),
711                    self.decompressor_strategy.as_ref(),
712                    self.cache_repetition_index,
713                )?);
714                column_infos.next_top_level();
715                Ok(scheduler)
716            }
717            DataType::List(_) | DataType::LargeList(_) => {
718                let child = field
719                    .children
720                    .first()
721                    .expect("List field must have a child");
722                let child_scheduler =
723                    self.create_structural_field_scheduler(child, column_infos)?;
724                Ok(Box::new(StructuralListScheduler::new(child_scheduler))
725                    as Box<dyn StructuralFieldScheduler>)
726            }
727            _ => todo!(),
728        }
729    }
730
731    fn create_legacy_field_scheduler(
732        &self,
733        field: &Field,
734        column_infos: &mut ColumnInfoIter,
735        buffers: FileBuffers,
736    ) -> Result<Box<dyn crate::v2::decoder::FieldScheduler>> {
737        let data_type = field.data_type();
738        if Self::is_primitive(&data_type) {
739            let column_info = column_infos.expect_next()?;
740            let scheduler = self.create_primitive_scheduler(field, column_info, buffers)?;
741            return Ok(scheduler);
742        } else if data_type.is_binary_like() {
743            let column_info = column_infos.next().unwrap().clone();
744            // Column is blob and user is asking for binary data
745            if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
746                let desc_scheduler =
747                    self.create_primitive_scheduler(&BLOB_DESC_LANCE_FIELD, &blob_col, buffers)?;
748                let blob_scheduler = Box::new(BlobFieldScheduler::new(desc_scheduler.into()));
749                return Ok(blob_scheduler);
750            }
751            if let Some(page_info) = column_info.page_infos.first() {
752                if matches!(
753                    page_info.encoding.as_legacy(),
754                    pb::ArrayEncoding {
755                        array_encoding: Some(pb::array_encoding::ArrayEncoding::List(..))
756                    }
757                ) {
758                    let list_type = if matches!(data_type, DataType::Utf8 | DataType::Binary) {
759                        DataType::List(Arc::new(ArrowField::new("item", DataType::UInt8, false)))
760                    } else {
761                        DataType::LargeList(Arc::new(ArrowField::new(
762                            "item",
763                            DataType::UInt8,
764                            false,
765                        )))
766                    };
767                    let list_field = Field::try_from(ArrowField::new(
768                        field.name.clone(),
769                        list_type,
770                        field.nullable,
771                    ))
772                    .unwrap();
773                    let list_scheduler = self.create_list_scheduler(
774                        &list_field,
775                        column_infos,
776                        buffers,
777                        &column_info,
778                    )?;
779                    let binary_scheduler = Box::new(BinaryFieldScheduler::new(
780                        list_scheduler.into(),
781                        field.data_type(),
782                    ));
783                    return Ok(binary_scheduler);
784                } else {
785                    let scheduler =
786                        self.create_primitive_scheduler(field, &column_info, buffers)?;
787                    return Ok(scheduler);
788                }
789            } else {
790                return self.create_primitive_scheduler(field, &column_info, buffers);
791            }
792        }
793        match &data_type {
794            DataType::FixedSizeList(inner, _dimension) => {
795                // A fixed size list column could either be a physical or a logical decoder
796                // depending on the child data type.
797                if Self::is_primitive(inner.data_type()) {
798                    let primitive_col = column_infos.expect_next()?;
799                    let scheduler =
800                        self.create_primitive_scheduler(field, primitive_col, buffers)?;
801                    Ok(scheduler)
802                } else {
803                    todo!()
804                }
805            }
806            DataType::Dictionary(_key_type, value_type) => {
807                if Self::is_primitive(value_type) || value_type.is_binary_like() {
808                    let primitive_col = column_infos.expect_next()?;
809                    let scheduler =
810                        self.create_primitive_scheduler(field, primitive_col, buffers)?;
811                    Ok(scheduler)
812                } else {
813                    Err(Error::NotSupported {
814                        source: format!(
815                            "No way to decode into a dictionary field of type {}",
816                            value_type
817                        )
818                        .into(),
819                        location: location!(),
820                    })
821                }
822            }
823            DataType::List(_) | DataType::LargeList(_) => {
824                let offsets_column = column_infos.expect_next()?.clone();
825                column_infos.next_top_level();
826                self.create_list_scheduler(field, column_infos, buffers, &offsets_column)
827            }
828            DataType::Struct(fields) => {
829                let column_info = column_infos.expect_next()?;
830
831                // Column is blob and user is asking for descriptions
832                if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
833                    // Can use primitive scheduler here since descriptions are always packed struct
834                    return self.create_primitive_scheduler(field, &blob_col, buffers);
835                }
836
837                if Self::check_packed_struct(column_info) {
838                    // use packed struct encoding
839                    self.create_primitive_scheduler(field, column_info, buffers)
840                } else {
841                    // use default struct encoding
842                    Self::check_simple_struct(column_info, &field.name).unwrap();
843                    let num_rows = column_info
844                        .page_infos
845                        .iter()
846                        .map(|page| page.num_rows)
847                        .sum();
848                    let mut child_schedulers = Vec::with_capacity(field.children.len());
849                    for field in &field.children {
850                        column_infos.next_top_level();
851                        let field_scheduler =
852                            self.create_legacy_field_scheduler(field, column_infos, buffers)?;
853                        child_schedulers.push(Arc::from(field_scheduler));
854                    }
855
856                    let fields = fields.clone();
857                    Ok(Box::new(SimpleStructScheduler::new(
858                        child_schedulers,
859                        fields,
860                        num_rows,
861                    )))
862                }
863            }
864            // TODO: Still need support for RLE
865            _ => todo!(),
866        }
867    }
868}
869
870/// Create's a dummy ColumnInfo for the root column
871fn root_column(num_rows: u64) -> ColumnInfo {
872    let num_root_pages = num_rows.div_ceil(u32::MAX as u64);
873    let final_page_num_rows = num_rows % (u32::MAX as u64);
874    let root_pages = (0..num_root_pages)
875        .map(|i| PageInfo {
876            num_rows: if i == num_root_pages - 1 {
877                final_page_num_rows
878            } else {
879                u64::MAX
880            },
881            encoding: PageEncoding::Legacy(pb::ArrayEncoding {
882                array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
883                    pb::SimpleStruct {},
884                )),
885            }),
886            priority: 0, // not used in legacy scheduler
887            buffer_offsets_and_sizes: Arc::new([]),
888        })
889        .collect::<Vec<_>>();
890    ColumnInfo {
891        buffer_offsets_and_sizes: Arc::new([]),
892        encoding: pb::ColumnEncoding {
893            column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
894        },
895        index: u32::MAX,
896        page_infos: Arc::from(root_pages),
897    }
898}
899
900pub enum RootDecoder {
901    Structural(StructuralStructDecoder),
902    Legacy(SimpleStructDecoder),
903}
904
905impl RootDecoder {
906    pub fn into_structural(self) -> StructuralStructDecoder {
907        match self {
908            Self::Structural(decoder) => decoder,
909            Self::Legacy(_) => panic!("Expected a structural decoder"),
910        }
911    }
912
913    pub fn into_legacy(self) -> SimpleStructDecoder {
914        match self {
915            Self::Legacy(decoder) => decoder,
916            Self::Structural(_) => panic!("Expected a legacy decoder"),
917        }
918    }
919}
920
921impl DecodeBatchScheduler {
922    /// Creates a new decode scheduler with the expected schema and the column
923    /// metadata of the file.
924    #[allow(clippy::too_many_arguments)]
925    pub async fn try_new<'a>(
926        schema: &'a Schema,
927        column_indices: &[u32],
928        column_infos: &[Arc<ColumnInfo>],
929        file_buffer_positions_and_sizes: &'a Vec<(u64, u64)>,
930        num_rows: u64,
931        _decoder_plugins: Arc<DecoderPlugins>,
932        io: Arc<dyn EncodingsIo>,
933        cache: Arc<LanceCache>,
934        filter: &FilterExpression,
935        cache_repetition_index: bool,
936    ) -> Result<Self> {
937        assert!(num_rows > 0);
938        let buffers = FileBuffers {
939            positions_and_sizes: file_buffer_positions_and_sizes,
940        };
941        let arrow_schema = ArrowSchema::from(schema);
942        let root_fields = arrow_schema.fields().clone();
943        let root_type = DataType::Struct(root_fields.clone());
944        let mut root_field = Field::try_from(&ArrowField::new("root", root_type, false))?;
945        // root_field.children and schema.fields should be identical at this point but the latter
946        // has field ids and the former does not.  This line restores that.
947        // TODO:  Is there another way to create the root field without forcing a trip through arrow?
948        root_field.children.clone_from(&schema.fields);
949        root_field
950            .metadata
951            .insert("__lance_decoder_root".to_string(), "true".to_string());
952
953        if column_infos[0].is_structural() {
954            let mut column_iter = ColumnInfoIter::new(column_infos.to_vec(), column_indices);
955
956            let strategy = CoreFieldDecoderStrategy::default()
957                .with_cache_repetition_index(cache_repetition_index);
958            let mut root_scheduler =
959                strategy.create_structural_field_scheduler(&root_field, &mut column_iter)?;
960
961            let context = SchedulerContext::new(io, cache.clone());
962            root_scheduler.initialize(filter, &context).await?;
963
964            Ok(Self {
965                root_scheduler: RootScheduler::Structural(root_scheduler),
966                root_fields,
967                cache,
968            })
969        } else {
970            // The old encoding style expected a header column for structs and so we
971            // need a header column for the top-level struct
972            let mut columns = Vec::with_capacity(column_infos.len() + 1);
973            columns.push(Arc::new(root_column(num_rows)));
974            columns.extend(column_infos.iter().cloned());
975
976            let adjusted_column_indices = [0_u32]
977                .into_iter()
978                .chain(column_indices.iter().map(|i| i.saturating_add(1)))
979                .collect::<Vec<_>>();
980            let mut column_iter = ColumnInfoIter::new(columns, &adjusted_column_indices);
981            let strategy = CoreFieldDecoderStrategy::default()
982                .with_cache_repetition_index(cache_repetition_index);
983            let root_scheduler =
984                strategy.create_legacy_field_scheduler(&root_field, &mut column_iter, buffers)?;
985
986            let context = SchedulerContext::new(io, cache.clone());
987            root_scheduler.initialize(filter, &context).await?;
988
989            Ok(Self {
990                root_scheduler: RootScheduler::Legacy(root_scheduler.into()),
991                root_fields,
992                cache,
993            })
994        }
995    }
996
997    #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
998    pub fn from_scheduler(
999        root_scheduler: Arc<dyn crate::v2::decoder::FieldScheduler>,
1000        root_fields: Fields,
1001        cache: Arc<LanceCache>,
1002    ) -> Self {
1003        Self {
1004            root_scheduler: RootScheduler::Legacy(root_scheduler),
1005            root_fields,
1006            cache,
1007        }
1008    }
1009
1010    fn do_schedule_ranges_structural(
1011        &mut self,
1012        ranges: &[Range<u64>],
1013        filter: &FilterExpression,
1014        io: Arc<dyn EncodingsIo>,
1015        mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1016    ) {
1017        let root_scheduler = self.root_scheduler.as_structural();
1018        let mut context = SchedulerContext::new(io, self.cache.clone());
1019        let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
1020        if let Err(schedule_ranges_err) = maybe_root_job {
1021            schedule_action(Err(schedule_ranges_err));
1022            return;
1023        }
1024        let mut root_job = maybe_root_job.unwrap();
1025        let mut num_rows_scheduled = 0;
1026        loop {
1027            let maybe_next_scan_line = root_job.schedule_next(&mut context);
1028            if let Err(err) = maybe_next_scan_line {
1029                schedule_action(Err(err));
1030                return;
1031            }
1032            let next_scan_line = maybe_next_scan_line.unwrap();
1033            match next_scan_line {
1034                Some(next_scan_line) => {
1035                    trace!(
1036                        "Scheduled scan line of {} rows and {} decoders",
1037                        next_scan_line.rows_scheduled,
1038                        next_scan_line.decoders.len()
1039                    );
1040                    num_rows_scheduled += next_scan_line.rows_scheduled;
1041                    if !schedule_action(Ok(DecoderMessage {
1042                        scheduled_so_far: num_rows_scheduled,
1043                        decoders: next_scan_line.decoders,
1044                    })) {
1045                        // Decoder has disconnected
1046                        return;
1047                    }
1048                }
1049                None => return,
1050            }
1051        }
1052    }
1053
1054    fn do_schedule_ranges_legacy(
1055        &mut self,
1056        ranges: &[Range<u64>],
1057        filter: &FilterExpression,
1058        io: Arc<dyn EncodingsIo>,
1059        mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1060        // If specified, this will be used as the top_level_row for all scheduling
1061        // tasks.  This is used by list scheduling to ensure all items scheduling
1062        // tasks are scheduled at the same top level row.
1063        priority: Option<Box<dyn PriorityRange>>,
1064    ) {
1065        let root_scheduler = self.root_scheduler.as_legacy();
1066        let rows_requested = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1067        trace!(
1068            "Scheduling {} ranges across {}..{} ({} rows){}",
1069            ranges.len(),
1070            ranges.first().unwrap().start,
1071            ranges.last().unwrap().end,
1072            rows_requested,
1073            priority
1074                .as_ref()
1075                .map(|p| format!(" (priority={:?})", p))
1076                .unwrap_or_default()
1077        );
1078
1079        let mut context = SchedulerContext::new(io, self.cache.clone());
1080        let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
1081        if let Err(schedule_ranges_err) = maybe_root_job {
1082            schedule_action(Err(schedule_ranges_err));
1083            return;
1084        }
1085        let mut root_job = maybe_root_job.unwrap();
1086        let mut num_rows_scheduled = 0;
1087        let mut rows_to_schedule = root_job.num_rows();
1088        let mut priority = priority.unwrap_or(Box::new(SimplePriorityRange::new(0)));
1089        trace!("Scheduled ranges refined to {} rows", rows_to_schedule);
1090        while rows_to_schedule > 0 {
1091            let maybe_next_scan_line = root_job.schedule_next(&mut context, priority.as_ref());
1092            if let Err(schedule_next_err) = maybe_next_scan_line {
1093                schedule_action(Err(schedule_next_err));
1094                return;
1095            }
1096            let next_scan_line = maybe_next_scan_line.unwrap();
1097            priority.advance(next_scan_line.rows_scheduled);
1098            num_rows_scheduled += next_scan_line.rows_scheduled;
1099            rows_to_schedule -= next_scan_line.rows_scheduled;
1100            trace!(
1101                "Scheduled scan line of {} rows and {} decoders",
1102                next_scan_line.rows_scheduled,
1103                next_scan_line.decoders.len()
1104            );
1105            if !schedule_action(Ok(DecoderMessage {
1106                scheduled_so_far: num_rows_scheduled,
1107                decoders: next_scan_line.decoders,
1108            })) {
1109                // Decoder has disconnected
1110                return;
1111            }
1112
1113            trace!("Finished scheduling {} ranges", ranges.len());
1114        }
1115    }
1116
1117    fn do_schedule_ranges(
1118        &mut self,
1119        ranges: &[Range<u64>],
1120        filter: &FilterExpression,
1121        io: Arc<dyn EncodingsIo>,
1122        schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1123        // If specified, this will be used as the top_level_row for all scheduling
1124        // tasks.  This is used by list scheduling to ensure all items scheduling
1125        // tasks are scheduled at the same top level row.
1126        priority: Option<Box<dyn PriorityRange>>,
1127    ) {
1128        match &self.root_scheduler {
1129            RootScheduler::Legacy(_) => {
1130                self.do_schedule_ranges_legacy(ranges, filter, io, schedule_action, priority)
1131            }
1132            RootScheduler::Structural(_) => {
1133                self.do_schedule_ranges_structural(ranges, filter, io, schedule_action)
1134            }
1135        }
1136    }
1137
1138    // This method is similar to schedule_ranges but instead of
1139    // sending the decoders to a channel it collects them all into a vector
1140    pub fn schedule_ranges_to_vec(
1141        &mut self,
1142        ranges: &[Range<u64>],
1143        filter: &FilterExpression,
1144        io: Arc<dyn EncodingsIo>,
1145        priority: Option<Box<dyn PriorityRange>>,
1146    ) -> Result<Vec<DecoderMessage>> {
1147        let mut decode_messages = Vec::new();
1148        self.do_schedule_ranges(
1149            ranges,
1150            filter,
1151            io,
1152            |msg| {
1153                decode_messages.push(msg);
1154                true
1155            },
1156            priority,
1157        );
1158        decode_messages.into_iter().collect::<Result<Vec<_>>>()
1159    }
1160
1161    /// Schedules the load of multiple ranges of rows
1162    ///
1163    /// Ranges must be non-overlapping and in sorted order
1164    ///
1165    /// # Arguments
1166    ///
1167    /// * `ranges` - The ranges of rows to load
1168    /// * `sink` - A channel to send the decode tasks
1169    /// * `scheduler` An I/O scheduler to issue I/O requests
1170    #[instrument(skip_all)]
1171    pub fn schedule_ranges(
1172        &mut self,
1173        ranges: &[Range<u64>],
1174        filter: &FilterExpression,
1175        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1176        scheduler: Arc<dyn EncodingsIo>,
1177    ) {
1178        self.do_schedule_ranges(
1179            ranges,
1180            filter,
1181            scheduler,
1182            |msg| {
1183                match sink.send(msg) {
1184                    Ok(_) => true,
1185                    Err(SendError { .. }) => {
1186                        // The receiver has gone away.  We can't do anything about it
1187                        // so just ignore the error.
1188                        debug!(
1189                        "schedule_ranges aborting early since decoder appears to have been dropped"
1190                    );
1191                        false
1192                    }
1193                }
1194            },
1195            None,
1196        )
1197    }
1198
1199    /// Schedules the load of a range of rows
1200    ///
1201    /// # Arguments
1202    ///
1203    /// * `range` - The range of rows to load
1204    /// * `sink` - A channel to send the decode tasks
1205    /// * `scheduler` An I/O scheduler to issue I/O requests
1206    #[instrument(skip_all)]
1207    pub fn schedule_range(
1208        &mut self,
1209        range: Range<u64>,
1210        filter: &FilterExpression,
1211        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1212        scheduler: Arc<dyn EncodingsIo>,
1213    ) {
1214        self.schedule_ranges(&[range], filter, sink, scheduler)
1215    }
1216
1217    /// Schedules the load of selected rows
1218    ///
1219    /// # Arguments
1220    ///
1221    /// * `indices` - The row indices to load (these must be in ascending order!)
1222    /// * `sink` - A channel to send the decode tasks
1223    /// * `scheduler` An I/O scheduler to issue I/O requests
1224    pub fn schedule_take(
1225        &mut self,
1226        indices: &[u64],
1227        filter: &FilterExpression,
1228        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1229        scheduler: Arc<dyn EncodingsIo>,
1230    ) {
1231        debug_assert!(indices.windows(2).all(|w| w[0] <= w[1]));
1232        if indices.is_empty() {
1233            return;
1234        }
1235        trace!("Scheduling take of {} rows", indices.len());
1236        let ranges = Self::indices_to_ranges(indices);
1237        self.schedule_ranges(&ranges, filter, sink, scheduler)
1238    }
1239
1240    // coalesce continuous indices if possible (the input indices must be sorted and non-empty)
1241    fn indices_to_ranges(indices: &[u64]) -> Vec<Range<u64>> {
1242        let mut ranges = Vec::new();
1243        let mut start = indices[0];
1244
1245        for window in indices.windows(2) {
1246            if window[1] != window[0] + 1 {
1247                ranges.push(start..window[0] + 1);
1248                start = window[1];
1249            }
1250        }
1251
1252        ranges.push(start..*indices.last().unwrap() + 1);
1253        ranges
1254    }
1255}
1256
1257pub struct ReadBatchTask {
1258    pub task: BoxFuture<'static, Result<RecordBatch>>,
1259    pub num_rows: u32,
1260}
1261
1262/// A stream that takes scheduled jobs and generates decode tasks from them.
1263pub struct BatchDecodeStream {
1264    context: DecoderContext,
1265    root_decoder: SimpleStructDecoder,
1266    rows_remaining: u64,
1267    rows_per_batch: u32,
1268    rows_scheduled: u64,
1269    rows_drained: u64,
1270    scheduler_exhausted: bool,
1271    emitted_batch_size_warning: Arc<Once>,
1272}
1273
1274impl BatchDecodeStream {
1275    /// Create a new instance of a batch decode stream
1276    ///
1277    /// # Arguments
1278    ///
1279    /// * `scheduled` - an incoming stream of decode tasks from a
1280    ///   [`crate::decode::DecodeBatchScheduler`]
1281    /// * `schema` - the schema of the data to create
1282    /// * `rows_per_batch` the number of rows to create before making a batch
1283    /// * `num_rows` the total number of rows scheduled
1284    /// * `num_columns` the total number of columns in the file
1285    pub fn new(
1286        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1287        rows_per_batch: u32,
1288        num_rows: u64,
1289        root_decoder: SimpleStructDecoder,
1290    ) -> Self {
1291        Self {
1292            context: DecoderContext::new(scheduled),
1293            root_decoder,
1294            rows_remaining: num_rows,
1295            rows_per_batch,
1296            rows_scheduled: 0,
1297            rows_drained: 0,
1298            scheduler_exhausted: false,
1299            emitted_batch_size_warning: Arc::new(Once::new()),
1300        }
1301    }
1302
1303    fn accept_decoder(&mut self, decoder: crate::v2::decoder::DecoderReady) -> Result<()> {
1304        if decoder.path.is_empty() {
1305            // The root decoder we can ignore
1306            Ok(())
1307        } else {
1308            self.root_decoder.accept_child(decoder)
1309        }
1310    }
1311
1312    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
1313        if self.scheduler_exhausted {
1314            return Ok(self.rows_scheduled);
1315        }
1316        while self.rows_scheduled < scheduled_need {
1317            let next_message = self.context.source.recv().await;
1318            match next_message {
1319                Some(scan_line) => {
1320                    let scan_line = scan_line?;
1321                    self.rows_scheduled = scan_line.scheduled_so_far;
1322                    for message in scan_line.decoders {
1323                        self.accept_decoder(message.into_legacy())?;
1324                    }
1325                }
1326                None => {
1327                    // Schedule ended before we got all the data we expected.  This probably
1328                    // means some kind of pushdown filter was applied and we didn't load as
1329                    // much data as we thought we would.
1330                    self.scheduler_exhausted = true;
1331                    return Ok(self.rows_scheduled);
1332                }
1333            }
1334        }
1335        Ok(scheduled_need)
1336    }
1337
1338    #[instrument(level = "debug", skip_all)]
1339    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
1340        trace!(
1341            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1342            self.rows_remaining,
1343            self.rows_drained,
1344            self.rows_scheduled,
1345        );
1346        if self.rows_remaining == 0 {
1347            return Ok(None);
1348        }
1349
1350        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1351        self.rows_remaining -= to_take;
1352
1353        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1354        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1355        if scheduled_need > 0 {
1356            let desired_scheduled = scheduled_need + self.rows_scheduled;
1357            trace!(
1358                "Draining from scheduler (desire at least {} scheduled rows)",
1359                desired_scheduled
1360            );
1361            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
1362            if actually_scheduled < desired_scheduled {
1363                let under_scheduled = desired_scheduled - actually_scheduled;
1364                to_take -= under_scheduled;
1365            }
1366        }
1367
1368        if to_take == 0 {
1369            return Ok(None);
1370        }
1371
1372        // wait_for_loaded waits for *>* loaded_need (not >=) so we do a -1 here
1373        let loaded_need = self.rows_drained + to_take - 1;
1374        trace!(
1375            "Waiting for I/O (desire at least {} fully loaded rows)",
1376            loaded_need
1377        );
1378        self.root_decoder.wait_for_loaded(loaded_need).await?;
1379
1380        let next_task = self.root_decoder.drain(to_take)?;
1381        self.rows_drained += to_take;
1382        Ok(Some(next_task))
1383    }
1384
1385    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
1386        let stream = futures::stream::unfold(self, |mut slf| async move {
1387            let next_task = slf.next_batch_task().await;
1388            let next_task = next_task.transpose().map(|next_task| {
1389                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
1390                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
1391                let task = async move {
1392                    let next_task = next_task?;
1393                    next_task.into_batch(emitted_batch_size_warning)
1394                };
1395                (task, num_rows)
1396            });
1397            next_task.map(|(task, num_rows)| {
1398                // This should be true since batch size is u32
1399                debug_assert!(num_rows <= u32::MAX as u64);
1400                let next_task = ReadBatchTask {
1401                    task: task.boxed(),
1402                    num_rows: num_rows as u32,
1403                };
1404                (next_task, slf)
1405            })
1406        });
1407        stream.boxed()
1408    }
1409}
1410
1411// Utility types to smooth out the differences between the 2.0 and 2.1 decoders so that
1412// we can have a single implementation of the batch decode iterator
1413enum RootDecoderMessage {
1414    LoadedPage(LoadedPage),
1415    LegacyPage(crate::v2::decoder::DecoderReady),
1416}
1417trait RootDecoderType {
1418    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()>;
1419    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
1420    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()>;
1421}
1422impl RootDecoderType for StructuralStructDecoder {
1423    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1424        let RootDecoderMessage::LoadedPage(loaded_page) = message else {
1425            unreachable!()
1426        };
1427        self.accept_page(loaded_page)
1428    }
1429    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1430        self.drain_batch_task(num_rows)
1431    }
1432    fn wait(&mut self, _: u64, _: &tokio::runtime::Runtime) -> Result<()> {
1433        // Waiting happens elsewhere (not as part of the decoder)
1434        Ok(())
1435    }
1436}
1437impl RootDecoderType for SimpleStructDecoder {
1438    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1439        let RootDecoderMessage::LegacyPage(legacy_page) = message else {
1440            unreachable!()
1441        };
1442        self.accept_child(legacy_page)
1443    }
1444    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1445        self.drain(num_rows)
1446    }
1447    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()> {
1448        runtime.block_on(self.wait_for_loaded(loaded_need))
1449    }
1450}
1451
1452/// A blocking batch decoder that performs synchronous decoding
1453struct BatchDecodeIterator<T: RootDecoderType> {
1454    messages: VecDeque<Result<DecoderMessage>>,
1455    root_decoder: T,
1456    rows_remaining: u64,
1457    rows_per_batch: u32,
1458    rows_scheduled: u64,
1459    rows_drained: u64,
1460    emitted_batch_size_warning: Arc<Once>,
1461    // Note: this is not the runtime on which I/O happens.
1462    // That's always in the scheduler.  This is just a runtime we use to
1463    // sleep the current thread if I/O is unready
1464    wait_for_io_runtime: tokio::runtime::Runtime,
1465    schema: Arc<ArrowSchema>,
1466}
1467
1468impl<T: RootDecoderType> BatchDecodeIterator<T> {
1469    /// Create a new instance of a batch decode iterator
1470    pub fn new(
1471        messages: VecDeque<Result<DecoderMessage>>,
1472        rows_per_batch: u32,
1473        num_rows: u64,
1474        root_decoder: T,
1475        schema: Arc<ArrowSchema>,
1476    ) -> Self {
1477        Self {
1478            messages,
1479            root_decoder,
1480            rows_remaining: num_rows,
1481            rows_per_batch,
1482            rows_scheduled: 0,
1483            rows_drained: 0,
1484            wait_for_io_runtime: tokio::runtime::Builder::new_current_thread()
1485                .build()
1486                .unwrap(),
1487            emitted_batch_size_warning: Arc::new(Once::new()),
1488            schema,
1489        }
1490    }
1491
1492    /// Wait for a single page of data to finish loading
1493    ///
1494    /// If the data is not available this will perform a *blocking* wait (put
1495    /// the current thread to sleep)
1496    fn wait_for_page(&self, unloaded_page: UnloadedPage) -> Result<LoadedPage> {
1497        match maybe_done(unloaded_page.0) {
1498            // Fast path, avoid all runtime shenanigans if the data is ready
1499            MaybeDone::Done(loaded_page) => loaded_page,
1500            // Slow path, we need to wait on I/O, enter the runtime
1501            MaybeDone::Future(fut) => self.wait_for_io_runtime.block_on(fut),
1502            MaybeDone::Gone => unreachable!(),
1503        }
1504    }
1505
1506    /// Waits for I/O until `scheduled_need` rows have been loaded
1507    ///
1508    /// Note that `scheduled_need` is cumulative.  E.g. this method
1509    /// should be called with 5, 10, 15 and not 5, 5, 5
1510    #[instrument(skip_all)]
1511    fn wait_for_io(&mut self, scheduled_need: u64) -> Result<u64> {
1512        while self.rows_scheduled < scheduled_need && !self.messages.is_empty() {
1513            let message = self.messages.pop_front().unwrap()?;
1514            self.rows_scheduled = message.scheduled_so_far;
1515            for decoder_message in message.decoders {
1516                match decoder_message {
1517                    MessageType::UnloadedPage(unloaded_page) => {
1518                        let loaded_page = self.wait_for_page(unloaded_page)?;
1519                        self.root_decoder
1520                            .accept_message(RootDecoderMessage::LoadedPage(loaded_page))?;
1521                    }
1522                    MessageType::DecoderReady(decoder_ready) => {
1523                        // The root decoder we can ignore
1524                        if !decoder_ready.path.is_empty() {
1525                            self.root_decoder
1526                                .accept_message(RootDecoderMessage::LegacyPage(decoder_ready))?;
1527                        }
1528                    }
1529                }
1530            }
1531        }
1532
1533        let loaded_need = self.rows_drained + self.rows_per_batch as u64 - 1;
1534
1535        self.root_decoder
1536            .wait(loaded_need, &self.wait_for_io_runtime)?;
1537        Ok(self.rows_scheduled)
1538    }
1539
1540    #[instrument(level = "debug", skip_all)]
1541    fn next_batch_task(&mut self) -> Result<Option<RecordBatch>> {
1542        trace!(
1543            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1544            self.rows_remaining,
1545            self.rows_drained,
1546            self.rows_scheduled,
1547        );
1548        if self.rows_remaining == 0 {
1549            return Ok(None);
1550        }
1551
1552        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1553        self.rows_remaining -= to_take;
1554
1555        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1556        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1557        if scheduled_need > 0 {
1558            let desired_scheduled = scheduled_need + self.rows_scheduled;
1559            trace!(
1560                "Draining from scheduler (desire at least {} scheduled rows)",
1561                desired_scheduled
1562            );
1563            let actually_scheduled = self.wait_for_io(desired_scheduled)?;
1564            if actually_scheduled < desired_scheduled {
1565                let under_scheduled = desired_scheduled - actually_scheduled;
1566                to_take -= under_scheduled;
1567            }
1568        }
1569
1570        if to_take == 0 {
1571            return Ok(None);
1572        }
1573
1574        let next_task = self.root_decoder.drain_batch(to_take)?;
1575
1576        self.rows_drained += to_take;
1577
1578        let batch = next_task.into_batch(self.emitted_batch_size_warning.clone())?;
1579
1580        Ok(Some(batch))
1581    }
1582}
1583
1584impl<T: RootDecoderType> Iterator for BatchDecodeIterator<T> {
1585    type Item = ArrowResult<RecordBatch>;
1586
1587    fn next(&mut self) -> Option<Self::Item> {
1588        self.next_batch_task()
1589            .transpose()
1590            .map(|r| r.map_err(ArrowError::from))
1591    }
1592}
1593
1594impl<T: RootDecoderType> RecordBatchReader for BatchDecodeIterator<T> {
1595    fn schema(&self) -> Arc<ArrowSchema> {
1596        self.schema.clone()
1597    }
1598}
1599
1600/// A stream that takes scheduled jobs and generates decode tasks from them.
1601pub struct StructuralBatchDecodeStream {
1602    context: DecoderContext,
1603    root_decoder: StructuralStructDecoder,
1604    rows_remaining: u64,
1605    rows_per_batch: u32,
1606    rows_scheduled: u64,
1607    rows_drained: u64,
1608    scheduler_exhausted: bool,
1609    emitted_batch_size_warning: Arc<Once>,
1610}
1611
1612impl StructuralBatchDecodeStream {
1613    /// Create a new instance of a batch decode stream
1614    ///
1615    /// # Arguments
1616    ///
1617    /// * `scheduled` - an incoming stream of decode tasks from a
1618    ///   [`crate::decode::DecodeBatchScheduler`]
1619    /// * `schema` - the schema of the data to create
1620    /// * `rows_per_batch` the number of rows to create before making a batch
1621    /// * `num_rows` the total number of rows scheduled
1622    /// * `num_columns` the total number of columns in the file
1623    pub fn new(
1624        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1625        rows_per_batch: u32,
1626        num_rows: u64,
1627        root_decoder: StructuralStructDecoder,
1628    ) -> Self {
1629        Self {
1630            context: DecoderContext::new(scheduled),
1631            root_decoder,
1632            rows_remaining: num_rows,
1633            rows_per_batch,
1634            rows_scheduled: 0,
1635            rows_drained: 0,
1636            scheduler_exhausted: false,
1637            emitted_batch_size_warning: Arc::new(Once::new()),
1638        }
1639    }
1640
1641    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
1642        if self.scheduler_exhausted {
1643            return Ok(self.rows_scheduled);
1644        }
1645        while self.rows_scheduled < scheduled_need {
1646            let next_message = self.context.source.recv().await;
1647            match next_message {
1648                Some(scan_line) => {
1649                    let scan_line = scan_line?;
1650                    self.rows_scheduled = scan_line.scheduled_so_far;
1651                    for message in scan_line.decoders {
1652                        let unloaded_page = message.into_structural();
1653                        let loaded_page = unloaded_page.0.await?;
1654                        self.root_decoder.accept_page(loaded_page)?;
1655                    }
1656                }
1657                None => {
1658                    // Schedule ended before we got all the data we expected.  This probably
1659                    // means some kind of pushdown filter was applied and we didn't load as
1660                    // much data as we thought we would.
1661                    self.scheduler_exhausted = true;
1662                    return Ok(self.rows_scheduled);
1663                }
1664            }
1665        }
1666        Ok(scheduled_need)
1667    }
1668
1669    #[instrument(level = "debug", skip_all)]
1670    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
1671        trace!(
1672            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1673            self.rows_remaining,
1674            self.rows_drained,
1675            self.rows_scheduled,
1676        );
1677        if self.rows_remaining == 0 {
1678            return Ok(None);
1679        }
1680
1681        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1682        self.rows_remaining -= to_take;
1683
1684        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1685        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1686        if scheduled_need > 0 {
1687            let desired_scheduled = scheduled_need + self.rows_scheduled;
1688            trace!(
1689                "Draining from scheduler (desire at least {} scheduled rows)",
1690                desired_scheduled
1691            );
1692            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
1693            if actually_scheduled < desired_scheduled {
1694                let under_scheduled = desired_scheduled - actually_scheduled;
1695                to_take -= under_scheduled;
1696            }
1697        }
1698
1699        if to_take == 0 {
1700            return Ok(None);
1701        }
1702
1703        let next_task = self.root_decoder.drain_batch_task(to_take)?;
1704        self.rows_drained += to_take;
1705        Ok(Some(next_task))
1706    }
1707
1708    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
1709        let stream = futures::stream::unfold(self, |mut slf| async move {
1710            let next_task = slf.next_batch_task().await;
1711            let next_task = next_task.transpose().map(|next_task| {
1712                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
1713                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
1714                let task = async move {
1715                    let next_task = next_task?;
1716                    next_task.into_batch(emitted_batch_size_warning)
1717                };
1718                (task, num_rows)
1719            });
1720            next_task.map(|(task, num_rows)| {
1721                // This should be true since batch size is u32
1722                debug_assert!(num_rows <= u32::MAX as u64);
1723                let next_task = ReadBatchTask {
1724                    task: task.boxed(),
1725                    num_rows: num_rows as u32,
1726                };
1727                (next_task, slf)
1728            })
1729        });
1730        stream.boxed()
1731    }
1732}
1733
1734#[derive(Debug)]
1735pub enum RequestedRows {
1736    Ranges(Vec<Range<u64>>),
1737    Indices(Vec<u64>),
1738}
1739
1740impl RequestedRows {
1741    pub fn num_rows(&self) -> u64 {
1742        match self {
1743            Self::Ranges(ranges) => ranges.iter().map(|r| r.end - r.start).sum(),
1744            Self::Indices(indices) => indices.len() as u64,
1745        }
1746    }
1747}
1748
1749#[derive(Debug, Clone)]
1750pub struct SchedulerDecoderConfig {
1751    pub decoder_plugins: Arc<DecoderPlugins>,
1752    pub batch_size: u32,
1753    pub io: Arc<dyn EncodingsIo>,
1754    pub cache: Arc<LanceCache>,
1755    pub should_validate: bool,
1756    /// Whether to cache repetition indices for better performance
1757    pub cache_repetition_index: bool,
1758}
1759
1760fn check_scheduler_on_drop(
1761    stream: BoxStream<'static, ReadBatchTask>,
1762    scheduler_handle: tokio::task::JoinHandle<()>,
1763) -> BoxStream<'static, ReadBatchTask> {
1764    // This is a bit weird but we create an "empty stream" that unwraps the scheduler handle (which
1765    // will panic if the scheduler panicked).  This let's us check if the scheduler panicked
1766    // when the stream finishes.
1767    let mut scheduler_handle = Some(scheduler_handle);
1768    let check_scheduler = stream::unfold((), move |_| {
1769        let handle = scheduler_handle.take();
1770        async move {
1771            if let Some(handle) = handle {
1772                handle.await.unwrap();
1773            }
1774            None
1775        }
1776    });
1777    stream.chain(check_scheduler).boxed()
1778}
1779
1780pub fn create_decode_stream(
1781    schema: &Schema,
1782    num_rows: u64,
1783    batch_size: u32,
1784    is_structural: bool,
1785    should_validate: bool,
1786    rx: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1787) -> BoxStream<'static, ReadBatchTask> {
1788    if is_structural {
1789        let arrow_schema = ArrowSchema::from(schema);
1790        let structural_decoder = StructuralStructDecoder::new(
1791            arrow_schema.fields,
1792            should_validate,
1793            /*is_root=*/ true,
1794        );
1795        StructuralBatchDecodeStream::new(rx, batch_size, num_rows, structural_decoder).into_stream()
1796    } else {
1797        let arrow_schema = ArrowSchema::from(schema);
1798        let root_fields = arrow_schema.fields;
1799
1800        let simple_struct_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1801        BatchDecodeStream::new(rx, batch_size, num_rows, simple_struct_decoder).into_stream()
1802    }
1803}
1804
1805/// Creates a iterator that decodes a set of messages in a blocking fashion
1806///
1807/// See [`schedule_and_decode_blocking`] for more information.
1808pub fn create_decode_iterator(
1809    schema: &Schema,
1810    num_rows: u64,
1811    batch_size: u32,
1812    should_validate: bool,
1813    is_structural: bool,
1814    messages: VecDeque<Result<DecoderMessage>>,
1815) -> Box<dyn RecordBatchReader + Send + 'static> {
1816    let arrow_schema = Arc::new(ArrowSchema::from(schema));
1817    let root_fields = arrow_schema.fields.clone();
1818    if is_structural {
1819        let simple_struct_decoder =
1820            StructuralStructDecoder::new(root_fields, should_validate, /*is_root=*/ true);
1821        Box::new(BatchDecodeIterator::new(
1822            messages,
1823            batch_size,
1824            num_rows,
1825            simple_struct_decoder,
1826            arrow_schema,
1827        ))
1828    } else {
1829        let root_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1830        Box::new(BatchDecodeIterator::new(
1831            messages,
1832            batch_size,
1833            num_rows,
1834            root_decoder,
1835            arrow_schema,
1836        ))
1837    }
1838}
1839
1840fn create_scheduler_decoder(
1841    column_infos: Vec<Arc<ColumnInfo>>,
1842    requested_rows: RequestedRows,
1843    filter: FilterExpression,
1844    column_indices: Vec<u32>,
1845    target_schema: Arc<Schema>,
1846    config: SchedulerDecoderConfig,
1847) -> Result<BoxStream<'static, ReadBatchTask>> {
1848    let num_rows = requested_rows.num_rows();
1849
1850    let is_structural = column_infos[0].is_structural();
1851
1852    let (tx, rx) = mpsc::unbounded_channel();
1853
1854    let decode_stream = create_decode_stream(
1855        &target_schema,
1856        num_rows,
1857        config.batch_size,
1858        is_structural,
1859        config.should_validate,
1860        rx,
1861    );
1862
1863    let scheduler_handle = tokio::task::spawn(async move {
1864        let mut decode_scheduler = match DecodeBatchScheduler::try_new(
1865            target_schema.as_ref(),
1866            &column_indices,
1867            &column_infos,
1868            &vec![],
1869            num_rows,
1870            config.decoder_plugins,
1871            config.io.clone(),
1872            config.cache,
1873            &filter,
1874            config.cache_repetition_index,
1875        )
1876        .await
1877        {
1878            Ok(scheduler) => scheduler,
1879            Err(e) => {
1880                let _ = tx.send(Err(e));
1881                return;
1882            }
1883        };
1884
1885        match requested_rows {
1886            RequestedRows::Ranges(ranges) => {
1887                decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
1888            }
1889            RequestedRows::Indices(indices) => {
1890                decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
1891            }
1892        }
1893    });
1894
1895    Ok(check_scheduler_on_drop(decode_stream, scheduler_handle))
1896}
1897
1898/// Launches a scheduler on a dedicated (spawned) task and creates a decoder to
1899/// decode the scheduled data and returns the decoder as a stream of record batches.
1900///
1901/// This is a convenience function that creates both the scheduler and the decoder
1902/// which can be a little tricky to get right.
1903pub fn schedule_and_decode(
1904    column_infos: Vec<Arc<ColumnInfo>>,
1905    requested_rows: RequestedRows,
1906    filter: FilterExpression,
1907    column_indices: Vec<u32>,
1908    target_schema: Arc<Schema>,
1909    config: SchedulerDecoderConfig,
1910) -> BoxStream<'static, ReadBatchTask> {
1911    if requested_rows.num_rows() == 0 {
1912        return stream::empty().boxed();
1913    }
1914    // For convenience we really want this method to be a snchronous method where all
1915    // errors happen on the stream.  There is some async initialization that must happen
1916    // when creating a scheduler.  We wrap that all up in the very first task.
1917    match create_scheduler_decoder(
1918        column_infos,
1919        requested_rows,
1920        filter,
1921        column_indices,
1922        target_schema,
1923        config,
1924    ) {
1925        // If the initialization failed make it look like a failed task
1926        Ok(stream) => stream,
1927        Err(e) => stream::once(std::future::ready(ReadBatchTask {
1928            num_rows: 0,
1929            task: std::future::ready(Err(e)).boxed(),
1930        }))
1931        .boxed(),
1932    }
1933}
1934
1935pub static WAITER_RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
1936    tokio::runtime::Builder::new_current_thread()
1937        .build()
1938        .unwrap()
1939});
1940
1941/// Schedules and decodes the requested data in a blocking fashion
1942///
1943/// This function is a blocking version of [`schedule_and_decode`]. It schedules the requested data
1944/// and decodes it in the current thread.
1945///
1946/// This can be useful when the disk is fast (or the data is in memory) and the amount
1947/// of data is relatively small.  For example, when doing a take against NVMe or in-memory data.
1948///
1949/// This should NOT be used for full scans.  Even if the data is in memory this function will
1950/// not parallelize the decode and will be slower than the async version.  Full scans typically
1951/// make relatively few IOPs and so the asynchronous overhead is much smaller.
1952///
1953/// This method will first completely run the scheduling process.  Then it will run the
1954/// decode process.
1955pub fn schedule_and_decode_blocking(
1956    column_infos: Vec<Arc<ColumnInfo>>,
1957    requested_rows: RequestedRows,
1958    filter: FilterExpression,
1959    column_indices: Vec<u32>,
1960    target_schema: Arc<Schema>,
1961    config: SchedulerDecoderConfig,
1962) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1963    if requested_rows.num_rows() == 0 {
1964        let arrow_schema = Arc::new(ArrowSchema::from(target_schema.as_ref()));
1965        return Ok(Box::new(RecordBatchIterator::new(vec![], arrow_schema)));
1966    }
1967
1968    let num_rows = requested_rows.num_rows();
1969    let is_structural = column_infos[0].is_structural();
1970
1971    let (tx, mut rx) = mpsc::unbounded_channel();
1972
1973    // Initialize the scheduler.  This is still "asynchronous" but we run it with a current-thread
1974    // runtime.
1975    let mut decode_scheduler = WAITER_RT.block_on(DecodeBatchScheduler::try_new(
1976        target_schema.as_ref(),
1977        &column_indices,
1978        &column_infos,
1979        &vec![],
1980        num_rows,
1981        config.decoder_plugins,
1982        config.io.clone(),
1983        config.cache,
1984        &filter,
1985        config.cache_repetition_index,
1986    ))?;
1987
1988    // Schedule the requested rows
1989    match requested_rows {
1990        RequestedRows::Ranges(ranges) => {
1991            decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
1992        }
1993        RequestedRows::Indices(indices) => {
1994            decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
1995        }
1996    }
1997
1998    // Drain the scheduler queue into a vec of decode messages
1999    let mut messages = Vec::new();
2000    while rx
2001        .recv_many(&mut messages, usize::MAX)
2002        .now_or_never()
2003        .unwrap()
2004        != 0
2005    {}
2006
2007    // Create a decoder to decode the messages
2008    let decode_iterator = create_decode_iterator(
2009        &target_schema,
2010        num_rows,
2011        config.batch_size,
2012        config.should_validate,
2013        is_structural,
2014        messages.into(),
2015    );
2016
2017    Ok(decode_iterator)
2018}
2019
2020/// A decoder for single-column encodings of primitive data (this includes fixed size
2021/// lists of primitive data)
2022///
2023/// Physical decoders are able to decode into existing buffers for zero-copy operation.
2024///
2025/// Instances should be stateless and `Send` / `Sync`.  This is because multiple decode
2026/// tasks could reference the same page.  For example, imagine a page covers rows 0-2000
2027/// and the decoder stream has a batch size of 1024.  The decoder will be needed by both
2028/// the decode task for batch 0 and the decode task for batch 1.
2029///
2030/// See [`crate::decoder`] for more information
2031pub trait PrimitivePageDecoder: Send + Sync {
2032    /// Decode data into buffers
2033    ///
2034    /// This may be a simple zero-copy from a disk buffer or could involve complex decoding
2035    /// such as decompressing from some compressed representation.
2036    ///
2037    /// Capacity is stored as a tuple of (num_bytes: u64, is_needed: bool).  The `is_needed`
2038    /// portion only needs to be updated if the encoding has some concept of an "optional"
2039    /// buffer.
2040    ///
2041    /// Encodings can have any number of input or output buffers.  For example, a dictionary
2042    /// decoding will convert two buffers (indices + dictionary) into a single buffer
2043    ///
2044    /// Binary decodings have two output buffers (one for values, one for offsets)
2045    ///
2046    /// Other decodings could even expand the # of output buffers.  For example, we could decode
2047    /// fixed size strings into variable length strings going from one input buffer to multiple output
2048    /// buffers.
2049    ///
2050    /// Each Arrow data type typically has a fixed structure of buffers and the encoding chain will
2051    /// generally end at one of these structures.  However, intermediate structures may exist which
2052    /// do not correspond to any Arrow type at all.  For example, a bitpacking encoding will deal
2053    /// with buffers that have bits-per-value that is not a multiple of 8.
2054    ///
2055    /// The `primitive_array_from_buffers` method has an expected buffer layout for each arrow
2056    /// type (order matters) and encodings that aim to decode into arrow types should respect
2057    /// this layout.
2058    /// # Arguments
2059    ///
2060    /// * `rows_to_skip` - how many rows to skip (within the page) before decoding
2061    /// * `num_rows` - how many rows to decode
2062    /// * `all_null` - A mutable bool, set to true if a decoder determines all values are null
2063    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock>;
2064}
2065
2066/// A scheduler for single-column encodings of primitive data
2067///
2068/// The scheduler is responsible for calculating what I/O is needed for the requested rows
2069///
2070/// Instances should be stateless and `Send` and `Sync`.  This is because instances can
2071/// be shared in follow-up I/O tasks.
2072///
2073/// See [`crate::decoder`] for more information
2074pub trait PageScheduler: Send + Sync + std::fmt::Debug {
2075    /// Schedules a batch of I/O to load the data needed for the requested ranges
2076    ///
2077    /// Returns a future that will yield a decoder once the data has been loaded
2078    ///
2079    /// # Arguments
2080    ///
2081    /// * `range` - the range of row offsets (relative to start of page) requested
2082    ///   these must be ordered and must not overlap
2083    /// * `scheduler` - a scheduler to submit the I/O request to
2084    /// * `top_level_row` - the row offset of the top level field currently being
2085    ///   scheduled.  This can be used to assign priority to I/O requests
2086    fn schedule_ranges(
2087        &self,
2088        ranges: &[Range<u64>],
2089        scheduler: &Arc<dyn EncodingsIo>,
2090        top_level_row: u64,
2091    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>;
2092}
2093
2094/// A trait to control the priority of I/O
2095pub trait PriorityRange: std::fmt::Debug + Send + Sync {
2096    fn advance(&mut self, num_rows: u64);
2097    fn current_priority(&self) -> u64;
2098    fn box_clone(&self) -> Box<dyn PriorityRange>;
2099}
2100
2101/// A simple priority scheme for top-level fields with no parent
2102/// repetition
2103#[derive(Debug)]
2104pub struct SimplePriorityRange {
2105    priority: u64,
2106}
2107
2108impl SimplePriorityRange {
2109    fn new(priority: u64) -> Self {
2110        Self { priority }
2111    }
2112}
2113
2114impl PriorityRange for SimplePriorityRange {
2115    fn advance(&mut self, num_rows: u64) {
2116        self.priority += num_rows;
2117    }
2118
2119    fn current_priority(&self) -> u64 {
2120        self.priority
2121    }
2122
2123    fn box_clone(&self) -> Box<dyn PriorityRange> {
2124        Box::new(Self {
2125            priority: self.priority,
2126        })
2127    }
2128}
2129
2130/// Determining the priority of a list request is tricky.  We want
2131/// the priority to be the top-level row.  So if we have a
2132/// list<list<int>> and each outer list has 10 rows and each inner
2133/// list has 5 rows then the priority of the 100th item is 1 because
2134/// it is the 5th item in the 10th item of the *second* row.
2135///
2136/// This structure allows us to keep track of this complicated priority
2137/// relationship.
2138///
2139/// There's a fair amount of bookkeeping involved here.
2140///
2141/// A better approach (using repetition levels) is coming in the future.
2142pub struct ListPriorityRange {
2143    base: Box<dyn PriorityRange>,
2144    offsets: Arc<[u64]>,
2145    cur_index_into_offsets: usize,
2146    cur_position: u64,
2147}
2148
2149impl ListPriorityRange {
2150    pub(crate) fn new(base: Box<dyn PriorityRange>, offsets: Arc<[u64]>) -> Self {
2151        Self {
2152            base,
2153            offsets,
2154            cur_index_into_offsets: 0,
2155            cur_position: 0,
2156        }
2157    }
2158}
2159
2160impl std::fmt::Debug for ListPriorityRange {
2161    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2162        f.debug_struct("ListPriorityRange")
2163            .field("base", &self.base)
2164            .field("offsets.len()", &self.offsets.len())
2165            .field("cur_index_into_offsets", &self.cur_index_into_offsets)
2166            .field("cur_position", &self.cur_position)
2167            .finish()
2168    }
2169}
2170
2171impl PriorityRange for ListPriorityRange {
2172    fn advance(&mut self, num_rows: u64) {
2173        // We've scheduled X items.  Now walk through the offsets to
2174        // determine how many rows we've scheduled.
2175        self.cur_position += num_rows;
2176        let mut idx_into_offsets = self.cur_index_into_offsets;
2177        while idx_into_offsets + 1 < self.offsets.len()
2178            && self.offsets[idx_into_offsets + 1] <= self.cur_position
2179        {
2180            idx_into_offsets += 1;
2181        }
2182        let base_rows_advanced = idx_into_offsets - self.cur_index_into_offsets;
2183        self.cur_index_into_offsets = idx_into_offsets;
2184        self.base.advance(base_rows_advanced as u64);
2185    }
2186
2187    fn current_priority(&self) -> u64 {
2188        self.base.current_priority()
2189    }
2190
2191    fn box_clone(&self) -> Box<dyn PriorityRange> {
2192        Box::new(Self {
2193            base: self.base.box_clone(),
2194            offsets: self.offsets.clone(),
2195            cur_index_into_offsets: self.cur_index_into_offsets,
2196            cur_position: self.cur_position,
2197        })
2198    }
2199}
2200
2201/// Contains the context for a scheduler
2202pub struct SchedulerContext {
2203    recv: Option<mpsc::UnboundedReceiver<DecoderMessage>>,
2204    io: Arc<dyn EncodingsIo>,
2205    cache: Arc<LanceCache>,
2206    name: String,
2207    path: Vec<u32>,
2208    path_names: Vec<String>,
2209}
2210
2211pub struct ScopedSchedulerContext<'a> {
2212    pub context: &'a mut SchedulerContext,
2213}
2214
2215impl<'a> ScopedSchedulerContext<'a> {
2216    pub fn pop(self) -> &'a mut SchedulerContext {
2217        self.context.pop();
2218        self.context
2219    }
2220}
2221
2222impl SchedulerContext {
2223    pub fn new(io: Arc<dyn EncodingsIo>, cache: Arc<LanceCache>) -> Self {
2224        Self {
2225            io,
2226            cache,
2227            recv: None,
2228            name: "".to_string(),
2229            path: Vec::new(),
2230            path_names: Vec::new(),
2231        }
2232    }
2233
2234    pub fn io(&self) -> &Arc<dyn EncodingsIo> {
2235        &self.io
2236    }
2237
2238    pub fn cache(&self) -> &Arc<LanceCache> {
2239        &self.cache
2240    }
2241
2242    pub fn push(&mut self, name: &str, index: u32) -> ScopedSchedulerContext {
2243        self.path.push(index);
2244        self.path_names.push(name.to_string());
2245        ScopedSchedulerContext { context: self }
2246    }
2247
2248    pub fn pop(&mut self) {
2249        self.path.pop();
2250        self.path_names.pop();
2251    }
2252
2253    pub fn path_name(&self) -> String {
2254        let path = self.path_names.join("/");
2255        if self.recv.is_some() {
2256            format!("TEMP({}){}", self.name, path)
2257        } else {
2258            format!("ROOT{}", path)
2259        }
2260    }
2261
2262    pub fn current_path(&self) -> VecDeque<u32> {
2263        VecDeque::from_iter(self.path.iter().copied())
2264    }
2265
2266    #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
2267    pub fn locate_decoder(
2268        &mut self,
2269        decoder: Box<dyn crate::v2::decoder::LogicalPageDecoder>,
2270    ) -> crate::v2::decoder::DecoderReady {
2271        trace!(
2272            "Scheduling decoder of type {:?} for {:?}",
2273            decoder.data_type(),
2274            self.path,
2275        );
2276        crate::v2::decoder::DecoderReady {
2277            decoder,
2278            path: self.current_path(),
2279        }
2280    }
2281}
2282
2283pub struct UnloadedPage(pub BoxFuture<'static, Result<LoadedPage>>);
2284
2285impl std::fmt::Debug for UnloadedPage {
2286    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2287        f.debug_struct("UnloadedPage").finish()
2288    }
2289}
2290
2291#[derive(Debug)]
2292pub struct ScheduledScanLine {
2293    pub rows_scheduled: u64,
2294    pub decoders: Vec<MessageType>,
2295}
2296
2297pub trait StructuralSchedulingJob: std::fmt::Debug {
2298    fn schedule_next(
2299        &mut self,
2300        context: &mut SchedulerContext,
2301    ) -> Result<Option<ScheduledScanLine>>;
2302}
2303
2304/// A filter expression to apply to the data
2305///
2306/// The core decoders do not currently take advantage of filtering in
2307/// any way.  In order to maintain the abstraction we represent filters
2308/// as an arbitrary byte sequence.
2309///
2310/// We recommend that encodings use Substrait for filters.
2311pub struct FilterExpression(pub Bytes);
2312
2313impl FilterExpression {
2314    /// Create a filter expression that does not filter any data
2315    ///
2316    /// This is currently represented by an empty byte array.  Encoders
2317    /// that are "filter aware" should make sure they handle this case.
2318    pub fn no_filter() -> Self {
2319        Self(Bytes::new())
2320    }
2321
2322    /// Returns true if the filter is the same as the [`Self::no_filter`] filter
2323    pub fn is_noop(&self) -> bool {
2324        self.0.is_empty()
2325    }
2326}
2327
2328pub trait StructuralFieldScheduler: Send + std::fmt::Debug {
2329    fn initialize<'a>(
2330        &'a mut self,
2331        filter: &'a FilterExpression,
2332        context: &'a SchedulerContext,
2333    ) -> BoxFuture<'a, Result<()>>;
2334    fn schedule_ranges<'a>(
2335        &'a self,
2336        ranges: &[Range<u64>],
2337        filter: &FilterExpression,
2338    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>>;
2339}
2340
2341/// A trait for tasks that decode data into an Arrow array
2342pub trait DecodeArrayTask: Send {
2343    /// Decodes the data into an Arrow array
2344    fn decode(self: Box<Self>) -> Result<ArrayRef>;
2345}
2346
2347impl DecodeArrayTask for Box<dyn StructuralDecodeArrayTask> {
2348    fn decode(self: Box<Self>) -> Result<ArrayRef> {
2349        StructuralDecodeArrayTask::decode(*self).map(|decoded_array| decoded_array.array)
2350    }
2351}
2352
2353/// A task to decode data into an Arrow record batch
2354///
2355/// It has a child `task` which decodes a struct array with no nulls.
2356/// This is then converted into a record batch.
2357pub struct NextDecodeTask {
2358    /// The decode task itself
2359    pub task: Box<dyn DecodeArrayTask>,
2360    /// The number of rows that will be created
2361    pub num_rows: u64,
2362}
2363
2364impl NextDecodeTask {
2365    // Run the task and produce a record batch
2366    //
2367    // If the batch is very large this function will log a warning message
2368    // suggesting the user try a smaller batch size.
2369    #[instrument(name = "task_to_batch", level = "debug", skip_all)]
2370    fn into_batch(self, emitted_batch_size_warning: Arc<Once>) -> Result<RecordBatch> {
2371        let struct_arr = self.task.decode();
2372        match struct_arr {
2373            Ok(struct_arr) => {
2374                let batch = RecordBatch::from(struct_arr.as_struct());
2375                let size_bytes = batch.get_array_memory_size() as u64;
2376                if size_bytes > BATCH_SIZE_BYTES_WARNING {
2377                    emitted_batch_size_warning.call_once(|| {
2378                        let size_mb = size_bytes / 1024 / 1024;
2379                        debug!("Lance read in a single batch that contained more than {}MiB of data.  You may want to consider reducing the batch size.", size_mb);
2380                    });
2381                }
2382                Ok(batch)
2383            }
2384            Err(e) => {
2385                let e = Error::Internal {
2386                    message: format!("Error decoding batch: {}", e),
2387                    location: location!(),
2388                };
2389                Err(e)
2390            }
2391        }
2392    }
2393}
2394
2395// An envelope to wrap both 2.0 style messages and 2.1 style messages so we can
2396// share some code paths between the two.  Decoders can safely unwrap into whatever
2397// style they expect since a file will be either all-2.0 or all-2.1
2398#[derive(Debug)]
2399pub enum MessageType {
2400    // The older v2.0 scheduler/decoder used a scheme where the message was the
2401    // decoder itself.  The messages were not sent in priority order and the decoder
2402    // had to wait for I/O, figuring out the correct priority.  This was a lot of
2403    // complexity.
2404    DecoderReady(crate::v2::decoder::DecoderReady),
2405    // Starting in 2.1 we use a simpler scheme where the scheduling happens in priority
2406    // order and the message is an unloaded decoder.  These can be awaited, in order, and
2407    // the decoder does not have to worry about waiting for I/O.
2408    UnloadedPage(UnloadedPage),
2409}
2410
2411impl MessageType {
2412    pub fn into_legacy(self) -> crate::v2::decoder::DecoderReady {
2413        match self {
2414            Self::DecoderReady(decoder) => decoder,
2415            Self::UnloadedPage(_) => {
2416                panic!("Expected DecoderReady but got UnloadedPage")
2417            }
2418        }
2419    }
2420
2421    pub fn into_structural(self) -> UnloadedPage {
2422        match self {
2423            Self::UnloadedPage(unloaded) => unloaded,
2424            Self::DecoderReady(_) => {
2425                panic!("Expected UnloadedPage but got DecoderReady")
2426            }
2427        }
2428    }
2429}
2430
2431pub struct DecoderMessage {
2432    pub scheduled_so_far: u64,
2433    pub decoders: Vec<MessageType>,
2434}
2435
2436pub struct DecoderContext {
2437    source: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
2438}
2439
2440impl DecoderContext {
2441    pub fn new(source: mpsc::UnboundedReceiver<Result<DecoderMessage>>) -> Self {
2442        Self { source }
2443    }
2444}
2445
2446pub struct DecodedPage {
2447    pub data: DataBlock,
2448    pub repdef: RepDefUnraveler,
2449}
2450
2451pub trait DecodePageTask: Send + std::fmt::Debug {
2452    /// Decodes the data into an Arrow array
2453    fn decode(self: Box<Self>) -> Result<DecodedPage>;
2454}
2455
2456pub trait StructuralPageDecoder: std::fmt::Debug + Send {
2457    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>>;
2458    fn num_rows(&self) -> u64;
2459}
2460
2461#[derive(Debug)]
2462pub struct LoadedPage {
2463    // The decoder that is ready to be decoded
2464    pub decoder: Box<dyn StructuralPageDecoder>,
2465    // The path to the decoder, the first value is the column index
2466    // following values, if present, are nested child indices
2467    //
2468    // For example, a path of [1, 1, 0] would mean to grab the second
2469    // column, then the second child, and then the first child.
2470    //
2471    // It could represent x in the following schema:
2472    //
2473    // score: float64
2474    // points: struct
2475    //   color: string
2476    //   location: struct
2477    //     x: float64
2478    //
2479    // Currently, only struct decoders have "children" although other
2480    // decoders may at some point as well.  List children are only
2481    // handled through indirect I/O at the moment and so they don't
2482    // need to be represented (yet)
2483    pub path: VecDeque<u32>,
2484    pub page_index: usize,
2485}
2486
2487pub struct DecodedArray {
2488    pub array: ArrayRef,
2489    pub repdef: CompositeRepDefUnraveler,
2490}
2491
2492pub trait StructuralDecodeArrayTask: std::fmt::Debug + Send {
2493    fn decode(self: Box<Self>) -> Result<DecodedArray>;
2494}
2495
2496pub trait StructuralFieldDecoder: std::fmt::Debug + Send {
2497    /// Add a newly scheduled child decoder
2498    ///
2499    /// The default implementation does not expect children and returns
2500    /// an error.
2501    fn accept_page(&mut self, _child: LoadedPage) -> Result<()>;
2502    /// Creates a task to decode `num_rows` of data into an array
2503    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>>;
2504    /// The data type of the decoded data
2505    fn data_type(&self) -> &DataType;
2506}
2507
2508#[derive(Debug, Default)]
2509pub struct DecoderPlugins {}
2510
2511/// Decodes a batch of data from an in-memory structure created by [`crate::encoder::encode_batch`]
2512pub async fn decode_batch(
2513    batch: &EncodedBatch,
2514    filter: &FilterExpression,
2515    decoder_plugins: Arc<DecoderPlugins>,
2516    should_validate: bool,
2517    version: LanceFileVersion,
2518    cache: Option<Arc<LanceCache>>,
2519) -> Result<RecordBatch> {
2520    // The io is synchronous so it shouldn't be possible for any async stuff to still be in progress
2521    // Still, if we just use now_or_never we hit misfires because some futures (channels) need to be
2522    // polled twice.
2523
2524    let io_scheduler = Arc::new(BufferScheduler::new(batch.data.clone())) as Arc<dyn EncodingsIo>;
2525    let cache = cache.unwrap_or_else(|| Arc::new(LanceCache::with_capacity(128 * 1024 * 1024)));
2526    let mut decode_scheduler = DecodeBatchScheduler::try_new(
2527        batch.schema.as_ref(),
2528        &batch.top_level_columns,
2529        &batch.page_table,
2530        &vec![],
2531        batch.num_rows,
2532        decoder_plugins,
2533        io_scheduler.clone(),
2534        cache,
2535        filter,
2536        false, // cache_repetition_index - default to false for decode_batch
2537    )
2538    .await?;
2539    let (tx, rx) = unbounded_channel();
2540    decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler);
2541    let is_structural = version >= LanceFileVersion::V2_1;
2542    let mut decode_stream = create_decode_stream(
2543        &batch.schema,
2544        batch.num_rows,
2545        batch.num_rows as u32,
2546        is_structural,
2547        should_validate,
2548        rx,
2549    );
2550    decode_stream.next().await.unwrap().task.await
2551}
2552
2553#[cfg(test)]
2554// test coalesce indices to ranges
2555mod tests {
2556    use super::*;
2557
2558    #[test]
2559    fn test_coalesce_indices_to_ranges_with_single_index() {
2560        let indices = vec![1];
2561        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
2562        assert_eq!(ranges, vec![1..2]);
2563    }
2564
2565    #[test]
2566    fn test_coalesce_indices_to_ranges() {
2567        let indices = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
2568        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
2569        assert_eq!(ranges, vec![1..10]);
2570    }
2571
2572    #[test]
2573    fn test_coalesce_indices_to_ranges_with_gaps() {
2574        let indices = vec![1, 2, 3, 5, 6, 7, 9];
2575        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
2576        assert_eq!(ranges, vec![1..4, 5..8, 9..10]);
2577    }
2578}