lance_encoding/decoder.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Utilities and traits for scheduling & decoding data
//!
//! Reading data involves two steps: scheduling and decoding.  The
//! scheduling step is responsible for figuring out what data is needed
//! and issuing the appropriate I/O requests.  The decoding step is
//! responsible for taking the loaded data and turning it into Arrow
//! arrays.
//!
//! # Scheduling
//!
//! Scheduling is split into [`self::FieldScheduler`] and [`self::PageScheduler`].
//! There is one field scheduler for each output field, which may map to many
//! columns of actual data.  A field scheduler is responsible for figuring out
//! the order in which pages should be scheduled.  Field schedulers then delegate
//! to page schedulers to figure out the I/O requests that need to be made for
//! the page.
//!
//! Page schedulers also create the decoders that will be used to decode the
//! scheduled data.
//!
//! # Decoding
//!
//! Decoders are split into [`self::PhysicalPageDecoder`] and
//! [`self::LogicalPageDecoder`].  Note that both physical and logical decoding
//! happens on a per-page basis.  There is no concept of a "field decoder" or
//! "column decoder".
//!
//! The physical decoders handle lower level encodings.  They have a few advantages:
//!
//!  * They do not need to decode into an Arrow array and so they don't need
//!    to be shoehorned into the Arrow type system (e.g. Arrow doesn't have a
//!    bit-packed type.  We could use variable-length binary but that is kind
//!    of awkward)
//!  * They can decode into an existing allocation.  This can allow for "page
//!    bridging".  If we are trying to decode into a batch of 1024 rows and
//!    the rows 0..1024 are spread across two pages then we can avoid a memory
//!    copy by allocating once and decoding each page into the outer allocation.
//!    (note: page bridging is not actually implemented yet)
//!
//! However, there are some limitations for physical decoders:
//!
//!  * They are constrained to a single column
//!  * The API is more complex
//!
//! The logical decoders are designed to map one or more columns of Lance
//! data into an Arrow array.
//!
//! Typically, a "logical encoding" will have both a logical decoder and a field scheduler.
//! Meanwhile, a "physical encoding" will have a physical decoder but no corresponding field
//! scheduler.
//!
//! # General notes
//!
//! Encodings are typically nested into each other to form a tree.  The top of the tree is
//! the user requested schema.  Each field in that schema is assigned to one top-level logical
//! encoding.  That encoding can then contain other logical encodings or physical encodings.
//! Physical encodings can also contain other physical encodings.
//!
//! So, for example, a single field in the Arrow schema might have the type List<UInt32>
//!
//! The encoding tree could then be:
//!
//! root: List (logical encoding)
//!  - indices: Primitive (logical encoding)
//!    - column: Basic (physical encoding)
//!      - validity: Bitmap (physical encoding)
//!      - values: RLE (physical encoding)
//!        - runs: Value (physical encoding)
//!        - values: Value (physical encoding)
//!  - items: Primitive (logical encoding)
//!    - column: Basic (physical encoding)
//!      - values: Value (physical encoding)
//!
//! Note that, in this example, root.items.column does not have a validity because there were
//! no nulls in the page.
//!
//! ## Multiple buffers or multiple columns?
//!
//! Note that there are many different ways we can write encodings.  For example, we might
//! store primitive fields in a single column with two buffers (one for validity and one for
//! values).
//!
//! On the other hand, we could also store a primitive field as two different columns.  One
//! that yields a non-nullable boolean array and one that yields a non-nullable array of items.
//! Then we could combine these two arrays into a single array where the boolean array is the
//! bitmap (see the sketch below).  There are a few subtle differences between the approaches:
//!
//! * Storing things as multiple buffers within the same column is generally more efficient and
//!   easier to schedule.  For example, in-batch coalescing is very easy but can only be done
//!   on data that is in the same page.
//! * When things are stored in multiple columns you have to worry about their pages not being
//!   in sync.  In our previous validity / values example this means we might have to do some
//!   memory copies to get the validity array and values arrays to be the same length during
//!   decode.
//! * When things are stored in a single column, projection is impossible.  For example, if we
//!   tried to store all the struct fields in a single column with lots of buffers then we wouldn't
//!   be able to read back individual fields of the struct.
//!
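//! As an illustration of the multiple-column approach, here is a minimal sketch
//! (plain arrow-rs, nothing Lance-specific) of stitching a separately decoded
//! validity column and values column back into one nullable array:
//!
//! ```ignore
//! use arrow_array::{Array, BooleanArray, Int32Array};
//! use arrow_buffer::NullBuffer;
//!
//! // Two non-nullable columns decoded independently
//! let validity = BooleanArray::from(vec![true, false, true]);
//! let values = Int32Array::from(vec![1, 0, 3]);
//!
//! // Combine them into a single nullable array, using the boolean array as the bitmap
//! let nulls = NullBuffer::new(validity.values().clone());
//! let combined = Int32Array::new(values.values().clone(), Some(nulls));
//! assert_eq!(combined.null_count(), 1);
//! ```
//!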
//! The fixed size list decoding is an interesting example because it is actually both a physical
//! encoding and a logical encoding.  A fixed size list of a physical encoding is, itself, a physical
//! encoding (e.g. a fixed size list of doubles).  However, a fixed size list of a logical encoding
//! is a logical encoding (e.g. a fixed size list of structs).
//!
//! # The scheduling loop
//!
//! Reading a Lance file involves both scheduling and decoding.  It's generally expected that these
//! will run as two separate threads.
//!
//! ```text
//!
//!                                    I/O PARALLELISM
//!                       Issues
//!                       Requests   ┌─────────────────┐
//!                                  │                 │        Wait for
//!                       ┌──────────►   I/O Service   ├─────►  Enough I/O ◄─┐
//!                       │          │                 │        For batch    │
//!                       │          └─────────────────┘             │3      │
//!                       │                                          │       │
//!                       │                                          │       │2
//! ┌─────────────────────┴─┐                              ┌─────────▼───────┴┐
//! │                       │                              │                  │Poll
//! │       Batch Decode    │ Decode tasks sent via channel│   Batch Decode   │1
//! │       Scheduler       ├─────────────────────────────►│   Stream         ◄─────
//! │                       │                              │                  │
//! └─────▲─────────────┬───┘                              └─────────┬────────┘
//!       │             │                                            │4
//!       │             │                                            │
//!       └─────────────┘                                   ┌────────┴────────┐
//!  Caller of schedule_range                Buffer polling │                 │
//!  will be scheduler thread                to achieve CPU │ Decode Batch    ├────►
//!  and schedule one decode                 parallelism    │ Task            │
//!  task (and all needed I/O)               (thread per    │                 │
//!  per logical page                         batch)        └─────────────────┘
//! ```
//!
//! The scheduling thread will work through the file from the
//! start to the end as quickly as possible.  Data is scheduled one page at a time in a row-major
//! fashion.  For example, imagine we have a file with the following page structure:
//!
//! ```text
//! Score (Float32)     | C0P0 |
//! Id (16-byte UUID)   | C1P0 | C1P1 | C1P2 | C1P3 |
//! Vector (4096 bytes) | C2P0 | C2P1 | C2P2 | C2P3 | .. | C2P1023 |
//! ```
//!
//! This would be quite common as each of these pages has the same number of bytes.  Let's pretend
//! each page is 1MiB and so there are 256Ki rows of data.  Each page of `Score` has 256Ki rows.
//! Each page of `Id` has 64Ki rows.  Each page of `Vector` has 256 rows (so `Vector` spans 1024
//! pages).  The scheduler would then schedule in the following order:
//!
//! C0 P0
//! C1 P0
//! C2 P0
//! C2 P1
//! ... (253 pages omitted)
//! C2 P255
//! C1 P1
//! C2 P256
//! ... (254 pages omitted)
//! C2 P511
//! C1 P2
//! C2 P512
//! ... (254 pages omitted)
//! C2 P767
//! C1 P3
//! C2 P768
//! ... (254 pages omitted)
//! C2 P1023
//!
//! This is the ideal scheduling order because it means we can decode complete rows as quickly as possible.
//! Note that the scheduler thread does not need to wait for I/O to happen at any point.  As soon as it starts
//! it will start scheduling one page of I/O after another until it has scheduled the entire file's worth of
//! I/O.  This is slightly different from other file readers which have "row group parallelism" and will
//! typically only schedule X row groups worth of reads at a time.
//!
//! In the near future there will be a backpressure mechanism and so the scheduler may need to stop or
//! pause if the compute falls behind.
//!
//! ## Indirect I/O
//!
//! Regrettably, there are times where we cannot know exactly what data we need until we have partially decoded
//! the file.  This happens when we have variable sized list data.  In that case the scheduling task for that
//! page will only schedule the first part of the read (loading the list offsets).  It will then immediately
//! spawn a new tokio task to wait for that I/O and decode the list offsets.  That follow-up task is not part
//! of the scheduling loop or the decode loop.  It is a free task.  Once the list offsets are decoded we submit
//! a follow-up I/O task.  This task is scheduled at a high priority because the decoder is going to need it soon.
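//!
//! A rough sketch of the shape of that follow-up task.  The `submit_request` call is
//! modeled on [`crate::EncodingsIo`], but the exact signature, the `HIGH_PRIORITY`
//! constant, and the `decode_offsets_into_item_ranges` helper are assumptions used
//! purely for illustration:
//!
//! ```ignore
//! // Phase 1 (inside the scheduling loop): request only the list offsets
//! let offsets_fut = io.submit_request(vec![offsets_byte_range], priority);
//! // Spawn a free-standing task; the scheduling loop moves on immediately
//! tokio::spawn(async move {
//!     let offsets_bytes = offsets_fut.await.unwrap();
//!     // decode_offsets_into_item_ranges is a hypothetical helper for this sketch
//!     let item_ranges = decode_offsets_into_item_ranges(offsets_bytes);
//!     // Phase 2: the follow-up request, issued at high priority because the
//!     // decode stream will need these items soon
//!     io.submit_request(item_ranges, HIGH_PRIORITY).await.unwrap();
//! });
//! ```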
//!
//! # The decode loop
//!
//! As soon as the scheduler starts we can start decoding.  Each time we schedule a page we
//! push a decoder for that page's data into a channel.  The decode loop
//! ([`BatchDecodeStream`]) reads from that channel.  Each time it receives a decoder it
//! waits until the decoder has all of its data.  Then it grabs the next decoder.  Once it has
//! enough loaded decoders to complete a batch worth of rows it will spawn a "decode batch task".
//!
//! These batch decode tasks perform the actual CPU work of decoding the loaded data into Arrow
//! arrays.  This may involve significant CPU processing like decompression or arithmetic in order
//! to restore the data to its correct in-memory representation.
//!
//! ## Batch size
//!
//! The `BatchDecodeStream` is configured with a batch size.  This does not need to have any
//! relation to the page size(s) used to write the data.  This keeps our compute work completely
//! independent of our I/O work.  We suggest using small batch sizes:
//!
//!  * Batches should fit in CPU cache (at least L3)
//!  * More batches means more opportunity for parallelism
//!  * The "batch overhead" is very small in Lance compared to other formats because it has no
//!    relation to the way the data is stored.
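//!
//! Putting the pieces together, here is a minimal sketch of driving both loops.  How the
//! `root_decoder` is obtained is elided, and `FilterExpression::no_filter()` is assumed to
//! exist; this is illustrative wiring, not the exact reader implementation:
//!
//! ```ignore
//! let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
//!
//! // Scheduling loop: runs ahead of the decoder, issuing all of the file's I/O
//! tokio::spawn(async move {
//!     scheduler.schedule_range(0..num_rows, &FilterExpression::no_filter(), tx, io);
//! });
//!
//! // Decode loop: polls the channel and emits one decode task per `rows_per_batch` rows
//! let batches = BatchDecodeStream::new(rx, rows_per_batch, num_rows, root_decoder).into_stream();
//! ```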

use std::collections::VecDeque;
use std::sync::{LazyLock, Once};
use std::{ops::Range, sync::Arc};

use arrow_array::cast::AsArray;
use arrow_array::{ArrayRef, RecordBatch, RecordBatchIterator, RecordBatchReader};
use arrow_schema::{ArrowError, DataType, Field as ArrowField, Fields, Schema as ArrowSchema};
use bytes::Bytes;
use futures::future::{maybe_done, BoxFuture, MaybeDone};
use futures::stream::{self, BoxStream};
use futures::{FutureExt, StreamExt};
use lance_arrow::DataTypeExt;
use lance_core::cache::LanceCache;
use lance_core::datatypes::{Field, Schema, BLOB_DESC_LANCE_FIELD};
use log::{debug, trace, warn};
use snafu::location;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::{self, unbounded_channel};

use lance_core::{ArrowResult, Error, Result};
use tracing::{instrument, Instrument};

use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy};
use crate::data::DataBlock;
use crate::encoder::EncodedBatch;
use crate::encodings::logical::list::StructuralListScheduler;
use crate::encodings::logical::primitive::StructuralPrimitiveFieldScheduler;
use crate::encodings::logical::r#struct::{StructuralStructDecoder, StructuralStructScheduler};
use crate::format::pb::{self, column_encoding};
use crate::repdef::{CompositeRepDefUnraveler, RepDefUnraveler};
use crate::v2::decoder::LogicalPageDecoder;
use crate::v2::encodings::logical::list::OffsetPageInfo;
use crate::v2::encodings::logical::r#struct::{SimpleStructDecoder, SimpleStructScheduler};
use crate::v2::encodings::logical::{
    binary::BinaryFieldScheduler, blob::BlobFieldScheduler, list::ListFieldScheduler,
    primitive::PrimitiveFieldScheduler,
};
use crate::version::LanceFileVersion;
use crate::{BufferScheduler, EncodingsIo};

// If users are getting batches larger than 10MiB then it's time to reduce the batch size
const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024;

/// Top-level encoding message for a page.  Wraps both the
/// legacy pb::ArrayEncoding and the newer pb::PageLayout
///
/// A file should only use one or the other and never both.
/// 2.0 decoders can always assume this is pb::ArrayEncoding
/// and 2.1+ decoders can always assume this is pb::PageLayout
#[derive(Debug)]
pub enum PageEncoding {
    Legacy(pb::ArrayEncoding),
    Structural(pb::PageLayout),
}

impl PageEncoding {
    pub fn as_legacy(&self) -> &pb::ArrayEncoding {
        match self {
            Self::Legacy(enc) => enc,
            Self::Structural(_) => panic!("Expected a legacy encoding"),
        }
    }

    pub fn as_structural(&self) -> &pb::PageLayout {
        match self {
            Self::Structural(enc) => enc,
            Self::Legacy(_) => panic!("Expected a structural encoding"),
        }
    }

    pub fn is_structural(&self) -> bool {
        matches!(self, Self::Structural(_))
    }
}

/// Metadata describing a page in a file
///
/// This is typically created by reading the metadata section of a Lance file
#[derive(Debug)]
pub struct PageInfo {
    /// The number of rows in the page
    pub num_rows: u64,
    /// The priority (top level row number) of the page
    ///
    /// This is only set in 2.1 files and will be 0 for 2.0 files
    pub priority: u64,
    /// The encoding that explains the buffers in the page
    pub encoding: PageEncoding,
    /// The offsets and sizes of the buffers in the file
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
}

/// Metadata describing a column in a file
///
/// This is typically created by reading the metadata section of a Lance file
#[derive(Debug, Clone)]
pub struct ColumnInfo {
    /// The index of the column in the file
    pub index: u32,
    /// The metadata for each page in the column
    pub page_infos: Arc<[PageInfo]>,
    /// File positions and their sizes of the column-level buffers
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    pub encoding: pb::ColumnEncoding,
}

impl ColumnInfo {
    /// Create a new instance
    pub fn new(
        index: u32,
        page_infos: Arc<[PageInfo]>,
        buffer_offsets_and_sizes: Vec<(u64, u64)>,
        encoding: pb::ColumnEncoding,
    ) -> Self {
        Self {
            index,
            page_infos,
            buffer_offsets_and_sizes: buffer_offsets_and_sizes.into_boxed_slice().into(),
            encoding,
        }
    }

    pub fn is_structural(&self) -> bool {
        self.page_infos
            // Can just look at the first since all should be the same
            .first()
            .map(|page| page.encoding.is_structural())
            .unwrap_or(false)
    }
}

enum RootScheduler {
    Structural(Box<dyn StructuralFieldScheduler>),
    Legacy(Arc<dyn crate::v2::decoder::FieldScheduler>),
}

impl RootScheduler {
    fn as_legacy(&self) -> &Arc<dyn crate::v2::decoder::FieldScheduler> {
        match self {
            Self::Structural(_) => panic!("Expected a legacy scheduler"),
            Self::Legacy(s) => s,
        }
    }

    fn as_structural(&self) -> &dyn StructuralFieldScheduler {
        match self {
            Self::Structural(s) => s.as_ref(),
            Self::Legacy(_) => panic!("Expected a structural scheduler"),
        }
    }
}

/// The scheduler for decoding batches
///
/// Lance decoding is done in two steps: scheduling and decoding.  The
/// scheduling tends to be lightweight; it should quickly figure out what data
/// is needed from the disk and issue the appropriate I/O requests.  A decode task is
/// created to eventually decode the data (once it is loaded) and scheduling
/// moves on to scheduling the next page.
///
/// Meanwhile, it's expected that a decode stream will be set up to run at the
/// same time.  Decode tasks take the data that is loaded and turn it into
/// Arrow arrays.
///
/// This approach allows us to keep our I/O parallelism and CPU parallelism
/// completely separate since those are often two very different values.
///
/// Backpressure should be achieved via the I/O service.  Requests that are
/// issued will pile up if the decode stream is not polling quickly enough.
/// The [`crate::EncodingsIo::submit_request`] function should return a pending
/// future once there are too many I/O requests in flight.
///
/// TODO: Implement backpressure
pub struct DecodeBatchScheduler {
    root_scheduler: RootScheduler,
    pub root_fields: Fields,
    cache: Arc<LanceCache>,
}

pub struct ColumnInfoIter<'a> {
    column_infos: Vec<Arc<ColumnInfo>>,
    column_indices: &'a [u32],
    column_info_pos: usize,
    column_indices_pos: usize,
}

impl<'a> ColumnInfoIter<'a> {
    pub fn new(column_infos: Vec<Arc<ColumnInfo>>, column_indices: &'a [u32]) -> Self {
        let initial_pos = column_indices[0] as usize;
        Self {
            column_infos,
            column_indices,
            column_info_pos: initial_pos,
            column_indices_pos: 0,
        }
    }

    pub fn peek(&self) -> &Arc<ColumnInfo> {
        &self.column_infos[self.column_info_pos]
    }

    pub fn peek_transform(&mut self, transform: impl FnOnce(Arc<ColumnInfo>) -> Arc<ColumnInfo>) {
        let column_info = self.column_infos[self.column_info_pos].clone();
        let transformed = transform(column_info);
        self.column_infos[self.column_info_pos] = transformed;
    }

    pub fn expect_next(&mut self) -> Result<&Arc<ColumnInfo>> {
        self.next().ok_or_else(|| {
            Error::invalid_input(
                "there were more fields in the schema than provided column indices / infos",
                location!(),
            )
        })
    }

    fn next(&mut self) -> Option<&Arc<ColumnInfo>> {
        if self.column_info_pos < self.column_infos.len() {
            let info = &self.column_infos[self.column_info_pos];
            self.column_info_pos += 1;
            Some(info)
        } else {
            None
        }
    }

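    /// Advances the iterator to the next top-level column
    ///
    /// Any remaining inner columns of the current top-level field (e.g. the
    /// children of a struct) are skipped; the position jumps to the column
    /// index of the next entry in `column_indices`.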
    pub(crate) fn next_top_level(&mut self) {
        self.column_indices_pos += 1;
        if self.column_indices_pos < self.column_indices.len() {
            self.column_info_pos = self.column_indices[self.column_indices_pos] as usize;
        } else {
            self.column_info_pos = self.column_infos.len();
        }
    }
}

/// These contain the file buffers shared across the entire file
#[derive(Clone, Copy, Debug)]
pub struct FileBuffers<'a> {
    pub positions_and_sizes: &'a [(u64, u64)],
}

/// These contain the file buffers and also buffers specific to a column
#[derive(Clone, Copy, Debug)]
pub struct ColumnBuffers<'a, 'b> {
    pub file_buffers: FileBuffers<'a>,
    pub positions_and_sizes: &'b [(u64, u64)],
}

/// These contain the file & column buffers and also buffers specific to a page
#[derive(Clone, Copy, Debug)]
pub struct PageBuffers<'a, 'b, 'c> {
    pub column_buffers: ColumnBuffers<'a, 'b>,
    pub positions_and_sizes: &'c [(u64, u64)],
}

/// The core decoder strategy handles all the various Arrow types
#[derive(Debug)]
pub struct CoreFieldDecoderStrategy {
    pub validate_data: bool,
    pub decompressor_strategy: Arc<dyn DecompressionStrategy>,
}

impl Default for CoreFieldDecoderStrategy {
    fn default() -> Self {
        Self {
            validate_data: false,
            decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
        }
    }
}

impl CoreFieldDecoderStrategy {
    /// This is just a sanity check to ensure there are no "wrapped encodings"
    /// that haven't been handled.
    fn ensure_values_encoded(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
        let column_encoding = column_info
            .encoding
            .column_encoding
            .as_ref()
            .ok_or_else(|| {
                Error::invalid_input(
                    format!(
                        "the column at index {} was missing a ColumnEncoding",
                        column_info.index
                    ),
                    location!(),
                )
            })?;
        if matches!(
            column_encoding,
            pb::column_encoding::ColumnEncoding::Values(_)
        ) {
            Ok(())
        } else {
            Err(Error::invalid_input(format!("the column at index {} mapping to the input field {} has column encoding {:?} and no decoder is registered to handle it", column_info.index, field_name, column_encoding), location!()))
        }
    }

    fn is_primitive(data_type: &DataType) -> bool {
        if data_type.is_primitive() {
            true
        } else {
            match data_type {
                // DataType::is_primitive doesn't consider these primitive but we do
                DataType::Boolean | DataType::Null | DataType::FixedSizeBinary(_) => true,
                DataType::FixedSizeList(inner, _) => Self::is_primitive(inner.data_type()),
                _ => false,
            }
        }
    }

    fn create_primitive_scheduler(
        &self,
        field: &Field,
        column: &ColumnInfo,
        buffers: FileBuffers,
    ) -> Result<Box<dyn crate::v2::decoder::FieldScheduler>> {
        Self::ensure_values_encoded(column, &field.name)?;
        // Primitive fields map to a single column
        let column_buffers = ColumnBuffers {
            file_buffers: buffers,
            positions_and_sizes: &column.buffer_offsets_and_sizes,
        };
        Ok(Box::new(PrimitiveFieldScheduler::new(
            column.index,
            field.data_type(),
            column.page_infos.clone(),
            column_buffers,
            self.validate_data,
        )))
    }

    /// Helper method to verify the page encoding of a struct header column
    fn check_simple_struct(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
        Self::ensure_values_encoded(column_info, field_name)?;
        if column_info.page_infos.len() != 1 {
            return Err(Error::InvalidInput { source: format!("Due to schema we expected a struct column but we received a column with {} pages and right now we only support struct columns with 1 page", column_info.page_infos.len()).into(), location: location!() });
        }
        let encoding = &column_info.page_infos[0].encoding;
        match encoding.as_legacy().array_encoding.as_ref().unwrap() {
            pb::array_encoding::ArrayEncoding::Struct(_) => Ok(()),
            _ => Err(Error::InvalidInput { source: format!("Expected a struct encoding because we have a struct field in the schema but got the encoding {:?}", encoding).into(), location: location!() }),
        }
    }

    fn check_packed_struct(column_info: &ColumnInfo) -> bool {
        let encoding = &column_info.page_infos[0].encoding;
        matches!(
            encoding.as_legacy().array_encoding.as_ref().unwrap(),
            pb::array_encoding::ArrayEncoding::PackedStruct(_)
        )
    }

    fn create_list_scheduler(
        &self,
        list_field: &Field,
        column_infos: &mut ColumnInfoIter,
        buffers: FileBuffers,
        offsets_column: &ColumnInfo,
    ) -> Result<Box<dyn crate::v2::decoder::FieldScheduler>> {
        Self::ensure_values_encoded(offsets_column, &list_field.name)?;
        let offsets_column_buffers = ColumnBuffers {
            file_buffers: buffers,
            positions_and_sizes: &offsets_column.buffer_offsets_and_sizes,
        };
        let items_scheduler =
            self.create_legacy_field_scheduler(&list_field.children[0], column_infos, buffers)?;

        let (inner_infos, null_offset_adjustments): (Vec<_>, Vec<_>) = offsets_column
            .page_infos
            .iter()
            .filter(|offsets_page| offsets_page.num_rows > 0)
            .map(|offsets_page| {
                if let Some(pb::array_encoding::ArrayEncoding::List(list_encoding)) =
                    &offsets_page.encoding.as_legacy().array_encoding
                {
                    let inner = PageInfo {
                        buffer_offsets_and_sizes: offsets_page.buffer_offsets_and_sizes.clone(),
                        encoding: PageEncoding::Legacy(
                            list_encoding.offsets.as_ref().unwrap().as_ref().clone(),
                        ),
                        num_rows: offsets_page.num_rows,
                        priority: 0,
                    };
                    (
                        inner,
                        OffsetPageInfo {
                            offsets_in_page: offsets_page.num_rows,
                            null_offset_adjustment: list_encoding.null_offset_adjustment,
                            num_items_referenced_by_page: list_encoding.num_items,
                        },
                    )
                } else {
                    // TODO: Should probably return Err here
                    panic!("Expected a list column");
                }
            })
            .unzip();
        let inner = Arc::new(PrimitiveFieldScheduler::new(
            offsets_column.index,
            DataType::UInt64,
            Arc::from(inner_infos.into_boxed_slice()),
            offsets_column_buffers,
            self.validate_data,
        )) as Arc<dyn crate::v2::decoder::FieldScheduler>;
        let items_field = match list_field.data_type() {
            DataType::List(inner) => inner,
            DataType::LargeList(inner) => inner,
            _ => unreachable!(),
        };
        let offset_type = if matches!(list_field.data_type(), DataType::List(_)) {
            DataType::Int32
        } else {
            DataType::Int64
        };
        Ok(Box::new(ListFieldScheduler::new(
            inner,
            items_scheduler.into(),
            items_field,
            offset_type,
            null_offset_adjustments,
        )))
    }

    fn unwrap_blob(column_info: &ColumnInfo) -> Option<ColumnInfo> {
        if let column_encoding::ColumnEncoding::Blob(blob) =
            column_info.encoding.column_encoding.as_ref().unwrap()
        {
            let mut column_info = column_info.clone();
            column_info.encoding = blob.inner.as_ref().unwrap().as_ref().clone();
            Some(column_info)
        } else {
            None
        }
    }

    fn create_structural_field_scheduler(
        &self,
        field: &Field,
        column_infos: &mut ColumnInfoIter,
    ) -> Result<Box<dyn StructuralFieldScheduler>> {
        let data_type = field.data_type();
        if Self::is_primitive(&data_type) {
            let column_info = column_infos.expect_next()?;
            let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
                column_info.as_ref(),
                self.decompressor_strategy.as_ref(),
            )?);

            // advance to the next top level column
            column_infos.next_top_level();

            return Ok(scheduler);
        }
        match &data_type {
            DataType::Struct(fields) => {
                if field.is_packed_struct() {
                    let column_info = column_infos.expect_next()?;
                    let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
                        column_info.as_ref(),
                        self.decompressor_strategy.as_ref(),
                    )?);

                    // advance to the next top level column
                    column_infos.next_top_level();

                    return Ok(scheduler);
                }
                let mut child_schedulers = Vec::with_capacity(field.children.len());
                for field in field.children.iter() {
                    let field_scheduler =
                        self.create_structural_field_scheduler(field, column_infos)?;
                    child_schedulers.push(field_scheduler);
                }

                let fields = fields.clone();
                Ok(
                    Box::new(StructuralStructScheduler::new(child_schedulers, fields))
                        as Box<dyn StructuralFieldScheduler>,
                )
            }
            DataType::Binary | DataType::Utf8 | DataType::LargeBinary | DataType::LargeUtf8 => {
                let column_info = column_infos.expect_next()?;
                let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
                    column_info.as_ref(),
                    self.decompressor_strategy.as_ref(),
                )?);
                column_infos.next_top_level();
                Ok(scheduler)
            }
            DataType::List(_) | DataType::LargeList(_) => {
                let child = field
                    .children
                    .first()
                    .expect("List field must have a child");
                let child_scheduler =
                    self.create_structural_field_scheduler(child, column_infos)?;
                Ok(Box::new(StructuralListScheduler::new(child_scheduler))
                    as Box<dyn StructuralFieldScheduler>)
            }
            _ => todo!(),
        }
    }

    fn create_legacy_field_scheduler(
        &self,
        field: &Field,
        column_infos: &mut ColumnInfoIter,
        buffers: FileBuffers,
    ) -> Result<Box<dyn crate::v2::decoder::FieldScheduler>> {
        let data_type = field.data_type();
        if Self::is_primitive(&data_type) {
            let column_info = column_infos.expect_next()?;
            let scheduler = self.create_primitive_scheduler(field, column_info, buffers)?;
            return Ok(scheduler);
        } else if data_type.is_binary_like() {
            let column_info = column_infos.next().unwrap().clone();
            // Column is blob and user is asking for binary data
            if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
                let desc_scheduler =
                    self.create_primitive_scheduler(&BLOB_DESC_LANCE_FIELD, &blob_col, buffers)?;
                let blob_scheduler = Box::new(BlobFieldScheduler::new(desc_scheduler.into()));
                return Ok(blob_scheduler);
            }
            if let Some(page_info) = column_info.page_infos.first() {
                if matches!(
                    page_info.encoding.as_legacy(),
                    pb::ArrayEncoding {
                        array_encoding: Some(pb::array_encoding::ArrayEncoding::List(..))
                    }
                ) {
                    let list_type = if matches!(data_type, DataType::Utf8 | DataType::Binary) {
                        DataType::List(Arc::new(ArrowField::new("item", DataType::UInt8, false)))
                    } else {
                        DataType::LargeList(Arc::new(ArrowField::new(
                            "item",
                            DataType::UInt8,
                            false,
                        )))
                    };
                    let list_field = Field::try_from(ArrowField::new(
                        field.name.clone(),
                        list_type,
                        field.nullable,
                    ))
                    .unwrap();
                    let list_scheduler = self.create_list_scheduler(
                        &list_field,
                        column_infos,
                        buffers,
                        &column_info,
                    )?;
                    let binary_scheduler = Box::new(BinaryFieldScheduler::new(
                        list_scheduler.into(),
                        field.data_type(),
                    ));
                    return Ok(binary_scheduler);
                } else {
                    let scheduler =
                        self.create_primitive_scheduler(field, &column_info, buffers)?;
                    return Ok(scheduler);
                }
            } else {
                return self.create_primitive_scheduler(field, &column_info, buffers);
            }
        }
        match &data_type {
            DataType::FixedSizeList(inner, _dimension) => {
                // A fixed size list column could either be a physical or a logical decoder
                // depending on the child data type.
                if Self::is_primitive(inner.data_type()) {
                    let primitive_col = column_infos.expect_next()?;
                    let scheduler =
                        self.create_primitive_scheduler(field, primitive_col, buffers)?;
                    Ok(scheduler)
                } else {
                    todo!()
                }
            }
            DataType::Dictionary(_key_type, value_type) => {
                if Self::is_primitive(value_type) || value_type.is_binary_like() {
                    let primitive_col = column_infos.expect_next()?;
                    let scheduler =
                        self.create_primitive_scheduler(field, primitive_col, buffers)?;
                    Ok(scheduler)
                } else {
                    Err(Error::NotSupported {
                        source: format!(
                            "No way to decode into a dictionary field of type {}",
                            value_type
                        )
                        .into(),
                        location: location!(),
                    })
                }
            }
            DataType::List(_) | DataType::LargeList(_) => {
                let offsets_column = column_infos.expect_next()?.clone();
                column_infos.next_top_level();
                self.create_list_scheduler(field, column_infos, buffers, &offsets_column)
            }
            DataType::Struct(fields) => {
                let column_info = column_infos.expect_next()?;

                // Column is blob and user is asking for descriptions
                if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
                    // Can use primitive scheduler here since descriptions are always packed struct
                    return self.create_primitive_scheduler(field, &blob_col, buffers);
                }

                if Self::check_packed_struct(column_info) {
                    // use packed struct encoding
                    self.create_primitive_scheduler(field, column_info, buffers)
                } else {
                    // use default struct encoding
                    Self::check_simple_struct(column_info, &field.name).unwrap();
                    let num_rows = column_info
                        .page_infos
                        .iter()
                        .map(|page| page.num_rows)
                        .sum();
                    let mut child_schedulers = Vec::with_capacity(field.children.len());
                    for field in &field.children {
                        column_infos.next_top_level();
                        let field_scheduler =
                            self.create_legacy_field_scheduler(field, column_infos, buffers)?;
                        child_schedulers.push(Arc::from(field_scheduler));
                    }

                    let fields = fields.clone();
                    Ok(Box::new(SimpleStructScheduler::new(
                        child_schedulers,
                        fields,
                        num_rows,
                    )))
                }
            }
            // TODO: Still need support for RLE
            _ => todo!(),
        }
    }
}

/// Creates a dummy ColumnInfo for the root column
fn root_column(num_rows: u64) -> ColumnInfo {
    let num_root_pages = num_rows.div_ceil(u32::MAX as u64);
    // Subtract instead of using a plain modulo so the final page gets u32::MAX rows
    // (not 0) when num_rows is an exact multiple of u32::MAX
    let final_page_num_rows = num_rows - (num_root_pages - 1) * (u32::MAX as u64);
    let root_pages = (0..num_root_pages)
        .map(|i| PageInfo {
            num_rows: if i == num_root_pages - 1 {
                final_page_num_rows
            } else {
                u32::MAX as u64
            },
            encoding: PageEncoding::Legacy(pb::ArrayEncoding {
                array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
                    pb::SimpleStruct {},
                )),
            }),
            priority: 0, // not used in legacy scheduler
            buffer_offsets_and_sizes: Arc::new([]),
        })
        .collect::<Vec<_>>();
    ColumnInfo {
        buffer_offsets_and_sizes: Arc::new([]),
        encoding: pb::ColumnEncoding {
            column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
        },
        index: u32::MAX,
        page_infos: Arc::from(root_pages),
    }
}

pub enum RootDecoder {
    Structural(StructuralStructDecoder),
    Legacy(SimpleStructDecoder),
}

impl RootDecoder {
    pub fn into_structural(self) -> StructuralStructDecoder {
        match self {
            Self::Structural(decoder) => decoder,
            Self::Legacy(_) => panic!("Expected a structural decoder"),
        }
    }

    pub fn into_legacy(self) -> SimpleStructDecoder {
        match self {
            Self::Legacy(decoder) => decoder,
            Self::Structural(_) => panic!("Expected a legacy decoder"),
        }
    }
}

impl DecodeBatchScheduler {
    /// Creates a new decode scheduler with the expected schema and the column
    /// metadata of the file.
    #[allow(clippy::too_many_arguments)]
    pub async fn try_new<'a>(
        schema: &'a Schema,
        column_indices: &[u32],
        column_infos: &[Arc<ColumnInfo>],
        file_buffer_positions_and_sizes: &'a Vec<(u64, u64)>,
        num_rows: u64,
        _decoder_plugins: Arc<DecoderPlugins>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<LanceCache>,
        filter: &FilterExpression,
    ) -> Result<Self> {
        assert!(num_rows > 0);
        let buffers = FileBuffers {
            positions_and_sizes: file_buffer_positions_and_sizes,
        };
        let arrow_schema = ArrowSchema::from(schema);
        let root_fields = arrow_schema.fields().clone();
        let root_type = DataType::Struct(root_fields.clone());
        let mut root_field = Field::try_from(&ArrowField::new("root", root_type, false))?;
        // root_field.children and schema.fields should be identical at this point but the latter
        // has field ids and the former does not.  The next line restores the field ids.
        // TODO:  Is there another way to create the root field without forcing a trip through arrow?
        root_field.children.clone_from(&schema.fields);
        root_field
            .metadata
            .insert("__lance_decoder_root".to_string(), "true".to_string());

        if column_infos[0].is_structural() {
            let mut column_iter = ColumnInfoIter::new(column_infos.to_vec(), column_indices);

            let mut root_scheduler = CoreFieldDecoderStrategy::default()
                .create_structural_field_scheduler(&root_field, &mut column_iter)?;

            let context = SchedulerContext::new(io, cache.clone());
            root_scheduler.initialize(filter, &context).await?;

            Ok(Self {
                root_scheduler: RootScheduler::Structural(root_scheduler),
                root_fields,
                cache,
            })
        } else {
            // The old encoding style expected a header column for structs and so we
            // need a header column for the top-level struct
            let mut columns = Vec::with_capacity(column_infos.len() + 1);
            columns.push(Arc::new(root_column(num_rows)));
            columns.extend(column_infos.iter().cloned());

            let adjusted_column_indices = [0_u32]
                .into_iter()
                .chain(column_indices.iter().map(|i| i.saturating_add(1)))
                .collect::<Vec<_>>();
            let mut column_iter = ColumnInfoIter::new(columns, &adjusted_column_indices);
            let root_scheduler = CoreFieldDecoderStrategy::default()
                .create_legacy_field_scheduler(&root_field, &mut column_iter, buffers)?;

            let context = SchedulerContext::new(io, cache.clone());
            root_scheduler.initialize(filter, &context).await?;

            Ok(Self {
                root_scheduler: RootScheduler::Legacy(root_scheduler.into()),
                root_fields,
                cache,
            })
        }
    }

    #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
    pub fn from_scheduler(
        root_scheduler: Arc<dyn crate::v2::decoder::FieldScheduler>,
        root_fields: Fields,
        cache: Arc<LanceCache>,
    ) -> Self {
        Self {
            root_scheduler: RootScheduler::Legacy(root_scheduler),
            root_fields,
            cache,
        }
    }

    fn do_schedule_ranges_structural(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
    ) {
        let root_scheduler = self.root_scheduler.as_structural();
        let mut context = SchedulerContext::new(io, self.cache.clone());
        let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
        if let Err(schedule_ranges_err) = maybe_root_job {
            schedule_action(Err(schedule_ranges_err));
            return;
        }
        let mut root_job = maybe_root_job.unwrap();
        let mut num_rows_scheduled = 0;
        loop {
            let maybe_next_scan_line = root_job.schedule_next(&mut context);
            if let Err(err) = maybe_next_scan_line {
                schedule_action(Err(err));
                return;
            }
            let next_scan_line = maybe_next_scan_line.unwrap();
            match next_scan_line {
                Some(next_scan_line) => {
                    trace!(
                        "Scheduled scan line of {} rows and {} decoders",
                        next_scan_line.rows_scheduled,
                        next_scan_line.decoders.len()
                    );
                    num_rows_scheduled += next_scan_line.rows_scheduled;
                    if !schedule_action(Ok(DecoderMessage {
                        scheduled_so_far: num_rows_scheduled,
                        decoders: next_scan_line.decoders,
                    })) {
                        // Decoder has disconnected
                        return;
                    }
                }
                None => return,
            }
        }
    }

    fn do_schedule_ranges_legacy(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
        // If specified, this will be used as the top_level_row for all scheduling
        // tasks.  This is used by list scheduling to ensure all items scheduling
        // tasks are scheduled at the same top level row.
        priority: Option<Box<dyn PriorityRange>>,
    ) {
        let root_scheduler = self.root_scheduler.as_legacy();
        let rows_requested = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        trace!(
            "Scheduling {} ranges across {}..{} ({} rows){}",
            ranges.len(),
            ranges.first().unwrap().start,
            ranges.last().unwrap().end,
            rows_requested,
            priority
                .as_ref()
                .map(|p| format!(" (priority={:?})", p))
                .unwrap_or_default()
        );

        let mut context = SchedulerContext::new(io, self.cache.clone());
        let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
        if let Err(schedule_ranges_err) = maybe_root_job {
            schedule_action(Err(schedule_ranges_err));
            return;
        }
        let mut root_job = maybe_root_job.unwrap();
        let mut num_rows_scheduled = 0;
        let mut rows_to_schedule = root_job.num_rows();
        let mut priority = priority.unwrap_or(Box::new(SimplePriorityRange::new(0)));
        trace!("Scheduled ranges refined to {} rows", rows_to_schedule);
        while rows_to_schedule > 0 {
            let maybe_next_scan_line = root_job.schedule_next(&mut context, priority.as_ref());
            if let Err(schedule_next_err) = maybe_next_scan_line {
                schedule_action(Err(schedule_next_err));
                return;
            }
            let next_scan_line = maybe_next_scan_line.unwrap();
            priority.advance(next_scan_line.rows_scheduled);
            num_rows_scheduled += next_scan_line.rows_scheduled;
            rows_to_schedule -= next_scan_line.rows_scheduled;
            trace!(
                "Scheduled scan line of {} rows and {} decoders",
                next_scan_line.rows_scheduled,
                next_scan_line.decoders.len()
            );
            if !schedule_action(Ok(DecoderMessage {
                scheduled_so_far: num_rows_scheduled,
                decoders: next_scan_line.decoders,
            })) {
                // Decoder has disconnected
                return;
            }
        }
        trace!("Finished scheduling {} ranges", ranges.len());
    }

    fn do_schedule_ranges(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
        // If specified, this will be used as the top_level_row for all scheduling
        // tasks.  This is used by list scheduling to ensure all items scheduling
        // tasks are scheduled at the same top level row.
        priority: Option<Box<dyn PriorityRange>>,
    ) {
        match &self.root_scheduler {
            RootScheduler::Legacy(_) => {
                self.do_schedule_ranges_legacy(ranges, filter, io, schedule_action, priority)
            }
            RootScheduler::Structural(_) => {
                self.do_schedule_ranges_structural(ranges, filter, io, schedule_action)
            }
        }
    }

    // This method is similar to schedule_ranges but instead of
    // sending the decoders to a channel it collects them all into a vector
    pub fn schedule_ranges_to_vec(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        priority: Option<Box<dyn PriorityRange>>,
    ) -> Result<Vec<DecoderMessage>> {
        let mut decode_messages = Vec::new();
        self.do_schedule_ranges(
            ranges,
            filter,
            io,
            |msg| {
                decode_messages.push(msg);
                true
            },
            priority,
        );
        decode_messages.into_iter().collect::<Result<Vec<_>>>()
    }

    /// Schedules the load of multiple ranges of rows
    ///
    /// Ranges must be non-overlapping and in sorted order
    ///
    /// # Arguments
    ///
    /// * `ranges` - The ranges of rows to load
    /// * `sink` - A channel to send the decode tasks
    /// * `scheduler` - An I/O scheduler to issue I/O requests
    #[instrument(skip_all)]
    pub fn schedule_ranges(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
        scheduler: Arc<dyn EncodingsIo>,
    ) {
        self.do_schedule_ranges(
            ranges,
            filter,
            scheduler,
            |msg| {
                match sink.send(msg) {
                    Ok(_) => true,
                    Err(SendError { .. }) => {
                        // The receiver has gone away.  We can't do anything about it
                        // so just ignore the error.
                        debug!(
                            "schedule_ranges aborting early since decoder appears to have been dropped"
                        );
                        false
                    }
                }
            },
            None,
        )
    }

    /// Schedules the load of a range of rows
    ///
    /// # Arguments
    ///
    /// * `range` - The range of rows to load
    /// * `sink` - A channel to send the decode tasks
    /// * `scheduler` - An I/O scheduler to issue I/O requests
    #[instrument(skip_all)]
    pub fn schedule_range(
        &mut self,
        range: Range<u64>,
        filter: &FilterExpression,
        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
        scheduler: Arc<dyn EncodingsIo>,
    ) {
        self.schedule_ranges(&[range], filter, sink, scheduler)
    }

    /// Schedules the load of selected rows
    ///
    /// # Arguments
    ///
    /// * `indices` - The row indices to load (these must be in ascending order!)
    /// * `sink` - A channel to send the decode tasks
    /// * `scheduler` - An I/O scheduler to issue I/O requests
    pub fn schedule_take(
        &mut self,
        indices: &[u64],
        filter: &FilterExpression,
        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
        scheduler: Arc<dyn EncodingsIo>,
    ) {
        debug_assert!(indices.windows(2).all(|w| w[0] <= w[1]));
        if indices.is_empty() {
            return;
        }
        trace!("Scheduling take of {} rows", indices.len());
        let ranges = Self::indices_to_ranges(indices);
        self.schedule_ranges(&ranges, filter, sink, scheduler)
    }

    // coalesce continuous indices if possible (the input indices must be sorted and non-empty)
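    // e.g. [1, 2, 3, 7, 8, 10] becomes [1..4, 7..9, 10..11]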
    fn indices_to_ranges(indices: &[u64]) -> Vec<Range<u64>> {
        let mut ranges = Vec::new();
        let mut start = indices[0];

        for window in indices.windows(2) {
            if window[1] != window[0] + 1 {
                ranges.push(start..window[0] + 1);
                start = window[1];
            }
        }

        ranges.push(start..*indices.last().unwrap() + 1);
        ranges
    }
}

pub struct ReadBatchTask {
    pub task: BoxFuture<'static, Result<RecordBatch>>,
    pub num_rows: u32,
}

/// A stream that takes scheduled jobs and generates decode tasks from them.
pub struct BatchDecodeStream {
    context: DecoderContext,
    root_decoder: SimpleStructDecoder,
    rows_remaining: u64,
    rows_per_batch: u32,
    rows_scheduled: u64,
    rows_drained: u64,
    scheduler_exhausted: bool,
    emitted_batch_size_warning: Arc<Once>,
}

impl BatchDecodeStream {
    /// Create a new instance of a batch decode stream
    ///
    /// # Arguments
    ///
    /// * `scheduled` - an incoming stream of decode tasks from a
    ///   [`DecodeBatchScheduler`]
    /// * `rows_per_batch` - the number of rows to collect before emitting a batch
    /// * `num_rows` - the total number of rows scheduled
    /// * `root_decoder` - the decoder for the top-level struct wrapping the file's fields
1269    pub fn new(
1270        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1271        rows_per_batch: u32,
1272        num_rows: u64,
1273        root_decoder: SimpleStructDecoder,
1274    ) -> Self {
1275        Self {
1276            context: DecoderContext::new(scheduled),
1277            root_decoder,
1278            rows_remaining: num_rows,
1279            rows_per_batch,
1280            rows_scheduled: 0,
1281            rows_drained: 0,
1282            scheduler_exhausted: false,
1283            emitted_batch_size_warning: Arc::new(Once::new()),
1284        }
1285    }
1286
1287    fn accept_decoder(&mut self, decoder: crate::v2::decoder::DecoderReady) -> Result<()> {
1288        if decoder.path.is_empty() {
1289            // The root decoder we can ignore
1290            Ok(())
1291        } else {
1292            self.root_decoder.accept_child(decoder)
1293        }
1294    }
1295
1296    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
1297        if self.scheduler_exhausted {
1298            return Ok(self.rows_scheduled);
1299        }
1300        while self.rows_scheduled < scheduled_need {
1301            let next_message = self.context.source.recv().await;
1302            match next_message {
1303                Some(scan_line) => {
1304                    let scan_line = scan_line?;
1305                    self.rows_scheduled = scan_line.scheduled_so_far;
1306                    for message in scan_line.decoders {
1307                        self.accept_decoder(message.into_legacy())?;
1308                    }
1309                }
1310                None => {
1311                    // Schedule ended before we got all the data we expected.  This probably
1312                    // means some kind of pushdown filter was applied and we didn't load as
1313                    // much data as we thought we would.
1314                    self.scheduler_exhausted = true;
1315                    return Ok(self.rows_scheduled);
1316                }
1317            }
1318        }
1319        Ok(scheduled_need)
1320    }
1321
1322    #[instrument(level = "debug", skip_all)]
1323    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
1324        trace!(
1325            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1326            self.rows_remaining,
1327            self.rows_drained,
1328            self.rows_scheduled,
1329        );
1330        if self.rows_remaining == 0 {
1331            return Ok(None);
1332        }
1333
1334        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1335        self.rows_remaining -= to_take;
1336
1337        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
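        // Worked example: if rows_drained = 100, to_take = 50, and rows_scheduled = 120,
        // then scheduled_need = 30 (the scheduler still needs to cover rows 120..150)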
1338        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1339        if scheduled_need > 0 {
1340            let desired_scheduled = scheduled_need + self.rows_scheduled;
1341            trace!(
1342                "Draining from scheduler (desire at least {} scheduled rows)",
1343                desired_scheduled
1344            );
1345            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
1346            if actually_scheduled < desired_scheduled {
1347                let under_scheduled = desired_scheduled - actually_scheduled;
1348                to_take -= under_scheduled;
1349            }
1350        }
1351
1352        if to_take == 0 {
1353            return Ok(None);
1354        }
1355
1356        // wait_for_loaded waits until the loaded count is strictly greater than loaded_need (not >=) so we subtract 1 here
1357        let loaded_need = self.rows_drained + to_take - 1;
1358        trace!(
1359            "Waiting for I/O (desire at least {} fully loaded rows)",
1360            loaded_need
1361        );
1362        self.root_decoder.wait_for_loaded(loaded_need).await?;
1363
1364        let next_task = self.root_decoder.drain(to_take)?;
1365        self.rows_drained += to_take;
1366        Ok(Some(next_task))
1367    }
1368
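    /// Convert this object into a stream of [`ReadBatchTask`]
    ///
    /// A minimal consumption sketch (hypothetical caller; assumes a tokio
    /// runtime is driving the stream):
    ///
    /// ```ignore
    /// use futures::StreamExt;
    ///
    /// let mut batches = decode_stream.into_stream();
    /// while let Some(batch_task) = batches.next().await {
    ///     let batch = batch_task.task.await?;
    ///     println!("decoded {} rows", batch.num_rows());
    /// }
    /// ```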
1369    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
1370        let stream = futures::stream::unfold(self, |mut slf| async move {
1371            let next_task = slf.next_batch_task().await;
1372            let next_task = next_task.transpose().map(|next_task| {
1373                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
1374                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
1375                let task = tokio::spawn(
1376                    (async move {
1377                        let next_task = next_task?;
1378                        next_task.into_batch(emitted_batch_size_warning)
1379                    })
1380                    .in_current_span(),
1381                );
1382                (task, num_rows)
1383            });
1384            next_task.map(|(task, num_rows)| {
1385                let task = task.map(|join_wrapper| join_wrapper.unwrap()).boxed();
1386                // This should be true since batch size is u32
1387                debug_assert!(num_rows <= u32::MAX as u64);
1388                let next_task = ReadBatchTask {
1389                    task,
1390                    num_rows: num_rows as u32,
1391                };
1392                (next_task, slf)
1393            })
1394        });
1395        stream.boxed()
1396    }
1397}
1398
1399// Utility types to smooth out the differences between the 2.0 and 2.1 decoders so that
1400// we can have a single implementation of the batch decode iterator
1401enum RootDecoderMessage {
1402    LoadedPage(LoadedPage),
1403    LegacyPage(crate::v2::decoder::DecoderReady),
1404}
1405trait RootDecoderType {
1406    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()>;
1407    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
1408    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()>;
1409}
1410impl RootDecoderType for StructuralStructDecoder {
1411    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1412        let RootDecoderMessage::LoadedPage(loaded_page) = message else {
1413            unreachable!()
1414        };
1415        self.accept_page(loaded_page)
1416    }
1417    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1418        self.drain_batch_task(num_rows)
1419    }
1420    fn wait(&mut self, _: u64, _: &tokio::runtime::Runtime) -> Result<()> {
1421        // Waiting happens elsewhere (not as part of the decoder)
1422        Ok(())
1423    }
1424}
1425impl RootDecoderType for SimpleStructDecoder {
1426    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1427        let RootDecoderMessage::LegacyPage(legacy_page) = message else {
1428            unreachable!()
1429        };
1430        self.accept_child(legacy_page)
1431    }
1432    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1433        self.drain(num_rows)
1434    }
1435    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()> {
1436        runtime.block_on(self.wait_for_loaded(loaded_need))
1437    }
1438}
1439
1440/// A batch decoder that decodes synchronously, blocking the current thread while it waits for I/O
1441struct BatchDecodeIterator<T: RootDecoderType> {
1442    messages: VecDeque<Result<DecoderMessage>>,
1443    root_decoder: T,
1444    rows_remaining: u64,
1445    rows_per_batch: u32,
1446    rows_scheduled: u64,
1447    rows_drained: u64,
1448    emitted_batch_size_warning: Arc<Once>,
1449    // Note: this is not the runtime on which I/O happens.
1450    // That's always in the scheduler.  This is just a runtime we use to
1451    // put the current thread to sleep while we wait for I/O that is not yet ready
1452    wait_for_io_runtime: tokio::runtime::Runtime,
1453    schema: Arc<ArrowSchema>,
1454}
1455
1456impl<T: RootDecoderType> BatchDecodeIterator<T> {
1457    /// Create a new instance of a batch decode iterator
1458    pub fn new(
1459        messages: VecDeque<Result<DecoderMessage>>,
1460        rows_per_batch: u32,
1461        num_rows: u64,
1462        root_decoder: T,
1463        schema: Arc<ArrowSchema>,
1464    ) -> Self {
1465        Self {
1466            messages,
1467            root_decoder,
1468            rows_remaining: num_rows,
1469            rows_per_batch,
1470            rows_scheduled: 0,
1471            rows_drained: 0,
1472            wait_for_io_runtime: tokio::runtime::Builder::new_current_thread()
1473                .build()
1474                .unwrap(),
1475            emitted_batch_size_warning: Arc::new(Once::new()),
1476            schema,
1477        }
1478    }
1479
1480    /// Wait for a single page of data to finish loading
1481    ///
1482    /// If the data is not available this will perform a *blocking* wait (put
1483    /// the current thread to sleep)
1484    fn wait_for_page(&self, unloaded_page: UnloadedPage) -> Result<LoadedPage> {
1485        match maybe_done(unloaded_page.0) {
1486            // Fast path, avoid all runtime shenanigans if the data is ready
1487            MaybeDone::Done(loaded_page) => loaded_page,
1488            // Slow path, we need to wait on I/O, enter the runtime
1489            MaybeDone::Future(fut) => self.wait_for_io_runtime.block_on(fut),
1490            MaybeDone::Gone => unreachable!(),
1491        }
1492    }
1493
1494    /// Waits for I/O until `scheduled_need` rows have been loaded
1495    ///
1496    /// Note that `scheduled_need` is cumulative.  E.g. this method
1497    /// should be called with 5, 10, 15 and not 5, 5, 5
1498    #[instrument(skip_all)]
1499    fn wait_for_io(&mut self, scheduled_need: u64) -> Result<u64> {
1500        while self.rows_scheduled < scheduled_need && !self.messages.is_empty() {
1501            let message = self.messages.pop_front().unwrap()?;
1502            self.rows_scheduled = message.scheduled_so_far;
1503            for decoder_message in message.decoders {
1504                match decoder_message {
1505                    MessageType::UnloadedPage(unloaded_page) => {
1506                        let loaded_page = self.wait_for_page(unloaded_page)?;
1507                        self.root_decoder
1508                            .accept_message(RootDecoderMessage::LoadedPage(loaded_page))?;
1509                    }
1510                    MessageType::DecoderReady(decoder_ready) => {
1511                        // The root decoder can be ignored (it has an empty path)
1512                        if !decoder_ready.path.is_empty() {
1513                            self.root_decoder
1514                                .accept_message(RootDecoderMessage::LegacyPage(decoder_ready))?;
1515                        }
1516                    }
1517                }
1518            }
1519        }
1520
1521        let loaded_need = self.rows_drained + self.rows_per_batch as u64 - 1;
1522
1523        self.root_decoder
1524            .wait(loaded_need, &self.wait_for_io_runtime)?;
1525        Ok(self.rows_scheduled)
1526    }
1527
1528    #[instrument(level = "debug", skip_all)]
1529    fn next_batch_task(&mut self) -> Result<Option<RecordBatch>> {
1530        trace!(
1531            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1532            self.rows_remaining,
1533            self.rows_drained,
1534            self.rows_scheduled,
1535        );
1536        if self.rows_remaining == 0 {
1537            return Ok(None);
1538        }
1539
1540        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1541        self.rows_remaining -= to_take;
1542
1543        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1544        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1545        if scheduled_need > 0 {
1546            let desired_scheduled = scheduled_need + self.rows_scheduled;
1547            trace!(
1548                "Draining from scheduler (desire at least {} scheduled rows)",
1549                desired_scheduled
1550            );
1551            let actually_scheduled = self.wait_for_io(desired_scheduled)?;
1552            if actually_scheduled < desired_scheduled {
1553                let under_scheduled = desired_scheduled - actually_scheduled;
1554                to_take -= under_scheduled;
1555            }
1556        }
1557
1558        if to_take == 0 {
1559            return Ok(None);
1560        }
1561
1562        let next_task = self.root_decoder.drain_batch(to_take)?;
1563
1564        self.rows_drained += to_take;
1565
1566        let batch = next_task.into_batch(self.emitted_batch_size_warning.clone())?;
1567
1568        Ok(Some(batch))
1569    }
1570}
1571
1572impl<T: RootDecoderType> Iterator for BatchDecodeIterator<T> {
1573    type Item = ArrowResult<RecordBatch>;
1574
1575    fn next(&mut self) -> Option<Self::Item> {
1576        self.next_batch_task()
1577            .transpose()
1578            .map(|r| r.map_err(ArrowError::from))
1579    }
1580}
1581
1582impl<T: RootDecoderType> RecordBatchReader for BatchDecodeIterator<T> {
1583    fn schema(&self) -> Arc<ArrowSchema> {
1584        self.schema.clone()
1585    }
1586}
1587
1588/// A stream that takes scheduled jobs and generates decode tasks from them (used for the 2.1+ structural decoders)
1589pub struct StructuralBatchDecodeStream {
1590    context: DecoderContext,
1591    root_decoder: StructuralStructDecoder,
1592    rows_remaining: u64,
1593    rows_per_batch: u32,
1594    rows_scheduled: u64,
1595    rows_drained: u64,
1596    scheduler_exhausted: bool,
1597    emitted_batch_size_warning: Arc<Once>,
1598}
1599
1600impl StructuralBatchDecodeStream {
1601    /// Create a new instance of a batch decode stream
1602    ///
1603    /// # Arguments
1604    ///
1605    /// * `scheduled` - an incoming stream of decode tasks from a
1606    ///   [`crate::decode::DecodeBatchScheduler`]
1607    /// * `rows_per_batch` - the number of rows to create before making a batch
1608    /// * `num_rows` - the total number of rows scheduled
1609    /// * `root_decoder` - the decoder for the root struct that contains
1610    ///   all of the top-level fields in the schema
1611    pub fn new(
1612        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1613        rows_per_batch: u32,
1614        num_rows: u64,
1615        root_decoder: StructuralStructDecoder,
1616    ) -> Self {
1617        Self {
1618            context: DecoderContext::new(scheduled),
1619            root_decoder,
1620            rows_remaining: num_rows,
1621            rows_per_batch,
1622            rows_scheduled: 0,
1623            rows_drained: 0,
1624            scheduler_exhausted: false,
1625            emitted_batch_size_warning: Arc::new(Once::new()),
1626        }
1627    }
1628
1629    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
1630        if self.scheduler_exhausted {
1631            return Ok(self.rows_scheduled);
1632        }
1633        while self.rows_scheduled < scheduled_need {
1634            let next_message = self.context.source.recv().await;
1635            match next_message {
1636                Some(scan_line) => {
1637                    let scan_line = scan_line?;
1638                    self.rows_scheduled = scan_line.scheduled_so_far;
1639                    for message in scan_line.decoders {
1640                        let unloaded_page = message.into_structural();
1641                        let loaded_page = unloaded_page.0.await?;
1642                        self.root_decoder.accept_page(loaded_page)?;
1643                    }
1644                }
1645                None => {
1646                    // Schedule ended before we got all the data we expected.  This probably
1647                    // means some kind of pushdown filter was applied and we didn't load as
1648                    // much data as we thought we would.
1649                    self.scheduler_exhausted = true;
1650                    return Ok(self.rows_scheduled);
1651                }
1652            }
1653        }
1654        Ok(scheduled_need)
1655    }
1656
1657    #[instrument(level = "debug", skip_all)]
1658    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
1659        trace!(
1660            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1661            self.rows_remaining,
1662            self.rows_drained,
1663            self.rows_scheduled,
1664        );
1665        if self.rows_remaining == 0 {
1666            return Ok(None);
1667        }
1668
1669        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1670        self.rows_remaining -= to_take;
1671
1672        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1673        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1674        if scheduled_need > 0 {
1675            let desired_scheduled = scheduled_need + self.rows_scheduled;
1676            trace!(
1677                "Draining from scheduler (desire at least {} scheduled rows)",
1678                desired_scheduled
1679            );
1680            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
1681            if actually_scheduled < desired_scheduled {
1682                let under_scheduled = desired_scheduled - actually_scheduled;
1683                to_take -= under_scheduled;
1684            }
1685        }
1686
1687        if to_take == 0 {
1688            return Ok(None);
1689        }
1690
1691        let next_task = self.root_decoder.drain_batch_task(to_take)?;
1692        self.rows_drained += to_take;
1693        Ok(Some(next_task))
1694    }
1695
1696    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
1697        let stream = futures::stream::unfold(self, |mut slf| async move {
1698            let next_task = slf.next_batch_task().await;
1699            let next_task = next_task.transpose().map(|next_task| {
1700                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
1701                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
1702                let task = tokio::spawn(async move {
1703                    let next_task = next_task?;
1704                    next_task.into_batch(emitted_batch_size_warning)
1705                });
1706                (task, num_rows)
1707            });
1708            next_task.map(|(task, num_rows)| {
1709                let task = task.map(|join_wrapper| join_wrapper.unwrap()).boxed();
1710                // This should be true since batch size is u32
1711                debug_assert!(num_rows <= u32::MAX as u64);
1712                let next_task = ReadBatchTask {
1713                    task,
1714                    num_rows: num_rows as u32,
1715                };
1716                (next_task, slf)
1717            })
1718        });
1719        stream.boxed()
1720    }
1721}
1722
1723#[derive(Debug)]
1724pub enum RequestedRows {
1725    Ranges(Vec<Range<u64>>),
1726    Indices(Vec<u64>),
1727}
1728
1729impl RequestedRows {
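    /// The total number of rows requested
    ///
    /// ```ignore
    /// assert_eq!(RequestedRows::Ranges(vec![0..100, 200..300]).num_rows(), 200);
    /// assert_eq!(RequestedRows::Indices(vec![1, 5, 9]).num_rows(), 3);
    /// ```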
1730    pub fn num_rows(&self) -> u64 {
1731        match self {
1732            Self::Ranges(ranges) => ranges.iter().map(|r| r.end - r.start).sum(),
1733            Self::Indices(indices) => indices.len() as u64,
1734        }
1735    }
1736}
1737
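/// Configuration for the scheduler and decoder halves of a read
///
/// A hypothetical construction sketch (the `io` and `cache` values here are
/// assumptions; in practice they come from the file reader that owns them):
///
/// ```ignore
/// let config = SchedulerDecoderConfig {
///     decoder_plugins: Arc::new(DecoderPlugins::default()),
///     batch_size: 1024,
///     io: io.clone(),
///     cache: Arc::new(LanceCache::with_capacity(128 * 1024 * 1024)),
///     should_validate: false,
/// };
/// ```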
1738#[derive(Debug, Clone)]
1739pub struct SchedulerDecoderConfig {
1740    pub decoder_plugins: Arc<DecoderPlugins>,
1741    pub batch_size: u32,
1742    pub io: Arc<dyn EncodingsIo>,
1743    pub cache: Arc<LanceCache>,
1744    pub should_validate: bool,
1745}
1746
1747fn check_scheduler_on_drop(
1748    stream: BoxStream<'static, ReadBatchTask>,
1749    scheduler_handle: tokio::task::JoinHandle<()>,
1750) -> BoxStream<'static, ReadBatchTask> {
1751    // This is a bit weird but we create an "empty stream" that unwraps the scheduler handle (which
1752    // will panic if the scheduler panicked).  This lets us check whether the scheduler panicked
1753    // when the stream finishes.
1754    let mut scheduler_handle = Some(scheduler_handle);
1755    let check_scheduler = stream::unfold((), move |_| {
1756        let handle = scheduler_handle.take();
1757        async move {
1758            if let Some(handle) = handle {
1759                handle.await.unwrap();
1760            }
1761            None
1762        }
1763    });
1764    stream.chain(check_scheduler).boxed()
1765}
1766
1767pub fn create_decode_stream(
1768    schema: &Schema,
1769    num_rows: u64,
1770    batch_size: u32,
1771    is_structural: bool,
1772    should_validate: bool,
1773    rx: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1774) -> BoxStream<'static, ReadBatchTask> {
1775    if is_structural {
1776        let arrow_schema = ArrowSchema::from(schema);
1777        let structural_decoder = StructuralStructDecoder::new(
1778            arrow_schema.fields,
1779            should_validate,
1780            /*is_root=*/ true,
1781        );
1782        StructuralBatchDecodeStream::new(rx, batch_size, num_rows, structural_decoder).into_stream()
1783    } else {
1784        let arrow_schema = ArrowSchema::from(schema);
1785        let root_fields = arrow_schema.fields;
1786
1787        let simple_struct_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1788        BatchDecodeStream::new(rx, batch_size, num_rows, simple_struct_decoder).into_stream()
1789    }
1790}
1791
1792/// Creates an iterator that decodes a set of messages in a blocking fashion
1793///
1794/// See [`schedule_and_decode_blocking`] for more information.
1795pub fn create_decode_iterator(
1796    schema: &Schema,
1797    num_rows: u64,
1798    batch_size: u32,
1799    should_validate: bool,
1800    is_structural: bool,
1801    messages: VecDeque<Result<DecoderMessage>>,
1802) -> Box<dyn RecordBatchReader + Send + 'static> {
1803    let arrow_schema = Arc::new(ArrowSchema::from(schema));
1804    let root_fields = arrow_schema.fields.clone();
1805    if is_structural {
1806        let structural_decoder =
1807            StructuralStructDecoder::new(root_fields, should_validate, /*is_root=*/ true);
1808        Box::new(BatchDecodeIterator::new(
1809            messages,
1810            batch_size,
1811            num_rows,
1812            structural_decoder,
1813            arrow_schema,
1814        ))
1815    } else {
1816        let root_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1817        Box::new(BatchDecodeIterator::new(
1818            messages,
1819            batch_size,
1820            num_rows,
1821            root_decoder,
1822            arrow_schema,
1823        ))
1824    }
1825}
1826
1827fn create_scheduler_decoder(
1828    column_infos: Vec<Arc<ColumnInfo>>,
1829    requested_rows: RequestedRows,
1830    filter: FilterExpression,
1831    column_indices: Vec<u32>,
1832    target_schema: Arc<Schema>,
1833    config: SchedulerDecoderConfig,
1834) -> Result<BoxStream<'static, ReadBatchTask>> {
1835    let num_rows = requested_rows.num_rows();
1836
1837    let is_structural = column_infos[0].is_structural();
1838
1839    let (tx, rx) = mpsc::unbounded_channel();
1840
1841    let decode_stream = create_decode_stream(
1842        &target_schema,
1843        num_rows,
1844        config.batch_size,
1845        is_structural,
1846        config.should_validate,
1847        rx,
1848    );
1849
1850    let scheduler_handle = tokio::task::spawn(async move {
1851        let mut decode_scheduler = match DecodeBatchScheduler::try_new(
1852            target_schema.as_ref(),
1853            &column_indices,
1854            &column_infos,
1855            &vec![],
1856            num_rows,
1857            config.decoder_plugins,
1858            config.io.clone(),
1859            config.cache,
1860            &filter,
1861        )
1862        .await
1863        {
1864            Ok(scheduler) => scheduler,
1865            Err(e) => {
1866                let _ = tx.send(Err(e));
1867                return;
1868            }
1869        };
1870
1871        match requested_rows {
1872            RequestedRows::Ranges(ranges) => {
1873                decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
1874            }
1875            RequestedRows::Indices(indices) => {
1876                decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
1877            }
1878        }
1879    });
1880
1881    Ok(check_scheduler_on_drop(decode_stream, scheduler_handle))
1882}
1883
1884/// Launches a scheduler on a dedicated (spawned) task and creates a decoder to
1885/// decode the scheduled data and returns the decoder as a stream of record batches.
1886///
1887/// This is a convenience function that creates both the scheduler and the decoder
1888/// which can be a little tricky to get right.
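///
/// A hypothetical invocation sketch (all of the inputs are assumed to come
/// from an already-opened Lance file):
///
/// ```ignore
/// let stream = schedule_and_decode(
///     column_infos,
///     RequestedRows::Ranges(vec![0..1024]),
///     FilterExpression::no_filter(),
///     column_indices,
///     target_schema,
///     config,
/// );
/// ```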
1889pub fn schedule_and_decode(
1890    column_infos: Vec<Arc<ColumnInfo>>,
1891    requested_rows: RequestedRows,
1892    filter: FilterExpression,
1893    column_indices: Vec<u32>,
1894    target_schema: Arc<Schema>,
1895    config: SchedulerDecoderConfig,
1896) -> BoxStream<'static, ReadBatchTask> {
1897    if requested_rows.num_rows() == 0 {
1898        return stream::empty().boxed();
1899    }
1900    // For convenience we really want this method to be a synchronous method where all
1901    // errors happen on the stream.  There is some async initialization that must happen
1902    // when creating a scheduler.  We wrap that all up in the very first task.
1903    match create_scheduler_decoder(
1904        column_infos,
1905        requested_rows,
1906        filter,
1907        column_indices,
1908        target_schema,
1909        config,
1910    ) {
1911        Ok(stream) => stream,
1912        // If the initialization failed, make it look like a failed task
1913        Err(e) => stream::once(std::future::ready(ReadBatchTask {
1914            num_rows: 0,
1915            task: std::future::ready(Err(e)).boxed(),
1916        }))
1917        .boxed(),
1918    }
1919}
1920
1921pub static WAITER_RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
1922    tokio::runtime::Builder::new_current_thread()
1923        .build()
1924        .unwrap()
1925});
1926
1927/// Schedules and decodes the requested data in a blocking fashion
1928///
1929/// This function is a blocking version of [`schedule_and_decode`]. It schedules the requested data
1930/// and decodes it in the current thread.
1931///
1932/// This can be useful when the disk is fast (or the data is in memory) and the amount
1933/// of data is relatively small.  For example, when doing a take against NVMe or in-memory data.
1934///
1935/// This should NOT be used for full scans.  Even if the data is in memory this function will
1936/// not parallelize the decode and will be slower than the async version.  Full scans typically
1937/// make relatively few IOPs and so the asynchronous overhead is much smaller.
1938///
1939/// This method will first completely run the scheduling process.  Then it will run the
1940/// decode process.
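///
/// A hypothetical invocation sketch (mirrors [`schedule_and_decode`] but
/// yields a blocking [`RecordBatchReader`]):
///
/// ```ignore
/// let reader = schedule_and_decode_blocking(
///     column_infos,
///     RequestedRows::Indices(vec![17, 42]),
///     FilterExpression::no_filter(),
///     column_indices,
///     target_schema,
///     config,
/// )?;
/// for batch in reader {
///     let batch = batch?;
///     // process the batch
/// }
/// ```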
1941pub fn schedule_and_decode_blocking(
1942    column_infos: Vec<Arc<ColumnInfo>>,
1943    requested_rows: RequestedRows,
1944    filter: FilterExpression,
1945    column_indices: Vec<u32>,
1946    target_schema: Arc<Schema>,
1947    config: SchedulerDecoderConfig,
1948) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1949    if requested_rows.num_rows() == 0 {
1950        let arrow_schema = Arc::new(ArrowSchema::from(target_schema.as_ref()));
1951        return Ok(Box::new(RecordBatchIterator::new(vec![], arrow_schema)));
1952    }
1953
1954    let num_rows = requested_rows.num_rows();
1955    let is_structural = column_infos[0].is_structural();
1956
1957    let (tx, mut rx) = mpsc::unbounded_channel();
1958
1959    // Initialize the scheduler.  This is still "asynchronous" but we run it with a current-thread
1960    // runtime.
1961    let mut decode_scheduler = WAITER_RT.block_on(DecodeBatchScheduler::try_new(
1962        target_schema.as_ref(),
1963        &column_indices,
1964        &column_infos,
1965        &vec![],
1966        num_rows,
1967        config.decoder_plugins,
1968        config.io.clone(),
1969        config.cache,
1970        &filter,
1971    ))?;
1972
1973    // Schedule the requested rows
1974    match requested_rows {
1975        RequestedRows::Ranges(ranges) => {
1976            decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
1977        }
1978        RequestedRows::Indices(indices) => {
1979            decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
1980        }
1981    }
1982
1983    // Drain the scheduler queue into a vec of decode messages
1984    let mut messages = Vec::new();
1985    while rx
1986        .recv_many(&mut messages, usize::MAX)
1987        .now_or_never()
1988        .unwrap()
1989        != 0
1990    {}
1991
1992    // Create a decoder to decode the messages
1993    let decode_iterator = create_decode_iterator(
1994        &target_schema,
1995        num_rows,
1996        config.batch_size,
1997        config.should_validate,
1998        is_structural,
1999        messages.into(),
2000    );
2001
2002    Ok(decode_iterator)
2003}
2004
2005/// A decoder for single-column encodings of primitive data (this includes fixed size
2006/// lists of primitive data)
2007///
2008/// Physical decoders are able to decode into existing buffers for zero-copy operation.
2009///
2010/// Instances should be stateless and `Send` / `Sync`.  This is because multiple decode
2011/// tasks could reference the same page.  For example, imagine a page covers rows 0-2000
2012/// and the decoder stream has a batch size of 1024.  The decoder will be needed by both
2013/// the decode task for batch 0 and the decode task for batch 1.
2014///
2015/// See [`crate::decoder`] for more information
2016pub trait PrimitivePageDecoder: Send + Sync {
2017    /// Decode data into buffers
2018    ///
2019    /// This may be a simple zero-copy from a disk buffer or could involve complex decoding
2020    /// such as decompressing from some compressed representation.
2021    ///
2022    /// Capacity is stored as a tuple of (num_bytes: u64, is_needed: bool).  The `is_needed`
2023    /// portion only needs to be updated if the encoding has some concept of an "optional"
2024    /// buffer.
2025    ///
2026    /// Encodings can have any number of input or output buffers.  For example, a dictionary
2027    /// decoding will convert two buffers (indices + dictionary) into a single buffer
2028    ///
2029    /// Binary decodings have two output buffers (one for values, one for offsets)
2030    ///
2031    /// Other decodings could even expand the # of output buffers.  For example, we could decode
2032    /// fixed size strings into variable length strings going from one input buffer to multiple output
2033    /// buffers.
2034    ///
2035    /// Each Arrow data type typically has a fixed structure of buffers and the encoding chain will
2036    /// generally end at one of these structures.  However, intermediate structures may exist which
2037    /// do not correspond to any Arrow type at all.  For example, a bitpacking encoding will deal
2038    /// with buffers that have bits-per-value that is not a multiple of 8.
2039    ///
2040    /// The `primitive_array_from_buffers` method has an expected buffer layout for each Arrow
2041    /// type (order matters) and encodings that aim to decode into Arrow types should respect
2042    /// this layout.
2043    /// # Arguments
2044    ///
2045    /// * `rows_to_skip` - how many rows to skip (within the page) before decoding
2046    /// * `num_rows` - how many rows to decode
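    ///
    /// A minimal implementor sketch (hypothetical; a real decoder would slice
    /// previously-loaded buffers instead of `todo!()`):
    ///
    /// ```ignore
    /// struct NoopDecoder;
    ///
    /// impl PrimitivePageDecoder for NoopDecoder {
    ///     fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
    ///         // Skip `rows_to_skip` rows and materialize the next `num_rows`
    ///         todo!()
    ///     }
    /// }
    /// ```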
2048    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock>;
2049}
2050
2051/// A scheduler for single-column encodings of primitive data
2052///
2053/// The scheduler is responsible for calculating what I/O is needed for the requested rows
2054///
2055/// Instances should be stateless and `Send` and `Sync`.  This is because instances can
2056/// be shared in follow-up I/O tasks.
2057///
2058/// See [`crate::decoder`] for more information
2059pub trait PageScheduler: Send + Sync + std::fmt::Debug {
2060    /// Schedules a batch of I/O to load the data needed for the requested ranges
2061    ///
2062    /// Returns a future that will yield a decoder once the data has been loaded
2063    ///
2064    /// # Arguments
2065    ///
2066    /// * `ranges` - the ranges of row offsets (relative to start of page) requested;
2067    ///   these must be ordered and must not overlap
2068    /// * `scheduler` - a scheduler to submit the I/O request to
2069    /// * `top_level_row` - the row offset of the top level field currently being
2070    ///   scheduled.  This can be used to assign priority to I/O requests
2071    fn schedule_ranges(
2072        &self,
2073        ranges: &[Range<u64>],
2074        scheduler: &Arc<dyn EncodingsIo>,
2075        top_level_row: u64,
2076    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>;
2077}
2078
2079/// A trait to control the priority of I/O
2080pub trait PriorityRange: std::fmt::Debug + Send + Sync {
2081    fn advance(&mut self, num_rows: u64);
2082    fn current_priority(&self) -> u64;
2083    fn box_clone(&self) -> Box<dyn PriorityRange>;
2084}
2085
2086/// A simple priority scheme for top-level fields with no parent
2087/// repetition
2088#[derive(Debug)]
2089pub struct SimplePriorityRange {
2090    priority: u64,
2091}
2092
2093impl SimplePriorityRange {
2094    fn new(priority: u64) -> Self {
2095        Self { priority }
2096    }
2097}
2098
2099impl PriorityRange for SimplePriorityRange {
2100    fn advance(&mut self, num_rows: u64) {
2101        self.priority += num_rows;
2102    }
2103
2104    fn current_priority(&self) -> u64 {
2105        self.priority
2106    }
2107
2108    fn box_clone(&self) -> Box<dyn PriorityRange> {
2109        Box::new(Self {
2110            priority: self.priority,
2111        })
2112    }
2113}
2114
2115/// Determining the priority of a list request is tricky.  We want
2116/// the priority to be the top-level row.  So if we have a
2117/// list<list<int>> where each outer list contains 10 inner lists and
2118/// each inner list contains 5 items, then the priority of the 100th item
2119/// is 1 because it is the 5th item of the 10th inner list of the *second* top-level row.
2120///
2121/// This structure allows us to keep track of this complicated priority
2122/// relationship.
2123///
2124/// There's a fair amount of bookkeeping involved here.
2125///
2126/// A better approach (using repetition levels) is coming in the future.
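///
/// For example, with offsets [0, 50, 100] (50 items per top-level row),
/// advancing by 60 items moves the base priority from row 0 to row 1,
/// since items 0..50 all belong to the first row.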
2127pub struct ListPriorityRange {
2128    base: Box<dyn PriorityRange>,
2129    offsets: Arc<[u64]>,
2130    cur_index_into_offsets: usize,
2131    cur_position: u64,
2132}
2133
2134impl ListPriorityRange {
2135    pub(crate) fn new(base: Box<dyn PriorityRange>, offsets: Arc<[u64]>) -> Self {
2136        Self {
2137            base,
2138            offsets,
2139            cur_index_into_offsets: 0,
2140            cur_position: 0,
2141        }
2142    }
2143}
2144
2145impl std::fmt::Debug for ListPriorityRange {
2146    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2147        f.debug_struct("ListPriorityRange")
2148            .field("base", &self.base)
2149            .field("offsets.len()", &self.offsets.len())
2150            .field("cur_index_into_offsets", &self.cur_index_into_offsets)
2151            .field("cur_position", &self.cur_position)
2152            .finish()
2153    }
2154}
2155
2156impl PriorityRange for ListPriorityRange {
2157    fn advance(&mut self, num_rows: u64) {
2158        // We've scheduled X items.  Now walk through the offsets to
2159        // determine how many rows we've scheduled.
2160        self.cur_position += num_rows;
2161        let mut idx_into_offsets = self.cur_index_into_offsets;
2162        while idx_into_offsets + 1 < self.offsets.len()
2163            && self.offsets[idx_into_offsets + 1] <= self.cur_position
2164        {
2165            idx_into_offsets += 1;
2166        }
2167        let base_rows_advanced = idx_into_offsets - self.cur_index_into_offsets;
2168        self.cur_index_into_offsets = idx_into_offsets;
2169        self.base.advance(base_rows_advanced as u64);
2170    }
2171
2172    fn current_priority(&self) -> u64 {
2173        self.base.current_priority()
2174    }
2175
2176    fn box_clone(&self) -> Box<dyn PriorityRange> {
2177        Box::new(Self {
2178            base: self.base.box_clone(),
2179            offsets: self.offsets.clone(),
2180            cur_index_into_offsets: self.cur_index_into_offsets,
2181            cur_position: self.cur_position,
2182        })
2183    }
2184}
2185
2186/// Contains the context for a scheduler
2187pub struct SchedulerContext {
2188    recv: Option<mpsc::UnboundedReceiver<DecoderMessage>>,
2189    io: Arc<dyn EncodingsIo>,
2190    cache: Arc<LanceCache>,
2191    name: String,
2192    path: Vec<u32>,
2193    path_names: Vec<String>,
2194}
2195
2196pub struct ScopedSchedulerContext<'a> {
2197    pub context: &'a mut SchedulerContext,
2198}
2199
2200impl<'a> ScopedSchedulerContext<'a> {
2201    pub fn pop(self) -> &'a mut SchedulerContext {
2202        self.context.pop();
2203        self.context
2204    }
2205}
2206
2207impl SchedulerContext {
2208    pub fn new(io: Arc<dyn EncodingsIo>, cache: Arc<LanceCache>) -> Self {
2209        Self {
2210            io,
2211            cache,
2212            recv: None,
2213            name: "".to_string(),
2214            path: Vec::new(),
2215            path_names: Vec::new(),
2216        }
2217    }
2218
2219    pub fn io(&self) -> &Arc<dyn EncodingsIo> {
2220        &self.io
2221    }
2222
2223    pub fn cache(&self) -> &Arc<LanceCache> {
2224        &self.cache
2225    }
2226
2227    pub fn push(&mut self, name: &str, index: u32) -> ScopedSchedulerContext {
2228        self.path.push(index);
2229        self.path_names.push(name.to_string());
2230        ScopedSchedulerContext { context: self }
2231    }
2232
2233    pub fn pop(&mut self) {
2234        self.path.pop();
2235        self.path_names.pop();
2236    }
2237
2238    pub fn path_name(&self) -> String {
2239        let path = self.path_names.join("/");
2240        if self.recv.is_some() {
2241            format!("TEMP({}){}", self.name, path)
2242        } else {
2243            format!("ROOT{}", path)
2244        }
2245    }
2246
2247    pub fn current_path(&self) -> VecDeque<u32> {
2248        VecDeque::from_iter(self.path.iter().copied())
2249    }
2250
2251    #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
2252    pub fn locate_decoder(
2253        &mut self,
2254        decoder: Box<dyn crate::v2::decoder::LogicalPageDecoder>,
2255    ) -> crate::v2::decoder::DecoderReady {
2256        trace!(
2257            "Scheduling decoder of type {:?} for {:?}",
2258            decoder.data_type(),
2259            self.path,
2260        );
2261        crate::v2::decoder::DecoderReady {
2262            decoder,
2263            path: self.current_path(),
2264        }
2265    }
2266}
2267
2268pub struct UnloadedPage(pub BoxFuture<'static, Result<LoadedPage>>);
2269
2270impl std::fmt::Debug for UnloadedPage {
2271    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2272        f.debug_struct("UnloadedPage").finish()
2273    }
2274}
2275
2276#[derive(Debug)]
2277pub struct ScheduledScanLine {
2278    pub rows_scheduled: u64,
2279    pub decoders: Vec<MessageType>,
2280}
2281
2282pub trait StructuralSchedulingJob: std::fmt::Debug {
2283    fn schedule_next(
2284        &mut self,
2285        context: &mut SchedulerContext,
2286    ) -> Result<Option<ScheduledScanLine>>;
2287}
2288
2289/// A filter expression to apply to the data
2290///
2291/// The core decoders do not currently take advantage of filtering in
2292/// any way.  In order to maintain the abstraction we represent filters
2293/// as an arbitrary byte sequence.
2294///
2295/// We recommend that encodings use Substrait for filters.
2296pub struct FilterExpression(pub Bytes);
2297
2298impl FilterExpression {
2299    /// Create a filter expression that does not filter any data
2300    ///
2301    /// This is currently represented by an empty byte array.  Encoders
2302    /// that are "filter aware" should make sure they handle this case.
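    ///
    /// For example:
    ///
    /// ```ignore
    /// let filter = FilterExpression::no_filter();
    /// assert!(filter.is_noop());
    /// ```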
2303    pub fn no_filter() -> Self {
2304        Self(Bytes::new())
2305    }
2306
2307    /// Returns true if the filter is the same as the [`Self::no_filter`] filter
2308    pub fn is_noop(&self) -> bool {
2309        self.0.is_empty()
2310    }
2311}
2312
2313pub trait StructuralFieldScheduler: Send + std::fmt::Debug {
2314    fn initialize<'a>(
2315        &'a mut self,
2316        filter: &'a FilterExpression,
2317        context: &'a SchedulerContext,
2318    ) -> BoxFuture<'a, Result<()>>;
2319    fn schedule_ranges<'a>(
2320        &'a self,
2321        ranges: &[Range<u64>],
2322        filter: &FilterExpression,
2323    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>>;
2324}
2325
2326/// A trait for tasks that decode data into an Arrow array
2327pub trait DecodeArrayTask: Send {
2328    /// Decodes the data into an Arrow array
2329    fn decode(self: Box<Self>) -> Result<ArrayRef>;
2330}
2331
2332impl DecodeArrayTask for Box<dyn StructuralDecodeArrayTask> {
2333    fn decode(self: Box<Self>) -> Result<ArrayRef> {
2334        StructuralDecodeArrayTask::decode(*self).map(|decoded_array| decoded_array.array)
2335    }
2336}
2337
2338/// A task to decode data into an Arrow record batch
2339///
2340/// It has a child `task` which decodes a struct array with no nulls.
2341/// This is then converted into a record batch.
2342pub struct NextDecodeTask {
2343    /// The decode task itself
2344    pub task: Box<dyn DecodeArrayTask>,
2345    /// The number of rows that will be created
2346    pub num_rows: u64,
2347}
2348
2349impl NextDecodeTask {
2350    // Run the task and produce a record batch
2351    //
2352    // If the batch is very large this function will log a message (at most once
2353    // per stream) suggesting the user try a smaller batch size.
2354    #[instrument(name = "task_to_batch", level = "debug", skip_all)]
2355    fn into_batch(self, emitted_batch_size_warning: Arc<Once>) -> Result<RecordBatch> {
2356        let struct_arr = self.task.decode();
2357        match struct_arr {
2358            Ok(struct_arr) => {
2359                let batch = RecordBatch::from(struct_arr.as_struct());
2360                let size_bytes = batch.get_array_memory_size() as u64;
2361                if size_bytes > BATCH_SIZE_BYTES_WARNING {
2362                    emitted_batch_size_warning.call_once(|| {
2363                        let size_mb = size_bytes / 1024 / 1024;
2364                        debug!("Lance read in a single batch that contained more than {}MiB of data.  You may want to consider reducing the batch size.", size_mb);
2365                    });
2366                }
2367                Ok(batch)
2368            }
2369            Err(e) => {
2370                let e = Error::Internal {
2371                    message: format!("Error decoding batch: {}", e),
2372                    location: location!(),
2373                };
2374                Err(e)
2375            }
2376        }
2377    }
2378}
2379
2380// An envelope to wrap both 2.0 style messages and 2.1 style messages so we can
2381// share some code paths between the two.  Decoders can safely unwrap into whatever
2382// style they expect since a file will be either all-2.0 or all-2.1
2383#[derive(Debug)]
2384pub enum MessageType {
2385    // The older v2.0 scheduler/decoder used a scheme where the message was the
2386    // decoder itself.  The messages were not sent in priority order and the decoder
2387    // had to wait for I/O, figuring out the correct priority.  This was a lot of
2388    // complexity.
2389    DecoderReady(crate::v2::decoder::DecoderReady),
2390    // Starting in 2.1 we use a simpler scheme where the scheduling happens in priority
2391    // order and the message is an unloaded decoder.  These can be awaited, in order, and
2392    // the decoder does not have to worry about waiting for I/O.
2393    UnloadedPage(UnloadedPage),
2394}
2395
2396impl MessageType {
2397    pub fn into_legacy(self) -> crate::v2::decoder::DecoderReady {
2398        match self {
2399            Self::DecoderReady(decoder) => decoder,
2400            Self::UnloadedPage(_) => {
2401                panic!("Expected DecoderReady but got UnloadedPage")
2402            }
2403        }
2404    }
2405
2406    pub fn into_structural(self) -> UnloadedPage {
2407        match self {
2408            Self::UnloadedPage(unloaded) => unloaded,
2409            Self::DecoderReady(_) => {
2410                panic!("Expected UnloadedPage but got DecoderReady")
2411            }
2412        }
2413    }
2414}
2415
2416pub struct DecoderMessage {
2417    pub scheduled_so_far: u64,
2418    pub decoders: Vec<MessageType>,
2419}
2420
2421pub struct DecoderContext {
2422    source: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
2423}
2424
2425impl DecoderContext {
2426    pub fn new(source: mpsc::UnboundedReceiver<Result<DecoderMessage>>) -> Self {
2427        Self { source }
2428    }
2429}
2430
2431pub struct DecodedPage {
2432    pub data: DataBlock,
2433    pub repdef: RepDefUnraveler,
2434}
2435
2436pub trait DecodePageTask: Send + std::fmt::Debug {
2437    /// Decodes the data into an Arrow array
2438    fn decode(self: Box<Self>) -> Result<DecodedPage>;
2439}
2440
2441pub trait StructuralPageDecoder: std::fmt::Debug + Send {
2442    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>>;
2443    fn num_rows(&self) -> u64;
2444}
2445
2446#[derive(Debug)]
2447pub struct LoadedPage {
2448    // The decoder that is ready to be decoded
2449    pub decoder: Box<dyn StructuralPageDecoder>,
2450    // The path to the decoder, the first value is the column index
2451    // following values, if present, are nested child indices
2452    //
2453    // For example, a path of [1, 1, 0] would mean to grab the second
2454    // column, then the second child, and then the first child.
2455    //
2456    // It could represent x in the following schema:
2457    //
2458    // score: float64
2459    // points: struct
2460    //   color: string
2461    //   location: struct
2462    //     x: float64
2463    //
2464    // Currently, only struct decoders have "children" although other
2465    // decoders may gain children at some point as well.  List children are only
2466    // handled through indirect I/O at the moment and so they don't
2467    // need to be represented (yet)
2468    pub path: VecDeque<u32>,
2469    pub page_index: usize,
2470}
2471
2472pub struct DecodedArray {
2473    pub array: ArrayRef,
2474    pub repdef: CompositeRepDefUnraveler,
2475}
2476
2477pub trait StructuralDecodeArrayTask: std::fmt::Debug + Send {
2478    fn decode(self: Box<Self>) -> Result<DecodedArray>;
2479}
2480
2481pub trait StructuralFieldDecoder: std::fmt::Debug + Send {
2482    /// Add a newly scheduled child decoder
2483    ///
2484    /// The default implementation does not expect children and returns
2485    /// an error.
2486    fn accept_page(&mut self, _child: LoadedPage) -> Result<()>;
2487    /// Creates a task to decode `num_rows` of data into an array
2488    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>>;
2489    /// The data type of the decoded data
2490    fn data_type(&self) -> &DataType;
2491}
2492
2493#[derive(Debug, Default)]
2494pub struct DecoderPlugins {}
2495
2496/// Decodes a batch of data from an in-memory structure created by [`crate::encoder::encode_batch`]
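///
/// A hypothetical round-trip sketch (assumes `encoded` was produced by
/// [`crate::encoder::encode_batch`]):
///
/// ```ignore
/// let batch = decode_batch(
///     &encoded,
///     &FilterExpression::no_filter(),
///     Arc::new(DecoderPlugins::default()),
///     /*should_validate=*/ false,
///     LanceFileVersion::V2_1,
///     None,
/// ).await?;
/// ```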
2497pub async fn decode_batch(
2498    batch: &EncodedBatch,
2499    filter: &FilterExpression,
2500    decoder_plugins: Arc<DecoderPlugins>,
2501    should_validate: bool,
2502    version: LanceFileVersion,
2503    cache: Option<Arc<LanceCache>>,
2504) -> Result<RecordBatch> {
2505    // The io is synchronous so it shouldn't be possible for any async stuff to still be in progress
2506    // Still, if we just use now_or_never we hit misfires because some futures (channels) need to be
2507    // polled twice.
2508
2509    let io_scheduler = Arc::new(BufferScheduler::new(batch.data.clone())) as Arc<dyn EncodingsIo>;
2510    let cache = cache.unwrap_or_else(|| Arc::new(LanceCache::with_capacity(128 * 1024 * 1024)));
2511    let mut decode_scheduler = DecodeBatchScheduler::try_new(
2512        batch.schema.as_ref(),
2513        &batch.top_level_columns,
2514        &batch.page_table,
2515        &vec![],
2516        batch.num_rows,
2517        decoder_plugins,
2518        io_scheduler.clone(),
2519        cache,
2520        filter,
2521    )
2522    .await?;
2523    let (tx, rx) = unbounded_channel();
2524    decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler);
2525    let is_structural = version >= LanceFileVersion::V2_1;
2526    let mut decode_stream = create_decode_stream(
2527        &batch.schema,
2528        batch.num_rows,
2529        batch.num_rows as u32,
2530        is_structural,
2531        should_validate,
2532        rx,
2533    );
2534    decode_stream.next().await.unwrap().task.await
2535}
2536
2537#[cfg(test)]
2538// test coalesce indices to ranges
2539mod tests {
2540    use super::*;
2541
2542    #[test]
2543    fn test_coalesce_indices_to_ranges_with_single_index() {
2544        let indices = vec![1];
2545        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
2546        assert_eq!(ranges, vec![1..2]);
2547    }
2548
2549    #[test]
2550    fn test_coalesce_indices_to_ranges() {
2551        let indices = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
2552        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
2553        assert_eq!(ranges, vec![1..10]);
2554    }
2555
2556    #[test]
2557    fn test_coalesce_indices_to_ranges_with_gaps() {
2558        let indices = vec![1, 2, 3, 5, 6, 7, 9];
2559        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
2560        assert_eq!(ranges, vec![1..4, 5..8, 9..10]);
2561    }
2562}