Skip to main content

lance_encoding/
decoder.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Utilities and traits for scheduling & decoding data
5//!
6//! Reading data involves two steps: scheduling and decoding.  The
7//! scheduling step is responsible for figuring out what data is needed
8//! and issuing the appropriate I/O requests.  The decoding step is
9//! responsible for taking the loaded data and turning it into Arrow
10//! arrays.
11//!
12//! # Scheduling
13//!
14//! Scheduling is split into `FieldScheduler` and `PageScheduler`.
15//! There is one field scheduler for each output field, which may map to many
16//! columns of actual data.  A field scheduler is responsible for figuring out
17//! the order in which pages should be scheduled.  Field schedulers then delegate
18//! to page schedulers to figure out the I/O requests that need to be made for
19//! the page.
20//!
21//! Page schedulers also create the decoders that will be used to decode the
22//! scheduled data.
23//!
24//! # Decoding
25//!
26//! Decoders are split into `PhysicalPageDecoder` and
27//! [`LogicalPageDecoder`].  Note that both physical and logical decoding
28//! happens on a per-page basis.  There is no concept of a "field decoder" or
29//! "column decoder".
30//!
31//! The physical decoders handle lower level encodings.  They have a few advantages:
32//!
33//!  * They do not need to decode into an Arrow array and so they don't need
34//!    to be enveloped into the Arrow filesystem (e.g. Arrow doesn't have a
35//!    bit-packed type.  We can use variable-length binary but that is kind
36//!    of awkward)
37//!  * They can decode into an existing allocation.  This can allow for "page
38//!    bridging".  If we are trying to decode into a batch of 1024 rows and
39//!    the rows 0..1024 are spread across two pages then we can avoid a memory
40//!    copy by allocating once and decoding each page into the outer allocation.
41//!    (note: page bridging is not actually implemented yet)
42//!
43//! However, there are some limitations for physical decoders:
44//!
45//!  * They are constrained to a single column
46//!  * The API is more complex
47//!
48//! The logical decoders are designed to map one or more columns of Lance
49//! data into an Arrow array.
50//!
51//! Typically, a "logical encoding" will have both a logical decoder and a field scheduler.
52//! Meanwhile, a "physical encoding" will have a physical decoder but no corresponding field
53//! scheduler.
54//!
55//!
56//! # General notes
57//!
58//! Encodings are typically nested into each other to form a tree.  The top of the tree is
59//! the user requested schema.  Each field in that schema is assigned to one top-level logical
60//! encoding.  That encoding can then contain other logical encodings or physical encodings.
61//! Physical encodings can also contain other physical encodings.
62//!
63//! So, for example, a single field in the Arrow schema might have the type `List<UInt32>`
64//!
65//! The encoding tree could then be:
66//!
67//! root: List (logical encoding)
68//!  - indices: Primitive (logical encoding)
69//!    - column: Basic (physical encoding)
70//!      - validity: Bitmap (physical encoding)
71//!      - values: RLE (physical encoding)
72//!        - runs: Value (physical encoding)
73//!        - values: Value (physical encoding)
74//!  - items: Primitive (logical encoding)
75//!    - column: Basic (physical encoding)
76//!      - values: Value (physical encoding)
77//!
78//! Note that, in this example, root.items.column does not have a validity because there were
79//! no nulls in the page.
80//!
81//! ## Multiple buffers or multiple columns?
82//!
83//! Note that there are many different ways we can write encodings.  For example, we might
84//! store primitive fields in a single column with two buffers (one for validity and one for
85//! values)
86//!
87//! On the other hand, we could also store a primitive field as two different columns.  One
88//! that yields a non-nullable boolean array and one that yields a non-nullable array of items.
89//! Then we could combine these two arrays into a single array where the boolean array is the
90//! bitmap.  There are a few subtle differences between the approaches:
91//!
92//! * Storing things as multiple buffers within the same column is generally more efficient and
93//!   easier to schedule.  For example, in-batch coalescing is very easy but can only be done
94//!   on data that is in the same page.
95//! * When things are stored in multiple columns you have to worry about their pages not being
96//!   in sync.  In our previous validity / values example this means we might have to do some
97//!   memory copies to get the validity array and values arrays to be the same length as
98//!   decode.
99//! * When things are stored in a single column, projection is impossible.  For example, if we
100//!   tried to store all the struct fields in a single column with lots of buffers then we wouldn't
101//!   be able to read back individual fields of the struct.
102//!
103//! The fixed size list decoding is an interesting example because it is actually both a physical
104//! encoding and a logical encoding.  A fixed size list of a physical encoding is, itself, a physical
105//! encoding (e.g. a fixed size list of doubles).  However, a fixed size list of a logical encoding
106//! is a logical encoding (e.g. a fixed size list of structs).
107//!
108//! # The scheduling loop
109//!
110//! Reading a Lance file involves both scheduling and decoding.  Its generally expected that these
111//! will run as two separate threads.
112//!
113//! ```text
114//!
115//!                                    I/O PARALLELISM
116//!                       Issues
117//!                       Requests   ┌─────────────────┐
118//!                                  │                 │        Wait for
119//!                       ┌──────────►   I/O Service   ├─────►  Enough I/O ◄─┐
120//!                       │          │                 │        For batch    │
121//!                       │          └─────────────────┘             │3      │
122//!                       │                                          │       │
123//!                       │                                          │       │2
124//! ┌─────────────────────┴─┐                              ┌─────────▼───────┴┐
125//! │                       │                              │                  │Poll
126//! │       Batch Decode    │ Decode tasks sent via channel│   Batch Decode   │1
127//! │       Scheduler       ├─────────────────────────────►│   Stream         ◄─────
128//! │                       │                              │                  │
129//! └─────▲─────────────┬───┘                              └─────────┬────────┘
130//!       │             │                                            │4
131//!       │             │                                            │
132//!       └─────────────┘                                   ┌────────┴────────┐
133//!  Caller of schedule_range                Buffer polling │                 │
134//!  will be scheduler thread                to achieve CPU │ Decode Batch    ├────►
135//!  and schedule one decode                 parallelism    │ Task            │
136//!  task (and all needed I/O)               (thread per    │                 │
137//!  per logical page                         batch)        └─────────────────┘
138//! ```
139//!
140//! The scheduling thread will work through the file from the
141//! start to the end as quickly as possible.  Data is scheduled one page at a time in a row-major
142//! fashion.  For example, imagine we have a file with the following page structure:
143//!
144//! ```text
145//! Score (Float32)     | C0P0 |
146//! Id (16-byte UUID)   | C1P0 | C1P1 | C1P2 | C1P3 |
147//! Vector (4096 bytes) | C2P0 | C2P1 | C2P2 | C2P3 | .. | C2P1024 |
148//! ```
149//!
150//! This would be quite common as each of these pages has the same number of bytes.  Let's pretend
151//! each page is 1MiB and so there are 256Ki rows of data.  Each page of `Score` has 256Ki rows.
152//! Each page of `Id` has 64Ki rows.  Each page of `Vector` has 256 rows.  The scheduler would then
153//! schedule in the following order:
154//!
155//! C0 P0
156//! C1 P0
157//! C2 P0
158//! C2 P1
159//! ... (254 pages omitted)
160//! C2 P255
161//! C1 P1
162//! C2 P256
163//! ... (254 pages omitted)
164//! C2 P511
165//! C1 P2
166//! C2 P512
167//! ... (254 pages omitted)
168//! C2 P767
169//! C1 P3
170//! C2 P768
171//! ... (254 pages omitted)
172//! C2 P1024
173//!
174//! This is the ideal scheduling order because it means we can decode complete rows as quickly as possible.
175//! Note that the scheduler thread does not need to wait for I/O to happen at any point.  As soon as it starts
176//! it will start scheduling one page of I/O after another until it has scheduled the entire file's worth of
177//! I/O.  This is slightly different than other file readers which have "row group parallelism" and will
178//! typically only schedule X row groups worth of reads at a time.
179//!
180//! In the near future there will be a backpressure mechanism and so it may need to stop/pause if the compute
181//! falls behind.
182//!
183//! ## Indirect I/O
184//!
185//! Regrettably, there are times where we cannot know exactly what data we need until we have partially decoded
186//! the file.  This happens when we have variable sized list data.  In that case the scheduling task for that
187//! page will only schedule the first part of the read (loading the list offsets).  It will then immediately
188//! spawn a new tokio task to wait for that I/O and decode the list offsets.  That follow-up task is not part
189//! of the scheduling loop or the decode loop.  It is a free task.  Once the list offsets are decoded we submit
190//! a follow-up I/O task.  This task is scheduled at a high priority because the decoder is going to need it soon.
191//!
192//! # The decode loop
193//!
194//! As soon as the scheduler starts we can start decoding.  Each time we schedule a page we
195//! push a decoder for that page's data into a channel.  The decode loop
196//! ([`BatchDecodeStream`]) reads from that channel.  Each time it receives a decoder it
197//! waits until the decoder has all of its data.  Then it grabs the next decoder.  Once it has
198//! enough loaded decoders to complete a batch worth of rows it will spawn a "decode batch task".
199//!
200//! These batch decode tasks perform the actual CPU work of decoding the loaded data into Arrow
201//! arrays.  This may involve signifciant CPU processing like decompression or arithmetic in order
202//! to restore the data to its correct in-memory representation.
203//!
204//! ## Batch size
205//!
206//! The `BatchDecodeStream` is configured with a batch size.  This does not need to have any
207//! relation to the page size(s) used to write the data.  This keeps our compute work completely
208//! independent of our I/O work.  We suggest using small batch sizes:
209//!
210//!  * Batches should fit in CPU cache (at least L3)
211//!  * More batches means more opportunity for parallelism
212//!  * The "batch overhead" is very small in Lance compared to other formats because it has no
213//!    relation to the way the data is stored.
214
215use std::collections::VecDeque;
216use std::sync::{LazyLock, Once, OnceLock};
217use std::{ops::Range, sync::Arc};
218
219use arrow_array::cast::AsArray;
220use arrow_array::{ArrayRef, RecordBatch, RecordBatchIterator, RecordBatchReader};
221use arrow_schema::{ArrowError, DataType, Field as ArrowField, Fields, Schema as ArrowSchema};
222use bytes::Bytes;
223use futures::future::{BoxFuture, MaybeDone, maybe_done};
224use futures::stream::{self, BoxStream};
225use futures::{FutureExt, StreamExt};
226use lance_arrow::DataTypeExt;
227use lance_core::cache::LanceCache;
228use lance_core::datatypes::{BLOB_DESC_LANCE_FIELD, Field, Schema};
229use lance_core::utils::futures::FinallyStreamExt;
230use lance_core::utils::parse::parse_env_as_bool;
231use log::{debug, trace, warn};
232use tokio::sync::mpsc::error::SendError;
233use tokio::sync::mpsc::{self, unbounded_channel};
234
235use lance_core::error::LanceOptionExt;
236use lance_core::{ArrowResult, Error, Result};
237use tracing::instrument;
238
239use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy};
240use crate::data::DataBlock;
241use crate::encoder::EncodedBatch;
242use crate::encodings::logical::fixed_size_list::StructuralFixedSizeListScheduler;
243use crate::encodings::logical::list::StructuralListScheduler;
244use crate::encodings::logical::map::StructuralMapScheduler;
245use crate::encodings::logical::primitive::StructuralPrimitiveFieldScheduler;
246use crate::encodings::logical::r#struct::{StructuralStructDecoder, StructuralStructScheduler};
247use crate::format::pb::{self, column_encoding};
248use crate::format::pb21;
249use crate::previous::decoder::LogicalPageDecoder;
250use crate::previous::encodings::logical::list::OffsetPageInfo;
251use crate::previous::encodings::logical::r#struct::{SimpleStructDecoder, SimpleStructScheduler};
252use crate::previous::encodings::logical::{
253    binary::BinaryFieldScheduler, blob::BlobFieldScheduler, list::ListFieldScheduler,
254    primitive::PrimitiveFieldScheduler,
255};
256use crate::repdef::{CompositeRepDefUnraveler, RepDefUnraveler};
257use crate::version::LanceFileVersion;
258use crate::{BufferScheduler, EncodingsIo};
259
260// If users are getting batches over 10MiB large then it's time to reduce the batch size
261const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024;
262const ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE: &str =
263    "LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE";
264const ENV_LANCE_READ_CACHE_REPETITION_INDEX: &str = "LANCE_READ_CACHE_REPETITION_INDEX";
265
266fn default_cache_repetition_index() -> bool {
267    static DEFAULT_CACHE_REPETITION_INDEX: OnceLock<bool> = OnceLock::new();
268    *DEFAULT_CACHE_REPETITION_INDEX
269        .get_or_init(|| parse_env_as_bool(ENV_LANCE_READ_CACHE_REPETITION_INDEX, true))
270}
271
272/// Top-level encoding message for a page.  Wraps both the
273/// legacy pb::ArrayEncoding and the newer pb::PageLayout
274///
275/// A file should only use one or the other and never both.
276/// 2.0 decoders can always assume this is pb::ArrayEncoding
277/// and 2.1+ decoders can always assume this is pb::PageLayout
278#[derive(Debug)]
279pub enum PageEncoding {
280    Legacy(pb::ArrayEncoding),
281    Structural(pb21::PageLayout),
282}
283
284impl PageEncoding {
285    pub fn as_legacy(&self) -> &pb::ArrayEncoding {
286        match self {
287            Self::Legacy(enc) => enc,
288            Self::Structural(_) => panic!("Expected a legacy encoding"),
289        }
290    }
291
292    pub fn as_structural(&self) -> &pb21::PageLayout {
293        match self {
294            Self::Structural(enc) => enc,
295            Self::Legacy(_) => panic!("Expected a structural encoding"),
296        }
297    }
298
299    pub fn is_structural(&self) -> bool {
300        matches!(self, Self::Structural(_))
301    }
302}
303
304/// Metadata describing a page in a file
305///
306/// This is typically created by reading the metadata section of a Lance file
307#[derive(Debug)]
308pub struct PageInfo {
309    /// The number of rows in the page
310    pub num_rows: u64,
311    /// The priority (top level row number) of the page
312    ///
313    /// This is only set in 2.1 files and will be 0 for 2.0 files
314    pub priority: u64,
315    /// The encoding that explains the buffers in the page
316    pub encoding: PageEncoding,
317    /// The offsets and sizes of the buffers in the file
318    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
319}
320
321/// Metadata describing a column in a file
322///
323/// This is typically created by reading the metadata section of a Lance file
324#[derive(Debug, Clone)]
325pub struct ColumnInfo {
326    /// The index of the column in the file
327    pub index: u32,
328    /// The metadata for each page in the column
329    pub page_infos: Arc<[PageInfo]>,
330    /// File positions and their sizes of the column-level buffers
331    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
332    pub encoding: pb::ColumnEncoding,
333}
334
335impl ColumnInfo {
336    /// Create a new instance
337    pub fn new(
338        index: u32,
339        page_infos: Arc<[PageInfo]>,
340        buffer_offsets_and_sizes: Vec<(u64, u64)>,
341        encoding: pb::ColumnEncoding,
342    ) -> Self {
343        Self {
344            index,
345            page_infos,
346            buffer_offsets_and_sizes: buffer_offsets_and_sizes.into_boxed_slice().into(),
347            encoding,
348        }
349    }
350
351    pub fn is_structural(&self) -> bool {
352        self.page_infos
353            // Can just look at the first since all should be the same
354            .first()
355            .map(|page| page.encoding.is_structural())
356            .unwrap_or(false)
357    }
358}
359
360enum RootScheduler {
361    Structural(Box<dyn StructuralFieldScheduler>),
362    Legacy(Arc<dyn crate::previous::decoder::FieldScheduler>),
363}
364
365impl RootScheduler {
366    fn as_legacy(&self) -> &Arc<dyn crate::previous::decoder::FieldScheduler> {
367        match self {
368            Self::Structural(_) => panic!("Expected a legacy scheduler"),
369            Self::Legacy(s) => s,
370        }
371    }
372
373    fn as_structural(&self) -> &dyn StructuralFieldScheduler {
374        match self {
375            Self::Structural(s) => s.as_ref(),
376            Self::Legacy(_) => panic!("Expected a structural scheduler"),
377        }
378    }
379}
380
381/// The scheduler for decoding batches
382///
383/// Lance decoding is done in two steps, scheduling, and decoding.  The
384/// scheduling tends to be lightweight and should quickly figure what data
385/// is needed from the disk issue the appropriate I/O requests.  A decode task is
386/// created to eventually decode the data (once it is loaded) and scheduling
387/// moves on to scheduling the next page.
388///
389/// Meanwhile, it's expected that a decode stream will be setup to run at the
390/// same time.  Decode tasks take the data that is loaded and turn it into
391/// Arrow arrays.
392///
393/// This approach allows us to keep our I/O parallelism and CPU parallelism
394/// completely separate since those are often two very different values.
395///
396/// Backpressure should be achieved via the I/O service.  Requests that are
397/// issued will pile up if the decode stream is not polling quickly enough.
398/// The [`crate::EncodingsIo::submit_request`] function should return a pending
399/// future once there are too many I/O requests in flight.
400///
401/// TODO: Implement backpressure
402pub struct DecodeBatchScheduler {
403    root_scheduler: RootScheduler,
404    pub root_fields: Fields,
405    cache: Arc<LanceCache>,
406}
407
408pub struct ColumnInfoIter<'a> {
409    column_infos: Vec<Arc<ColumnInfo>>,
410    column_indices: &'a [u32],
411    column_info_pos: usize,
412    column_indices_pos: usize,
413}
414
415impl<'a> ColumnInfoIter<'a> {
416    pub fn new(column_infos: Vec<Arc<ColumnInfo>>, column_indices: &'a [u32]) -> Self {
417        let initial_pos = column_indices.first().copied().unwrap_or(0) as usize;
418        Self {
419            column_infos,
420            column_indices,
421            column_info_pos: initial_pos,
422            column_indices_pos: 0,
423        }
424    }
425
426    pub fn peek(&self) -> &Arc<ColumnInfo> {
427        &self.column_infos[self.column_info_pos]
428    }
429
430    pub fn peek_transform(&mut self, transform: impl FnOnce(Arc<ColumnInfo>) -> Arc<ColumnInfo>) {
431        let column_info = self.column_infos[self.column_info_pos].clone();
432        let transformed = transform(column_info);
433        self.column_infos[self.column_info_pos] = transformed;
434    }
435
436    pub fn expect_next(&mut self) -> Result<&Arc<ColumnInfo>> {
437        self.next().ok_or_else(|| {
438            Error::invalid_input(
439                "there were more fields in the schema than provided column indices / infos",
440            )
441        })
442    }
443
444    fn next(&mut self) -> Option<&Arc<ColumnInfo>> {
445        if self.column_info_pos < self.column_infos.len() {
446            let info = &self.column_infos[self.column_info_pos];
447            self.column_info_pos += 1;
448            Some(info)
449        } else {
450            None
451        }
452    }
453
454    pub(crate) fn next_top_level(&mut self) {
455        self.column_indices_pos += 1;
456        if self.column_indices_pos < self.column_indices.len() {
457            self.column_info_pos = self.column_indices[self.column_indices_pos] as usize;
458        } else {
459            self.column_info_pos = self.column_infos.len();
460        }
461    }
462}
463
464/// These contain the file buffers shared across the entire file
465#[derive(Clone, Copy, Debug)]
466pub struct FileBuffers<'a> {
467    pub positions_and_sizes: &'a [(u64, u64)],
468}
469
470/// These contain the file buffers and also buffers specific to a column
471#[derive(Clone, Copy, Debug)]
472pub struct ColumnBuffers<'a, 'b> {
473    pub file_buffers: FileBuffers<'a>,
474    pub positions_and_sizes: &'b [(u64, u64)],
475}
476
477/// These contain the file & column buffers and also buffers specific to a page
478#[derive(Clone, Copy, Debug)]
479pub struct PageBuffers<'a, 'b, 'c> {
480    pub column_buffers: ColumnBuffers<'a, 'b>,
481    pub positions_and_sizes: &'c [(u64, u64)],
482}
483
484/// The core decoder strategy handles all the various Arrow types
485#[derive(Debug)]
486pub struct CoreFieldDecoderStrategy {
487    pub validate_data: bool,
488    pub decompressor_strategy: Arc<dyn DecompressionStrategy>,
489    pub cache_repetition_index: bool,
490}
491
492impl Default for CoreFieldDecoderStrategy {
493    fn default() -> Self {
494        Self {
495            validate_data: false,
496            decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
497            cache_repetition_index: false,
498        }
499    }
500}
501
502impl CoreFieldDecoderStrategy {
503    /// Create a new strategy with cache_repetition_index enabled
504    pub fn with_cache_repetition_index(mut self, cache_repetition_index: bool) -> Self {
505        self.cache_repetition_index = cache_repetition_index;
506        self
507    }
508
509    /// Create a new strategy from decoder config
510    pub fn from_decoder_config(config: &DecoderConfig) -> Self {
511        Self {
512            validate_data: config.validate_on_decode,
513            decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
514            cache_repetition_index: config.cache_repetition_index,
515        }
516    }
517
518    /// This is just a sanity check to ensure there is no "wrapped encodings"
519    /// that haven't been handled.
520    fn ensure_values_encoded(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
521        let column_encoding = column_info
522            .encoding
523            .column_encoding
524            .as_ref()
525            .ok_or_else(|| {
526                Error::invalid_input(format!(
527                    "the column at index {} was missing a ColumnEncoding",
528                    column_info.index
529                ))
530            })?;
531        if matches!(
532            column_encoding,
533            pb::column_encoding::ColumnEncoding::Values(_)
534        ) {
535            Ok(())
536        } else {
537            Err(Error::invalid_input(format!(
538                "the column at index {} mapping to the input field {} has column encoding {:?} and no decoder is registered to handle it",
539                column_info.index, field_name, column_encoding
540            )))
541        }
542    }
543
544    fn is_structural_primitive(data_type: &DataType) -> bool {
545        if data_type.is_primitive() {
546            true
547        } else {
548            match data_type {
549                // DataType::is_primitive doesn't consider these primitive but we do
550                DataType::Dictionary(_, value_type) => Self::is_structural_primitive(value_type),
551                DataType::Boolean
552                | DataType::Null
553                | DataType::FixedSizeBinary(_)
554                | DataType::Binary
555                | DataType::LargeBinary
556                | DataType::Utf8
557                | DataType::LargeUtf8 => true,
558                DataType::FixedSizeList(inner, _) => {
559                    Self::is_structural_primitive(inner.data_type())
560                }
561                _ => false,
562            }
563        }
564    }
565
566    fn is_primitive_legacy(data_type: &DataType) -> bool {
567        if data_type.is_primitive() {
568            true
569        } else {
570            match data_type {
571                // DataType::is_primitive doesn't consider these primitive but we do
572                DataType::Boolean | DataType::Null | DataType::FixedSizeBinary(_) => true,
573                DataType::FixedSizeList(inner, _) => Self::is_primitive_legacy(inner.data_type()),
574                _ => false,
575            }
576        }
577    }
578
579    fn create_primitive_scheduler(
580        &self,
581        field: &Field,
582        column: &ColumnInfo,
583        buffers: FileBuffers,
584    ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
585        Self::ensure_values_encoded(column, &field.name)?;
586        // Primitive fields map to a single column
587        let column_buffers = ColumnBuffers {
588            file_buffers: buffers,
589            positions_and_sizes: &column.buffer_offsets_and_sizes,
590        };
591        Ok(Box::new(PrimitiveFieldScheduler::new(
592            column.index,
593            field.data_type(),
594            column.page_infos.clone(),
595            column_buffers,
596            self.validate_data,
597        )))
598    }
599
600    /// Helper method to verify the page encoding of a struct header column
601    fn check_simple_struct(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
602        Self::ensure_values_encoded(column_info, field_name)?;
603        if column_info.page_infos.len() != 1 {
604            return Err(Error::invalid_input_source(format!("Due to schema we expected a struct column but we received a column with {} pages and right now we only support struct columns with 1 page", column_info.page_infos.len()).into()));
605        }
606        let encoding = &column_info.page_infos[0].encoding;
607        match encoding.as_legacy().array_encoding.as_ref().unwrap() {
608            pb::array_encoding::ArrayEncoding::Struct(_) => Ok(()),
609            _ => Err(Error::invalid_input_source(format!("Expected a struct encoding because we have a struct field in the schema but got the encoding {:?}", encoding).into())),
610        }
611    }
612
613    fn check_packed_struct(column_info: &ColumnInfo) -> bool {
614        let encoding = &column_info.page_infos[0].encoding;
615        matches!(
616            encoding.as_legacy().array_encoding.as_ref().unwrap(),
617            pb::array_encoding::ArrayEncoding::PackedStruct(_)
618        )
619    }
620
621    fn create_list_scheduler(
622        &self,
623        list_field: &Field,
624        column_infos: &mut ColumnInfoIter,
625        buffers: FileBuffers,
626        offsets_column: &ColumnInfo,
627    ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
628        Self::ensure_values_encoded(offsets_column, &list_field.name)?;
629        let offsets_column_buffers = ColumnBuffers {
630            file_buffers: buffers,
631            positions_and_sizes: &offsets_column.buffer_offsets_and_sizes,
632        };
633        let items_scheduler =
634            self.create_legacy_field_scheduler(&list_field.children[0], column_infos, buffers)?;
635
636        let (inner_infos, null_offset_adjustments): (Vec<_>, Vec<_>) = offsets_column
637            .page_infos
638            .iter()
639            .filter(|offsets_page| offsets_page.num_rows > 0)
640            .map(|offsets_page| {
641                if let Some(pb::array_encoding::ArrayEncoding::List(list_encoding)) =
642                    &offsets_page.encoding.as_legacy().array_encoding
643                {
644                    let inner = PageInfo {
645                        buffer_offsets_and_sizes: offsets_page.buffer_offsets_and_sizes.clone(),
646                        encoding: PageEncoding::Legacy(
647                            list_encoding.offsets.as_ref().unwrap().as_ref().clone(),
648                        ),
649                        num_rows: offsets_page.num_rows,
650                        priority: 0,
651                    };
652                    (
653                        inner,
654                        OffsetPageInfo {
655                            offsets_in_page: offsets_page.num_rows,
656                            null_offset_adjustment: list_encoding.null_offset_adjustment,
657                            num_items_referenced_by_page: list_encoding.num_items,
658                        },
659                    )
660                } else {
661                    // TODO: Should probably return Err here
662                    panic!("Expected a list column");
663                }
664            })
665            .unzip();
666        let inner = Arc::new(PrimitiveFieldScheduler::new(
667            offsets_column.index,
668            DataType::UInt64,
669            Arc::from(inner_infos.into_boxed_slice()),
670            offsets_column_buffers,
671            self.validate_data,
672        )) as Arc<dyn crate::previous::decoder::FieldScheduler>;
673        let items_field = match list_field.data_type() {
674            DataType::List(inner) => inner,
675            DataType::LargeList(inner) => inner,
676            _ => unreachable!(),
677        };
678        let offset_type = if matches!(list_field.data_type(), DataType::List(_)) {
679            DataType::Int32
680        } else {
681            DataType::Int64
682        };
683        Ok(Box::new(ListFieldScheduler::new(
684            inner,
685            items_scheduler.into(),
686            items_field,
687            offset_type,
688            null_offset_adjustments,
689        )))
690    }
691
692    fn unwrap_blob(column_info: &ColumnInfo) -> Option<ColumnInfo> {
693        if let column_encoding::ColumnEncoding::Blob(blob) =
694            column_info.encoding.column_encoding.as_ref().unwrap()
695        {
696            let mut column_info = column_info.clone();
697            column_info.encoding = blob.inner.as_ref().unwrap().as_ref().clone();
698            Some(column_info)
699        } else {
700            None
701        }
702    }
703
704    fn create_structural_field_scheduler(
705        &self,
706        field: &Field,
707        column_infos: &mut ColumnInfoIter,
708    ) -> Result<Box<dyn StructuralFieldScheduler>> {
709        let data_type = field.data_type();
710        if Self::is_structural_primitive(&data_type) {
711            let column_info = column_infos.expect_next()?;
712            let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
713                column_info.as_ref(),
714                self.decompressor_strategy.as_ref(),
715                self.cache_repetition_index,
716                field,
717            )?);
718
719            // advance to the next top level column
720            column_infos.next_top_level();
721
722            return Ok(scheduler);
723        }
724        match &data_type {
725            DataType::Struct(fields) => {
726                if field.is_packed_struct() {
727                    // Packed struct
728                    let column_info = column_infos.expect_next()?;
729                    let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
730                        column_info.as_ref(),
731                        self.decompressor_strategy.as_ref(),
732                        self.cache_repetition_index,
733                        field,
734                    )?);
735
736                    // advance to the next top level column
737                    column_infos.next_top_level();
738
739                    return Ok(scheduler);
740                }
741                // Maybe a blob descriptions struct?
742                if field.is_blob() {
743                    let column_info = column_infos.peek();
744                    if column_info.page_infos.iter().any(|page| {
745                        matches!(
746                            page.encoding,
747                            PageEncoding::Structural(pb21::PageLayout {
748                                layout: Some(pb21::page_layout::Layout::BlobLayout(_))
749                            })
750                        )
751                    }) {
752                        let column_info = column_infos.expect_next()?;
753                        let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
754                            column_info.as_ref(),
755                            self.decompressor_strategy.as_ref(),
756                            self.cache_repetition_index,
757                            field,
758                        )?);
759                        column_infos.next_top_level();
760                        return Ok(scheduler);
761                    }
762                }
763
764                let mut child_schedulers = Vec::with_capacity(field.children.len());
765                for field in field.children.iter() {
766                    let field_scheduler =
767                        self.create_structural_field_scheduler(field, column_infos)?;
768                    child_schedulers.push(field_scheduler);
769                }
770
771                let fields = fields.clone();
772                Ok(
773                    Box::new(StructuralStructScheduler::new(child_schedulers, fields))
774                        as Box<dyn StructuralFieldScheduler>,
775                )
776            }
777            DataType::List(_) | DataType::LargeList(_) => {
778                let child = field.children.first().expect_ok()?;
779                let child_scheduler =
780                    self.create_structural_field_scheduler(child, column_infos)?;
781                Ok(Box::new(StructuralListScheduler::new(child_scheduler))
782                    as Box<dyn StructuralFieldScheduler>)
783            }
784            DataType::FixedSizeList(inner, dimension)
785                if matches!(inner.data_type(), DataType::Struct(_)) =>
786            {
787                let child = field.children.first().expect_ok()?;
788                let child_scheduler =
789                    self.create_structural_field_scheduler(child, column_infos)?;
790                Ok(Box::new(StructuralFixedSizeListScheduler::new(
791                    child_scheduler,
792                    *dimension,
793                )) as Box<dyn StructuralFieldScheduler>)
794            }
795            DataType::Map(_, keys_sorted) => {
796                // TODO: We only support keys_sorted=false for now,
797                //  because converting a rust arrow map field to the python arrow field will
798                //  lose the keys_sorted property.
799                if *keys_sorted {
800                    return Err(Error::not_supported_source(format!("Map data type is not supported with keys_sorted=true now, current value is {}", *keys_sorted).into()));
801                }
802                let entries_child = field.children.first().expect_ok()?;
803                let child_scheduler =
804                    self.create_structural_field_scheduler(entries_child, column_infos)?;
805                Ok(Box::new(StructuralMapScheduler::new(child_scheduler))
806                    as Box<dyn StructuralFieldScheduler>)
807            }
808            _ => todo!("create_structural_field_scheduler for {}", data_type),
809        }
810    }
811
812    fn create_legacy_field_scheduler(
813        &self,
814        field: &Field,
815        column_infos: &mut ColumnInfoIter,
816        buffers: FileBuffers,
817    ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
818        let data_type = field.data_type();
819        if Self::is_primitive_legacy(&data_type) {
820            let column_info = column_infos.expect_next()?;
821            let scheduler = self.create_primitive_scheduler(field, column_info, buffers)?;
822            return Ok(scheduler);
823        } else if data_type.is_binary_like() {
824            let column_info = column_infos.expect_next()?.clone();
825            // Column is blob and user is asking for binary data
826            if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
827                let desc_scheduler =
828                    self.create_primitive_scheduler(&BLOB_DESC_LANCE_FIELD, &blob_col, buffers)?;
829                let blob_scheduler = Box::new(BlobFieldScheduler::new(desc_scheduler.into()));
830                return Ok(blob_scheduler);
831            }
832            if let Some(page_info) = column_info.page_infos.first() {
833                if matches!(
834                    page_info.encoding.as_legacy(),
835                    pb::ArrayEncoding {
836                        array_encoding: Some(pb::array_encoding::ArrayEncoding::List(..))
837                    }
838                ) {
839                    let list_type = if matches!(data_type, DataType::Utf8 | DataType::Binary) {
840                        DataType::List(Arc::new(ArrowField::new("item", DataType::UInt8, false)))
841                    } else {
842                        DataType::LargeList(Arc::new(ArrowField::new(
843                            "item",
844                            DataType::UInt8,
845                            false,
846                        )))
847                    };
848                    let list_field = Field::try_from(ArrowField::new(
849                        field.name.clone(),
850                        list_type,
851                        field.nullable,
852                    ))
853                    .unwrap();
854                    let list_scheduler = self.create_list_scheduler(
855                        &list_field,
856                        column_infos,
857                        buffers,
858                        &column_info,
859                    )?;
860                    let binary_scheduler = Box::new(BinaryFieldScheduler::new(
861                        list_scheduler.into(),
862                        field.data_type(),
863                    ));
864                    return Ok(binary_scheduler);
865                } else {
866                    let scheduler =
867                        self.create_primitive_scheduler(field, &column_info, buffers)?;
868                    return Ok(scheduler);
869                }
870            } else {
871                return self.create_primitive_scheduler(field, &column_info, buffers);
872            }
873        }
874        match &data_type {
875            DataType::FixedSizeList(inner, _dimension) => {
876                // A fixed size list column could either be a physical or a logical decoder
877                // depending on the child data type.
878                if Self::is_primitive_legacy(inner.data_type()) {
879                    let primitive_col = column_infos.expect_next()?;
880                    let scheduler =
881                        self.create_primitive_scheduler(field, primitive_col, buffers)?;
882                    Ok(scheduler)
883                } else {
884                    todo!()
885                }
886            }
887            DataType::Dictionary(_key_type, value_type) => {
888                if Self::is_primitive_legacy(value_type) || value_type.is_binary_like() {
889                    let primitive_col = column_infos.expect_next()?;
890                    let scheduler =
891                        self.create_primitive_scheduler(field, primitive_col, buffers)?;
892                    Ok(scheduler)
893                } else {
894                    Err(Error::not_supported_source(
895                        format!(
896                            "No way to decode into a dictionary field of type {}",
897                            value_type
898                        )
899                        .into(),
900                    ))
901                }
902            }
903            DataType::List(_) | DataType::LargeList(_) => {
904                let offsets_column = column_infos.expect_next()?.clone();
905                column_infos.next_top_level();
906                self.create_list_scheduler(field, column_infos, buffers, &offsets_column)
907            }
908            DataType::Struct(fields) => {
909                let column_info = column_infos.expect_next()?;
910
911                // Column is blob and user is asking for descriptions
912                if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
913                    // Can use primitive scheduler here since descriptions are always packed struct
914                    return self.create_primitive_scheduler(field, &blob_col, buffers);
915                }
916
917                if Self::check_packed_struct(column_info) {
918                    // use packed struct encoding
919                    self.create_primitive_scheduler(field, column_info, buffers)
920                } else {
921                    // use default struct encoding
922                    Self::check_simple_struct(column_info, &field.name).unwrap();
923                    let num_rows = column_info
924                        .page_infos
925                        .iter()
926                        .map(|page| page.num_rows)
927                        .sum();
928                    let mut child_schedulers = Vec::with_capacity(field.children.len());
929                    for field in &field.children {
930                        column_infos.next_top_level();
931                        let field_scheduler =
932                            self.create_legacy_field_scheduler(field, column_infos, buffers)?;
933                        child_schedulers.push(Arc::from(field_scheduler));
934                    }
935
936                    let fields = fields.clone();
937                    Ok(Box::new(SimpleStructScheduler::new(
938                        child_schedulers,
939                        fields,
940                        num_rows,
941                    )))
942                }
943            }
944            // TODO: Still need support for RLE
945            _ => todo!(),
946        }
947    }
948}
949
950/// Create's a dummy ColumnInfo for the root column
951fn root_column(num_rows: u64) -> ColumnInfo {
952    let num_root_pages = num_rows.div_ceil(u32::MAX as u64);
953    let final_page_num_rows = num_rows % (u32::MAX as u64);
954    let root_pages = (0..num_root_pages)
955        .map(|i| PageInfo {
956            num_rows: if i == num_root_pages - 1 {
957                final_page_num_rows
958            } else {
959                u64::MAX
960            },
961            encoding: PageEncoding::Legacy(pb::ArrayEncoding {
962                array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
963                    pb::SimpleStruct {},
964                )),
965            }),
966            priority: 0, // not used in legacy scheduler
967            buffer_offsets_and_sizes: Arc::new([]),
968        })
969        .collect::<Vec<_>>();
970    ColumnInfo {
971        buffer_offsets_and_sizes: Arc::new([]),
972        encoding: pb::ColumnEncoding {
973            column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
974        },
975        index: u32::MAX,
976        page_infos: Arc::from(root_pages),
977    }
978}
979
980pub enum RootDecoder {
981    Structural(StructuralStructDecoder),
982    Legacy(SimpleStructDecoder),
983}
984
985impl RootDecoder {
986    pub fn into_structural(self) -> StructuralStructDecoder {
987        match self {
988            Self::Structural(decoder) => decoder,
989            Self::Legacy(_) => panic!("Expected a structural decoder"),
990        }
991    }
992
993    pub fn into_legacy(self) -> SimpleStructDecoder {
994        match self {
995            Self::Legacy(decoder) => decoder,
996            Self::Structural(_) => panic!("Expected a legacy decoder"),
997        }
998    }
999}
1000
1001impl DecodeBatchScheduler {
1002    /// Creates a new decode scheduler with the expected schema and the column
1003    /// metadata of the file.
1004    #[allow(clippy::too_many_arguments)]
1005    pub async fn try_new<'a>(
1006        schema: &'a Schema,
1007        column_indices: &[u32],
1008        column_infos: &[Arc<ColumnInfo>],
1009        file_buffer_positions_and_sizes: &'a Vec<(u64, u64)>,
1010        num_rows: u64,
1011        _decoder_plugins: Arc<DecoderPlugins>,
1012        io: Arc<dyn EncodingsIo>,
1013        cache: Arc<LanceCache>,
1014        filter: &FilterExpression,
1015        decoder_config: &DecoderConfig,
1016    ) -> Result<Self> {
1017        assert!(num_rows > 0);
1018        let buffers = FileBuffers {
1019            positions_and_sizes: file_buffer_positions_and_sizes,
1020        };
1021        let arrow_schema = ArrowSchema::from(schema);
1022        let root_fields = arrow_schema.fields().clone();
1023        let root_type = DataType::Struct(root_fields.clone());
1024        let mut root_field = Field::try_from(&ArrowField::new("root", root_type, false))?;
1025        // root_field.children and schema.fields should be identical at this point but the latter
1026        // has field ids and the former does not.  This line restores that.
1027        // TODO:  Is there another way to create the root field without forcing a trip through arrow?
1028        root_field.children.clone_from(&schema.fields);
1029        root_field
1030            .metadata
1031            .insert("__lance_decoder_root".to_string(), "true".to_string());
1032
1033        if column_infos.is_empty() || column_infos[0].is_structural() {
1034            let mut column_iter = ColumnInfoIter::new(column_infos.to_vec(), column_indices);
1035
1036            let strategy = CoreFieldDecoderStrategy::from_decoder_config(decoder_config);
1037            let mut root_scheduler =
1038                strategy.create_structural_field_scheduler(&root_field, &mut column_iter)?;
1039
1040            let context = SchedulerContext::new(io, cache.clone());
1041            root_scheduler.initialize(filter, &context).await?;
1042
1043            Ok(Self {
1044                root_scheduler: RootScheduler::Structural(root_scheduler),
1045                root_fields,
1046                cache,
1047            })
1048        } else {
1049            // The old encoding style expected a header column for structs and so we
1050            // need a header column for the top-level struct
1051            let mut columns = Vec::with_capacity(column_infos.len() + 1);
1052            columns.push(Arc::new(root_column(num_rows)));
1053            columns.extend(column_infos.iter().cloned());
1054
1055            let adjusted_column_indices = [0_u32]
1056                .into_iter()
1057                .chain(column_indices.iter().map(|i| i.saturating_add(1)))
1058                .collect::<Vec<_>>();
1059            let mut column_iter = ColumnInfoIter::new(columns, &adjusted_column_indices);
1060            let strategy = CoreFieldDecoderStrategy::from_decoder_config(decoder_config);
1061            let root_scheduler =
1062                strategy.create_legacy_field_scheduler(&root_field, &mut column_iter, buffers)?;
1063
1064            let context = SchedulerContext::new(io, cache.clone());
1065            root_scheduler.initialize(filter, &context).await?;
1066
1067            Ok(Self {
1068                root_scheduler: RootScheduler::Legacy(root_scheduler.into()),
1069                root_fields,
1070                cache,
1071            })
1072        }
1073    }
1074
1075    #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
1076    pub fn from_scheduler(
1077        root_scheduler: Arc<dyn crate::previous::decoder::FieldScheduler>,
1078        root_fields: Fields,
1079        cache: Arc<LanceCache>,
1080    ) -> Self {
1081        Self {
1082            root_scheduler: RootScheduler::Legacy(root_scheduler),
1083            root_fields,
1084            cache,
1085        }
1086    }
1087
1088    fn do_schedule_ranges_structural(
1089        &mut self,
1090        ranges: &[Range<u64>],
1091        filter: &FilterExpression,
1092        io: Arc<dyn EncodingsIo>,
1093        mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1094    ) {
1095        let root_scheduler = self.root_scheduler.as_structural();
1096        let mut context = SchedulerContext::new(io, self.cache.clone());
1097        let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
1098        if let Err(schedule_ranges_err) = maybe_root_job {
1099            schedule_action(Err(schedule_ranges_err));
1100            return;
1101        }
1102        let mut root_job = maybe_root_job.unwrap();
1103        let mut num_rows_scheduled = 0;
1104        loop {
1105            let maybe_next_scan_lines = root_job.schedule_next(&mut context);
1106            if let Err(err) = maybe_next_scan_lines {
1107                schedule_action(Err(err));
1108                return;
1109            }
1110            let next_scan_lines = maybe_next_scan_lines.unwrap();
1111            if next_scan_lines.is_empty() {
1112                return;
1113            }
1114            for next_scan_line in next_scan_lines {
1115                trace!(
1116                    "Scheduled scan line of {} rows and {} decoders",
1117                    next_scan_line.rows_scheduled,
1118                    next_scan_line.decoders.len()
1119                );
1120                num_rows_scheduled += next_scan_line.rows_scheduled;
1121                if !schedule_action(Ok(DecoderMessage {
1122                    scheduled_so_far: num_rows_scheduled,
1123                    decoders: next_scan_line.decoders,
1124                })) {
1125                    // Decoder has disconnected
1126                    return;
1127                }
1128            }
1129        }
1130    }
1131
1132    fn do_schedule_ranges_legacy(
1133        &mut self,
1134        ranges: &[Range<u64>],
1135        filter: &FilterExpression,
1136        io: Arc<dyn EncodingsIo>,
1137        mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1138        // If specified, this will be used as the top_level_row for all scheduling
1139        // tasks.  This is used by list scheduling to ensure all items scheduling
1140        // tasks are scheduled at the same top level row.
1141        priority: Option<Box<dyn PriorityRange>>,
1142    ) {
1143        let root_scheduler = self.root_scheduler.as_legacy();
1144        let rows_requested = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1145        trace!(
1146            "Scheduling {} ranges across {}..{} ({} rows){}",
1147            ranges.len(),
1148            ranges.first().unwrap().start,
1149            ranges.last().unwrap().end,
1150            rows_requested,
1151            priority
1152                .as_ref()
1153                .map(|p| format!(" (priority={:?})", p))
1154                .unwrap_or_default()
1155        );
1156
1157        let mut context = SchedulerContext::new(io, self.cache.clone());
1158        let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
1159        if let Err(schedule_ranges_err) = maybe_root_job {
1160            schedule_action(Err(schedule_ranges_err));
1161            return;
1162        }
1163        let mut root_job = maybe_root_job.unwrap();
1164        let mut num_rows_scheduled = 0;
1165        let mut rows_to_schedule = root_job.num_rows();
1166        let mut priority = priority.unwrap_or(Box::new(SimplePriorityRange::new(0)));
1167        trace!("Scheduled ranges refined to {} rows", rows_to_schedule);
1168        while rows_to_schedule > 0 {
1169            let maybe_next_scan_line = root_job.schedule_next(&mut context, priority.as_ref());
1170            if let Err(schedule_next_err) = maybe_next_scan_line {
1171                schedule_action(Err(schedule_next_err));
1172                return;
1173            }
1174            let next_scan_line = maybe_next_scan_line.unwrap();
1175            priority.advance(next_scan_line.rows_scheduled);
1176            num_rows_scheduled += next_scan_line.rows_scheduled;
1177            rows_to_schedule -= next_scan_line.rows_scheduled;
1178            trace!(
1179                "Scheduled scan line of {} rows and {} decoders",
1180                next_scan_line.rows_scheduled,
1181                next_scan_line.decoders.len()
1182            );
1183            if !schedule_action(Ok(DecoderMessage {
1184                scheduled_so_far: num_rows_scheduled,
1185                decoders: next_scan_line.decoders,
1186            })) {
1187                // Decoder has disconnected
1188                return;
1189            }
1190
1191            trace!("Finished scheduling {} ranges", ranges.len());
1192        }
1193    }
1194
1195    fn do_schedule_ranges(
1196        &mut self,
1197        ranges: &[Range<u64>],
1198        filter: &FilterExpression,
1199        io: Arc<dyn EncodingsIo>,
1200        schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1201        // If specified, this will be used as the top_level_row for all scheduling
1202        // tasks.  This is used by list scheduling to ensure all items scheduling
1203        // tasks are scheduled at the same top level row.
1204        priority: Option<Box<dyn PriorityRange>>,
1205    ) {
1206        match &self.root_scheduler {
1207            RootScheduler::Legacy(_) => {
1208                self.do_schedule_ranges_legacy(ranges, filter, io, schedule_action, priority)
1209            }
1210            RootScheduler::Structural(_) => {
1211                self.do_schedule_ranges_structural(ranges, filter, io, schedule_action)
1212            }
1213        }
1214    }
1215
1216    // This method is similar to schedule_ranges but instead of
1217    // sending the decoders to a channel it collects them all into a vector
1218    pub fn schedule_ranges_to_vec(
1219        &mut self,
1220        ranges: &[Range<u64>],
1221        filter: &FilterExpression,
1222        io: Arc<dyn EncodingsIo>,
1223        priority: Option<Box<dyn PriorityRange>>,
1224    ) -> Result<Vec<DecoderMessage>> {
1225        let mut decode_messages = Vec::new();
1226        self.do_schedule_ranges(
1227            ranges,
1228            filter,
1229            io,
1230            |msg| {
1231                decode_messages.push(msg);
1232                true
1233            },
1234            priority,
1235        );
1236        decode_messages.into_iter().collect::<Result<Vec<_>>>()
1237    }
1238
1239    /// Schedules the load of multiple ranges of rows
1240    ///
1241    /// Ranges must be non-overlapping and in sorted order
1242    ///
1243    /// # Arguments
1244    ///
1245    /// * `ranges` - The ranges of rows to load
1246    /// * `sink` - A channel to send the decode tasks
1247    /// * `scheduler` An I/O scheduler to issue I/O requests
1248    #[instrument(skip_all)]
1249    pub fn schedule_ranges(
1250        &mut self,
1251        ranges: &[Range<u64>],
1252        filter: &FilterExpression,
1253        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1254        scheduler: Arc<dyn EncodingsIo>,
1255    ) {
1256        self.do_schedule_ranges(
1257            ranges,
1258            filter,
1259            scheduler,
1260            |msg| {
1261                match sink.send(msg) {
1262                    Ok(_) => true,
1263                    Err(SendError { .. }) => {
1264                        // The receiver has gone away.  We can't do anything about it
1265                        // so just ignore the error.
1266                        debug!(
1267                        "schedule_ranges aborting early since decoder appears to have been dropped"
1268                    );
1269                        false
1270                    }
1271                }
1272            },
1273            None,
1274        )
1275    }
1276
1277    /// Schedules the load of a range of rows
1278    ///
1279    /// # Arguments
1280    ///
1281    /// * `range` - The range of rows to load
1282    /// * `sink` - A channel to send the decode tasks
1283    /// * `scheduler` An I/O scheduler to issue I/O requests
1284    #[instrument(skip_all)]
1285    pub fn schedule_range(
1286        &mut self,
1287        range: Range<u64>,
1288        filter: &FilterExpression,
1289        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1290        scheduler: Arc<dyn EncodingsIo>,
1291    ) {
1292        self.schedule_ranges(&[range], filter, sink, scheduler)
1293    }
1294
1295    /// Schedules the load of selected rows
1296    ///
1297    /// # Arguments
1298    ///
1299    /// * `indices` - The row indices to load (these must be in ascending order!)
1300    /// * `sink` - A channel to send the decode tasks
1301    /// * `scheduler` An I/O scheduler to issue I/O requests
1302    pub fn schedule_take(
1303        &mut self,
1304        indices: &[u64],
1305        filter: &FilterExpression,
1306        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1307        scheduler: Arc<dyn EncodingsIo>,
1308    ) {
1309        debug_assert!(indices.windows(2).all(|w| w[0] < w[1]));
1310        if indices.is_empty() {
1311            return;
1312        }
1313        trace!("Scheduling take of {} rows", indices.len());
1314        let ranges = Self::indices_to_ranges(indices);
1315        self.schedule_ranges(&ranges, filter, sink, scheduler)
1316    }
1317
1318    // coalesce continuous indices if possible (the input indices must be sorted and non-empty)
1319    fn indices_to_ranges(indices: &[u64]) -> Vec<Range<u64>> {
1320        let mut ranges = Vec::new();
1321        let mut start = indices[0];
1322
1323        for window in indices.windows(2) {
1324            if window[1] != window[0] + 1 {
1325                ranges.push(start..window[0] + 1);
1326                start = window[1];
1327            }
1328        }
1329
1330        ranges.push(start..*indices.last().unwrap() + 1);
1331        ranges
1332    }
1333}
1334
1335pub struct ReadBatchTask {
1336    pub task: BoxFuture<'static, Result<RecordBatch>>,
1337    pub num_rows: u32,
1338}
1339
1340/// A stream that takes scheduled jobs and generates decode tasks from them.
1341pub struct BatchDecodeStream {
1342    context: DecoderContext,
1343    root_decoder: SimpleStructDecoder,
1344    rows_remaining: u64,
1345    rows_per_batch: u32,
1346    rows_scheduled: u64,
1347    rows_drained: u64,
1348    scheduler_exhausted: bool,
1349    emitted_batch_size_warning: Arc<Once>,
1350}
1351
1352impl BatchDecodeStream {
1353    /// Create a new instance of a batch decode stream
1354    ///
1355    /// # Arguments
1356    ///
1357    /// * `scheduled` - an incoming stream of decode tasks from a `DecodeBatchScheduler`
1358    /// * `schema` - the schema of the data to create
1359    /// * `rows_per_batch` the number of rows to create before making a batch
1360    /// * `num_rows` the total number of rows scheduled
1361    /// * `num_columns` the total number of columns in the file
1362    pub fn new(
1363        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1364        rows_per_batch: u32,
1365        num_rows: u64,
1366        root_decoder: SimpleStructDecoder,
1367    ) -> Self {
1368        Self {
1369            context: DecoderContext::new(scheduled),
1370            root_decoder,
1371            rows_remaining: num_rows,
1372            rows_per_batch,
1373            rows_scheduled: 0,
1374            rows_drained: 0,
1375            scheduler_exhausted: false,
1376            emitted_batch_size_warning: Arc::new(Once::new()),
1377        }
1378    }
1379
1380    fn accept_decoder(&mut self, decoder: crate::previous::decoder::DecoderReady) -> Result<()> {
1381        if decoder.path.is_empty() {
1382            // The root decoder we can ignore
1383            Ok(())
1384        } else {
1385            self.root_decoder.accept_child(decoder)
1386        }
1387    }
1388
1389    #[instrument(level = "debug", skip_all)]
1390    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
1391        if self.scheduler_exhausted {
1392            return Ok(self.rows_scheduled);
1393        }
1394        while self.rows_scheduled < scheduled_need {
1395            let next_message = self.context.source.recv().await;
1396            match next_message {
1397                Some(scan_line) => {
1398                    let scan_line = scan_line?;
1399                    self.rows_scheduled = scan_line.scheduled_so_far;
1400                    for message in scan_line.decoders {
1401                        self.accept_decoder(message.into_legacy())?;
1402                    }
1403                }
1404                None => {
1405                    // Schedule ended before we got all the data we expected.  This probably
1406                    // means some kind of pushdown filter was applied and we didn't load as
1407                    // much data as we thought we would.
1408                    self.scheduler_exhausted = true;
1409                    return Ok(self.rows_scheduled);
1410                }
1411            }
1412        }
1413        Ok(scheduled_need)
1414    }
1415
1416    #[instrument(level = "debug", skip_all)]
1417    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
1418        trace!(
1419            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1420            self.rows_remaining, self.rows_drained, self.rows_scheduled,
1421        );
1422        if self.rows_remaining == 0 {
1423            return Ok(None);
1424        }
1425
1426        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1427        self.rows_remaining -= to_take;
1428
1429        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1430        trace!(
1431            "scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}",
1432            scheduled_need, self.rows_drained, to_take, self.rows_scheduled
1433        );
1434        if scheduled_need > 0 {
1435            let desired_scheduled = scheduled_need + self.rows_scheduled;
1436            trace!(
1437                "Draining from scheduler (desire at least {} scheduled rows)",
1438                desired_scheduled
1439            );
1440            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
1441            if actually_scheduled < desired_scheduled {
1442                let under_scheduled = desired_scheduled - actually_scheduled;
1443                to_take -= under_scheduled;
1444            }
1445        }
1446
1447        if to_take == 0 {
1448            return Ok(None);
1449        }
1450
1451        // wait_for_loaded waits for *>* loaded_need (not >=) so we do a -1 here
1452        let loaded_need = self.rows_drained + to_take - 1;
1453        trace!(
1454            "Waiting for I/O (desire at least {} fully loaded rows)",
1455            loaded_need
1456        );
1457        self.root_decoder.wait_for_loaded(loaded_need).await?;
1458
1459        let next_task = self.root_decoder.drain(to_take)?;
1460        self.rows_drained += to_take;
1461        Ok(Some(next_task))
1462    }
1463
1464    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
1465        let stream = futures::stream::unfold(self, |mut slf| async move {
1466            let next_task = slf.next_batch_task().await;
1467            let next_task = next_task.transpose().map(|next_task| {
1468                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
1469                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
1470                let task = async move {
1471                    let next_task = next_task?;
1472                    // Real decode work happens inside into_batch, which can block the current
1473                    // thread for a long time. By spawning it as a new task, we allow Tokio's
1474                    // worker threads to keep making progress.
1475                    tokio::spawn(async move { next_task.into_batch(emitted_batch_size_warning) })
1476                        .await
1477                        .map_err(|err| Error::wrapped(err.into()))?
1478                };
1479                (task, num_rows)
1480            });
1481            next_task.map(|(task, num_rows)| {
1482                // This should be true since batch size is u32
1483                debug_assert!(num_rows <= u32::MAX as u64);
1484                let next_task = ReadBatchTask {
1485                    task: task.boxed(),
1486                    num_rows: num_rows as u32,
1487                };
1488                (next_task, slf)
1489            })
1490        });
1491        stream.boxed()
1492    }
1493}
1494
1495// Utility types to smooth out the differences between the 2.0 and 2.1 decoders so that
1496// we can have a single implementation of the batch decode iterator
1497enum RootDecoderMessage {
1498    LoadedPage(LoadedPageShard),
1499    LegacyPage(crate::previous::decoder::DecoderReady),
1500}
1501trait RootDecoderType {
1502    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()>;
1503    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
1504    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()>;
1505}
1506impl RootDecoderType for StructuralStructDecoder {
1507    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1508        let RootDecoderMessage::LoadedPage(loaded_page) = message else {
1509            unreachable!()
1510        };
1511        self.accept_page(loaded_page)
1512    }
1513    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1514        self.drain_batch_task(num_rows)
1515    }
1516    fn wait(&mut self, _: u64, _: &tokio::runtime::Runtime) -> Result<()> {
1517        // Waiting happens elsewhere (not as part of the decoder)
1518        Ok(())
1519    }
1520}
1521impl RootDecoderType for SimpleStructDecoder {
1522    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1523        let RootDecoderMessage::LegacyPage(legacy_page) = message else {
1524            unreachable!()
1525        };
1526        self.accept_child(legacy_page)
1527    }
1528    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1529        self.drain(num_rows)
1530    }
1531    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()> {
1532        runtime.block_on(self.wait_for_loaded(loaded_need))
1533    }
1534}
1535
1536/// A blocking batch decoder that performs synchronous decoding
1537struct BatchDecodeIterator<T: RootDecoderType> {
1538    messages: VecDeque<Result<DecoderMessage>>,
1539    root_decoder: T,
1540    rows_remaining: u64,
1541    rows_per_batch: u32,
1542    rows_scheduled: u64,
1543    rows_drained: u64,
1544    emitted_batch_size_warning: Arc<Once>,
1545    // Note: this is not the runtime on which I/O happens.
1546    // That's always in the scheduler.  This is just a runtime we use to
1547    // sleep the current thread if I/O is unready
1548    wait_for_io_runtime: tokio::runtime::Runtime,
1549    schema: Arc<ArrowSchema>,
1550}
1551
1552impl<T: RootDecoderType> BatchDecodeIterator<T> {
1553    /// Create a new instance of a batch decode iterator
1554    pub fn new(
1555        messages: VecDeque<Result<DecoderMessage>>,
1556        rows_per_batch: u32,
1557        num_rows: u64,
1558        root_decoder: T,
1559        schema: Arc<ArrowSchema>,
1560    ) -> Self {
1561        Self {
1562            messages,
1563            root_decoder,
1564            rows_remaining: num_rows,
1565            rows_per_batch,
1566            rows_scheduled: 0,
1567            rows_drained: 0,
1568            wait_for_io_runtime: tokio::runtime::Builder::new_current_thread()
1569                .build()
1570                .unwrap(),
1571            emitted_batch_size_warning: Arc::new(Once::new()),
1572            schema,
1573        }
1574    }
1575
1576    /// Wait for a single page of data to finish loading
1577    ///
1578    /// If the data is not available this will perform a *blocking* wait (put
1579    /// the current thread to sleep)
1580    fn wait_for_page(&self, unloaded_page: UnloadedPageShard) -> Result<LoadedPageShard> {
1581        match maybe_done(unloaded_page.0) {
1582            // Fast path, avoid all runtime shenanigans if the data is ready
1583            MaybeDone::Done(loaded_page) => loaded_page,
1584            // Slow path, we need to wait on I/O, enter the runtime
1585            MaybeDone::Future(fut) => self.wait_for_io_runtime.block_on(fut),
1586            MaybeDone::Gone => unreachable!(),
1587        }
1588    }
1589
1590    /// Waits for I/O until `scheduled_need` rows have been loaded
1591    ///
1592    /// Note that `scheduled_need` is cumulative.  E.g. this method
1593    /// should be called with 5, 10, 15 and not 5, 5, 5
1594    #[instrument(skip_all)]
1595    fn wait_for_io(&mut self, scheduled_need: u64, to_take: u64) -> Result<u64> {
1596        while self.rows_scheduled < scheduled_need && !self.messages.is_empty() {
1597            let message = self.messages.pop_front().unwrap()?;
1598            self.rows_scheduled = message.scheduled_so_far;
1599            for decoder_message in message.decoders {
1600                match decoder_message {
1601                    MessageType::UnloadedPage(unloaded_page) => {
1602                        let loaded_page = self.wait_for_page(unloaded_page)?;
1603                        self.root_decoder
1604                            .accept_message(RootDecoderMessage::LoadedPage(loaded_page))?;
1605                    }
1606                    MessageType::DecoderReady(decoder_ready) => {
1607                        // The root decoder we can ignore
1608                        if !decoder_ready.path.is_empty() {
1609                            self.root_decoder
1610                                .accept_message(RootDecoderMessage::LegacyPage(decoder_ready))?;
1611                        }
1612                    }
1613                }
1614            }
1615        }
1616
1617        let loaded_need = self.rows_drained + to_take.min(self.rows_per_batch as u64) - 1;
1618
1619        self.root_decoder
1620            .wait(loaded_need, &self.wait_for_io_runtime)?;
1621        Ok(self.rows_scheduled)
1622    }
1623
1624    #[instrument(level = "debug", skip_all)]
1625    fn next_batch_task(&mut self) -> Result<Option<RecordBatch>> {
1626        trace!(
1627            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1628            self.rows_remaining, self.rows_drained, self.rows_scheduled,
1629        );
1630        if self.rows_remaining == 0 {
1631            return Ok(None);
1632        }
1633
1634        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1635        self.rows_remaining -= to_take;
1636
1637        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1638        trace!(
1639            "scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}",
1640            scheduled_need, self.rows_drained, to_take, self.rows_scheduled
1641        );
1642        if scheduled_need > 0 {
1643            let desired_scheduled = scheduled_need + self.rows_scheduled;
1644            trace!(
1645                "Draining from scheduler (desire at least {} scheduled rows)",
1646                desired_scheduled
1647            );
1648            let actually_scheduled = self.wait_for_io(desired_scheduled, to_take)?;
1649            if actually_scheduled < desired_scheduled {
1650                let under_scheduled = desired_scheduled - actually_scheduled;
1651                to_take -= under_scheduled;
1652            }
1653        }
1654
1655        if to_take == 0 {
1656            return Ok(None);
1657        }
1658
1659        let next_task = self.root_decoder.drain_batch(to_take)?;
1660
1661        self.rows_drained += to_take;
1662
1663        let batch = next_task.into_batch(self.emitted_batch_size_warning.clone())?;
1664
1665        Ok(Some(batch))
1666    }
1667}
1668
1669impl<T: RootDecoderType> Iterator for BatchDecodeIterator<T> {
1670    type Item = ArrowResult<RecordBatch>;
1671
1672    fn next(&mut self) -> Option<Self::Item> {
1673        self.next_batch_task()
1674            .transpose()
1675            .map(|r| r.map_err(ArrowError::from))
1676    }
1677}
1678
1679impl<T: RootDecoderType> RecordBatchReader for BatchDecodeIterator<T> {
1680    fn schema(&self) -> Arc<ArrowSchema> {
1681        self.schema.clone()
1682    }
1683}
1684
1685/// A stream that takes scheduled jobs and generates decode tasks from them.
1686pub struct StructuralBatchDecodeStream {
1687    context: DecoderContext,
1688    root_decoder: StructuralStructDecoder,
1689    rows_remaining: u64,
1690    rows_per_batch: u32,
1691    rows_scheduled: u64,
1692    rows_drained: u64,
1693    scheduler_exhausted: bool,
1694    emitted_batch_size_warning: Arc<Once>,
1695    // Decode scheduling policy selected at planning time.
1696    //
1697    // Performance tradeoff:
1698    // - true: spawn `into_batch` onto Tokio, which improves scan throughput by allowing
1699    //   more decode parallelism.
1700    // - false: run `into_batch` inline, which avoids Tokio scheduling overhead and is
1701    //   typically better for point lookups / small takes.
1702    spawn_batch_decode_tasks: bool,
1703}
1704
1705impl StructuralBatchDecodeStream {
1706    /// Create a new instance of a batch decode stream
1707    ///
1708    /// # Arguments
1709    ///
1710    /// * `scheduled` - an incoming stream of decode tasks from a `DecodeBatchScheduler`
1711    /// * `schema` - the schema of the data to create
1712    /// * `rows_per_batch` the number of rows to create before making a batch
1713    /// * `num_rows` the total number of rows scheduled
1714    /// * `num_columns` the total number of columns in the file
1715    pub fn new(
1716        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1717        rows_per_batch: u32,
1718        num_rows: u64,
1719        root_decoder: StructuralStructDecoder,
1720        spawn_batch_decode_tasks: bool,
1721    ) -> Self {
1722        Self {
1723            context: DecoderContext::new(scheduled),
1724            root_decoder,
1725            rows_remaining: num_rows,
1726            rows_per_batch,
1727            rows_scheduled: 0,
1728            rows_drained: 0,
1729            scheduler_exhausted: false,
1730            emitted_batch_size_warning: Arc::new(Once::new()),
1731            spawn_batch_decode_tasks,
1732        }
1733    }
1734
1735    #[instrument(level = "debug", skip_all)]
1736    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
1737        if self.scheduler_exhausted {
1738            return Ok(self.rows_scheduled);
1739        }
1740        while self.rows_scheduled < scheduled_need {
1741            let next_message = self.context.source.recv().await;
1742            match next_message {
1743                Some(scan_line) => {
1744                    let scan_line = scan_line?;
1745                    self.rows_scheduled = scan_line.scheduled_so_far;
1746                    for message in scan_line.decoders {
1747                        let unloaded_page = message.into_structural();
1748                        let loaded_page = unloaded_page.0.await?;
1749                        self.root_decoder.accept_page(loaded_page)?;
1750                    }
1751                }
1752                None => {
1753                    // Schedule ended before we got all the data we expected.  This probably
1754                    // means some kind of pushdown filter was applied and we didn't load as
1755                    // much data as we thought we would.
1756                    self.scheduler_exhausted = true;
1757                    return Ok(self.rows_scheduled);
1758                }
1759            }
1760        }
1761        Ok(scheduled_need)
1762    }
1763
1764    #[instrument(level = "debug", skip_all)]
1765    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
1766        trace!(
1767            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1768            self.rows_remaining, self.rows_drained, self.rows_scheduled,
1769        );
1770        if self.rows_remaining == 0 {
1771            return Ok(None);
1772        }
1773
1774        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1775        self.rows_remaining -= to_take;
1776
1777        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1778        trace!(
1779            "scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}",
1780            scheduled_need, self.rows_drained, to_take, self.rows_scheduled
1781        );
1782        if scheduled_need > 0 {
1783            let desired_scheduled = scheduled_need + self.rows_scheduled;
1784            trace!(
1785                "Draining from scheduler (desire at least {} scheduled rows)",
1786                desired_scheduled
1787            );
1788            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
1789            if actually_scheduled < desired_scheduled {
1790                let under_scheduled = desired_scheduled - actually_scheduled;
1791                to_take -= under_scheduled;
1792            }
1793        }
1794
1795        if to_take == 0 {
1796            return Ok(None);
1797        }
1798
1799        let next_task = self.root_decoder.drain_batch_task(to_take)?;
1800        self.rows_drained += to_take;
1801        Ok(Some(next_task))
1802    }
1803
1804    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
1805        let stream = futures::stream::unfold(self, |mut slf| async move {
1806            let next_task = slf.next_batch_task().await;
1807            let next_task = next_task.transpose().map(|next_task| {
1808                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
1809                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
1810                // Capture the per-stream policy once so every emitted batch task follows the
1811                // same throughput-vs-overhead choice made by the scheduler.
1812                let spawn_batch_decode_tasks = slf.spawn_batch_decode_tasks;
1813                let task = async move {
1814                    let next_task = next_task?;
1815                    if spawn_batch_decode_tasks {
1816                        tokio::spawn(
1817                            async move { next_task.into_batch(emitted_batch_size_warning) },
1818                        )
1819                        .await
1820                        .map_err(|err| Error::wrapped(err.into()))?
1821                    } else {
1822                        next_task.into_batch(emitted_batch_size_warning)
1823                    }
1824                };
1825                (task, num_rows)
1826            });
1827            next_task.map(|(task, num_rows)| {
1828                // This should be true since batch size is u32
1829                debug_assert!(num_rows <= u32::MAX as u64);
1830                let next_task = ReadBatchTask {
1831                    task: task.boxed(),
1832                    num_rows: num_rows as u32,
1833                };
1834                (next_task, slf)
1835            })
1836        });
1837        stream.boxed()
1838    }
1839}
1840
1841#[derive(Debug)]
1842pub enum RequestedRows {
1843    Ranges(Vec<Range<u64>>),
1844    Indices(Vec<u64>),
1845}
1846
1847impl RequestedRows {
1848    pub fn num_rows(&self) -> u64 {
1849        match self {
1850            Self::Ranges(ranges) => ranges.iter().map(|r| r.end - r.start).sum(),
1851            Self::Indices(indices) => indices.len() as u64,
1852        }
1853    }
1854
1855    pub fn trim_empty_ranges(mut self) -> Self {
1856        if let Self::Ranges(ranges) = &mut self {
1857            ranges.retain(|r| !r.is_empty());
1858        }
1859        self
1860    }
1861}
1862
1863/// Configuration for decoder behavior
1864#[derive(Debug, Clone)]
1865pub struct DecoderConfig {
1866    /// Whether to cache repetition indices for better performance.
1867    ///
1868    /// This defaults to the `LANCE_READ_CACHE_REPETITION_INDEX` environment variable
1869    /// when present and is enabled by default. Set the env var to a non-truthy
1870    /// value (for example `0` or `false`) to disable it. The env var is read
1871    /// once per process.
1872    pub cache_repetition_index: bool,
1873    /// Whether to validate decoded data
1874    pub validate_on_decode: bool,
1875}
1876
1877impl Default for DecoderConfig {
1878    fn default() -> Self {
1879        Self {
1880            cache_repetition_index: default_cache_repetition_index(),
1881            validate_on_decode: false,
1882        }
1883    }
1884}
1885
1886#[derive(Debug, Clone)]
1887pub struct SchedulerDecoderConfig {
1888    pub decoder_plugins: Arc<DecoderPlugins>,
1889    pub batch_size: u32,
1890    pub io: Arc<dyn EncodingsIo>,
1891    pub cache: Arc<LanceCache>,
1892    /// Decoder configuration
1893    pub decoder_config: DecoderConfig,
1894}
1895
1896fn check_scheduler_on_drop(
1897    stream: BoxStream<'static, ReadBatchTask>,
1898    scheduler_handle: tokio::task::JoinHandle<()>,
1899) -> BoxStream<'static, ReadBatchTask> {
1900    // This is a bit weird but we create an "empty stream" that unwraps the scheduler handle (which
1901    // will panic if the scheduler panicked).  This let's us check if the scheduler panicked
1902    // when the stream finishes.
1903    let mut scheduler_handle = Some(scheduler_handle);
1904    let check_scheduler = stream::unfold((), move |_| {
1905        let handle = scheduler_handle.take();
1906        async move {
1907            if let Some(handle) = handle {
1908                handle.await.unwrap();
1909            }
1910            None
1911        }
1912    });
1913    stream.chain(check_scheduler).boxed()
1914}
1915
1916pub fn create_decode_stream(
1917    schema: &Schema,
1918    num_rows: u64,
1919    batch_size: u32,
1920    is_structural: bool,
1921    should_validate: bool,
1922    spawn_structural_batch_decode_tasks: bool,
1923    rx: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1924) -> Result<BoxStream<'static, ReadBatchTask>> {
1925    if is_structural {
1926        let arrow_schema = ArrowSchema::from(schema);
1927        let structural_decoder = StructuralStructDecoder::new(
1928            arrow_schema.fields,
1929            should_validate,
1930            /*is_root=*/ true,
1931        )?;
1932        Ok(StructuralBatchDecodeStream::new(
1933            rx,
1934            batch_size,
1935            num_rows,
1936            structural_decoder,
1937            spawn_structural_batch_decode_tasks,
1938        )
1939        .into_stream())
1940    } else {
1941        let arrow_schema = ArrowSchema::from(schema);
1942        let root_fields = arrow_schema.fields;
1943
1944        let simple_struct_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1945        Ok(BatchDecodeStream::new(rx, batch_size, num_rows, simple_struct_decoder).into_stream())
1946    }
1947}
1948
1949/// Creates a iterator that decodes a set of messages in a blocking fashion
1950///
1951/// See [`schedule_and_decode_blocking`] for more information.
1952pub fn create_decode_iterator(
1953    schema: &Schema,
1954    num_rows: u64,
1955    batch_size: u32,
1956    should_validate: bool,
1957    is_structural: bool,
1958    messages: VecDeque<Result<DecoderMessage>>,
1959) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1960    let arrow_schema = Arc::new(ArrowSchema::from(schema));
1961    let root_fields = arrow_schema.fields.clone();
1962    if is_structural {
1963        let simple_struct_decoder =
1964            StructuralStructDecoder::new(root_fields, should_validate, /*is_root=*/ true)?;
1965        Ok(Box::new(BatchDecodeIterator::new(
1966            messages,
1967            batch_size,
1968            num_rows,
1969            simple_struct_decoder,
1970            arrow_schema,
1971        )))
1972    } else {
1973        let root_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1974        Ok(Box::new(BatchDecodeIterator::new(
1975            messages,
1976            batch_size,
1977            num_rows,
1978            root_decoder,
1979            arrow_schema,
1980        )))
1981    }
1982}
1983
1984fn create_scheduler_decoder(
1985    column_infos: Vec<Arc<ColumnInfo>>,
1986    requested_rows: RequestedRows,
1987    filter: FilterExpression,
1988    column_indices: Vec<u32>,
1989    target_schema: Arc<Schema>,
1990    config: SchedulerDecoderConfig,
1991) -> Result<BoxStream<'static, ReadBatchTask>> {
1992    let num_rows = requested_rows.num_rows();
1993
1994    let is_structural = column_infos[0].is_structural();
1995    let mode = std::env::var(ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE);
1996    let spawn_structural_batch_decode_tasks = match mode.ok().as_deref() {
1997        Some("always") => true,
1998        Some("never") => false,
1999        _ => matches!(requested_rows, RequestedRows::Ranges(_)),
2000    };
2001
2002    let (tx, rx) = mpsc::unbounded_channel();
2003
2004    let decode_stream = create_decode_stream(
2005        &target_schema,
2006        num_rows,
2007        config.batch_size,
2008        is_structural,
2009        config.decoder_config.validate_on_decode,
2010        spawn_structural_batch_decode_tasks,
2011        rx,
2012    )?;
2013
2014    let scheduler_handle = tokio::task::spawn(async move {
2015        let mut decode_scheduler = match DecodeBatchScheduler::try_new(
2016            target_schema.as_ref(),
2017            &column_indices,
2018            &column_infos,
2019            &vec![],
2020            num_rows,
2021            config.decoder_plugins,
2022            config.io.clone(),
2023            config.cache,
2024            &filter,
2025            &config.decoder_config,
2026        )
2027        .await
2028        {
2029            Ok(scheduler) => scheduler,
2030            Err(e) => {
2031                let _ = tx.send(Err(e));
2032                return;
2033            }
2034        };
2035
2036        match requested_rows {
2037            RequestedRows::Ranges(ranges) => {
2038                decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
2039            }
2040            RequestedRows::Indices(indices) => {
2041                decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
2042            }
2043        }
2044    });
2045
2046    Ok(check_scheduler_on_drop(decode_stream, scheduler_handle))
2047}
2048
2049/// Launches a scheduler on a dedicated (spawned) task and creates a decoder to
2050/// decode the scheduled data and returns the decoder as a stream of record batches.
2051///
2052/// This is a convenience function that creates both the scheduler and the decoder
2053/// which can be a little tricky to get right.
2054pub fn schedule_and_decode(
2055    column_infos: Vec<Arc<ColumnInfo>>,
2056    requested_rows: RequestedRows,
2057    filter: FilterExpression,
2058    column_indices: Vec<u32>,
2059    target_schema: Arc<Schema>,
2060    config: SchedulerDecoderConfig,
2061) -> BoxStream<'static, ReadBatchTask> {
2062    if requested_rows.num_rows() == 0 {
2063        return stream::empty().boxed();
2064    }
2065
2066    // If the user requested any ranges that are empty, ignore them.  They are pointless and
2067    // trying to read them has caused bugs in the past.
2068    let requested_rows = requested_rows.trim_empty_ranges();
2069
2070    let io = config.io.clone();
2071
2072    // For convenience we really want this method to be a snchronous method where all
2073    // errors happen on the stream.  There is some async initialization that must happen
2074    // when creating a scheduler.  We wrap that all up in the very first task.
2075    match create_scheduler_decoder(
2076        column_infos,
2077        requested_rows,
2078        filter,
2079        column_indices,
2080        target_schema,
2081        config,
2082    ) {
2083        // Keep the io alive until the stream is dropped or finishes.  Otherwise the
2084        // I/O drops as soon as the scheduling is finished and the I/O loop terminates.
2085        Ok(stream) => stream.finally(move || drop(io)).boxed(),
2086        // If the initialization failed make it look like a failed task
2087        Err(e) => stream::once(std::future::ready(ReadBatchTask {
2088            num_rows: 0,
2089            task: std::future::ready(Err(e)).boxed(),
2090        }))
2091        .boxed(),
2092    }
2093}
2094
2095pub static WAITER_RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
2096    tokio::runtime::Builder::new_current_thread()
2097        .build()
2098        .unwrap()
2099});
2100
2101/// Schedules and decodes the requested data in a blocking fashion
2102///
2103/// This function is a blocking version of [`schedule_and_decode`]. It schedules the requested data
2104/// and decodes it in the current thread.
2105///
2106/// This can be useful when the disk is fast (or the data is in memory) and the amount
2107/// of data is relatively small.  For example, when doing a take against NVMe or in-memory data.
2108///
2109/// This should NOT be used for full scans.  Even if the data is in memory this function will
2110/// not parallelize the decode and will be slower than the async version.  Full scans typically
2111/// make relatively few IOPs and so the asynchronous overhead is much smaller.
2112///
2113/// This method will first completely run the scheduling process.  Then it will run the
2114/// decode process.
2115pub fn schedule_and_decode_blocking(
2116    column_infos: Vec<Arc<ColumnInfo>>,
2117    requested_rows: RequestedRows,
2118    filter: FilterExpression,
2119    column_indices: Vec<u32>,
2120    target_schema: Arc<Schema>,
2121    config: SchedulerDecoderConfig,
2122) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
2123    if requested_rows.num_rows() == 0 {
2124        let arrow_schema = Arc::new(ArrowSchema::from(target_schema.as_ref()));
2125        return Ok(Box::new(RecordBatchIterator::new(vec![], arrow_schema)));
2126    }
2127
2128    let num_rows = requested_rows.num_rows();
2129    let is_structural = column_infos[0].is_structural();
2130
2131    let (tx, mut rx) = mpsc::unbounded_channel();
2132
2133    // Initialize the scheduler.  This is still "asynchronous" but we run it with a current-thread
2134    // runtime.
2135    let mut decode_scheduler = WAITER_RT.block_on(DecodeBatchScheduler::try_new(
2136        target_schema.as_ref(),
2137        &column_indices,
2138        &column_infos,
2139        &vec![],
2140        num_rows,
2141        config.decoder_plugins,
2142        config.io.clone(),
2143        config.cache,
2144        &filter,
2145        &config.decoder_config,
2146    ))?;
2147
2148    // Schedule the requested rows
2149    match requested_rows {
2150        RequestedRows::Ranges(ranges) => {
2151            decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
2152        }
2153        RequestedRows::Indices(indices) => {
2154            decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
2155        }
2156    }
2157
2158    // Drain the scheduler queue into a vec of decode messages
2159    let mut messages = Vec::new();
2160    while rx
2161        .recv_many(&mut messages, usize::MAX)
2162        .now_or_never()
2163        .unwrap()
2164        != 0
2165    {}
2166
2167    // Create a decoder to decode the messages
2168    let decode_iterator = create_decode_iterator(
2169        &target_schema,
2170        num_rows,
2171        config.batch_size,
2172        config.decoder_config.validate_on_decode,
2173        is_structural,
2174        messages.into(),
2175    )?;
2176
2177    Ok(decode_iterator)
2178}
2179
2180/// A decoder for single-column encodings of primitive data (this includes fixed size
2181/// lists of primitive data)
2182///
2183/// Physical decoders are able to decode into existing buffers for zero-copy operation.
2184///
2185/// Instances should be stateless and `Send` / `Sync`.  This is because multiple decode
2186/// tasks could reference the same page.  For example, imagine a page covers rows 0-2000
2187/// and the decoder stream has a batch size of 1024.  The decoder will be needed by both
2188/// the decode task for batch 0 and the decode task for batch 1.
2189///
2190/// See [`crate::decoder`] for more information
2191pub trait PrimitivePageDecoder: Send + Sync {
2192    /// Decode data into buffers
2193    ///
2194    /// This may be a simple zero-copy from a disk buffer or could involve complex decoding
2195    /// such as decompressing from some compressed representation.
2196    ///
2197    /// Capacity is stored as a tuple of (num_bytes: u64, is_needed: bool).  The `is_needed`
2198    /// portion only needs to be updated if the encoding has some concept of an "optional"
2199    /// buffer.
2200    ///
2201    /// Encodings can have any number of input or output buffers.  For example, a dictionary
2202    /// decoding will convert two buffers (indices + dictionary) into a single buffer
2203    ///
2204    /// Binary decodings have two output buffers (one for values, one for offsets)
2205    ///
2206    /// Other decodings could even expand the # of output buffers.  For example, we could decode
2207    /// fixed size strings into variable length strings going from one input buffer to multiple output
2208    /// buffers.
2209    ///
2210    /// Each Arrow data type typically has a fixed structure of buffers and the encoding chain will
2211    /// generally end at one of these structures.  However, intermediate structures may exist which
2212    /// do not correspond to any Arrow type at all.  For example, a bitpacking encoding will deal
2213    /// with buffers that have bits-per-value that is not a multiple of 8.
2214    ///
2215    /// The `primitive_array_from_buffers` method has an expected buffer layout for each arrow
2216    /// type (order matters) and encodings that aim to decode into arrow types should respect
2217    /// this layout.
2218    /// # Arguments
2219    ///
2220    /// * `rows_to_skip` - how many rows to skip (within the page) before decoding
2221    /// * `num_rows` - how many rows to decode
2222    /// * `all_null` - A mutable bool, set to true if a decoder determines all values are null
2223    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock>;
2224}
2225
2226/// A scheduler for single-column encodings of primitive data
2227///
2228/// The scheduler is responsible for calculating what I/O is needed for the requested rows
2229///
2230/// Instances should be stateless and `Send` and `Sync`.  This is because instances can
2231/// be shared in follow-up I/O tasks.
2232///
2233/// See [`crate::decoder`] for more information
2234pub trait PageScheduler: Send + Sync + std::fmt::Debug {
2235    /// Schedules a batch of I/O to load the data needed for the requested ranges
2236    ///
2237    /// Returns a future that will yield a decoder once the data has been loaded
2238    ///
2239    /// # Arguments
2240    ///
2241    /// * `range` - the range of row offsets (relative to start of page) requested
2242    ///   these must be ordered and must not overlap
2243    /// * `scheduler` - a scheduler to submit the I/O request to
2244    /// * `top_level_row` - the row offset of the top level field currently being
2245    ///   scheduled.  This can be used to assign priority to I/O requests
2246    fn schedule_ranges(
2247        &self,
2248        ranges: &[Range<u64>],
2249        scheduler: &Arc<dyn EncodingsIo>,
2250        top_level_row: u64,
2251    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>;
2252}
2253
2254/// A trait to control the priority of I/O
2255pub trait PriorityRange: std::fmt::Debug + Send + Sync {
2256    fn advance(&mut self, num_rows: u64);
2257    fn current_priority(&self) -> u64;
2258    fn box_clone(&self) -> Box<dyn PriorityRange>;
2259}
2260
2261/// A simple priority scheme for top-level fields with no parent
2262/// repetition
2263#[derive(Debug)]
2264pub struct SimplePriorityRange {
2265    priority: u64,
2266}
2267
2268impl SimplePriorityRange {
2269    fn new(priority: u64) -> Self {
2270        Self { priority }
2271    }
2272}
2273
2274impl PriorityRange for SimplePriorityRange {
2275    fn advance(&mut self, num_rows: u64) {
2276        self.priority += num_rows;
2277    }
2278
2279    fn current_priority(&self) -> u64 {
2280        self.priority
2281    }
2282
2283    fn box_clone(&self) -> Box<dyn PriorityRange> {
2284        Box::new(Self {
2285            priority: self.priority,
2286        })
2287    }
2288}
2289
2290/// Determining the priority of a list request is tricky.  We want
2291/// the priority to be the top-level row.  So if we have a
2292/// `list<list<int>>` and each outer list has 10 rows and each inner
2293/// list has 5 rows then the priority of the 100th item is 1 because
2294/// it is the 5th item in the 10th item of the *second* row.
2295///
2296/// This structure allows us to keep track of this complicated priority
2297/// relationship.
2298///
2299/// There's a fair amount of bookkeeping involved here.
2300///
2301/// A better approach (using repetition levels) is coming in the future.
2302pub struct ListPriorityRange {
2303    base: Box<dyn PriorityRange>,
2304    offsets: Arc<[u64]>,
2305    cur_index_into_offsets: usize,
2306    cur_position: u64,
2307}
2308
2309impl ListPriorityRange {
2310    pub(crate) fn new(base: Box<dyn PriorityRange>, offsets: Arc<[u64]>) -> Self {
2311        Self {
2312            base,
2313            offsets,
2314            cur_index_into_offsets: 0,
2315            cur_position: 0,
2316        }
2317    }
2318}
2319
2320impl std::fmt::Debug for ListPriorityRange {
2321    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2322        f.debug_struct("ListPriorityRange")
2323            .field("base", &self.base)
2324            .field("offsets.len()", &self.offsets.len())
2325            .field("cur_index_into_offsets", &self.cur_index_into_offsets)
2326            .field("cur_position", &self.cur_position)
2327            .finish()
2328    }
2329}
2330
2331impl PriorityRange for ListPriorityRange {
2332    fn advance(&mut self, num_rows: u64) {
2333        // We've scheduled X items.  Now walk through the offsets to
2334        // determine how many rows we've scheduled.
2335        self.cur_position += num_rows;
2336        let mut idx_into_offsets = self.cur_index_into_offsets;
2337        while idx_into_offsets + 1 < self.offsets.len()
2338            && self.offsets[idx_into_offsets + 1] <= self.cur_position
2339        {
2340            idx_into_offsets += 1;
2341        }
2342        let base_rows_advanced = idx_into_offsets - self.cur_index_into_offsets;
2343        self.cur_index_into_offsets = idx_into_offsets;
2344        self.base.advance(base_rows_advanced as u64);
2345    }
2346
2347    fn current_priority(&self) -> u64 {
2348        self.base.current_priority()
2349    }
2350
2351    fn box_clone(&self) -> Box<dyn PriorityRange> {
2352        Box::new(Self {
2353            base: self.base.box_clone(),
2354            offsets: self.offsets.clone(),
2355            cur_index_into_offsets: self.cur_index_into_offsets,
2356            cur_position: self.cur_position,
2357        })
2358    }
2359}
2360
2361/// Contains the context for a scheduler
2362pub struct SchedulerContext {
2363    recv: Option<mpsc::UnboundedReceiver<DecoderMessage>>,
2364    io: Arc<dyn EncodingsIo>,
2365    cache: Arc<LanceCache>,
2366    name: String,
2367    path: Vec<u32>,
2368    path_names: Vec<String>,
2369}
2370
2371pub struct ScopedSchedulerContext<'a> {
2372    pub context: &'a mut SchedulerContext,
2373}
2374
2375impl<'a> ScopedSchedulerContext<'a> {
2376    pub fn pop(self) -> &'a mut SchedulerContext {
2377        self.context.pop();
2378        self.context
2379    }
2380}
2381
2382impl SchedulerContext {
2383    pub fn new(io: Arc<dyn EncodingsIo>, cache: Arc<LanceCache>) -> Self {
2384        Self {
2385            io,
2386            cache,
2387            recv: None,
2388            name: "".to_string(),
2389            path: Vec::new(),
2390            path_names: Vec::new(),
2391        }
2392    }
2393
2394    pub fn io(&self) -> &Arc<dyn EncodingsIo> {
2395        &self.io
2396    }
2397
2398    pub fn cache(&self) -> &Arc<LanceCache> {
2399        &self.cache
2400    }
2401
2402    pub fn push(&'_ mut self, name: &str, index: u32) -> ScopedSchedulerContext<'_> {
2403        self.path.push(index);
2404        self.path_names.push(name.to_string());
2405        ScopedSchedulerContext { context: self }
2406    }
2407
2408    pub fn pop(&mut self) {
2409        self.path.pop();
2410        self.path_names.pop();
2411    }
2412
2413    pub fn path_name(&self) -> String {
2414        let path = self.path_names.join("/");
2415        if self.recv.is_some() {
2416            format!("TEMP({}){}", self.name, path)
2417        } else {
2418            format!("ROOT{}", path)
2419        }
2420    }
2421
2422    pub fn current_path(&self) -> VecDeque<u32> {
2423        VecDeque::from_iter(self.path.iter().copied())
2424    }
2425
2426    #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
2427    pub fn locate_decoder(
2428        &mut self,
2429        decoder: Box<dyn crate::previous::decoder::LogicalPageDecoder>,
2430    ) -> crate::previous::decoder::DecoderReady {
2431        trace!(
2432            "Scheduling decoder of type {:?} for {:?}",
2433            decoder.data_type(),
2434            self.path,
2435        );
2436        crate::previous::decoder::DecoderReady {
2437            decoder,
2438            path: self.current_path(),
2439        }
2440    }
2441}
2442
2443pub struct UnloadedPageShard(pub BoxFuture<'static, Result<LoadedPageShard>>);
2444
2445impl std::fmt::Debug for UnloadedPageShard {
2446    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2447        f.debug_struct("UnloadedPage").finish()
2448    }
2449}
2450
2451#[derive(Debug)]
2452pub struct ScheduledScanLine {
2453    pub rows_scheduled: u64,
2454    pub decoders: Vec<MessageType>,
2455}
2456
2457pub trait StructuralSchedulingJob: std::fmt::Debug {
2458    /// Schedule the next batch of data
2459    ///
2460    /// Normally this equates to scheduling the next page of data into one task.  Very large pages
2461    /// might be split into multiple scan lines.  Each scan line has one or more rows.
2462    ///
2463    /// If a scheduler ends early it may return an empty vector.
2464    fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>>;
2465}
2466
2467/// A filter expression to apply to the data
2468///
2469/// The core decoders do not currently take advantage of filtering in
2470/// any way.  In order to maintain the abstraction we represent filters
2471/// as an arbitrary byte sequence.
2472///
2473/// We recommend that encodings use Substrait for filters.
2474pub struct FilterExpression(pub Bytes);
2475
2476impl FilterExpression {
2477    /// Create a filter expression that does not filter any data
2478    ///
2479    /// This is currently represented by an empty byte array.  Encoders
2480    /// that are "filter aware" should make sure they handle this case.
2481    pub fn no_filter() -> Self {
2482        Self(Bytes::new())
2483    }
2484
2485    /// Returns true if the filter is the same as the [`Self::no_filter`] filter
2486    pub fn is_noop(&self) -> bool {
2487        self.0.is_empty()
2488    }
2489}
2490
2491pub trait StructuralFieldScheduler: Send + std::fmt::Debug {
2492    fn initialize<'a>(
2493        &'a mut self,
2494        filter: &'a FilterExpression,
2495        context: &'a SchedulerContext,
2496    ) -> BoxFuture<'a, Result<()>>;
2497    fn schedule_ranges<'a>(
2498        &'a self,
2499        ranges: &[Range<u64>],
2500        filter: &FilterExpression,
2501    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>>;
2502}
2503
2504/// A trait for tasks that decode data into an Arrow array
2505pub trait DecodeArrayTask: Send {
2506    /// Decodes the data into an Arrow array
2507    fn decode(self: Box<Self>) -> Result<ArrayRef>;
2508}
2509
2510impl DecodeArrayTask for Box<dyn StructuralDecodeArrayTask> {
2511    fn decode(self: Box<Self>) -> Result<ArrayRef> {
2512        StructuralDecodeArrayTask::decode(*self).map(|decoded_array| decoded_array.array)
2513    }
2514}
2515
2516/// A task to decode data into an Arrow record batch
2517///
2518/// It has a child `task` which decodes a struct array with no nulls.
2519/// This is then converted into a record batch.
2520pub struct NextDecodeTask {
2521    /// The decode task itself
2522    pub task: Box<dyn DecodeArrayTask>,
2523    /// The number of rows that will be created
2524    pub num_rows: u64,
2525}
2526
2527impl NextDecodeTask {
2528    // Run the task and produce a record batch
2529    //
2530    // If the batch is very large this function will log a warning message
2531    // suggesting the user try a smaller batch size.
2532    #[instrument(name = "task_to_batch", level = "debug", skip_all)]
2533    fn into_batch(self, emitted_batch_size_warning: Arc<Once>) -> Result<RecordBatch> {
2534        let struct_arr = self.task.decode();
2535        match struct_arr {
2536            Ok(struct_arr) => {
2537                let batch = RecordBatch::from(struct_arr.as_struct());
2538                let size_bytes = batch.get_array_memory_size() as u64;
2539                if size_bytes > BATCH_SIZE_BYTES_WARNING {
2540                    emitted_batch_size_warning.call_once(|| {
2541                        let size_mb = size_bytes / 1024 / 1024;
2542                        debug!("Lance read in a single batch that contained more than {}MiB of data.  You may want to consider reducing the batch size.", size_mb);
2543                    });
2544                }
2545                Ok(batch)
2546            }
2547            Err(e) => {
2548                let e = Error::internal(format!("Error decoding batch: {}", e));
2549                Err(e)
2550            }
2551        }
2552    }
2553}
2554
2555// An envelope to wrap both 2.0 style messages and 2.1 style messages so we can
2556// share some code paths between the two.  Decoders can safely unwrap into whatever
2557// style they expect since a file will be either all-2.0 or all-2.1
2558#[derive(Debug)]
2559pub enum MessageType {
2560    // The older v2.0 scheduler/decoder used a scheme where the message was the
2561    // decoder itself.  The messages were not sent in priority order and the decoder
2562    // had to wait for I/O, figuring out the correct priority.  This was a lot of
2563    // complexity.
2564    DecoderReady(crate::previous::decoder::DecoderReady),
2565    // Starting in 2.1 we use a simpler scheme where the scheduling happens in priority
2566    // order and the message is an unloaded decoder.  These can be awaited, in order, and
2567    // the decoder does not have to worry about waiting for I/O.
2568    UnloadedPage(UnloadedPageShard),
2569}
2570
2571impl MessageType {
2572    pub fn into_legacy(self) -> crate::previous::decoder::DecoderReady {
2573        match self {
2574            Self::DecoderReady(decoder) => decoder,
2575            Self::UnloadedPage(_) => {
2576                panic!("Expected DecoderReady but got UnloadedPage")
2577            }
2578        }
2579    }
2580
2581    pub fn into_structural(self) -> UnloadedPageShard {
2582        match self {
2583            Self::UnloadedPage(unloaded) => unloaded,
2584            Self::DecoderReady(_) => {
2585                panic!("Expected UnloadedPage but got DecoderReady")
2586            }
2587        }
2588    }
2589}
2590
2591pub struct DecoderMessage {
2592    pub scheduled_so_far: u64,
2593    pub decoders: Vec<MessageType>,
2594}
2595
2596pub struct DecoderContext {
2597    source: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
2598}
2599
2600impl DecoderContext {
2601    pub fn new(source: mpsc::UnboundedReceiver<Result<DecoderMessage>>) -> Self {
2602        Self { source }
2603    }
2604}
2605
2606pub struct DecodedPage {
2607    pub data: DataBlock,
2608    pub repdef: RepDefUnraveler,
2609}
2610
2611pub trait DecodePageTask: Send + std::fmt::Debug {
2612    /// Decodes the data into an Arrow array
2613    fn decode(self: Box<Self>) -> Result<DecodedPage>;
2614}
2615
2616pub trait StructuralPageDecoder: std::fmt::Debug + Send {
2617    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>>;
2618    fn num_rows(&self) -> u64;
2619}
2620
2621#[derive(Debug)]
2622pub struct LoadedPageShard {
2623    // The decoder that is ready to be decoded
2624    pub decoder: Box<dyn StructuralPageDecoder>,
2625    // The path to the decoder, the first value is the column index
2626    // following values, if present, are nested child indices
2627    //
2628    // For example, a path of [1, 1, 0] would mean to grab the second
2629    // column, then the second child, and then the first child.
2630    //
2631    // It could represent x in the following schema:
2632    //
2633    // score: float64
2634    // points: struct
2635    //   color: string
2636    //   location: struct
2637    //     x: float64
2638    //
2639    // Currently, only struct decoders have "children" although other
2640    // decoders may at some point as well.  List children are only
2641    // handled through indirect I/O at the moment and so they don't
2642    // need to be represented (yet)
2643    pub path: VecDeque<u32>,
2644}
2645
2646pub struct DecodedArray {
2647    pub array: ArrayRef,
2648    pub repdef: CompositeRepDefUnraveler,
2649}
2650
2651pub trait StructuralDecodeArrayTask: std::fmt::Debug + Send {
2652    fn decode(self: Box<Self>) -> Result<DecodedArray>;
2653}
2654
2655pub trait StructuralFieldDecoder: std::fmt::Debug + Send {
2656    /// Add a newly scheduled child decoder
2657    ///
2658    /// The default implementation does not expect children and returns
2659    /// an error.
2660    fn accept_page(&mut self, _child: LoadedPageShard) -> Result<()>;
2661    /// Creates a task to decode `num_rows` of data into an array
2662    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>>;
2663    /// The data type of the decoded data
2664    fn data_type(&self) -> &DataType;
2665}
2666
2667#[derive(Debug, Default)]
2668pub struct DecoderPlugins {}
2669
2670/// Decodes a batch of data from an in-memory structure created by [`crate::encoder::encode_batch`]
2671pub async fn decode_batch(
2672    batch: &EncodedBatch,
2673    filter: &FilterExpression,
2674    decoder_plugins: Arc<DecoderPlugins>,
2675    should_validate: bool,
2676    version: LanceFileVersion,
2677    cache: Option<Arc<LanceCache>>,
2678) -> Result<RecordBatch> {
2679    // The io is synchronous so it shouldn't be possible for any async stuff to still be in progress
2680    // Still, if we just use now_or_never we hit misfires because some futures (channels) need to be
2681    // polled twice.
2682
2683    let io_scheduler = Arc::new(BufferScheduler::new(batch.data.clone())) as Arc<dyn EncodingsIo>;
2684    let cache = if let Some(cache) = cache {
2685        cache
2686    } else {
2687        Arc::new(lance_core::cache::LanceCache::with_capacity(
2688            128 * 1024 * 1024,
2689        ))
2690    };
2691    let mut decode_scheduler = DecodeBatchScheduler::try_new(
2692        batch.schema.as_ref(),
2693        &batch.top_level_columns,
2694        &batch.page_table,
2695        &vec![],
2696        batch.num_rows,
2697        decoder_plugins,
2698        io_scheduler.clone(),
2699        cache,
2700        filter,
2701        &DecoderConfig::default(),
2702    )
2703    .await?;
2704    let (tx, rx) = unbounded_channel();
2705    decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler);
2706    let is_structural = version >= LanceFileVersion::V2_1;
2707    let mode = std::env::var(ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE);
2708    let spawn_structural_batch_decode_tasks = !matches!(mode.ok().as_deref(), Some("never"));
2709    let mut decode_stream = create_decode_stream(
2710        &batch.schema,
2711        batch.num_rows,
2712        batch.num_rows as u32,
2713        is_structural,
2714        should_validate,
2715        spawn_structural_batch_decode_tasks,
2716        rx,
2717    )?;
2718    decode_stream.next().await.unwrap().task.await
2719}
2720
2721#[cfg(test)]
2722// test coalesce indices to ranges
2723mod tests {
2724    use super::*;
2725
2726    #[test]
2727    fn test_coalesce_indices_to_ranges_with_single_index() {
2728        let indices = vec![1];
2729        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
2730        assert_eq!(ranges, vec![1..2]);
2731    }
2732
2733    #[test]
2734    fn test_coalesce_indices_to_ranges() {
2735        let indices = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
2736        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
2737        assert_eq!(ranges, vec![1..10]);
2738    }
2739
2740    #[test]
2741    fn test_coalesce_indices_to_ranges_with_gaps() {
2742        let indices = vec![1, 2, 3, 5, 6, 7, 9];
2743        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
2744        assert_eq!(ranges, vec![1..4, 5..8, 9..10]);
2745    }
2746}