Skip to main content

lance_encoding/
decoder.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Utilities and traits for scheduling & decoding data
5//!
6//! Reading data involves two steps: scheduling and decoding.  The
7//! scheduling step is responsible for figuring out what data is needed
8//! and issuing the appropriate I/O requests.  The decoding step is
9//! responsible for taking the loaded data and turning it into Arrow
10//! arrays.
11//!
12//! # Scheduling
13//!
14//! Scheduling is split into `FieldScheduler` and `PageScheduler`.
15//! There is one field scheduler for each output field, which may map to many
16//! columns of actual data.  A field scheduler is responsible for figuring out
17//! the order in which pages should be scheduled.  Field schedulers then delegate
18//! to page schedulers to figure out the I/O requests that need to be made for
19//! the page.
20//!
21//! Page schedulers also create the decoders that will be used to decode the
22//! scheduled data.
23//!
24//! # Decoding
25//!
26//! Decoders are split into `PhysicalPageDecoder` and
27//! [`LogicalPageDecoder`].  Note that both physical and logical decoding
28//! happens on a per-page basis.  There is no concept of a "field decoder" or
29//! "column decoder".
30//!
31//! The physical decoders handle lower level encodings.  They have a few advantages:
32//!
33//!  * They do not need to decode into an Arrow array and so they don't need
34//!    to be enveloped into the Arrow filesystem (e.g. Arrow doesn't have a
35//!    bit-packed type.  We can use variable-length binary but that is kind
36//!    of awkward)
37//!  * They can decode into an existing allocation.  This can allow for "page
38//!    bridging".  If we are trying to decode into a batch of 1024 rows and
39//!    the rows 0..1024 are spread across two pages then we can avoid a memory
40//!    copy by allocating once and decoding each page into the outer allocation.
41//!    (note: page bridging is not actually implemented yet)
42//!
43//! However, there are some limitations for physical decoders:
44//!
45//!  * They are constrained to a single column
46//!  * The API is more complex
47//!
48//! The logical decoders are designed to map one or more columns of Lance
49//! data into an Arrow array.
50//!
51//! Typically, a "logical encoding" will have both a logical decoder and a field scheduler.
52//! Meanwhile, a "physical encoding" will have a physical decoder but no corresponding field
53//! scheduler.
54//!
55//!
56//! # General notes
57//!
58//! Encodings are typically nested into each other to form a tree.  The top of the tree is
59//! the user requested schema.  Each field in that schema is assigned to one top-level logical
60//! encoding.  That encoding can then contain other logical encodings or physical encodings.
61//! Physical encodings can also contain other physical encodings.
62//!
63//! So, for example, a single field in the Arrow schema might have the type `List<UInt32>`
64//!
65//! The encoding tree could then be:
66//!
67//! root: List (logical encoding)
68//!  - indices: Primitive (logical encoding)
69//!    - column: Basic (physical encoding)
70//!      - validity: Bitmap (physical encoding)
71//!      - values: RLE (physical encoding)
72//!        - runs: Value (physical encoding)
73//!        - values: Value (physical encoding)
74//!  - items: Primitive (logical encoding)
75//!    - column: Basic (physical encoding)
76//!      - values: Value (physical encoding)
77//!
78//! Note that, in this example, root.items.column does not have a validity because there were
79//! no nulls in the page.
80//!
81//! ## Multiple buffers or multiple columns?
82//!
83//! Note that there are many different ways we can write encodings.  For example, we might
84//! store primitive fields in a single column with two buffers (one for validity and one for
85//! values)
86//!
87//! On the other hand, we could also store a primitive field as two different columns.  One
88//! that yields a non-nullable boolean array and one that yields a non-nullable array of items.
89//! Then we could combine these two arrays into a single array where the boolean array is the
90//! bitmap.  There are a few subtle differences between the approaches:
91//!
92//! * Storing things as multiple buffers within the same column is generally more efficient and
93//!   easier to schedule.  For example, in-batch coalescing is very easy but can only be done
94//!   on data that is in the same page.
95//! * When things are stored in multiple columns you have to worry about their pages not being
96//!   in sync.  In our previous validity / values example this means we might have to do some
97//!   memory copies to get the validity array and values arrays to be the same length as
98//!   decode.
99//! * When things are stored in a single column, projection is impossible.  For example, if we
100//!   tried to store all the struct fields in a single column with lots of buffers then we wouldn't
101//!   be able to read back individual fields of the struct.
102//!
103//! The fixed size list decoding is an interesting example because it is actually both a physical
104//! encoding and a logical encoding.  A fixed size list of a physical encoding is, itself, a physical
105//! encoding (e.g. a fixed size list of doubles).  However, a fixed size list of a logical encoding
106//! is a logical encoding (e.g. a fixed size list of structs).
107//!
108//! # The scheduling loop
109//!
110//! Reading a Lance file involves both scheduling and decoding.  Its generally expected that these
111//! will run as two separate threads.
112//!
113//! ```text
114//!
115//!                                    I/O PARALLELISM
116//!                       Issues
117//!                       Requests   ┌─────────────────┐
118//!                                  │                 │        Wait for
119//!                       ┌──────────►   I/O Service   ├─────►  Enough I/O ◄─┐
120//!                       │          │                 │        For batch    │
121//!                       │          └─────────────────┘             │3      │
122//!                       │                                          │       │
123//!                       │                                          │       │2
124//! ┌─────────────────────┴─┐                              ┌─────────▼───────┴┐
125//! │                       │                              │                  │Poll
126//! │       Batch Decode    │ Decode tasks sent via channel│   Batch Decode   │1
127//! │       Scheduler       ├─────────────────────────────►│   Stream         ◄─────
128//! │                       │                              │                  │
129//! └─────▲─────────────┬───┘                              └─────────┬────────┘
130//!       │             │                                            │4
131//!       │             │                                            │
132//!       └─────────────┘                                   ┌────────┴────────┐
133//!  Caller of schedule_range                Buffer polling │                 │
134//!  will be scheduler thread                to achieve CPU │ Decode Batch    ├────►
135//!  and schedule one decode                 parallelism    │ Task            │
136//!  task (and all needed I/O)               (thread per    │                 │
137//!  per logical page                         batch)        └─────────────────┘
138//! ```
139//!
140//! The scheduling thread will work through the file from the
141//! start to the end as quickly as possible.  Data is scheduled one page at a time in a row-major
142//! fashion.  For example, imagine we have a file with the following page structure:
143//!
144//! ```text
145//! Score (Float32)     | C0P0 |
146//! Id (16-byte UUID)   | C1P0 | C1P1 | C1P2 | C1P3 |
147//! Vector (4096 bytes) | C2P0 | C2P1 | C2P2 | C2P3 | .. | C2P1024 |
148//! ```
149//!
150//! This would be quite common as each of these pages has the same number of bytes.  Let's pretend
151//! each page is 1MiB and so there are 256Ki rows of data.  Each page of `Score` has 256Ki rows.
152//! Each page of `Id` has 64Ki rows.  Each page of `Vector` has 256 rows.  The scheduler would then
153//! schedule in the following order:
154//!
155//! C0 P0
156//! C1 P0
157//! C2 P0
158//! C2 P1
159//! ... (254 pages omitted)
160//! C2 P255
161//! C1 P1
162//! C2 P256
163//! ... (254 pages omitted)
164//! C2 P511
165//! C1 P2
166//! C2 P512
167//! ... (254 pages omitted)
168//! C2 P767
169//! C1 P3
170//! C2 P768
171//! ... (254 pages omitted)
172//! C2 P1024
173//!
174//! This is the ideal scheduling order because it means we can decode complete rows as quickly as possible.
175//! Note that the scheduler thread does not need to wait for I/O to happen at any point.  As soon as it starts
176//! it will start scheduling one page of I/O after another until it has scheduled the entire file's worth of
177//! I/O.  This is slightly different than other file readers which have "row group parallelism" and will
178//! typically only schedule X row groups worth of reads at a time.
179//!
180//! In the near future there will be a backpressure mechanism and so it may need to stop/pause if the compute
181//! falls behind.
182//!
183//! ## Indirect I/O
184//!
185//! Regrettably, there are times where we cannot know exactly what data we need until we have partially decoded
186//! the file.  This happens when we have variable sized list data.  In that case the scheduling task for that
187//! page will only schedule the first part of the read (loading the list offsets).  It will then immediately
188//! spawn a new tokio task to wait for that I/O and decode the list offsets.  That follow-up task is not part
189//! of the scheduling loop or the decode loop.  It is a free task.  Once the list offsets are decoded we submit
190//! a follow-up I/O task.  This task is scheduled at a high priority because the decoder is going to need it soon.
191//!
192//! # The decode loop
193//!
194//! As soon as the scheduler starts we can start decoding.  Each time we schedule a page we
195//! push a decoder for that page's data into a channel.  The decode loop
196//! ([`BatchDecodeStream`]) reads from that channel.  Each time it receives a decoder it
197//! waits until the decoder has all of its data.  Then it grabs the next decoder.  Once it has
198//! enough loaded decoders to complete a batch worth of rows it will spawn a "decode batch task".
199//!
200//! These batch decode tasks perform the actual CPU work of decoding the loaded data into Arrow
201//! arrays.  This may involve signifciant CPU processing like decompression or arithmetic in order
202//! to restore the data to its correct in-memory representation.
203//!
204//! ## Batch size
205//!
206//! The `BatchDecodeStream` is configured with a batch size.  This does not need to have any
207//! relation to the page size(s) used to write the data.  This keeps our compute work completely
208//! independent of our I/O work.  We suggest using small batch sizes:
209//!
210//!  * Batches should fit in CPU cache (at least L3)
211//!  * More batches means more opportunity for parallelism
212//!  * The "batch overhead" is very small in Lance compared to other formats because it has no
213//!    relation to the way the data is stored.
214
215use std::collections::VecDeque;
216use std::sync::atomic::{AtomicU64, Ordering};
217use std::sync::{LazyLock, Once, OnceLock};
218use std::{ops::Range, sync::Arc};
219
220use arrow_array::cast::AsArray;
221use arrow_array::{ArrayRef, RecordBatch, RecordBatchIterator, RecordBatchReader};
222use arrow_schema::{ArrowError, DataType, Field as ArrowField, Fields, Schema as ArrowSchema};
223use bytes::Bytes;
224use futures::future::{BoxFuture, MaybeDone, maybe_done};
225use futures::stream::{self, BoxStream};
226use futures::{FutureExt, StreamExt};
227use lance_arrow::DataTypeExt;
228use lance_core::cache::LanceCache;
229use lance_core::datatypes::{BLOB_DESC_LANCE_FIELD, Field, Schema};
230use lance_core::utils::futures::{FinallyStreamExt, StreamOnDropExt};
231use lance_core::utils::parse::parse_env_as_bool;
232use log::{debug, trace, warn};
233use tokio::sync::mpsc::error::SendError;
234use tokio::sync::mpsc::{self, unbounded_channel};
235
236use lance_core::error::LanceOptionExt;
237use lance_core::{ArrowResult, Error, Result};
238use tracing::instrument;
239
240use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy};
241use crate::data::DataBlock;
242use crate::encoder::EncodedBatch;
243use crate::encodings::logical::fixed_size_list::StructuralFixedSizeListScheduler;
244use crate::encodings::logical::list::StructuralListScheduler;
245use crate::encodings::logical::map::StructuralMapScheduler;
246use crate::encodings::logical::primitive::StructuralPrimitiveFieldScheduler;
247use crate::encodings::logical::r#struct::{StructuralStructDecoder, StructuralStructScheduler};
248use crate::format::pb::{self, column_encoding};
249use crate::format::pb21;
250use crate::previous::decoder::LogicalPageDecoder;
251use crate::previous::encodings::logical::list::OffsetPageInfo;
252use crate::previous::encodings::logical::r#struct::{SimpleStructDecoder, SimpleStructScheduler};
253use crate::previous::encodings::logical::{
254    binary::BinaryFieldScheduler, blob::BlobFieldScheduler, list::ListFieldScheduler,
255    primitive::PrimitiveFieldScheduler,
256};
257use crate::repdef::{CompositeRepDefUnraveler, RepDefUnraveler};
258use crate::version::LanceFileVersion;
259use crate::{BufferScheduler, EncodingsIo};
260
261// If users are getting batches over 10MiB large then it's time to reduce the batch size
262const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024;
263const ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE: &str =
264    "LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE";
265const ENV_LANCE_READ_CACHE_REPETITION_INDEX: &str = "LANCE_READ_CACHE_REPETITION_INDEX";
266const ENV_LANCE_INLINE_SCHEDULING_THRESHOLD: &str = "LANCE_INLINE_SCHEDULING_THRESHOLD";
267
268// If a request is for at most this many rows we skip the scheduler-task spawn
269// and run scheduling inline as part of the `schedule_and_decode` await.
270const DEFAULT_INLINE_SCHEDULING_THRESHOLD: u64 = 16 * 1024;
271
272fn default_cache_repetition_index() -> bool {
273    static DEFAULT_CACHE_REPETITION_INDEX: OnceLock<bool> = OnceLock::new();
274    *DEFAULT_CACHE_REPETITION_INDEX
275        .get_or_init(|| parse_env_as_bool(ENV_LANCE_READ_CACHE_REPETITION_INDEX, true))
276}
277
278fn inline_scheduling_threshold() -> u64 {
279    static THRESHOLD: OnceLock<u64> = OnceLock::new();
280    *THRESHOLD.get_or_init(|| {
281        std::env::var(ENV_LANCE_INLINE_SCHEDULING_THRESHOLD)
282            .ok()
283            .and_then(|v| v.trim().parse::<u64>().ok())
284            .unwrap_or(DEFAULT_INLINE_SCHEDULING_THRESHOLD)
285    })
286}
287
288/// Top-level encoding message for a page.  Wraps both the
289/// legacy pb::ArrayEncoding and the newer pb::PageLayout
290///
291/// A file should only use one or the other and never both.
292/// 2.0 decoders can always assume this is pb::ArrayEncoding
293/// and 2.1+ decoders can always assume this is pb::PageLayout
294#[derive(Debug)]
295pub enum PageEncoding {
296    Legacy(pb::ArrayEncoding),
297    Structural(pb21::PageLayout),
298}
299
300impl PageEncoding {
301    pub fn as_legacy(&self) -> &pb::ArrayEncoding {
302        match self {
303            Self::Legacy(enc) => enc,
304            Self::Structural(_) => panic!("Expected a legacy encoding"),
305        }
306    }
307
308    pub fn as_structural(&self) -> &pb21::PageLayout {
309        match self {
310            Self::Structural(enc) => enc,
311            Self::Legacy(_) => panic!("Expected a structural encoding"),
312        }
313    }
314
315    pub fn is_structural(&self) -> bool {
316        matches!(self, Self::Structural(_))
317    }
318}
319
320/// Metadata describing a page in a file
321///
322/// This is typically created by reading the metadata section of a Lance file
323#[derive(Debug)]
324pub struct PageInfo {
325    /// The number of rows in the page
326    pub num_rows: u64,
327    /// The priority (top level row number) of the page
328    ///
329    /// This is only set in 2.1 files and will be 0 for 2.0 files
330    pub priority: u64,
331    /// The encoding that explains the buffers in the page
332    pub encoding: PageEncoding,
333    /// The offsets and sizes of the buffers in the file
334    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
335}
336
337/// Metadata describing a column in a file
338///
339/// This is typically created by reading the metadata section of a Lance file
340#[derive(Debug, Clone)]
341pub struct ColumnInfo {
342    /// The index of the column in the file
343    pub index: u32,
344    /// The metadata for each page in the column
345    pub page_infos: Arc<[PageInfo]>,
346    /// File positions and their sizes of the column-level buffers
347    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
348    pub encoding: pb::ColumnEncoding,
349}
350
351impl ColumnInfo {
352    /// Create a new instance
353    pub fn new(
354        index: u32,
355        page_infos: Arc<[PageInfo]>,
356        buffer_offsets_and_sizes: Vec<(u64, u64)>,
357        encoding: pb::ColumnEncoding,
358    ) -> Self {
359        Self {
360            index,
361            page_infos,
362            buffer_offsets_and_sizes: buffer_offsets_and_sizes.into_boxed_slice().into(),
363            encoding,
364        }
365    }
366
367    pub fn is_structural(&self) -> bool {
368        self.page_infos
369            // Can just look at the first since all should be the same
370            .first()
371            .map(|page| page.encoding.is_structural())
372            .unwrap_or(false)
373    }
374}
375
376enum RootScheduler {
377    Structural(Box<dyn StructuralFieldScheduler>),
378    Legacy(Arc<dyn crate::previous::decoder::FieldScheduler>),
379}
380
381impl RootScheduler {
382    fn as_legacy(&self) -> &Arc<dyn crate::previous::decoder::FieldScheduler> {
383        match self {
384            Self::Structural(_) => panic!("Expected a legacy scheduler"),
385            Self::Legacy(s) => s,
386        }
387    }
388
389    fn as_structural(&self) -> &dyn StructuralFieldScheduler {
390        match self {
391            Self::Structural(s) => s.as_ref(),
392            Self::Legacy(_) => panic!("Expected a structural scheduler"),
393        }
394    }
395}
396
397/// The scheduler for decoding batches
398///
399/// Lance decoding is done in two steps, scheduling, and decoding.  The
400/// scheduling tends to be lightweight and should quickly figure what data
401/// is needed from the disk issue the appropriate I/O requests.  A decode task is
402/// created to eventually decode the data (once it is loaded) and scheduling
403/// moves on to scheduling the next page.
404///
405/// Meanwhile, it's expected that a decode stream will be setup to run at the
406/// same time.  Decode tasks take the data that is loaded and turn it into
407/// Arrow arrays.
408///
409/// This approach allows us to keep our I/O parallelism and CPU parallelism
410/// completely separate since those are often two very different values.
411///
412/// Backpressure should be achieved via the I/O service.  Requests that are
413/// issued will pile up if the decode stream is not polling quickly enough.
414/// The [`crate::EncodingsIo::submit_request`] function should return a pending
415/// future once there are too many I/O requests in flight.
416///
417/// TODO: Implement backpressure
418pub struct DecodeBatchScheduler {
419    root_scheduler: RootScheduler,
420    pub root_fields: Fields,
421    cache: Arc<LanceCache>,
422}
423
424pub struct ColumnInfoIter<'a> {
425    column_infos: Vec<Arc<ColumnInfo>>,
426    column_indices: &'a [u32],
427    column_info_pos: usize,
428    column_indices_pos: usize,
429}
430
431impl<'a> ColumnInfoIter<'a> {
432    pub fn new(column_infos: Vec<Arc<ColumnInfo>>, column_indices: &'a [u32]) -> Self {
433        let initial_pos = column_indices.first().copied().unwrap_or(0) as usize;
434        Self {
435            column_infos,
436            column_indices,
437            column_info_pos: initial_pos,
438            column_indices_pos: 0,
439        }
440    }
441
442    pub fn peek(&self) -> &Arc<ColumnInfo> {
443        &self.column_infos[self.column_info_pos]
444    }
445
446    pub fn peek_transform(&mut self, transform: impl FnOnce(Arc<ColumnInfo>) -> Arc<ColumnInfo>) {
447        let column_info = self.column_infos[self.column_info_pos].clone();
448        let transformed = transform(column_info);
449        self.column_infos[self.column_info_pos] = transformed;
450    }
451
452    pub fn expect_next(&mut self) -> Result<&Arc<ColumnInfo>> {
453        self.next().ok_or_else(|| {
454            Error::invalid_input(
455                "there were more fields in the schema than provided column indices / infos",
456            )
457        })
458    }
459
460    fn next(&mut self) -> Option<&Arc<ColumnInfo>> {
461        if self.column_info_pos < self.column_infos.len() {
462            let info = &self.column_infos[self.column_info_pos];
463            self.column_info_pos += 1;
464            Some(info)
465        } else {
466            None
467        }
468    }
469
470    pub(crate) fn next_top_level(&mut self) {
471        self.column_indices_pos += 1;
472        if self.column_indices_pos < self.column_indices.len() {
473            self.column_info_pos = self.column_indices[self.column_indices_pos] as usize;
474        } else {
475            self.column_info_pos = self.column_infos.len();
476        }
477    }
478}
479
480/// These contain the file buffers shared across the entire file
481#[derive(Clone, Copy, Debug)]
482pub struct FileBuffers<'a> {
483    pub positions_and_sizes: &'a [(u64, u64)],
484}
485
486/// These contain the file buffers and also buffers specific to a column
487#[derive(Clone, Copy, Debug)]
488pub struct ColumnBuffers<'a, 'b> {
489    pub file_buffers: FileBuffers<'a>,
490    pub positions_and_sizes: &'b [(u64, u64)],
491}
492
493/// These contain the file & column buffers and also buffers specific to a page
494#[derive(Clone, Copy, Debug)]
495pub struct PageBuffers<'a, 'b, 'c> {
496    pub column_buffers: ColumnBuffers<'a, 'b>,
497    pub positions_and_sizes: &'c [(u64, u64)],
498}
499
500/// The core decoder strategy handles all the various Arrow types
501#[derive(Debug)]
502pub struct CoreFieldDecoderStrategy {
503    pub validate_data: bool,
504    pub decompressor_strategy: Arc<dyn DecompressionStrategy>,
505    pub cache_repetition_index: bool,
506}
507
508impl Default for CoreFieldDecoderStrategy {
509    fn default() -> Self {
510        Self {
511            validate_data: false,
512            decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
513            cache_repetition_index: false,
514        }
515    }
516}
517
518impl CoreFieldDecoderStrategy {
519    /// Create a new strategy with cache_repetition_index enabled
520    pub fn with_cache_repetition_index(mut self, cache_repetition_index: bool) -> Self {
521        self.cache_repetition_index = cache_repetition_index;
522        self
523    }
524
525    /// Create a new strategy from decoder config
526    pub fn from_decoder_config(config: &DecoderConfig) -> Self {
527        Self {
528            validate_data: config.validate_on_decode,
529            decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
530            cache_repetition_index: config.cache_repetition_index,
531        }
532    }
533
534    /// This is just a sanity check to ensure there is no "wrapped encodings"
535    /// that haven't been handled.
536    fn ensure_values_encoded(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
537        let column_encoding = column_info
538            .encoding
539            .column_encoding
540            .as_ref()
541            .ok_or_else(|| {
542                Error::invalid_input(format!(
543                    "the column at index {} was missing a ColumnEncoding",
544                    column_info.index
545                ))
546            })?;
547        if matches!(
548            column_encoding,
549            pb::column_encoding::ColumnEncoding::Values(_)
550        ) {
551            Ok(())
552        } else {
553            Err(Error::invalid_input(format!(
554                "the column at index {} mapping to the input field {} has column encoding {:?} and no decoder is registered to handle it",
555                column_info.index, field_name, column_encoding
556            )))
557        }
558    }
559
560    fn is_structural_primitive(data_type: &DataType) -> bool {
561        if data_type.is_primitive() {
562            true
563        } else {
564            match data_type {
565                // DataType::is_primitive doesn't consider these primitive but we do
566                DataType::Dictionary(_, value_type) => Self::is_structural_primitive(value_type),
567                DataType::Boolean
568                | DataType::Null
569                | DataType::FixedSizeBinary(_)
570                | DataType::Binary
571                | DataType::LargeBinary
572                | DataType::Utf8
573                | DataType::LargeUtf8 => true,
574                DataType::FixedSizeList(inner, _) => {
575                    Self::is_structural_primitive(inner.data_type())
576                }
577                _ => false,
578            }
579        }
580    }
581
582    fn is_primitive_legacy(data_type: &DataType) -> bool {
583        if data_type.is_primitive() {
584            true
585        } else {
586            match data_type {
587                // DataType::is_primitive doesn't consider these primitive but we do
588                DataType::Boolean | DataType::Null | DataType::FixedSizeBinary(_) => true,
589                DataType::FixedSizeList(inner, _) => Self::is_primitive_legacy(inner.data_type()),
590                _ => false,
591            }
592        }
593    }
594
595    fn create_primitive_scheduler(
596        &self,
597        field: &Field,
598        column: &ColumnInfo,
599        buffers: FileBuffers,
600    ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
601        Self::ensure_values_encoded(column, &field.name)?;
602        // Primitive fields map to a single column
603        let column_buffers = ColumnBuffers {
604            file_buffers: buffers,
605            positions_and_sizes: &column.buffer_offsets_and_sizes,
606        };
607        Ok(Box::new(PrimitiveFieldScheduler::new(
608            column.index,
609            field.data_type(),
610            column.page_infos.clone(),
611            column_buffers,
612            self.validate_data,
613        )))
614    }
615
616    /// Helper method to verify the page encoding of a struct header column
617    fn check_simple_struct(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
618        Self::ensure_values_encoded(column_info, field_name)?;
619        if column_info.page_infos.len() != 1 {
620            return Err(Error::invalid_input_source(format!("Due to schema we expected a struct column but we received a column with {} pages and right now we only support struct columns with 1 page", column_info.page_infos.len()).into()));
621        }
622        let encoding = &column_info.page_infos[0].encoding;
623        match encoding.as_legacy().array_encoding.as_ref().unwrap() {
624            pb::array_encoding::ArrayEncoding::Struct(_) => Ok(()),
625            _ => Err(Error::invalid_input_source(format!("Expected a struct encoding because we have a struct field in the schema but got the encoding {:?}", encoding).into())),
626        }
627    }
628
629    fn check_packed_struct(column_info: &ColumnInfo) -> bool {
630        let encoding = &column_info.page_infos[0].encoding;
631        matches!(
632            encoding.as_legacy().array_encoding.as_ref().unwrap(),
633            pb::array_encoding::ArrayEncoding::PackedStruct(_)
634        )
635    }
636
637    fn create_list_scheduler(
638        &self,
639        list_field: &Field,
640        column_infos: &mut ColumnInfoIter,
641        buffers: FileBuffers,
642        offsets_column: &ColumnInfo,
643    ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
644        Self::ensure_values_encoded(offsets_column, &list_field.name)?;
645        let offsets_column_buffers = ColumnBuffers {
646            file_buffers: buffers,
647            positions_and_sizes: &offsets_column.buffer_offsets_and_sizes,
648        };
649        let items_scheduler =
650            self.create_legacy_field_scheduler(&list_field.children[0], column_infos, buffers)?;
651
652        let (inner_infos, null_offset_adjustments): (Vec<_>, Vec<_>) = offsets_column
653            .page_infos
654            .iter()
655            .filter(|offsets_page| offsets_page.num_rows > 0)
656            .map(|offsets_page| {
657                if let Some(pb::array_encoding::ArrayEncoding::List(list_encoding)) =
658                    &offsets_page.encoding.as_legacy().array_encoding
659                {
660                    let inner = PageInfo {
661                        buffer_offsets_and_sizes: offsets_page.buffer_offsets_and_sizes.clone(),
662                        encoding: PageEncoding::Legacy(
663                            list_encoding.offsets.as_ref().unwrap().as_ref().clone(),
664                        ),
665                        num_rows: offsets_page.num_rows,
666                        priority: 0,
667                    };
668                    (
669                        inner,
670                        OffsetPageInfo {
671                            offsets_in_page: offsets_page.num_rows,
672                            null_offset_adjustment: list_encoding.null_offset_adjustment,
673                            num_items_referenced_by_page: list_encoding.num_items,
674                        },
675                    )
676                } else {
677                    // TODO: Should probably return Err here
678                    panic!("Expected a list column");
679                }
680            })
681            .unzip();
682        let inner = Arc::new(PrimitiveFieldScheduler::new(
683            offsets_column.index,
684            DataType::UInt64,
685            Arc::from(inner_infos.into_boxed_slice()),
686            offsets_column_buffers,
687            self.validate_data,
688        )) as Arc<dyn crate::previous::decoder::FieldScheduler>;
689        let items_field = match list_field.data_type() {
690            DataType::List(inner) => inner,
691            DataType::LargeList(inner) => inner,
692            _ => unreachable!(),
693        };
694        let offset_type = if matches!(list_field.data_type(), DataType::List(_)) {
695            DataType::Int32
696        } else {
697            DataType::Int64
698        };
699        Ok(Box::new(ListFieldScheduler::new(
700            inner,
701            items_scheduler.into(),
702            items_field,
703            offset_type,
704            null_offset_adjustments,
705        )))
706    }
707
708    fn unwrap_blob(column_info: &ColumnInfo) -> Option<ColumnInfo> {
709        if let column_encoding::ColumnEncoding::Blob(blob) =
710            column_info.encoding.column_encoding.as_ref().unwrap()
711        {
712            let mut column_info = column_info.clone();
713            column_info.encoding = blob.inner.as_ref().unwrap().as_ref().clone();
714            Some(column_info)
715        } else {
716            None
717        }
718    }
719
720    fn create_structural_field_scheduler(
721        &self,
722        field: &Field,
723        column_infos: &mut ColumnInfoIter,
724    ) -> Result<Box<dyn StructuralFieldScheduler>> {
725        let data_type = field.data_type();
726        if Self::is_structural_primitive(&data_type) {
727            let column_info = column_infos.expect_next()?;
728            let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
729                column_info.as_ref(),
730                self.decompressor_strategy.as_ref(),
731                self.cache_repetition_index,
732                field,
733            )?);
734
735            // advance to the next top level column
736            column_infos.next_top_level();
737
738            return Ok(scheduler);
739        }
740        match &data_type {
741            DataType::Struct(fields) => {
742                if field.is_packed_struct() {
743                    // Packed struct
744                    let column_info = column_infos.expect_next()?;
745                    let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
746                        column_info.as_ref(),
747                        self.decompressor_strategy.as_ref(),
748                        self.cache_repetition_index,
749                        field,
750                    )?);
751
752                    // advance to the next top level column
753                    column_infos.next_top_level();
754
755                    return Ok(scheduler);
756                }
757                // Maybe a blob descriptions struct?
758                if field.is_blob() {
759                    let column_info = column_infos.peek();
760                    if column_info.page_infos.iter().any(|page| {
761                        matches!(
762                            page.encoding,
763                            PageEncoding::Structural(pb21::PageLayout {
764                                layout: Some(pb21::page_layout::Layout::BlobLayout(_))
765                            })
766                        )
767                    }) {
768                        let column_info = column_infos.expect_next()?;
769                        let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
770                            column_info.as_ref(),
771                            self.decompressor_strategy.as_ref(),
772                            self.cache_repetition_index,
773                            field,
774                        )?);
775                        column_infos.next_top_level();
776                        return Ok(scheduler);
777                    }
778                }
779
780                let mut child_schedulers = Vec::with_capacity(field.children.len());
781                for field in field.children.iter() {
782                    let field_scheduler =
783                        self.create_structural_field_scheduler(field, column_infos)?;
784                    child_schedulers.push(field_scheduler);
785                }
786
787                let fields = fields.clone();
788                Ok(
789                    Box::new(StructuralStructScheduler::new(child_schedulers, fields))
790                        as Box<dyn StructuralFieldScheduler>,
791                )
792            }
793            DataType::List(_) | DataType::LargeList(_) => {
794                let child = field.children.first().expect_ok()?;
795                let child_scheduler =
796                    self.create_structural_field_scheduler(child, column_infos)?;
797                Ok(Box::new(StructuralListScheduler::new(child_scheduler))
798                    as Box<dyn StructuralFieldScheduler>)
799            }
800            DataType::FixedSizeList(inner, dimension)
801                if matches!(inner.data_type(), DataType::Struct(_)) =>
802            {
803                let child = field.children.first().expect_ok()?;
804                let child_scheduler =
805                    self.create_structural_field_scheduler(child, column_infos)?;
806                Ok(Box::new(StructuralFixedSizeListScheduler::new(
807                    child_scheduler,
808                    *dimension,
809                )) as Box<dyn StructuralFieldScheduler>)
810            }
811            DataType::Map(_, keys_sorted) => {
812                // TODO: We only support keys_sorted=false for now,
813                //  because converting a rust arrow map field to the python arrow field will
814                //  lose the keys_sorted property.
815                if *keys_sorted {
816                    return Err(Error::not_supported_source(format!("Map data type is not supported with keys_sorted=true now, current value is {}", *keys_sorted).into()));
817                }
818                let entries_child = field.children.first().expect_ok()?;
819                let child_scheduler =
820                    self.create_structural_field_scheduler(entries_child, column_infos)?;
821                Ok(Box::new(StructuralMapScheduler::new(child_scheduler))
822                    as Box<dyn StructuralFieldScheduler>)
823            }
824            _ => todo!("create_structural_field_scheduler for {}", data_type),
825        }
826    }
827
828    fn create_legacy_field_scheduler(
829        &self,
830        field: &Field,
831        column_infos: &mut ColumnInfoIter,
832        buffers: FileBuffers,
833    ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
834        let data_type = field.data_type();
835        if Self::is_primitive_legacy(&data_type) {
836            let column_info = column_infos.expect_next()?;
837            let scheduler = self.create_primitive_scheduler(field, column_info, buffers)?;
838            return Ok(scheduler);
839        } else if data_type.is_binary_like() {
840            let column_info = column_infos.expect_next()?.clone();
841            // Column is blob and user is asking for binary data
842            if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
843                let desc_scheduler =
844                    self.create_primitive_scheduler(&BLOB_DESC_LANCE_FIELD, &blob_col, buffers)?;
845                let blob_scheduler = Box::new(BlobFieldScheduler::new(desc_scheduler.into()));
846                return Ok(blob_scheduler);
847            }
848            if let Some(page_info) = column_info.page_infos.first() {
849                if matches!(
850                    page_info.encoding.as_legacy(),
851                    pb::ArrayEncoding {
852                        array_encoding: Some(pb::array_encoding::ArrayEncoding::List(..))
853                    }
854                ) {
855                    let list_type = if matches!(data_type, DataType::Utf8 | DataType::Binary) {
856                        DataType::List(Arc::new(ArrowField::new("item", DataType::UInt8, false)))
857                    } else {
858                        DataType::LargeList(Arc::new(ArrowField::new(
859                            "item",
860                            DataType::UInt8,
861                            false,
862                        )))
863                    };
864                    let list_field = Field::try_from(ArrowField::new(
865                        field.name.clone(),
866                        list_type,
867                        field.nullable,
868                    ))
869                    .unwrap();
870                    let list_scheduler = self.create_list_scheduler(
871                        &list_field,
872                        column_infos,
873                        buffers,
874                        &column_info,
875                    )?;
876                    let binary_scheduler = Box::new(BinaryFieldScheduler::new(
877                        list_scheduler.into(),
878                        field.data_type(),
879                    ));
880                    return Ok(binary_scheduler);
881                } else {
882                    let scheduler =
883                        self.create_primitive_scheduler(field, &column_info, buffers)?;
884                    return Ok(scheduler);
885                }
886            } else {
887                return self.create_primitive_scheduler(field, &column_info, buffers);
888            }
889        }
890        match &data_type {
891            DataType::FixedSizeList(inner, _dimension) => {
892                // A fixed size list column could either be a physical or a logical decoder
893                // depending on the child data type.
894                if Self::is_primitive_legacy(inner.data_type()) {
895                    let primitive_col = column_infos.expect_next()?;
896                    let scheduler =
897                        self.create_primitive_scheduler(field, primitive_col, buffers)?;
898                    Ok(scheduler)
899                } else {
900                    todo!()
901                }
902            }
903            DataType::Dictionary(_key_type, value_type) => {
904                if Self::is_primitive_legacy(value_type) || value_type.is_binary_like() {
905                    let primitive_col = column_infos.expect_next()?;
906                    let scheduler =
907                        self.create_primitive_scheduler(field, primitive_col, buffers)?;
908                    Ok(scheduler)
909                } else {
910                    Err(Error::not_supported_source(
911                        format!(
912                            "No way to decode into a dictionary field of type {}",
913                            value_type
914                        )
915                        .into(),
916                    ))
917                }
918            }
919            DataType::List(_) | DataType::LargeList(_) => {
920                let offsets_column = column_infos.expect_next()?.clone();
921                column_infos.next_top_level();
922                self.create_list_scheduler(field, column_infos, buffers, &offsets_column)
923            }
924            DataType::Struct(fields) => {
925                let column_info = column_infos.expect_next()?;
926
927                // Column is blob and user is asking for descriptions
928                if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
929                    // Can use primitive scheduler here since descriptions are always packed struct
930                    return self.create_primitive_scheduler(field, &blob_col, buffers);
931                }
932
933                if Self::check_packed_struct(column_info) {
934                    // use packed struct encoding
935                    self.create_primitive_scheduler(field, column_info, buffers)
936                } else {
937                    // use default struct encoding
938                    Self::check_simple_struct(column_info, &field.name).unwrap();
939                    let num_rows = column_info
940                        .page_infos
941                        .iter()
942                        .map(|page| page.num_rows)
943                        .sum();
944                    let mut child_schedulers = Vec::with_capacity(field.children.len());
945                    for field in &field.children {
946                        column_infos.next_top_level();
947                        let field_scheduler =
948                            self.create_legacy_field_scheduler(field, column_infos, buffers)?;
949                        child_schedulers.push(Arc::from(field_scheduler));
950                    }
951
952                    let fields = fields.clone();
953                    Ok(Box::new(SimpleStructScheduler::new(
954                        child_schedulers,
955                        fields,
956                        num_rows,
957                    )))
958                }
959            }
960            // TODO: Still need support for RLE
961            _ => todo!(),
962        }
963    }
964}
965
966/// Create's a dummy ColumnInfo for the root column
967fn root_column(num_rows: u64) -> ColumnInfo {
968    let num_root_pages = num_rows.div_ceil(u32::MAX as u64);
969    let final_page_num_rows = num_rows % (u32::MAX as u64);
970    let root_pages = (0..num_root_pages)
971        .map(|i| PageInfo {
972            num_rows: if i == num_root_pages - 1 {
973                final_page_num_rows
974            } else {
975                u64::MAX
976            },
977            encoding: PageEncoding::Legacy(pb::ArrayEncoding {
978                array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
979                    pb::SimpleStruct {},
980                )),
981            }),
982            priority: 0, // not used in legacy scheduler
983            buffer_offsets_and_sizes: Arc::new([]),
984        })
985        .collect::<Vec<_>>();
986    ColumnInfo {
987        buffer_offsets_and_sizes: Arc::new([]),
988        encoding: pb::ColumnEncoding {
989            column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
990        },
991        index: u32::MAX,
992        page_infos: Arc::from(root_pages),
993    }
994}
995
996pub enum RootDecoder {
997    Structural(StructuralStructDecoder),
998    Legacy(SimpleStructDecoder),
999}
1000
1001impl RootDecoder {
1002    pub fn into_structural(self) -> StructuralStructDecoder {
1003        match self {
1004            Self::Structural(decoder) => decoder,
1005            Self::Legacy(_) => panic!("Expected a structural decoder"),
1006        }
1007    }
1008
1009    pub fn into_legacy(self) -> SimpleStructDecoder {
1010        match self {
1011            Self::Legacy(decoder) => decoder,
1012            Self::Structural(_) => panic!("Expected a legacy decoder"),
1013        }
1014    }
1015}
1016
1017impl DecodeBatchScheduler {
1018    /// Creates a new decode scheduler with the expected schema and the column
1019    /// metadata of the file.
1020    #[allow(clippy::too_many_arguments)]
1021    pub async fn try_new<'a>(
1022        schema: &'a Schema,
1023        column_indices: &[u32],
1024        column_infos: &[Arc<ColumnInfo>],
1025        file_buffer_positions_and_sizes: &'a Vec<(u64, u64)>,
1026        num_rows: u64,
1027        _decoder_plugins: Arc<DecoderPlugins>,
1028        io: Arc<dyn EncodingsIo>,
1029        cache: Arc<LanceCache>,
1030        filter: &FilterExpression,
1031        decoder_config: &DecoderConfig,
1032    ) -> Result<Self> {
1033        assert!(num_rows > 0);
1034        let buffers = FileBuffers {
1035            positions_and_sizes: file_buffer_positions_and_sizes,
1036        };
1037        let arrow_schema = ArrowSchema::from(schema);
1038        let root_fields = arrow_schema.fields().clone();
1039        let root_type = DataType::Struct(root_fields.clone());
1040        let mut root_field = Field::try_from(&ArrowField::new("root", root_type, false))?;
1041        // root_field.children and schema.fields should be identical at this point but the latter
1042        // has field ids and the former does not.  This line restores that.
1043        // TODO:  Is there another way to create the root field without forcing a trip through arrow?
1044        root_field.children.clone_from(&schema.fields);
1045        root_field
1046            .metadata
1047            .insert("__lance_decoder_root".to_string(), "true".to_string());
1048
1049        if column_infos.is_empty() || column_infos[0].is_structural() {
1050            let mut column_iter = ColumnInfoIter::new(column_infos.to_vec(), column_indices);
1051
1052            let strategy = CoreFieldDecoderStrategy::from_decoder_config(decoder_config);
1053            let mut root_scheduler =
1054                strategy.create_structural_field_scheduler(&root_field, &mut column_iter)?;
1055
1056            let context = SchedulerContext::new(io, cache.clone());
1057            root_scheduler.initialize(filter, &context).await?;
1058
1059            Ok(Self {
1060                root_scheduler: RootScheduler::Structural(root_scheduler),
1061                root_fields,
1062                cache,
1063            })
1064        } else {
1065            // The old encoding style expected a header column for structs and so we
1066            // need a header column for the top-level struct
1067            let mut columns = Vec::with_capacity(column_infos.len() + 1);
1068            columns.push(Arc::new(root_column(num_rows)));
1069            columns.extend(column_infos.iter().cloned());
1070
1071            let adjusted_column_indices = [0_u32]
1072                .into_iter()
1073                .chain(column_indices.iter().map(|i| i.saturating_add(1)))
1074                .collect::<Vec<_>>();
1075            let mut column_iter = ColumnInfoIter::new(columns, &adjusted_column_indices);
1076            let strategy = CoreFieldDecoderStrategy::from_decoder_config(decoder_config);
1077            let root_scheduler =
1078                strategy.create_legacy_field_scheduler(&root_field, &mut column_iter, buffers)?;
1079
1080            let context = SchedulerContext::new(io, cache.clone());
1081            root_scheduler.initialize(filter, &context).await?;
1082
1083            Ok(Self {
1084                root_scheduler: RootScheduler::Legacy(root_scheduler.into()),
1085                root_fields,
1086                cache,
1087            })
1088        }
1089    }
1090
1091    #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
1092    pub fn from_scheduler(
1093        root_scheduler: Arc<dyn crate::previous::decoder::FieldScheduler>,
1094        root_fields: Fields,
1095        cache: Arc<LanceCache>,
1096    ) -> Self {
1097        Self {
1098            root_scheduler: RootScheduler::Legacy(root_scheduler),
1099            root_fields,
1100            cache,
1101        }
1102    }
1103
1104    fn do_schedule_ranges_structural(
1105        &mut self,
1106        ranges: &[Range<u64>],
1107        filter: &FilterExpression,
1108        io: Arc<dyn EncodingsIo>,
1109        mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1110    ) {
1111        let root_scheduler = self.root_scheduler.as_structural();
1112        let mut context = SchedulerContext::new(io, self.cache.clone());
1113        let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
1114        if let Err(schedule_ranges_err) = maybe_root_job {
1115            schedule_action(Err(schedule_ranges_err));
1116            return;
1117        }
1118        let mut root_job = maybe_root_job.unwrap();
1119        let mut num_rows_scheduled = 0;
1120        loop {
1121            let maybe_next_scan_lines = root_job.schedule_next(&mut context);
1122            if let Err(err) = maybe_next_scan_lines {
1123                schedule_action(Err(err));
1124                return;
1125            }
1126            let next_scan_lines = maybe_next_scan_lines.unwrap();
1127            if next_scan_lines.is_empty() {
1128                return;
1129            }
1130            for next_scan_line in next_scan_lines {
1131                trace!(
1132                    "Scheduled scan line of {} rows and {} decoders",
1133                    next_scan_line.rows_scheduled,
1134                    next_scan_line.decoders.len()
1135                );
1136                num_rows_scheduled += next_scan_line.rows_scheduled;
1137                if !schedule_action(Ok(DecoderMessage {
1138                    scheduled_so_far: num_rows_scheduled,
1139                    decoders: next_scan_line.decoders,
1140                })) {
1141                    // Decoder has disconnected
1142                    return;
1143                }
1144            }
1145        }
1146    }
1147
1148    fn do_schedule_ranges_legacy(
1149        &mut self,
1150        ranges: &[Range<u64>],
1151        filter: &FilterExpression,
1152        io: Arc<dyn EncodingsIo>,
1153        mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1154        // If specified, this will be used as the top_level_row for all scheduling
1155        // tasks.  This is used by list scheduling to ensure all items scheduling
1156        // tasks are scheduled at the same top level row.
1157        priority: Option<Box<dyn PriorityRange>>,
1158    ) {
1159        let root_scheduler = self.root_scheduler.as_legacy();
1160        let rows_requested = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1161        trace!(
1162            "Scheduling {} ranges across {}..{} ({} rows){}",
1163            ranges.len(),
1164            ranges.first().unwrap().start,
1165            ranges.last().unwrap().end,
1166            rows_requested,
1167            priority
1168                .as_ref()
1169                .map(|p| format!(" (priority={:?})", p))
1170                .unwrap_or_default()
1171        );
1172
1173        let mut context = SchedulerContext::new(io, self.cache.clone());
1174        let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
1175        if let Err(schedule_ranges_err) = maybe_root_job {
1176            schedule_action(Err(schedule_ranges_err));
1177            return;
1178        }
1179        let mut root_job = maybe_root_job.unwrap();
1180        let mut num_rows_scheduled = 0;
1181        let mut rows_to_schedule = root_job.num_rows();
1182        let mut priority = priority.unwrap_or(Box::new(SimplePriorityRange::new(0)));
1183        trace!("Scheduled ranges refined to {} rows", rows_to_schedule);
1184        while rows_to_schedule > 0 {
1185            let maybe_next_scan_line = root_job.schedule_next(&mut context, priority.as_ref());
1186            if let Err(schedule_next_err) = maybe_next_scan_line {
1187                schedule_action(Err(schedule_next_err));
1188                return;
1189            }
1190            let next_scan_line = maybe_next_scan_line.unwrap();
1191            priority.advance(next_scan_line.rows_scheduled);
1192            num_rows_scheduled += next_scan_line.rows_scheduled;
1193            rows_to_schedule -= next_scan_line.rows_scheduled;
1194            trace!(
1195                "Scheduled scan line of {} rows and {} decoders",
1196                next_scan_line.rows_scheduled,
1197                next_scan_line.decoders.len()
1198            );
1199            if !schedule_action(Ok(DecoderMessage {
1200                scheduled_so_far: num_rows_scheduled,
1201                decoders: next_scan_line.decoders,
1202            })) {
1203                // Decoder has disconnected
1204                return;
1205            }
1206
1207            trace!("Finished scheduling {} ranges", ranges.len());
1208        }
1209    }
1210
1211    fn do_schedule_ranges(
1212        &mut self,
1213        ranges: &[Range<u64>],
1214        filter: &FilterExpression,
1215        io: Arc<dyn EncodingsIo>,
1216        schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1217        // If specified, this will be used as the top_level_row for all scheduling
1218        // tasks.  This is used by list scheduling to ensure all items scheduling
1219        // tasks are scheduled at the same top level row.
1220        priority: Option<Box<dyn PriorityRange>>,
1221    ) {
1222        match &self.root_scheduler {
1223            RootScheduler::Legacy(_) => {
1224                self.do_schedule_ranges_legacy(ranges, filter, io, schedule_action, priority)
1225            }
1226            RootScheduler::Structural(_) => {
1227                self.do_schedule_ranges_structural(ranges, filter, io, schedule_action)
1228            }
1229        }
1230    }
1231
1232    // This method is similar to schedule_ranges but instead of
1233    // sending the decoders to a channel it collects them all into a vector
1234    pub fn schedule_ranges_to_vec(
1235        &mut self,
1236        ranges: &[Range<u64>],
1237        filter: &FilterExpression,
1238        io: Arc<dyn EncodingsIo>,
1239        priority: Option<Box<dyn PriorityRange>>,
1240    ) -> Result<Vec<DecoderMessage>> {
1241        let mut decode_messages = Vec::new();
1242        self.do_schedule_ranges(
1243            ranges,
1244            filter,
1245            io,
1246            |msg| {
1247                decode_messages.push(msg);
1248                true
1249            },
1250            priority,
1251        );
1252        decode_messages.into_iter().collect::<Result<Vec<_>>>()
1253    }
1254
1255    /// Schedules the load of multiple ranges of rows
1256    ///
1257    /// Ranges must be non-overlapping and in sorted order
1258    ///
1259    /// # Arguments
1260    ///
1261    /// * `ranges` - The ranges of rows to load
1262    /// * `sink` - A channel to send the decode tasks
1263    /// * `scheduler` An I/O scheduler to issue I/O requests
1264    #[instrument(skip_all)]
1265    pub fn schedule_ranges(
1266        &mut self,
1267        ranges: &[Range<u64>],
1268        filter: &FilterExpression,
1269        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1270        scheduler: Arc<dyn EncodingsIo>,
1271    ) {
1272        self.do_schedule_ranges(
1273            ranges,
1274            filter,
1275            scheduler,
1276            |msg| {
1277                match sink.send(msg) {
1278                    Ok(_) => true,
1279                    Err(SendError { .. }) => {
1280                        // The receiver has gone away.  We can't do anything about it
1281                        // so just ignore the error.
1282                        debug!(
1283                        "schedule_ranges aborting early since decoder appears to have been dropped"
1284                    );
1285                        false
1286                    }
1287                }
1288            },
1289            None,
1290        )
1291    }
1292
1293    /// Schedules the load of a range of rows
1294    ///
1295    /// # Arguments
1296    ///
1297    /// * `range` - The range of rows to load
1298    /// * `sink` - A channel to send the decode tasks
1299    /// * `scheduler` An I/O scheduler to issue I/O requests
1300    #[instrument(skip_all)]
1301    pub fn schedule_range(
1302        &mut self,
1303        range: Range<u64>,
1304        filter: &FilterExpression,
1305        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1306        scheduler: Arc<dyn EncodingsIo>,
1307    ) {
1308        self.schedule_ranges(&[range], filter, sink, scheduler)
1309    }
1310
1311    /// Schedules the load of selected rows
1312    ///
1313    /// # Arguments
1314    ///
1315    /// * `indices` - The row indices to load (these must be in ascending order!)
1316    /// * `sink` - A channel to send the decode tasks
1317    /// * `scheduler` An I/O scheduler to issue I/O requests
1318    pub fn schedule_take(
1319        &mut self,
1320        indices: &[u64],
1321        filter: &FilterExpression,
1322        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1323        scheduler: Arc<dyn EncodingsIo>,
1324    ) {
1325        debug_assert!(indices.windows(2).all(|w| w[0] < w[1]));
1326        if indices.is_empty() {
1327            return;
1328        }
1329        trace!("Scheduling take of {} rows", indices.len());
1330        let ranges = Self::indices_to_ranges(indices);
1331        self.schedule_ranges(&ranges, filter, sink, scheduler)
1332    }
1333
1334    // coalesce continuous indices if possible (the input indices must be sorted and non-empty)
1335    fn indices_to_ranges(indices: &[u64]) -> Vec<Range<u64>> {
1336        let mut ranges = Vec::new();
1337        let mut start = indices[0];
1338
1339        for window in indices.windows(2) {
1340            if window[1] != window[0] + 1 {
1341                ranges.push(start..window[0] + 1);
1342                start = window[1];
1343            }
1344        }
1345
1346        ranges.push(start..*indices.last().unwrap() + 1);
1347        ranges
1348    }
1349}
1350
1351pub struct ReadBatchTask {
1352    pub task: BoxFuture<'static, Result<RecordBatch>>,
1353    pub num_rows: u32,
1354}
1355
1356/// A stream that takes scheduled jobs and generates decode tasks from them.
1357pub struct BatchDecodeStream {
1358    context: DecoderContext,
1359    root_decoder: SimpleStructDecoder,
1360    rows_remaining: u64,
1361    rows_per_batch: u32,
1362    rows_scheduled: u64,
1363    rows_drained: u64,
1364    scheduler_exhausted: bool,
1365    emitted_batch_size_warning: Arc<Once>,
1366}
1367
1368impl BatchDecodeStream {
1369    /// Create a new instance of a batch decode stream
1370    ///
1371    /// # Arguments
1372    ///
1373    /// * `scheduled` - an incoming stream of decode tasks from a `DecodeBatchScheduler`
1374    /// * `schema` - the schema of the data to create
1375    /// * `rows_per_batch` the number of rows to create before making a batch
1376    /// * `num_rows` the total number of rows scheduled
1377    /// * `num_columns` the total number of columns in the file
1378    pub fn new(
1379        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1380        rows_per_batch: u32,
1381        num_rows: u64,
1382        root_decoder: SimpleStructDecoder,
1383    ) -> Self {
1384        Self {
1385            context: DecoderContext::new(scheduled),
1386            root_decoder,
1387            rows_remaining: num_rows,
1388            rows_per_batch,
1389            rows_scheduled: 0,
1390            rows_drained: 0,
1391            scheduler_exhausted: false,
1392            emitted_batch_size_warning: Arc::new(Once::new()),
1393        }
1394    }
1395
1396    fn accept_decoder(&mut self, decoder: crate::previous::decoder::DecoderReady) -> Result<()> {
1397        if decoder.path.is_empty() {
1398            // The root decoder we can ignore
1399            Ok(())
1400        } else {
1401            self.root_decoder.accept_child(decoder)
1402        }
1403    }
1404
1405    #[instrument(level = "debug", skip_all)]
1406    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
1407        if self.scheduler_exhausted {
1408            return Ok(self.rows_scheduled);
1409        }
1410        while self.rows_scheduled < scheduled_need {
1411            let next_message = self.context.source.recv().await;
1412            match next_message {
1413                Some(scan_line) => {
1414                    let scan_line = scan_line?;
1415                    self.rows_scheduled = scan_line.scheduled_so_far;
1416                    for message in scan_line.decoders {
1417                        self.accept_decoder(message.into_legacy())?;
1418                    }
1419                }
1420                None => {
1421                    // Schedule ended before we got all the data we expected.  This probably
1422                    // means some kind of pushdown filter was applied and we didn't load as
1423                    // much data as we thought we would.
1424                    self.scheduler_exhausted = true;
1425                    return Ok(self.rows_scheduled);
1426                }
1427            }
1428        }
1429        Ok(scheduled_need)
1430    }
1431
1432    #[instrument(level = "debug", skip_all)]
1433    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
1434        trace!(
1435            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1436            self.rows_remaining, self.rows_drained, self.rows_scheduled,
1437        );
1438        if self.rows_remaining == 0 {
1439            return Ok(None);
1440        }
1441
1442        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1443        self.rows_remaining -= to_take;
1444
1445        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1446        trace!(
1447            "scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}",
1448            scheduled_need, self.rows_drained, to_take, self.rows_scheduled
1449        );
1450        if scheduled_need > 0 {
1451            let desired_scheduled = scheduled_need + self.rows_scheduled;
1452            trace!(
1453                "Draining from scheduler (desire at least {} scheduled rows)",
1454                desired_scheduled
1455            );
1456            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
1457            if actually_scheduled < desired_scheduled {
1458                let under_scheduled = desired_scheduled - actually_scheduled;
1459                to_take -= under_scheduled;
1460            }
1461        }
1462
1463        if to_take == 0 {
1464            return Ok(None);
1465        }
1466
1467        // wait_for_loaded waits for *>* loaded_need (not >=) so we do a -1 here
1468        let loaded_need = self.rows_drained + to_take - 1;
1469        trace!(
1470            "Waiting for I/O (desire at least {} fully loaded rows)",
1471            loaded_need
1472        );
1473        self.root_decoder.wait_for_loaded(loaded_need).await?;
1474
1475        let next_task = self.root_decoder.drain(to_take)?;
1476        self.rows_drained += to_take;
1477        Ok(Some(next_task))
1478    }
1479
1480    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
1481        let stream = futures::stream::unfold(self, |mut slf| async move {
1482            let next_task = slf.next_batch_task().await;
1483            let next_task = next_task.transpose().map(|next_task| {
1484                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
1485                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
1486                let task = async move {
1487                    let next_task = next_task?;
1488                    // Real decode work happens inside into_batch, which can block the current
1489                    // thread for a long time. By spawning it as a new task, we allow Tokio's
1490                    // worker threads to keep making progress.
1491                    let (batch, _data_size) =
1492                        tokio::spawn(
1493                            async move { next_task.into_batch(emitted_batch_size_warning) },
1494                        )
1495                        .await
1496                        .map_err(|err| Error::wrapped(err.into()))??;
1497                    Ok(batch)
1498                };
1499                (task, num_rows)
1500            });
1501            next_task.map(|(task, num_rows)| {
1502                // This should be true since batch size is u32
1503                debug_assert!(num_rows <= u32::MAX as u64);
1504                let next_task = ReadBatchTask {
1505                    task: task.boxed(),
1506                    num_rows: num_rows as u32,
1507                };
1508                (next_task, slf)
1509            })
1510        });
1511        stream.boxed()
1512    }
1513}
1514
1515// Utility types to smooth out the differences between the 2.0 and 2.1 decoders so that
1516// we can have a single implementation of the batch decode iterator
1517enum RootDecoderMessage {
1518    LoadedPage(LoadedPageShard),
1519    LegacyPage(crate::previous::decoder::DecoderReady),
1520}
1521trait RootDecoderType {
1522    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()>;
1523    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
1524    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()>;
1525}
1526impl RootDecoderType for StructuralStructDecoder {
1527    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1528        let RootDecoderMessage::LoadedPage(loaded_page) = message else {
1529            unreachable!()
1530        };
1531        self.accept_page(loaded_page)
1532    }
1533    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1534        self.drain_batch_task(num_rows)
1535    }
1536    fn wait(&mut self, _: u64, _: &tokio::runtime::Runtime) -> Result<()> {
1537        // Waiting happens elsewhere (not as part of the decoder)
1538        Ok(())
1539    }
1540}
1541impl RootDecoderType for SimpleStructDecoder {
1542    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1543        let RootDecoderMessage::LegacyPage(legacy_page) = message else {
1544            unreachable!()
1545        };
1546        self.accept_child(legacy_page)
1547    }
1548    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1549        self.drain(num_rows)
1550    }
1551    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()> {
1552        runtime.block_on(self.wait_for_loaded(loaded_need))
1553    }
1554}
1555
1556/// A blocking batch decoder that performs synchronous decoding
1557struct BatchDecodeIterator<T: RootDecoderType> {
1558    messages: VecDeque<Result<DecoderMessage>>,
1559    root_decoder: T,
1560    rows_remaining: u64,
1561    rows_per_batch: u32,
1562    rows_scheduled: u64,
1563    rows_drained: u64,
1564    emitted_batch_size_warning: Arc<Once>,
1565    // Note: this is not the runtime on which I/O happens.
1566    // That's always in the scheduler.  This is just a runtime we use to
1567    // sleep the current thread if I/O is unready
1568    wait_for_io_runtime: tokio::runtime::Runtime,
1569    schema: Arc<ArrowSchema>,
1570}
1571
1572impl<T: RootDecoderType> BatchDecodeIterator<T> {
1573    /// Create a new instance of a batch decode iterator
1574    pub fn new(
1575        messages: VecDeque<Result<DecoderMessage>>,
1576        rows_per_batch: u32,
1577        num_rows: u64,
1578        root_decoder: T,
1579        schema: Arc<ArrowSchema>,
1580    ) -> Self {
1581        Self {
1582            messages,
1583            root_decoder,
1584            rows_remaining: num_rows,
1585            rows_per_batch,
1586            rows_scheduled: 0,
1587            rows_drained: 0,
1588            wait_for_io_runtime: tokio::runtime::Builder::new_current_thread()
1589                .build()
1590                .unwrap(),
1591            emitted_batch_size_warning: Arc::new(Once::new()),
1592            schema,
1593        }
1594    }
1595
1596    /// Wait for a single page of data to finish loading
1597    ///
1598    /// If the data is not available this will perform a *blocking* wait (put
1599    /// the current thread to sleep)
1600    fn wait_for_page(&self, unloaded_page: UnloadedPageShard) -> Result<LoadedPageShard> {
1601        match maybe_done(unloaded_page.0) {
1602            // Fast path, avoid all runtime shenanigans if the data is ready
1603            MaybeDone::Done(loaded_page) => loaded_page,
1604            // Slow path, we need to wait on I/O, enter the runtime
1605            MaybeDone::Future(fut) => self.wait_for_io_runtime.block_on(fut),
1606            MaybeDone::Gone => unreachable!(),
1607        }
1608    }
1609
1610    /// Waits for I/O until `scheduled_need` rows have been loaded
1611    ///
1612    /// Note that `scheduled_need` is cumulative.  E.g. this method
1613    /// should be called with 5, 10, 15 and not 5, 5, 5
1614    #[instrument(skip_all)]
1615    fn wait_for_io(&mut self, scheduled_need: u64, to_take: u64) -> Result<u64> {
1616        while self.rows_scheduled < scheduled_need && !self.messages.is_empty() {
1617            let message = self.messages.pop_front().unwrap()?;
1618            self.rows_scheduled = message.scheduled_so_far;
1619            for decoder_message in message.decoders {
1620                match decoder_message {
1621                    MessageType::UnloadedPage(unloaded_page) => {
1622                        let loaded_page = self.wait_for_page(unloaded_page)?;
1623                        self.root_decoder
1624                            .accept_message(RootDecoderMessage::LoadedPage(loaded_page))?;
1625                    }
1626                    MessageType::DecoderReady(decoder_ready) => {
1627                        // The root decoder we can ignore
1628                        if !decoder_ready.path.is_empty() {
1629                            self.root_decoder
1630                                .accept_message(RootDecoderMessage::LegacyPage(decoder_ready))?;
1631                        }
1632                    }
1633                }
1634            }
1635        }
1636
1637        let loaded_need = self.rows_drained + to_take.min(self.rows_per_batch as u64) - 1;
1638
1639        self.root_decoder
1640            .wait(loaded_need, &self.wait_for_io_runtime)?;
1641        Ok(self.rows_scheduled)
1642    }
1643
1644    #[instrument(level = "debug", skip_all)]
1645    fn next_batch_task(&mut self) -> Result<Option<RecordBatch>> {
1646        trace!(
1647            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1648            self.rows_remaining, self.rows_drained, self.rows_scheduled,
1649        );
1650        if self.rows_remaining == 0 {
1651            return Ok(None);
1652        }
1653
1654        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1655        self.rows_remaining -= to_take;
1656
1657        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1658        trace!(
1659            "scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}",
1660            scheduled_need, self.rows_drained, to_take, self.rows_scheduled
1661        );
1662        if scheduled_need > 0 {
1663            let desired_scheduled = scheduled_need + self.rows_scheduled;
1664            trace!(
1665                "Draining from scheduler (desire at least {} scheduled rows)",
1666                desired_scheduled
1667            );
1668            let actually_scheduled = self.wait_for_io(desired_scheduled, to_take)?;
1669            if actually_scheduled < desired_scheduled {
1670                let under_scheduled = desired_scheduled - actually_scheduled;
1671                to_take -= under_scheduled;
1672            }
1673        }
1674
1675        if to_take == 0 {
1676            return Ok(None);
1677        }
1678
1679        let next_task = self.root_decoder.drain_batch(to_take)?;
1680
1681        self.rows_drained += to_take;
1682
1683        let (batch, _data_size) = next_task.into_batch(self.emitted_batch_size_warning.clone())?;
1684
1685        Ok(Some(batch))
1686    }
1687}
1688
1689impl<T: RootDecoderType> Iterator for BatchDecodeIterator<T> {
1690    type Item = ArrowResult<RecordBatch>;
1691
1692    fn next(&mut self) -> Option<Self::Item> {
1693        self.next_batch_task()
1694            .transpose()
1695            .map(|r| r.map_err(ArrowError::from))
1696    }
1697}
1698
1699impl<T: RootDecoderType> RecordBatchReader for BatchDecodeIterator<T> {
1700    fn schema(&self) -> Arc<ArrowSchema> {
1701        self.schema.clone()
1702    }
1703}
1704
1705/// Estimate the number of bytes per row for a given Arrow data type.
1706///
1707/// For fixed-width types this is exact. For variable-width types (strings,
1708/// binary, lists) a rough default is used. The estimate is used as a
1709/// starting point when `batch_size_bytes` is set; a post-decode feedback
1710/// loop corrects it after the first batch.
1711///
1712/// This estimate ignores validity bitmaps at the moment.  We can't infer
1713/// their presence simply from the data_type and their impact is probably
1714/// fairly negligible.
1715fn estimate_bytes_per_row(data_type: &DataType) -> f64 {
1716    if let Some(w) = data_type.byte_width_opt() {
1717        return w as f64;
1718    }
1719    match data_type {
1720        DataType::Boolean => 1.0 / 8.0,
1721        DataType::Utf8 | DataType::Binary | DataType::LargeUtf8 | DataType::LargeBinary => 64.0,
1722        DataType::Struct(fields) => fields
1723            .iter()
1724            .map(|f| estimate_bytes_per_row(f.data_type()))
1725            .sum(),
1726        DataType::List(child) | DataType::LargeList(child) => {
1727            5.0 * estimate_bytes_per_row(child.data_type())
1728        }
1729        DataType::FixedSizeList(child, dim) => {
1730            *dim as f64 * estimate_bytes_per_row(child.data_type())
1731        }
1732        DataType::Dictionary(_, value_type) => estimate_bytes_per_row(value_type),
1733        DataType::Map(entries, _) => 5.0 * estimate_bytes_per_row(entries.data_type()),
1734        _ => 64.0,
1735    }
1736}
1737
1738/// A stream that takes scheduled jobs and generates decode tasks from them.
1739pub struct StructuralBatchDecodeStream {
1740    context: DecoderContext,
1741    root_decoder: StructuralStructDecoder,
1742    rows_remaining: u64,
1743    rows_per_batch: u32,
1744    rows_scheduled: u64,
1745    rows_drained: u64,
1746    scheduler_exhausted: bool,
1747    emitted_batch_size_warning: Arc<Once>,
1748    // Decode scheduling policy selected at planning time.
1749    //
1750    // Performance tradeoff:
1751    // - true: spawn `into_batch` onto Tokio, which improves scan throughput by allowing
1752    //   more decode parallelism.
1753    // - false: run `into_batch` inline, which avoids Tokio scheduling overhead and is
1754    //   typically better for point lookups / small takes.
1755    spawn_batch_decode_tasks: bool,
1756    /// If set, target this many bytes per batch instead of `rows_per_batch` rows.
1757    batch_size_bytes: Option<u64>,
1758    /// Schema-based estimate of bytes per row, computed once at construction.
1759    /// Only meaningful when `batch_size_bytes` is `Some`.
1760    schema_bytes_per_row: f64,
1761    /// Post-decode feedback: actual bytes-per-row measured from the most
1762    /// recently decoded batch.  Zero means no feedback yet (use schema estimate).
1763    bytes_per_row_feedback: Arc<AtomicU64>,
1764}
1765
1766impl StructuralBatchDecodeStream {
1767    /// Create a new instance of a batch decode stream
1768    ///
1769    /// # Arguments
1770    ///
1771    /// * `scheduled` - an incoming stream of decode tasks from a `DecodeBatchScheduler`
1772    /// * `schema` - the schema of the data to create
1773    /// * `rows_per_batch` the number of rows to create before making a batch
1774    /// * `num_rows` the total number of rows scheduled
1775    /// * `num_columns` the total number of columns in the file
1776    pub fn new(
1777        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1778        rows_per_batch: u32,
1779        num_rows: u64,
1780        root_decoder: StructuralStructDecoder,
1781        spawn_batch_decode_tasks: bool,
1782        batch_size_bytes: Option<u64>,
1783    ) -> Self {
1784        let schema_bytes_per_row = if batch_size_bytes.is_some() {
1785            estimate_bytes_per_row(root_decoder.data_type()).max(1.0)
1786        } else {
1787            0.0
1788        };
1789        Self {
1790            context: DecoderContext::new(scheduled),
1791            root_decoder,
1792            rows_remaining: num_rows,
1793            rows_per_batch,
1794            rows_scheduled: 0,
1795            rows_drained: 0,
1796            scheduler_exhausted: false,
1797            emitted_batch_size_warning: Arc::new(Once::new()),
1798            spawn_batch_decode_tasks,
1799            batch_size_bytes,
1800            schema_bytes_per_row,
1801            bytes_per_row_feedback: Arc::new(AtomicU64::new(0)),
1802        }
1803    }
1804
1805    #[instrument(level = "debug", skip_all)]
1806    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
1807        if self.scheduler_exhausted {
1808            return Ok(self.rows_scheduled);
1809        }
1810        while self.rows_scheduled < scheduled_need {
1811            let next_message = self.context.source.recv().await;
1812            match next_message {
1813                Some(scan_line) => {
1814                    let scan_line = scan_line?;
1815                    self.rows_scheduled = scan_line.scheduled_so_far;
1816                    for message in scan_line.decoders {
1817                        let unloaded_page = message.into_structural();
1818                        let loaded_page = unloaded_page.0.await?;
1819                        self.root_decoder.accept_page(loaded_page)?;
1820                    }
1821                }
1822                None => {
1823                    // Schedule ended before we got all the data we expected.  This probably
1824                    // means some kind of pushdown filter was applied and we didn't load as
1825                    // much data as we thought we would.
1826                    self.scheduler_exhausted = true;
1827                    return Ok(self.rows_scheduled);
1828                }
1829            }
1830        }
1831        Ok(scheduled_need)
1832    }
1833
1834    #[instrument(level = "debug", skip_all)]
1835    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
1836        trace!(
1837            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1838            self.rows_remaining, self.rows_drained, self.rows_scheduled,
1839        );
1840        if self.rows_remaining == 0 {
1841            return Ok(None);
1842        }
1843
1844        let mut to_take = if let Some(batch_size_bytes) = self.batch_size_bytes {
1845            let feedback = self.bytes_per_row_feedback.load(Ordering::Relaxed);
1846            let bpr = if feedback > 0 {
1847                feedback as f64
1848            } else {
1849                self.schema_bytes_per_row
1850            };
1851            let rows = (batch_size_bytes as f64 / bpr) as u64;
1852            self.rows_remaining.min(rows.max(1))
1853        } else {
1854            self.rows_remaining.min(self.rows_per_batch as u64)
1855        };
1856        self.rows_remaining -= to_take;
1857
1858        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1859        trace!(
1860            "scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}",
1861            scheduled_need, self.rows_drained, to_take, self.rows_scheduled
1862        );
1863        if scheduled_need > 0 {
1864            let desired_scheduled = scheduled_need + self.rows_scheduled;
1865            trace!(
1866                "Draining from scheduler (desire at least {} scheduled rows)",
1867                desired_scheduled
1868            );
1869            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
1870            if actually_scheduled < desired_scheduled {
1871                let under_scheduled = desired_scheduled - actually_scheduled;
1872                to_take -= under_scheduled;
1873            }
1874        }
1875
1876        if to_take == 0 {
1877            return Ok(None);
1878        }
1879
1880        let next_task = self.root_decoder.drain_batch_task(to_take)?;
1881        self.rows_drained += to_take;
1882        Ok(Some(next_task))
1883    }
1884
1885    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
1886        let stream = futures::stream::unfold(self, |mut slf| async move {
1887            let next_task = slf.next_batch_task().await;
1888            let next_task = next_task.transpose().map(|next_task| {
1889                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
1890                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
1891                let bytes_per_row_feedback = slf.bytes_per_row_feedback.clone();
1892                // Capture the per-stream policy once so every emitted batch task follows the
1893                // same throughput-vs-overhead choice made by the scheduler.
1894                let spawn_batch_decode_tasks = slf.spawn_batch_decode_tasks;
1895                let task = async move {
1896                    let next_task = next_task?;
1897                    let (batch, data_size) = if spawn_batch_decode_tasks {
1898                        tokio::spawn(
1899                            async move { next_task.into_batch(emitted_batch_size_warning) },
1900                        )
1901                        .await
1902                        .map_err(|err| Error::wrapped(err.into()))??
1903                    } else {
1904                        next_task.into_batch(emitted_batch_size_warning)?
1905                    };
1906                    let num_rows = batch.num_rows() as u64;
1907                    if num_rows > 0 {
1908                        let bpr = data_size / num_rows;
1909                        let prev = bytes_per_row_feedback.load(Ordering::Relaxed);
1910                        let next = if prev == 0 || bpr >= prev {
1911                            // First batch or actual size is larger than estimate:
1912                            // adopt immediately to avoid OOM.
1913                            bpr
1914                        } else {
1915                            // Actual size is smaller: degrade gradually toward
1916                            // the true value to avoid over-correcting on a
1917                            // single anomalous batch.
1918                            (prev + bpr) / 2
1919                        };
1920                        bytes_per_row_feedback.store(next.max(1), Ordering::Relaxed);
1921                    }
1922                    Ok(batch)
1923                };
1924                (task, num_rows)
1925            });
1926            next_task.map(|(task, num_rows)| {
1927                // This should be true since batch size is u32
1928                debug_assert!(num_rows <= u32::MAX as u64);
1929                let next_task = ReadBatchTask {
1930                    task: task.boxed(),
1931                    num_rows: num_rows as u32,
1932                };
1933                (next_task, slf)
1934            })
1935        });
1936        stream.boxed()
1937    }
1938}
1939
1940#[derive(Debug)]
1941pub enum RequestedRows {
1942    Ranges(Vec<Range<u64>>),
1943    Indices(Vec<u64>),
1944}
1945
1946impl RequestedRows {
1947    pub fn num_rows(&self) -> u64 {
1948        match self {
1949            Self::Ranges(ranges) => ranges.iter().map(|r| r.end - r.start).sum(),
1950            Self::Indices(indices) => indices.len() as u64,
1951        }
1952    }
1953
1954    pub fn trim_empty_ranges(mut self) -> Self {
1955        if let Self::Ranges(ranges) = &mut self {
1956            ranges.retain(|r| !r.is_empty());
1957        }
1958        self
1959    }
1960}
1961
1962/// Configuration for decoder behavior
1963#[derive(Debug, Clone)]
1964pub struct DecoderConfig {
1965    /// Whether to cache repetition indices for better performance.
1966    ///
1967    /// This defaults to the `LANCE_READ_CACHE_REPETITION_INDEX` environment variable
1968    /// when present and is enabled by default. Set the env var to a non-truthy
1969    /// value (for example `0` or `false`) to disable it. The env var is read
1970    /// once per process.
1971    pub cache_repetition_index: bool,
1972    /// Whether to validate decoded data
1973    pub validate_on_decode: bool,
1974    /// Override the strategy used to dispatch the scheduling work in
1975    /// [`schedule_and_decode`].
1976    ///
1977    /// `schedule_and_decode` always awaits the scheduler's `initialize` (which
1978    /// performs metadata I/O) before returning.  This flag controls what
1979    /// happens with the subsequent (synchronous) work of pushing decoder
1980    /// messages into the channel that feeds the decode stream.
1981    ///
1982    /// * `None` - default behavior: the scheduling work runs inline (as part
1983    ///   of the `schedule_and_decode` await) when the request is small
1984    ///   (controlled by the `LANCE_INLINE_SCHEDULING_THRESHOLD` env var) and
1985    ///   is dispatched onto a spawned task otherwise.
1986    /// * `Some(true)` - always run scheduling inline.  The await of
1987    ///   `schedule_and_decode` does not return until every decoder message
1988    ///   has been queued.
1989    /// * `Some(false)` - always spawn a task for scheduling so that it can
1990    ///   overlap with consumption of the decode stream.
1991    pub inline_scheduling: Option<bool>,
1992}
1993
1994impl Default for DecoderConfig {
1995    fn default() -> Self {
1996        Self {
1997            cache_repetition_index: default_cache_repetition_index(),
1998            validate_on_decode: false,
1999            inline_scheduling: None,
2000        }
2001    }
2002}
2003
2004#[derive(Debug, Clone)]
2005pub struct SchedulerDecoderConfig {
2006    pub decoder_plugins: Arc<DecoderPlugins>,
2007    pub batch_size: u32,
2008    pub io: Arc<dyn EncodingsIo>,
2009    pub cache: Arc<LanceCache>,
2010    /// Decoder configuration
2011    pub decoder_config: DecoderConfig,
2012    /// If set, target this many bytes per batch instead of using `batch_size` rows.
2013    ///
2014    /// Only supported for v2.1+ (structural) files. For v2.0 files this
2015    /// option is ignored and a warning is logged.
2016    pub batch_size_bytes: Option<u64>,
2017}
2018
2019fn check_scheduler_on_drop(
2020    stream: BoxStream<'static, ReadBatchTask>,
2021    scheduler_handle: tokio::task::JoinHandle<()>,
2022) -> BoxStream<'static, ReadBatchTask> {
2023    // This is a bit weird but we create an "empty stream" that unwraps the scheduler handle (which
2024    // will panic if the scheduler panicked).  This let's us check if the scheduler panicked
2025    // when the stream finishes.
2026    let abort_handle = scheduler_handle.abort_handle();
2027    let mut scheduler_handle = Some(scheduler_handle);
2028    let check_scheduler = stream::unfold((), move |_| {
2029        let handle = scheduler_handle.take();
2030        async move {
2031            if let Some(handle) = handle {
2032                handle.await.unwrap();
2033            }
2034            None
2035        }
2036    });
2037    stream
2038        .chain(check_scheduler)
2039        .on_drop(move || {
2040            // Abort the scheduler task on early drop. The scheduler task holds
2041            // a reference to the I/O scheduler (via config.io) which keeps the
2042            // ScanScheduler alive. If the scheduler task is stuck waiting for
2043            // initialization I/O (which is blocked on backpressure that will
2044            // never drain because no one is consuming the stream), we need to
2045            // abort it so it releases its I/O reference and allows the
2046            // ScanScheduler to drop and cancel pending I/O.
2047            abort_handle.abort();
2048        })
2049        .boxed()
2050}
2051
2052#[allow(clippy::too_many_arguments)]
2053pub fn create_decode_stream(
2054    schema: &Schema,
2055    num_rows: u64,
2056    batch_size: u32,
2057    is_structural: bool,
2058    should_validate: bool,
2059    spawn_structural_batch_decode_tasks: bool,
2060    rx: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
2061    batch_size_bytes: Option<u64>,
2062) -> Result<BoxStream<'static, ReadBatchTask>> {
2063    if is_structural {
2064        let arrow_schema = ArrowSchema::from(schema);
2065        let structural_decoder = StructuralStructDecoder::new(
2066            arrow_schema.fields,
2067            should_validate,
2068            /*is_root=*/ true,
2069        )?;
2070        Ok(StructuralBatchDecodeStream::new(
2071            rx,
2072            batch_size,
2073            num_rows,
2074            structural_decoder,
2075            spawn_structural_batch_decode_tasks,
2076            batch_size_bytes,
2077        )
2078        .into_stream())
2079    } else {
2080        if batch_size_bytes.is_some() {
2081            warn!("batch_size_bytes is not supported for v2.0 files and will be ignored");
2082        }
2083        let arrow_schema = ArrowSchema::from(schema);
2084        let root_fields = arrow_schema.fields;
2085
2086        let simple_struct_decoder = SimpleStructDecoder::new(root_fields, num_rows);
2087        Ok(BatchDecodeStream::new(rx, batch_size, num_rows, simple_struct_decoder).into_stream())
2088    }
2089}
2090
2091/// Creates a iterator that decodes a set of messages in a blocking fashion
2092///
2093/// See [`schedule_and_decode_blocking`] for more information.
2094pub fn create_decode_iterator(
2095    schema: &Schema,
2096    num_rows: u64,
2097    batch_size: u32,
2098    should_validate: bool,
2099    is_structural: bool,
2100    messages: VecDeque<Result<DecoderMessage>>,
2101) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
2102    let arrow_schema = Arc::new(ArrowSchema::from(schema));
2103    let root_fields = arrow_schema.fields.clone();
2104    if is_structural {
2105        let simple_struct_decoder =
2106            StructuralStructDecoder::new(root_fields, should_validate, /*is_root=*/ true)?;
2107        Ok(Box::new(BatchDecodeIterator::new(
2108            messages,
2109            batch_size,
2110            num_rows,
2111            simple_struct_decoder,
2112            arrow_schema,
2113        )))
2114    } else {
2115        let root_decoder = SimpleStructDecoder::new(root_fields, num_rows);
2116        Ok(Box::new(BatchDecodeIterator::new(
2117            messages,
2118            batch_size,
2119            num_rows,
2120            root_decoder,
2121            arrow_schema,
2122        )))
2123    }
2124}
2125
2126async fn create_scheduler_decoder(
2127    column_infos: Vec<Arc<ColumnInfo>>,
2128    requested_rows: RequestedRows,
2129    filter: FilterExpression,
2130    column_indices: Vec<u32>,
2131    target_schema: Arc<Schema>,
2132    config: SchedulerDecoderConfig,
2133) -> Result<BoxStream<'static, ReadBatchTask>> {
2134    let num_rows = requested_rows.num_rows();
2135
2136    let is_structural = column_infos[0].is_structural();
2137    let mode = std::env::var(ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE);
2138    let spawn_structural_batch_decode_tasks = match mode.ok().as_deref() {
2139        Some("always") => true,
2140        Some("never") => false,
2141        _ => matches!(requested_rows, RequestedRows::Ranges(_)),
2142    };
2143
2144    let (tx, rx) = mpsc::unbounded_channel();
2145
2146    let decode_stream = create_decode_stream(
2147        &target_schema,
2148        num_rows,
2149        config.batch_size,
2150        is_structural,
2151        config.decoder_config.validate_on_decode,
2152        spawn_structural_batch_decode_tasks,
2153        rx,
2154        config.batch_size_bytes,
2155    )?;
2156
2157    // The scheduler's `initialize` may perform I/O to load column metadata
2158    // unless that metadata is already in the cache.  This metadata loading
2159    // happens as part of this call and should be parallelized if reading
2160    // multiple files.
2161    let mut decode_scheduler = DecodeBatchScheduler::try_new(
2162        target_schema.as_ref(),
2163        &column_indices,
2164        &column_infos,
2165        &vec![],
2166        num_rows,
2167        config.decoder_plugins,
2168        config.io.clone(),
2169        config.cache,
2170        &filter,
2171        &config.decoder_config,
2172    )
2173    .await?;
2174
2175    // For small requests the scheduling cost is dwarfed by the overhead of
2176    // spawning a task, so we run scheduling inline (still as part of this
2177    // await) before returning.  The threshold is configurable via
2178    // `LANCE_INLINE_SCHEDULING_THRESHOLD`, and callers can force either
2179    // strategy via `DecoderConfig::inline_scheduling`.
2180    let inline_scheduling = config
2181        .decoder_config
2182        .inline_scheduling
2183        .unwrap_or_else(|| num_rows <= inline_scheduling_threshold());
2184
2185    if inline_scheduling {
2186        match requested_rows {
2187            RequestedRows::Ranges(ranges) => {
2188                decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
2189            }
2190            RequestedRows::Indices(indices) => {
2191                decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
2192            }
2193        }
2194        Ok(decode_stream)
2195    } else {
2196        // Spawn the (still synchronous) scheduling work so that decoder
2197        // messages can stream into the channel while the consumer is
2198        // already pulling from the decode stream.
2199        let scheduling = async move {
2200            match requested_rows {
2201                RequestedRows::Ranges(ranges) => {
2202                    decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
2203                }
2204                RequestedRows::Indices(indices) => {
2205                    decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
2206                }
2207            }
2208        };
2209        let scheduler_handle = tokio::task::spawn(scheduling);
2210        Ok(check_scheduler_on_drop(decode_stream, scheduler_handle))
2211    }
2212}
2213
2214/// Initializes the scheduler, schedules the requested rows, and returns a
2215/// stream of decode tasks for the resulting batches.
2216///
2217/// This is a convenience function that creates both the scheduler and the
2218/// decoder, which can be a little tricky to get right.
2219///
2220/// # Why is this async?
2221///
2222/// Constructing the scheduler runs `initialize` which will perform I/O
2223/// unless the data required is already in the file metadata cache.
2224///
2225/// When `DecoderConfig::inline_scheduling` resolves to `true`, the
2226/// subsequent (synchronous) scheduling work also runs before this function
2227/// returns, leaving a fully primed decode stream.
2228pub async fn schedule_and_decode(
2229    column_infos: Vec<Arc<ColumnInfo>>,
2230    requested_rows: RequestedRows,
2231    filter: FilterExpression,
2232    column_indices: Vec<u32>,
2233    target_schema: Arc<Schema>,
2234    config: SchedulerDecoderConfig,
2235) -> Result<BoxStream<'static, ReadBatchTask>> {
2236    if requested_rows.num_rows() == 0 {
2237        return Ok(stream::empty().boxed());
2238    }
2239
2240    // If the user requested any ranges that are empty, ignore them.  They are pointless and
2241    // trying to read them has caused bugs in the past.
2242    let requested_rows = requested_rows.trim_empty_ranges();
2243
2244    let io = config.io.clone();
2245
2246    let stream = create_scheduler_decoder(
2247        column_infos,
2248        requested_rows,
2249        filter,
2250        column_indices,
2251        target_schema,
2252        config,
2253    )
2254    .await?;
2255
2256    // Keep the io alive until the stream is dropped or finishes.  Otherwise the
2257    // I/O drops as soon as the scheduling is finished and the I/O loop terminates.
2258    Ok(stream.finally(move || drop(io)).boxed())
2259}
2260
2261pub static WAITER_RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
2262    tokio::runtime::Builder::new_current_thread()
2263        .build()
2264        .unwrap()
2265});
2266
2267/// Schedules and decodes the requested data in a blocking fashion
2268///
2269/// This function is a blocking version of [`schedule_and_decode`]. It schedules the requested data
2270/// and decodes it in the current thread.
2271///
2272/// This can be useful when the disk is fast (or the data is in memory) and the amount
2273/// of data is relatively small.  For example, when doing a take against NVMe or in-memory data.
2274///
2275/// This should NOT be used for full scans.  Even if the data is in memory this function will
2276/// not parallelize the decode and will be slower than the async version.  Full scans typically
2277/// make relatively few IOPs and so the asynchronous overhead is much smaller.
2278///
2279/// This method will first completely run the scheduling process.  Then it will run the
2280/// decode process.
2281pub fn schedule_and_decode_blocking(
2282    column_infos: Vec<Arc<ColumnInfo>>,
2283    requested_rows: RequestedRows,
2284    filter: FilterExpression,
2285    column_indices: Vec<u32>,
2286    target_schema: Arc<Schema>,
2287    config: SchedulerDecoderConfig,
2288) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
2289    if requested_rows.num_rows() == 0 {
2290        let arrow_schema = Arc::new(ArrowSchema::from(target_schema.as_ref()));
2291        return Ok(Box::new(RecordBatchIterator::new(vec![], arrow_schema)));
2292    }
2293
2294    let num_rows = requested_rows.num_rows();
2295    let is_structural = column_infos[0].is_structural();
2296
2297    let (tx, mut rx) = mpsc::unbounded_channel();
2298
2299    // Initialize the scheduler.  This is still "asynchronous" but we run it with a current-thread
2300    // runtime.
2301    let mut decode_scheduler = WAITER_RT.block_on(DecodeBatchScheduler::try_new(
2302        target_schema.as_ref(),
2303        &column_indices,
2304        &column_infos,
2305        &vec![],
2306        num_rows,
2307        config.decoder_plugins,
2308        config.io.clone(),
2309        config.cache,
2310        &filter,
2311        &config.decoder_config,
2312    ))?;
2313
2314    // Schedule the requested rows
2315    match requested_rows {
2316        RequestedRows::Ranges(ranges) => {
2317            decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
2318        }
2319        RequestedRows::Indices(indices) => {
2320            decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
2321        }
2322    }
2323
2324    // Drain the scheduler queue into a vec of decode messages
2325    let mut messages = Vec::new();
2326    while rx
2327        .recv_many(&mut messages, usize::MAX)
2328        .now_or_never()
2329        .unwrap()
2330        != 0
2331    {}
2332
2333    // Create a decoder to decode the messages
2334    let decode_iterator = create_decode_iterator(
2335        &target_schema,
2336        num_rows,
2337        config.batch_size,
2338        config.decoder_config.validate_on_decode,
2339        is_structural,
2340        messages.into(),
2341    )?;
2342
2343    Ok(decode_iterator)
2344}
2345
2346/// A decoder for single-column encodings of primitive data (this includes fixed size
2347/// lists of primitive data)
2348///
2349/// Physical decoders are able to decode into existing buffers for zero-copy operation.
2350///
2351/// Instances should be stateless and `Send` / `Sync`.  This is because multiple decode
2352/// tasks could reference the same page.  For example, imagine a page covers rows 0-2000
2353/// and the decoder stream has a batch size of 1024.  The decoder will be needed by both
2354/// the decode task for batch 0 and the decode task for batch 1.
2355///
2356/// See [`crate::decoder`] for more information
2357pub trait PrimitivePageDecoder: Send + Sync {
2358    /// Decode data into buffers
2359    ///
2360    /// This may be a simple zero-copy from a disk buffer or could involve complex decoding
2361    /// such as decompressing from some compressed representation.
2362    ///
2363    /// Capacity is stored as a tuple of (num_bytes: u64, is_needed: bool).  The `is_needed`
2364    /// portion only needs to be updated if the encoding has some concept of an "optional"
2365    /// buffer.
2366    ///
2367    /// Encodings can have any number of input or output buffers.  For example, a dictionary
2368    /// decoding will convert two buffers (indices + dictionary) into a single buffer
2369    ///
2370    /// Binary decodings have two output buffers (one for values, one for offsets)
2371    ///
2372    /// Other decodings could even expand the # of output buffers.  For example, we could decode
2373    /// fixed size strings into variable length strings going from one input buffer to multiple output
2374    /// buffers.
2375    ///
2376    /// Each Arrow data type typically has a fixed structure of buffers and the encoding chain will
2377    /// generally end at one of these structures.  However, intermediate structures may exist which
2378    /// do not correspond to any Arrow type at all.  For example, a bitpacking encoding will deal
2379    /// with buffers that have bits-per-value that is not a multiple of 8.
2380    ///
2381    /// The `primitive_array_from_buffers` method has an expected buffer layout for each arrow
2382    /// type (order matters) and encodings that aim to decode into arrow types should respect
2383    /// this layout.
2384    /// # Arguments
2385    ///
2386    /// * `rows_to_skip` - how many rows to skip (within the page) before decoding
2387    /// * `num_rows` - how many rows to decode
2388    /// * `all_null` - A mutable bool, set to true if a decoder determines all values are null
2389    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock>;
2390}
2391
2392/// A scheduler for single-column encodings of primitive data
2393///
2394/// The scheduler is responsible for calculating what I/O is needed for the requested rows
2395///
2396/// Instances should be stateless and `Send` and `Sync`.  This is because instances can
2397/// be shared in follow-up I/O tasks.
2398///
2399/// See [`crate::decoder`] for more information
2400pub trait PageScheduler: Send + Sync + std::fmt::Debug {
2401    /// Schedules a batch of I/O to load the data needed for the requested ranges
2402    ///
2403    /// Returns a future that will yield a decoder once the data has been loaded
2404    ///
2405    /// # Arguments
2406    ///
2407    /// * `range` - the range of row offsets (relative to start of page) requested
2408    ///   these must be ordered and must not overlap
2409    /// * `scheduler` - a scheduler to submit the I/O request to
2410    /// * `top_level_row` - the row offset of the top level field currently being
2411    ///   scheduled.  This can be used to assign priority to I/O requests
2412    fn schedule_ranges(
2413        &self,
2414        ranges: &[Range<u64>],
2415        scheduler: &Arc<dyn EncodingsIo>,
2416        top_level_row: u64,
2417    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>;
2418}
2419
2420/// A trait to control the priority of I/O
2421pub trait PriorityRange: std::fmt::Debug + Send + Sync {
2422    fn advance(&mut self, num_rows: u64);
2423    fn current_priority(&self) -> u64;
2424    fn box_clone(&self) -> Box<dyn PriorityRange>;
2425}
2426
2427/// A simple priority scheme for top-level fields with no parent
2428/// repetition
2429#[derive(Debug)]
2430pub struct SimplePriorityRange {
2431    priority: u64,
2432}
2433
2434impl SimplePriorityRange {
2435    fn new(priority: u64) -> Self {
2436        Self { priority }
2437    }
2438}
2439
2440impl PriorityRange for SimplePriorityRange {
2441    fn advance(&mut self, num_rows: u64) {
2442        self.priority += num_rows;
2443    }
2444
2445    fn current_priority(&self) -> u64 {
2446        self.priority
2447    }
2448
2449    fn box_clone(&self) -> Box<dyn PriorityRange> {
2450        Box::new(Self {
2451            priority: self.priority,
2452        })
2453    }
2454}
2455
2456/// Determining the priority of a list request is tricky.  We want
2457/// the priority to be the top-level row.  So if we have a
2458/// `list<list<int>>` and each outer list has 10 rows and each inner
2459/// list has 5 rows then the priority of the 100th item is 1 because
2460/// it is the 5th item in the 10th item of the *second* row.
2461///
2462/// This structure allows us to keep track of this complicated priority
2463/// relationship.
2464///
2465/// There's a fair amount of bookkeeping involved here.
2466///
2467/// A better approach (using repetition levels) is coming in the future.
2468pub struct ListPriorityRange {
2469    base: Box<dyn PriorityRange>,
2470    offsets: Arc<[u64]>,
2471    cur_index_into_offsets: usize,
2472    cur_position: u64,
2473}
2474
2475impl ListPriorityRange {
2476    pub(crate) fn new(base: Box<dyn PriorityRange>, offsets: Arc<[u64]>) -> Self {
2477        Self {
2478            base,
2479            offsets,
2480            cur_index_into_offsets: 0,
2481            cur_position: 0,
2482        }
2483    }
2484}
2485
2486impl std::fmt::Debug for ListPriorityRange {
2487    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2488        f.debug_struct("ListPriorityRange")
2489            .field("base", &self.base)
2490            .field("offsets.len()", &self.offsets.len())
2491            .field("cur_index_into_offsets", &self.cur_index_into_offsets)
2492            .field("cur_position", &self.cur_position)
2493            .finish()
2494    }
2495}
2496
2497impl PriorityRange for ListPriorityRange {
2498    fn advance(&mut self, num_rows: u64) {
2499        // We've scheduled X items.  Now walk through the offsets to
2500        // determine how many rows we've scheduled.
2501        self.cur_position += num_rows;
2502        let mut idx_into_offsets = self.cur_index_into_offsets;
2503        while idx_into_offsets + 1 < self.offsets.len()
2504            && self.offsets[idx_into_offsets + 1] <= self.cur_position
2505        {
2506            idx_into_offsets += 1;
2507        }
2508        let base_rows_advanced = idx_into_offsets - self.cur_index_into_offsets;
2509        self.cur_index_into_offsets = idx_into_offsets;
2510        self.base.advance(base_rows_advanced as u64);
2511    }
2512
2513    fn current_priority(&self) -> u64 {
2514        self.base.current_priority()
2515    }
2516
2517    fn box_clone(&self) -> Box<dyn PriorityRange> {
2518        Box::new(Self {
2519            base: self.base.box_clone(),
2520            offsets: self.offsets.clone(),
2521            cur_index_into_offsets: self.cur_index_into_offsets,
2522            cur_position: self.cur_position,
2523        })
2524    }
2525}
2526
2527/// Contains the context for a scheduler
2528pub struct SchedulerContext {
2529    recv: Option<mpsc::UnboundedReceiver<DecoderMessage>>,
2530    io: Arc<dyn EncodingsIo>,
2531    cache: Arc<LanceCache>,
2532    name: String,
2533    path: Vec<u32>,
2534    path_names: Vec<String>,
2535}
2536
2537pub struct ScopedSchedulerContext<'a> {
2538    pub context: &'a mut SchedulerContext,
2539}
2540
2541impl<'a> ScopedSchedulerContext<'a> {
2542    pub fn pop(self) -> &'a mut SchedulerContext {
2543        self.context.pop();
2544        self.context
2545    }
2546}
2547
2548impl SchedulerContext {
2549    pub fn new(io: Arc<dyn EncodingsIo>, cache: Arc<LanceCache>) -> Self {
2550        Self {
2551            io,
2552            cache,
2553            recv: None,
2554            name: "".to_string(),
2555            path: Vec::new(),
2556            path_names: Vec::new(),
2557        }
2558    }
2559
2560    pub fn io(&self) -> &Arc<dyn EncodingsIo> {
2561        &self.io
2562    }
2563
2564    pub fn cache(&self) -> &Arc<LanceCache> {
2565        &self.cache
2566    }
2567
2568    pub fn push(&'_ mut self, name: &str, index: u32) -> ScopedSchedulerContext<'_> {
2569        self.path.push(index);
2570        self.path_names.push(name.to_string());
2571        ScopedSchedulerContext { context: self }
2572    }
2573
2574    pub fn pop(&mut self) {
2575        self.path.pop();
2576        self.path_names.pop();
2577    }
2578
2579    pub fn path_name(&self) -> String {
2580        let path = self.path_names.join("/");
2581        if self.recv.is_some() {
2582            format!("TEMP({}){}", self.name, path)
2583        } else {
2584            format!("ROOT{}", path)
2585        }
2586    }
2587
2588    pub fn current_path(&self) -> VecDeque<u32> {
2589        VecDeque::from_iter(self.path.iter().copied())
2590    }
2591
2592    #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
2593    pub fn locate_decoder(
2594        &mut self,
2595        decoder: Box<dyn crate::previous::decoder::LogicalPageDecoder>,
2596    ) -> crate::previous::decoder::DecoderReady {
2597        trace!(
2598            "Scheduling decoder of type {:?} for {:?}",
2599            decoder.data_type(),
2600            self.path,
2601        );
2602        crate::previous::decoder::DecoderReady {
2603            decoder,
2604            path: self.current_path(),
2605        }
2606    }
2607}
2608
2609pub struct UnloadedPageShard(pub BoxFuture<'static, Result<LoadedPageShard>>);
2610
2611impl std::fmt::Debug for UnloadedPageShard {
2612    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2613        f.debug_struct("UnloadedPage").finish()
2614    }
2615}
2616
2617#[derive(Debug)]
2618pub struct ScheduledScanLine {
2619    pub rows_scheduled: u64,
2620    pub decoders: Vec<MessageType>,
2621}
2622
2623pub trait StructuralSchedulingJob: std::fmt::Debug {
2624    /// Schedule the next batch of data
2625    ///
2626    /// Normally this equates to scheduling the next page of data into one task.  Very large pages
2627    /// might be split into multiple scan lines.  Each scan line has one or more rows.
2628    ///
2629    /// If a scheduler ends early it may return an empty vector.
2630    fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>>;
2631}
2632
2633/// A filter expression to apply to the data
2634///
2635/// The core decoders do not currently take advantage of filtering in
2636/// any way.  In order to maintain the abstraction we represent filters
2637/// as an arbitrary byte sequence.
2638///
2639/// We recommend that encodings use Substrait for filters.
2640pub struct FilterExpression(pub Bytes);
2641
2642impl FilterExpression {
2643    /// Create a filter expression that does not filter any data
2644    ///
2645    /// This is currently represented by an empty byte array.  Encoders
2646    /// that are "filter aware" should make sure they handle this case.
2647    pub fn no_filter() -> Self {
2648        Self(Bytes::new())
2649    }
2650
2651    /// Returns true if the filter is the same as the [`Self::no_filter`] filter
2652    pub fn is_noop(&self) -> bool {
2653        self.0.is_empty()
2654    }
2655}
2656
2657pub trait StructuralFieldScheduler: Send + std::fmt::Debug {
2658    fn initialize<'a>(
2659        &'a mut self,
2660        filter: &'a FilterExpression,
2661        context: &'a SchedulerContext,
2662    ) -> BoxFuture<'a, Result<()>>;
2663    fn schedule_ranges<'a>(
2664        &'a self,
2665        ranges: &[Range<u64>],
2666        filter: &FilterExpression,
2667    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>>;
2668}
2669
2670/// A trait for tasks that decode data into an Arrow array
2671pub trait DecodeArrayTask: Send {
2672    /// Decodes the data into an Arrow array and its data size in bytes
2673    fn decode(self: Box<Self>) -> Result<(ArrayRef, u64)>;
2674}
2675
2676impl DecodeArrayTask for Box<dyn StructuralDecodeArrayTask> {
2677    fn decode(self: Box<Self>) -> Result<(ArrayRef, u64)> {
2678        StructuralDecodeArrayTask::decode(*self)
2679            .map(|decoded_array| (decoded_array.array, decoded_array.data_size))
2680    }
2681}
2682
2683/// A task to decode data into an Arrow record batch
2684///
2685/// It has a child `task` which decodes a struct array with no nulls.
2686/// This is then converted into a record batch.
2687pub struct NextDecodeTask {
2688    /// The decode task itself
2689    pub task: Box<dyn DecodeArrayTask>,
2690    /// The number of rows that will be created
2691    pub num_rows: u64,
2692}
2693
2694impl NextDecodeTask {
2695    // Run the task and produce a record batch
2696    //
2697    // If the batch is very large this function will log a warning message
2698    // suggesting the user try a smaller batch size.
2699    #[instrument(name = "task_to_batch", level = "debug", skip_all)]
2700    fn into_batch(self, emitted_batch_size_warning: Arc<Once>) -> Result<(RecordBatch, u64)> {
2701        let (struct_arr, data_size) = self
2702            .task
2703            .decode()
2704            .map_err(|e| Error::internal(format!("Error decoding batch: {}", e)))?;
2705        let batch = RecordBatch::from(struct_arr.as_struct());
2706        if data_size > BATCH_SIZE_BYTES_WARNING {
2707            emitted_batch_size_warning.call_once(|| {
2708                let size_mb = data_size / 1024 / 1024;
2709                debug!("Lance read in a single batch that contained more than {}MiB of data.  You may want to consider reducing the batch size.", size_mb);
2710            });
2711        }
2712        Ok((batch, data_size))
2713    }
2714}
2715
2716// An envelope to wrap both 2.0 style messages and 2.1 style messages so we can
2717// share some code paths between the two.  Decoders can safely unwrap into whatever
2718// style they expect since a file will be either all-2.0 or all-2.1
2719#[derive(Debug)]
2720pub enum MessageType {
2721    // The older v2.0 scheduler/decoder used a scheme where the message was the
2722    // decoder itself.  The messages were not sent in priority order and the decoder
2723    // had to wait for I/O, figuring out the correct priority.  This was a lot of
2724    // complexity.
2725    DecoderReady(crate::previous::decoder::DecoderReady),
2726    // Starting in 2.1 we use a simpler scheme where the scheduling happens in priority
2727    // order and the message is an unloaded decoder.  These can be awaited, in order, and
2728    // the decoder does not have to worry about waiting for I/O.
2729    UnloadedPage(UnloadedPageShard),
2730}
2731
2732impl MessageType {
2733    pub fn into_legacy(self) -> crate::previous::decoder::DecoderReady {
2734        match self {
2735            Self::DecoderReady(decoder) => decoder,
2736            Self::UnloadedPage(_) => {
2737                panic!("Expected DecoderReady but got UnloadedPage")
2738            }
2739        }
2740    }
2741
2742    pub fn into_structural(self) -> UnloadedPageShard {
2743        match self {
2744            Self::UnloadedPage(unloaded) => unloaded,
2745            Self::DecoderReady(_) => {
2746                panic!("Expected UnloadedPage but got DecoderReady")
2747            }
2748        }
2749    }
2750}
2751
2752pub struct DecoderMessage {
2753    pub scheduled_so_far: u64,
2754    pub decoders: Vec<MessageType>,
2755}
2756
2757pub struct DecoderContext {
2758    source: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
2759}
2760
2761impl DecoderContext {
2762    pub fn new(source: mpsc::UnboundedReceiver<Result<DecoderMessage>>) -> Self {
2763        Self { source }
2764    }
2765}
2766
2767pub struct DecodedPage {
2768    pub data: DataBlock,
2769    pub repdef: RepDefUnraveler,
2770}
2771
2772pub trait DecodePageTask: Send + std::fmt::Debug {
2773    /// Decodes the data into an Arrow array
2774    fn decode(self: Box<Self>) -> Result<DecodedPage>;
2775}
2776
2777pub trait StructuralPageDecoder: std::fmt::Debug + Send {
2778    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>>;
2779    fn num_rows(&self) -> u64;
2780}
2781
2782#[derive(Debug)]
2783pub struct LoadedPageShard {
2784    // The decoder that is ready to be decoded
2785    pub decoder: Box<dyn StructuralPageDecoder>,
2786    // The path to the decoder, the first value is the column index
2787    // following values, if present, are nested child indices
2788    //
2789    // For example, a path of [1, 1, 0] would mean to grab the second
2790    // column, then the second child, and then the first child.
2791    //
2792    // It could represent x in the following schema:
2793    //
2794    // score: float64
2795    // points: struct
2796    //   color: string
2797    //   location: struct
2798    //     x: float64
2799    //
2800    // Currently, only struct decoders have "children" although other
2801    // decoders may at some point as well.  List children are only
2802    // handled through indirect I/O at the moment and so they don't
2803    // need to be represented (yet)
2804    pub path: VecDeque<u32>,
2805}
2806
2807pub struct DecodedArray {
2808    pub array: ArrayRef,
2809    pub repdef: CompositeRepDefUnraveler,
2810    /// The number of bytes of data in this array (excluding Arrow overhead).
2811    pub data_size: u64,
2812}
2813
2814pub trait StructuralDecodeArrayTask: std::fmt::Debug + Send {
2815    fn decode(self: Box<Self>) -> Result<DecodedArray>;
2816}
2817
2818pub trait StructuralFieldDecoder: std::fmt::Debug + Send {
2819    /// Add a newly scheduled child decoder
2820    ///
2821    /// The default implementation does not expect children and returns
2822    /// an error.
2823    fn accept_page(&mut self, _child: LoadedPageShard) -> Result<()>;
2824    /// Creates a task to decode `num_rows` of data into an array
2825    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>>;
2826    /// The data type of the decoded data
2827    fn data_type(&self) -> &DataType;
2828}
2829
2830#[derive(Debug, Default)]
2831pub struct DecoderPlugins {}
2832
2833/// Decodes a batch of data from an in-memory structure created by [`crate::encoder::encode_batch`]
2834pub async fn decode_batch(
2835    batch: &EncodedBatch,
2836    filter: &FilterExpression,
2837    decoder_plugins: Arc<DecoderPlugins>,
2838    should_validate: bool,
2839    version: LanceFileVersion,
2840    cache: Option<Arc<LanceCache>>,
2841) -> Result<RecordBatch> {
2842    // The io is synchronous so it shouldn't be possible for any async stuff to still be in progress
2843    // Still, if we just use now_or_never we hit misfires because some futures (channels) need to be
2844    // polled twice.
2845
2846    let io_scheduler = Arc::new(BufferScheduler::new(batch.data.clone())) as Arc<dyn EncodingsIo>;
2847    let cache = if let Some(cache) = cache {
2848        cache
2849    } else {
2850        Arc::new(lance_core::cache::LanceCache::with_capacity(
2851            128 * 1024 * 1024,
2852        ))
2853    };
2854    let mut decode_scheduler = DecodeBatchScheduler::try_new(
2855        batch.schema.as_ref(),
2856        &batch.top_level_columns,
2857        &batch.page_table,
2858        &vec![],
2859        batch.num_rows,
2860        decoder_plugins,
2861        io_scheduler.clone(),
2862        cache,
2863        filter,
2864        &DecoderConfig::default(),
2865    )
2866    .await?;
2867    let (tx, rx) = unbounded_channel();
2868    decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler);
2869    let is_structural = version >= LanceFileVersion::V2_1;
2870    let mode = std::env::var(ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE);
2871    let spawn_structural_batch_decode_tasks = !matches!(mode.ok().as_deref(), Some("never"));
2872    let mut decode_stream = create_decode_stream(
2873        &batch.schema,
2874        batch.num_rows,
2875        batch.num_rows as u32,
2876        is_structural,
2877        should_validate,
2878        spawn_structural_batch_decode_tasks,
2879        rx,
2880        None,
2881    )?;
2882    decode_stream.next().await.unwrap().task.await
2883}
2884
2885#[cfg(test)]
2886// test coalesce indices to ranges
2887mod tests {
2888    use super::*;
2889
2890    #[test]
2891    fn test_coalesce_indices_to_ranges_with_single_index() {
2892        let indices = vec![1];
2893        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
2894        assert_eq!(ranges, vec![1..2]);
2895    }
2896
2897    #[test]
2898    fn test_coalesce_indices_to_ranges() {
2899        let indices = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
2900        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
2901        assert_eq!(ranges, vec![1..10]);
2902    }
2903
2904    #[test]
2905    fn test_coalesce_indices_to_ranges_with_gaps() {
2906        let indices = vec![1, 2, 3, 5, 6, 7, 9];
2907        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
2908        assert_eq!(ranges, vec![1..4, 5..8, 9..10]);
2909    }
2910
2911    #[test]
2912    fn test_estimate_bytes_per_row() {
2913        assert_eq!(estimate_bytes_per_row(&DataType::Int32), 4.0);
2914        assert_eq!(estimate_bytes_per_row(&DataType::Int64), 8.0);
2915        assert_eq!(estimate_bytes_per_row(&DataType::Float32), 4.0);
2916        assert_eq!(estimate_bytes_per_row(&DataType::Boolean), 1.0 / 8.0);
2917        assert_eq!(estimate_bytes_per_row(&DataType::Utf8), 64.0);
2918        assert_eq!(estimate_bytes_per_row(&DataType::Binary), 64.0);
2919        // Struct of 4 x Int32 = 16 bytes
2920        let struct_type = DataType::Struct(Fields::from(vec![
2921            ArrowField::new("a", DataType::Int32, false),
2922            ArrowField::new("b", DataType::Int32, false),
2923            ArrowField::new("c", DataType::Int32, false),
2924            ArrowField::new("d", DataType::Int32, false),
2925        ]));
2926        assert_eq!(estimate_bytes_per_row(&struct_type), 16.0);
2927    }
2928
2929    /// Helper: encode a batch, then decode it as a stream with optional
2930    /// `batch_size_bytes`, collecting all output batches.
2931    async fn decode_batches_with_byte_limit(
2932        batch: &RecordBatch,
2933        batch_size: u32,
2934        batch_size_bytes: Option<u64>,
2935    ) -> Vec<RecordBatch> {
2936        use crate::encoder::{EncodingOptions, default_encoding_strategy, encode_batch};
2937        use crate::version::LanceFileVersion;
2938
2939        let version = LanceFileVersion::V2_1;
2940        let options = EncodingOptions {
2941            version,
2942            ..Default::default()
2943        };
2944        let strategy = default_encoding_strategy(version);
2945        let schema = Schema::try_from(batch.schema().as_ref()).unwrap();
2946        let encoded = encode_batch(batch, Arc::new(schema.clone()), strategy.as_ref(), &options)
2947            .await
2948            .unwrap();
2949
2950        let io_scheduler =
2951            Arc::new(BufferScheduler::new(encoded.data.clone())) as Arc<dyn EncodingsIo>;
2952        let cache = Arc::new(lance_core::cache::LanceCache::with_capacity(
2953            128 * 1024 * 1024,
2954        ));
2955        let decoder_plugins = Arc::new(DecoderPlugins::default());
2956
2957        let mut decode_scheduler = DecodeBatchScheduler::try_new(
2958            encoded.schema.as_ref(),
2959            &encoded.top_level_columns,
2960            &encoded.page_table,
2961            &vec![],
2962            encoded.num_rows,
2963            decoder_plugins,
2964            io_scheduler.clone(),
2965            cache,
2966            &FilterExpression::no_filter(),
2967            &DecoderConfig::default(),
2968        )
2969        .await
2970        .unwrap();
2971
2972        let (tx, rx) = unbounded_channel();
2973        decode_scheduler.schedule_range(
2974            0..encoded.num_rows,
2975            &FilterExpression::no_filter(),
2976            tx,
2977            io_scheduler,
2978        );
2979
2980        let mut decode_stream = create_decode_stream(
2981            &encoded.schema,
2982            encoded.num_rows,
2983            batch_size,
2984            /*is_structural=*/ true,
2985            /*should_validate=*/ true,
2986            /*spawn_structural_batch_decode_tasks=*/ true,
2987            rx,
2988            batch_size_bytes,
2989        )
2990        .unwrap();
2991
2992        let mut batches = Vec::new();
2993        while let Some(task) = decode_stream.next().await {
2994            batches.push(task.task.await.unwrap());
2995        }
2996        batches
2997    }
2998
2999    #[tokio::test]
3000    async fn test_byte_sized_batches_fixed_width() {
3001        use arrow_array::Int32Array;
3002
3003        // 1000 rows x 4 Int32 columns = 16 bytes/row
3004        let num_rows: i32 = 1000;
3005        let arrays: Vec<Arc<dyn arrow_array::Array>> = (0..4)
3006            .map(|col| {
3007                Arc::new(Int32Array::from_iter_values(
3008                    (0..num_rows).map(move |row| row * 10 + col),
3009                )) as _
3010            })
3011            .collect();
3012
3013        let schema = Arc::new(ArrowSchema::new(vec![
3014            ArrowField::new("a", DataType::Int32, false),
3015            ArrowField::new("b", DataType::Int32, false),
3016            ArrowField::new("c", DataType::Int32, false),
3017            ArrowField::new("d", DataType::Int32, false),
3018        ]));
3019        let input_batch = RecordBatch::try_new(schema, arrays).unwrap();
3020
3021        // 16 bytes/row, batch_size_bytes=1600 => 100 rows/batch
3022        let batches =
3023            decode_batches_with_byte_limit(&input_batch, /*batch_size=*/ 1024, Some(1600)).await;
3024
3025        // Should produce 10 batches of 100 rows each
3026        assert_eq!(batches.len(), 10);
3027        for (i, batch) in batches.iter().enumerate() {
3028            assert_eq!(
3029                batch.num_rows(),
3030                100,
3031                "batch {i} should have 100 rows, got {}",
3032                batch.num_rows()
3033            );
3034        }
3035
3036        // Verify roundtrip: concatenate and compare
3037        let all_batches: Vec<&RecordBatch> = batches.iter().collect();
3038        let concatenated =
3039            arrow_select::concat::concat_batches(&batches[0].schema(), all_batches.iter().copied())
3040                .unwrap();
3041        assert_eq!(concatenated.num_rows(), num_rows as usize);
3042        for col in 0..4 {
3043            assert_eq!(
3044                concatenated.column(col).as_ref(),
3045                input_batch.column(col).as_ref(),
3046                "column {col} roundtrip mismatch"
3047            );
3048        }
3049    }
3050
3051    #[tokio::test]
3052    async fn test_byte_sized_batches_none_unchanged() {
3053        use arrow_array::Int32Array;
3054
3055        // Without batch_size_bytes, rows_per_batch controls batching
3056        let num_rows: i32 = 1000;
3057        let arrays: Vec<Arc<dyn arrow_array::Array>> = (0..2)
3058            .map(|col| {
3059                Arc::new(Int32Array::from_iter_values(
3060                    (0..num_rows).map(move |row| row * 10 + col),
3061                )) as _
3062            })
3063            .collect();
3064
3065        let schema = Arc::new(ArrowSchema::new(vec![
3066            ArrowField::new("x", DataType::Int32, false),
3067            ArrowField::new("y", DataType::Int32, false),
3068        ]));
3069        let input_batch = RecordBatch::try_new(schema, arrays).unwrap();
3070
3071        // batch_size=250, batch_size_bytes=None => 4 batches of 250 rows
3072        let batches = decode_batches_with_byte_limit(&input_batch, /*batch_size=*/ 250, None).await;
3073        assert_eq!(batches.len(), 4);
3074        for (i, batch) in batches.iter().enumerate() {
3075            assert_eq!(
3076                batch.num_rows(),
3077                250,
3078                "batch {i} should have 250 rows, got {}",
3079                batch.num_rows()
3080            );
3081        }
3082    }
3083
3084    #[tokio::test]
3085    async fn test_byte_sized_batches_feedback_convergence() {
3086        use arrow_array::StringArray;
3087
3088        // Each row has a 100-byte string. Schema estimate = 64 bytes (default
3089        // for Utf8), so the first batch will overshoot. The feedback loop
3090        // should correct subsequent batches toward the target.
3091        let num_rows = 500;
3092        let value: String = "x".repeat(100);
3093        let arrays: Vec<Arc<dyn arrow_array::Array>> = vec![Arc::new(StringArray::from(
3094            (0..num_rows).map(|_| value.as_str()).collect::<Vec<_>>(),
3095        ))];
3096        let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
3097            "s",
3098            DataType::Utf8,
3099            false,
3100        )]));
3101        let input_batch = RecordBatch::try_new(schema, arrays).unwrap();
3102
3103        // Target 5000 bytes/batch. At 100 bytes/row the ideal is 50 rows/batch.
3104        // Schema estimate is 64 bytes/row → first batch ~78 rows (overshoot).
3105        // After feedback kicks in, batches should converge to ~50 rows.
3106        let target_bytes: u64 = 5000;
3107        let batches = decode_batches_with_byte_limit(
3108            &input_batch,
3109            /*batch_size=*/ 1024,
3110            Some(target_bytes),
3111        )
3112        .await;
3113
3114        // Verify all data round-trips correctly
3115        let all_batches: Vec<&RecordBatch> = batches.iter().collect();
3116        let concatenated =
3117            arrow_select::concat::concat_batches(&batches[0].schema(), all_batches.iter().copied())
3118                .unwrap();
3119        assert_eq!(concatenated.num_rows(), num_rows as usize);
3120        assert_eq!(
3121            concatenated.column(0).as_ref(),
3122            input_batch.column(0).as_ref()
3123        );
3124
3125        // After the first batch, subsequent batches should be closer to the
3126        // target. The ideal is 50 rows/batch.
3127        assert!(
3128            batches.len() >= 2,
3129            "need at least 2 batches to test convergence"
3130        );
3131        // The first batch uses the schema estimate (64 bytes/row) →
3132        // ~78 rows. After feedback the rows should settle near 50.
3133        if batches.len() >= 3 {
3134            let second_batch_rows = batches[1].num_rows();
3135            let third_batch_rows = batches[2].num_rows();
3136            // Both should be within 20% of the ideal (50 rows)
3137            assert!(
3138                (40..=60).contains(&second_batch_rows),
3139                "second batch should be near 50 rows, got {second_batch_rows}"
3140            );
3141            assert!(
3142                (40..=60).contains(&third_batch_rows),
3143                "third batch should be near 50 rows, got {third_batch_rows}"
3144            );
3145        }
3146    }
3147}