lance_file/reader.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::{
    collections::{BTreeMap, BTreeSet},
    io::Cursor,
    ops::Range,
    pin::Pin,
    sync::Arc,
};

use arrow_array::RecordBatchReader;
use arrow_schema::Schema as ArrowSchema;
use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
use bytes::{Bytes, BytesMut};
use deepsize::{Context, DeepSizeOf};
use futures::{Stream, StreamExt, stream::BoxStream};
use lance_encoding::{
    EncodingsIo,
    decoder::{
        ColumnInfo, DecoderConfig, DecoderPlugins, FilterExpression, PageEncoding, PageInfo,
        ReadBatchTask, RequestedRows, SchedulerDecoderConfig, schedule_and_decode,
        schedule_and_decode_blocking,
    },
    encoder::EncodedBatch,
    version::LanceFileVersion,
};
use log::debug;
use object_store::path::Path;
use prost::{Message, Name};

use lance_core::{
    Error, Result,
    cache::LanceCache,
    datatypes::{Field, Schema},
};
use lance_encoding::format::pb as pbenc;
use lance_encoding::format::pb21 as pbenc21;
use lance_io::{
    ReadBatchParams,
    scheduler::FileScheduler,
    stream::{RecordBatchStream, RecordBatchStreamAdapter},
};

use crate::{
    datatypes::{Fields, FieldsWithMeta},
    format::{MAGIC, MAJOR_VERSION, MINOR_VERSION, pb, pbfile},
    io::LanceEncodingsIo,
    writer::PAGE_BUFFER_ALIGNMENT,
};

/// Default chunk size for reading large pages (8MiB)
/// Pages larger than this will be split into multiple chunks during read
pub const DEFAULT_READ_CHUNK_SIZE: u64 = 8 * 1024 * 1024;

// For now, we don't use global buffers for anything other than schema.  If we
// use these later we should make them lazily loaded and then cached once loaded.
//
// We store their position / length for debugging purposes
#[derive(Debug, DeepSizeOf)]
pub struct BufferDescriptor {
    pub position: u64,
    pub size: u64,
}

/// Statistics that summarize some of the file metadata for quick inspection
#[derive(Debug)]
pub struct FileStatistics {
    /// Statistics about each of the columns in the file
    pub columns: Vec<ColumnStatistics>,
}

/// Summary information describing a column
#[derive(Debug)]
pub struct ColumnStatistics {
    /// The number of pages in the column
    pub num_pages: usize,
    /// The total number of data & metadata bytes in the column
    ///
    /// This is the compressed on-disk size
    pub size_bytes: u64,
}

// TODO: Caching
#[derive(Debug)]
pub struct CachedFileMetadata {
    /// The schema of the file
    pub file_schema: Arc<Schema>,
    /// The column metadatas
    pub column_metadatas: Vec<pbfile::ColumnMetadata>,
    pub column_infos: Vec<Arc<ColumnInfo>>,
    /// The number of rows in the file
    pub num_rows: u64,
    pub file_buffers: Vec<BufferDescriptor>,
    /// The number of bytes contained in the data page section of the file
    pub num_data_bytes: u64,
    /// The number of bytes contained in the column metadata (not including buffers
    /// referenced by the metadata)
    pub num_column_metadata_bytes: u64,
    /// The number of bytes contained in global buffers
    pub num_global_buffer_bytes: u64,
    /// The number of bytes contained in the CMO and GBO tables
    pub num_footer_bytes: u64,
    pub major_version: u16,
    pub minor_version: u16,
    /// The actual total file size in bytes, as reported by the object store.
    pub file_size_bytes: u64,
}

impl CachedFileMetadata {
    /// Total file size in bytes.
    pub fn file_size(&self) -> u64 {
        self.file_size_bytes
    }
}

impl DeepSizeOf for CachedFileMetadata {
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        let schema_size = self.file_schema.deep_size_of_children(context);

        let buffers_size: usize = self
            .file_buffers
            .iter()
            .map(|fb| fb.deep_size_of_children(context))
            .sum();

        // column_metadatas is Vec<pbfile::ColumnMetadata> (protobuf generated,
        // does not implement DeepSizeOf). We use prost::Message::encoded_len()
        // as a proxy for in-memory size. The decoded representation is typically
        // several times larger than the wire format due to heap-allocated
        // repeated/string/bytes fields, so we apply a 4x multiplier.
        let column_metadatas_size: usize = self
            .column_metadatas
            .iter()
            .map(|cm| cm.encoded_len() * 4)
            .sum::<usize>()
            + std::mem::size_of_val(self.column_metadatas.as_slice());

        // column_infos is Vec<Arc<ColumnInfo>>. Each ColumnInfo contains
        // page_infos (with protobuf PageEncoding), buffer offsets, and a
        // column-level ColumnEncoding protobuf.
        let column_infos_size: usize = self
            .column_infos
            .iter()
            .map(|ci| {
                let pages_size: usize = ci
                    .page_infos
                    .iter()
                    .map(|pi| {
                        let enc_size = match &pi.encoding {
                            lance_encoding::decoder::PageEncoding::Legacy(e) => e.encoded_len() * 4,
                            lance_encoding::decoder::PageEncoding::Structural(e) => {
                                e.encoded_len() * 4
                            }
                        };
                        enc_size
                            + std::mem::size_of_val(pi.buffer_offsets_and_sizes.as_ref())
                            + std::mem::size_of::<u64>() * 2 // num_rows + priority
                    })
                    .sum();
                pages_size
                    + std::mem::size_of_val(ci.buffer_offsets_and_sizes.as_ref())
                    + ci.encoding.encoded_len() * 4
                    + std::mem::size_of::<u32>() // index
                    + std::mem::size_of::<usize>() * 2 // Arc overhead
            })
            .sum();

        schema_size + buffers_size + column_metadatas_size + column_infos_size
    }
}

impl CachedFileMetadata {
    pub fn version(&self) -> LanceFileVersion {
        match (self.major_version, self.minor_version) {
            (0, 3) => LanceFileVersion::V2_0,
            (2, 0) => LanceFileVersion::V2_0,
            (2, 1) => LanceFileVersion::V2_1,
            (2, 2) => LanceFileVersion::V2_2,
            (2, 3) => LanceFileVersion::V2_3,
            _ => panic!(
                "Unsupported version: {}.{}",
                self.major_version, self.minor_version
            ),
        }
    }
}

/// Selecting columns from a lance file requires specifying both the
/// index of the column and the data type of the column
///
/// Partly, this is because a column is not strictly required to be read
/// as the same type it was written with.  For example, a string column may be
/// read as a string, large_string or string_view type.
///
/// A read will only succeed if the decoder for a column is capable
/// of decoding into the requested type.
///
/// Note that this should generally be limited to different in-memory
/// representations of the same semantic type.  An encoding could
/// theoretically support "casting" (e.g. int to string, etc.) but
/// there is little advantage in doing so here.
///
/// Note: in order to specify a projection the user will need some way
/// to figure out the column indices.  In the table format we do this
/// using field IDs and keeping track of the field id->column index mapping.
///
/// If users are not using the table format then they will need to figure
/// out some way to do this themselves.
#[derive(Debug, Clone)]
pub struct ReaderProjection {
    /// The data types (schema) of the selected columns.  The names
    /// of the schema are arbitrary and ignored.
    pub schema: Arc<Schema>,
    /// The indices of the columns to load.
    ///
    /// The content of this vector depends on the file version.
    ///
    /// In Lance File Version 2.0 we need ids for structural fields as
    /// well as leaf fields:
    ///
    ///   - Primitive: the index of the column in the schema
    ///   - List: the index of the list column in the schema
    ///     followed by the column indices of the children
    ///   - FixedSizeList (of primitive): the index of the column in the schema
    ///     (this case is not nested)
    ///   - FixedSizeList (of non-primitive): not yet implemented
    ///   - Dictionary: same as primitive
    ///   - Struct: the index of the struct column in the schema
    ///     followed by the column indices of the children
    ///
    ///   In other words, this should be a DFS listing of the desired schema.
    ///
    /// In Lance File Version 2.1 we only need ids for leaf fields.  Any structural
    /// fields are completely transparent.
    ///
    /// For example, if the goal is to load:
    ///
    ///   x: int32
    ///   y: `struct<z: int32, w: string>`
    ///   z: `list<int32>`
    ///
    /// and the schema originally used to store the data was:
    ///
    ///   a: `struct<x: int32>`
    ///   b: int64
    ///   y: `struct<z: int32, c: int64, w: string>`
    ///   z: `list<int32>`
    ///
    /// Then the column_indices should be:
    ///
    /// - 2.0: [1, 3, 4, 6, 7, 8]
    /// - 2.1: [0, 2, 4, 5]
    pub column_indices: Vec<u32>,
}

impl ReaderProjection {
    fn from_field_ids_helper<'a>(
        file_version: LanceFileVersion,
        fields: impl Iterator<Item = &'a Field>,
        field_id_to_column_index: &BTreeMap<u32, u32>,
        column_indices: &mut Vec<u32>,
    ) -> Result<()> {
        for field in fields {
            let is_structural = file_version >= LanceFileVersion::V2_1;
            // In the 2.0 system we needed ids for intermediate fields.  In 2.1+
            // we only need ids for leaf fields.
            if (!is_structural
                || field.children.is_empty()
                || field.is_blob()
                || field.is_packed_struct())
                && let Some(column_idx) = field_id_to_column_index.get(&(field.id as u32)).copied()
            {
                column_indices.push(column_idx);
            }
            // Don't recurse into children if the field is a blob or packed struct in 2.1
            if !is_structural || (!field.is_blob() && !field.is_packed_struct()) {
                Self::from_field_ids_helper(
                    file_version,
                    field.children.iter(),
                    field_id_to_column_index,
                    column_indices,
                )?;
            }
        }
        Ok(())
    }

    /// Creates a projection using a mapping from field IDs to column indices
    ///
    /// You can obtain such a mapping when the file is written using the
    /// [`crate::writer::FileWriter::field_id_to_column_indices`] method.
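    ///
    /// # Example
    ///
    /// A rough sketch; the `schema` and `mapping` names here are illustrative
    /// assumptions, not values defined in this file:
    ///
    /// ```ignore
    /// // `mapping` is the field id -> column index map captured from the
    /// // writer (see `FileWriter::field_id_to_column_indices`) at write time.
    /// let projection =
    ///     ReaderProjection::from_field_ids(LanceFileVersion::V2_1, &schema, &mapping)?;
    /// ```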
    pub fn from_field_ids(
        file_version: LanceFileVersion,
        schema: &Schema,
        field_id_to_column_index: &BTreeMap<u32, u32>,
    ) -> Result<Self> {
        let mut column_indices = Vec::new();
        Self::from_field_ids_helper(
            file_version,
            schema.fields.iter(),
            field_id_to_column_index,
            &mut column_indices,
        )?;
        let projection = Self {
            schema: Arc::new(schema.clone()),
            column_indices,
        };
        Ok(projection)
    }

    /// Creates a projection that reads the entire file
    ///
    /// If the schema provided is not the schema of the entire file then
    /// the projection will be invalid and the read will fail.
    /// If a field is a struct with `packed` set to true in the field metadata,
    /// the whole struct occupies a single column index.
    /// To support nested packed-struct encoding, this method needs to be adjusted further.
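    ///
    /// # Example
    ///
    /// A minimal sketch; `schema` is an illustrative assumption:
    ///
    /// ```ignore
    /// let projection = ReaderProjection::from_whole_schema(&schema, LanceFileVersion::V2_1);
    /// ```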
    pub fn from_whole_schema(schema: &Schema, version: LanceFileVersion) -> Self {
        let schema = Arc::new(schema.clone());
        let is_structural = version >= LanceFileVersion::V2_1;
        let mut column_indices = vec![];
        let mut curr_column_idx = 0;
        let mut packed_struct_fields_num = 0;
        for field in schema.fields_pre_order() {
            if packed_struct_fields_num > 0 {
                packed_struct_fields_num -= 1;
                continue;
            }
            if field.is_packed_struct() {
                column_indices.push(curr_column_idx);
                curr_column_idx += 1;
                packed_struct_fields_num = field.children.len();
            } else if field.children.is_empty() || !is_structural {
                column_indices.push(curr_column_idx);
                curr_column_idx += 1;
            }
        }
        Self {
            schema,
            column_indices,
        }
    }

    /// Creates a projection that reads the specified columns provided by name
    ///
    /// The syntax for column names is the same as [`lance_core::datatypes::Schema::project`]
    ///
    /// If the schema provided is not the schema of the entire file then
    /// the projection will be invalid and the read will fail.
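    ///
    /// # Example
    ///
    /// A sketch; `schema` and the column names are illustrative assumptions:
    ///
    /// ```ignore
    /// let projection = ReaderProjection::from_column_names(
    ///     LanceFileVersion::V2_1,
    ///     &schema,
    ///     &["x", "y.z"],
    /// )?;
    /// ```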
    pub fn from_column_names(
        file_version: LanceFileVersion,
        schema: &Schema,
        column_names: &[&str],
    ) -> Result<Self> {
        let field_id_to_column_index = schema
            .fields_pre_order()
            // In the 2.0 system we needed ids for intermediate fields.  In 2.1+
            // we only need ids for leaf fields.
            .filter(|field| {
                file_version < LanceFileVersion::V2_1 || field.is_leaf() || field.is_packed_struct()
            })
            .enumerate()
            .map(|(idx, field)| (field.id as u32, idx as u32))
            .collect::<BTreeMap<_, _>>();
        let projected = schema.project(column_names)?;
        let mut column_indices = Vec::new();
        Self::from_field_ids_helper(
            file_version,
            projected.fields.iter(),
            &field_id_to_column_index,
            &mut column_indices,
        )?;
        Ok(Self {
            schema: Arc::new(projected),
            column_indices,
        })
    }
}

/// File reader options that control reading behavior, such as whether to enable caching of repetition indices
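///
/// # Example
///
/// A sketch of overriding the read chunk size while keeping the other defaults:
///
/// ```ignore
/// let options = FileReaderOptions {
///     read_chunk_size: 16 * 1024 * 1024,
///     ..Default::default()
/// };
/// ```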
#[derive(Clone, Debug)]
pub struct FileReaderOptions {
    pub decoder_config: DecoderConfig,
    /// Size of chunks when reading large pages. Pages larger than this
    /// will be read in multiple chunks to control memory usage.
    /// Default: 8MiB (DEFAULT_READ_CHUNK_SIZE)
    pub read_chunk_size: u64,
    /// If set, the reader will produce batches whose total size in bytes
    /// is approximately this value, overriding the row-based `batch_size`.
    ///
    /// This can be set at the dataset level (via `ReadParams::file_reader_options`)
    /// to provide a default for all scans, or at the scanner level (via
    /// `Scanner::batch_size_bytes`) to override per scan.
    pub batch_size_bytes: Option<u64>,
}

impl Default for FileReaderOptions {
    fn default() -> Self {
        Self {
            decoder_config: DecoderConfig::default(),
            read_chunk_size: DEFAULT_READ_CHUNK_SIZE,
            batch_size_bytes: None,
        }
    }
}

#[derive(Debug, Clone)]
pub struct FileReader {
    scheduler: Arc<dyn EncodingsIo>,
    // The default projection to be applied to all reads
    base_projection: ReaderProjection,
    num_rows: u64,
    metadata: Arc<CachedFileMetadata>,
    decoder_plugins: Arc<DecoderPlugins>,
    cache: Arc<LanceCache>,
    options: FileReaderOptions,
}

#[derive(Debug)]
struct Footer {
    #[allow(dead_code)]
    column_meta_start: u64,
    // We don't use this today because we always load metadata for every column
    // and don't yet support "metadata projection"
    #[allow(dead_code)]
    column_meta_offsets_start: u64,
    global_buff_offsets_start: u64,
    num_global_buffers: u32,
    num_columns: u32,
    major_version: u16,
    minor_version: u16,
}

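// The footer is a fixed-size trailer at the end of the file.  As decoded in
// `decode_footer` below it is:
//   3 x u64 (column_meta_start, column_meta_offsets_start, global_buff_offsets_start)
// + 2 x u32 (num_global_buffers, num_columns)
// + 2 x u16 (major_version, minor_version)
// + 4 bytes of magic
// = 24 + 8 + 4 + 4 = 40 bytes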
const FOOTER_LEN: usize = 40;

impl FileReader {
    pub fn with_scheduler(&self, scheduler: Arc<dyn EncodingsIo>) -> Self {
        Self {
            scheduler,
            base_projection: self.base_projection.clone(),
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            metadata: self.metadata.clone(),
            options: self.options.clone(),
            num_rows: self.num_rows,
        }
    }

    pub fn num_rows(&self) -> u64 {
        self.num_rows
    }

    pub fn metadata(&self) -> &Arc<CachedFileMetadata> {
        &self.metadata
    }

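    /// Summarizes the file metadata, reporting the number of pages and the
    /// compressed on-disk size of each column.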
    pub fn file_statistics(&self) -> FileStatistics {
        let column_metadatas = &self.metadata().column_metadatas;

        let column_stats = column_metadatas
            .iter()
            .map(|col_metadata| {
                let num_pages = col_metadata.pages.len();
                let size_bytes = col_metadata
                    .pages
                    .iter()
                    .map(|page| page.buffer_sizes.iter().sum::<u64>())
                    .sum::<u64>();
                ColumnStatistics {
                    num_pages,
                    size_bytes,
                }
            })
            .collect();

        FileStatistics {
            columns: column_stats,
        }
    }

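    /// Reads the global buffer at the given index.
    ///
    /// Returns an error if the index is beyond the file's global buffer table.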
    pub async fn read_global_buffer(&self, index: u32) -> Result<Bytes> {
        let buffer_desc = self
            .metadata
            .file_buffers
            .get(index as usize)
            .ok_or_else(|| {
                Error::invalid_input(format!(
                    "request for global buffer at index {} but there were only {} global buffers in the file",
                    index,
                    self.metadata.file_buffers.len()
                ))
            })?;
        self.scheduler
            .submit_single(
                buffer_desc.position..buffer_desc.position + buffer_desc.size,
                0,
            )
            .await
    }

    async fn read_tail(scheduler: &FileScheduler) -> Result<(Bytes, u64)> {
        let file_size = scheduler.reader().size().await? as u64;
        let begin = if file_size < scheduler.reader().block_size() as u64 {
            0
        } else {
            file_size - scheduler.reader().block_size() as u64
        };
        let tail_bytes = scheduler.submit_single(begin..file_size, 0).await?;
        Ok((tail_bytes, file_size))
    }

    // Checks to make sure the footer is written correctly and decodes it into a
    // `Footer` (the column metadata / global buffer offsets and the file version)
    fn decode_footer(footer_bytes: &Bytes) -> Result<Footer> {
        let len = footer_bytes.len();
        if len < FOOTER_LEN {
            return Err(Error::invalid_input(format!(
                "does not have sufficient data, len: {}, bytes: {:?}",
                len, footer_bytes
            )));
        }
        let mut cursor = Cursor::new(footer_bytes.slice(len - FOOTER_LEN..));

        let column_meta_start = cursor.read_u64::<LittleEndian>()?;
        let column_meta_offsets_start = cursor.read_u64::<LittleEndian>()?;
        let global_buff_offsets_start = cursor.read_u64::<LittleEndian>()?;
        let num_global_buffers = cursor.read_u32::<LittleEndian>()?;
        let num_columns = cursor.read_u32::<LittleEndian>()?;
        let major_version = cursor.read_u16::<LittleEndian>()?;
        let minor_version = cursor.read_u16::<LittleEndian>()?;

        if major_version == MAJOR_VERSION as u16 && minor_version == MINOR_VERSION as u16 {
            return Err(Error::version_conflict(
                "Attempt to use the lance v2 reader to read a legacy file".to_string(),
                major_version,
                minor_version,
            ));
        }

        let magic_bytes = footer_bytes.slice(len - 4..);
        if magic_bytes.as_ref() != MAGIC {
            return Err(Error::invalid_input(format!(
                "file does not appear to be a Lance file (invalid magic: {:?})",
                MAGIC
            )));
        }
        Ok(Footer {
            column_meta_start,
            column_meta_offsets_start,
            global_buff_offsets_start,
            num_global_buffers,
            num_columns,
            major_version,
            minor_version,
        })
    }

    // TODO: Once we have coalesced I/O we should only read the column metadatas that we need
    fn read_all_column_metadata(
        column_metadata_bytes: Bytes,
        footer: &Footer,
    ) -> Result<Vec<pbfile::ColumnMetadata>> {
        let column_metadata_start = footer.column_meta_start;
        // cmo == column_metadata_offsets
        let cmo_table_size = 16 * footer.num_columns as usize;
        let cmo_table = column_metadata_bytes.slice(column_metadata_bytes.len() - cmo_table_size..);

        (0..footer.num_columns)
            .map(|col_idx| {
                let offset = (col_idx * 16) as usize;
                let position = LittleEndian::read_u64(&cmo_table[offset..offset + 8]);
                let length = LittleEndian::read_u64(&cmo_table[offset + 8..offset + 16]);
                let normalized_position = (position - column_metadata_start) as usize;
                let normalized_end = normalized_position + (length as usize);
                Ok(pbfile::ColumnMetadata::decode(
                    &column_metadata_bytes[normalized_position..normalized_end],
                )?)
            })
            .collect::<Result<Vec<_>>>()
    }

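    // Attempts to satisfy a read of the byte range [start_pos, file_len) from the
    // already-fetched tail bytes in `data`.  If the tail does not reach back far
    // enough, a single additional read is issued for the missing prefix and the
    // two pieces are stitched together.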
    async fn optimistic_tail_read(
        data: &Bytes,
        start_pos: u64,
        scheduler: &FileScheduler,
        file_len: u64,
    ) -> Result<Bytes> {
        let num_bytes_needed = (file_len - start_pos) as usize;
        if data.len() >= num_bytes_needed {
            Ok(data.slice((data.len() - num_bytes_needed)..))
        } else {
            let num_bytes_missing = (num_bytes_needed - data.len()) as u64;
            let start = file_len - num_bytes_needed as u64;
            let missing_bytes = scheduler
                .submit_single(start..start + num_bytes_missing, 0)
                .await?;
            let mut combined = BytesMut::with_capacity(data.len() + num_bytes_missing as usize);
            combined.extend(missing_bytes);
            combined.extend(data);
            Ok(combined.freeze())
        }
    }

    fn do_decode_gbo_table(
        gbo_bytes: &Bytes,
        footer: &Footer,
        version: LanceFileVersion,
    ) -> Result<Vec<BufferDescriptor>> {
        let mut global_bufs_cursor = Cursor::new(gbo_bytes);

        let mut global_buffers = Vec::with_capacity(footer.num_global_buffers as usize);
        for _ in 0..footer.num_global_buffers {
            let buf_pos = global_bufs_cursor.read_u64::<LittleEndian>()?;
            assert!(
                version < LanceFileVersion::V2_1 || buf_pos % PAGE_BUFFER_ALIGNMENT as u64 == 0
            );
            let buf_size = global_bufs_cursor.read_u64::<LittleEndian>()?;
            global_buffers.push(BufferDescriptor {
                position: buf_pos,
                size: buf_size,
            });
        }

        Ok(global_buffers)
    }

    async fn decode_gbo_table(
        tail_bytes: &Bytes,
        file_len: u64,
        scheduler: &FileScheduler,
        footer: &Footer,
        version: LanceFileVersion,
    ) -> Result<Vec<BufferDescriptor>> {
        // This could, in theory, trigger another IOP but the GBO table should never be large
        // enough for that to happen
        let gbo_bytes = Self::optimistic_tail_read(
            tail_bytes,
            footer.global_buff_offsets_start,
            scheduler,
            file_len,
        )
        .await?;
        Self::do_decode_gbo_table(&gbo_bytes, footer, version)
    }

    fn decode_schema(schema_bytes: Bytes) -> Result<(u64, lance_core::datatypes::Schema)> {
        let file_descriptor = pb::FileDescriptor::decode(schema_bytes)?;
        let pb_schema = file_descriptor.schema.unwrap();
        let num_rows = file_descriptor.length;
        let fields_with_meta = FieldsWithMeta {
            fields: Fields(pb_schema.fields),
            metadata: pb_schema.metadata,
        };
        let schema = lance_core::datatypes::Schema::from(fields_with_meta);
        Ok((num_rows, schema))
    }

    // TODO: Support late projection.  Currently, if we want to perform a
    // projected read of a file, we load all of the column metadata, and then
    // only read the column data that is requested.  This is fine for most cases.
    //
    // However, if there are many columns then loading all of the column metadata
    // may be expensive.  We should support a mode where we only load the column
    // metadata for the columns that are requested (the file format supports this).
    //
    // The main challenge is that we either need to ignore the column metadata cache
    // or have a more sophisticated cache that can cache per-column metadata.
    //
    // Also, if the number of columns is fairly small, it's faster to read them as a
    // single IOP, but we can fix this through coalescing.
    pub async fn read_all_metadata(scheduler: &FileScheduler) -> Result<CachedFileMetadata> {
        // 1. read the footer
        let (tail_bytes, file_len) = Self::read_tail(scheduler).await?;
        let footer = Self::decode_footer(&tail_bytes)?;

        let file_version = LanceFileVersion::try_from_major_minor(
            footer.major_version as u32,
            footer.minor_version as u32,
        )?;

        let gbo_table =
            Self::decode_gbo_table(&tail_bytes, file_len, scheduler, &footer, file_version).await?;
        if gbo_table.is_empty() {
            return Err(Error::internal(
                "File did not contain any global buffers, schema expected".to_string(),
            ));
        }
        let schema_start = gbo_table[0].position;
        let schema_size = gbo_table[0].size;

        let num_footer_bytes = file_len - schema_start;

        // By default we read all column metadatas.  We do NOT read the column metadata buffers
        // at this point.  Those buffers are only read for the columns we are actually loading.
        let all_metadata_bytes =
            Self::optimistic_tail_read(&tail_bytes, schema_start, scheduler, file_len).await?;

        let schema_bytes = all_metadata_bytes.slice(0..schema_size as usize);
        let (num_rows, schema) = Self::decode_schema(schema_bytes)?;

        // Next, read the metadata for the columns
        // This is both the column metadata and the CMO table
        let column_metadata_start = (footer.column_meta_start - schema_start) as usize;
        let column_metadata_end = (footer.global_buff_offsets_start - schema_start) as usize;
        let column_metadata_bytes =
            all_metadata_bytes.slice(column_metadata_start..column_metadata_end);
        let column_metadatas = Self::read_all_column_metadata(column_metadata_bytes, &footer)?;

        let num_global_buffer_bytes = gbo_table.iter().map(|buf| buf.size).sum::<u64>();
        let num_data_bytes = footer.column_meta_start - num_global_buffer_bytes;
        let num_column_metadata_bytes = footer.global_buff_offsets_start - footer.column_meta_start;

        let column_infos = Self::meta_to_col_infos(column_metadatas.as_slice(), file_version);

        Ok(CachedFileMetadata {
            file_schema: Arc::new(schema),
            column_metadatas,
            column_infos,
            num_rows,
            num_data_bytes,
            num_column_metadata_bytes,
            num_global_buffer_bytes,
            num_footer_bytes,
            file_buffers: gbo_table,
            major_version: footer.major_version,
            minor_version: footer.minor_version,
            file_size_bytes: file_len,
        })
    }

    fn fetch_encoding<M: Default + Name + Sized>(encoding: &pbfile::Encoding) -> M {
        match &encoding.location {
            Some(pbfile::encoding::Location::Indirect(_)) => todo!(),
            Some(pbfile::encoding::Location::Direct(encoding)) => {
                let encoding_buf = Bytes::from(encoding.encoding.clone());
                let encoding_any = prost_types::Any::decode(encoding_buf).unwrap();
                encoding_any.to_msg::<M>().unwrap()
            }
            Some(pbfile::encoding::Location::None(_)) => panic!(),
            None => panic!(),
        }
    }

    fn meta_to_col_infos(
        column_metadatas: &[pbfile::ColumnMetadata],
        file_version: LanceFileVersion,
    ) -> Vec<Arc<ColumnInfo>> {
        column_metadatas
            .iter()
            .enumerate()
            .map(|(col_idx, col_meta)| {
                let page_infos = col_meta
                    .pages
                    .iter()
                    .map(|page| {
                        let num_rows = page.length;
                        let encoding = match file_version {
                            LanceFileVersion::V2_0 => {
                                PageEncoding::Legacy(Self::fetch_encoding::<pbenc::ArrayEncoding>(
                                    page.encoding.as_ref().unwrap(),
                                ))
                            }
                            _ => PageEncoding::Structural(Self::fetch_encoding::<
                                pbenc21::PageLayout,
                            >(
                                page.encoding.as_ref().unwrap()
                            )),
                        };
                        let buffer_offsets_and_sizes = Arc::from(
                            page.buffer_offsets
                                .iter()
                                .zip(page.buffer_sizes.iter())
                                .map(|(offset, size)| {
                                    // Starting with version 2.1 we can assert that page buffers are aligned
                                    assert!(
                                        file_version < LanceFileVersion::V2_1
                                            || offset % PAGE_BUFFER_ALIGNMENT as u64 == 0
                                    );
                                    (*offset, *size)
                                })
                                .collect::<Vec<_>>(),
                        );
                        PageInfo {
                            buffer_offsets_and_sizes,
                            encoding,
                            num_rows,
                            priority: page.priority,
                        }
                    })
                    .collect::<Vec<_>>();
                let buffer_offsets_and_sizes = Arc::from(
                    col_meta
                        .buffer_offsets
                        .iter()
                        .zip(col_meta.buffer_sizes.iter())
                        .map(|(offset, size)| (*offset, *size))
                        .collect::<Vec<_>>(),
                );
                Arc::new(ColumnInfo {
                    index: col_idx as u32,
                    page_infos: Arc::from(page_infos),
                    buffer_offsets_and_sizes,
                    encoding: Self::fetch_encoding(col_meta.encoding.as_ref().unwrap()),
                })
            })
            .collect::<Vec<_>>()
    }

    fn validate_projection(
        projection: &ReaderProjection,
        metadata: &CachedFileMetadata,
    ) -> Result<()> {
        if projection.schema.fields.is_empty() {
            return Err(Error::invalid_input(
                "Attempt to read zero columns from the file, at least one column must be specified"
                    .to_string(),
            ));
        }
        let mut column_indices_seen = BTreeSet::new();
        for column_index in &projection.column_indices {
            if !column_indices_seen.insert(*column_index) {
                return Err(Error::invalid_input(format!(
                    "The projection specified the column index {} more than once",
                    column_index
                )));
            }
            if *column_index >= metadata.column_infos.len() as u32 {
                return Err(Error::invalid_input(format!(
                    "The projection specified the column index {} but there are only {} columns in the file",
                    column_index,
                    metadata.column_infos.len()
                )));
            }
        }
        Ok(())
    }

    /// Opens a new file reader without any pre-existing knowledge
    ///
    /// This will read the file schema from the file itself and thus requires a bit more I/O
    ///
    /// A `base_projection` can also be provided.  If provided, then the projection will apply
    /// to all reads from the file that do not specify their own projection.
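    ///
    /// # Example
    ///
    /// A hedged sketch of opening a reader; obtaining the `file_scheduler` and
    /// `cache` values is outside the scope of this file and they are assumptions here:
    ///
    /// ```ignore
    /// let reader = FileReader::try_open(
    ///     file_scheduler,
    ///     None, // use the whole-schema projection by default
    ///     Arc::<DecoderPlugins>::default(),
    ///     &cache,
    ///     FileReaderOptions::default(),
    /// )
    /// .await?;
    /// ```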
    pub async fn try_open(
        scheduler: FileScheduler,
        base_projection: Option<ReaderProjection>,
        decoder_plugins: Arc<DecoderPlugins>,
        cache: &LanceCache,
        options: FileReaderOptions,
    ) -> Result<Self> {
        let file_metadata = Arc::new(Self::read_all_metadata(&scheduler).await?);
        let path = scheduler.reader().path().clone();

        // Create LanceEncodingsIo with read chunk size from options
        let encodings_io =
            LanceEncodingsIo::new(scheduler).with_read_chunk_size(options.read_chunk_size);

        Self::try_open_with_file_metadata(
            Arc::new(encodings_io),
            path,
            base_projection,
            decoder_plugins,
            file_metadata,
            cache,
            options,
        )
        .await
    }

    /// Same as `try_open` but with the file metadata already loaded.
    ///
    /// This method can also accept any kind of `EncodingsIo` implementation, allowing
    /// custom strategies to be used for I/O scheduling (e.g. for takes on fast
    /// disks it may be better to avoid asynchronous overhead).
    pub async fn try_open_with_file_metadata(
        scheduler: Arc<dyn EncodingsIo>,
        path: Path,
        base_projection: Option<ReaderProjection>,
        decoder_plugins: Arc<DecoderPlugins>,
        file_metadata: Arc<CachedFileMetadata>,
        cache: &LanceCache,
        options: FileReaderOptions,
    ) -> Result<Self> {
        let cache = Arc::new(cache.with_key_prefix(path.as_ref()));

        if let Some(base_projection) = base_projection.as_ref() {
            Self::validate_projection(base_projection, &file_metadata)?;
        }
        let num_rows = file_metadata.num_rows;
        Ok(Self {
            scheduler,
            base_projection: base_projection.unwrap_or(ReaderProjection::from_whole_schema(
                file_metadata.file_schema.as_ref(),
                file_metadata.version(),
            )),
            num_rows,
            metadata: file_metadata,
            decoder_plugins,
            cache,
            options,
        })
    }

    // The actual decoder needs all the column infos that make up a type.  In other words, if
    // the first type in the schema is Struct<i32, i32> then the decoder will need 3 column infos.
    //
    // This is a file reader concern because the file reader needs to support late projection of columns
    // and so it will need to figure this out anyway.
    //
    // It's a bit of a tricky process though because the number of column infos may depend on the
    // encoding.  Considering the above example, if we wrote it with a packed encoding, then there would
    // only be a single column in the file (and not 3).
    //
    // At the moment this method works because our rules are simple and we just repeat them here.  See
    // Self::default_projection for a similar problem.  In the future this is something the encodings
    // registry will need to figure out.
    fn collect_columns_from_projection(
        &self,
        _projection: &ReaderProjection,
    ) -> Result<Vec<Arc<ColumnInfo>>> {
        Ok(self.metadata.column_infos.clone())
    }

    #[allow(clippy::too_many_arguments)]
    async fn do_read_range(
        column_infos: Vec<Arc<ColumnInfo>>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<LanceCache>,
        num_rows: u64,
        decoder_plugins: Arc<DecoderPlugins>,
        range: Range<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
        decoder_config: DecoderConfig,
        batch_size_bytes: Option<u64>,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        debug!(
            "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
            range,
            batch_size,
            num_rows,
            column_infos.len(),
            projection.schema.fields.len(),
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache,
            decoder_plugins,
            io,
            decoder_config,
            batch_size_bytes,
        };

        let requested_rows = RequestedRows::Ranges(vec![range]);

        schedule_and_decode(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        )
        .await
    }

    async fn read_range(
        &self,
        range: Range<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        // Create and initialize the stream
        Self::do_read_range(
            self.collect_columns_from_projection(&projection)?,
            self.scheduler.clone(),
            self.cache.clone(),
            self.num_rows,
            self.decoder_plugins.clone(),
            range,
            batch_size,
            projection,
            filter,
            self.options.decoder_config.clone(),
            self.options.batch_size_bytes,
        )
        .await
    }

    #[allow(clippy::too_many_arguments)]
    async fn do_take_rows(
        column_infos: Vec<Arc<ColumnInfo>>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<LanceCache>,
        decoder_plugins: Arc<DecoderPlugins>,
        indices: Vec<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
        decoder_config: DecoderConfig,
        batch_size_bytes: Option<u64>,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        debug!(
            "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
            indices.len(),
            indices[0],
            indices[indices.len() - 1],
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache,
            decoder_plugins,
            io,
            decoder_config,
            batch_size_bytes,
        };

        let requested_rows = RequestedRows::Indices(indices);

        schedule_and_decode(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        )
        .await
    }

    async fn take_rows(
        &self,
        indices: Vec<u64>,
        batch_size: u32,
        projection: ReaderProjection,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        // Create and initialize the stream
        Self::do_take_rows(
            self.collect_columns_from_projection(&projection)?,
            self.scheduler.clone(),
            self.cache.clone(),
            self.decoder_plugins.clone(),
            indices,
            batch_size,
            projection,
            FilterExpression::no_filter(),
            self.options.decoder_config.clone(),
            self.options.batch_size_bytes,
        )
        .await
    }

    #[allow(clippy::too_many_arguments)]
    async fn do_read_ranges(
        column_infos: Vec<Arc<ColumnInfo>>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<LanceCache>,
        decoder_plugins: Arc<DecoderPlugins>,
        ranges: Vec<Range<u64>>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
        decoder_config: DecoderConfig,
        batch_size_bytes: Option<u64>,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        debug!(
            "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
            ranges.len(),
            num_rows,
            ranges[0].start,
            ranges[ranges.len() - 1].end,
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache,
            decoder_plugins,
            io,
            decoder_config,
            batch_size_bytes,
        };

        let requested_rows = RequestedRows::Ranges(ranges);

        schedule_and_decode(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        )
        .await
    }

    async fn read_ranges(
        &self,
        ranges: Vec<Range<u64>>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        Self::do_read_ranges(
            self.collect_columns_from_projection(&projection)?,
            self.scheduler.clone(),
            self.cache.clone(),
            self.decoder_plugins.clone(),
            ranges,
            batch_size,
            projection,
            filter,
            self.options.decoder_config.clone(),
            self.options.batch_size_bytes,
        )
        .await
    }

    /// Creates a stream of "read tasks" to read the data from the file
    ///
    /// The arguments are similar to [`Self::read_stream_projected`] but instead of returning a stream
    /// of record batches it returns a stream of "read tasks".
    ///
    /// The tasks should be consumed with some kind of `buffered` argument if CPU parallelism is desired.
    ///
    /// Note that "read task" is probably a bit imprecise.  The tasks are actually "decode tasks".  The
    /// reading happens asynchronously in the background.  In other words, a single read task may map to
    /// multiple I/O operations or a single I/O operation may map to multiple read tasks.
    ///
    /// # Why is this async?
    ///
    /// Constructing the read stream requires running the decode scheduler's
    /// `initialize` step, which performs the metadata I/O (chunk metadata,
    /// dictionaries, repetition index, ...) needed to plan the read.  We
    /// drive that I/O on the awaiting task rather than smuggling it into
    /// the stream's first poll.  This way callers control where the
    /// scheduling I/O runs (typically inside a per-fragment
    /// `tokio::spawn`), planning errors surface from the await instead of
    /// from the first stream item, and small reads can also complete the
    /// synchronous scheduling step before returning (see
    /// [`DecoderConfig::inline_scheduling`]).
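    ///
    /// # Example
    ///
    /// A sketch of driving the returned tasks with CPU parallelism; `reader` and
    /// the readahead value of 8 are assumptions:
    ///
    /// ```ignore
    /// use futures::StreamExt;
    ///
    /// let mut batches = reader
    ///     .read_tasks(
    ///         ReadBatchParams::RangeFull,
    ///         1024,
    ///         None,
    ///         FilterExpression::no_filter(),
    ///     )
    ///     .await?
    ///     .map(|task| task.task)
    ///     .buffered(8);
    /// while let Some(batch) = batches.next().await {
    ///     let batch = batch?;
    ///     // ... use the RecordBatch ...
    /// }
    /// ```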
    pub async fn read_tasks(
        &self,
        params: ReadBatchParams,
        batch_size: u32,
        projection: Option<ReaderProjection>,
        filter: FilterExpression,
    ) -> Result<Pin<Box<dyn Stream<Item = ReadBatchTask> + Send>>> {
        let projection = projection.unwrap_or_else(|| self.base_projection.clone());
        Self::validate_projection(&projection, &self.metadata)?;
        let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
            if bound > self.num_rows || bound == self.num_rows && inclusive {
                Err(Error::invalid_input(format!(
                    "cannot read {:?} from file with {} rows",
                    params, self.num_rows
                )))
            } else {
                Ok(())
            }
        };
        match &params {
            ReadBatchParams::Indices(indices) => {
                for idx in indices {
                    match idx {
                        None => {
                            return Err(Error::invalid_input("Null value in indices array"));
                        }
                        Some(idx) => {
                            verify_bound(&params, idx as u64, true)?;
                        }
                    }
                }
                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
                self.take_rows(indices, batch_size, projection).await
            }
            ReadBatchParams::Range(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range(
                    range.start as u64..range.end as u64,
                    batch_size,
                    projection,
                    filter,
                )
                .await
            }
            ReadBatchParams::Ranges(ranges) => {
                let mut ranges_u64 = Vec::with_capacity(ranges.len());
                for range in ranges.as_ref() {
                    verify_bound(&params, range.end, false)?;
                    ranges_u64.push(range.start..range.end);
                }
                self.read_ranges(ranges_u64, batch_size, projection, filter)
                    .await
            }
            ReadBatchParams::RangeFrom(range) => {
                verify_bound(&params, range.start as u64, true)?;
                self.read_range(
                    range.start as u64..self.num_rows,
                    batch_size,
                    projection,
                    filter,
                )
                .await
            }
            ReadBatchParams::RangeTo(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range(0..range.end as u64, batch_size, projection, filter)
                    .await
            }
            ReadBatchParams::RangeFull => {
                self.read_range(0..self.num_rows, batch_size, projection, filter)
                    .await
            }
        }
    }

    /// Reads data from the file as a stream of record batches
    ///
    /// * `params` - Specifies the range (or indices) of data to read
    /// * `batch_size` - The maximum size of a single batch.  A batch may be smaller
    ///   if it is the last batch or if it is not possible to create a batch of the
    ///   requested size.
    ///
    ///   For example, if the batch size is 1024 and one of the columns is a string
    ///   column then there may be some ranges of 1024 rows that contain more than
    ///   2^31 bytes of string data (which is the maximum size of a string column
    ///   in Arrow).  In this case smaller batches may be emitted.
    /// * `batch_readahead` - The number of batches to read ahead.  This controls the
    ///   amount of CPU parallelism of the read.  In other words it controls how many
    ///   batches will be decoded in parallel.  It has no effect on the I/O parallelism
    ///   of the read (how many I/O requests are in flight at once).
    ///
    ///   This parameter is also related to backpressure.  If the consumer of the
    ///   stream is slow then the reader will accumulate decoded batches in RAM.
    /// * `projection` - A projection to apply to the read.  This controls which columns
    ///   are read from the file.  The projection is NOT applied on top of the base
    ///   projection.  The projection is applied directly to the file schema.
    ///
    /// # Why is this async?
    ///
    /// This delegates to [`Self::read_tasks`], which awaits the decode
    /// scheduler's `initialize` step (and, for small reads, the synchronous
    /// scheduling that follows) before returning.  See `read_tasks` for
    /// details on why this work is performed up front rather than on the
    /// stream's first poll.
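    ///
    /// # Example
    ///
    /// A sketch of a full-file scan; `reader` is an already opened [`FileReader`]
    /// and the batch size / readahead values are assumptions:
    ///
    /// ```ignore
    /// use futures::StreamExt;
    ///
    /// let projection = ReaderProjection::from_whole_schema(
    ///     reader.metadata().file_schema.as_ref(),
    ///     reader.metadata().version(),
    /// );
    /// let mut stream = reader
    ///     .read_stream_projected(
    ///         ReadBatchParams::RangeFull,
    ///         1024,
    ///         16,
    ///         projection,
    ///         FilterExpression::no_filter(),
    ///     )
    ///     .await?;
    /// while let Some(batch) = stream.next().await {
    ///     let batch = batch?;
    ///     // ... use the RecordBatch ...
    /// }
    /// ```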
    pub async fn read_stream_projected(
        &self,
        params: ReadBatchParams,
        batch_size: u32,
        batch_readahead: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
        let arrow_schema = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
        let tasks_stream = self
            .read_tasks(params, batch_size, Some(projection), filter)
            .await?;
        let batch_stream = tasks_stream
            .map(|task| task.task)
            .buffered(batch_readahead as usize)
            .boxed();
        Ok(Box::pin(RecordBatchStreamAdapter::new(
            arrow_schema,
            batch_stream,
        )))
    }

    fn take_rows_blocking(
        &self,
        indices: Vec<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
        let column_infos = self.collect_columns_from_projection(&projection)?;
        debug!(
            "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
            indices.len(),
            indices[0],
            indices[indices.len() - 1],
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            io: self.scheduler.clone(),
            decoder_config: self.options.decoder_config.clone(),
            batch_size_bytes: self.options.batch_size_bytes,
        };

        let requested_rows = RequestedRows::Indices(indices);

        schedule_and_decode_blocking(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        )
    }

    fn read_ranges_blocking(
        &self,
        ranges: Vec<Range<u64>>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
        let column_infos = self.collect_columns_from_projection(&projection)?;
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        debug!(
            "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
            ranges.len(),
            num_rows,
            ranges[0].start,
            ranges[ranges.len() - 1].end,
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            io: self.scheduler.clone(),
            decoder_config: self.options.decoder_config.clone(),
            batch_size_bytes: self.options.batch_size_bytes,
        };

        let requested_rows = RequestedRows::Ranges(ranges);

        schedule_and_decode_blocking(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        )
    }

    fn read_range_blocking(
        &self,
        range: Range<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
        let column_infos = self.collect_columns_from_projection(&projection)?;
        let num_rows = self.num_rows;

        debug!(
            "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
            range,
            batch_size,
            num_rows,
            column_infos.len(),
            projection.schema.fields.len(),
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            io: self.scheduler.clone(),
            decoder_config: self.options.decoder_config.clone(),
            batch_size_bytes: self.options.batch_size_bytes,
        };

        let requested_rows = RequestedRows::Ranges(vec![range]);

        schedule_and_decode_blocking(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        )
    }

1384    /// Read data from the file as an iterator of record batches
1385    ///
1386    /// This is a blocking variant of [`Self::read_stream_projected`] that runs entirely in the
1387    /// calling thread.  It will block waiting on I/O whenever decoding outpaces the reads.  It is
1388    /// useful for benchmarking and potentially for taking small batches from fast disks.
1389    ///
1390    /// Large scans of in-memory data will still benefit from threading (and should therefore not
1391    /// use this method) because we can parallelize the decode.
1392    ///
1393    /// Note: calling this from within a tokio runtime will panic.  It is acceptable to call this
1394    /// from a spawn_blocking context.
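    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest); it assumes `file_reader` is an
    /// already-opened [`FileReader`] and that the calling thread is not running inside
    /// a tokio runtime:
    ///
    /// ```ignore
    /// use lance_encoding::decoder::FilterExpression;
    /// use lance_io::ReadBatchParams;
    ///
    /// // `file_reader` is assumed to have been opened via `FileReader::try_open`
    /// let batches = file_reader
    ///     .read_stream_projected_blocking(
    ///         ReadBatchParams::RangeFull,
    ///         /*batch_size=*/ 1024,
    ///         /*projection=*/ None, // fall back to the base projection
    ///         FilterExpression::no_filter(),
    ///     )?
    ///     .collect::<Result<Vec<_>, _>>()?;
    /// ```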
1395    pub fn read_stream_projected_blocking(
1396        &self,
1397        params: ReadBatchParams,
1398        batch_size: u32,
1399        projection: Option<ReaderProjection>,
1400        filter: FilterExpression,
1401    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1402        let projection = projection.unwrap_or_else(|| self.base_projection.clone());
1403        Self::validate_projection(&projection, &self.metadata)?;
1404        let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
1405            if bound > self.num_rows || (bound == self.num_rows && inclusive) {
1406                Err(Error::invalid_input(format!(
1407                    "cannot read {:?} from file with {} rows",
1408                    params, self.num_rows
1409                )))
1410            } else {
1411                Ok(())
1412            }
1413        };
1414        match &params {
1415            ReadBatchParams::Indices(indices) => {
1416                for idx in indices {
1417                    match idx {
1418                        None => {
1419                            return Err(Error::invalid_input("Null value in indices array"));
1420                        }
1421                        Some(idx) => {
1422                            verify_bound(&params, idx as u64, true)?;
1423                        }
1424                    }
1425                }
1426                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
1427                self.take_rows_blocking(indices, batch_size, projection, filter)
1428            }
1429            ReadBatchParams::Range(range) => {
1430                verify_bound(&params, range.end as u64, false)?;
1431                self.read_range_blocking(
1432                    range.start as u64..range.end as u64,
1433                    batch_size,
1434                    projection,
1435                    filter,
1436                )
1437            }
1438            ReadBatchParams::Ranges(ranges) => {
1439                let mut ranges_u64 = Vec::with_capacity(ranges.len());
1440                for range in ranges.as_ref() {
1441                    verify_bound(&params, range.end, false)?;
1442                    ranges_u64.push(range.start..range.end);
1443                }
1444                self.read_ranges_blocking(ranges_u64, batch_size, projection, filter)
1445            }
1446            ReadBatchParams::RangeFrom(range) => {
1447                verify_bound(&params, range.start as u64, true)?;
1448                self.read_range_blocking(
1449                    range.start as u64..self.num_rows,
1450                    batch_size,
1451                    projection,
1452                    filter,
1453                )
1454            }
1455            ReadBatchParams::RangeTo(range) => {
1456                verify_bound(&params, range.end as u64, false)?;
1457                self.read_range_blocking(0..range.end as u64, batch_size, projection, filter)
1458            }
1459            ReadBatchParams::RangeFull => {
1460                self.read_range_blocking(0..self.num_rows, batch_size, projection, filter)
1461            }
1462        }
1463    }
1464
1465    /// Reads data from the file as a stream of record batches
1466    ///
1467    /// This is similar to [`Self::read_stream_projected`] but uses the base projection
1468    /// provided when the file was opened (or reads all columns if the file was
1469    /// opened without a base projection)
1470    ///
1471    /// # Why is this async?
1472    ///
1473    /// This delegates to [`Self::read_stream_projected`], which awaits the
1474    /// decode scheduler's `initialize` step before returning the stream.
1475    /// See [`Self::read_tasks`] for the rationale.
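    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest); it assumes `file_reader` is an
    /// already-opened [`FileReader`] and that we are inside an async context:
    ///
    /// ```ignore
    /// use futures::TryStreamExt;
    /// use lance_encoding::decoder::FilterExpression;
    /// use lance_io::ReadBatchParams;
    ///
    /// // `file_reader` is assumed to have been opened via `FileReader::try_open`
    /// let batches = file_reader
    ///     .read_stream(
    ///         ReadBatchParams::RangeFull,
    ///         /*batch_size=*/ 1024,
    ///         /*batch_readahead=*/ 16,
    ///         FilterExpression::no_filter(),
    ///     )
    ///     .await?
    ///     .try_collect::<Vec<_>>()
    ///     .await?;
    /// ```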
1476    pub async fn read_stream(
1477        &self,
1478        params: ReadBatchParams,
1479        batch_size: u32,
1480        batch_readahead: u32,
1481        filter: FilterExpression,
1482    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1483        self.read_stream_projected(
1484            params,
1485            batch_size,
1486            batch_readahead,
1487            self.base_projection.clone(),
1488            filter,
1489        )
1490        .await
1491    }
1492
1493    pub fn schema(&self) -> &Arc<Schema> {
1494        &self.metadata.file_schema
1495    }
1496}
1497
1498/// Inspects a page and returns a String describing the page's encoding
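///
/// A minimal sketch of how this might be used (not compiled as a doctest); it
/// assumes `metadata` is a [`CachedFileMetadata`] taken from an open [`FileReader`]:
///
/// ```ignore
/// // `metadata` is assumed to be a &CachedFileMetadata
/// for (col_idx, col_meta) in metadata.column_metadatas.iter().enumerate() {
///     for (page_idx, page) in col_meta.pages.iter().enumerate() {
///         println!("column {} page {}: {}", col_idx, page_idx, describe_encoding(page));
///     }
/// }
/// ```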
1499pub fn describe_encoding(page: &pbfile::column_metadata::Page) -> String {
1500    if let Some(encoding) = &page.encoding {
1501        if let Some(style) = &encoding.location {
1502            match style {
1503                pbfile::encoding::Location::Indirect(indirect) => {
1504                    format!(
1505                        "IndirectEncoding(pos={},size={})",
1506                        indirect.buffer_location, indirect.buffer_length
1507                    )
1508                }
1509                pbfile::encoding::Location::Direct(direct) => {
1510                    let encoding_any =
1511                        prost_types::Any::decode(Bytes::from(direct.encoding.clone()))
1512                            .expect("failed to deserialize encoding as protobuf");
1513                    if encoding_any.type_url == "/lance.encodings.ArrayEncoding" {
1514                        let encoding = encoding_any.to_msg::<pbenc::ArrayEncoding>();
1515                        match encoding {
1516                            Ok(encoding) => {
1517                                format!("{:#?}", encoding)
1518                            }
1519                            Err(err) => {
1520                                format!("Unsupported(decode_err={})", err)
1521                            }
1522                        }
1523                    } else if encoding_any.type_url == "/lance.encodings21.PageLayout" {
1524                        let encoding = encoding_any.to_msg::<pbenc21::PageLayout>();
1525                        match encoding {
1526                            Ok(encoding) => {
1527                                format!("{:#?}", encoding)
1528                            }
1529                            Err(err) => {
1530                                format!("Unsupported(decode_err={})", err)
1531                            }
1532                        }
1533                    } else {
1534                        format!("Unrecognized(type_url={})", encoding_any.type_url)
1535                    }
1536                }
1537                pbfile::encoding::Location::None(_) => "NoEncodingDescription".to_string(),
1538            }
1539        } else {
1540            "MISSING STYLE".to_string()
1541        }
1542    } else {
1543        "MISSING".to_string()
1544    }
1545}
1546
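/// Extension trait for decoding an [`EncodedBatch`] from Lance-formatted bytes
///
/// `try_from_self_described_lance` expects the schema to be stored in the file's
/// global buffers, while `try_from_mini_lance` expects the caller to supply the
/// schema out of band.
///
/// A minimal sketch (not compiled as a doctest); it assumes `bytes` holds a
/// self-described batch produced by the matching writer extension:
///
/// ```ignore
/// // `bytes` is assumed to come from `try_to_self_described_lance`
/// let decoded = EncodedBatch::try_from_self_described_lance(bytes)?;
/// assert!(decoded.num_rows > 0);
/// ```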
1547pub trait EncodedBatchReaderExt {
1548    fn try_from_mini_lance(
1549        bytes: Bytes,
1550        schema: &Schema,
1551        version: LanceFileVersion,
1552    ) -> Result<Self>
1553    where
1554        Self: Sized;
1555    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1556    where
1557        Self: Sized;
1558}
1559
1560impl EncodedBatchReaderExt for EncodedBatch {
1561    fn try_from_mini_lance(
1562        bytes: Bytes,
1563        schema: &Schema,
1564        file_version: LanceFileVersion,
1565    ) -> Result<Self>
1566    where
1567        Self: Sized,
1568    {
1569        let projection = ReaderProjection::from_whole_schema(schema, file_version);
1570        let footer = FileReader::decode_footer(&bytes)?;
1571
1572        // Next, read the metadata for the columns
1573        // This is both the column metadata and the CMO table
1574        let column_metadata_start = footer.column_meta_start as usize;
1575        let column_metadata_end = footer.global_buff_offsets_start as usize;
1576        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1577        let column_metadatas =
1578            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1579
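        // Re-derive the version from the footer; it, not the caller-supplied
        // version, governs how the column metadata below is interpreted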
1580        let file_version = LanceFileVersion::try_from_major_minor(
1581            footer.major_version as u32,
1582            footer.minor_version as u32,
1583        )?;
1584
1585        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1586
1587        Ok(Self {
1588            data: bytes,
1589            num_rows: page_table
1590                .first()
1591                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1592                .unwrap_or(0),
1593            page_table,
1594            top_level_columns: projection.column_indices,
1595            schema: Arc::new(schema.clone()),
1596        })
1597    }
1598
1599    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1600    where
1601        Self: Sized,
1602    {
1603        let footer = FileReader::decode_footer(&bytes)?;
1604        let file_version = LanceFileVersion::try_from_major_minor(
1605            footer.major_version as u32,
1606            footer.minor_version as u32,
1607        )?;
1608
1609        let gbo_table = FileReader::do_decode_gbo_table(
1610            &bytes.slice(footer.global_buff_offsets_start as usize..),
1611            &footer,
1612            file_version,
1613        )?;
1614        if gbo_table.is_empty() {
1615            return Err(Error::internal(
1616                "File did not contain any global buffers; expected the schema to be the first global buffer".to_string(),
1617            ));
1618        }
1619        let schema_start = gbo_table[0].position as usize;
1620        let schema_size = gbo_table[0].size as usize;
1621
1622        let schema_bytes = bytes.slice(schema_start..(schema_start + schema_size));
1623        let (_, schema) = FileReader::decode_schema(schema_bytes)?;
1624        let projection = ReaderProjection::from_whole_schema(&schema, file_version);
1625
1626        // Next, read the metadata for the columns
1627        // This is both the column metadata and the CMO table
1628        let column_metadata_start = footer.column_meta_start as usize;
1629        let column_metadata_end = footer.global_buff_offsets_start as usize;
1630        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1631        let column_metadatas =
1632            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1633
1634        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1635
1636        Ok(Self {
1637            data: bytes,
1638            num_rows: page_table
1639                .first()
1640                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1641                .unwrap_or(0),
1642            page_table,
1643            top_level_columns: projection.column_indices,
1644            schema: Arc::new(schema),
1645        })
1646    }
1647}
1648
1649#[cfg(test)]
1650mod tests {
1651    use std::{collections::BTreeMap, pin::Pin, sync::Arc};
1652
1653    use arrow_array::{
1654        RecordBatch, UInt32Array,
1655        types::{Float64Type, Int32Type},
1656    };
1657    use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema};
1658    use bytes::Bytes;
1659    use futures::{StreamExt, prelude::stream::TryStreamExt};
1660    use lance_arrow::RecordBatchExt;
1661    use lance_core::{ArrowResult, datatypes::Schema};
1662    use lance_datagen::{BatchCount, ByteCount, RowCount, array, gen_batch};
1663    use lance_encoding::{
1664        decoder::{DecodeBatchScheduler, DecoderPlugins, FilterExpression, decode_batch},
1665        encoder::{EncodedBatch, EncodingOptions, default_encoding_strategy, encode_batch},
1666        version::LanceFileVersion,
1667    };
1668    use lance_io::{stream::RecordBatchStream, utils::CachedFileSize};
1669    use log::debug;
1670    use rstest::rstest;
1671    use tokio::sync::mpsc;
1672
1673    use crate::reader::{EncodedBatchReaderExt, FileReader, FileReaderOptions, ReaderProjection};
1674    use crate::testing::{FsFixture, WrittenFile, test_cache, write_lance_file};
1675    use crate::writer::{EncodedBatchWriteExt, FileWriter, FileWriterOptions};
1676    use lance_encoding::decoder::DecoderConfig;
1677
1678    async fn create_some_file(fs: &FsFixture, version: LanceFileVersion) -> WrittenFile {
1679        let location_type = DataType::Struct(Fields::from(vec![
1680            Field::new("x", DataType::Float64, true),
1681            Field::new("y", DataType::Float64, true),
1682        ]));
1683        let categories_type = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)));
1684
1685        let mut reader = gen_batch()
1686            .col("score", array::rand::<Float64Type>())
1687            .col("location", array::rand_type(&location_type))
1688            .col("categories", array::rand_type(&categories_type))
1689            .col("binary", array::rand_type(&DataType::Binary));
1690        if version <= LanceFileVersion::V2_0 {
1691            reader = reader.col("large_bin", array::rand_type(&DataType::LargeBinary));
1692        }
1693        let reader = reader.into_reader_rows(RowCount::from(1000), BatchCount::from(100));
1694
1695        write_lance_file(
1696            reader,
1697            fs,
1698            FileWriterOptions {
1699                format_version: Some(version),
1700                ..Default::default()
1701            },
1702        )
1703        .await
1704    }
1705
1706    type Transformer = Box<dyn Fn(&RecordBatch) -> RecordBatch>;
1707
1708    async fn verify_expected(
1709        expected: &[RecordBatch],
1710        mut actual: Pin<Box<dyn RecordBatchStream>>,
1711        read_size: u32,
1712        transform: Option<Transformer>,
1713    ) {
1714        let mut remaining = expected.iter().map(|batch| batch.num_rows()).sum::<usize>() as u32;
1715        let mut expected_iter = expected.iter().map(|batch| {
1716            if let Some(transform) = &transform {
1717                transform(batch)
1718            } else {
1719                batch.clone()
1720            }
1721        });
1722        let mut next_expected = expected_iter.next().unwrap().clone();
1723        while let Some(actual) = actual.next().await {
1724            let mut actual = actual.unwrap();
1725            let mut rows_to_verify = actual.num_rows() as u32;
1726            let expected_length = remaining.min(read_size);
1727            assert_eq!(expected_length, rows_to_verify);
1728
1729            while rows_to_verify > 0 {
1730                let next_slice_len = (next_expected.num_rows() as u32).min(rows_to_verify);
1731                assert_eq!(
1732                    next_expected.slice(0, next_slice_len as usize),
1733                    actual.slice(0, next_slice_len as usize)
1734                );
1735                remaining -= next_slice_len;
1736                rows_to_verify -= next_slice_len;
1737                if remaining > 0 {
1738                    if next_slice_len == next_expected.num_rows() as u32 {
1739                        next_expected = expected_iter.next().unwrap().clone();
1740                    } else {
1741                        next_expected = next_expected.slice(
1742                            next_slice_len as usize,
1743                            next_expected.num_rows() - next_slice_len as usize,
1744                        );
1745                    }
1746                }
1747                if rows_to_verify > 0 {
1748                    actual = actual.slice(
1749                        next_slice_len as usize,
1750                        actual.num_rows() - next_slice_len as usize,
1751                    );
1752                }
1753            }
1754        }
1755        assert_eq!(remaining, 0);
1756    }
1757
1758    #[tokio::test]
1759    async fn test_round_trip() {
1760        let fs = FsFixture::default();
1761
1762        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1763
1764        for read_size in [32, 1024, 1024 * 1024] {
1765            let file_scheduler = fs
1766                .scheduler
1767                .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1768                .await
1769                .unwrap();
1770            let file_reader = FileReader::try_open(
1771                file_scheduler,
1772                None,
1773                Arc::<DecoderPlugins>::default(),
1774                &test_cache(),
1775                FileReaderOptions::default(),
1776            )
1777            .await
1778            .unwrap();
1779
1780            let schema = file_reader.schema();
1781            assert_eq!(schema.metadata.get("foo").unwrap(), "bar");
1782
1783            let batch_stream = file_reader
1784                .read_stream(
1785                    lance_io::ReadBatchParams::RangeFull,
1786                    read_size,
1787                    16,
1788                    FilterExpression::no_filter(),
1789                )
1790                .await
1791                .unwrap();
1792
1793            verify_expected(&data, batch_stream, read_size, None).await;
1794        }
1795    }
1796
1797    #[rstest]
1798    #[test_log::test(tokio::test)]
1799    async fn test_encoded_batch_round_trip(
1800        // TODO: Add V2_1 (currently fails)
1801        #[values(LanceFileVersion::V2_0)] version: LanceFileVersion,
1802    ) {
1803        let data = gen_batch()
1804            .col("x", array::rand::<Int32Type>())
1805            .col("y", array::rand_utf8(ByteCount::from(16), false))
1806            .into_batch_rows(RowCount::from(10000))
1807            .unwrap();
1808
1809        let lance_schema = Arc::new(Schema::try_from(data.schema().as_ref()).unwrap());
1810
1811        let encoding_options = EncodingOptions {
1812            cache_bytes_per_column: 4096,
1813            max_page_bytes: 32 * 1024 * 1024,
1814            keep_original_array: true,
1815            buffer_alignment: 64,
1816            version,
1817        };
1818
1819        let encoding_strategy = default_encoding_strategy(version);
1820
1821        let encoded_batch = encode_batch(
1822            &data,
1823            lance_schema.clone(),
1824            encoding_strategy.as_ref(),
1825            &encoding_options,
1826        )
1827        .await
1828        .unwrap();
1829
1830        // Test self described
1831        let bytes = encoded_batch.try_to_self_described_lance(version).unwrap();
1832
1833        let decoded_batch = EncodedBatch::try_from_self_described_lance(bytes).unwrap();
1834
1835        let decoded = decode_batch(
1836            &decoded_batch,
1837            &FilterExpression::no_filter(),
1838            Arc::<DecoderPlugins>::default(),
1839            false,
1840            version,
1841            None,
1842        )
1843        .await
1844        .unwrap();
1845
1846        assert_eq!(data, decoded);
1847
1848        // Test mini
1849        let bytes = encoded_batch.try_to_mini_lance(version).unwrap();
1850        let decoded_batch =
1851            EncodedBatch::try_from_mini_lance(bytes, lance_schema.as_ref(), version)
1852                .unwrap();
1853        let decoded = decode_batch(
1854            &decoded_batch,
1855            &FilterExpression::no_filter(),
1856            Arc::<DecoderPlugins>::default(),
1857            false,
1858            version,
1859            None,
1860        )
1861        .await
1862        .unwrap();
1863
1864        assert_eq!(data, decoded);
1865    }
1866
1867    #[rstest]
1868    #[test_log::test(tokio::test)]
1869    async fn test_projection(
1870        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1, LanceFileVersion::V2_2)]
1871        version: LanceFileVersion,
1872    ) {
1873        let fs = FsFixture::default();
1874
1875        let written_file = create_some_file(&fs, version).await;
1876        let file_scheduler = fs
1877            .scheduler
1878            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1879            .await
1880            .unwrap();
1881
1882        let field_id_mapping = written_file
1883            .field_id_mapping
1884            .iter()
1885            .copied()
1886            .collect::<BTreeMap<_, _>>();
1887
1888        let empty_projection = ReaderProjection {
1889            column_indices: Vec::default(),
1890            schema: Arc::new(Schema::default()),
1891        };
1892
1893        for columns in [
1894            vec!["score"],
1895            vec!["location"],
1896            vec!["categories"],
1897            vec!["location.x"],
1898            vec!["score", "categories"],
1899            vec!["score", "location"],
1900            vec!["location", "categories"],
1901            vec!["score", "location.y", "categories"],
1902        ] {
1903            debug!("Testing round trip with projection {:?}", columns);
1904            for use_field_ids in [true, false] {
1905                // We can specify the projection as part of the read operation via read_stream_projected
1906                let file_reader = FileReader::try_open(
1907                    file_scheduler.clone(),
1908                    None,
1909                    Arc::<DecoderPlugins>::default(),
1910                    &test_cache(),
1911                    FileReaderOptions::default(),
1912                )
1913                .await
1914                .unwrap();
1915
1916                let projected_schema = written_file.schema.project(&columns).unwrap();
1917                let projection = if use_field_ids {
1918                    ReaderProjection::from_field_ids(
1919                        file_reader.metadata.version(),
1920                        &projected_schema,
1921                        &field_id_mapping,
1922                    )
1923                    .unwrap()
1924                } else {
1925                    ReaderProjection::from_column_names(
1926                        file_reader.metadata.version(),
1927                        &written_file.schema,
1928                        &columns,
1929                    )
1930                    .unwrap()
1931                };
1932
1933                let batch_stream = file_reader
1934                    .read_stream_projected(
1935                        lance_io::ReadBatchParams::RangeFull,
1936                        1024,
1937                        16,
1938                        projection.clone(),
1939                        FilterExpression::no_filter(),
1940                    )
1941                    .await
1942                    .unwrap();
1943
1944                let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1945                verify_expected(
1946                    &written_file.data,
1947                    batch_stream,
1948                    1024,
1949                    Some(Box::new(move |batch: &RecordBatch| {
1950                        batch.project_by_schema(&projection_arrow).unwrap()
1951                    })),
1952                )
1953                .await;
1954
1955                // We can also specify the projection as a base projection when we open the file
1956                let file_reader = FileReader::try_open(
1957                    file_scheduler.clone(),
1958                    Some(projection.clone()),
1959                    Arc::<DecoderPlugins>::default(),
1960                    &test_cache(),
1961                    FileReaderOptions::default(),
1962                )
1963                .await
1964                .unwrap();
1965
1966                let batch_stream = file_reader
1967                    .read_stream(
1968                        lance_io::ReadBatchParams::RangeFull,
1969                        1024,
1970                        16,
1971                        FilterExpression::no_filter(),
1972                    )
1973                    .await
1974                    .unwrap();
1975
1976                let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1977                verify_expected(
1978                    &written_file.data,
1979                    batch_stream,
1980                    1024,
1981                    Some(Box::new(move |batch: &RecordBatch| {
1982                        batch.project_by_schema(&projection_arrow).unwrap()
1983                    })),
1984                )
1985                .await;
1986
1987                assert!(
1988                    file_reader
1989                        .read_stream_projected(
1990                            lance_io::ReadBatchParams::RangeFull,
1991                            1024,
1992                            16,
1993                            empty_projection.clone(),
1994                            FilterExpression::no_filter(),
1995                        )
1996                        .await
1997                        .is_err()
1998                );
1999            }
2000        }
2001
2002        assert!(
2003            FileReader::try_open(
2004                file_scheduler.clone(),
2005                Some(empty_projection),
2006                Arc::<DecoderPlugins>::default(),
2007                &test_cache(),
2008                FileReaderOptions::default(),
2009            )
2010            .await
2011            .is_err()
2012        );
2013
2014        let arrow_schema = ArrowSchema::new(vec![
2015            Field::new("x", DataType::Int32, true),
2016            Field::new("y", DataType::Int32, true),
2017        ]);
2018        let schema = Schema::try_from(&arrow_schema).unwrap();
2019
2020        let projection_with_dupes = ReaderProjection {
2021            column_indices: vec![0, 0],
2022            schema: Arc::new(schema),
2023        };
2024
2025        assert!(
2026            FileReader::try_open(
2027                file_scheduler.clone(),
2028                Some(projection_with_dupes),
2029                Arc::<DecoderPlugins>::default(),
2030                &test_cache(),
2031                FileReaderOptions::default(),
2032            )
2033            .await
2034            .is_err()
2035        );
2036    }
2037
2038    #[test_log::test(tokio::test)]
2039    async fn test_compressing_buffer() {
2040        let fs = FsFixture::default();
2041
2042        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
2043        let file_scheduler = fs
2044            .scheduler
2045            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2046            .await
2047            .unwrap();
2048
2049        // We can specify the projection as part of the read operation via read_stream_projected
2050        let file_reader = FileReader::try_open(
2051            file_scheduler.clone(),
2052            None,
2053            Arc::<DecoderPlugins>::default(),
2054            &test_cache(),
2055            FileReaderOptions::default(),
2056        )
2057        .await
2058        .unwrap();
2059
2060        let mut projection = written_file.schema.project(&["score"]).unwrap();
2061        for field in projection.fields.iter_mut() {
2062            field
2063                .metadata
2064                .insert("lance-encoding:compression".to_string(), "zstd".to_string());
2065        }
2066        let projection = ReaderProjection {
2067            column_indices: projection.fields.iter().map(|f| f.id as u32).collect(),
2068            schema: Arc::new(projection),
2069        };
2070
2071        let batch_stream = file_reader
2072            .read_stream_projected(
2073                lance_io::ReadBatchParams::RangeFull,
2074                1024,
2075                16,
2076                projection.clone(),
2077                FilterExpression::no_filter(),
2078            )
2079            .await
2080            .unwrap();
2081
2082        let projection_arrow = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
2083        verify_expected(
2084            &written_file.data,
2085            batch_stream,
2086            1024,
2087            Some(Box::new(move |batch: &RecordBatch| {
2088                batch.project_by_schema(&projection_arrow).unwrap()
2089            })),
2090        )
2091        .await;
2092    }
2093
2094    #[tokio::test]
2095    async fn test_read_all() {
2096        let fs = FsFixture::default();
2097        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
2098        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2099
2100        let file_scheduler = fs
2101            .scheduler
2102            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2103            .await
2104            .unwrap();
2105        let file_reader = FileReader::try_open(
2106            file_scheduler.clone(),
2107            None,
2108            Arc::<DecoderPlugins>::default(),
2109            &test_cache(),
2110            FileReaderOptions::default(),
2111        )
2112        .await
2113        .unwrap();
2114
2115        let batches = file_reader
2116            .read_stream(
2117                lance_io::ReadBatchParams::RangeFull,
2118                total_rows as u32,
2119                16,
2120                FilterExpression::no_filter(),
2121            )
2122            .await
2123            .unwrap()
2124            .try_collect::<Vec<_>>()
2125            .await
2126            .unwrap();
2127        assert_eq!(batches.len(), 1);
2128        assert_eq!(batches[0].num_rows(), total_rows);
2129    }
2130
2131    #[rstest]
2132    #[tokio::test]
2133    async fn test_blocking_take(
2134        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1, LanceFileVersion::V2_2)]
2135        version: LanceFileVersion,
2136    ) {
2137        let fs = FsFixture::default();
2138        let WrittenFile { data, schema, .. } = create_some_file(&fs, version).await;
2139        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2140
2141        let file_scheduler = fs
2142            .scheduler
2143            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2144            .await
2145            .unwrap();
2146        let file_reader = FileReader::try_open(
2147            file_scheduler.clone(),
2148            Some(ReaderProjection::from_column_names(version, &schema, &["score"]).unwrap()),
2149            Arc::<DecoderPlugins>::default(),
2150            &test_cache(),
2151            FileReaderOptions::default(),
2152        )
2153        .await
2154        .unwrap();
2155
2156        let batches = tokio::task::spawn_blocking(move || {
2157            file_reader
2158                .read_stream_projected_blocking(
2159                    lance_io::ReadBatchParams::Indices(UInt32Array::from(vec![0, 1, 2, 3, 4])),
2160                    total_rows as u32,
2161                    None,
2162                    FilterExpression::no_filter(),
2163                )
2164                .unwrap()
2165                .collect::<ArrowResult<Vec<_>>>()
2166                .unwrap()
2167        })
2168        .await
2169        .unwrap();
2170
2171        assert_eq!(batches.len(), 1);
2172        assert_eq!(batches[0].num_rows(), 5);
2173        assert_eq!(batches[0].num_columns(), 1);
2174    }
2175
2176    #[tokio::test(flavor = "multi_thread")]
2177    async fn test_drop_in_progress() {
2178        let fs = FsFixture::default();
2179        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
2180        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2181
2182        let file_scheduler = fs
2183            .scheduler
2184            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2185            .await
2186            .unwrap();
2187        let file_reader = FileReader::try_open(
2188            file_scheduler.clone(),
2189            None,
2190            Arc::<DecoderPlugins>::default(),
2191            &test_cache(),
2192            FileReaderOptions::default(),
2193        )
2194        .await
2195        .unwrap();
2196
2197        let mut batches = file_reader
2198            .read_stream(
2199                lance_io::ReadBatchParams::RangeFull,
2200                (total_rows / 10) as u32,
2201                16,
2202                FilterExpression::no_filter(),
2203            )
2204            .await
2205            .unwrap();
2206
2207        drop(file_reader);
2208
2209        let batch = batches.next().await.unwrap().unwrap();
2210        assert!(batch.num_rows() > 0);
2211
2212        // Drop in-progress scan
2213        drop(batches);
2214    }
2215
2216    #[tokio::test]
2217    async fn drop_while_scheduling() {
2218        // This is a bit of a white-box test, pokes at the internals.  We want to
2219        // test the case where the read stream is dropped before the scheduling
2220        // thread finishes.  We can't do that in a black-box fashion because the
2221        // scheduling thread runs in the background and there is no easy way to
2222        // pause / gate it.
2223
2224        // It's a regression for a bug where the scheduling thread would panic
2225        // if the stream was dropped before it finished.
2226
2227        let fs = FsFixture::default();
2228        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
2229        let total_rows = written_file
2230            .data
2231            .iter()
2232            .map(|batch| batch.num_rows())
2233            .sum::<usize>();
2234
2235        let file_scheduler = fs
2236            .scheduler
2237            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2238            .await
2239            .unwrap();
2240        let file_reader = FileReader::try_open(
2241            file_scheduler.clone(),
2242            None,
2243            Arc::<DecoderPlugins>::default(),
2244            &test_cache(),
2245            FileReaderOptions::default(),
2246        )
2247        .await
2248        .unwrap();
2249
2250        let projection =
2251            ReaderProjection::from_whole_schema(&written_file.schema, LanceFileVersion::V2_0);
2252        let column_infos = file_reader
2253            .collect_columns_from_projection(&projection)
2254            .unwrap();
2255        let mut decode_scheduler = DecodeBatchScheduler::try_new(
2256            &projection.schema,
2257            &projection.column_indices,
2258            &column_infos,
2259            &vec![],
2260            total_rows as u64,
2261            Arc::<DecoderPlugins>::default(),
2262            file_reader.scheduler.clone(),
2263            test_cache(),
2264            &FilterExpression::no_filter(),
2265            &DecoderConfig::default(),
2266        )
2267        .await
2268        .unwrap();
2269
2270        let range = 0..total_rows as u64;
2271
2272        let (tx, rx) = mpsc::unbounded_channel();
2273
2274        // Simulate the stream / decoder being dropped
2275        drop(rx);
2276
2277        // Scheduling should not panic
2278        decode_scheduler.schedule_range(
2279            range,
2280            &FilterExpression::no_filter(),
2281            tx,
2282            file_reader.scheduler.clone(),
2283        )
2284    }
2285
2286    #[tokio::test]
2287    async fn test_read_empty_range() {
2288        let fs = FsFixture::default();
2289        create_some_file(&fs, LanceFileVersion::V2_0).await;
2290
2291        let file_scheduler = fs
2292            .scheduler
2293            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2294            .await
2295            .unwrap();
2296        let file_reader = FileReader::try_open(
2297            file_scheduler.clone(),
2298            None,
2299            Arc::<DecoderPlugins>::default(),
2300            &test_cache(),
2301            FileReaderOptions::default(),
2302        )
2303        .await
2304        .unwrap();
2305
2306        // All ranges empty, no data
2307        let batches = file_reader
2308            .read_stream(
2309                lance_io::ReadBatchParams::Range(0..0),
2310                1024,
2311                16,
2312                FilterExpression::no_filter(),
2313            )
2314            .await
2315            .unwrap()
2316            .try_collect::<Vec<_>>()
2317            .await
2318            .unwrap();
2319
2320        assert_eq!(batches.len(), 0);
2321
2322        // Some ranges empty
2323        let batches = file_reader
2324            .read_stream(
2325                lance_io::ReadBatchParams::Ranges(Arc::new([0..1, 2..2])),
2326                1024,
2327                16,
2328                FilterExpression::no_filter(),
2329            )
2330            .await
2331            .unwrap()
2332            .try_collect::<Vec<_>>()
2333            .await
2334            .unwrap();
2335        assert_eq!(batches.len(), 1);
2336    }
2337
2338    #[tokio::test]
2339    async fn test_global_buffers() {
2340        let fs = FsFixture::default();
2341
2342        let lance_schema =
2343            lance_core::datatypes::Schema::try_from(&ArrowSchema::new(vec![Field::new(
2344                "foo",
2345                DataType::Int32,
2346                true,
2347            )]))
2348            .unwrap();
2349
2350        let mut file_writer = FileWriter::try_new(
2351            fs.object_store.create(&fs.tmp_path).await.unwrap(),
2352            lance_schema.clone(),
2353            FileWriterOptions::default(),
2354        )
2355        .unwrap();
2356
2357        let test_bytes = Bytes::from_static(b"hello");
2358
2359        let buf_index = file_writer
2360            .add_global_buffer(test_bytes.clone())
2361            .await
2362            .unwrap();
2363
2364        assert_eq!(buf_index, 1);
2365
2366        file_writer.finish().await.unwrap();
2367
2368        let file_scheduler = fs
2369            .scheduler
2370            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2371            .await
2372            .unwrap();
2373        let file_reader = FileReader::try_open(
2374            file_scheduler.clone(),
2375            None,
2376            Arc::<DecoderPlugins>::default(),
2377            &test_cache(),
2378            FileReaderOptions::default(),
2379        )
2380        .await
2381        .unwrap();
2382
2383        let buf = file_reader.read_global_buffer(1).await.unwrap();
2384        assert_eq!(buf, test_bytes);
2385    }
2386
2387    #[rstest]
2388    #[tokio::test]
2389    async fn test_deep_size_of_includes_column_metadata(
2390        #[values(
2391            LanceFileVersion::V2_0,
2392            LanceFileVersion::V2_1,
2393            LanceFileVersion::V2_2,
2394            LanceFileVersion::V2_3
2395        )]
2396        version: LanceFileVersion,
2397    ) {
2398        // Regression test: CachedFileMetadata::deep_size_of must account for
2399        // column_metadatas and column_infos, otherwise the moka cache weigher
2400        // dramatically underestimates entry sizes and never evicts, causing
2401        // unbounded memory growth on random-access workloads.
2402        use deepsize::DeepSizeOf;
2403
2404        let fs = FsFixture::default();
2405        let _written = create_some_file(&fs, version).await;
2406        let cache = test_cache();
2407        let file_scheduler = fs
2408            .scheduler
2409            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2410            .await
2411            .unwrap();
2412        let file_reader = FileReader::try_open(
2413            file_scheduler,
2414            None,
2415            Arc::<DecoderPlugins>::default(),
2416            &cache,
2417            FileReaderOptions::default(),
2418        )
2419        .await
2420        .unwrap();
2421
2422        let metadata = file_reader.metadata();
2423        let deep_size = metadata.deep_size_of();
2424
2425        // The file has multiple columns (score, location, categories, binary,
2426        // maybe large_bin). The deep_size_of must be substantially more than
2427        // just the schema — it should include column_metadatas + column_infos.
2428        // A naive implementation that ignores these fields reports < 1 KB;
2429        // a correct one should report at least several KB for this test file.
2430        assert!(
2431            deep_size > 1024,
2432            "deep_size_of ({deep_size}) is suspiciously small — \
2433             column_metadatas and column_infos may not be accounted for"
2434        );
2435
2436        // Verify column_metadatas is non-empty (sanity check).
2437        assert!(
2438            !metadata.column_metadatas.is_empty(),
2439            "Expected non-empty column_metadatas"
2440        );
2441
2442        // Verify the size scales with the number of columns: a file with more
2443        // columns should have a larger deep_size_of.
2444        let num_columns = metadata.column_metadatas.len();
2445        assert!(
2446            deep_size > num_columns * 50,
2447            "deep_size_of ({deep_size}) should scale with column count ({num_columns})"
2448        );
2449    }
2450}