// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Utilities and traits for scheduling & decoding data
//!
//! Reading data involves two steps: scheduling and decoding.  The
//! scheduling step is responsible for figuring out what data is needed
//! and issuing the appropriate I/O requests.  The decoding step is
//! responsible for taking the loaded data and turning it into Arrow
//! arrays.
//!
//! # Scheduling
//!
//! Scheduling is split into `FieldScheduler` and `PageScheduler`.
//! There is one field scheduler for each output field, which may map to many
//! columns of actual data.  A field scheduler is responsible for figuring out
//! the order in which pages should be scheduled.  Field schedulers then delegate
//! to page schedulers to figure out the I/O requests that need to be made for
//! the page.
//!
//! Page schedulers also create the decoders that will be used to decode the
//! scheduled data.
//!
//! # Decoding
//!
//! Decoders are split into `PhysicalPageDecoder` and
//! [`LogicalPageDecoder`].  Note that both physical and logical decoding
//! happen on a per-page basis.  There is no concept of a "field decoder" or
//! "column decoder".
//!
//! The physical decoders handle lower level encodings.  They have a few advantages:
//!
//!  * They do not need to decode into an Arrow array and so they don't need
//!    to be enveloped into the Arrow type system (e.g. Arrow doesn't have a
//!    bit-packed type.  We can use variable-length binary but that is kind
//!    of awkward)
//!  * They can decode into an existing allocation.  This can allow for "page
//!    bridging".  If we are trying to decode into a batch of 1024 rows and
//!    the rows 0..1024 are spread across two pages then we can avoid a memory
//!    copy by allocating once and decoding each page into the outer allocation.
//!    (note: page bridging is not actually implemented yet)
//!
//! However, there are some limitations for physical decoders:
//!
//!  * They are constrained to a single column
//!  * The API is more complex
//!
//! The logical decoders are designed to map one or more columns of Lance
//! data into an Arrow array.
//!
//! Typically, a "logical encoding" will have both a logical decoder and a field scheduler.
//! Meanwhile, a "physical encoding" will have a physical decoder but no corresponding field
//! scheduler.
//!
//!
//! # General notes
//!
//! Encodings are typically nested into each other to form a tree.  The top of the tree is
//! the user requested schema.  Each field in that schema is assigned to one top-level logical
//! encoding.  That encoding can then contain other logical encodings or physical encodings.
//! Physical encodings can also contain other physical encodings.
//!
//! So, for example, a single field in the Arrow schema might have the type `List<UInt32>`.
//!
//! The encoding tree could then be:
//!
//! root: List (logical encoding)
//!  - indices: Primitive (logical encoding)
//!    - column: Basic (physical encoding)
//!      - validity: Bitmap (physical encoding)
//!      - values: RLE (physical encoding)
//!        - runs: Value (physical encoding)
//!        - values: Value (physical encoding)
//!  - items: Primitive (logical encoding)
//!    - column: Basic (physical encoding)
//!      - values: Value (physical encoding)
//!
//! Note that, in this example, root.items.column does not have a validity because there were
//! no nulls in the page.
//!
//! ## Multiple buffers or multiple columns?
//!
//! Note that there are many different ways we can write encodings.  For example, we might
//! store primitive fields in a single column with two buffers (one for validity and one for
//! values).
//!
//! On the other hand, we could also store a primitive field as two different columns.  One
//! that yields a non-nullable boolean array and one that yields a non-nullable array of items.
//! Then we could combine these two arrays into a single array where the boolean array is the
//! bitmap.  There are a few subtle differences between the approaches:
//!
//! * Storing things as multiple buffers within the same column is generally more efficient and
//!   easier to schedule.  For example, in-batch coalescing is very easy but can only be done
//!   on data that is in the same page.
//! * When things are stored in multiple columns you have to worry about their pages not being
//!   in sync.  In our previous validity / values example this means we might have to do some
//!   memory copies to get the validity array and values arrays to be the same length at
//!   decode time.
//! * When things are stored in a single column, projection is impossible.  For example, if we
//!   tried to store all the struct fields in a single column with lots of buffers then we wouldn't
//!   be able to read back individual fields of the struct.
//!
//! The fixed size list decoding is an interesting example because it is actually both a physical
//! encoding and a logical encoding.  A fixed size list of a physical encoding is, itself, a physical
//! encoding (e.g. a fixed size list of doubles).  However, a fixed size list of a logical encoding
//! is a logical encoding (e.g. a fixed size list of structs).
//!
//! # The scheduling loop
//!
//! Reading a Lance file involves both scheduling and decoding.  It's generally expected that these
//! will run as two separate threads.
//!
//! ```text
//!
//!                                    I/O PARALLELISM
//!                       Issues
//!                       Requests   ┌─────────────────┐
//!                                  │                 │        Wait for
//!                       ┌──────────►   I/O Service   ├─────►  Enough I/O ◄─┐
//!                       │          │                 │        For batch    │
//!                       │          └─────────────────┘             │3      │
//!                       │                                          │       │
//!                       │                                          │       │2
//! ┌─────────────────────┴─┐                              ┌─────────▼───────┴┐
//! │                       │                              │                  │Poll
//! │       Batch Decode    │ Decode tasks sent via channel│   Batch Decode   │1
//! │       Scheduler       ├─────────────────────────────►│   Stream         ◄─────
//! │                       │                              │                  │
//! └─────▲─────────────┬───┘                              └─────────┬────────┘
//!       │             │                                            │4
//!       │             │                                            │
//!       └─────────────┘                                   ┌────────┴────────┐
//!  Caller of schedule_range                Buffer polling │                 │
//!  will be scheduler thread                to achieve CPU │ Decode Batch    ├────►
//!  and schedule one decode                 parallelism    │ Task            │
//!  task (and all needed I/O)               (thread per    │                 │
//!  per logical page                         batch)        └─────────────────┘
//! ```
//!
//! The scheduling thread will work through the file from the
//! start to the end as quickly as possible.  Data is scheduled one page at a time in a row-major
//! fashion.  For example, imagine we have a file with the following page structure:
//!
//! ```text
//! Score (Float32)     | C0P0 |
//! Id (16-byte UUID)   | C1P0 | C1P1 | C1P2 | C1P3 |
//! Vector (4096 bytes) | C2P0 | C2P1 | C2P2 | C2P3 | .. | C2P1023 |
//! ```
//!
//! This would be quite common as each of these pages has the same number of bytes.  Let's pretend
//! each page is 1MiB and so there are 256Ki rows of data.  Each page of `Score` has 256Ki rows.
//! Each page of `Id` has 64Ki rows.  Each page of `Vector` has 256 rows.  The scheduler would then
//! schedule in the following order:
//!
//! C0 P0
//! C1 P0
//! C2 P0
//! C2 P1
//! ... (253 pages omitted)
//! C2 P255
//! C1 P1
//! C2 P256
//! ... (254 pages omitted)
//! C2 P511
//! C1 P2
//! C2 P512
//! ... (254 pages omitted)
//! C2 P767
//! C1 P3
//! C2 P768
//! ... (254 pages omitted)
//! C2 P1023
//!
//! This is the ideal scheduling order because it means we can decode complete rows as quickly as possible.
//! Note that the scheduler thread does not need to wait for I/O to happen at any point.  As soon as it starts
//! it will start scheduling one page of I/O after another until it has scheduled the entire file's worth of
//! I/O.  This is slightly different from other file readers which have "row group parallelism" and will
//! typically only schedule X row groups worth of reads at a time.
//!
//! In the near future there will be a backpressure mechanism and so it may need to stop/pause if the compute
//! falls behind.
//!
//! ## Indirect I/O
//!
//! Regrettably, there are times where we cannot know exactly what data we need until we have partially decoded
//! the file.  This happens when we have variable sized list data.  In that case the scheduling task for that
//! page will only schedule the first part of the read (loading the list offsets).  It will then immediately
//! spawn a new tokio task to wait for that I/O and decode the list offsets.  That follow-up task is not part
//! of the scheduling loop or the decode loop.  It is a free task.  Once the list offsets are decoded we submit
//! a follow-up I/O task.  This task is scheduled at a high priority because the decoder is going to need it soon.
//!
//! # The decode loop
//!
//! As soon as the scheduler starts we can start decoding.  Each time we schedule a page we
//! push a decoder for that page's data into a channel.  The decode loop
//! ([`BatchDecodeStream`]) reads from that channel.  Each time it receives a decoder it
//! waits until the decoder has all of its data.  Then it grabs the next decoder.  Once it has
//! enough loaded decoders to complete a batch worth of rows it will spawn a "decode batch task".
//!
//! These batch decode tasks perform the actual CPU work of decoding the loaded data into Arrow
//! arrays.  This may involve significant CPU processing like decompression or arithmetic in order
//! to restore the data to its correct in-memory representation.
//!
//! ## Batch size
//!
//! The `BatchDecodeStream` is configured with a batch size.  This does not need to have any
//! relation to the page size(s) used to write the data.  This keeps our compute work completely
//! independent of our I/O work.  We suggest using small batch sizes:
//!
//!  * Batches should fit in CPU cache (at least L3)
//!  * More batches means more opportunity for parallelism
//!  * The "batch overhead" is very small in Lance compared to other formats because it has no
//!    relation to the way the data is stored.

use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{LazyLock, Once, OnceLock};
use std::{ops::Range, sync::Arc};

use arrow_array::cast::AsArray;
use arrow_array::{ArrayRef, RecordBatch, RecordBatchIterator, RecordBatchReader};
use arrow_schema::{ArrowError, DataType, Field as ArrowField, Fields, Schema as ArrowSchema};
use bytes::Bytes;
use futures::future::{BoxFuture, MaybeDone, maybe_done};
use futures::stream::{self, BoxStream};
use futures::{FutureExt, StreamExt};
use lance_arrow::DataTypeExt;
use lance_core::cache::LanceCache;
use lance_core::datatypes::{
    BLOB_DESC_LANCE_FIELD, Field, Schema, validate_fixed_size_list_dimensions,
};
use lance_core::utils::futures::{FinallyStreamExt, StreamOnDropExt};
use lance_core::utils::parse::parse_env_as_bool;
use log::{debug, trace, warn};
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::{self, unbounded_channel};

use lance_core::error::LanceOptionExt;
use lance_core::{ArrowResult, Error, Result};
use tracing::instrument;

use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy};
use crate::data::DataBlock;
use crate::encoder::EncodedBatch;
use crate::encodings::logical::fixed_size_list::StructuralFixedSizeListScheduler;
use crate::encodings::logical::list::StructuralListScheduler;
use crate::encodings::logical::map::StructuralMapScheduler;
use crate::encodings::logical::primitive::StructuralPrimitiveFieldScheduler;
use crate::encodings::logical::r#struct::{StructuralStructDecoder, StructuralStructScheduler};
use crate::format::pb::{self, column_encoding};
use crate::format::pb21;
use crate::previous::decoder::LogicalPageDecoder;
use crate::previous::encodings::logical::list::OffsetPageInfo;
use crate::previous::encodings::logical::r#struct::{SimpleStructDecoder, SimpleStructScheduler};
use crate::previous::encodings::logical::{
    binary::BinaryFieldScheduler, blob::BlobFieldScheduler, list::ListFieldScheduler,
    primitive::PrimitiveFieldScheduler,
};
use crate::repdef::{CompositeRepDefUnraveler, RepDefUnraveler};
use crate::version::LanceFileVersion;
use crate::{BufferScheduler, EncodingsIo};

// If users are getting batches over 10MiB large then it's time to reduce the batch size
const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024;
const ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE: &str =
    "LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE";
const ENV_LANCE_READ_CACHE_REPETITION_INDEX: &str = "LANCE_READ_CACHE_REPETITION_INDEX";
const ENV_LANCE_INLINE_SCHEDULING_THRESHOLD: &str = "LANCE_INLINE_SCHEDULING_THRESHOLD";

// If a request is for at most this many rows we skip the scheduler-task spawn
// and run scheduling inline as part of the `schedule_and_decode` await.
const DEFAULT_INLINE_SCHEDULING_THRESHOLD: u64 = 16 * 1024;

fn default_cache_repetition_index() -> bool {
    static DEFAULT_CACHE_REPETITION_INDEX: OnceLock<bool> = OnceLock::new();
    *DEFAULT_CACHE_REPETITION_INDEX
        .get_or_init(|| parse_env_as_bool(ENV_LANCE_READ_CACHE_REPETITION_INDEX, true))
}

fn inline_scheduling_threshold() -> u64 {
    static THRESHOLD: OnceLock<u64> = OnceLock::new();
    *THRESHOLD.get_or_init(|| {
        std::env::var(ENV_LANCE_INLINE_SCHEDULING_THRESHOLD)
            .ok()
            .and_then(|v| v.trim().parse::<u64>().ok())
            .unwrap_or(DEFAULT_INLINE_SCHEDULING_THRESHOLD)
    })
}
290/// Top-level encoding message for a page.  Wraps both the
291/// legacy pb::ArrayEncoding and the newer pb::PageLayout
292///
293/// A file should only use one or the other and never both.
294/// 2.0 decoders can always assume this is pb::ArrayEncoding
295/// and 2.1+ decoders can always assume this is pb::PageLayout
296#[derive(Debug)]
297pub enum PageEncoding {
298    Legacy(pb::ArrayEncoding),
299    Structural(pb21::PageLayout),
300}
301
302impl PageEncoding {
303    pub fn as_legacy(&self) -> &pb::ArrayEncoding {
304        match self {
305            Self::Legacy(enc) => enc,
306            Self::Structural(_) => panic!("Expected a legacy encoding"),
307        }
308    }
309
310    pub fn as_structural(&self) -> &pb21::PageLayout {
311        match self {
312            Self::Structural(enc) => enc,
313            Self::Legacy(_) => panic!("Expected a structural encoding"),
314        }
315    }
316
317    pub fn is_structural(&self) -> bool {
318        matches!(self, Self::Structural(_))
319    }
320}
321
322/// Metadata describing a page in a file
323///
324/// This is typically created by reading the metadata section of a Lance file
325#[derive(Debug)]
326pub struct PageInfo {
327    /// The number of rows in the page
328    pub num_rows: u64,
329    /// The priority (top level row number) of the page
330    ///
331    /// This is only set in 2.1 files and will be 0 for 2.0 files
332    pub priority: u64,
333    /// The encoding that explains the buffers in the page
334    pub encoding: PageEncoding,
335    /// The offsets and sizes of the buffers in the file
336    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
337}
338
339/// Metadata describing a column in a file
340///
341/// This is typically created by reading the metadata section of a Lance file
342#[derive(Debug, Clone)]
343pub struct ColumnInfo {
344    /// The index of the column in the file
345    pub index: u32,
346    /// The metadata for each page in the column
347    pub page_infos: Arc<[PageInfo]>,
348    /// File positions and their sizes of the column-level buffers
349    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
350    pub encoding: pb::ColumnEncoding,
351}
352
353impl ColumnInfo {
354    /// Create a new instance
355    pub fn new(
356        index: u32,
357        page_infos: Arc<[PageInfo]>,
358        buffer_offsets_and_sizes: Vec<(u64, u64)>,
359        encoding: pb::ColumnEncoding,
360    ) -> Self {
361        Self {
362            index,
363            page_infos,
364            buffer_offsets_and_sizes: buffer_offsets_and_sizes.into_boxed_slice().into(),
365            encoding,
366        }
367    }
368
369    pub fn is_structural(&self) -> bool {
370        self.page_infos
371            // Can just look at the first since all should be the same
372            .first()
373            .map(|page| page.encoding.is_structural())
374            .unwrap_or(false)
375    }
376}
377
378enum RootScheduler {
379    Structural(Box<dyn StructuralFieldScheduler>),
380    Legacy(Arc<dyn crate::previous::decoder::FieldScheduler>),
381}
382
383impl RootScheduler {
384    fn as_legacy(&self) -> &Arc<dyn crate::previous::decoder::FieldScheduler> {
385        match self {
386            Self::Structural(_) => panic!("Expected a legacy scheduler"),
387            Self::Legacy(s) => s,
388        }
389    }
390
391    fn as_structural(&self) -> &dyn StructuralFieldScheduler {
392        match self {
393            Self::Structural(s) => s.as_ref(),
394            Self::Legacy(_) => panic!("Expected a structural scheduler"),
395        }
396    }
397}
398
399/// The scheduler for decoding batches
400///
401/// Lance decoding is done in two steps, scheduling, and decoding.  The
402/// scheduling tends to be lightweight and should quickly figure what data
403/// is needed from the disk issue the appropriate I/O requests.  A decode task is
404/// created to eventually decode the data (once it is loaded) and scheduling
405/// moves on to scheduling the next page.
406///
407/// Meanwhile, it's expected that a decode stream will be setup to run at the
408/// same time.  Decode tasks take the data that is loaded and turn it into
409/// Arrow arrays.
410///
411/// This approach allows us to keep our I/O parallelism and CPU parallelism
412/// completely separate since those are often two very different values.
413///
414/// Backpressure should be achieved via the I/O service.  Requests that are
415/// issued will pile up if the decode stream is not polling quickly enough.
416/// The [`crate::EncodingsIo::submit_request`] function should return a pending
417/// future once there are too many I/O requests in flight.
418///
419/// TODO: Implement backpressure
420pub struct DecodeBatchScheduler {
421    root_scheduler: RootScheduler,
422    pub root_fields: Fields,
423    cache: Arc<LanceCache>,
424}
425
426pub struct ColumnInfoIter<'a> {
427    column_infos: Vec<Arc<ColumnInfo>>,
428    column_indices: &'a [u32],
429    column_info_pos: usize,
430    column_indices_pos: usize,
431}
432
433impl<'a> ColumnInfoIter<'a> {
434    pub fn new(column_infos: Vec<Arc<ColumnInfo>>, column_indices: &'a [u32]) -> Self {
435        let initial_pos = column_indices.first().copied().unwrap_or(0) as usize;
436        Self {
437            column_infos,
438            column_indices,
439            column_info_pos: initial_pos,
440            column_indices_pos: 0,
441        }
442    }
443
444    pub fn peek(&self) -> &Arc<ColumnInfo> {
445        &self.column_infos[self.column_info_pos]
446    }
447
448    pub fn peek_transform(&mut self, transform: impl FnOnce(Arc<ColumnInfo>) -> Arc<ColumnInfo>) {
449        let column_info = self.column_infos[self.column_info_pos].clone();
450        let transformed = transform(column_info);
451        self.column_infos[self.column_info_pos] = transformed;
452    }
453
454    pub fn expect_next(&mut self) -> Result<&Arc<ColumnInfo>> {
455        self.next().ok_or_else(|| {
456            Error::invalid_input(
457                "there were more fields in the schema than provided column indices / infos",
458            )
459        })
460    }
461
462    fn next(&mut self) -> Option<&Arc<ColumnInfo>> {
463        if self.column_info_pos < self.column_infos.len() {
464            let info = &self.column_infos[self.column_info_pos];
465            self.column_info_pos += 1;
466            Some(info)
467        } else {
468            None
469        }
470    }
471
472    pub(crate) fn next_top_level(&mut self) {
473        self.column_indices_pos += 1;
474        if self.column_indices_pos < self.column_indices.len() {
475            self.column_info_pos = self.column_indices[self.column_indices_pos] as usize;
476        } else {
477            self.column_info_pos = self.column_infos.len();
478        }
479    }
480}
481
482/// These contain the file buffers shared across the entire file
483#[derive(Clone, Copy, Debug)]
484pub struct FileBuffers<'a> {
485    pub positions_and_sizes: &'a [(u64, u64)],
486}
487
488/// These contain the file buffers and also buffers specific to a column
489#[derive(Clone, Copy, Debug)]
490pub struct ColumnBuffers<'a, 'b> {
491    pub file_buffers: FileBuffers<'a>,
492    pub positions_and_sizes: &'b [(u64, u64)],
493}
494
495/// These contain the file & column buffers and also buffers specific to a page
496#[derive(Clone, Copy, Debug)]
497pub struct PageBuffers<'a, 'b, 'c> {
498    pub column_buffers: ColumnBuffers<'a, 'b>,
499    pub positions_and_sizes: &'c [(u64, u64)],
500}
501
502/// The core decoder strategy handles all the various Arrow types
503#[derive(Debug)]
504pub struct CoreFieldDecoderStrategy {
505    pub validate_data: bool,
506    pub decompressor_strategy: Arc<dyn DecompressionStrategy>,
507    pub cache_repetition_index: bool,
508}
509
510impl Default for CoreFieldDecoderStrategy {
511    fn default() -> Self {
512        Self {
513            validate_data: false,
514            decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
515            cache_repetition_index: false,
516        }
517    }
518}
519
520impl CoreFieldDecoderStrategy {
521    /// Create a new strategy with cache_repetition_index enabled
522    pub fn with_cache_repetition_index(mut self, cache_repetition_index: bool) -> Self {
523        self.cache_repetition_index = cache_repetition_index;
524        self
525    }
526
527    /// Create a new strategy from decoder config
528    pub fn from_decoder_config(config: &DecoderConfig) -> Self {
529        Self {
530            validate_data: config.validate_on_decode,
531            decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
532            cache_repetition_index: config.cache_repetition_index,
533        }
534    }
535
536    /// This is just a sanity check to ensure there is no "wrapped encodings"
537    /// that haven't been handled.
538    fn ensure_values_encoded(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
539        let column_encoding = column_info
540            .encoding
541            .column_encoding
542            .as_ref()
543            .ok_or_else(|| {
544                Error::invalid_input(format!(
545                    "the column at index {} was missing a ColumnEncoding",
546                    column_info.index
547                ))
548            })?;
549        if matches!(
550            column_encoding,
551            pb::column_encoding::ColumnEncoding::Values(_)
552        ) {
553            Ok(())
554        } else {
555            Err(Error::invalid_input(format!(
556                "the column at index {} mapping to the input field {} has column encoding {:?} and no decoder is registered to handle it",
557                column_info.index, field_name, column_encoding
558            )))
559        }
560    }
561
562    fn is_structural_primitive(data_type: &DataType) -> bool {
563        if data_type.is_primitive() {
564            true
565        } else {
566            match data_type {
567                // DataType::is_primitive doesn't consider these primitive but we do
568                DataType::Dictionary(_, value_type) => Self::is_structural_primitive(value_type),
569                DataType::Boolean
570                | DataType::Null
571                | DataType::FixedSizeBinary(_)
572                | DataType::Binary
573                | DataType::LargeBinary
574                | DataType::Utf8
575                | DataType::LargeUtf8 => true,
576                DataType::FixedSizeList(inner, _) => {
577                    Self::is_structural_primitive(inner.data_type())
578                }
579                _ => false,
580            }
581        }
582    }
583
584    fn is_primitive_legacy(data_type: &DataType) -> bool {
585        if data_type.is_primitive() {
586            true
587        } else {
588            match data_type {
589                // DataType::is_primitive doesn't consider these primitive but we do
590                DataType::Boolean | DataType::Null | DataType::FixedSizeBinary(_) => true,
591                DataType::FixedSizeList(inner, _) => Self::is_primitive_legacy(inner.data_type()),
592                _ => false,
593            }
594        }
595    }
596
597    fn create_primitive_scheduler(
598        &self,
599        field: &Field,
600        column: &ColumnInfo,
601        buffers: FileBuffers,
602    ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
603        Self::ensure_values_encoded(column, &field.name)?;
604        // Primitive fields map to a single column
605        let column_buffers = ColumnBuffers {
606            file_buffers: buffers,
607            positions_and_sizes: &column.buffer_offsets_and_sizes,
608        };
609        Ok(Box::new(PrimitiveFieldScheduler::new(
610            column.index,
611            field.data_type(),
612            column.page_infos.clone(),
613            column_buffers,
614            self.validate_data,
615        )))
616    }
617
618    /// Helper method to verify the page encoding of a struct header column
619    fn check_simple_struct(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
620        Self::ensure_values_encoded(column_info, field_name)?;
621        if column_info.page_infos.len() != 1 {
622            return Err(Error::invalid_input_source(format!("Due to schema we expected a struct column but we received a column with {} pages and right now we only support struct columns with 1 page", column_info.page_infos.len()).into()));
623        }
624        let encoding = &column_info.page_infos[0].encoding;
625        match encoding.as_legacy().array_encoding.as_ref().unwrap() {
626            pb::array_encoding::ArrayEncoding::Struct(_) => Ok(()),
627            _ => Err(Error::invalid_input_source(format!("Expected a struct encoding because we have a struct field in the schema but got the encoding {:?}", encoding).into())),
628        }
629    }
630
631    fn check_packed_struct(column_info: &ColumnInfo) -> bool {
632        let encoding = &column_info.page_infos[0].encoding;
633        matches!(
634            encoding.as_legacy().array_encoding.as_ref().unwrap(),
635            pb::array_encoding::ArrayEncoding::PackedStruct(_)
636        )
637    }
638
639    fn create_list_scheduler(
640        &self,
641        list_field: &Field,
642        column_infos: &mut ColumnInfoIter,
643        buffers: FileBuffers,
644        offsets_column: &ColumnInfo,
645    ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
646        Self::ensure_values_encoded(offsets_column, &list_field.name)?;
647        let offsets_column_buffers = ColumnBuffers {
648            file_buffers: buffers,
649            positions_and_sizes: &offsets_column.buffer_offsets_and_sizes,
650        };
651        let items_scheduler =
652            self.create_legacy_field_scheduler(&list_field.children[0], column_infos, buffers)?;
653
654        let (inner_infos, null_offset_adjustments): (Vec<_>, Vec<_>) = offsets_column
655            .page_infos
656            .iter()
657            .filter(|offsets_page| offsets_page.num_rows > 0)
658            .map(|offsets_page| {
659                if let Some(pb::array_encoding::ArrayEncoding::List(list_encoding)) =
660                    &offsets_page.encoding.as_legacy().array_encoding
661                {
662                    let inner = PageInfo {
663                        buffer_offsets_and_sizes: offsets_page.buffer_offsets_and_sizes.clone(),
664                        encoding: PageEncoding::Legacy(
665                            list_encoding.offsets.as_ref().unwrap().as_ref().clone(),
666                        ),
667                        num_rows: offsets_page.num_rows,
668                        priority: 0,
669                    };
670                    (
671                        inner,
672                        OffsetPageInfo {
673                            offsets_in_page: offsets_page.num_rows,
674                            null_offset_adjustment: list_encoding.null_offset_adjustment,
675                            num_items_referenced_by_page: list_encoding.num_items,
676                        },
677                    )
678                } else {
679                    // TODO: Should probably return Err here
680                    panic!("Expected a list column");
681                }
682            })
683            .unzip();
684        let inner = Arc::new(PrimitiveFieldScheduler::new(
685            offsets_column.index,
686            DataType::UInt64,
687            Arc::from(inner_infos.into_boxed_slice()),
688            offsets_column_buffers,
689            self.validate_data,
690        )) as Arc<dyn crate::previous::decoder::FieldScheduler>;
691        let items_field = match list_field.data_type() {
692            DataType::List(inner) => inner,
693            DataType::LargeList(inner) => inner,
694            _ => unreachable!(),
695        };
696        let offset_type = if matches!(list_field.data_type(), DataType::List(_)) {
697            DataType::Int32
698        } else {
699            DataType::Int64
700        };
701        Ok(Box::new(ListFieldScheduler::new(
702            inner,
703            items_scheduler.into(),
704            items_field,
705            offset_type,
706            null_offset_adjustments,
707        )))
708    }
709
710    fn unwrap_blob(column_info: &ColumnInfo) -> Option<ColumnInfo> {
711        if let column_encoding::ColumnEncoding::Blob(blob) =
712            column_info.encoding.column_encoding.as_ref().unwrap()
713        {
714            let mut column_info = column_info.clone();
715            column_info.encoding = blob.inner.as_ref().unwrap().as_ref().clone();
716            Some(column_info)
717        } else {
718            None
719        }
720    }
721
722    fn create_structural_field_scheduler(
723        &self,
724        field: &Field,
725        column_infos: &mut ColumnInfoIter,
726    ) -> Result<Box<dyn StructuralFieldScheduler>> {
727        let data_type = field.data_type();
728        validate_fixed_size_list_dimensions(&field.name, &data_type)?;
729        if Self::is_structural_primitive(&data_type) {
730            let column_info = column_infos.expect_next()?;
731            let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
732                column_info.as_ref(),
733                self.decompressor_strategy.as_ref(),
734                self.cache_repetition_index,
735                field,
736            )?);
737
738            // advance to the next top level column
739            column_infos.next_top_level();
740
741            return Ok(scheduler);
742        }
743        match &data_type {
744            DataType::Struct(fields) => {
745                if field.is_packed_struct() {
746                    // Packed struct
747                    let column_info = column_infos.expect_next()?;
748                    let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
749                        column_info.as_ref(),
750                        self.decompressor_strategy.as_ref(),
751                        self.cache_repetition_index,
752                        field,
753                    )?);
754
755                    // advance to the next top level column
756                    column_infos.next_top_level();
757
758                    return Ok(scheduler);
759                }
760                // Maybe a blob descriptions struct?
761                if field.is_blob() {
762                    let column_info = column_infos.peek();
763                    if column_info.page_infos.iter().any(|page| {
764                        matches!(
765                            page.encoding,
766                            PageEncoding::Structural(pb21::PageLayout {
767                                layout: Some(pb21::page_layout::Layout::BlobLayout(_))
768                            })
769                        )
770                    }) {
771                        let column_info = column_infos.expect_next()?;
772                        let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
773                            column_info.as_ref(),
774                            self.decompressor_strategy.as_ref(),
775                            self.cache_repetition_index,
776                            field,
777                        )?);
778                        column_infos.next_top_level();
779                        return Ok(scheduler);
780                    }
781                }
782
783                let mut child_schedulers = Vec::with_capacity(field.children.len());
784                for field in field.children.iter() {
785                    let field_scheduler =
786                        self.create_structural_field_scheduler(field, column_infos)?;
787                    child_schedulers.push(field_scheduler);
788                }
789
790                let fields = fields.clone();
791                Ok(
792                    Box::new(StructuralStructScheduler::new(child_schedulers, fields))
793                        as Box<dyn StructuralFieldScheduler>,
794                )
795            }
796            DataType::List(_) | DataType::LargeList(_) => {
797                let child = field.children.first().expect_ok()?;
798                let child_scheduler =
799                    self.create_structural_field_scheduler(child, column_infos)?;
800                Ok(Box::new(StructuralListScheduler::new(child_scheduler))
801                    as Box<dyn StructuralFieldScheduler>)
802            }
803            DataType::FixedSizeList(inner, dimension)
804                if matches!(inner.data_type(), DataType::Struct(_)) =>
805            {
806                let child = field.children.first().expect_ok()?;
807                let child_scheduler =
808                    self.create_structural_field_scheduler(child, column_infos)?;
809                Ok(Box::new(StructuralFixedSizeListScheduler::new(
810                    child_scheduler,
811                    *dimension,
812                )) as Box<dyn StructuralFieldScheduler>)
813            }
814            DataType::Map(_, keys_sorted) => {
815                // TODO: We only support keys_sorted=false for now,
816                //  because converting a rust arrow map field to the python arrow field will
817                //  lose the keys_sorted property.
818                if *keys_sorted {
819                    return Err(Error::not_supported_source(format!("Map data type is not supported with keys_sorted=true now, current value is {}", *keys_sorted).into()));
820                }
821                let entries_child = field.children.first().expect_ok()?;
822                let child_scheduler =
823                    self.create_structural_field_scheduler(entries_child, column_infos)?;
824                Ok(Box::new(StructuralMapScheduler::new(child_scheduler))
825                    as Box<dyn StructuralFieldScheduler>)
826            }
827            _ => todo!("create_structural_field_scheduler for {}", data_type),
828        }
829    }
830
831    fn create_legacy_field_scheduler(
832        &self,
833        field: &Field,
834        column_infos: &mut ColumnInfoIter,
835        buffers: FileBuffers,
836    ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
837        let data_type = field.data_type();
838        validate_fixed_size_list_dimensions(&field.name, &data_type)?;
839        if Self::is_primitive_legacy(&data_type) {
840            let column_info = column_infos.expect_next()?;
841            let scheduler = self.create_primitive_scheduler(field, column_info, buffers)?;
842            return Ok(scheduler);
843        } else if data_type.is_binary_like() {
844            let column_info = column_infos.expect_next()?.clone();
845            // Column is blob and user is asking for binary data
846            if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
847                let desc_scheduler =
848                    self.create_primitive_scheduler(&BLOB_DESC_LANCE_FIELD, &blob_col, buffers)?;
849                let blob_scheduler = Box::new(BlobFieldScheduler::new(desc_scheduler.into()));
850                return Ok(blob_scheduler);
851            }
852            if let Some(page_info) = column_info.page_infos.first() {
853                if matches!(
854                    page_info.encoding.as_legacy(),
855                    pb::ArrayEncoding {
856                        array_encoding: Some(pb::array_encoding::ArrayEncoding::List(..))
857                    }
858                ) {
859                    let list_type = if matches!(data_type, DataType::Utf8 | DataType::Binary) {
860                        DataType::List(Arc::new(ArrowField::new("item", DataType::UInt8, false)))
861                    } else {
862                        DataType::LargeList(Arc::new(ArrowField::new(
863                            "item",
864                            DataType::UInt8,
865                            false,
866                        )))
867                    };
868                    let list_field = Field::try_from(ArrowField::new(
869                        field.name.clone(),
870                        list_type,
871                        field.nullable,
872                    ))
873                    .unwrap();
874                    let list_scheduler = self.create_list_scheduler(
875                        &list_field,
876                        column_infos,
877                        buffers,
878                        &column_info,
879                    )?;
880                    let binary_scheduler = Box::new(BinaryFieldScheduler::new(
881                        list_scheduler.into(),
882                        field.data_type(),
883                    ));
884                    return Ok(binary_scheduler);
885                } else {
886                    let scheduler =
887                        self.create_primitive_scheduler(field, &column_info, buffers)?;
888                    return Ok(scheduler);
889                }
890            } else {
891                return self.create_primitive_scheduler(field, &column_info, buffers);
892            }
893        }
894        match &data_type {
895            DataType::FixedSizeList(inner, _dimension) => {
896                // A fixed size list column could either be a physical or a logical decoder
897                // depending on the child data type.
898                if Self::is_primitive_legacy(inner.data_type()) {
899                    let primitive_col = column_infos.expect_next()?;
900                    let scheduler =
901                        self.create_primitive_scheduler(field, primitive_col, buffers)?;
902                    Ok(scheduler)
903                } else {
904                    todo!()
905                }
906            }
907            DataType::Dictionary(_key_type, value_type) => {
908                if Self::is_primitive_legacy(value_type) || value_type.is_binary_like() {
909                    let primitive_col = column_infos.expect_next()?;
910                    let scheduler =
911                        self.create_primitive_scheduler(field, primitive_col, buffers)?;
912                    Ok(scheduler)
913                } else {
914                    Err(Error::not_supported_source(
915                        format!(
916                            "No way to decode into a dictionary field of type {}",
917                            value_type
918                        )
919                        .into(),
920                    ))
921                }
922            }
923            DataType::List(_) | DataType::LargeList(_) => {
924                let offsets_column = column_infos.expect_next()?.clone();
925                column_infos.next_top_level();
926                self.create_list_scheduler(field, column_infos, buffers, &offsets_column)
927            }
928            DataType::Struct(fields) => {
929                let column_info = column_infos.expect_next()?;
930
931                // Column is blob and user is asking for descriptions
932                if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
933                    // Can use primitive scheduler here since descriptions are always packed struct
934                    return self.create_primitive_scheduler(field, &blob_col, buffers);
935                }
936
937                if Self::check_packed_struct(column_info) {
938                    // use packed struct encoding
939                    self.create_primitive_scheduler(field, column_info, buffers)
940                } else {
941                    // use default struct encoding
942                    Self::check_simple_struct(column_info, &field.name).unwrap();
943                    let num_rows = column_info
944                        .page_infos
945                        .iter()
946                        .map(|page| page.num_rows)
947                        .sum();
948                    let mut child_schedulers = Vec::with_capacity(field.children.len());
949                    for field in &field.children {
950                        column_infos.next_top_level();
951                        let field_scheduler =
952                            self.create_legacy_field_scheduler(field, column_infos, buffers)?;
953                        child_schedulers.push(Arc::from(field_scheduler));
954                    }
955
956                    let fields = fields.clone();
957                    Ok(Box::new(SimpleStructScheduler::new(
958                        child_schedulers,
959                        fields,
960                        num_rows,
961                    )))
962                }
963            }
964            // TODO: Still need support for RLE
965            _ => todo!(),
966        }
967    }
968}
969
970/// Creates a dummy ColumnInfo for the root column
971fn root_column(num_rows: u64) -> ColumnInfo {
972    let num_root_pages = num_rows.div_ceil(u32::MAX as u64);
973    let final_page_num_rows = num_rows - num_root_pages.saturating_sub(1) * (u32::MAX as u64);
974    let root_pages = (0..num_root_pages)
975        .map(|i| PageInfo {
976            num_rows: if i == num_root_pages - 1 {
977                final_page_num_rows
978            } else {
979                u32::MAX as u64
980            },
981            encoding: PageEncoding::Legacy(pb::ArrayEncoding {
982                array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
983                    pb::SimpleStruct {},
984                )),
985            }),
986            priority: 0, // not used in legacy scheduler
987            buffer_offsets_and_sizes: Arc::new([]),
988        })
989        .collect::<Vec<_>>();
990    ColumnInfo {
991        buffer_offsets_and_sizes: Arc::new([]),
992        encoding: pb::ColumnEncoding {
993            column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
994        },
995        index: u32::MAX,
996        page_infos: Arc::from(root_pages),
997    }
998}
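
// Illustrative aside (not part of the crate): a minimal sketch of splitting a
// row count into pages that each hold at most a fixed number of rows, which is
// the arithmetic `root_column` relies on. The helper name `page_row_counts` and
// its `max_page_rows` parameter are hypothetical and exist only for this example.

```rust
/// Splits `num_rows` into per-page row counts of at most `max_page_rows` each.
/// Every page except possibly the last one is full.
fn page_row_counts(num_rows: u64, max_page_rows: u64) -> Vec<u64> {
    assert!(max_page_rows > 0, "pages must hold at least one row");
    let num_pages = num_rows.div_ceil(max_page_rows);
    (0..num_pages)
        .map(|i| {
            // Rows that precede this page
            let start = i * max_page_rows;
            // The last page takes whatever is left, which may be a full page
            (num_rows - start).min(max_page_rows)
        })
        .collect()
}
```

// The per-page counts always sum back to `num_rows`, including when `num_rows`
// is an exact multiple of the page size.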
999
1000pub enum RootDecoder {
1001    Structural(StructuralStructDecoder),
1002    Legacy(SimpleStructDecoder),
1003}
1004
1005impl RootDecoder {
1006    pub fn into_structural(self) -> StructuralStructDecoder {
1007        match self {
1008            Self::Structural(decoder) => decoder,
1009            Self::Legacy(_) => panic!("Expected a structural decoder"),
1010        }
1011    }
1012
1013    pub fn into_legacy(self) -> SimpleStructDecoder {
1014        match self {
1015            Self::Legacy(decoder) => decoder,
1016            Self::Structural(_) => panic!("Expected a legacy decoder"),
1017        }
1018    }
1019}
1020
1021impl DecodeBatchScheduler {
1022    /// Creates a new decode scheduler with the expected schema and the column
1023    /// metadata of the file.
1024    #[allow(clippy::too_many_arguments)]
1025    pub async fn try_new<'a>(
1026        schema: &'a Schema,
1027        column_indices: &[u32],
1028        column_infos: &[Arc<ColumnInfo>],
1029        file_buffer_positions_and_sizes: &'a Vec<(u64, u64)>,
1030        num_rows: u64,
1031        _decoder_plugins: Arc<DecoderPlugins>,
1032        io: Arc<dyn EncodingsIo>,
1033        cache: Arc<LanceCache>,
1034        filter: &FilterExpression,
1035        decoder_config: &DecoderConfig,
1036    ) -> Result<Self> {
1037        assert!(num_rows > 0);
1038        let buffers = FileBuffers {
1039            positions_and_sizes: file_buffer_positions_and_sizes,
1040        };
1041        let arrow_schema = ArrowSchema::from(schema);
1042        let root_fields = arrow_schema.fields().clone();
1043        let root_type = DataType::Struct(root_fields.clone());
1044        let mut root_field = Field::try_from(&ArrowField::new("root", root_type, false))?;
1045        // root_field.children and schema.fields should be identical at this point but the latter
1046        // has field ids and the former does not.  This line restores that.
1047        // TODO:  Is there another way to create the root field without forcing a trip through arrow?
1048        root_field.children.clone_from(&schema.fields);
1049        root_field
1050            .metadata
1051            .insert("__lance_decoder_root".to_string(), "true".to_string());
1052
1053        if column_infos.is_empty() || column_infos[0].is_structural() {
1054            let mut column_iter = ColumnInfoIter::new(column_infos.to_vec(), column_indices);
1055
1056            let strategy = CoreFieldDecoderStrategy::from_decoder_config(decoder_config);
1057            let mut root_scheduler =
1058                strategy.create_structural_field_scheduler(&root_field, &mut column_iter)?;
1059
1060            let context = SchedulerContext::new(io, cache.clone());
1061            root_scheduler.initialize(filter, &context).await?;
1062
1063            Ok(Self {
1064                root_scheduler: RootScheduler::Structural(root_scheduler),
1065                root_fields,
1066                cache,
1067            })
1068        } else {
1069            // The old encoding style expected a header column for structs and so we
1070            // need a header column for the top-level struct
1071            let mut columns = Vec::with_capacity(column_infos.len() + 1);
1072            columns.push(Arc::new(root_column(num_rows)));
1073            columns.extend(column_infos.iter().cloned());
1074
1075            let adjusted_column_indices = [0_u32]
1076                .into_iter()
1077                .chain(column_indices.iter().map(|i| i.saturating_add(1)))
1078                .collect::<Vec<_>>();
1079            let mut column_iter = ColumnInfoIter::new(columns, &adjusted_column_indices);
1080            let strategy = CoreFieldDecoderStrategy::from_decoder_config(decoder_config);
1081            let root_scheduler =
1082                strategy.create_legacy_field_scheduler(&root_field, &mut column_iter, buffers)?;
1083
1084            let context = SchedulerContext::new(io, cache.clone());
1085            root_scheduler.initialize(filter, &context).await?;
1086
1087            Ok(Self {
1088                root_scheduler: RootScheduler::Legacy(root_scheduler.into()),
1089                root_fields,
1090                cache,
1091            })
1092        }
1093    }
1094
1095    #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
1096    pub fn from_scheduler(
1097        root_scheduler: Arc<dyn crate::previous::decoder::FieldScheduler>,
1098        root_fields: Fields,
1099        cache: Arc<LanceCache>,
1100    ) -> Self {
1101        Self {
1102            root_scheduler: RootScheduler::Legacy(root_scheduler),
1103            root_fields,
1104            cache,
1105        }
1106    }
1107
1108    fn do_schedule_ranges_structural(
1109        &mut self,
1110        ranges: &[Range<u64>],
1111        filter: &FilterExpression,
1112        io: Arc<dyn EncodingsIo>,
1113        mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1114    ) {
1115        let root_scheduler = self.root_scheduler.as_structural();
1116        let mut context = SchedulerContext::new(io, self.cache.clone());
1117        let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
1118        if let Err(schedule_ranges_err) = maybe_root_job {
1119            schedule_action(Err(schedule_ranges_err));
1120            return;
1121        }
1122        let mut root_job = maybe_root_job.unwrap();
1123        let mut num_rows_scheduled = 0;
1124        loop {
1125            let maybe_next_scan_lines = root_job.schedule_next(&mut context);
1126            if let Err(err) = maybe_next_scan_lines {
1127                schedule_action(Err(err));
1128                return;
1129            }
1130            let next_scan_lines = maybe_next_scan_lines.unwrap();
1131            if next_scan_lines.is_empty() {
1132                return;
1133            }
1134            for next_scan_line in next_scan_lines {
1135                trace!(
1136                    "Scheduled scan line of {} rows and {} decoders",
1137                    next_scan_line.rows_scheduled,
1138                    next_scan_line.decoders.len()
1139                );
1140                num_rows_scheduled += next_scan_line.rows_scheduled;
1141                if !schedule_action(Ok(DecoderMessage {
1142                    scheduled_so_far: num_rows_scheduled,
1143                    decoders: next_scan_line.decoders,
1144                })) {
1145                    // Decoder has disconnected
1146                    return;
1147                }
1148            }
1149        }
1150    }
1151
1152    fn do_schedule_ranges_legacy(
1153        &mut self,
1154        ranges: &[Range<u64>],
1155        filter: &FilterExpression,
1156        io: Arc<dyn EncodingsIo>,
1157        mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1158        // If specified, this will be used as the top_level_row for all scheduling
1159        // tasks.  This is used by list scheduling to ensure all items scheduling
1160        // tasks are scheduled at the same top level row.
1161        priority: Option<Box<dyn PriorityRange>>,
1162    ) {
1163        let root_scheduler = self.root_scheduler.as_legacy();
1164        let rows_requested = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1165        trace!(
1166            "Scheduling {} ranges across {}..{} ({} rows){}",
1167            ranges.len(),
1168            ranges.first().unwrap().start,
1169            ranges.last().unwrap().end,
1170            rows_requested,
1171            priority
1172                .as_ref()
1173                .map(|p| format!(" (priority={:?})", p))
1174                .unwrap_or_default()
1175        );
1176
1177        let mut context = SchedulerContext::new(io, self.cache.clone());
1178        let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
1179        if let Err(schedule_ranges_err) = maybe_root_job {
1180            schedule_action(Err(schedule_ranges_err));
1181            return;
1182        }
1183        let mut root_job = maybe_root_job.unwrap();
1184        let mut num_rows_scheduled = 0;
1185        let mut rows_to_schedule = root_job.num_rows();
1186        let mut priority = priority.unwrap_or(Box::new(SimplePriorityRange::new(0)));
1187        trace!("Scheduled ranges refined to {} rows", rows_to_schedule);
1188        while rows_to_schedule > 0 {
1189            let maybe_next_scan_line = root_job.schedule_next(&mut context, priority.as_ref());
1190            if let Err(schedule_next_err) = maybe_next_scan_line {
1191                schedule_action(Err(schedule_next_err));
1192                return;
1193            }
1194            let next_scan_line = maybe_next_scan_line.unwrap();
1195            priority.advance(next_scan_line.rows_scheduled);
1196            num_rows_scheduled += next_scan_line.rows_scheduled;
1197            rows_to_schedule -= next_scan_line.rows_scheduled;
1198            trace!(
1199                "Scheduled scan line of {} rows and {} decoders",
1200                next_scan_line.rows_scheduled,
1201                next_scan_line.decoders.len()
1202            );
1203            if !schedule_action(Ok(DecoderMessage {
1204                scheduled_so_far: num_rows_scheduled,
1205                decoders: next_scan_line.decoders,
1206            })) {
1207                // Decoder has disconnected
1208                return;
1209            }
1210        }
1211
1212        trace!("Finished scheduling {} ranges", ranges.len());
1213    }
1214
1215    fn do_schedule_ranges(
1216        &mut self,
1217        ranges: &[Range<u64>],
1218        filter: &FilterExpression,
1219        io: Arc<dyn EncodingsIo>,
1220        schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1221        // If specified, this will be used as the top_level_row for all scheduling
1222        // tasks.  This is used by list scheduling to ensure all items scheduling
1223        // tasks are scheduled at the same top level row.
1224        priority: Option<Box<dyn PriorityRange>>,
1225    ) {
1226        match &self.root_scheduler {
1227            RootScheduler::Legacy(_) => {
1228                self.do_schedule_ranges_legacy(ranges, filter, io, schedule_action, priority)
1229            }
1230            RootScheduler::Structural(_) => {
1231                self.do_schedule_ranges_structural(ranges, filter, io, schedule_action)
1232            }
1233        }
1234    }
1235
1236    /// Similar to `schedule_ranges`, but instead of sending the decoders to a
1237    /// channel it collects them all into a vector.
1238    pub fn schedule_ranges_to_vec(
1239        &mut self,
1240        ranges: &[Range<u64>],
1241        filter: &FilterExpression,
1242        io: Arc<dyn EncodingsIo>,
1243        priority: Option<Box<dyn PriorityRange>>,
1244    ) -> Result<Vec<DecoderMessage>> {
1245        let mut decode_messages = Vec::new();
1246        self.do_schedule_ranges(
1247            ranges,
1248            filter,
1249            io,
1250            |msg| {
1251                decode_messages.push(msg);
1252                true
1253            },
1254            priority,
1255        );
1256        decode_messages.into_iter().collect::<Result<Vec<_>>>()
1257    }
1258
1259    /// Schedules the load of multiple ranges of rows
1260    ///
1261    /// Ranges must be non-overlapping and in sorted order
1262    ///
1263    /// # Arguments
1264    ///
1265    /// * `ranges` - The ranges of rows to load
1266    /// * `sink` - A channel to send the decode tasks
1267    /// * `scheduler` - An I/O scheduler to issue I/O requests
1268    #[instrument(skip_all)]
1269    pub fn schedule_ranges(
1270        &mut self,
1271        ranges: &[Range<u64>],
1272        filter: &FilterExpression,
1273        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1274        scheduler: Arc<dyn EncodingsIo>,
1275    ) {
1276        self.do_schedule_ranges(
1277            ranges,
1278            filter,
1279            scheduler,
1280            |msg| {
1281                match sink.send(msg) {
1282                    Ok(_) => true,
1283                    Err(SendError { .. }) => {
1284                        // The receiver has gone away.  We can't do anything about it
1285                        // so just ignore the error.
1286                        debug!(
1287                        "schedule_ranges aborting early since decoder appears to have been dropped"
1288                    );
1289                        false
1290                    }
1291                }
1292            },
1293            None,
1294        )
1295    }
1296
1297    /// Schedules the load of a range of rows
1298    ///
1299    /// # Arguments
1300    ///
1301    /// * `range` - The range of rows to load
1302    /// * `sink` - A channel to send the decode tasks
1303    /// * `scheduler` - An I/O scheduler to issue I/O requests
1304    #[instrument(skip_all)]
1305    pub fn schedule_range(
1306        &mut self,
1307        range: Range<u64>,
1308        filter: &FilterExpression,
1309        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1310        scheduler: Arc<dyn EncodingsIo>,
1311    ) {
1312        self.schedule_ranges(&[range], filter, sink, scheduler)
1313    }
1314
1315    /// Schedules the load of selected rows
1316    ///
1317    /// # Arguments
1318    ///
1319    /// * `indices` - The row indices to load (these must be in ascending order!)
1320    /// * `sink` - A channel to send the decode tasks
1321    /// * `scheduler` - An I/O scheduler to issue I/O requests
1322    pub fn schedule_take(
1323        &mut self,
1324        indices: &[u64],
1325        filter: &FilterExpression,
1326        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1327        scheduler: Arc<dyn EncodingsIo>,
1328    ) {
1329        debug_assert!(indices.windows(2).all(|w| w[0] < w[1]));
1330        if indices.is_empty() {
1331            return;
1332        }
1333        trace!("Scheduling take of {} rows", indices.len());
1334        let ranges = Self::indices_to_ranges(indices);
1335        self.schedule_ranges(&ranges, filter, sink, scheduler)
1336    }
1337
1338    // Coalesce contiguous indices into ranges (the input indices must be sorted and non-empty)
1339    fn indices_to_ranges(indices: &[u64]) -> Vec<Range<u64>> {
1340        let mut ranges = Vec::new();
1341        let mut start = indices[0];
1342
1343        for window in indices.windows(2) {
1344            if window[1] != window[0] + 1 {
1345                ranges.push(start..window[0] + 1);
1346                start = window[1];
1347            }
1348        }
1349
1350        ranges.push(start..*indices.last().unwrap() + 1);
1351        ranges
1352    }
1353}
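
// Illustrative aside (not part of the crate): a minimal sketch of the take path
// above, under simplifying assumptions. `indices_to_ranges` mirrors the private
// helper but returns an empty vector for empty input instead of requiring the
// caller to check. `schedule_with` is a hypothetical stand-in for the scheduling
// loop: it reports the running row total to a callback and stops as soon as the
// callback returns `false`, which is how a dropped receiver is signalled above.

```rust
use std::ops::Range;

/// Coalesces sorted, strictly ascending indices into half-open ranges.
fn indices_to_ranges(indices: &[u64]) -> Vec<Range<u64>> {
    let Some((&first, _)) = indices.split_first() else {
        return Vec::new();
    };
    let mut ranges = Vec::new();
    let mut start = first;
    for window in indices.windows(2) {
        // A gap between neighbours closes the current range
        if window[1] != window[0] + 1 {
            ranges.push(start..window[0] + 1);
            start = window[1];
        }
    }
    ranges.push(start..indices[indices.len() - 1] + 1);
    ranges
}

/// Hypothetical scheduling loop: calls `action` with the number of rows
/// scheduled so far and aborts early when `action` returns `false`.
fn schedule_with(ranges: &[Range<u64>], mut action: impl FnMut(u64) -> bool) {
    let mut scheduled_so_far = 0;
    for range in ranges {
        scheduled_so_far += range.end - range.start;
        if !action(scheduled_so_far) {
            // The consumer has gone away, so there is no point in continuing
            return;
        }
    }
}
```

// Passing a closure that pushes into a vector corresponds to
// `schedule_ranges_to_vec`; passing one that forwards to a channel and returns
// `false` on a send error corresponds to `schedule_ranges`.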
1354
1355pub struct ReadBatchTask {
1356    pub task: BoxFuture<'static, Result<RecordBatch>>,
1357    pub num_rows: u32,
1358}
1359
1360/// A stream that takes scheduled jobs and generates decode tasks from them.
1361pub struct BatchDecodeStream {
1362    context: DecoderContext,
1363    root_decoder: SimpleStructDecoder,
1364    rows_remaining: u64,
1365    rows_per_batch: u32,
1366    rows_scheduled: u64,
1367    rows_drained: u64,
1368    scheduler_exhausted: bool,
1369    emitted_batch_size_warning: Arc<Once>,
1370}
1371
1372impl BatchDecodeStream {
1373    /// Create a new instance of a batch decode stream
1374    ///
1375    /// # Arguments
1376    ///
1377    /// * `scheduled` - an incoming stream of decode tasks from a `DecodeBatchScheduler`
1378    /// * `rows_per_batch` - the number of rows to create before making a batch
1379    /// * `num_rows` - the total number of rows scheduled
1380    /// * `root_decoder` - the root struct decoder that accepts the scheduled child
1381    ///   decoders and drains them into batches
1382    pub fn new(
1383        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1384        rows_per_batch: u32,
1385        num_rows: u64,
1386        root_decoder: SimpleStructDecoder,
1387    ) -> Self {
1388        Self {
1389            context: DecoderContext::new(scheduled),
1390            root_decoder,
1391            rows_remaining: num_rows,
1392            rows_per_batch,
1393            rows_scheduled: 0,
1394            rows_drained: 0,
1395            scheduler_exhausted: false,
1396            emitted_batch_size_warning: Arc::new(Once::new()),
1397        }
1398    }
1399
1400    fn accept_decoder(&mut self, decoder: crate::previous::decoder::DecoderReady) -> Result<()> {
1401        if decoder.path.is_empty() {
1402            // The root decoder we can ignore
1403            Ok(())
1404        } else {
1405            self.root_decoder.accept_child(decoder)
1406        }
1407    }
1408
1409    #[instrument(level = "debug", skip_all)]
1410    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
1411        if self.scheduler_exhausted {
1412            return Ok(self.rows_scheduled);
1413        }
1414        while self.rows_scheduled < scheduled_need {
1415            let next_message = self.context.source.recv().await;
1416            match next_message {
1417                Some(scan_line) => {
1418                    let scan_line = scan_line?;
1419                    self.rows_scheduled = scan_line.scheduled_so_far;
1420                    for message in scan_line.decoders {
1421                        self.accept_decoder(message.into_legacy())?;
1422                    }
1423                }
1424                None => {
1425                    // The scheduler ended before we got all the data we expected.  This probably
1426                    // means some kind of pushdown filter was applied and we didn't load as
1427                    // much data as we thought we would.
1428                    self.scheduler_exhausted = true;
1429                    return Ok(self.rows_scheduled);
1430                }
1431            }
1432        }
1433        Ok(scheduled_need)
1434    }
1435
1436    #[instrument(level = "debug", skip_all)]
1437    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
1438        trace!(
1439            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1440            self.rows_remaining, self.rows_drained, self.rows_scheduled,
1441        );
1442        if self.rows_remaining == 0 {
1443            return Ok(None);
1444        }
1445
1446        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1447        self.rows_remaining -= to_take;
1448
1449        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1450        trace!(
1451            "scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}",
1452            scheduled_need, self.rows_drained, to_take, self.rows_scheduled
1453        );
1454        if scheduled_need > 0 {
1455            let desired_scheduled = scheduled_need + self.rows_scheduled;
1456            trace!(
1457                "Draining from scheduler (desire at least {} scheduled rows)",
1458                desired_scheduled
1459            );
1460            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
1461            if actually_scheduled < desired_scheduled {
1462                let under_scheduled = desired_scheduled - actually_scheduled;
1463                to_take -= under_scheduled;
1464            }
1465        }
1466
1467        if to_take == 0 {
1468            return Ok(None);
1469        }
1470
1471        // wait_for_loaded waits for *>* loaded_need (not >=) so we do a -1 here
1472        let loaded_need = self.rows_drained + to_take - 1;
1473        trace!(
1474            "Waiting for I/O (desire at least {} fully loaded rows)",
1475            loaded_need
1476        );
1477        self.root_decoder.wait_for_loaded(loaded_need).await?;
1478
1479        let next_task = self.root_decoder.drain(to_take)?;
1480        self.rows_drained += to_take;
1481        Ok(Some(next_task))
1482    }
1483
1484    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
1485        let stream = futures::stream::unfold(self, |mut slf| async move {
1486            let next_task = slf.next_batch_task().await;
1487            let next_task = next_task.transpose().map(|next_task| {
1488                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
1489                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
1490                let task = async move {
1491                    let next_task = next_task?;
1492                    // Real decode work happens inside into_batch, which can block the current
1493                    // thread for a long time. By spawning it as a new task, we allow Tokio's
1494                    // worker threads to keep making progress.
1495                    let (batch, _data_size) =
1496                        tokio::spawn(
1497                            async move { next_task.into_batch(emitted_batch_size_warning) },
1498                        )
1499                        .await
1500                        .map_err(|err| Error::wrapped(err.into()))??;
1501                    Ok(batch)
1502                };
1503                (task, num_rows)
1504            });
1505            next_task.map(|(task, num_rows)| {
1506                // This should be true since batch size is u32
1507                debug_assert!(num_rows <= u32::MAX as u64);
1508                let next_task = ReadBatchTask {
1509                    task: task.boxed(),
1510                    num_rows: num_rows as u32,
1511                };
1512                (next_task, slf)
1513            })
1514        });
1515        stream.boxed()
1516    }
1517}
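/*
The batch-sizing arithmetic in `next_batch_task` above can be hard to follow
from the trace output alone.  The following is a minimal, self-contained sketch
of that arithmetic, not the actual implementation.  `BatchPlan`, `plan_batch`
and `scheduler_limit` are hypothetical names introduced for illustration;
`scheduler_limit` stands in for awaiting `wait_for_scheduled`, which may return
fewer rows than requested when the scheduler is exhausted.

```rust
/// Outcome of planning one batch (hypothetical type, illustration only).
#[derive(Debug, PartialEq)]
struct BatchPlan {
    /// Rows to drain from the root decoder for this batch.
    to_take: u64,
    /// Cumulative scheduled-row count we had to wait for, if any.
    desired_scheduled: Option<u64>,
}

/// Mirrors the arithmetic of `next_batch_task`.  `scheduler_limit` is an
/// assumption of this sketch: the total number of rows the scheduler will
/// ever deliver.
fn plan_batch(
    rows_remaining: u64,
    rows_per_batch: u32,
    rows_drained: u64,
    rows_scheduled: u64,
    scheduler_limit: u64,
) -> BatchPlan {
    let mut to_take = rows_remaining.min(rows_per_batch as u64);
    // How many more rows must be scheduled before this batch can be drained.
    let scheduled_need = (rows_drained + to_take).saturating_sub(rows_scheduled);
    let mut desired = None;
    if scheduled_need > 0 {
        // The wait target is cumulative, not a delta.
        let desired_scheduled = scheduled_need + rows_scheduled;
        desired = Some(desired_scheduled);
        // Stand-in for the await: we get what we asked for unless the
        // scheduler runs dry first.
        let actually_scheduled = desired_scheduled.min(scheduler_limit.max(rows_scheduled));
        if actually_scheduled < desired_scheduled {
            // Shrink the batch by the shortfall (saturating only to keep the
            // sketch panic-free on arbitrary inputs).
            to_take = to_take.saturating_sub(desired_scheduled - actually_scheduled);
        }
    }
    BatchPlan { to_take, desired_scheduled: desired }
}
```

For example, with 32 rows drained, 40 rows scheduled, and a scheduler that
stops at 50 rows, a 32-row batch waits for 64 scheduled rows and then shrinks
to 18 rows.
*/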

// Utility types to smooth out the differences between the 2.0 and 2.1 decoders so that
// we can have a single implementation of the batch decode iterator
enum RootDecoderMessage {
    LoadedPage(LoadedPageShard),
    LegacyPage(crate::previous::decoder::DecoderReady),
}
trait RootDecoderType {
    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()>;
    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()>;
}
impl RootDecoderType for StructuralStructDecoder {
    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
        let RootDecoderMessage::LoadedPage(loaded_page) = message else {
            unreachable!()
        };
        self.accept_page(loaded_page)
    }
    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        self.drain_batch_task(num_rows)
    }
    fn wait(&mut self, _: u64, _: &tokio::runtime::Runtime) -> Result<()> {
        // Waiting happens elsewhere (not as part of the decoder)
        Ok(())
    }
}
impl RootDecoderType for SimpleStructDecoder {
    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
        let RootDecoderMessage::LegacyPage(legacy_page) = message else {
            unreachable!()
        };
        self.accept_child(legacy_page)
    }
    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        self.drain(num_rows)
    }
    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()> {
        runtime.block_on(self.wait_for_loaded(loaded_need))
    }
}

/// A blocking batch decoder that performs synchronous decoding
struct BatchDecodeIterator<T: RootDecoderType> {
    messages: VecDeque<Result<DecoderMessage>>,
    root_decoder: T,
    rows_remaining: u64,
    rows_per_batch: u32,
    rows_scheduled: u64,
    rows_drained: u64,
    emitted_batch_size_warning: Arc<Once>,
    // Note: this is not the runtime on which I/O happens.
    // That's always in the scheduler.  This is just a runtime we use to
    // sleep the current thread if I/O is unready
    wait_for_io_runtime: tokio::runtime::Runtime,
    schema: Arc<ArrowSchema>,
}

impl<T: RootDecoderType> BatchDecodeIterator<T> {
    /// Create a new instance of a batch decode iterator
    pub fn new(
        messages: VecDeque<Result<DecoderMessage>>,
        rows_per_batch: u32,
        num_rows: u64,
        root_decoder: T,
        schema: Arc<ArrowSchema>,
    ) -> Self {
        Self {
            messages,
            root_decoder,
            rows_remaining: num_rows,
            rows_per_batch,
            rows_scheduled: 0,
            rows_drained: 0,
            wait_for_io_runtime: tokio::runtime::Builder::new_current_thread()
                .build()
                .unwrap(),
            emitted_batch_size_warning: Arc::new(Once::new()),
            schema,
        }
    }

    /// Wait for a single page of data to finish loading
    ///
    /// If the data is not available this will perform a *blocking* wait (put
    /// the current thread to sleep)
    fn wait_for_page(&self, unloaded_page: UnloadedPageShard) -> Result<LoadedPageShard> {
        match maybe_done(unloaded_page.0) {
            // Fast path, avoid all runtime shenanigans if the data is ready
            MaybeDone::Done(loaded_page) => loaded_page,
            // Slow path, we need to wait on I/O, enter the runtime
            MaybeDone::Future(fut) => self.wait_for_io_runtime.block_on(fut),
            MaybeDone::Gone => unreachable!(),
        }
    }

    /// Waits for I/O until `scheduled_need` rows have been loaded
    ///
    /// Note that `scheduled_need` is cumulative.  E.g. this method
    /// should be called with 5, 10, 15 and not 5, 5, 5
    #[instrument(skip_all)]
    fn wait_for_io(&mut self, scheduled_need: u64, to_take: u64) -> Result<u64> {
        while self.rows_scheduled < scheduled_need && !self.messages.is_empty() {
            let message = self.messages.pop_front().unwrap()?;
            self.rows_scheduled = message.scheduled_so_far;
            for decoder_message in message.decoders {
                match decoder_message {
                    MessageType::UnloadedPage(unloaded_page) => {
                        let loaded_page = self.wait_for_page(unloaded_page)?;
                        self.root_decoder
                            .accept_message(RootDecoderMessage::LoadedPage(loaded_page))?;
                    }
                    MessageType::DecoderReady(decoder_ready) => {
                        // A message with an empty path refers to the root decoder,
                        // which we already own, so it can be ignored
                        if !decoder_ready.path.is_empty() {
                            self.root_decoder
                                .accept_message(RootDecoderMessage::LegacyPage(decoder_ready))?;
                        }
                    }
                }
            }
        }

        let loaded_need = self.rows_drained + to_take.min(self.rows_per_batch as u64) - 1;

        self.root_decoder
            .wait(loaded_need, &self.wait_for_io_runtime)?;
        Ok(self.rows_scheduled)
    }

    #[instrument(level = "debug", skip_all)]
    fn next_batch_task(&mut self) -> Result<Option<RecordBatch>> {
        trace!(
            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
            self.rows_remaining, self.rows_drained, self.rows_scheduled,
        );
        if self.rows_remaining == 0 {
            return Ok(None);
        }

        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
        self.rows_remaining -= to_take;

        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
        trace!(
            "scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}",
            scheduled_need, self.rows_drained, to_take, self.rows_scheduled
        );
        if scheduled_need > 0 {
            let desired_scheduled = scheduled_need + self.rows_scheduled;
            trace!(
                "Draining from scheduler (desire at least {} scheduled rows)",
                desired_scheduled
            );
            let actually_scheduled = self.wait_for_io(desired_scheduled, to_take)?;
            if actually_scheduled < desired_scheduled {
                let under_scheduled = desired_scheduled - actually_scheduled;
                to_take -= under_scheduled;
            }
        }

        if to_take == 0 {
            return Ok(None);
        }

        let next_task = self.root_decoder.drain_batch(to_take)?;

        self.rows_drained += to_take;

        let (batch, _data_size) = next_task.into_batch(self.emitted_batch_size_warning.clone())?;

        Ok(Some(batch))
    }
}

impl<T: RootDecoderType> Iterator for BatchDecodeIterator<T> {
    type Item = ArrowResult<RecordBatch>;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_batch_task()
            .transpose()
            .map(|r| r.map_err(ArrowError::from))
    }
}

impl<T: RootDecoderType> RecordBatchReader for BatchDecodeIterator<T> {
    fn schema(&self) -> Arc<ArrowSchema> {
        self.schema.clone()
    }
}

/// Estimate the number of bytes per row for a given Arrow data type.
///
/// For fixed-width types this is exact. For variable-width types (strings,
/// binary, lists) a rough default is used. The estimate is used as a
/// starting point when `batch_size_bytes` is set; a post-decode feedback
/// loop corrects it after the first batch.
///
/// This estimate ignores validity bitmaps at the moment.  We can't infer
/// their presence simply from the data_type and their impact is probably
/// fairly negligible.
fn estimate_bytes_per_row(data_type: &DataType) -> f64 {
    if let Some(w) = data_type.byte_width_opt() {
        return w as f64;
    }
    match data_type {
        DataType::Boolean => 1.0 / 8.0,
        DataType::Utf8 | DataType::Binary | DataType::LargeUtf8 | DataType::LargeBinary => 64.0,
        DataType::Struct(fields) => fields
            .iter()
            .map(|f| estimate_bytes_per_row(f.data_type()))
            .sum(),
        DataType::List(child) | DataType::LargeList(child) => {
            5.0 * estimate_bytes_per_row(child.data_type())
        }
        DataType::FixedSizeList(child, dim) => {
            *dim as f64 * estimate_bytes_per_row(child.data_type())
        }
        DataType::Dictionary(_, value_type) => estimate_bytes_per_row(value_type),
        DataType::Map(entries, _) => 5.0 * estimate_bytes_per_row(entries.data_type()),
        _ => 64.0,
    }
}
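/*
To make the recursion in `estimate_bytes_per_row` concrete, here is a minimal
std-only sketch of the same rules.  `SketchType` and `estimate` are
hypothetical stand-ins for Arrow's `DataType` and the function above; only a
few representative variants are modelled, and the constants (64 bytes for
variable-width data, a factor of 5 for lists) are copied from the match arms
above.

```rust
/// Simplified stand-in for Arrow's `DataType` (hypothetical, illustration only).
enum SketchType {
    /// Fixed-width primitive with the given byte width (e.g. 4 for Int32).
    Fixed(usize),
    Boolean,
    /// Utf8 / Binary and their large variants.
    VarBinary,
    Struct(Vec<SketchType>),
    List(Box<SketchType>),
    FixedSizeList(Box<SketchType>, i32),
}

/// Rough bytes-per-row estimate following the same rules as the real function.
fn estimate(t: &SketchType) -> f64 {
    match t {
        // Exact for fixed-width types.
        SketchType::Fixed(w) => *w as f64,
        // One bit per value.
        SketchType::Boolean => 1.0 / 8.0,
        // Rough default for variable-width data.
        SketchType::VarBinary => 64.0,
        // A struct row is the sum of its children.
        SketchType::Struct(fields) => fields.iter().map(estimate).sum(),
        // Variable-length lists use a fixed multiplier on the child estimate.
        SketchType::List(child) => 5.0 * estimate(child),
        // Fixed-size lists multiply by the known dimension.
        SketchType::FixedSizeList(child, dim) => *dim as f64 * estimate(child),
    }
}
```

Under these rules a struct of an 8-byte integer, a string, and a boolean is
estimated at 72.125 bytes per row, and a 128-dimensional vector of 4-byte
floats at 512 bytes per row.
*/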

/// A stream that takes scheduled jobs and generates decode tasks from them.
pub struct StructuralBatchDecodeStream {
    context: DecoderContext,
    root_decoder: StructuralStructDecoder,
    rows_remaining: u64,
    rows_per_batch: u32,
    rows_scheduled: u64,
    rows_drained: u64,
    scheduler_exhausted: bool,
    emitted_batch_size_warning: Arc<Once>,
    // Decode scheduling policy selected at planning time.
    //
    // Performance tradeoff:
    // - true: spawn `into_batch` onto Tokio, which improves scan throughput by allowing
    //   more decode parallelism.
    // - false: run `into_batch` inline, which avoids Tokio scheduling overhead and is
    //   typically better for point lookups / small takes.
    spawn_batch_decode_tasks: bool,
    /// If set, target this many bytes per batch instead of `rows_per_batch` rows.
    batch_size_bytes: Option<u64>,
    /// Schema-based estimate of bytes per row, computed once at construction.
    /// Only meaningful when `batch_size_bytes` is `Some`.
    schema_bytes_per_row: f64,
    /// Post-decode feedback: actual bytes-per-row measured from the most
    /// recently decoded batch.  Zero means no feedback yet (use schema estimate).
    bytes_per_row_feedback: Arc<AtomicU64>,
}

impl StructuralBatchDecodeStream {
    /// Create a new instance of a batch decode stream
    ///
    /// # Arguments
    ///
    /// * `scheduled` - an incoming stream of decode tasks from a `DecodeBatchScheduler`
    /// * `rows_per_batch` - the number of rows to create before making a batch
    /// * `num_rows` - the total number of rows scheduled
    /// * `root_decoder` - the root structural decoder that accepts loaded pages
    /// * `spawn_batch_decode_tasks` - whether to spawn `into_batch` onto Tokio
    ///   instead of running it inline
    /// * `batch_size_bytes` - if set, target this many bytes per batch instead
    ///   of `rows_per_batch` rows
    pub fn new(
        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
        rows_per_batch: u32,
        num_rows: u64,
        root_decoder: StructuralStructDecoder,
        spawn_batch_decode_tasks: bool,
        batch_size_bytes: Option<u64>,
    ) -> Self {
        let schema_bytes_per_row = if batch_size_bytes.is_some() {
            estimate_bytes_per_row(root_decoder.data_type()).max(1.0)
        } else {
            0.0
        };
        Self {
            context: DecoderContext::new(scheduled),
            root_decoder,
            rows_remaining: num_rows,
            rows_per_batch,
            rows_scheduled: 0,
            rows_drained: 0,
            scheduler_exhausted: false,
            emitted_batch_size_warning: Arc::new(Once::new()),
            spawn_batch_decode_tasks,
            batch_size_bytes,
            schema_bytes_per_row,
            bytes_per_row_feedback: Arc::new(AtomicU64::new(0)),
        }
    }

    #[instrument(level = "debug", skip_all)]
    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
        if self.scheduler_exhausted {
            return Ok(self.rows_scheduled);
        }
        while self.rows_scheduled < scheduled_need {
            let next_message = self.context.source.recv().await;
            match next_message {
                Some(scan_line) => {
                    let scan_line = scan_line?;
                    self.rows_scheduled = scan_line.scheduled_so_far;
                    for message in scan_line.decoders {
                        let unloaded_page = message.into_structural();
                        let loaded_page = unloaded_page.0.await?;
                        self.root_decoder.accept_page(loaded_page)?;
                    }
                }
                None => {
                    // Schedule ended before we got all the data we expected.  This probably
                    // means some kind of pushdown filter was applied and we didn't load as
                    // much data as we thought we would.
                    self.scheduler_exhausted = true;
                    return Ok(self.rows_scheduled);
                }
            }
        }
        Ok(scheduled_need)
    }

    #[instrument(level = "debug", skip_all)]
    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
        trace!(
            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
            self.rows_remaining, self.rows_drained, self.rows_scheduled,
        );
        if self.rows_remaining == 0 {
            return Ok(None);
        }

        let mut to_take = if let Some(batch_size_bytes) = self.batch_size_bytes {
            let feedback = self.bytes_per_row_feedback.load(Ordering::Relaxed);
            let bpr = if feedback > 0 {
                feedback as f64
            } else {
                self.schema_bytes_per_row
            };
            let rows = (batch_size_bytes as f64 / bpr) as u64;
            self.rows_remaining.min(rows.max(1))
        } else {
            self.rows_remaining.min(self.rows_per_batch as u64)
        };
        self.rows_remaining -= to_take;

        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
        trace!(
            "scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}",
            scheduled_need, self.rows_drained, to_take, self.rows_scheduled
        );
        if scheduled_need > 0 {
            let desired_scheduled = scheduled_need + self.rows_scheduled;
            trace!(
                "Draining from scheduler (desire at least {} scheduled rows)",
                desired_scheduled
            );
            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
            if actually_scheduled < desired_scheduled {
                let under_scheduled = desired_scheduled - actually_scheduled;
                to_take -= under_scheduled;
            }
        }

        if to_take == 0 {
            return Ok(None);
        }

        let next_task = self.root_decoder.drain_batch_task(to_take)?;
        self.rows_drained += to_take;
        Ok(Some(next_task))
    }

    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
        let stream = futures::stream::unfold(self, |mut slf| async move {
            let next_task = slf.next_batch_task().await;
            let next_task = next_task.transpose().map(|next_task| {
                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
                let bytes_per_row_feedback = slf.bytes_per_row_feedback.clone();
                // Capture the per-stream policy once so every emitted batch task follows the
                // same throughput-vs-overhead choice made by the scheduler.
                let spawn_batch_decode_tasks = slf.spawn_batch_decode_tasks;
                let task = async move {
                    let next_task = next_task?;
                    let (batch, data_size) = if spawn_batch_decode_tasks {
                        tokio::spawn(
                            async move { next_task.into_batch(emitted_batch_size_warning) },
                        )
                        .await
                        .map_err(|err| Error::wrapped(err.into()))??
                    } else {
                        next_task.into_batch(emitted_batch_size_warning)?
                    };
                    let num_rows = batch.num_rows() as u64;
                    if num_rows > 0 {
                        let bpr = data_size / num_rows;
                        let prev = bytes_per_row_feedback.load(Ordering::Relaxed);
                        let next = if prev == 0 || bpr >= prev {
                            // First batch or actual size is larger than estimate:
                            // adopt immediately to avoid OOM.
                            bpr
                        } else {
                            // Actual size is smaller: degrade gradually toward
                            // the true value to avoid over-correcting on a
                            // single anomalous batch.
                            (prev + bpr) / 2
                        };
                        bytes_per_row_feedback.store(next.max(1), Ordering::Relaxed);
                    }
                    Ok(batch)
                };
                (task, num_rows)
            });
            next_task.map(|(task, num_rows)| {
                // This should be true since batch size is u32
                debug_assert!(num_rows <= u32::MAX as u64);
                let next_task = ReadBatchTask {
                    task: task.boxed(),
                    num_rows: num_rows as u32,
                };
                (next_task, slf)
            })
        });
        stream.boxed()
    }
}
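/*
The byte-targeted batch sizing above combines two pieces: choosing a row count
from a bytes-per-row figure, and updating that figure after each decoded batch.
The sketch below isolates both as pure functions so the asymmetric update rule
(jump up immediately, average down gradually) is easy to check.  The function
names `rows_for_byte_budget` and `next_feedback` are hypothetical; the real
code keeps the feedback in an `AtomicU64` shared with the decode tasks.

```rust
/// Number of rows to take for one batch when targeting `batch_size_bytes`.
/// `feedback == 0` means no batch has been measured yet, so the schema
/// estimate is used instead.
fn rows_for_byte_budget(
    batch_size_bytes: u64,
    feedback: u64,
    schema_bytes_per_row: f64,
    rows_remaining: u64,
) -> u64 {
    let bpr = if feedback > 0 {
        feedback as f64
    } else {
        schema_bytes_per_row
    };
    let rows = (batch_size_bytes as f64 / bpr) as u64;
    // Always take at least one row, never more than what remains.
    rows_remaining.min(rows.max(1))
}

/// New feedback value after decoding a batch of `num_rows` rows occupying
/// `data_size` bytes.  Returns `prev` unchanged for an empty batch.
fn next_feedback(prev: u64, data_size: u64, num_rows: u64) -> u64 {
    if num_rows == 0 {
        return prev;
    }
    let bpr = data_size / num_rows;
    let next = if prev == 0 || bpr >= prev {
        // First measurement, or rows are larger than believed: adopt at once.
        bpr
    } else {
        // Rows are smaller than believed: move halfway toward the measurement.
        (prev + bpr) / 2
    };
    // Zero is reserved for "no feedback yet", so clamp to at least 1.
    next.max(1)
}
```

With a 1,000,000-byte target and a 100-byte schema estimate, the first batch
asks for 10,000 rows.  If that batch measures 400 bytes per row, the next one
asks for 2,500 rows; a later measurement of 200 bytes per row against a stored
800 only lowers the stored value to 500.
*/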

#[derive(Debug)]
pub enum RequestedRows {
    Ranges(Vec<Range<u64>>),
    Indices(Vec<u64>),
}

impl RequestedRows {
    pub fn num_rows(&self) -> u64 {
        match self {
            Self::Ranges(ranges) => ranges.iter().map(|r| r.end - r.start).sum(),
            Self::Indices(indices) => indices.len() as u64,
        }
    }

    pub fn trim_empty_ranges(mut self) -> Self {
        if let Self::Ranges(ranges) = &mut self {
            ranges.retain(|r| !r.is_empty());
        }
        self
    }
}
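/*
A small usage sketch for the two helpers above.  To stay self-contained it
uses a local copy of the enum named `RequestedRowsSketch` (a hypothetical
name) with the same two methods and an added `PartialEq` derive for the
comparisons; it is not a substitute for the real type.

```rust
use std::ops::Range;

/// Local copy of `RequestedRows` for illustration only.
#[derive(Debug, PartialEq)]
enum RequestedRowsSketch {
    Ranges(Vec<Range<u64>>),
    Indices(Vec<u64>),
}

impl RequestedRowsSketch {
    /// Total number of rows requested.
    fn num_rows(&self) -> u64 {
        match self {
            // Each range contributes its length; empty ranges contribute 0.
            Self::Ranges(ranges) => ranges.iter().map(|r| r.end - r.start).sum(),
            // Each index is one row.
            Self::Indices(indices) => indices.len() as u64,
        }
    }

    /// Drop ranges that select no rows; indices are left untouched.
    fn trim_empty_ranges(mut self) -> Self {
        if let Self::Ranges(ranges) = &mut self {
            ranges.retain(|r| !r.is_empty());
        }
        self
    }
}
```

Note that `num_rows` subtracts `start` from `end` directly, so it relies on
every range satisfying `start <= end`.
*/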

/// Configuration for decoder behavior
#[derive(Debug, Clone)]
pub struct DecoderConfig {
    /// Whether to cache repetition indices for better performance.
    ///
    /// This defaults to the `LANCE_READ_CACHE_REPETITION_INDEX` environment variable
    /// when present and is enabled by default. Set the env var to a non-truthy
    /// value (for example `0` or `false`) to disable it. The env var is read
    /// once per process.
    pub cache_repetition_index: bool,
    /// Whether to validate decoded data
    pub validate_on_decode: bool,
    /// Override the strategy used to dispatch the scheduling work in
    /// [`schedule_and_decode`].
    ///
    /// `schedule_and_decode` always awaits the scheduler's `initialize` (which
    /// performs metadata I/O) before returning.  This flag controls what
    /// happens with the subsequent (synchronous) work of pushing decoder
    /// messages into the channel that feeds the decode stream.
    ///
    /// * `None` - default behavior: the scheduling work runs inline (as part
    ///   of the `schedule_and_decode` await) when the request is small
    ///   (controlled by the `LANCE_INLINE_SCHEDULING_THRESHOLD` env var) and
    ///   is dispatched onto a spawned task otherwise.
    /// * `Some(true)` - always run scheduling inline.  The await of
    ///   `schedule_and_decode` does not return until every decoder message
    ///   has been queued.
    /// * `Some(false)` - always spawn a task for scheduling so that it can
    ///   overlap with consumption of the decode stream.
    pub inline_scheduling: Option<bool>,
}

impl Default for DecoderConfig {
    fn default() -> Self {
        Self {
            cache_repetition_index: default_cache_repetition_index(),
            validate_on_decode: false,
            inline_scheduling: None,
        }
    }
}

#[derive(Debug, Clone)]
pub struct SchedulerDecoderConfig {
    pub decoder_plugins: Arc<DecoderPlugins>,
    pub batch_size: u32,
    pub io: Arc<dyn EncodingsIo>,
    pub cache: Arc<LanceCache>,
    /// Decoder configuration
    pub decoder_config: DecoderConfig,
    /// If set, target this many bytes per batch instead of using `batch_size` rows.
    ///
    /// Only supported for v2.1+ (structural) files. For v2.0 files this
    /// option is ignored and a warning is logged.
    pub batch_size_bytes: Option<u64>,
}

fn check_scheduler_on_drop(
    stream: BoxStream<'static, ReadBatchTask>,
    scheduler_handle: tokio::task::JoinHandle<()>,
) -> BoxStream<'static, ReadBatchTask> {
    // This is a bit weird but we create an "empty stream" that unwraps the scheduler handle (which
    // will panic if the scheduler panicked).  This lets us check if the scheduler panicked
    // when the stream finishes.
    let abort_handle = scheduler_handle.abort_handle();
    let mut scheduler_handle = Some(scheduler_handle);
    let check_scheduler = stream::unfold((), move |_| {
        let handle = scheduler_handle.take();
        async move {
            if let Some(handle) = handle {
                handle.await.unwrap();
            }
            None
        }
    });
    stream
        .chain(check_scheduler)
        .on_drop(move || {
            // Abort the scheduler task on early drop. The scheduler task holds
            // a reference to the I/O scheduler (via config.io) which keeps the
            // ScanScheduler alive. If the scheduler task is stuck waiting for
            // initialization I/O (which is blocked on backpressure that will
            // never drain because no one is consuming the stream), we need to
            // abort it so it releases its I/O reference and allows the
            // ScanScheduler to drop and cancel pending I/O.
            abort_handle.abort();
        })
        .boxed()
}

#[allow(clippy::too_many_arguments)]
pub fn create_decode_stream(
    schema: &Schema,
    num_rows: u64,
    batch_size: u32,
    is_structural: bool,
    should_validate: bool,
    spawn_structural_batch_decode_tasks: bool,
    rx: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
    batch_size_bytes: Option<u64>,
) -> Result<BoxStream<'static, ReadBatchTask>> {
    if is_structural {
        let arrow_schema = ArrowSchema::from(schema);
        let structural_decoder = StructuralStructDecoder::new(
            arrow_schema.fields,
            should_validate,
            /*is_root=*/ true,
        )?;
        Ok(StructuralBatchDecodeStream::new(
            rx,
            batch_size,
            num_rows,
            structural_decoder,
            spawn_structural_batch_decode_tasks,
            batch_size_bytes,
        )
        .into_stream())
    } else {
        if batch_size_bytes.is_some() {
            warn!("batch_size_bytes is not supported for v2.0 files and will be ignored");
        }
        let arrow_schema = ArrowSchema::from(schema);
        let root_fields = arrow_schema.fields;

        let simple_struct_decoder = SimpleStructDecoder::new(root_fields, num_rows);
        Ok(BatchDecodeStream::new(rx, batch_size, num_rows, simple_struct_decoder).into_stream())
    }
}

/// Creates an iterator that decodes a set of messages in a blocking fashion
///
/// See [`schedule_and_decode_blocking`] for more information.
pub fn create_decode_iterator(
    schema: &Schema,
    num_rows: u64,
    batch_size: u32,
    should_validate: bool,
    is_structural: bool,
    messages: VecDeque<Result<DecoderMessage>>,
) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
    let arrow_schema = Arc::new(ArrowSchema::from(schema));
    let root_fields = arrow_schema.fields.clone();
    if is_structural {
        let structural_decoder =
            StructuralStructDecoder::new(root_fields, should_validate, /*is_root=*/ true)?;
        Ok(Box::new(BatchDecodeIterator::new(
            messages,
            batch_size,
            num_rows,
            structural_decoder,
            arrow_schema,
        )))
    } else {
        let root_decoder = SimpleStructDecoder::new(root_fields, num_rows);
        Ok(Box::new(BatchDecodeIterator::new(
            messages,
            batch_size,
            num_rows,
            root_decoder,
            arrow_schema,
        )))
    }
}
2129
2130async fn create_scheduler_decoder(
2131    column_infos: Vec<Arc<ColumnInfo>>,
2132    requested_rows: RequestedRows,
2133    filter: FilterExpression,
2134    column_indices: Vec<u32>,
2135    target_schema: Arc<Schema>,
2136    config: SchedulerDecoderConfig,
2137) -> Result<BoxStream<'static, ReadBatchTask>> {
2138    let num_rows = requested_rows.num_rows();
2139
2140    let is_structural = column_infos[0].is_structural();
2141    let mode = std::env::var(ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE);
2142    let spawn_structural_batch_decode_tasks = match mode.ok().as_deref() {
        Some("always") => true,
        Some("never") => false,
        _ => matches!(requested_rows, RequestedRows::Ranges(_)),
    };

    let (tx, rx) = mpsc::unbounded_channel();

    let decode_stream = create_decode_stream(
        &target_schema,
        num_rows,
        config.batch_size,
        is_structural,
        config.decoder_config.validate_on_decode,
        spawn_structural_batch_decode_tasks,
        rx,
        config.batch_size_bytes,
    )?;

    // The scheduler's `initialize` may perform I/O to load column metadata
    // unless that metadata is already in the cache.  This metadata loading
    // happens as part of this call and should be parallelized if reading
    // multiple files.
    let mut decode_scheduler = DecodeBatchScheduler::try_new(
        target_schema.as_ref(),
        &column_indices,
        &column_infos,
        &vec![],
        num_rows,
        config.decoder_plugins,
        config.io.clone(),
        config.cache,
        &filter,
        &config.decoder_config,
    )
    .await?;

    // For small requests the scheduling cost is dwarfed by the overhead of
    // spawning a task, so we run scheduling inline (still as part of this
    // await) before returning.  The threshold is configurable via
    // `LANCE_INLINE_SCHEDULING_THRESHOLD`, and callers can force either
    // strategy via `DecoderConfig::inline_scheduling`.
    let inline_scheduling = config
        .decoder_config
        .inline_scheduling
        .unwrap_or_else(|| num_rows <= inline_scheduling_threshold());

    if inline_scheduling {
        match requested_rows {
            RequestedRows::Ranges(ranges) => {
                decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
            }
            RequestedRows::Indices(indices) => {
                decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
            }
        }
        Ok(decode_stream)
    } else {
        // Spawn the (still synchronous) scheduling work so that decoder
        // messages can stream into the channel while the consumer is
        // already pulling from the decode stream.
        let scheduling = async move {
            match requested_rows {
                RequestedRows::Ranges(ranges) => {
                    decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
                }
                RequestedRows::Indices(indices) => {
                    decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
                }
            }
        };
        let scheduler_handle = tokio::task::spawn(scheduling);
        Ok(check_scheduler_on_drop(decode_stream, scheduler_handle))
    }
}
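// The inline-versus-spawned decision above can be illustrated in isolation.
// This is a minimal sketch under stated assumptions, not the crate's code:
// `parse_threshold`, `threshold_from_env`, `should_schedule_inline`, and
// `ASSUMED_DEFAULT_THRESHOLD` are hypothetical names, and the fallback value
// of 1024 is invented purely for illustration.  Only the environment variable
// name is taken from the comment above.

```rust
use std::env;

/// Hypothetical fallback used when the variable is unset or unparsable.
/// (Assumption: the real default may be different.)
const ASSUMED_DEFAULT_THRESHOLD: u64 = 1024;

/// Parses a threshold from an optional raw string, falling back to the
/// assumed default when the value is missing or not a valid `u64`.
fn parse_threshold(raw: Option<&str>) -> u64 {
    raw.and_then(|s| s.trim().parse::<u64>().ok())
        .unwrap_or(ASSUMED_DEFAULT_THRESHOLD)
}

/// Reads the threshold from `LANCE_INLINE_SCHEDULING_THRESHOLD`.
#[allow(dead_code)]
fn threshold_from_env() -> u64 {
    parse_threshold(env::var("LANCE_INLINE_SCHEDULING_THRESHOLD").ok().as_deref())
}

/// An explicit override always wins; otherwise requests at or below the
/// threshold are scheduled inline and larger ones are spawned.
fn should_schedule_inline(override_flag: Option<bool>, num_rows: u64, threshold: u64) -> bool {
    override_flag.unwrap_or(num_rows <= threshold)
}
```

// With a threshold of 16, a 10-row request would run inline and a 17-row
// request would be spawned, unless the caller forces one strategy.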

/// Initializes the scheduler, schedules the requested rows, and returns a
/// stream of decode tasks for the resulting batches.
///
/// This is a convenience function that creates both the scheduler and the
/// decoder, which can be a little tricky to get right.
///
/// # Why is this async?
///
/// Constructing the scheduler runs `initialize` which will perform I/O
/// unless the data required is already in the file metadata cache.
///
/// When `DecoderConfig::inline_scheduling` resolves to `true`, the
/// subsequent (synchronous) scheduling work also runs before this function
/// returns, leaving a fully primed decode stream.
pub async fn schedule_and_decode(
    column_infos: Vec<Arc<ColumnInfo>>,
    requested_rows: RequestedRows,
    filter: FilterExpression,
    column_indices: Vec<u32>,
    target_schema: Arc<Schema>,
    config: SchedulerDecoderConfig,
) -> Result<BoxStream<'static, ReadBatchTask>> {
    if requested_rows.num_rows() == 0 {
        return Ok(stream::empty().boxed());
    }

    // If the user requested any ranges that are empty, ignore them.  They are pointless and
    // trying to read them has caused bugs in the past.
    let requested_rows = requested_rows.trim_empty_ranges();

    let io = config.io.clone();

    let stream = create_scheduler_decoder(
        column_infos,
        requested_rows,
        filter,
        column_indices,
        target_schema,
        config,
    )
    .await?;

    // Keep the io alive until the stream is dropped or finishes.  Otherwise the
    // I/O drops as soon as the scheduling is finished and the I/O loop terminates.
    Ok(stream.finally(move || drop(io)).boxed())
}

pub static WAITER_RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
    tokio::runtime::Builder::new_current_thread()
        .build()
        .unwrap()
});

/// Schedules and decodes the requested data in a blocking fashion
///
/// This function is a blocking version of [`schedule_and_decode`]. It schedules the requested data
/// and decodes it in the current thread.
///
/// This can be useful when the disk is fast (or the data is in memory) and the amount
/// of data is relatively small.  For example, when doing a take against NVMe or in-memory data.
///
/// This should NOT be used for full scans.  Even if the data is in memory this function will
/// not parallelize the decode and will be slower than the async version.  Full scans typically
/// make relatively few IOPs and so the asynchronous overhead is much smaller.
///
/// This method will first completely run the scheduling process.  Then it will run the
/// decode process.
pub fn schedule_and_decode_blocking(
    column_infos: Vec<Arc<ColumnInfo>>,
    requested_rows: RequestedRows,
    filter: FilterExpression,
    column_indices: Vec<u32>,
    target_schema: Arc<Schema>,
    config: SchedulerDecoderConfig,
) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
    if requested_rows.num_rows() == 0 {
        let arrow_schema = Arc::new(ArrowSchema::from(target_schema.as_ref()));
        return Ok(Box::new(RecordBatchIterator::new(vec![], arrow_schema)));
    }

    let num_rows = requested_rows.num_rows();
    let is_structural = column_infos[0].is_structural();

    let (tx, mut rx) = mpsc::unbounded_channel();

    // Initialize the scheduler.  This is still "asynchronous" but we run it with a current-thread
    // runtime.
    let mut decode_scheduler = WAITER_RT.block_on(DecodeBatchScheduler::try_new(
        target_schema.as_ref(),
        &column_indices,
        &column_infos,
        &vec![],
        num_rows,
        config.decoder_plugins,
        config.io.clone(),
        config.cache,
        &filter,
        &config.decoder_config,
    ))?;

    // Schedule the requested rows
    match requested_rows {
        RequestedRows::Ranges(ranges) => {
            decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
        }
        RequestedRows::Indices(indices) => {
            decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
        }
    }

    // Drain the scheduler queue into a vec of decode messages
    let mut messages = Vec::new();
    while rx
        .recv_many(&mut messages, usize::MAX)
        .now_or_never()
        .unwrap()
        != 0
    {}

    // Create a decoder to decode the messages
    let decode_iterator = create_decode_iterator(
        &target_schema,
        num_rows,
        config.batch_size,
        config.decoder_config.validate_on_decode,
        is_structural,
        messages.into(),
    )?;

    Ok(decode_iterator)
}
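// The "schedule everything first, then drain the queue" shape of the blocking
// path can be sketched with only the standard library.  This is an analogy and
// not the crate's code: it swaps tokio's unbounded channel for
// `std::sync::mpsc`, and `Message`, `schedule_all`, and `schedule_then_drain`
// are hypothetical names introduced for illustration.

```rust
use std::sync::mpsc;

/// Stand-in for a decoder message; the real message also carries decoders.
#[derive(Debug, PartialEq)]
struct Message {
    scheduled_so_far: u64,
}

/// Synchronously "schedules" `num_rows` rows, sending one message per chunk
/// of at most `rows_per_message` rows.  The sender is dropped on return.
fn schedule_all(num_rows: u64, rows_per_message: u64, tx: mpsc::Sender<Message>) {
    assert!(rows_per_message > 0, "rows_per_message must be positive");
    let mut scheduled = 0;
    while scheduled < num_rows {
        scheduled = (scheduled + rows_per_message).min(num_rows);
        // An unbounded channel never blocks the sender.
        tx.send(Message { scheduled_so_far: scheduled }).unwrap();
    }
}

/// Runs scheduling to completion and only then collects the queued messages.
fn schedule_then_drain(num_rows: u64, rows_per_message: u64) -> Vec<Message> {
    let (tx, rx) = mpsc::channel();
    schedule_all(num_rows, rows_per_message, tx);
    // Scheduling has finished, so every message is already queued and the
    // drain never has to wait.
    rx.try_iter().collect()
}
```

// Because nothing is consumed until scheduling completes, this shape suits
// small requests; a large scan would buffer every message before decoding.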

/// A decoder for single-column encodings of primitive data (this includes fixed size
/// lists of primitive data)
///
/// Physical decoders are able to decode into existing buffers for zero-copy operation.
///
/// Instances should be stateless and `Send` / `Sync`.  This is because multiple decode
/// tasks could reference the same page.  For example, imagine a page covers rows 0-2000
/// and the decoder stream has a batch size of 1024.  The decoder will be needed by both
/// the decode task for batch 0 and the decode task for batch 1.
///
/// See [`crate::decoder`] for more information
pub trait PrimitivePageDecoder: Send + Sync {
    /// Decode data into buffers
    ///
    /// This may be a simple zero-copy from a disk buffer or could involve complex decoding
    /// such as decompressing from some compressed representation.
    ///
    /// Capacity is stored as a tuple of (num_bytes: u64, is_needed: bool).  The `is_needed`
    /// portion only needs to be updated if the encoding has some concept of an "optional"
    /// buffer.
    ///
    /// Encodings can have any number of input or output buffers.  For example, a dictionary
    /// decoding will convert two buffers (indices + dictionary) into a single buffer.
    ///
    /// Binary decodings have two output buffers (one for values, one for offsets).
    ///
    /// Other decodings could even expand the # of output buffers.  For example, we could decode
    /// fixed size strings into variable length strings going from one input buffer to multiple output
    /// buffers.
    ///
    /// Each Arrow data type typically has a fixed structure of buffers and the encoding chain will
    /// generally end at one of these structures.  However, intermediate structures may exist which
    /// do not correspond to any Arrow type at all.  For example, a bitpacking encoding will deal
    /// with buffers that have bits-per-value that is not a multiple of 8.
    ///
    /// The `primitive_array_from_buffers` method has an expected buffer layout for each arrow
    /// type (order matters) and encodings that aim to decode into arrow types should respect
    /// this layout.
    ///
    /// # Arguments
    ///
    /// * `rows_to_skip` - how many rows to skip (within the page) before decoding
    /// * `num_rows` - how many rows to decode
    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock>;
}
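// The example in the trait documentation (a page covering rows 0-2000 with a
// batch size of 1024) can be checked with a little arithmetic.  The helper
// below is a hypothetical illustration and not part of the crate; it treats
// the page as the half-open row range `0..2000`, which is an assumption about
// how "rows 0-2000" should be read.

```rust
use std::ops::Range;

/// Returns the half-open range of batch indices whose rows overlap the page.
fn batches_touching_page(page_rows: Range<u64>, batch_size: u64) -> Range<u64> {
    assert!(batch_size > 0, "batch size must be positive");
    if page_rows.start >= page_rows.end {
        // An empty page overlaps no batches.
        return 0..0;
    }
    let first = page_rows.start / batch_size;
    // `end` is exclusive, so the last row in the page is `end - 1`.
    let last = (page_rows.end - 1) / batch_size;
    first..last + 1
}
```

// For `0..2000` and a batch size of 1024 this yields batches 0 and 1, which is
// why a single page decoder may be shared by more than one decode task.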

/// A scheduler for single-column encodings of primitive data
///
/// The scheduler is responsible for calculating what I/O is needed for the requested rows
///
/// Instances should be stateless and `Send` and `Sync`.  This is because instances can
/// be shared in follow-up I/O tasks.
///
/// See [`crate::decoder`] for more information
pub trait PageScheduler: Send + Sync + std::fmt::Debug {
    /// Schedules a batch of I/O to load the data needed for the requested ranges
    ///
    /// Returns a future that will yield a decoder once the data has been loaded
    ///
    /// # Arguments
    ///
    /// * `ranges` - the ranges of row offsets (relative to the start of the page)
    ///   requested; these must be ordered and must not overlap
    /// * `scheduler` - a scheduler to submit the I/O request to
    /// * `top_level_row` - the row offset of the top level field currently being
    ///   scheduled.  This can be used to assign priority to I/O requests
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        scheduler: &Arc<dyn EncodingsIo>,
        top_level_row: u64,
    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>;
}

/// A trait to control the priority of I/O
pub trait PriorityRange: std::fmt::Debug + Send + Sync {
    fn advance(&mut self, num_rows: u64);
    fn current_priority(&self) -> u64;
    fn box_clone(&self) -> Box<dyn PriorityRange>;
}

/// A simple priority scheme for top-level fields with no parent
/// repetition
#[derive(Debug)]
pub struct SimplePriorityRange {
    priority: u64,
}

impl SimplePriorityRange {
    fn new(priority: u64) -> Self {
        Self { priority }
    }
}

impl PriorityRange for SimplePriorityRange {
    fn advance(&mut self, num_rows: u64) {
        self.priority += num_rows;
    }

    fn current_priority(&self) -> u64 {
        self.priority
    }

    fn box_clone(&self) -> Box<dyn PriorityRange> {
        Box::new(Self {
            priority: self.priority,
        })
    }
}

/// Determining the priority of a list request is tricky.  We want
/// the priority to be the top-level row.  So if we have a
/// `list<list<int>>` and each outer list has 10 inner lists and each
/// inner list has 5 items then the priority of the 100th item is 1
/// because it is the 5th item in the 10th inner list of the *second* row.
///
/// This structure allows us to keep track of this complicated priority
/// relationship.
///
/// There's a fair amount of bookkeeping involved here.
///
/// A better approach (using repetition levels) is coming in the future.
pub struct ListPriorityRange {
    base: Box<dyn PriorityRange>,
    offsets: Arc<[u64]>,
    cur_index_into_offsets: usize,
    cur_position: u64,
}

impl ListPriorityRange {
    pub(crate) fn new(base: Box<dyn PriorityRange>, offsets: Arc<[u64]>) -> Self {
        Self {
            base,
            offsets,
            cur_index_into_offsets: 0,
            cur_position: 0,
        }
    }
}

impl std::fmt::Debug for ListPriorityRange {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ListPriorityRange")
            .field("base", &self.base)
            .field("offsets.len()", &self.offsets.len())
            .field("cur_index_into_offsets", &self.cur_index_into_offsets)
            .field("cur_position", &self.cur_position)
            .finish()
    }
}

impl PriorityRange for ListPriorityRange {
    fn advance(&mut self, num_rows: u64) {
        // We've scheduled X items.  Now walk through the offsets to
        // determine how many rows we've scheduled.
        self.cur_position += num_rows;
        let mut idx_into_offsets = self.cur_index_into_offsets;
        while idx_into_offsets + 1 < self.offsets.len()
            && self.offsets[idx_into_offsets + 1] <= self.cur_position
        {
            idx_into_offsets += 1;
        }
        let base_rows_advanced = idx_into_offsets - self.cur_index_into_offsets;
        self.cur_index_into_offsets = idx_into_offsets;
        self.base.advance(base_rows_advanced as u64);
    }

    fn current_priority(&self) -> u64 {
        self.base.current_priority()
    }

    fn box_clone(&self) -> Box<dyn PriorityRange> {
        Box::new(Self {
            base: self.base.box_clone(),
            offsets: self.offsets.clone(),
            cur_index_into_offsets: self.cur_index_into_offsets,
            cur_position: self.cur_position,
        })
    }
}
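// The offset walk above can be tried out on the `list<list<int>>` example from
// the documentation.  This is a simplified, single-level sketch rather than
// the crate's type: `ListPriority` is a hypothetical stand-in that flattens
// both list levels into one offsets array (50 items per top-level row, i.e.
// 10 inner lists of 5 items) and stores the row counter directly instead of
// delegating to a boxed base range.

```rust
/// Tracks which top-level row a flat stream of list items has reached.
struct ListPriority {
    /// `offsets[i]` is the index of the first item belonging to row `i`;
    /// the final entry is the total number of items.
    offsets: Vec<u64>,
    /// Index of the row that the next unscheduled item belongs to.
    row: usize,
    /// Total number of items scheduled so far.
    items_seen: u64,
    /// Priority of the first row.
    base_priority: u64,
}

impl ListPriority {
    fn new(base_priority: u64, offsets: Vec<u64>) -> Self {
        Self {
            offsets,
            row: 0,
            items_seen: 0,
            base_priority,
        }
    }

    /// Records that `num_items` more items were scheduled and moves the row
    /// index past every row that is now fully scheduled.
    fn advance(&mut self, num_items: u64) {
        self.items_seen += num_items;
        while self.row + 1 < self.offsets.len()
            && self.offsets[self.row + 1] <= self.items_seen
        {
            self.row += 1;
        }
    }

    /// The priority is the top-level row of the next item to schedule.
    fn current_priority(&self) -> u64 {
        self.base_priority + self.row as u64
    }
}
```

// After 99 items have been scheduled, the next (100th) item still belongs to
// the second row, so the priority is 1; one more item moves it to 2.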

/// Contains the context for a scheduler
pub struct SchedulerContext {
    recv: Option<mpsc::UnboundedReceiver<DecoderMessage>>,
    io: Arc<dyn EncodingsIo>,
    cache: Arc<LanceCache>,
    name: String,
    path: Vec<u32>,
    path_names: Vec<String>,
}

pub struct ScopedSchedulerContext<'a> {
    pub context: &'a mut SchedulerContext,
}

impl<'a> ScopedSchedulerContext<'a> {
    pub fn pop(self) -> &'a mut SchedulerContext {
        self.context.pop();
        self.context
    }
}

impl SchedulerContext {
    pub fn new(io: Arc<dyn EncodingsIo>, cache: Arc<LanceCache>) -> Self {
        Self {
            io,
            cache,
            recv: None,
            name: "".to_string(),
            path: Vec::new(),
            path_names: Vec::new(),
        }
    }

    pub fn io(&self) -> &Arc<dyn EncodingsIo> {
        &self.io
    }

    pub fn cache(&self) -> &Arc<LanceCache> {
        &self.cache
    }

    pub fn push(&'_ mut self, name: &str, index: u32) -> ScopedSchedulerContext<'_> {
        self.path.push(index);
        self.path_names.push(name.to_string());
        ScopedSchedulerContext { context: self }
    }

    pub fn pop(&mut self) {
        self.path.pop();
        self.path_names.pop();
    }

    pub fn path_name(&self) -> String {
        let path = self.path_names.join("/");
        if self.recv.is_some() {
            format!("TEMP({}){}", self.name, path)
        } else {
            format!("ROOT{}", path)
        }
    }

    pub fn current_path(&self) -> VecDeque<u32> {
        VecDeque::from_iter(self.path.iter().copied())
    }

    #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
    pub fn locate_decoder(
        &mut self,
        decoder: Box<dyn crate::previous::decoder::LogicalPageDecoder>,
    ) -> crate::previous::decoder::DecoderReady {
        trace!(
            "Scheduling decoder of type {:?} for {:?}",
            decoder.data_type(),
            self.path,
        );
        crate::previous::decoder::DecoderReady {
            decoder,
            path: self.current_path(),
        }
    }
}

pub struct UnloadedPageShard(pub BoxFuture<'static, Result<LoadedPageShard>>);

impl std::fmt::Debug for UnloadedPageShard {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("UnloadedPage").finish()
    }
}

#[derive(Debug)]
pub struct ScheduledScanLine {
    pub rows_scheduled: u64,
    pub decoders: Vec<MessageType>,
}

pub trait StructuralSchedulingJob: std::fmt::Debug {
    /// Schedule the next batch of data
    ///
    /// Normally this equates to scheduling the next page of data into one task.  Very large pages
    /// might be split into multiple scan lines.  Each scan line has one or more rows.
    ///
    /// If a scheduler ends early it may return an empty vector.
    fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>>;
}

/// A filter expression to apply to the data
///
/// The core decoders do not currently take advantage of filtering in
/// any way.  In order to maintain the abstraction we represent filters
/// as an arbitrary byte sequence.
///
/// We recommend that encodings use Substrait for filters.
pub struct FilterExpression(pub Bytes);

impl FilterExpression {
    /// Create a filter expression that does not filter any data
    ///
    /// This is currently represented by an empty byte array.  Encoders
    /// that are "filter aware" should make sure they handle this case.
    pub fn no_filter() -> Self {
        Self(Bytes::new())
    }

    /// Returns true if the filter is the same as the [`Self::no_filter`] filter
    pub fn is_noop(&self) -> bool {
        self.0.is_empty()
    }
}

pub trait StructuralFieldScheduler: Send + std::fmt::Debug {
    fn initialize<'a>(
        &'a mut self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>>;
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>>;
}

/// A trait for tasks that decode data into an Arrow array
pub trait DecodeArrayTask: Send {
    /// Decodes the data into an Arrow array and returns it along with its data size in bytes
    fn decode(self: Box<Self>) -> Result<(ArrayRef, u64)>;
}

impl DecodeArrayTask for Box<dyn StructuralDecodeArrayTask> {
    fn decode(self: Box<Self>) -> Result<(ArrayRef, u64)> {
        StructuralDecodeArrayTask::decode(*self)
            .map(|decoded_array| (decoded_array.array, decoded_array.data_size))
    }
}

/// A task to decode data into an Arrow record batch
///
/// It has a child `task` which decodes a struct array with no nulls.
/// This is then converted into a record batch.
pub struct NextDecodeTask {
    /// The decode task itself
    pub task: Box<dyn DecodeArrayTask>,
    /// The number of rows that will be created
    pub num_rows: u64,
}

impl NextDecodeTask {
    // Run the task and produce a record batch
    //
    // If the batch is very large this function will log a message (at debug
    // level, and at most once) suggesting the user try a smaller batch size.
    #[instrument(name = "task_to_batch", level = "debug", skip_all)]
    fn into_batch(self, emitted_batch_size_warning: Arc<Once>) -> Result<(RecordBatch, u64)> {
        let (struct_arr, data_size) = self
            .task
            .decode()
            .map_err(|e| Error::internal(format!("Error decoding batch: {}", e)))?;
        let batch = RecordBatch::from(struct_arr.as_struct());
        if data_size > BATCH_SIZE_BYTES_WARNING {
            emitted_batch_size_warning.call_once(|| {
                let size_mb = data_size / 1024 / 1024;
                debug!("Lance read in a single batch that contained more than {}MiB of data.  You may want to consider reducing the batch size.", size_mb);
            });
        }
        Ok((batch, data_size))
    }
}

// An envelope to wrap both 2.0 style messages and 2.1 style messages so we can
// share some code paths between the two.  Decoders can safely unwrap into whatever
// style they expect since a file will be either all-2.0 or all-2.1
#[derive(Debug)]
pub enum MessageType {
    // The older v2.0 scheduler/decoder used a scheme where the message was the
    // decoder itself.  The messages were not sent in priority order and the decoder
    // had to wait for I/O, figuring out the correct priority.  This was a lot of
    // complexity.
    DecoderReady(crate::previous::decoder::DecoderReady),
    // Starting in 2.1 we use a simpler scheme where the scheduling happens in priority
    // order and the message is an unloaded decoder.  These can be awaited, in order, and
    // the decoder does not have to worry about waiting for I/O.
    UnloadedPage(UnloadedPageShard),
}

impl MessageType {
    pub fn into_legacy(self) -> crate::previous::decoder::DecoderReady {
        match self {
            Self::DecoderReady(decoder) => decoder,
            Self::UnloadedPage(_) => {
                panic!("Expected DecoderReady but got UnloadedPage")
            }
        }
    }

    pub fn into_structural(self) -> UnloadedPageShard {
        match self {
            Self::UnloadedPage(unloaded) => unloaded,
            Self::DecoderReady(_) => {
                panic!("Expected UnloadedPage but got DecoderReady")
            }
        }
    }
}

pub struct DecoderMessage {
    pub scheduled_so_far: u64,
    pub decoders: Vec<MessageType>,
}

pub struct DecoderContext {
    source: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
}

impl DecoderContext {
    pub fn new(source: mpsc::UnboundedReceiver<Result<DecoderMessage>>) -> Self {
        Self { source }
    }
}

pub struct DecodedPage {
    pub data: DataBlock,
    pub repdef: RepDefUnraveler,
}

pub trait DecodePageTask: Send + std::fmt::Debug {
    /// Decodes the data into a [`DecodedPage`]
    fn decode(self: Box<Self>) -> Result<DecodedPage>;
}

pub trait StructuralPageDecoder: std::fmt::Debug + Send {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>>;
    fn num_rows(&self) -> u64;
}

#[derive(Debug)]
pub struct LoadedPageShard {
    // The decoder that is ready to be decoded
    pub decoder: Box<dyn StructuralPageDecoder>,
    // The path to the decoder.  The first value is the column index and
    // any following values are nested child indices.
    //
    // For example, a path of [1, 1, 0] would mean to grab the second
    // column, then the second child, and then the first child.
    //
    // It could represent x in the following schema:
    //
    // score: float64
    // points: struct
    //   color: string
    //   location: struct
    //     x: float64
    //
    // Currently, only struct decoders have "children" although other
    // decoders may at some point as well.  List children are only
    // handled through indirect I/O at the moment and so they don't
    // need to be represented (yet)
    pub path: VecDeque<u32>,
}
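// The path convention described on `LoadedPageShard::path` can be traced by
// hand on the example schema.  The sketch below is illustrative only: `Node`,
// `leaf`, `parent`, `example_schema`, and `resolve` are hypothetical helpers,
// and the path is a plain slice here rather than a `VecDeque<u32>`.

```rust
/// A minimal schema node: a field name plus zero or more children.
struct Node {
    name: &'static str,
    children: Vec<Node>,
}

fn leaf(name: &'static str) -> Node {
    Node { name, children: Vec::new() }
}

fn parent(name: &'static str, children: Vec<Node>) -> Node {
    Node { name, children }
}

/// Builds the example schema: `score`, and `points { color, location { x } }`.
fn example_schema() -> Vec<Node> {
    vec![
        leaf("score"),
        parent(
            "points",
            vec![leaf("color"), parent("location", vec![leaf("x")])],
        ),
    ]
}

/// Follows `path` from the top-level columns downward and returns the field
/// names visited, or `None` if an index is out of bounds.
fn resolve(columns: &[Node], path: &[u32]) -> Option<Vec<&'static str>> {
    let mut names = Vec::with_capacity(path.len());
    let mut level = columns;
    for &idx in path {
        let node = level.get(idx as usize)?;
        names.push(node.name);
        level = node.children.as_slice();
    }
    Some(names)
}
```

// Resolving `[1, 1, 0]` against the example schema visits `points`, then
// `location`, then `x`.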

pub struct DecodedArray {
    pub array: ArrayRef,
    pub repdef: CompositeRepDefUnraveler,
    /// The number of bytes of data in this array (excluding Arrow overhead).
    pub data_size: u64,
}

pub trait StructuralDecodeArrayTask: std::fmt::Debug + Send {
    fn decode(self: Box<Self>) -> Result<DecodedArray>;
}

pub trait StructuralFieldDecoder: std::fmt::Debug + Send {
    /// Add a newly scheduled child decoder
    ///
    /// Implementations that do not expect children should return
    /// an error.
    fn accept_page(&mut self, _child: LoadedPageShard) -> Result<()>;
    /// Creates a task to decode `num_rows` of data into an array
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>>;
    /// The data type of the decoded data
    fn data_type(&self) -> &DataType;
}

#[derive(Debug, Default)]
pub struct DecoderPlugins {}

/// Decodes a batch of data from an in-memory structure created by [`crate::encoder::encode_batch`]
pub async fn decode_batch(
    batch: &EncodedBatch,
    filter: &FilterExpression,
    decoder_plugins: Arc<DecoderPlugins>,
    should_validate: bool,
    version: LanceFileVersion,
    cache: Option<Arc<LanceCache>>,
) -> Result<RecordBatch> {
    // The io is synchronous so it shouldn't be possible for any async stuff to still be in progress
    // Still, if we just use now_or_never we hit misfires because some futures (channels) need to be
    // polled twice.

    let io_scheduler = Arc::new(BufferScheduler::new(batch.data.clone())) as Arc<dyn EncodingsIo>;
    let cache = if let Some(cache) = cache {
        cache
    } else {
        Arc::new(lance_core::cache::LanceCache::with_capacity(
            128 * 1024 * 1024,
        ))
    };
    let mut decode_scheduler = DecodeBatchScheduler::try_new(
        batch.schema.as_ref(),
        &batch.top_level_columns,
        &batch.page_table,
        &vec![],
        batch.num_rows,
        decoder_plugins,
        io_scheduler.clone(),
        cache,
        filter,
        &DecoderConfig::default(),
    )
    .await?;
    let (tx, rx) = unbounded_channel();
    decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler);
    let is_structural = version >= LanceFileVersion::V2_1;
    let mode = std::env::var(ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE);
    let spawn_structural_batch_decode_tasks = !matches!(mode.ok().as_deref(), Some("never"));
    let mut decode_stream = create_decode_stream(
        &batch.schema,
        batch.num_rows,
        batch.num_rows as u32,
        is_structural,
        should_validate,
        spawn_structural_batch_decode_tasks,
        rx,
        None,
    )?;
    decode_stream.next().await.unwrap().task.await
}

#[cfg(test)]
// test coalesce indices to ranges
mod tests {
    use super::*;

    #[test]
    fn test_read_zero_dimension_fsl_errors_instead_of_panicking() {
        // Simulates reading a column whose stored schema declares a
        // zero-dimension FixedSizeList, as old writers (before #5102) could
        // persist. The read plan is built by the field-scheduler factories,
        // which run the dimension guard before touching any column data, so
        // an empty column iterator is sufficient to reach the guard. The read
        // must surface a clean error rather than a divide-by-zero panic.
        use arrow_schema::Field as ArrowField;

        let zero_dim = DataType::FixedSizeList(
            Arc::new(ArrowField::new("item", DataType::Float32, true)),
            0,
        );
        let field = Field::try_from(&ArrowField::new("vec", zero_dim, true)).unwrap();
        let strategy = CoreFieldDecoderStrategy::default();

        let mut structural_columns = ColumnInfoIter::new(vec![], &[]);
        let err = strategy
            .create_structural_field_scheduler(&field, &mut structural_columns)
            .unwrap_err();
        assert!(
            err.to_string()
                .contains("dimension must be a positive integer"),
            "unexpected error: {}",
            err
        );

        let mut legacy_columns = ColumnInfoIter::new(vec![], &[]);
        let err = strategy
            .create_legacy_field_scheduler(
                &field,
                &mut legacy_columns,
                FileBuffers {
                    positions_and_sizes: &[],
                },
            )
            .unwrap_err();
        assert!(
            err.to_string()
                .contains("dimension must be a positive integer"),
            "unexpected error: {}",
            err
        );
    }

    #[test]
    fn test_coalesce_indices_to_ranges_with_single_index() {
        let indices = vec![1];
        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
        assert_eq!(ranges, vec![1..2]);
    }

    #[test]
    fn test_coalesce_indices_to_ranges() {
        let indices = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
        assert_eq!(ranges, vec![1..10]);
    }

    #[test]
    fn test_coalesce_indices_to_ranges_with_gaps() {
        let indices = vec![1, 2, 3, 5, 6, 7, 9];
        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
        assert_eq!(ranges, vec![1..4, 5..8, 9..10]);
    }
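// The behaviour pinned down by the three coalescing tests above can be
// reproduced with a short standalone function.  This is a sketch of the
// general technique and makes no claim about how
// `DecodeBatchScheduler::indices_to_ranges` is actually written;
// `coalesce_indices` is a hypothetical name, and the input is assumed to be
// sorted and free of duplicates.

```rust
use std::ops::Range;

/// Coalesces sorted, de-duplicated indices into half-open ranges, merging
/// runs of consecutive indices into a single range.
fn coalesce_indices(indices: &[u64]) -> Vec<Range<u64>> {
    let mut ranges: Vec<Range<u64>> = Vec::new();
    for &idx in indices {
        if let Some(last) = ranges.last_mut() {
            if last.end == idx {
                // `idx` directly follows the current run, so extend it.
                last.end = idx + 1;
                continue;
            }
        }
        // Either this is the first index or there is a gap: start a new run.
        ranges.push(idx..idx + 1);
    }
    ranges
}
```

// Fewer, larger ranges generally translate into fewer I/O requests, which is
// the motivation for coalescing before scheduling a take.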
2960
2961    #[test]
2962    fn test_estimate_bytes_per_row() {
2963        assert_eq!(estimate_bytes_per_row(&DataType::Int32), 4.0);
2964        assert_eq!(estimate_bytes_per_row(&DataType::Int64), 8.0);
2965        assert_eq!(estimate_bytes_per_row(&DataType::Float32), 4.0);
        assert_eq!(estimate_bytes_per_row(&DataType::Boolean), 1.0 / 8.0);
        assert_eq!(estimate_bytes_per_row(&DataType::Utf8), 64.0);
        assert_eq!(estimate_bytes_per_row(&DataType::Binary), 64.0);
        // Struct of 4 x Int32 = 16 bytes
        let struct_type = DataType::Struct(Fields::from(vec![
            ArrowField::new("a", DataType::Int32, false),
            ArrowField::new("b", DataType::Int32, false),
            ArrowField::new("c", DataType::Int32, false),
            ArrowField::new("d", DataType::Int32, false),
        ]));
        assert_eq!(estimate_bytes_per_row(&struct_type), 16.0);
    }

    /// Helper: encode a batch, then decode it as a stream with optional
    /// `batch_size_bytes`, collecting all output batches.
    async fn decode_batches_with_byte_limit(
        batch: &RecordBatch,
        batch_size: u32,
        batch_size_bytes: Option<u64>,
    ) -> Vec<RecordBatch> {
        use crate::encoder::{EncodingOptions, default_encoding_strategy, encode_batch};
        use crate::version::LanceFileVersion;

        let version = LanceFileVersion::V2_1;
        let options = EncodingOptions {
            version,
            ..Default::default()
        };
        let strategy = default_encoding_strategy(version);
        let schema = Schema::try_from(batch.schema().as_ref()).unwrap();
        let encoded = encode_batch(batch, Arc::new(schema.clone()), strategy.as_ref(), &options)
            .await
            .unwrap();

        let io_scheduler =
            Arc::new(BufferScheduler::new(encoded.data.clone())) as Arc<dyn EncodingsIo>;
        let cache = Arc::new(lance_core::cache::LanceCache::with_capacity(
            128 * 1024 * 1024,
        ));
        let decoder_plugins = Arc::new(DecoderPlugins::default());

        let mut decode_scheduler = DecodeBatchScheduler::try_new(
            encoded.schema.as_ref(),
            &encoded.top_level_columns,
            &encoded.page_table,
            &vec![],
            encoded.num_rows,
            decoder_plugins,
            io_scheduler.clone(),
            cache,
            &FilterExpression::no_filter(),
            &DecoderConfig::default(),
        )
        .await
        .unwrap();

        let (tx, rx) = unbounded_channel();
        decode_scheduler.schedule_range(
            0..encoded.num_rows,
            &FilterExpression::no_filter(),
            tx,
            io_scheduler,
        );

        let mut decode_stream = create_decode_stream(
            &encoded.schema,
            encoded.num_rows,
            batch_size,
            /*is_structural=*/ true,
            /*should_validate=*/ true,
            /*spawn_structural_batch_decode_tasks=*/ true,
            rx,
            batch_size_bytes,
        )
        .unwrap();

        let mut batches = Vec::new();
        while let Some(task) = decode_stream.next().await {
            batches.push(task.task.await.unwrap());
        }
        batches
    }

    #[tokio::test]
    async fn test_byte_sized_batches_fixed_width() {
        use arrow_array::Int32Array;

        // 1000 rows x 4 Int32 columns = 16 bytes/row
        let num_rows: i32 = 1000;
        let arrays: Vec<Arc<dyn arrow_array::Array>> = (0..4)
            .map(|col| {
                Arc::new(Int32Array::from_iter_values(
                    (0..num_rows).map(move |row| row * 10 + col),
                )) as _
            })
            .collect();

        let schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("a", DataType::Int32, false),
            ArrowField::new("b", DataType::Int32, false),
            ArrowField::new("c", DataType::Int32, false),
            ArrowField::new("d", DataType::Int32, false),
        ]));
        let input_batch = RecordBatch::try_new(schema, arrays).unwrap();

        // 16 bytes/row, batch_size_bytes=1600 => 100 rows/batch
        let batches =
            decode_batches_with_byte_limit(&input_batch, /*batch_size=*/ 1024, Some(1600)).await;

        // Should produce 10 batches of 100 rows each
        assert_eq!(batches.len(), 10);
        for (i, batch) in batches.iter().enumerate() {
            assert_eq!(
                batch.num_rows(),
                100,
                "batch {i} should have 100 rows, got {}",
                batch.num_rows()
            );
        }

        // Verify roundtrip: concatenate and compare
        let all_batches: Vec<&RecordBatch> = batches.iter().collect();
        let concatenated =
            arrow_select::concat::concat_batches(&batches[0].schema(), all_batches.iter().copied())
                .unwrap();
        assert_eq!(concatenated.num_rows(), num_rows as usize);
        for col in 0..4 {
            assert_eq!(
                concatenated.column(col).as_ref(),
                input_batch.column(col).as_ref(),
                "column {col} roundtrip mismatch"
            );
        }
    }
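
    // Illustrative sketch only, not part of the decoder: the arithmetic behind
    // the "16 bytes/row, batch_size_bytes=1600 => 100 rows/batch" comment in
    // the test above. `rows_for_byte_budget` is a hypothetical helper. It
    // assumes the row count is the byte target divided by the estimated bytes
    // per row, rounded down, with a floor of one row. The real decoder may
    // round or clamp differently.
    fn rows_for_byte_budget(target_bytes: u64, bytes_per_row: f64) -> u64 {
        ((target_bytes as f64 / bytes_per_row).floor() as u64).max(1)
    }

    #[test]
    fn test_rows_for_byte_budget_sketch() {
        // 4 x Int32 = 16 bytes/row => 1600 / 16 = 100 rows
        assert_eq!(rows_for_byte_budget(1600, 16.0), 100);
        // Utf8 estimate of 64 bytes/row => 5000 / 64 = 78.125, rounded down
        assert_eq!(rows_for_byte_budget(5000, 64.0), 78);
        // A budget smaller than one row still yields a single row
        assert_eq!(rows_for_byte_budget(8, 16.0), 1);
    }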

    #[tokio::test]
    async fn test_byte_sized_batches_none_unchanged() {
        use arrow_array::Int32Array;

        // Without batch_size_bytes, rows_per_batch controls batching
        let num_rows: i32 = 1000;
        let arrays: Vec<Arc<dyn arrow_array::Array>> = (0..2)
            .map(|col| {
                Arc::new(Int32Array::from_iter_values(
                    (0..num_rows).map(move |row| row * 10 + col),
                )) as _
            })
            .collect();

        let schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("x", DataType::Int32, false),
            ArrowField::new("y", DataType::Int32, false),
        ]));
        let input_batch = RecordBatch::try_new(schema, arrays).unwrap();

        // batch_size=250, batch_size_bytes=None => 4 batches of 250 rows
        let batches = decode_batches_with_byte_limit(&input_batch, /*batch_size=*/ 250, None).await;
        assert_eq!(batches.len(), 4);
        for (i, batch) in batches.iter().enumerate() {
            assert_eq!(
                batch.num_rows(),
                250,
                "batch {i} should have 250 rows, got {}",
                batch.num_rows()
            );
        }
    }

    #[tokio::test]
    async fn test_byte_sized_batches_feedback_convergence() {
        use arrow_array::StringArray;

        // Each row has a 100-byte string. Schema estimate = 64 bytes (default
        // for Utf8), so the first batch will overshoot. The feedback loop
        // should correct subsequent batches toward the target.
        let num_rows = 500;
        let value: String = "x".repeat(100);
        let arrays: Vec<Arc<dyn arrow_array::Array>> = vec![Arc::new(StringArray::from(
            (0..num_rows).map(|_| value.as_str()).collect::<Vec<_>>(),
        ))];
        let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "s",
            DataType::Utf8,
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, arrays).unwrap();

        // Target 5000 bytes/batch. At 100 bytes/row the ideal is 50 rows/batch.
        // Schema estimate is 64 bytes/row → first batch ~78 rows (overshoot).
        // After feedback kicks in, batches should converge to ~50 rows.
        let target_bytes: u64 = 5000;
        let batches = decode_batches_with_byte_limit(
            &input_batch,
            /*batch_size=*/ 1024,
            Some(target_bytes),
        )
        .await;

        // Verify all data round-trips correctly
        let all_batches: Vec<&RecordBatch> = batches.iter().collect();
        let concatenated =
            arrow_select::concat::concat_batches(&batches[0].schema(), all_batches.iter().copied())
                .unwrap();
        assert_eq!(concatenated.num_rows(), num_rows as usize);
        assert_eq!(
            concatenated.column(0).as_ref(),
            input_batch.column(0).as_ref()
        );

        // After the first batch, subsequent batches should be closer to the
        // target. The ideal is 50 rows/batch.
        assert!(
            batches.len() >= 2,
            "need at least 2 batches to test convergence"
        );
        // The first batch uses the schema estimate (64 bytes/row) →
        // ~78 rows. After feedback the rows should settle near 50.
        if batches.len() >= 3 {
            let second_batch_rows = batches[1].num_rows();
            let third_batch_rows = batches[2].num_rows();
            // Both should be within 20% of the ideal (50 rows)
            assert!(
                (40..=60).contains(&second_batch_rows),
                "second batch should be near 50 rows, got {second_batch_rows}"
            );
            assert!(
                (40..=60).contains(&third_batch_rows),
                "third batch should be near 50 rows, got {third_batch_rows}"
            );
        }
    }
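
    // Illustrative sketch only, not the decoder's actual feedback code: one
    // way the correction exercised by the convergence test above could work.
    // `next_batch_rows` is a hypothetical helper. It assumes the next batch is
    // sized from the bytes per row observed in the previous batch, and it
    // ignores Arrow offset and validity buffer overhead.
    fn next_batch_rows(target_bytes: u64, prev_rows: u64, prev_bytes: u64) -> u64 {
        // Nothing useful observed yet: keep the previous size (at least 1 row)
        if prev_rows == 0 || prev_bytes == 0 {
            return prev_rows.max(1);
        }
        let observed_bytes_per_row = prev_bytes as f64 / prev_rows as f64;
        ((target_bytes as f64 / observed_bytes_per_row) as u64).max(1)
    }

    #[test]
    fn test_next_batch_rows_sketch() {
        // First batch: 78 rows from the 64 bytes/row estimate, but each row
        // really holds 100 bytes, so the batch is 7800 bytes.
        // Observed 100 bytes/row => 5000 / 100 = 50 rows for the next batch.
        assert_eq!(next_batch_rows(5000, 78, 7800), 50);
        // Once the batch size matches the data, the estimate is stable
        assert_eq!(next_batch_rows(5000, 50, 5000), 50);
    }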
}