Skip to main content

lance_file/
reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    collections::{BTreeMap, BTreeSet},
6    io::Cursor,
7    ops::Range,
8    pin::Pin,
9    sync::Arc,
10};
11
12use arrow_array::RecordBatchReader;
13use arrow_schema::Schema as ArrowSchema;
14use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
15use bytes::{Bytes, BytesMut};
16use futures::{Stream, StreamExt, stream::BoxStream};
17use lance_core::deepsize::{Context, DeepSizeOf};
18use lance_encoding::{
19    EncodingsIo,
20    decoder::{
21        ColumnInfo, DecoderConfig, DecoderPlugins, FilterExpression, PageEncoding, PageInfo,
22        ReadBatchTask, RequestedRows, SchedulerDecoderConfig, schedule_and_decode,
23        schedule_and_decode_blocking,
24    },
25    encoder::EncodedBatch,
26    version::LanceFileVersion,
27};
28use log::debug;
29use object_store::path::Path;
30use prost::{Message, Name};
31
32use lance_core::{
33    Error, Result,
34    cache::LanceCache,
35    datatypes::{Field, Schema},
36};
37use lance_encoding::format::pb as pbenc;
38use lance_encoding::format::pb21 as pbenc21;
39use lance_io::{
40    ReadBatchParams,
41    scheduler::FileScheduler,
42    stream::{RecordBatchStream, RecordBatchStreamAdapter},
43};
44
45use crate::{
46    datatypes::{Fields, FieldsWithMeta},
47    format::{MAGIC, MAJOR_VERSION, MINOR_VERSION, pb, pbfile},
48    io::LanceEncodingsIo,
49    writer::PAGE_BUFFER_ALIGNMENT,
50};
51
/// Default size, in bytes, of a single read chunk when reading large pages (8 MiB).
///
/// A page larger than this is split into multiple chunks during the read.
pub const DEFAULT_READ_CHUNK_SIZE: u64 = 8 << 20;
55
56// For now, we don't use global buffers for anything other than schema.  If we
57// use these later we should make them lazily loaded and then cached once loaded.
58//
59// We store their position / length for debugging purposes
60#[derive(Debug, DeepSizeOf)]
61pub struct BufferDescriptor {
62    pub position: u64,
63    pub size: u64,
64}
65
66/// Statistics summarize some of the file metadata for quick summary info
67#[derive(Debug)]
68pub struct FileStatistics {
69    /// Statistics about each of the columns in the file
70    pub columns: Vec<ColumnStatistics>,
71}
72
/// Summary information about a single column
#[derive(Debug)]
pub struct ColumnStatistics {
    /// How many pages the column is made of
    pub num_pages: usize,
    /// Total number of data & metadata bytes in the column
    ///
    /// This is the size on disk, i.e. after compression
    pub size_bytes: u64,
}
83
84// TODO: Caching
85#[derive(Debug)]
86pub struct CachedFileMetadata {
87    /// The schema of the file
88    pub file_schema: Arc<Schema>,
89    /// The column metadatas
90    pub column_metadatas: Vec<pbfile::ColumnMetadata>,
91    pub column_infos: Vec<Arc<ColumnInfo>>,
92    /// The number of rows in the file
93    pub num_rows: u64,
94    pub file_buffers: Vec<BufferDescriptor>,
95    /// The number of bytes contained in the data page section of the file
96    pub num_data_bytes: u64,
97    /// The number of bytes contained in the column metadata (not including buffers
98    /// referenced by the metadata)
99    pub num_column_metadata_bytes: u64,
100    /// The number of bytes contained in global buffers
101    pub num_global_buffer_bytes: u64,
102    /// The number of bytes contained in the CMO and GBO tables
103    pub num_footer_bytes: u64,
104    pub major_version: u16,
105    pub minor_version: u16,
106    /// The actual total file size in bytes, as reported by the object store.
107    pub file_size_bytes: u64,
108    /// User global buffers (index >= 1) whose bytes were already captured by the
109    /// tail read that `read_all_metadata` performs at open, keyed by buffer index.
110    ///
111    /// All global buffers are laid out contiguously starting at the schema, so on
112    /// small/medium files they land inside the captured tail window. Retaining
113    /// those bytes lets `read_global_buffer` serve them with zero additional I/O.
114    /// The bytes are copied out of the tail (rather than sliced) so the much
115    /// larger tail allocation can be dropped — we only hold what we will serve.
116    ///
117    /// The schema (buffer 0) is excluded: it is already decoded at open and is
118    /// not fetched through `read_global_buffer`. Buffers that fall outside the
119    /// window (large files) are absent here and fall back to a dedicated read.
120    pub retained_global_buffers: BTreeMap<u32, Bytes>,
121}
122
123impl CachedFileMetadata {
124    /// Total file size in bytes.
125    pub fn file_size(&self) -> u64 {
126        self.file_size_bytes
127    }
128}
129
130impl DeepSizeOf for CachedFileMetadata {
131    fn deep_size_of_children(&self, context: &mut Context) -> usize {
132        let schema_size = self.file_schema.deep_size_of_children(context);
133
134        let buffers_size: usize = self
135            .file_buffers
136            .iter()
137            .map(|fb| fb.deep_size_of_children(context))
138            .sum();
139
140        // column_metadatas is Vec<pbfile::ColumnMetadata> (protobuf generated,
141        // does not implement DeepSizeOf). We use prost::Message::encoded_len()
142        // as a proxy for in-memory size. The decoded representation is typically
143        // several times larger than the wire format due to heap-allocated
144        // repeated/string/bytes fields, so we apply a 4x multiplier.
145        let column_metadatas_size: usize = self
146            .column_metadatas
147            .iter()
148            .map(|cm| cm.encoded_len() * 4)
149            .sum::<usize>()
150            + std::mem::size_of_val(self.column_metadatas.as_slice());
151
152        // column_infos is Vec<Arc<ColumnInfo>>. Each ColumnInfo contains
153        // page_infos (with protobuf PageEncoding), buffer offsets, and a
154        // column-level ColumnEncoding protobuf.
155        let column_infos_size: usize = self
156            .column_infos
157            .iter()
158            .map(|ci| {
159                let pages_size: usize = ci
160                    .page_infos
161                    .iter()
162                    .map(|pi| {
163                        let enc_size = match &pi.encoding {
164                            lance_encoding::decoder::PageEncoding::Legacy(e) => e.encoded_len() * 4,
165                            lance_encoding::decoder::PageEncoding::Structural(e) => {
166                                e.encoded_len() * 4
167                            }
168                        };
169                        enc_size
170                            + std::mem::size_of_val(pi.buffer_offsets_and_sizes.as_ref())
171                            + std::mem::size_of::<u64>() * 2 // num_rows + priority
172                    })
173                    .sum();
174                pages_size
175                    + std::mem::size_of_val(ci.buffer_offsets_and_sizes.as_ref())
176                    + ci.encoding.encoded_len() * 4
177                    + std::mem::size_of::<u32>() // index
178                    + std::mem::size_of::<usize>() * 2 // Arc overhead
179            })
180            .sum();
181
182        // Global buffer bytes retained for zero-IO reads (copied out of the tail).
183        let retained_buffers_size: usize = self
184            .retained_global_buffers
185            .values()
186            .map(|buf| buf.len())
187            .sum();
188
189        schema_size
190            + buffers_size
191            + column_metadatas_size
192            + column_infos_size
193            + retained_buffers_size
194    }
195}
196
197impl CachedFileMetadata {
198    pub fn version(&self) -> LanceFileVersion {
199        match (self.major_version, self.minor_version) {
200            (0, 3) => LanceFileVersion::V2_0,
201            (2, 0) => LanceFileVersion::V2_0,
202            (2, 1) => LanceFileVersion::V2_1,
203            (2, 2) => LanceFileVersion::V2_2,
204            (2, 3) => LanceFileVersion::V2_3,
205            _ => panic!(
206                "Unsupported version: {}.{}",
207                self.major_version, self.minor_version
208            ),
209        }
210    }
211}
212
213/// Selecting columns from a lance file requires specifying both the
214/// index of the column and the data type of the column
215///
216/// Partly, this is because it is not strictly required that columns
217/// be read into the same type.  For example, a string column may be
218/// read as a string, large_string or string_view type.
219///
220/// A read will only succeed if the decoder for a column is capable
221/// of decoding into the requested type.
222///
223/// Note that this should generally be limited to different in-memory
224/// representations of the same semantic type.  An encoding could
225/// theoretically support "casting" (e.g. int to string, etc.) but
226/// there is little advantage in doing so here.
227///
228/// Note: in order to specify a projection the user will need some way
229/// to figure out the column indices.  In the table format we do this
230/// using field IDs and keeping track of the field id->column index mapping.
231///
232/// If users are not using the table format then they will need to figure
233/// out some way to do this themselves.
234#[derive(Debug, Clone)]
235pub struct ReaderProjection {
236    /// The data types (schema) of the selected columns.  The names
237    /// of the schema are arbitrary and ignored.
238    pub schema: Arc<Schema>,
239    /// The indices of the columns to load.
240    ///
241    /// The content of this vector depends on the file version.
242    ///
243    /// In Lance File Version 2.0 we need ids for structural fields as
244    /// well as leaf fields:
245    ///
246    ///   - Primitive: the index of the column in the schema
247    ///   - List: the index of the list column in the schema
248    ///     followed by the column indices of the children
249    ///   - FixedSizeList (of primitive): the index of the column in the schema
250    ///     (this case is not nested)
251    ///   - FixedSizeList (of non-primitive): not yet implemented
252    ///   - Dictionary: same as primitive
253    ///   - Struct: the index of the struct column in the schema
254    ///     followed by the column indices of the children
255    ///
256    ///   In other words, this should be a DFS listing of the desired schema.
257    ///
258    /// In Lance File Version 2.1 we only need ids for leaf fields.  Any structural
259    /// fields are completely transparent.
260    ///
261    /// For example, if the goal is to load:
262    ///
263    ///   x: int32
264    ///   y: `struct<z: int32, w: string>`
265    ///   z: `list<int32>`
266    ///
267    /// and the schema originally used to store the data was:
268    ///
269    ///   a: `struct<x: int32>`
270    ///   b: int64
271    ///   y: `struct<z: int32, c: int64, w: string>`
272    ///   z: `list<int32>`
273    ///
274    /// Then the column_indices should be:
275    ///
276    /// - 2.0: [1, 3, 4, 6, 7, 8]
277    /// - 2.1: [0, 2, 4, 5]
278    pub column_indices: Vec<u32>,
279}
280
281impl ReaderProjection {
282    fn from_field_ids_helper<'a>(
283        file_version: LanceFileVersion,
284        fields: impl Iterator<Item = &'a Field>,
285        field_id_to_column_index: &BTreeMap<u32, u32>,
286        column_indices: &mut Vec<u32>,
287    ) -> Result<()> {
288        for field in fields {
289            let is_structural = file_version >= LanceFileVersion::V2_1;
290            // In the 2.0 system we needed ids for intermediate fields.  In 2.1+
291            // we only need ids for leaf fields.
292            if (!is_structural
293                || field.children.is_empty()
294                || field.is_blob()
295                || field.is_packed_struct())
296                && let Some(column_idx) = field_id_to_column_index.get(&(field.id as u32)).copied()
297            {
298                column_indices.push(column_idx);
299            }
300            // Don't recurse into children if the field is a blob or packed struct in 2.1
301            if !is_structural || (!field.is_blob() && !field.is_packed_struct()) {
302                Self::from_field_ids_helper(
303                    file_version,
304                    field.children.iter(),
305                    field_id_to_column_index,
306                    column_indices,
307                )?;
308            }
309        }
310        Ok(())
311    }
312
313    /// Creates a projection using a mapping from field IDs to column indices
314    ///
315    /// You can obtain such a mapping when the file is written using the
316    /// [`crate::writer::FileWriter::field_id_to_column_indices`] method.
317    pub fn from_field_ids(
318        file_version: LanceFileVersion,
319        schema: &Schema,
320        field_id_to_column_index: &BTreeMap<u32, u32>,
321    ) -> Result<Self> {
322        let mut column_indices = Vec::new();
323        Self::from_field_ids_helper(
324            file_version,
325            schema.fields.iter(),
326            field_id_to_column_index,
327            &mut column_indices,
328        )?;
329        let projection = Self {
330            schema: Arc::new(schema.clone()),
331            column_indices,
332        };
333        Ok(projection)
334    }
335
336    /// Creates a projection that reads the entire file
337    ///
338    /// If the schema provided is not the schema of the entire file then
339    /// the projection will be invalid and the read will fail.
340    /// If the field is a `struct datatype` with `packed` set to true in the field metadata,
341    /// the whole struct has one column index.
342    /// To support nested `packed-struct encoding`, this method need to be further adjusted.
343    pub fn from_whole_schema(schema: &Schema, version: LanceFileVersion) -> Self {
344        let schema = Arc::new(schema.clone());
345        let is_structural = version >= LanceFileVersion::V2_1;
346        let mut column_indices = vec![];
347        let mut curr_column_idx = 0;
348        let mut packed_struct_fields_num = 0;
349        for field in schema.fields_pre_order() {
350            if packed_struct_fields_num > 0 {
351                packed_struct_fields_num -= 1;
352                continue;
353            }
354            if field.is_packed_struct() {
355                column_indices.push(curr_column_idx);
356                curr_column_idx += 1;
357                packed_struct_fields_num = field.children.len();
358            } else if field.children.is_empty() || !is_structural {
359                column_indices.push(curr_column_idx);
360                curr_column_idx += 1;
361            }
362        }
363        Self {
364            schema,
365            column_indices,
366        }
367    }
368
369    /// Creates a projection that reads the specified columns provided by name
370    ///
371    /// The syntax for column names is the same as [`lance_core::datatypes::Schema::project`]
372    ///
373    /// If the schema provided is not the schema of the entire file then
374    /// the projection will be invalid and the read will fail.
375    pub fn from_column_names(
376        file_version: LanceFileVersion,
377        schema: &Schema,
378        column_names: &[&str],
379    ) -> Result<Self> {
380        let field_id_to_column_index = schema
381            .fields_pre_order()
382            // In the 2.0 system we needed ids for intermediate fields.  In 2.1+
383            // we only need ids for leaf fields.
384            .filter(|field| {
385                file_version < LanceFileVersion::V2_1 || field.is_leaf() || field.is_packed_struct()
386            })
387            .enumerate()
388            .map(|(idx, field)| (field.id as u32, idx as u32))
389            .collect::<BTreeMap<_, _>>();
390        let projected = schema.project(column_names)?;
391        let mut column_indices = Vec::new();
392        Self::from_field_ids_helper(
393            file_version,
394            projected.fields.iter(),
395            &field_id_to_column_index,
396            &mut column_indices,
397        )?;
398        Ok(Self {
399            schema: Arc::new(projected),
400            column_indices,
401        })
402    }
403}
404
405/// File Reader Options that can control reading behaviors, such as whether to enable caching on repetition indices
406#[derive(Clone, Debug)]
407pub struct FileReaderOptions {
408    pub decoder_config: DecoderConfig,
409    /// Size of chunks when reading large pages. Pages larger than this
410    /// will be read in multiple chunks to control memory usage.
411    /// Default: 8MB (DEFAULT_READ_CHUNK_SIZE)
412    pub read_chunk_size: u64,
413    /// If set, the reader will produce batches whose total size in bytes
414    /// is approximately this value, overriding the row-based `batch_size`.
415    ///
416    /// This can be set at the dataset level (via `ReadParams::file_reader_options`)
417    /// to provide a default for all scans, or at the scanner level (via
418    /// `Scanner::batch_size_bytes`) to override per scan.
419    pub batch_size_bytes: Option<u64>,
420}
421
422impl Default for FileReaderOptions {
423    fn default() -> Self {
424        Self {
425            decoder_config: DecoderConfig::default(),
426            read_chunk_size: DEFAULT_READ_CHUNK_SIZE,
427            batch_size_bytes: None,
428        }
429    }
430}
431
432#[derive(Debug, Clone)]
433pub struct FileReader {
434    scheduler: Arc<dyn EncodingsIo>,
435    // The default projection to be applied to all reads
436    base_projection: ReaderProjection,
437    num_rows: u64,
438    metadata: Arc<CachedFileMetadata>,
439    decoder_plugins: Arc<DecoderPlugins>,
440    cache: Arc<LanceCache>,
441    options: FileReaderOptions,
442}
/// The values decoded from the fixed-size footer at the end of a file.
#[derive(Debug)]
struct Footer {
    #[allow(dead_code)]
    column_meta_start: u64,
    // Not used today: metadata is always loaded for every column and
    // "metadata projection" is not supported yet
    #[allow(dead_code)]
    column_meta_offsets_start: u64,
    global_buff_offsets_start: u64,
    num_global_buffers: u32,
    num_columns: u32,
    major_version: u16,
    minor_version: u16,
}
457
/// Size of the fixed-length footer in bytes: three u64 offsets, two u32
/// counts, two u16 version numbers and the 4 magic bytes.
const FOOTER_LEN: usize = 3 * 8 + 2 * 4 + 2 * 2 + 4;
459
460impl FileReader {
461    pub fn with_scheduler(&self, scheduler: Arc<dyn EncodingsIo>) -> Self {
462        Self {
463            scheduler,
464            base_projection: self.base_projection.clone(),
465            cache: self.cache.clone(),
466            decoder_plugins: self.decoder_plugins.clone(),
467            metadata: self.metadata.clone(),
468            options: self.options.clone(),
469            num_rows: self.num_rows,
470        }
471    }
472
473    /// Returns a clone of this reader whose I/O is additionally recorded into
474    /// `stats`, on top of the scheduler's global accounting.
475    ///
476    /// All cached metadata is shared with `self`, so no file is re-opened and
477    /// only a few `Arc` clones are performed.  If the underlying I/O service
478    /// does not support per-scope statistics (e.g. an in-memory scheduler), the
479    /// returned reader is an ordinary, uninstrumented clone.
480    pub fn with_io_stats(
481        &self,
482        stats: Arc<dyn lance_core::utils::io_stats::IoStatsRecorder>,
483    ) -> Self {
484        match self.scheduler.with_io_stats(stats) {
485            Some(scheduler) => self.with_scheduler(scheduler),
486            None => self.clone(),
487        }
488    }
489
490    pub fn num_rows(&self) -> u64 {
491        self.num_rows
492    }
493
494    pub fn metadata(&self) -> &Arc<CachedFileMetadata> {
495        &self.metadata
496    }
497
498    pub fn file_statistics(&self) -> FileStatistics {
499        let column_metadatas = &self.metadata().column_metadatas;
500
501        let column_stats = column_metadatas
502            .iter()
503            .map(|col_metadata| {
504                let num_pages = col_metadata.pages.len();
505                let size_bytes = col_metadata
506                    .pages
507                    .iter()
508                    .map(|page| page.buffer_sizes.iter().sum::<u64>())
509                    .sum::<u64>();
510                ColumnStatistics {
511                    num_pages,
512                    size_bytes,
513                }
514            })
515            .collect();
516
517        FileStatistics {
518            columns: column_stats,
519        }
520    }
521
522    pub async fn read_global_buffer(&self, index: u32) -> Result<Bytes> {
523        let buffer_desc = self.metadata.file_buffers.get(index as usize).ok_or_else(||Error::invalid_input(format!("request for global buffer at index {} but there were only {} global buffers in the file", index, self.metadata.file_buffers.len())))?;
524
525        // If the buffer's bytes were captured by the tail read at open, serve them
526        // from memory with no additional I/O. Larger buffers (outside the window)
527        // are not retained and fall back to a dedicated read.
528        if let Some(bytes) = self.metadata.retained_global_buffers.get(&index) {
529            return Ok(bytes.clone());
530        }
531
532        self.scheduler
533            .submit_single(
534                buffer_desc.position..buffer_desc.position + buffer_desc.size,
535                0,
536            )
537            .await
538    }
539
540    async fn read_tail(scheduler: &FileScheduler) -> Result<(Bytes, u64)> {
541        let file_size = scheduler.reader().size().await? as u64;
542        let begin = if file_size < scheduler.reader().block_size() as u64 {
543            0
544        } else {
545            file_size - scheduler.reader().block_size() as u64
546        };
547        let tail_bytes = scheduler.submit_single(begin..file_size, 0).await?;
548        Ok((tail_bytes, file_size))
549    }
550
551    // Checks to make sure the footer is written correctly and returns the
552    // position of the file descriptor (which comes from the footer)
553    fn decode_footer(footer_bytes: &Bytes) -> Result<Footer> {
554        let len = footer_bytes.len();
555        if len < FOOTER_LEN {
556            return Err(Error::invalid_input(format!(
557                "does not have sufficient data, len: {}, bytes: {:?}",
558                len, footer_bytes
559            )));
560        }
561        let mut cursor = Cursor::new(footer_bytes.slice(len - FOOTER_LEN..));
562
563        let column_meta_start = cursor.read_u64::<LittleEndian>()?;
564        let column_meta_offsets_start = cursor.read_u64::<LittleEndian>()?;
565        let global_buff_offsets_start = cursor.read_u64::<LittleEndian>()?;
566        let num_global_buffers = cursor.read_u32::<LittleEndian>()?;
567        let num_columns = cursor.read_u32::<LittleEndian>()?;
568        let major_version = cursor.read_u16::<LittleEndian>()?;
569        let minor_version = cursor.read_u16::<LittleEndian>()?;
570
571        if major_version == MAJOR_VERSION as u16 && minor_version == MINOR_VERSION as u16 {
572            return Err(Error::version_conflict(
573                "Attempt to use the lance v2 reader to read a legacy file".to_string(),
574                major_version,
575                minor_version,
576            ));
577        }
578
579        let magic_bytes = footer_bytes.slice(len - 4..);
580        if magic_bytes.as_ref() != MAGIC {
581            return Err(Error::invalid_input(format!(
582                "file does not appear to be a Lance file (invalid magic: {:?})",
583                MAGIC
584            )));
585        }
586        Ok(Footer {
587            column_meta_start,
588            column_meta_offsets_start,
589            global_buff_offsets_start,
590            num_global_buffers,
591            num_columns,
592            major_version,
593            minor_version,
594        })
595    }
596
597    // TODO: Once we have coalesced I/O we should only read the column metadatas that we need
598    fn read_all_column_metadata(
599        column_metadata_bytes: Bytes,
600        footer: &Footer,
601    ) -> Result<Vec<pbfile::ColumnMetadata>> {
602        let column_metadata_start = footer.column_meta_start;
603        // cmo == column_metadata_offsets
604        let cmo_table_size = 16 * footer.num_columns as usize;
605        let cmo_table = column_metadata_bytes.slice(column_metadata_bytes.len() - cmo_table_size..);
606
607        (0..footer.num_columns)
608            .map(|col_idx| {
609                let offset = (col_idx * 16) as usize;
610                let position = LittleEndian::read_u64(&cmo_table[offset..offset + 8]);
611                let length = LittleEndian::read_u64(&cmo_table[offset + 8..offset + 16]);
612                let normalized_position = (position - column_metadata_start) as usize;
613                let normalized_end = normalized_position + (length as usize);
614                Ok(pbfile::ColumnMetadata::decode(
615                    &column_metadata_bytes[normalized_position..normalized_end],
616                )?)
617            })
618            .collect::<Result<Vec<_>>>()
619    }
620
621    async fn optimistic_tail_read(
622        data: &Bytes,
623        start_pos: u64,
624        scheduler: &FileScheduler,
625        file_len: u64,
626    ) -> Result<Bytes> {
627        let num_bytes_needed = (file_len - start_pos) as usize;
628        if data.len() >= num_bytes_needed {
629            Ok(data.slice((data.len() - num_bytes_needed)..))
630        } else {
631            let num_bytes_missing = (num_bytes_needed - data.len()) as u64;
632            let start = file_len - num_bytes_needed as u64;
633            let missing_bytes = scheduler
634                .submit_single(start..start + num_bytes_missing, 0)
635                .await?;
636            let mut combined = BytesMut::with_capacity(data.len() + num_bytes_missing as usize);
637            combined.extend(missing_bytes);
638            combined.extend(data);
639            Ok(combined.freeze())
640        }
641    }
642
643    fn do_decode_gbo_table(
644        gbo_bytes: &Bytes,
645        footer: &Footer,
646        version: LanceFileVersion,
647    ) -> Result<Vec<BufferDescriptor>> {
648        let mut global_bufs_cursor = Cursor::new(gbo_bytes);
649
650        let mut global_buffers = Vec::with_capacity(footer.num_global_buffers as usize);
651        for _ in 0..footer.num_global_buffers {
652            let buf_pos = global_bufs_cursor.read_u64::<LittleEndian>()?;
653            assert!(
654                version < LanceFileVersion::V2_1 || buf_pos % PAGE_BUFFER_ALIGNMENT as u64 == 0
655            );
656            let buf_size = global_bufs_cursor.read_u64::<LittleEndian>()?;
657            global_buffers.push(BufferDescriptor {
658                position: buf_pos,
659                size: buf_size,
660            });
661        }
662
663        Ok(global_buffers)
664    }
665
666    async fn decode_gbo_table(
667        tail_bytes: &Bytes,
668        file_len: u64,
669        scheduler: &FileScheduler,
670        footer: &Footer,
671        version: LanceFileVersion,
672    ) -> Result<Vec<BufferDescriptor>> {
673        // This could, in theory, trigger another IOP but the GBO table should never be large
674        // enough for that to happen
675        let gbo_bytes = Self::optimistic_tail_read(
676            tail_bytes,
677            footer.global_buff_offsets_start,
678            scheduler,
679            file_len,
680        )
681        .await?;
682        Self::do_decode_gbo_table(&gbo_bytes, footer, version)
683    }
684
685    fn decode_schema(schema_bytes: Bytes) -> Result<(u64, lance_core::datatypes::Schema)> {
686        let file_descriptor = pb::FileDescriptor::decode(schema_bytes)?;
687        let pb_schema = file_descriptor.schema.unwrap();
688        let num_rows = file_descriptor.length;
689        let fields_with_meta = FieldsWithMeta {
690            fields: Fields(pb_schema.fields),
691            metadata: pb_schema.metadata,
692        };
693        let schema = lance_core::datatypes::Schema::from(fields_with_meta);
694        Ok((num_rows, schema))
695    }
696
697    // TODO: Support late projection.  Currently, if we want to perform a
698    // projected read of a file, we load all of the column metadata, and then
699    // only read the column data that is requested.  This is fine for most cases.
700    //
701    // However, if there are many columns then loading all of the column metadata
702    // may be expensive.  We should support a mode where we only load the column
703    // metadata for the columns that are requested (the file format supports this).
704    //
705    // The main challenge is that we either need to ignore the column metadata cache
706    // or have a more sophisticated cache that can cache per-column metadata.
707    //
708    // Also, if the number of columns is fairly small, it's faster to read them as a
709    // single IOP, but we can fix this through coalescing.
710    pub async fn read_all_metadata(scheduler: &FileScheduler) -> Result<CachedFileMetadata> {
711        // 1. read the footer
712        let (tail_bytes, file_len) = Self::read_tail(scheduler).await?;
713        let tail_offset = file_len - tail_bytes.len() as u64;
714        let footer = Self::decode_footer(&tail_bytes)?;
715
716        let file_version = LanceFileVersion::try_from_major_minor(
717            footer.major_version as u32,
718            footer.minor_version as u32,
719        )?;
720
721        let gbo_table =
722            Self::decode_gbo_table(&tail_bytes, file_len, scheduler, &footer, file_version).await?;
723        if gbo_table.is_empty() {
724            return Err(Error::internal(
725                "File did not contain any global buffers, schema expected".to_string(),
726            ));
727        }
728        let schema_start = gbo_table[0].position;
729        let schema_size = gbo_table[0].size;
730
731        let num_footer_bytes = file_len - schema_start;
732
733        // By default we read all column metadatas.  We do NOT read the column metadata buffers
734        // at this point.  We only want to read the column metadata for columns we are actually loading.
735        let all_metadata_bytes =
736            Self::optimistic_tail_read(&tail_bytes, schema_start, scheduler, file_len).await?;
737
738        let schema_bytes = all_metadata_bytes.slice(0..schema_size as usize);
739        let (num_rows, schema) = Self::decode_schema(schema_bytes)?;
740
741        // Next, read the metadata for the columns
742        // This is both the column metadata and the CMO table
743        let column_metadata_start = (footer.column_meta_start - schema_start) as usize;
744        let column_metadata_end = (footer.global_buff_offsets_start - schema_start) as usize;
745        let column_metadata_bytes =
746            all_metadata_bytes.slice(column_metadata_start..column_metadata_end);
747        let column_metadatas = Self::read_all_column_metadata(column_metadata_bytes, &footer)?;
748
749        let num_global_buffer_bytes = gbo_table.iter().map(|buf| buf.size).sum::<u64>();
750        let num_data_bytes = footer.column_meta_start - num_global_buffer_bytes;
751        let num_column_metadata_bytes = footer.global_buff_offsets_start - footer.column_meta_start;
752
753        let column_infos = Self::meta_to_col_infos(column_metadatas.as_slice(), file_version);
754
755        // The tail read above already pulled in any global buffer that lives within
756        // the captured window. Copy those user buffers (index >= 1; the schema at 0
757        // is decoded above and never fetched via read_global_buffer) out of the tail
758        // so read_global_buffer can serve them without I/O. We copy rather than slice
759        // so the much larger tail allocation can be released once decoding is done.
760        let tail_end = tail_offset + tail_bytes.len() as u64;
761        let retained_global_buffers = gbo_table
762            .iter()
763            .enumerate()
764            .skip(1)
765            .filter_map(|(index, buffer)| {
766                let start = buffer.position;
767                let end = buffer.position + buffer.size;
768                if start >= tail_offset && end <= tail_end {
769                    let rel_start = (start - tail_offset) as usize;
770                    let rel_end = (end - tail_offset) as usize;
771                    let bytes = Bytes::copy_from_slice(&tail_bytes[rel_start..rel_end]);
772                    Some((index as u32, bytes))
773                } else {
774                    None
775                }
776            })
777            .collect();
778
779        Ok(CachedFileMetadata {
780            file_schema: Arc::new(schema),
781            column_metadatas,
782            column_infos,
783            num_rows,
784            num_data_bytes,
785            num_column_metadata_bytes,
786            num_global_buffer_bytes,
787            num_footer_bytes,
788            file_buffers: gbo_table,
789            major_version: footer.major_version,
790            minor_version: footer.minor_version,
791            file_size_bytes: file_len,
792            retained_global_buffers,
793        })
794    }
795
796    fn fetch_encoding<M: Default + Name + Sized>(encoding: &pbfile::Encoding) -> M {
797        match &encoding.location {
798            Some(pbfile::encoding::Location::Indirect(_)) => todo!(),
799            Some(pbfile::encoding::Location::Direct(encoding)) => {
800                let encoding_buf = Bytes::from(encoding.encoding.clone());
801                let encoding_any = prost_types::Any::decode(encoding_buf).unwrap();
802                encoding_any.to_msg::<M>().unwrap()
803            }
804            Some(pbfile::encoding::Location::None(_)) => panic!(),
805            None => panic!(),
806        }
807    }
808
809    fn meta_to_col_infos(
810        column_metadatas: &[pbfile::ColumnMetadata],
811        file_version: LanceFileVersion,
812    ) -> Vec<Arc<ColumnInfo>> {
813        column_metadatas
814            .iter()
815            .enumerate()
816            .map(|(col_idx, col_meta)| {
817                let page_infos = col_meta
818                    .pages
819                    .iter()
820                    .map(|page| {
821                        let num_rows = page.length;
822                        let encoding = match file_version {
823                            LanceFileVersion::V2_0 => {
824                                PageEncoding::Legacy(Self::fetch_encoding::<pbenc::ArrayEncoding>(
825                                    page.encoding.as_ref().unwrap(),
826                                ))
827                            }
828                            _ => PageEncoding::Structural(Self::fetch_encoding::<
829                                pbenc21::PageLayout,
830                            >(
831                                page.encoding.as_ref().unwrap()
832                            )),
833                        };
834                        let buffer_offsets_and_sizes = Arc::from(
835                            page.buffer_offsets
836                                .iter()
837                                .zip(page.buffer_sizes.iter())
838                                .map(|(offset, size)| {
839                                    // Starting with version 2.1 we can assert that page buffers are aligned
840                                    assert!(
841                                        file_version < LanceFileVersion::V2_1
842                                            || offset % PAGE_BUFFER_ALIGNMENT as u64 == 0
843                                    );
844                                    (*offset, *size)
845                                })
846                                .collect::<Vec<_>>(),
847                        );
848                        PageInfo {
849                            buffer_offsets_and_sizes,
850                            encoding,
851                            num_rows,
852                            priority: page.priority,
853                        }
854                    })
855                    .collect::<Vec<_>>();
856                let buffer_offsets_and_sizes = Arc::from(
857                    col_meta
858                        .buffer_offsets
859                        .iter()
860                        .zip(col_meta.buffer_sizes.iter())
861                        .map(|(offset, size)| (*offset, *size))
862                        .collect::<Vec<_>>(),
863                );
864                Arc::new(ColumnInfo {
865                    index: col_idx as u32,
866                    page_infos: Arc::from(page_infos),
867                    buffer_offsets_and_sizes,
868                    encoding: Self::fetch_encoding(col_meta.encoding.as_ref().unwrap()),
869                })
870            })
871            .collect::<Vec<_>>()
872    }
873
874    fn validate_projection(
875        projection: &ReaderProjection,
876        metadata: &CachedFileMetadata,
877    ) -> Result<()> {
878        if projection.schema.fields.is_empty() {
879            return Err(Error::invalid_input(
880                "Attempt to read zero columns from the file, at least one column must be specified"
881                    .to_string(),
882            ));
883        }
884        let mut column_indices_seen = BTreeSet::new();
885        for column_index in &projection.column_indices {
886            if !column_indices_seen.insert(*column_index) {
887                return Err(Error::invalid_input(format!(
888                    "The projection specified the column index {} more than once",
889                    column_index
890                )));
891            }
892            if *column_index >= metadata.column_infos.len() as u32 {
893                return Err(Error::invalid_input(format!(
894                    "The projection specified the column index {} but there are only {} columns in the file",
895                    column_index,
896                    metadata.column_infos.len()
897                )));
898            }
899        }
900        Ok(())
901    }
902
903    /// Opens a new file reader without any pre-existing knowledge
904    ///
905    /// This will read the file schema from the file itself and thus requires a bit more I/O
906    ///
907    /// A `base_projection` can also be provided.  If provided, then the projection will apply
908    /// to all reads from the file that do not specify their own projection.
909    pub async fn try_open(
910        scheduler: FileScheduler,
911        base_projection: Option<ReaderProjection>,
912        decoder_plugins: Arc<DecoderPlugins>,
913        cache: &LanceCache,
914        options: FileReaderOptions,
915    ) -> Result<Self> {
916        let file_metadata = Arc::new(Self::read_all_metadata(&scheduler).await?);
917        let path = scheduler.reader().path().clone();
918
919        // Create LanceEncodingsIo with read chunk size from options
920        let encodings_io =
921            LanceEncodingsIo::new(scheduler).with_read_chunk_size(options.read_chunk_size);
922
923        Self::try_open_with_file_metadata(
924            Arc::new(encodings_io),
925            path,
926            base_projection,
927            decoder_plugins,
928            file_metadata,
929            cache,
930            options,
931        )
932        .await
933    }
934
935    /// Same as `try_open` but with the file metadata already loaded.
936    ///
937    /// This method also can accept any kind of `EncodingsIo` implementation allowing
938    /// for custom strategies to be used for I/O scheduling (e.g. for takes on fast
939    /// disks it may be better to avoid asynchronous overhead).
940    pub async fn try_open_with_file_metadata(
941        scheduler: Arc<dyn EncodingsIo>,
942        path: Path,
943        base_projection: Option<ReaderProjection>,
944        decoder_plugins: Arc<DecoderPlugins>,
945        file_metadata: Arc<CachedFileMetadata>,
946        cache: &LanceCache,
947        options: FileReaderOptions,
948    ) -> Result<Self> {
949        let cache = Arc::new(cache.with_key_prefix(path.as_ref()));
950
951        if let Some(base_projection) = base_projection.as_ref() {
952            Self::validate_projection(base_projection, &file_metadata)?;
953        }
954        let num_rows = file_metadata.num_rows;
955        Ok(Self {
956            scheduler,
957            base_projection: base_projection.unwrap_or(ReaderProjection::from_whole_schema(
958                file_metadata.file_schema.as_ref(),
959                file_metadata.version(),
960            )),
961            num_rows,
962            metadata: file_metadata,
963            decoder_plugins,
964            cache,
965            options,
966        })
967    }
968
969    // The actual decoder needs all the column infos that make up a type.  In other words, if
970    // the first type in the schema is Struct<i32, i32> then the decoder will need 3 column infos.
971    //
972    // This is a file reader concern because the file reader needs to support late projection of columns
973    // and so it will need to figure this out anyways.
974    //
975    // It's a bit of a tricky process though because the number of column infos may depend on the
976    // encoding.  Considering the above example, if we wrote it with a packed encoding, then there would
977    // only be a single column in the file (and not 3).
978    //
979    // At the moment this method words because our rules are simple and we just repeat them here.  See
980    // Self::default_projection for a similar problem.  In the future this is something the encodings
981    // registry will need to figure out.
982    fn collect_columns_from_projection(
983        &self,
984        _projection: &ReaderProjection,
985    ) -> Result<Vec<Arc<ColumnInfo>>> {
986        Ok(self.metadata.column_infos.clone())
987    }
988
989    #[allow(clippy::too_many_arguments)]
990    async fn do_read_range(
991        column_infos: Vec<Arc<ColumnInfo>>,
992        io: Arc<dyn EncodingsIo>,
993        cache: Arc<LanceCache>,
994        num_rows: u64,
995        decoder_plugins: Arc<DecoderPlugins>,
996        range: Range<u64>,
997        batch_size: u32,
998        projection: ReaderProjection,
999        filter: FilterExpression,
1000        decoder_config: DecoderConfig,
1001        batch_size_bytes: Option<u64>,
1002    ) -> Result<BoxStream<'static, ReadBatchTask>> {
1003        debug!(
1004            "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
1005            range,
1006            batch_size,
1007            num_rows,
1008            column_infos.len(),
1009            projection.schema.fields.len(),
1010        );
1011
1012        let config = SchedulerDecoderConfig {
1013            batch_size,
1014            cache,
1015            decoder_plugins,
1016            io,
1017            decoder_config,
1018            batch_size_bytes,
1019        };
1020
1021        let requested_rows = RequestedRows::Ranges(vec![range]);
1022
1023        schedule_and_decode(
1024            column_infos,
1025            requested_rows,
1026            filter,
1027            projection.column_indices,
1028            projection.schema,
1029            config,
1030        )
1031        .await
1032    }
1033
1034    async fn read_range(
1035        &self,
1036        range: Range<u64>,
1037        batch_size: u32,
1038        projection: ReaderProjection,
1039        filter: FilterExpression,
1040    ) -> Result<BoxStream<'static, ReadBatchTask>> {
1041        // Create and initialize the stream
1042        Self::do_read_range(
1043            self.collect_columns_from_projection(&projection)?,
1044            self.scheduler.clone(),
1045            self.cache.clone(),
1046            self.num_rows,
1047            self.decoder_plugins.clone(),
1048            range,
1049            batch_size,
1050            projection,
1051            filter,
1052            self.options.decoder_config.clone(),
1053            self.options.batch_size_bytes,
1054        )
1055        .await
1056    }
1057
1058    #[allow(clippy::too_many_arguments)]
1059    async fn do_take_rows(
1060        column_infos: Vec<Arc<ColumnInfo>>,
1061        io: Arc<dyn EncodingsIo>,
1062        cache: Arc<LanceCache>,
1063        decoder_plugins: Arc<DecoderPlugins>,
1064        indices: Vec<u64>,
1065        batch_size: u32,
1066        projection: ReaderProjection,
1067        filter: FilterExpression,
1068        decoder_config: DecoderConfig,
1069        batch_size_bytes: Option<u64>,
1070    ) -> Result<BoxStream<'static, ReadBatchTask>> {
1071        debug!(
1072            "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
1073            indices.len(),
1074            indices[0],
1075            indices[indices.len() - 1],
1076            batch_size,
1077            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1078        );
1079
1080        let config = SchedulerDecoderConfig {
1081            batch_size,
1082            cache,
1083            decoder_plugins,
1084            io,
1085            decoder_config,
1086            batch_size_bytes,
1087        };
1088
1089        let requested_rows = RequestedRows::Indices(indices);
1090
1091        schedule_and_decode(
1092            column_infos,
1093            requested_rows,
1094            filter,
1095            projection.column_indices,
1096            projection.schema,
1097            config,
1098        )
1099        .await
1100    }
1101
1102    async fn take_rows(
1103        &self,
1104        indices: Vec<u64>,
1105        batch_size: u32,
1106        projection: ReaderProjection,
1107    ) -> Result<BoxStream<'static, ReadBatchTask>> {
1108        // Create and initialize the stream
1109        Self::do_take_rows(
1110            self.collect_columns_from_projection(&projection)?,
1111            self.scheduler.clone(),
1112            self.cache.clone(),
1113            self.decoder_plugins.clone(),
1114            indices,
1115            batch_size,
1116            projection,
1117            FilterExpression::no_filter(),
1118            self.options.decoder_config.clone(),
1119            self.options.batch_size_bytes,
1120        )
1121        .await
1122    }
1123
1124    #[allow(clippy::too_many_arguments)]
1125    async fn do_read_ranges(
1126        column_infos: Vec<Arc<ColumnInfo>>,
1127        io: Arc<dyn EncodingsIo>,
1128        cache: Arc<LanceCache>,
1129        decoder_plugins: Arc<DecoderPlugins>,
1130        ranges: Vec<Range<u64>>,
1131        batch_size: u32,
1132        projection: ReaderProjection,
1133        filter: FilterExpression,
1134        decoder_config: DecoderConfig,
1135        batch_size_bytes: Option<u64>,
1136    ) -> Result<BoxStream<'static, ReadBatchTask>> {
1137        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1138        debug!(
1139            "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
1140            ranges.len(),
1141            num_rows,
1142            ranges[0].start,
1143            ranges[ranges.len() - 1].end,
1144            batch_size,
1145            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1146        );
1147
1148        let config = SchedulerDecoderConfig {
1149            batch_size,
1150            cache,
1151            decoder_plugins,
1152            io,
1153            decoder_config,
1154            batch_size_bytes,
1155        };
1156
1157        let requested_rows = RequestedRows::Ranges(ranges);
1158
1159        schedule_and_decode(
1160            column_infos,
1161            requested_rows,
1162            filter,
1163            projection.column_indices,
1164            projection.schema,
1165            config,
1166        )
1167        .await
1168    }
1169
1170    async fn read_ranges(
1171        &self,
1172        ranges: Vec<Range<u64>>,
1173        batch_size: u32,
1174        projection: ReaderProjection,
1175        filter: FilterExpression,
1176    ) -> Result<BoxStream<'static, ReadBatchTask>> {
1177        Self::do_read_ranges(
1178            self.collect_columns_from_projection(&projection)?,
1179            self.scheduler.clone(),
1180            self.cache.clone(),
1181            self.decoder_plugins.clone(),
1182            ranges,
1183            batch_size,
1184            projection,
1185            filter,
1186            self.options.decoder_config.clone(),
1187            self.options.batch_size_bytes,
1188        )
1189        .await
1190    }
1191
1192    /// Creates a stream of "read tasks" to read the data from the file
1193    ///
1194    /// The arguments are similar to [`Self::read_stream_projected`] but instead of returning a stream
1195    /// of record batches it returns a stream of "read tasks".
1196    ///
1197    /// The tasks should be consumed with some kind of `buffered` argument if CPU parallelism is desired.
1198    ///
1199    /// Note that "read task" is probably a bit imprecise.  The tasks are actually "decode tasks".  The
1200    /// reading happens asynchronously in the background.  In other words, a single read task may map to
1201    /// multiple I/O operations or a single I/O operation may map to multiple read tasks.
1202    ///
1203    /// # Why is this async?
1204    ///
1205    /// Constructing the read stream requires running the decode scheduler's
1206    /// `initialize` step, which performs the metadata I/O (chunk metadata,
1207    /// dictionaries, repetition index, ...) needed to plan the read.  We
1208    /// drive that I/O on the awaiting task rather than smuggling it into
1209    /// the stream's first poll.  This way callers control where the
1210    /// scheduling I/O runs (typically inside a per-fragment
1211    /// `tokio::spawn`), planning errors surface from the await instead of
1212    /// from the first stream item, and small reads can also complete the
1213    /// synchronous scheduling step before returning (see
1214    /// [`DecoderConfig::inline_scheduling`]).
1215    pub async fn read_tasks(
1216        &self,
1217        params: ReadBatchParams,
1218        batch_size: u32,
1219        projection: Option<ReaderProjection>,
1220        filter: FilterExpression,
1221    ) -> Result<Pin<Box<dyn Stream<Item = ReadBatchTask> + Send>>> {
1222        let projection = projection.unwrap_or_else(|| self.base_projection.clone());
1223        Self::validate_projection(&projection, &self.metadata)?;
1224        let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
1225            if bound > self.num_rows || bound == self.num_rows && inclusive {
1226                Err(Error::invalid_input(format!(
1227                    "cannot read {:?} from file with {} rows",
1228                    params, self.num_rows
1229                )))
1230            } else {
1231                Ok(())
1232            }
1233        };
1234        match &params {
1235            ReadBatchParams::Indices(indices) => {
1236                for idx in indices {
1237                    match idx {
1238                        None => {
1239                            return Err(Error::invalid_input("Null value in indices array"));
1240                        }
1241                        Some(idx) => {
1242                            verify_bound(&params, idx as u64, true)?;
1243                        }
1244                    }
1245                }
1246                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
1247                self.take_rows(indices, batch_size, projection).await
1248            }
1249            ReadBatchParams::Range(range) => {
1250                verify_bound(&params, range.end as u64, false)?;
1251                self.read_range(
1252                    range.start as u64..range.end as u64,
1253                    batch_size,
1254                    projection,
1255                    filter,
1256                )
1257                .await
1258            }
1259            ReadBatchParams::Ranges(ranges) => {
1260                let mut ranges_u64 = Vec::with_capacity(ranges.len());
1261                for range in ranges.as_ref() {
1262                    verify_bound(&params, range.end, false)?;
1263                    ranges_u64.push(range.start..range.end);
1264                }
1265                self.read_ranges(ranges_u64, batch_size, projection, filter)
1266                    .await
1267            }
1268            ReadBatchParams::RangeFrom(range) => {
1269                verify_bound(&params, range.start as u64, true)?;
1270                self.read_range(
1271                    range.start as u64..self.num_rows,
1272                    batch_size,
1273                    projection,
1274                    filter,
1275                )
1276                .await
1277            }
1278            ReadBatchParams::RangeTo(range) => {
1279                verify_bound(&params, range.end as u64, false)?;
1280                self.read_range(0..range.end as u64, batch_size, projection, filter)
1281                    .await
1282            }
1283            ReadBatchParams::RangeFull => {
1284                self.read_range(0..self.num_rows, batch_size, projection, filter)
1285                    .await
1286            }
1287        }
1288    }
1289
1290    /// Reads data from the file as a stream of record batches
1291    ///
1292    /// * `params` - Specifies the range (or indices) of data to read
1293    /// * `batch_size` - The maximum size of a single batch.  A batch may be smaller
1294    ///   if it is the last batch or if it is not possible to create a batch of the
1295    ///   requested size.
1296    ///
1297    ///   For example, if the batch size is 1024 and one of the columns is a string
1298    ///   column then there may be some ranges of 1024 rows that contain more than
1299    ///   2^31 bytes of string data (which is the maximum size of a string column
1300    ///   in Arrow).  In this case smaller batches may be emitted.
1301    /// * `batch_readahead` - The number of batches to read ahead.  This controls the
1302    ///   amount of CPU parallelism of the read.  In other words it controls how many
1303    ///   batches will be decoded in parallel.  It has no effect on the I/O parallelism
1304    ///   of the read (how many I/O requests are in flight at once).
1305    ///
1306    ///   This parameter also is also related to backpressure.  If the consumer of the
1307    ///   stream is slow then the reader will build up RAM.
1308    /// * `projection` - A projection to apply to the read.  This controls which columns
1309    ///   are read from the file.  The projection is NOT applied on top of the base
1310    ///   projection.  The projection is applied directly to the file schema.
1311    ///
1312    /// # Why is this async?
1313    ///
1314    /// This delegates to [`Self::read_tasks`], which awaits the decode
1315    /// scheduler's `initialize` step (and, for small reads, the synchronous
1316    /// scheduling that follows) before returning.  See `read_tasks` for
1317    /// details on why this work is performed up front rather than on the
1318    /// stream's first poll.
1319    pub async fn read_stream_projected(
1320        &self,
1321        params: ReadBatchParams,
1322        batch_size: u32,
1323        batch_readahead: u32,
1324        projection: ReaderProjection,
1325        filter: FilterExpression,
1326    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1327        let arrow_schema = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
1328        let tasks_stream = self
1329            .read_tasks(params, batch_size, Some(projection), filter)
1330            .await?;
1331        let batch_stream = tasks_stream
1332            .map(|task| task.task)
1333            .buffered(batch_readahead as usize)
1334            .boxed();
1335        Ok(Box::pin(RecordBatchStreamAdapter::new(
1336            arrow_schema,
1337            batch_stream,
1338        )))
1339    }
1340
1341    fn take_rows_blocking(
1342        &self,
1343        indices: Vec<u64>,
1344        batch_size: u32,
1345        projection: ReaderProjection,
1346        filter: FilterExpression,
1347    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1348        let column_infos = self.collect_columns_from_projection(&projection)?;
1349        debug!(
1350            "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
1351            indices.len(),
1352            indices[0],
1353            indices[indices.len() - 1],
1354            batch_size,
1355            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1356        );
1357
1358        let config = SchedulerDecoderConfig {
1359            batch_size,
1360            cache: self.cache.clone(),
1361            decoder_plugins: self.decoder_plugins.clone(),
1362            io: self.scheduler.clone(),
1363            decoder_config: self.options.decoder_config.clone(),
1364            batch_size_bytes: self.options.batch_size_bytes,
1365        };
1366
1367        let requested_rows = RequestedRows::Indices(indices);
1368
1369        schedule_and_decode_blocking(
1370            column_infos,
1371            requested_rows,
1372            filter,
1373            projection.column_indices,
1374            projection.schema,
1375            config,
1376        )
1377    }
1378
1379    fn read_ranges_blocking(
1380        &self,
1381        ranges: Vec<Range<u64>>,
1382        batch_size: u32,
1383        projection: ReaderProjection,
1384        filter: FilterExpression,
1385    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1386        let column_infos = self.collect_columns_from_projection(&projection)?;
1387        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1388        debug!(
1389            "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
1390            ranges.len(),
1391            num_rows,
1392            ranges[0].start,
1393            ranges[ranges.len() - 1].end,
1394            batch_size,
1395            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1396        );
1397
1398        let config = SchedulerDecoderConfig {
1399            batch_size,
1400            cache: self.cache.clone(),
1401            decoder_plugins: self.decoder_plugins.clone(),
1402            io: self.scheduler.clone(),
1403            decoder_config: self.options.decoder_config.clone(),
1404            batch_size_bytes: self.options.batch_size_bytes,
1405        };
1406
1407        let requested_rows = RequestedRows::Ranges(ranges);
1408
1409        schedule_and_decode_blocking(
1410            column_infos,
1411            requested_rows,
1412            filter,
1413            projection.column_indices,
1414            projection.schema,
1415            config,
1416        )
1417    }
1418
1419    fn read_range_blocking(
1420        &self,
1421        range: Range<u64>,
1422        batch_size: u32,
1423        projection: ReaderProjection,
1424        filter: FilterExpression,
1425    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1426        let column_infos = self.collect_columns_from_projection(&projection)?;
1427        let num_rows = self.num_rows;
1428
1429        debug!(
1430            "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
1431            range,
1432            batch_size,
1433            num_rows,
1434            column_infos.len(),
1435            projection.schema.fields.len(),
1436        );
1437
1438        let config = SchedulerDecoderConfig {
1439            batch_size,
1440            cache: self.cache.clone(),
1441            decoder_plugins: self.decoder_plugins.clone(),
1442            io: self.scheduler.clone(),
1443            decoder_config: self.options.decoder_config.clone(),
1444            batch_size_bytes: self.options.batch_size_bytes,
1445        };
1446
1447        let requested_rows = RequestedRows::Ranges(vec![range]);
1448
1449        schedule_and_decode_blocking(
1450            column_infos,
1451            requested_rows,
1452            filter,
1453            projection.column_indices,
1454            projection.schema,
1455            config,
1456        )
1457    }
1458
1459    /// Read data from the file as an iterator of record batches
1460    ///
1461    /// This is a blocking variant of [`Self::read_stream_projected`] that runs entirely in the
1462    /// calling thread.  It will block on I/O if the decode is faster than the I/O.  It is useful
1463    /// for benchmarking and potentially from "take"ing small batches from fast disks.
1464    ///
1465    /// Large scans of in-memory data will still benefit from threading (and should therefore not
1466    /// use this method) because we can parallelize the decode.
1467    ///
1468    /// Note: calling this from within a tokio runtime will panic.  It is acceptable to call this
1469    /// from a spawn_blocking context.
1470    pub fn read_stream_projected_blocking(
1471        &self,
1472        params: ReadBatchParams,
1473        batch_size: u32,
1474        projection: Option<ReaderProjection>,
1475        filter: FilterExpression,
1476    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1477        let projection = projection.unwrap_or_else(|| self.base_projection.clone());
1478        Self::validate_projection(&projection, &self.metadata)?;
1479        let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
1480            if bound > self.num_rows || bound == self.num_rows && inclusive {
1481                Err(Error::invalid_input(format!(
1482                    "cannot read {:?} from file with {} rows",
1483                    params, self.num_rows
1484                )))
1485            } else {
1486                Ok(())
1487            }
1488        };
1489        match &params {
1490            ReadBatchParams::Indices(indices) => {
1491                for idx in indices {
1492                    match idx {
1493                        None => {
1494                            return Err(Error::invalid_input("Null value in indices array"));
1495                        }
1496                        Some(idx) => {
1497                            verify_bound(&params, idx as u64, true)?;
1498                        }
1499                    }
1500                }
1501                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
1502                self.take_rows_blocking(indices, batch_size, projection, filter)
1503            }
1504            ReadBatchParams::Range(range) => {
1505                verify_bound(&params, range.end as u64, false)?;
1506                self.read_range_blocking(
1507                    range.start as u64..range.end as u64,
1508                    batch_size,
1509                    projection,
1510                    filter,
1511                )
1512            }
1513            ReadBatchParams::Ranges(ranges) => {
1514                let mut ranges_u64 = Vec::with_capacity(ranges.len());
1515                for range in ranges.as_ref() {
1516                    verify_bound(&params, range.end, false)?;
1517                    ranges_u64.push(range.start..range.end);
1518                }
1519                self.read_ranges_blocking(ranges_u64, batch_size, projection, filter)
1520            }
1521            ReadBatchParams::RangeFrom(range) => {
1522                verify_bound(&params, range.start as u64, true)?;
1523                self.read_range_blocking(
1524                    range.start as u64..self.num_rows,
1525                    batch_size,
1526                    projection,
1527                    filter,
1528                )
1529            }
1530            ReadBatchParams::RangeTo(range) => {
1531                verify_bound(&params, range.end as u64, false)?;
1532                self.read_range_blocking(0..range.end as u64, batch_size, projection, filter)
1533            }
1534            ReadBatchParams::RangeFull => {
1535                self.read_range_blocking(0..self.num_rows, batch_size, projection, filter)
1536            }
1537        }
1538    }
1539
1540    /// Reads data from the file as a stream of record batches
1541    ///
1542    /// This is similar to [`Self::read_stream_projected`] but uses the base projection
1543    /// provided when the file was opened (or reads all columns if the file was
1544    /// opened without a base projection)
1545    ///
1546    /// # Why is this async?
1547    ///
1548    /// This delegates to [`Self::read_stream_projected`], which awaits the
1549    /// decode scheduler's `initialize` step before returning the stream.
1550    /// See [`Self::read_tasks`] for the rationale.
1551    pub async fn read_stream(
1552        &self,
1553        params: ReadBatchParams,
1554        batch_size: u32,
1555        batch_readahead: u32,
1556        filter: FilterExpression,
1557    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1558        self.read_stream_projected(
1559            params,
1560            batch_size,
1561            batch_readahead,
1562            self.base_projection.clone(),
1563            filter,
1564        )
1565        .await
1566    }
1567
1568    pub fn schema(&self) -> &Arc<Schema> {
1569        &self.metadata.file_schema
1570    }
1571}
1572
1573/// Inspects a page and returns a String describing the page's encoding
1574pub fn describe_encoding(page: &pbfile::column_metadata::Page) -> String {
1575    if let Some(encoding) = &page.encoding {
1576        if let Some(style) = &encoding.location {
1577            match style {
1578                pbfile::encoding::Location::Indirect(indirect) => {
1579                    format!(
1580                        "IndirectEncoding(pos={},size={})",
1581                        indirect.buffer_location, indirect.buffer_length
1582                    )
1583                }
1584                pbfile::encoding::Location::Direct(direct) => {
1585                    let encoding_any =
1586                        prost_types::Any::decode(Bytes::from(direct.encoding.clone()))
1587                            .expect("failed to deserialize encoding as protobuf");
1588                    if encoding_any.type_url == "/lance.encodings.ArrayEncoding" {
1589                        let encoding = encoding_any.to_msg::<pbenc::ArrayEncoding>();
1590                        match encoding {
1591                            Ok(encoding) => {
1592                                format!("{:#?}", encoding)
1593                            }
1594                            Err(err) => {
1595                                format!("Unsupported(decode_err={})", err)
1596                            }
1597                        }
1598                    } else if encoding_any.type_url == "/lance.encodings21.PageLayout" {
1599                        let encoding = encoding_any.to_msg::<pbenc21::PageLayout>();
1600                        match encoding {
1601                            Ok(encoding) => {
1602                                format!("{:#?}", encoding)
1603                            }
1604                            Err(err) => {
1605                                format!("Unsupported(decode_err={})", err)
1606                            }
1607                        }
1608                    } else {
1609                        format!("Unrecognized(type_url={})", encoding_any.type_url)
1610                    }
1611                }
1612                pbfile::encoding::Location::None(_) => "NoEncodingDescription".to_string(),
1613            }
1614        } else {
1615            "MISSING STYLE".to_string()
1616        }
1617    } else {
1618        "MISSING".to_string()
1619    }
1620}
1621
1622pub trait EncodedBatchReaderExt {
1623    fn try_from_mini_lance(
1624        bytes: Bytes,
1625        schema: &Schema,
1626        version: LanceFileVersion,
1627    ) -> Result<Self>
1628    where
1629        Self: Sized;
1630    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1631    where
1632        Self: Sized;
1633}
1634
1635impl EncodedBatchReaderExt for EncodedBatch {
1636    fn try_from_mini_lance(
1637        bytes: Bytes,
1638        schema: &Schema,
1639        file_version: LanceFileVersion,
1640    ) -> Result<Self>
1641    where
1642        Self: Sized,
1643    {
1644        let projection = ReaderProjection::from_whole_schema(schema, file_version);
1645        let footer = FileReader::decode_footer(&bytes)?;
1646
1647        // Next, read the metadata for the columns
1648        // This is both the column metadata and the CMO table
1649        let column_metadata_start = footer.column_meta_start as usize;
1650        let column_metadata_end = footer.global_buff_offsets_start as usize;
1651        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1652        let column_metadatas =
1653            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1654
1655        let file_version = LanceFileVersion::try_from_major_minor(
1656            footer.major_version as u32,
1657            footer.minor_version as u32,
1658        )?;
1659
1660        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1661
1662        Ok(Self {
1663            data: bytes,
1664            num_rows: page_table
1665                .first()
1666                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1667                .unwrap_or(0),
1668            page_table,
1669            top_level_columns: projection.column_indices,
1670            schema: Arc::new(schema.clone()),
1671        })
1672    }
1673
1674    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1675    where
1676        Self: Sized,
1677    {
1678        let footer = FileReader::decode_footer(&bytes)?;
1679        let file_version = LanceFileVersion::try_from_major_minor(
1680            footer.major_version as u32,
1681            footer.minor_version as u32,
1682        )?;
1683
1684        let gbo_table = FileReader::do_decode_gbo_table(
1685            &bytes.slice(footer.global_buff_offsets_start as usize..),
1686            &footer,
1687            file_version,
1688        )?;
1689        if gbo_table.is_empty() {
1690            return Err(Error::internal(
1691                "File did not contain any global buffers, schema expected".to_string(),
1692            ));
1693        }
1694        let schema_start = gbo_table[0].position as usize;
1695        let schema_size = gbo_table[0].size as usize;
1696
1697        let schema_bytes = bytes.slice(schema_start..(schema_start + schema_size));
1698        let (_, schema) = FileReader::decode_schema(schema_bytes)?;
1699        let projection = ReaderProjection::from_whole_schema(&schema, file_version);
1700
1701        // Next, read the metadata for the columns
1702        // This is both the column metadata and the CMO table
1703        let column_metadata_start = footer.column_meta_start as usize;
1704        let column_metadata_end = footer.global_buff_offsets_start as usize;
1705        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1706        let column_metadatas =
1707            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1708
1709        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1710
1711        Ok(Self {
1712            data: bytes,
1713            num_rows: page_table
1714                .first()
1715                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1716                .unwrap_or(0),
1717            page_table,
1718            top_level_columns: projection.column_indices,
1719            schema: Arc::new(schema),
1720        })
1721    }
1722}
1723
1724#[cfg(test)]
1725mod tests {
1726    use std::{collections::BTreeMap, pin::Pin, sync::Arc};
1727
1728    use arrow_array::{
1729        RecordBatch, UInt32Array,
1730        types::{Float64Type, Int32Type},
1731    };
1732    use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema};
1733    use bytes::Bytes;
1734    use futures::{StreamExt, prelude::stream::TryStreamExt};
1735    use lance_arrow::RecordBatchExt;
1736    use lance_core::{ArrowResult, datatypes::Schema};
1737    use lance_datagen::{BatchCount, ByteCount, RowCount, array, gen_batch};
1738    use lance_encoding::{
1739        decoder::{DecodeBatchScheduler, DecoderPlugins, FilterExpression, decode_batch},
1740        encoder::{EncodedBatch, EncodingOptions, default_encoding_strategy, encode_batch},
1741        version::LanceFileVersion,
1742    };
1743    use lance_io::{stream::RecordBatchStream, utils::CachedFileSize};
1744    use log::debug;
1745    use rstest::rstest;
1746    use tokio::sync::mpsc;
1747
1748    use crate::reader::{EncodedBatchReaderExt, FileReader, FileReaderOptions, ReaderProjection};
1749    use crate::testing::{FsFixture, WrittenFile, test_cache, write_lance_file};
1750    use crate::writer::{EncodedBatchWriteExt, FileWriter, FileWriterOptions};
1751    use lance_encoding::decoder::DecoderConfig;
1752
1753    async fn create_some_file(fs: &FsFixture, version: LanceFileVersion) -> WrittenFile {
1754        let location_type = DataType::Struct(Fields::from(vec![
1755            Field::new("x", DataType::Float64, true),
1756            Field::new("y", DataType::Float64, true),
1757        ]));
1758        let categories_type = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)));
1759
1760        let mut reader = gen_batch()
1761            .col("score", array::rand::<Float64Type>())
1762            .col("location", array::rand_type(&location_type))
1763            .col("categories", array::rand_type(&categories_type))
1764            .col("binary", array::rand_type(&DataType::Binary));
1765        if version <= LanceFileVersion::V2_0 {
1766            reader = reader.col("large_bin", array::rand_type(&DataType::LargeBinary));
1767        }
1768        let reader = reader.into_reader_rows(RowCount::from(1000), BatchCount::from(100));
1769
1770        write_lance_file(
1771            reader,
1772            fs,
1773            FileWriterOptions {
1774                format_version: Some(version),
1775                ..Default::default()
1776            },
1777        )
1778        .await
1779    }
1780
1781    type Transformer = Box<dyn Fn(&RecordBatch) -> RecordBatch>;
1782
1783    async fn verify_expected(
1784        expected: &[RecordBatch],
1785        mut actual: Pin<Box<dyn RecordBatchStream>>,
1786        read_size: u32,
1787        transform: Option<Transformer>,
1788    ) {
1789        let mut remaining = expected.iter().map(|batch| batch.num_rows()).sum::<usize>() as u32;
1790        let mut expected_iter = expected.iter().map(|batch| {
1791            if let Some(transform) = &transform {
1792                transform(batch)
1793            } else {
1794                batch.clone()
1795            }
1796        });
1797        let mut next_expected = expected_iter.next().unwrap().clone();
1798        while let Some(actual) = actual.next().await {
1799            let mut actual = actual.unwrap();
1800            let mut rows_to_verify = actual.num_rows() as u32;
1801            let expected_length = remaining.min(read_size);
1802            assert_eq!(expected_length, rows_to_verify);
1803
1804            while rows_to_verify > 0 {
1805                let next_slice_len = (next_expected.num_rows() as u32).min(rows_to_verify);
1806                assert_eq!(
1807                    next_expected.slice(0, next_slice_len as usize),
1808                    actual.slice(0, next_slice_len as usize)
1809                );
1810                remaining -= next_slice_len;
1811                rows_to_verify -= next_slice_len;
1812                if remaining > 0 {
1813                    if next_slice_len == next_expected.num_rows() as u32 {
1814                        next_expected = expected_iter.next().unwrap().clone();
1815                    } else {
1816                        next_expected = next_expected.slice(
1817                            next_slice_len as usize,
1818                            next_expected.num_rows() - next_slice_len as usize,
1819                        );
1820                    }
1821                }
1822                if rows_to_verify > 0 {
1823                    actual = actual.slice(
1824                        next_slice_len as usize,
1825                        actual.num_rows() - next_slice_len as usize,
1826                    );
1827                }
1828            }
1829        }
1830        assert_eq!(remaining, 0);
1831    }
1832
1833    #[tokio::test]
1834    async fn test_round_trip() {
1835        let fs = FsFixture::default();
1836
1837        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1838
1839        for read_size in [32, 1024, 1024 * 1024] {
1840            let file_scheduler = fs
1841                .scheduler
1842                .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1843                .await
1844                .unwrap();
1845            let file_reader = FileReader::try_open(
1846                file_scheduler,
1847                None,
1848                Arc::<DecoderPlugins>::default(),
1849                &test_cache(),
1850                FileReaderOptions::default(),
1851            )
1852            .await
1853            .unwrap();
1854
1855            let schema = file_reader.schema();
1856            assert_eq!(schema.metadata.get("foo").unwrap(), "bar");
1857
1858            let batch_stream = file_reader
1859                .read_stream(
1860                    lance_io::ReadBatchParams::RangeFull,
1861                    read_size,
1862                    16,
1863                    FilterExpression::no_filter(),
1864                )
1865                .await
1866                .unwrap();
1867
1868            verify_expected(&data, batch_stream, read_size, None).await;
1869        }
1870    }
1871
1872    #[rstest]
1873    #[test_log::test(tokio::test)]
1874    async fn test_encoded_batch_round_trip(
1875        // TODO: Add V2_1 (currently fails)
1876        #[values(LanceFileVersion::V2_0)] version: LanceFileVersion,
1877    ) {
1878        let data = gen_batch()
1879            .col("x", array::rand::<Int32Type>())
1880            .col("y", array::rand_utf8(ByteCount::from(16), false))
1881            .into_batch_rows(RowCount::from(10000))
1882            .unwrap();
1883
1884        let lance_schema = Arc::new(Schema::try_from(data.schema().as_ref()).unwrap());
1885
1886        let encoding_options = EncodingOptions {
1887            cache_bytes_per_column: 4096,
1888            max_page_bytes: 32 * 1024 * 1024,
1889            keep_original_array: true,
1890            buffer_alignment: 64,
1891            version,
1892        };
1893
1894        let encoding_strategy = default_encoding_strategy(version);
1895
1896        let encoded_batch = encode_batch(
1897            &data,
1898            lance_schema.clone(),
1899            encoding_strategy.as_ref(),
1900            &encoding_options,
1901        )
1902        .await
1903        .unwrap();
1904
1905        // Test self described
1906        let bytes = encoded_batch.try_to_self_described_lance(version).unwrap();
1907
1908        let decoded_batch = EncodedBatch::try_from_self_described_lance(bytes).unwrap();
1909
1910        let decoded = decode_batch(
1911            &decoded_batch,
1912            &FilterExpression::no_filter(),
1913            Arc::<DecoderPlugins>::default(),
1914            false,
1915            version,
1916            None,
1917        )
1918        .await
1919        .unwrap();
1920
1921        assert_eq!(data, decoded);
1922
1923        // Test mini
1924        let bytes = encoded_batch.try_to_mini_lance(version).unwrap();
1925        let decoded_batch =
1926            EncodedBatch::try_from_mini_lance(bytes, lance_schema.as_ref(), LanceFileVersion::V2_0)
1927                .unwrap();
1928        let decoded = decode_batch(
1929            &decoded_batch,
1930            &FilterExpression::no_filter(),
1931            Arc::<DecoderPlugins>::default(),
1932            false,
1933            version,
1934            None,
1935        )
1936        .await
1937        .unwrap();
1938
1939        assert_eq!(data, decoded);
1940    }
1941
1942    #[rstest]
1943    #[test_log::test(tokio::test)]
1944    async fn test_projection(
1945        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1, LanceFileVersion::V2_2)]
1946        version: LanceFileVersion,
1947    ) {
1948        let fs = FsFixture::default();
1949
1950        let written_file = create_some_file(&fs, version).await;
1951        let file_scheduler = fs
1952            .scheduler
1953            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1954            .await
1955            .unwrap();
1956
1957        let field_id_mapping = written_file
1958            .field_id_mapping
1959            .iter()
1960            .copied()
1961            .collect::<BTreeMap<_, _>>();
1962
1963        let empty_projection = ReaderProjection {
1964            column_indices: Vec::default(),
1965            schema: Arc::new(Schema::default()),
1966        };
1967
1968        for columns in [
1969            vec!["score"],
1970            vec!["location"],
1971            vec!["categories"],
1972            vec!["score.x"],
1973            vec!["score", "categories"],
1974            vec!["score", "location"],
1975            vec!["location", "categories"],
1976            vec!["score.y", "location", "categories"],
1977        ] {
1978            debug!("Testing round trip with projection {:?}", columns);
1979            for use_field_ids in [true, false] {
1980                // We can specify the projection as part of the read operation via read_stream_projected
1981                let file_reader = FileReader::try_open(
1982                    file_scheduler.clone(),
1983                    None,
1984                    Arc::<DecoderPlugins>::default(),
1985                    &test_cache(),
1986                    FileReaderOptions::default(),
1987                )
1988                .await
1989                .unwrap();
1990
1991                let projected_schema = written_file.schema.project(&columns).unwrap();
1992                let projection = if use_field_ids {
1993                    ReaderProjection::from_field_ids(
1994                        file_reader.metadata.version(),
1995                        &projected_schema,
1996                        &field_id_mapping,
1997                    )
1998                    .unwrap()
1999                } else {
2000                    ReaderProjection::from_column_names(
2001                        file_reader.metadata.version(),
2002                        &written_file.schema,
2003                        &columns,
2004                    )
2005                    .unwrap()
2006                };
2007
2008                let batch_stream = file_reader
2009                    .read_stream_projected(
2010                        lance_io::ReadBatchParams::RangeFull,
2011                        1024,
2012                        16,
2013                        projection.clone(),
2014                        FilterExpression::no_filter(),
2015                    )
2016                    .await
2017                    .unwrap();
2018
2019                let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
2020                verify_expected(
2021                    &written_file.data,
2022                    batch_stream,
2023                    1024,
2024                    Some(Box::new(move |batch: &RecordBatch| {
2025                        batch.project_by_schema(&projection_arrow).unwrap()
2026                    })),
2027                )
2028                .await;
2029
2030                // We can also specify the projection as a base projection when we open the file
2031                let file_reader = FileReader::try_open(
2032                    file_scheduler.clone(),
2033                    Some(projection.clone()),
2034                    Arc::<DecoderPlugins>::default(),
2035                    &test_cache(),
2036                    FileReaderOptions::default(),
2037                )
2038                .await
2039                .unwrap();
2040
2041                let batch_stream = file_reader
2042                    .read_stream(
2043                        lance_io::ReadBatchParams::RangeFull,
2044                        1024,
2045                        16,
2046                        FilterExpression::no_filter(),
2047                    )
2048                    .await
2049                    .unwrap();
2050
2051                let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
2052                verify_expected(
2053                    &written_file.data,
2054                    batch_stream,
2055                    1024,
2056                    Some(Box::new(move |batch: &RecordBatch| {
2057                        batch.project_by_schema(&projection_arrow).unwrap()
2058                    })),
2059                )
2060                .await;
2061
2062                assert!(
2063                    file_reader
2064                        .read_stream_projected(
2065                            lance_io::ReadBatchParams::RangeFull,
2066                            1024,
2067                            16,
2068                            empty_projection.clone(),
2069                            FilterExpression::no_filter(),
2070                        )
2071                        .await
2072                        .is_err()
2073                );
2074            }
2075        }
2076
2077        assert!(
2078            FileReader::try_open(
2079                file_scheduler.clone(),
2080                Some(empty_projection),
2081                Arc::<DecoderPlugins>::default(),
2082                &test_cache(),
2083                FileReaderOptions::default(),
2084            )
2085            .await
2086            .is_err()
2087        );
2088
2089        let arrow_schema = ArrowSchema::new(vec![
2090            Field::new("x", DataType::Int32, true),
2091            Field::new("y", DataType::Int32, true),
2092        ]);
2093        let schema = Schema::try_from(&arrow_schema).unwrap();
2094
2095        let projection_with_dupes = ReaderProjection {
2096            column_indices: vec![0, 0],
2097            schema: Arc::new(schema),
2098        };
2099
2100        assert!(
2101            FileReader::try_open(
2102                file_scheduler.clone(),
2103                Some(projection_with_dupes),
2104                Arc::<DecoderPlugins>::default(),
2105                &test_cache(),
2106                FileReaderOptions::default(),
2107            )
2108            .await
2109            .is_err()
2110        );
2111    }
2112
2113    #[test_log::test(tokio::test)]
2114    async fn test_compressing_buffer() {
2115        let fs = FsFixture::default();
2116
2117        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
2118        let file_scheduler = fs
2119            .scheduler
2120            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2121            .await
2122            .unwrap();
2123
2124        // We can specify the projection as part of the read operation via read_stream_projected
2125        let file_reader = FileReader::try_open(
2126            file_scheduler.clone(),
2127            None,
2128            Arc::<DecoderPlugins>::default(),
2129            &test_cache(),
2130            FileReaderOptions::default(),
2131        )
2132        .await
2133        .unwrap();
2134
2135        let mut projection = written_file.schema.project(&["score"]).unwrap();
2136        for field in projection.fields.iter_mut() {
2137            field
2138                .metadata
2139                .insert("lance:compression".to_string(), "zstd".to_string());
2140        }
2141        let projection = ReaderProjection {
2142            column_indices: projection.fields.iter().map(|f| f.id as u32).collect(),
2143            schema: Arc::new(projection),
2144        };
2145
2146        let batch_stream = file_reader
2147            .read_stream_projected(
2148                lance_io::ReadBatchParams::RangeFull,
2149                1024,
2150                16,
2151                projection.clone(),
2152                FilterExpression::no_filter(),
2153            )
2154            .await
2155            .unwrap();
2156
2157        let projection_arrow = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
2158        verify_expected(
2159            &written_file.data,
2160            batch_stream,
2161            1024,
2162            Some(Box::new(move |batch: &RecordBatch| {
2163                batch.project_by_schema(&projection_arrow).unwrap()
2164            })),
2165        )
2166        .await;
2167    }
2168
2169    #[tokio::test]
2170    async fn test_read_all() {
2171        let fs = FsFixture::default();
2172        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
2173        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2174
2175        let file_scheduler = fs
2176            .scheduler
2177            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2178            .await
2179            .unwrap();
2180        let file_reader = FileReader::try_open(
2181            file_scheduler.clone(),
2182            None,
2183            Arc::<DecoderPlugins>::default(),
2184            &test_cache(),
2185            FileReaderOptions::default(),
2186        )
2187        .await
2188        .unwrap();
2189
2190        let batches = file_reader
2191            .read_stream(
2192                lance_io::ReadBatchParams::RangeFull,
2193                total_rows as u32,
2194                16,
2195                FilterExpression::no_filter(),
2196            )
2197            .await
2198            .unwrap()
2199            .try_collect::<Vec<_>>()
2200            .await
2201            .unwrap();
2202        assert_eq!(batches.len(), 1);
2203        assert_eq!(batches[0].num_rows(), total_rows);
2204    }
2205
2206    #[rstest]
2207    #[tokio::test]
2208    async fn test_blocking_take(
2209        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1, LanceFileVersion::V2_2)]
2210        version: LanceFileVersion,
2211    ) {
2212        let fs = FsFixture::default();
2213        let WrittenFile { data, schema, .. } = create_some_file(&fs, version).await;
2214        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2215
2216        let file_scheduler = fs
2217            .scheduler
2218            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2219            .await
2220            .unwrap();
2221        let file_reader = FileReader::try_open(
2222            file_scheduler.clone(),
2223            Some(ReaderProjection::from_column_names(version, &schema, &["score"]).unwrap()),
2224            Arc::<DecoderPlugins>::default(),
2225            &test_cache(),
2226            FileReaderOptions::default(),
2227        )
2228        .await
2229        .unwrap();
2230
2231        let batches = tokio::task::spawn_blocking(move || {
2232            file_reader
2233                .read_stream_projected_blocking(
2234                    lance_io::ReadBatchParams::Indices(UInt32Array::from(vec![0, 1, 2, 3, 4])),
2235                    total_rows as u32,
2236                    None,
2237                    FilterExpression::no_filter(),
2238                )
2239                .unwrap()
2240                .collect::<ArrowResult<Vec<_>>>()
2241                .unwrap()
2242        })
2243        .await
2244        .unwrap();
2245
2246        assert_eq!(batches.len(), 1);
2247        assert_eq!(batches[0].num_rows(), 5);
2248        assert_eq!(batches[0].num_columns(), 1);
2249    }
2250
2251    #[tokio::test(flavor = "multi_thread")]
2252    async fn test_drop_in_progress() {
2253        let fs = FsFixture::default();
2254        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
2255        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2256
2257        let file_scheduler = fs
2258            .scheduler
2259            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2260            .await
2261            .unwrap();
2262        let file_reader = FileReader::try_open(
2263            file_scheduler.clone(),
2264            None,
2265            Arc::<DecoderPlugins>::default(),
2266            &test_cache(),
2267            FileReaderOptions::default(),
2268        )
2269        .await
2270        .unwrap();
2271
2272        let mut batches = file_reader
2273            .read_stream(
2274                lance_io::ReadBatchParams::RangeFull,
2275                (total_rows / 10) as u32,
2276                16,
2277                FilterExpression::no_filter(),
2278            )
2279            .await
2280            .unwrap();
2281
2282        drop(file_reader);
2283
2284        let batch = batches.next().await.unwrap().unwrap();
2285        assert!(batch.num_rows() > 0);
2286
2287        // Drop in-progress scan
2288        drop(batches);
2289    }
2290
2291    #[tokio::test]
2292    async fn drop_while_scheduling() {
2293        // This is a bit of a white-box test, pokes at the internals.  We want to
2294        // test the case where the read stream is dropped before the scheduling
2295        // thread finishes.  We can't do that in a black-box fashion because the
2296        // scheduling thread runs in the background and there is no easy way to
2297        // pause / gate it.
2298
2299        // It's a regression for a bug where the scheduling thread would panic
2300        // if the stream was dropped before it finished.
2301
2302        let fs = FsFixture::default();
2303        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
2304        let total_rows = written_file
2305            .data
2306            .iter()
2307            .map(|batch| batch.num_rows())
2308            .sum::<usize>();
2309
2310        let file_scheduler = fs
2311            .scheduler
2312            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2313            .await
2314            .unwrap();
2315        let file_reader = FileReader::try_open(
2316            file_scheduler.clone(),
2317            None,
2318            Arc::<DecoderPlugins>::default(),
2319            &test_cache(),
2320            FileReaderOptions::default(),
2321        )
2322        .await
2323        .unwrap();
2324
2325        let projection =
2326            ReaderProjection::from_whole_schema(&written_file.schema, LanceFileVersion::V2_0);
2327        let column_infos = file_reader
2328            .collect_columns_from_projection(&projection)
2329            .unwrap();
2330        let mut decode_scheduler = DecodeBatchScheduler::try_new(
2331            &projection.schema,
2332            &projection.column_indices,
2333            &column_infos,
2334            &vec![],
2335            total_rows as u64,
2336            Arc::<DecoderPlugins>::default(),
2337            file_reader.scheduler.clone(),
2338            test_cache(),
2339            &FilterExpression::no_filter(),
2340            &DecoderConfig::default(),
2341        )
2342        .await
2343        .unwrap();
2344
2345        let range = 0..total_rows as u64;
2346
2347        let (tx, rx) = mpsc::unbounded_channel();
2348
2349        // Simulate the stream / decoder being dropped
2350        drop(rx);
2351
2352        // Scheduling should not panic
2353        decode_scheduler.schedule_range(
2354            range,
2355            &FilterExpression::no_filter(),
2356            tx,
2357            file_reader.scheduler.clone(),
2358        )
2359    }
2360
2361    #[tokio::test]
2362    async fn test_read_empty_range() {
2363        let fs = FsFixture::default();
2364        create_some_file(&fs, LanceFileVersion::V2_0).await;
2365
2366        let file_scheduler = fs
2367            .scheduler
2368            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2369            .await
2370            .unwrap();
2371        let file_reader = FileReader::try_open(
2372            file_scheduler.clone(),
2373            None,
2374            Arc::<DecoderPlugins>::default(),
2375            &test_cache(),
2376            FileReaderOptions::default(),
2377        )
2378        .await
2379        .unwrap();
2380
2381        // All ranges empty, no data
2382        let batches = file_reader
2383            .read_stream(
2384                lance_io::ReadBatchParams::Range(0..0),
2385                1024,
2386                16,
2387                FilterExpression::no_filter(),
2388            )
2389            .await
2390            .unwrap()
2391            .try_collect::<Vec<_>>()
2392            .await
2393            .unwrap();
2394
2395        assert_eq!(batches.len(), 0);
2396
2397        // Some ranges empty
2398        let batches = file_reader
2399            .read_stream(
2400                lance_io::ReadBatchParams::Ranges(Arc::new([0..1, 2..2])),
2401                1024,
2402                16,
2403                FilterExpression::no_filter(),
2404            )
2405            .await
2406            .unwrap()
2407            .try_collect::<Vec<_>>()
2408            .await
2409            .unwrap();
2410        assert_eq!(batches.len(), 1);
2411    }
2412
2413    async fn write_file_with_global_buffer(fs: &FsFixture, buffer: Bytes) {
2414        let lance_schema =
2415            lance_core::datatypes::Schema::try_from(&ArrowSchema::new(vec![Field::new(
2416                "foo",
2417                DataType::Int32,
2418                true,
2419            )]))
2420            .unwrap();
2421
2422        let mut file_writer = FileWriter::try_new(
2423            fs.object_store.create(&fs.tmp_path).await.unwrap(),
2424            lance_schema,
2425            FileWriterOptions::default(),
2426        )
2427        .unwrap();
2428
2429        let buf_index = file_writer.add_global_buffer(buffer).await.unwrap();
2430        assert_eq!(buf_index, 1);
2431
2432        file_writer.finish().await.unwrap();
2433    }
2434
2435    /// A global buffer that fits inside the tail region captured at open is served
2436    /// from memory with no additional I/O.  A buffer larger than that window cannot
2437    /// fit and falls back to a dedicated read.  Both must round-trip correctly.
2438    #[rstest]
2439    #[case::within_tail_window(true)]
2440    #[case::outside_tail_window(false)]
2441    #[tokio::test]
2442    async fn test_read_global_buffer(#[case] within_window: bool) {
2443        let fs = FsFixture::default();
2444
2445        let block_size = fs.object_store.block_size();
2446        let buffer = if within_window {
2447            Bytes::from_static(b"hello")
2448        } else {
2449            Bytes::from(vec![7u8; 2 * block_size])
2450        };
2451        let expected_read_iops = if within_window { 0 } else { 1 };
2452
2453        write_file_with_global_buffer(&fs, buffer.clone()).await;
2454
2455        let file_scheduler = fs
2456            .scheduler
2457            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2458            .await
2459            .unwrap();
2460        let file_reader = FileReader::try_open(
2461            file_scheduler,
2462            None,
2463            Arc::<DecoderPlugins>::default(),
2464            &test_cache(),
2465            FileReaderOptions::default(),
2466        )
2467        .await
2468        .unwrap();
2469
2470        // The user buffer should be retained only when it fits the tail window, and
2471        // the schema (buffer 0) is never retained.
2472        let retained = &file_reader.metadata().retained_global_buffers;
2473        assert!(!retained.contains_key(&0), "schema must not be retained");
2474        assert_eq!(retained.contains_key(&1), within_window);
2475
2476        // Reset the IO counters so we only measure the read_global_buffer call.
2477        fs.object_store.io_stats_incremental();
2478
2479        let buf = file_reader.read_global_buffer(1).await.unwrap();
2480        assert_eq!(buf, buffer);
2481
2482        let stats = fs.object_store.io_stats_incremental();
2483        assert_eq!(stats.read_iops, expected_read_iops);
2484    }
2485
2486    /// A file whose only global buffer is the schema (i.e. a plain data file, the
2487    /// common case) must retain nothing — there is no user buffer to serve.
2488    #[tokio::test]
2489    async fn test_read_global_buffer_no_user_buffers() {
2490        let fs = FsFixture::default();
2491        create_some_file(&fs, LanceFileVersion::V2_1).await;
2492
2493        let file_scheduler = fs
2494            .scheduler
2495            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2496            .await
2497            .unwrap();
2498        let file_reader = FileReader::try_open(
2499            file_scheduler,
2500            None,
2501            Arc::<DecoderPlugins>::default(),
2502            &test_cache(),
2503            FileReaderOptions::default(),
2504        )
2505        .await
2506        .unwrap();
2507
2508        let metadata = file_reader.metadata();
2509        assert_eq!(metadata.file_buffers.len(), 1, "expected only the schema");
2510        assert!(
2511            metadata.retained_global_buffers.is_empty(),
2512            "a file with no user global buffers must retain nothing"
2513        );
2514    }
2515
2516    #[rstest]
2517    #[tokio::test]
2518    async fn test_deep_size_of_includes_column_metadata(
2519        #[values(
2520            LanceFileVersion::V2_0,
2521            LanceFileVersion::V2_1,
2522            LanceFileVersion::V2_2,
2523            LanceFileVersion::V2_3
2524        )]
2525        version: LanceFileVersion,
2526    ) {
2527        // Regression test: CachedFileMetadata::deep_size_of must account for
2528        // column_metadatas and column_infos, otherwise the moka cache weigher
2529        // dramatically underestimates entry sizes and never evicts, causing
2530        // unbounded memory growth on random-access workloads.
2531        use lance_core::deepsize::DeepSizeOf;
2532
2533        let fs = FsFixture::default();
2534        let _written = create_some_file(&fs, version).await;
2535        let cache = test_cache();
2536        let file_scheduler = fs
2537            .scheduler
2538            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2539            .await
2540            .unwrap();
2541        let file_reader = FileReader::try_open(
2542            file_scheduler,
2543            None,
2544            Arc::<DecoderPlugins>::default(),
2545            &cache,
2546            FileReaderOptions::default(),
2547        )
2548        .await
2549        .unwrap();
2550
2551        let metadata = file_reader.metadata();
2552        let deep_size = metadata.deep_size_of();
2553
2554        // The file has multiple columns (score, location, categories, binary,
2555        // maybe large_bin). The deep_size_of must be substantially more than
2556        // just the schema — it should include column_metadatas + column_infos.
2557        // A naive implementation that ignores these fields reports < 1 KB;
2558        // a correct one should report at least several KB for this test file.
2559        assert!(
2560            deep_size > 1024,
2561            "deep_size_of ({deep_size}) is suspiciously small — \
2562             column_metadatas and column_infos may not be accounted for"
2563        );
2564
2565        // Verify column_metadatas is non-empty (sanity check).
2566        assert!(
2567            !metadata.column_metadatas.is_empty(),
2568            "Expected non-empty column_metadatas"
2569        );
2570
2571        // Verify the size scales with the number of columns: a file with more
2572        // columns should have a larger deep_size_of.
2573        let num_columns = metadata.column_metadatas.len();
2574        assert!(
2575            deep_size > num_columns * 50,
2576            "deep_size_of ({deep_size}) should scale with column count ({num_columns})"
2577        );
2578    }
2579
2580    #[tokio::test]
2581    async fn test_read_global_buffer_out_of_range() {
2582        let fs = FsFixture::default();
2583
2584        write_file_with_global_buffer(&fs, Bytes::from_static(b"hello")).await;
2585
2586        let file_scheduler = fs
2587            .scheduler
2588            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2589            .await
2590            .unwrap();
2591        let file_reader = FileReader::try_open(
2592            file_scheduler,
2593            None,
2594            Arc::<DecoderPlugins>::default(),
2595            &test_cache(),
2596            FileReaderOptions::default(),
2597        )
2598        .await
2599        .unwrap();
2600
2601        // The file has two global buffers (schema at 0, "hello" at 1); index 2 is
2602        // out of range and must surface a descriptive error rather than panicking.
2603        let err = file_reader.read_global_buffer(2).await.unwrap_err();
2604        assert!(
2605            matches!(err, lance_core::Error::InvalidInput { .. }),
2606            "expected InvalidInput, got: {err:?}"
2607        );
2608        let msg = err.to_string();
2609        assert!(msg.contains('2'), "error should mention the index: {msg}");
2610    }
2611}