lance_file/v2/reader.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::{
    collections::{BTreeMap, BTreeSet},
    io::Cursor,
    ops::Range,
    pin::Pin,
    sync::Arc,
};

use arrow_array::RecordBatchReader;
use arrow_schema::Schema as ArrowSchema;
use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
use bytes::{Bytes, BytesMut};
use deepsize::{Context, DeepSizeOf};
use futures::{stream::BoxStream, Stream, StreamExt};
use lance_encoding::{
    decoder::{
        schedule_and_decode, schedule_and_decode_blocking, ColumnInfo, DecoderConfig,
        DecoderPlugins, FilterExpression, PageEncoding, PageInfo, ReadBatchTask, RequestedRows,
        SchedulerDecoderConfig,
    },
    encoder::EncodedBatch,
    version::LanceFileVersion,
    EncodingsIo,
};
use log::debug;
use object_store::path::Path;
use prost::{Message, Name};
use snafu::location;

use lance_core::{
    cache::LanceCache,
    datatypes::{Field, Schema},
    Error, Result,
};
use lance_encoding::format::pb as pbenc;
use lance_encoding::format::pb21 as pbenc21;
use lance_io::{
    scheduler::FileScheduler,
    stream::{RecordBatchStream, RecordBatchStreamAdapter},
    ReadBatchParams,
};

use crate::{
    datatypes::{Fields, FieldsWithMeta},
    format::{pb, pbfile, MAGIC, MAJOR_VERSION, MINOR_VERSION},
    v2::writer::PAGE_BUFFER_ALIGNMENT,
};

use super::io::LanceEncodingsIo;

/// Default chunk size for reading large pages (8MiB)
/// Pages larger than this will be split into multiple chunks during read
pub const DEFAULT_READ_CHUNK_SIZE: u64 = 8 * 1024 * 1024;

// For now, we don't use global buffers for anything other than schema.  If we
// use these later we should make them lazily loaded and then cached once loaded.
//
// We store their position / length for debugging purposes
#[derive(Debug, DeepSizeOf)]
pub struct BufferDescriptor {
    pub position: u64,
    pub size: u64,
}

/// Statistics summarize some of the file metadata for quick summary info
#[derive(Debug)]
pub struct FileStatistics {
    /// Statistics about each of the columns in the file
    pub columns: Vec<ColumnStatistics>,
}

/// Summary information describing a column
#[derive(Debug)]
pub struct ColumnStatistics {
    /// The number of pages in the column
    pub num_pages: usize,
    /// The total number of data & metadata bytes in the column
    ///
    /// This is the compressed on-disk size
    pub size_bytes: u64,
}

// TODO: Caching
#[derive(Debug)]
pub struct CachedFileMetadata {
    /// The schema of the file
    pub file_schema: Arc<Schema>,
    /// The column metadatas
    pub column_metadatas: Vec<pbfile::ColumnMetadata>,
    pub column_infos: Vec<Arc<ColumnInfo>>,
    /// The number of rows in the file
    pub num_rows: u64,
    pub file_buffers: Vec<BufferDescriptor>,
    /// The number of bytes contained in the data page section of the file
    pub num_data_bytes: u64,
    /// The number of bytes contained in the column metadata (not including buffers
    /// referenced by the metadata)
    pub num_column_metadata_bytes: u64,
    /// The number of bytes contained in global buffers
    pub num_global_buffer_bytes: u64,
    /// The number of bytes contained in the CMO and GBO tables
    pub num_footer_bytes: u64,
    pub major_version: u16,
    pub minor_version: u16,
}

impl DeepSizeOf for CachedFileMetadata {
    // TODO: include size for `column_metadatas` and `column_infos`.
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.file_schema.deep_size_of_children(context)
            + self
                .file_buffers
                .iter()
                .map(|file_buffer| file_buffer.deep_size_of_children(context))
                .sum::<usize>()
    }
}

impl CachedFileMetadata {
    pub fn version(&self) -> LanceFileVersion {
        match (self.major_version, self.minor_version) {
            (0, 3) => LanceFileVersion::V2_0,
            (2, 1) => LanceFileVersion::V2_1,
            _ => panic!(
                "Unsupported version: {}.{}",
                self.major_version, self.minor_version
            ),
        }
    }
}

/// Selecting columns from a lance file requires specifying both the
/// index of the column and the data type of the column
///
/// Partly, this is because it is not strictly required that columns
/// be read into the same type.  For example, a string column may be
/// read as a string, large_string or string_view type.
///
/// A read will only succeed if the decoder for a column is capable
/// of decoding into the requested type.
///
/// Note that this should generally be limited to different in-memory
/// representations of the same semantic type.  An encoding could
/// theoretically support "casting" (e.g. int to string, etc.) but
/// there is little advantage in doing so here.
///
/// Note: in order to specify a projection the user will need some way
/// to figure out the column indices.  In the table format we do this
/// using field IDs and keeping track of the field id->column index mapping.
///
/// If users are not using the table format then they will need to figure
/// out some way to do this themselves.
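///
/// As a rough illustration (hypothetical; assuming `schema` is a [`Schema`] containing a
/// single leaf field that was written to column index 0), a projection can be built directly:
///
/// ```ignore
/// use std::sync::Arc;
///
/// let projection = ReaderProjection {
///     schema: Arc::new(schema),
///     column_indices: vec![0],
/// };
/// ```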
#[derive(Debug, Clone)]
pub struct ReaderProjection {
    /// The data types (schema) of the selected columns.  The names
    /// of the schema are arbitrary and ignored.
    pub schema: Arc<Schema>,
    /// The indices of the columns to load.
    ///
    /// The content of this vector depends on the file version.
    ///
    /// In Lance File Version 2.0 we need ids for structural fields as
    /// well as leaf fields:
    ///
    ///   - Primitive: the index of the column in the schema
    ///   - List: the index of the list column in the schema
    ///     followed by the column indices of the children
    ///   - FixedSizeList (of primitive): the index of the column in the schema
    ///     (this case is not nested)
    ///   - FixedSizeList (of non-primitive): not yet implemented
    ///   - Dictionary: same as primitive
    ///   - Struct: the index of the struct column in the schema
    ///     followed by the column indices of the children
    ///
    ///   In other words, this should be a DFS listing of the desired schema.
    ///
    /// In Lance File Version 2.1 we only need ids for leaf fields.  Any structural
    /// fields are completely transparent.
    ///
    /// For example, if the goal is to load:
    ///
    ///   x: int32
    ///   y: struct<z: int32, w: string>
    ///   z: list<int32>
    ///
    /// and the schema originally used to store the data was:
    ///
    ///   a: struct<x: int32>
    ///   b: int64
    ///   y: struct<z: int32, c: int64, w: string>
    ///   z: list<int32>
    ///
    /// Then the column_indices should be:
    ///
    /// - 2.0: [1, 3, 4, 6, 7, 8]
    /// - 2.1: [0, 2, 4, 5]
    pub column_indices: Vec<u32>,
}

impl ReaderProjection {
    fn from_field_ids_helper<'a>(
        file_version: LanceFileVersion,
        fields: impl Iterator<Item = &'a Field>,
        field_id_to_column_index: &BTreeMap<u32, u32>,
        column_indices: &mut Vec<u32>,
    ) -> Result<()> {
        for field in fields {
            let is_structural = file_version >= LanceFileVersion::V2_1;
            // In the 2.0 system we needed ids for intermediate fields.  In 2.1+
            // we only need ids for leaf fields.
            if !is_structural || field.children.is_empty() {
                if let Some(column_idx) = field_id_to_column_index.get(&(field.id as u32)).copied()
                {
                    column_indices.push(column_idx);
                }
            }
            Self::from_field_ids_helper(
                file_version,
                field.children.iter(),
                field_id_to_column_index,
                column_indices,
            )?;
        }
        Ok(())
    }

    /// Creates a projection using a mapping from field IDs to column indices
    ///
    /// You can obtain such a mapping when the file is written using the
    /// [`crate::v2::writer::FileWriter::field_id_to_column_indices`] method.
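    ///
    /// A minimal sketch (hypothetical; `mapping` is assumed to be the map captured from the
    /// writer's `field_id_to_column_indices` method when the file was written):
    ///
    /// ```ignore
    /// let projection =
    ///     ReaderProjection::from_field_ids(LanceFileVersion::V2_1, &schema, &mapping)?;
    /// ```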
    pub fn from_field_ids(
        file_version: LanceFileVersion,
        schema: &Schema,
        field_id_to_column_index: &BTreeMap<u32, u32>,
    ) -> Result<Self> {
        let mut column_indices = Vec::new();
        Self::from_field_ids_helper(
            file_version,
            schema.fields.iter(),
            field_id_to_column_index,
            &mut column_indices,
        )?;
        Ok(Self {
            schema: Arc::new(schema.clone()),
            column_indices,
        })
    }

    /// Creates a projection that reads the entire file
    ///
    /// If the schema provided is not the schema of the entire file then
    /// the projection will be invalid and the read will fail.
    /// If a field is a struct with `packed` set to true in the field metadata, then the
    /// whole struct occupies a single column index.
    /// To support nested packed-struct encoding, this method will need further adjustment.
    pub fn from_whole_schema(schema: &Schema, version: LanceFileVersion) -> Self {
        let schema = Arc::new(schema.clone());
        let is_structural = version >= LanceFileVersion::V2_1;
        let mut column_indices = vec![];
        let mut curr_column_idx = 0;
        let mut packed_struct_fields_num = 0;
        for field in schema.fields_pre_order() {
            if packed_struct_fields_num > 0 {
                packed_struct_fields_num -= 1;
                continue;
            }
            if field.is_packed_struct() {
                column_indices.push(curr_column_idx);
                curr_column_idx += 1;
                packed_struct_fields_num = field.children.len();
            } else if field.children.is_empty() || !is_structural {
                column_indices.push(curr_column_idx);
                curr_column_idx += 1;
            }
        }
        Self {
            schema,
            column_indices,
        }
    }

    /// Creates a projection that reads the specified columns provided by name
    ///
    /// The syntax for column names is the same as [`lance_core::datatypes::Schema::project`]
    ///
    /// If the schema provided is not the schema of the entire file then
    /// the projection will be invalid and the read will fail.
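    ///
    /// A minimal sketch (the column names here are illustrative only):
    ///
    /// ```ignore
    /// let projection =
    ///     ReaderProjection::from_column_names(LanceFileVersion::V2_1, &schema, &["x", "y"])?;
    /// ```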
    pub fn from_column_names(
        file_version: LanceFileVersion,
        schema: &Schema,
        column_names: &[&str],
    ) -> Result<Self> {
        let field_id_to_column_index = schema
            .fields_pre_order()
            // In the 2.0 system we needed ids for intermediate fields.  In 2.1+
            // we only need ids for leaf fields.
            .filter(|field| file_version < LanceFileVersion::V2_1 || field.is_leaf())
            .enumerate()
            .map(|(idx, field)| (field.id as u32, idx as u32))
            .collect::<BTreeMap<_, _>>();
        let projected = schema.project(column_names)?;
        let mut column_indices = Vec::new();
        Self::from_field_ids_helper(
            file_version,
            projected.fields.iter(),
            &field_id_to_column_index,
            &mut column_indices,
        )?;
        Ok(Self {
            schema: Arc::new(projected),
            column_indices,
        })
    }
}

/// Options that control the file reader's behavior, such as whether to enable caching of repetition indices
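///
/// For example (a sketch; the chunk size chosen here is arbitrary):
///
/// ```ignore
/// let options = FileReaderOptions {
///     // Read large pages in 16MiB chunks instead of the 8MiB default
///     read_chunk_size: 16 * 1024 * 1024,
///     ..Default::default()
/// };
/// ```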
#[derive(Clone, Debug)]
pub struct FileReaderOptions {
    pub decoder_config: DecoderConfig,
    /// Size of chunks when reading large pages. Pages larger than this
    /// will be read in multiple chunks to control memory usage.
    /// Default: 8MiB (`DEFAULT_READ_CHUNK_SIZE`)
    pub read_chunk_size: u64,
}

impl Default for FileReaderOptions {
    fn default() -> Self {
        Self {
            decoder_config: DecoderConfig::default(),
            read_chunk_size: DEFAULT_READ_CHUNK_SIZE,
        }
    }
}

#[derive(Debug)]
pub struct FileReader {
    scheduler: Arc<dyn EncodingsIo>,
    // The default projection to be applied to all reads
    base_projection: ReaderProjection,
    num_rows: u64,
    metadata: Arc<CachedFileMetadata>,
    decoder_plugins: Arc<DecoderPlugins>,
    cache: Arc<LanceCache>,
    options: FileReaderOptions,
}
#[derive(Debug)]
struct Footer {
    #[allow(dead_code)]
    column_meta_start: u64,
    // We don't use this today because we always load metadata for every column
    // and don't yet support "metadata projection"
    #[allow(dead_code)]
    column_meta_offsets_start: u64,
    global_buff_offsets_start: u64,
    num_global_buffers: u32,
    num_columns: u32,
    major_version: u16,
    minor_version: u16,
}

const FOOTER_LEN: usize = 40;
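
// The 40-byte footer layout, as read by `decode_footer` below (all integers little-endian):
//
//   bytes  0..8   column_meta_start (u64)
//   bytes  8..16  column_meta_offsets_start (u64)
//   bytes 16..24  global_buff_offsets_start (u64)
//   bytes 24..28  num_global_buffers (u32)
//   bytes 28..32  num_columns (u32)
//   bytes 32..34  major_version (u16)
//   bytes 34..36  minor_version (u16)
//   bytes 36..40  magic bytes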

impl FileReader {
    pub fn with_scheduler(&self, scheduler: Arc<dyn EncodingsIo>) -> Self {
        Self {
            scheduler,
            base_projection: self.base_projection.clone(),
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            metadata: self.metadata.clone(),
            options: self.options.clone(),
            num_rows: self.num_rows,
        }
    }

    pub fn num_rows(&self) -> u64 {
        self.num_rows
    }

    pub fn metadata(&self) -> &Arc<CachedFileMetadata> {
        &self.metadata
    }

    pub fn file_statistics(&self) -> FileStatistics {
        let column_metadatas = &self.metadata().column_metadatas;

        let column_stats = column_metadatas
            .iter()
            .map(|col_metadata| {
                let num_pages = col_metadata.pages.len();
                let size_bytes = col_metadata
                    .pages
                    .iter()
                    .map(|page| page.buffer_sizes.iter().sum::<u64>())
                    .sum::<u64>();
                ColumnStatistics {
                    num_pages,
                    size_bytes,
                }
            })
            .collect();

        FileStatistics {
            columns: column_stats,
        }
    }

    pub async fn read_global_buffer(&self, index: u32) -> Result<Bytes> {
        let buffer_desc = self.metadata.file_buffers.get(index as usize).ok_or_else(||Error::invalid_input(format!("request for global buffer at index {} but there were only {} global buffers in the file", index, self.metadata.file_buffers.len()), location!()))?;
        self.scheduler
            .submit_single(
                buffer_desc.position..buffer_desc.position + buffer_desc.size,
                0,
            )
            .await
    }

    async fn read_tail(scheduler: &FileScheduler) -> Result<(Bytes, u64)> {
        let file_size = scheduler.reader().size().await? as u64;
        let begin = if file_size < scheduler.reader().block_size() as u64 {
            0
        } else {
            file_size - scheduler.reader().block_size() as u64
        };
        let tail_bytes = scheduler.submit_single(begin..file_size, 0).await?;
        Ok((tail_bytes, file_size))
    }

    // Checks that the footer is well-formed and decodes it into a `Footer` (including the
    // positions of the column metadata and global buffer offset tables)
    fn decode_footer(footer_bytes: &Bytes) -> Result<Footer> {
        let len = footer_bytes.len();
        if len < FOOTER_LEN {
            return Err(Error::io(
                format!(
                    "does not have sufficient data, len: {}, bytes: {:?}",
                    len, footer_bytes
                ),
                location!(),
            ));
        }
        let mut cursor = Cursor::new(footer_bytes.slice(len - FOOTER_LEN..));

        let column_meta_start = cursor.read_u64::<LittleEndian>()?;
        let column_meta_offsets_start = cursor.read_u64::<LittleEndian>()?;
        let global_buff_offsets_start = cursor.read_u64::<LittleEndian>()?;
        let num_global_buffers = cursor.read_u32::<LittleEndian>()?;
        let num_columns = cursor.read_u32::<LittleEndian>()?;
        let major_version = cursor.read_u16::<LittleEndian>()?;
        let minor_version = cursor.read_u16::<LittleEndian>()?;

        if major_version == MAJOR_VERSION as u16 && minor_version == MINOR_VERSION as u16 {
            return Err(Error::version_conflict(
                "Attempt to use the lance v2 reader to read a legacy file".to_string(),
                major_version,
                minor_version,
                location!(),
            ));
        }

        let magic_bytes = footer_bytes.slice(len - 4..);
        if magic_bytes.as_ref() != MAGIC {
            return Err(Error::io(
                format!(
                    "file does not appear to be a Lance file (invalid magic: {:?})",
                    MAGIC
                ),
                location!(),
            ));
        }
        Ok(Footer {
            column_meta_start,
            column_meta_offsets_start,
            global_buff_offsets_start,
            num_global_buffers,
            num_columns,
            major_version,
            minor_version,
        })
    }

    // TODO: Once we have coalesced I/O we should only read the column metadatas that we need
    fn read_all_column_metadata(
        column_metadata_bytes: Bytes,
        footer: &Footer,
    ) -> Result<Vec<pbfile::ColumnMetadata>> {
        let column_metadata_start = footer.column_meta_start;
        // cmo == column_metadata_offsets
        let cmo_table_size = 16 * footer.num_columns as usize;
        let cmo_table = column_metadata_bytes.slice(column_metadata_bytes.len() - cmo_table_size..);

        (0..footer.num_columns)
            .map(|col_idx| {
                let offset = (col_idx * 16) as usize;
                let position = LittleEndian::read_u64(&cmo_table[offset..offset + 8]);
                let length = LittleEndian::read_u64(&cmo_table[offset + 8..offset + 16]);
                let normalized_position = (position - column_metadata_start) as usize;
                let normalized_end = normalized_position + (length as usize);
                Ok(pbfile::ColumnMetadata::decode(
                    &column_metadata_bytes[normalized_position..normalized_end],
                )?)
            })
            .collect::<Result<Vec<_>>>()
    }

    async fn optimistic_tail_read(
        data: &Bytes,
        start_pos: u64,
        scheduler: &FileScheduler,
        file_len: u64,
    ) -> Result<Bytes> {
        let num_bytes_needed = (file_len - start_pos) as usize;
        if data.len() >= num_bytes_needed {
            Ok(data.slice((data.len() - num_bytes_needed)..))
        } else {
            let num_bytes_missing = (num_bytes_needed - data.len()) as u64;
            let start = file_len - num_bytes_needed as u64;
            let missing_bytes = scheduler
                .submit_single(start..start + num_bytes_missing, 0)
                .await?;
            let mut combined = BytesMut::with_capacity(data.len() + num_bytes_missing as usize);
            combined.extend(missing_bytes);
            combined.extend(data);
            Ok(combined.freeze())
        }
    }

    fn do_decode_gbo_table(
        gbo_bytes: &Bytes,
        footer: &Footer,
        version: LanceFileVersion,
    ) -> Result<Vec<BufferDescriptor>> {
        let mut global_bufs_cursor = Cursor::new(gbo_bytes);

        let mut global_buffers = Vec::with_capacity(footer.num_global_buffers as usize);
        for _ in 0..footer.num_global_buffers {
            let buf_pos = global_bufs_cursor.read_u64::<LittleEndian>()?;
            assert!(
                version < LanceFileVersion::V2_1 || buf_pos % PAGE_BUFFER_ALIGNMENT as u64 == 0
            );
            let buf_size = global_bufs_cursor.read_u64::<LittleEndian>()?;
            global_buffers.push(BufferDescriptor {
                position: buf_pos,
                size: buf_size,
            });
        }

        Ok(global_buffers)
    }

    async fn decode_gbo_table(
        tail_bytes: &Bytes,
        file_len: u64,
        scheduler: &FileScheduler,
        footer: &Footer,
        version: LanceFileVersion,
    ) -> Result<Vec<BufferDescriptor>> {
        // This could, in theory, trigger another IOP but the GBO table should never be large
        // enough for that to happen
        let gbo_bytes = Self::optimistic_tail_read(
            tail_bytes,
            footer.global_buff_offsets_start,
            scheduler,
            file_len,
        )
        .await?;
        Self::do_decode_gbo_table(&gbo_bytes, footer, version)
    }

    fn decode_schema(schema_bytes: Bytes) -> Result<(u64, lance_core::datatypes::Schema)> {
        let file_descriptor = pb::FileDescriptor::decode(schema_bytes)?;
        let pb_schema = file_descriptor.schema.unwrap();
        let num_rows = file_descriptor.length;
        let fields_with_meta = FieldsWithMeta {
            fields: Fields(pb_schema.fields),
            metadata: pb_schema.metadata,
        };
        let schema = lance_core::datatypes::Schema::from(fields_with_meta);
        Ok((num_rows, schema))
    }

    // TODO: Support late projection.  Currently, if we want to perform a
    // projected read of a file, we load all of the column metadata, and then
    // only read the column data that is requested.  This is fine for most cases.
    //
    // However, if there are many columns then loading all of the column metadata
    // may be expensive.  We should support a mode where we only load the column
    // metadata for the columns that are requested (the file format supports this).
    //
    // The main challenge is that we either need to ignore the column metadata cache
    // or have a more sophisticated cache that can cache per-column metadata.
    //
    // Also, if the number of columns is fairly small, it's faster to read them as a
    // single IOP, but we can fix this through coalescing.
    pub async fn read_all_metadata(scheduler: &FileScheduler) -> Result<CachedFileMetadata> {
        // 1. read the footer
        let (tail_bytes, file_len) = Self::read_tail(scheduler).await?;
        let footer = Self::decode_footer(&tail_bytes)?;

        let file_version = LanceFileVersion::try_from_major_minor(
            footer.major_version as u32,
            footer.minor_version as u32,
        )?;

        let gbo_table =
            Self::decode_gbo_table(&tail_bytes, file_len, scheduler, &footer, file_version).await?;
        if gbo_table.is_empty() {
            return Err(Error::Internal {
                message: "File did not contain any global buffers, schema expected".to_string(),
                location: location!(),
            });
        }
        let schema_start = gbo_table[0].position;
        let schema_size = gbo_table[0].size;

        let num_footer_bytes = file_len - schema_start;

        // By default we read all column metadatas.  We do NOT read the column metadata buffers
        // at this point.  We only want to read the column metadata for columns we are actually loading.
        let all_metadata_bytes =
            Self::optimistic_tail_read(&tail_bytes, schema_start, scheduler, file_len).await?;

        let schema_bytes = all_metadata_bytes.slice(0..schema_size as usize);
        let (num_rows, schema) = Self::decode_schema(schema_bytes)?;

        // Next, read the metadata for the columns
        // This is both the column metadata and the CMO table
        let column_metadata_start = (footer.column_meta_start - schema_start) as usize;
        let column_metadata_end = (footer.global_buff_offsets_start - schema_start) as usize;
        let column_metadata_bytes =
            all_metadata_bytes.slice(column_metadata_start..column_metadata_end);
        let column_metadatas = Self::read_all_column_metadata(column_metadata_bytes, &footer)?;

        let num_global_buffer_bytes = gbo_table.iter().map(|buf| buf.size).sum::<u64>();
        let num_data_bytes = footer.column_meta_start - num_global_buffer_bytes;
        let num_column_metadata_bytes = footer.global_buff_offsets_start - footer.column_meta_start;

        let column_infos = Self::meta_to_col_infos(column_metadatas.as_slice(), file_version);

        Ok(CachedFileMetadata {
            file_schema: Arc::new(schema),
            column_metadatas,
            column_infos,
            num_rows,
            num_data_bytes,
            num_column_metadata_bytes,
            num_global_buffer_bytes,
            num_footer_bytes,
            file_buffers: gbo_table,
            major_version: footer.major_version,
            minor_version: footer.minor_version,
        })
    }

    fn fetch_encoding<M: Default + Name + Sized>(encoding: &pbfile::Encoding) -> M {
        match &encoding.location {
            Some(pbfile::encoding::Location::Indirect(_)) => todo!(),
            Some(pbfile::encoding::Location::Direct(encoding)) => {
                let encoding_buf = Bytes::from(encoding.encoding.clone());
                let encoding_any = prost_types::Any::decode(encoding_buf).unwrap();
                encoding_any.to_msg::<M>().unwrap()
            }
            Some(pbfile::encoding::Location::None(_)) => panic!(),
            None => panic!(),
        }
    }

    fn meta_to_col_infos(
        column_metadatas: &[pbfile::ColumnMetadata],
        file_version: LanceFileVersion,
    ) -> Vec<Arc<ColumnInfo>> {
        column_metadatas
            .iter()
            .enumerate()
            .map(|(col_idx, col_meta)| {
                let page_infos = col_meta
                    .pages
                    .iter()
                    .map(|page| {
                        let num_rows = page.length;
                        let encoding = match file_version {
                            LanceFileVersion::V2_0 => {
                                PageEncoding::Legacy(Self::fetch_encoding::<pbenc::ArrayEncoding>(
                                    page.encoding.as_ref().unwrap(),
                                ))
                            }
                            _ => PageEncoding::Structural(Self::fetch_encoding::<
                                pbenc21::PageLayout,
                            >(
                                page.encoding.as_ref().unwrap()
                            )),
                        };
                        let buffer_offsets_and_sizes = Arc::from(
                            page.buffer_offsets
                                .iter()
                                .zip(page.buffer_sizes.iter())
                                .map(|(offset, size)| {
                                    // Starting with version 2.1 we can assert that page buffers are aligned
                                    assert!(
                                        file_version < LanceFileVersion::V2_1
                                            || offset % PAGE_BUFFER_ALIGNMENT as u64 == 0
                                    );
                                    (*offset, *size)
                                })
                                .collect::<Vec<_>>(),
                        );
                        PageInfo {
                            buffer_offsets_and_sizes,
                            encoding,
                            num_rows,
                            priority: page.priority,
                        }
                    })
                    .collect::<Vec<_>>();
                let buffer_offsets_and_sizes = Arc::from(
                    col_meta
                        .buffer_offsets
                        .iter()
                        .zip(col_meta.buffer_sizes.iter())
                        .map(|(offset, size)| (*offset, *size))
                        .collect::<Vec<_>>(),
                );
                Arc::new(ColumnInfo {
                    index: col_idx as u32,
                    page_infos: Arc::from(page_infos),
                    buffer_offsets_and_sizes,
                    encoding: Self::fetch_encoding(col_meta.encoding.as_ref().unwrap()),
                })
            })
            .collect::<Vec<_>>()
    }

    fn validate_projection(
        projection: &ReaderProjection,
        metadata: &CachedFileMetadata,
    ) -> Result<()> {
        if projection.schema.fields.is_empty() {
            return Err(Error::invalid_input(
                "Attempt to read zero columns from the file, at least one column must be specified"
                    .to_string(),
                location!(),
            ));
        }
        let mut column_indices_seen = BTreeSet::new();
        for column_index in &projection.column_indices {
            if !column_indices_seen.insert(*column_index) {
                return Err(Error::invalid_input(
                    format!(
                        "The projection specified the column index {} more than once",
                        column_index
                    ),
                    location!(),
                ));
            }
            if *column_index >= metadata.column_infos.len() as u32 {
                return Err(Error::invalid_input(format!("The projection specified the column index {} but there are only {} columns in the file", column_index, metadata.column_infos.len()), location!()));
            }
        }
        Ok(())
    }

    /// Opens a new file reader without any pre-existing knowledge
    ///
    /// This will read the file schema from the file itself and thus requires a bit more I/O
    ///
    /// A `base_projection` can also be provided.  If provided, then the projection will apply
    /// to all reads from the file that do not specify their own projection.
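    ///
    /// A minimal sketch of opening a file and scanning it (assuming a `FileScheduler` named
    /// `scheduler`, a `LanceCache` named `cache`, and decoder plugins `plugins` already exist):
    ///
    /// ```ignore
    /// let reader = FileReader::try_open(
    ///     scheduler,
    ///     /*base_projection=*/ None,
    ///     plugins,
    ///     &cache,
    ///     FileReaderOptions::default(),
    /// )
    /// .await?;
    /// let stream = reader.read_stream(
    ///     ReadBatchParams::RangeFull,
    ///     /*batch_size=*/ 1024,
    ///     /*batch_readahead=*/ 16,
    ///     FilterExpression::no_filter(),
    /// )?;
    /// ```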
    pub async fn try_open(
        scheduler: FileScheduler,
        base_projection: Option<ReaderProjection>,
        decoder_plugins: Arc<DecoderPlugins>,
        cache: &LanceCache,
        options: FileReaderOptions,
    ) -> Result<Self> {
        let file_metadata = Arc::new(Self::read_all_metadata(&scheduler).await?);
        let path = scheduler.reader().path().clone();

        // Create LanceEncodingsIo with read chunk size from options
        let encodings_io =
            LanceEncodingsIo::new(scheduler).with_read_chunk_size(options.read_chunk_size);

        Self::try_open_with_file_metadata(
            Arc::new(encodings_io),
            path,
            base_projection,
            decoder_plugins,
            file_metadata,
            cache,
            options,
        )
        .await
    }

    /// Same as `try_open` but with the file metadata already loaded.
    ///
    /// This method can also accept any kind of `EncodingsIo` implementation, allowing
    /// custom strategies to be used for I/O scheduling (e.g. for takes on fast
    /// disks it may be better to avoid asynchronous overhead).
    pub async fn try_open_with_file_metadata(
        scheduler: Arc<dyn EncodingsIo>,
        path: Path,
        base_projection: Option<ReaderProjection>,
        decoder_plugins: Arc<DecoderPlugins>,
        file_metadata: Arc<CachedFileMetadata>,
        cache: &LanceCache,
        options: FileReaderOptions,
    ) -> Result<Self> {
        let cache = Arc::new(cache.with_key_prefix(path.as_ref()));

        if let Some(base_projection) = base_projection.as_ref() {
            Self::validate_projection(base_projection, &file_metadata)?;
        }
        let num_rows = file_metadata.num_rows;
        Ok(Self {
            scheduler,
            base_projection: base_projection.unwrap_or(ReaderProjection::from_whole_schema(
                file_metadata.file_schema.as_ref(),
                file_metadata.version(),
            )),
            num_rows,
            metadata: file_metadata,
            decoder_plugins,
            cache,
            options,
        })
    }

    // The actual decoder needs all the column infos that make up a type.  In other words, if
    // the first type in the schema is Struct<i32, i32> then the decoder will need 3 column infos.
    //
    // This is a file reader concern because the file reader needs to support late projection of columns
    // and so it will need to figure this out anyways.
    //
    // It's a bit of a tricky process though because the number of column infos may depend on the
    // encoding.  Considering the above example, if we wrote it with a packed encoding, then there would
    // only be a single column in the file (and not 3).
    //
    // At the moment this method works because our rules are simple and we just repeat them here.  See
    // Self::default_projection for a similar problem.  In the future this is something the encodings
    // registry will need to figure out.
    fn collect_columns_from_projection(
        &self,
        _projection: &ReaderProjection,
    ) -> Result<Vec<Arc<ColumnInfo>>> {
        Ok(self.metadata.column_infos.to_vec())
    }

    #[allow(clippy::too_many_arguments)]
    fn do_read_range(
        column_infos: Vec<Arc<ColumnInfo>>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<LanceCache>,
        num_rows: u64,
        decoder_plugins: Arc<DecoderPlugins>,
        range: Range<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
        decoder_config: DecoderConfig,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        debug!(
            "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
            range,
            batch_size,
            num_rows,
            column_infos.len(),
            projection.schema.fields.len(),
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache,
            decoder_plugins,
            io,
            decoder_config,
        };

        let requested_rows = RequestedRows::Ranges(vec![range]);

        Ok(schedule_and_decode(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        ))
    }

    fn read_range(
        &self,
        range: Range<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        // Create and initialize the stream
        Self::do_read_range(
            self.collect_columns_from_projection(&projection)?,
            self.scheduler.clone(),
            self.cache.clone(),
            self.num_rows,
            self.decoder_plugins.clone(),
            range,
            batch_size,
            projection,
            filter,
            self.options.decoder_config.clone(),
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn do_take_rows(
        column_infos: Vec<Arc<ColumnInfo>>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<LanceCache>,
        decoder_plugins: Arc<DecoderPlugins>,
        indices: Vec<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
        decoder_config: DecoderConfig,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        debug!(
            "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
            indices.len(),
            indices[0],
            indices[indices.len() - 1],
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache,
            decoder_plugins,
            io,
            decoder_config,
        };

        let requested_rows = RequestedRows::Indices(indices);

        Ok(schedule_and_decode(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        ))
    }

    fn take_rows(
        &self,
        indices: Vec<u64>,
        batch_size: u32,
        projection: ReaderProjection,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        // Create and initialize the stream
        Self::do_take_rows(
            self.collect_columns_from_projection(&projection)?,
            self.scheduler.clone(),
            self.cache.clone(),
            self.decoder_plugins.clone(),
            indices,
            batch_size,
            projection,
            FilterExpression::no_filter(),
            self.options.decoder_config.clone(),
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn do_read_ranges(
        column_infos: Vec<Arc<ColumnInfo>>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<LanceCache>,
        decoder_plugins: Arc<DecoderPlugins>,
        ranges: Vec<Range<u64>>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
        decoder_config: DecoderConfig,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        debug!(
            "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
            ranges.len(),
            num_rows,
            ranges[0].start,
            ranges[ranges.len() - 1].end,
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache,
            decoder_plugins,
            io,
            decoder_config,
        };

        let requested_rows = RequestedRows::Ranges(ranges);

        Ok(schedule_and_decode(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        ))
    }

    fn read_ranges(
        &self,
        ranges: Vec<Range<u64>>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        Self::do_read_ranges(
            self.collect_columns_from_projection(&projection)?,
            self.scheduler.clone(),
            self.cache.clone(),
            self.decoder_plugins.clone(),
            ranges,
            batch_size,
            projection,
            filter,
            self.options.decoder_config.clone(),
        )
    }

    /// Creates a stream of "read tasks" to read the data from the file
    ///
    /// The arguments are similar to [`Self::read_stream_projected`] but instead of returning a stream
    /// of record batches it returns a stream of "read tasks".
    ///
    /// The tasks should be consumed with some kind of `buffered` stream adapter if CPU parallelism is desired.
    ///
    /// Note that "read task" is probably a bit imprecise.  The tasks are actually "decode tasks".  The
    /// reading happens asynchronously in the background.  In other words, a single read task may map to
    /// multiple I/O operations or a single I/O operation may map to multiple read tasks.
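    ///
    /// For example, a sketch mirroring what [`Self::read_stream_projected`] does internally
    /// (the readahead of 8 is arbitrary):
    ///
    /// ```ignore
    /// use futures::StreamExt;
    ///
    /// let batches = reader
    ///     .read_tasks(ReadBatchParams::RangeFull, 1024, None, FilterExpression::no_filter())?
    ///     .map(|task| task.task)
    ///     .buffered(8);
    /// ```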
    pub fn read_tasks(
        &self,
        params: ReadBatchParams,
        batch_size: u32,
        projection: Option<ReaderProjection>,
        filter: FilterExpression,
    ) -> Result<Pin<Box<dyn Stream<Item = ReadBatchTask> + Send>>> {
        let projection = projection.unwrap_or_else(|| self.base_projection.clone());
        Self::validate_projection(&projection, &self.metadata)?;
        let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
            if bound > self.num_rows || bound == self.num_rows && inclusive {
                Err(Error::invalid_input(
                    format!(
                        "cannot read {:?} from file with {} rows",
                        params, self.num_rows
                    ),
                    location!(),
                ))
            } else {
                Ok(())
            }
        };
        match &params {
            ReadBatchParams::Indices(indices) => {
                for idx in indices {
                    match idx {
                        None => {
                            return Err(Error::invalid_input(
                                "Null value in indices array",
                                location!(),
                            ));
                        }
                        Some(idx) => {
                            verify_bound(&params, idx as u64, true)?;
                        }
                    }
                }
                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
                self.take_rows(indices, batch_size, projection)
            }
            ReadBatchParams::Range(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range(
                    range.start as u64..range.end as u64,
                    batch_size,
                    projection,
                    filter,
                )
            }
            ReadBatchParams::Ranges(ranges) => {
                let mut ranges_u64 = Vec::with_capacity(ranges.len());
                for range in ranges.as_ref() {
                    verify_bound(&params, range.end, false)?;
                    ranges_u64.push(range.start..range.end);
                }
                self.read_ranges(ranges_u64, batch_size, projection, filter)
            }
            ReadBatchParams::RangeFrom(range) => {
                verify_bound(&params, range.start as u64, true)?;
                self.read_range(
                    range.start as u64..self.num_rows,
                    batch_size,
                    projection,
                    filter,
                )
            }
            ReadBatchParams::RangeTo(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range(0..range.end as u64, batch_size, projection, filter)
            }
            ReadBatchParams::RangeFull => {
                self.read_range(0..self.num_rows, batch_size, projection, filter)
            }
        }
    }

    /// Reads data from the file as a stream of record batches
    ///
    /// * `params` - Specifies the range (or indices) of data to read
    /// * `batch_size` - The maximum size of a single batch.  A batch may be smaller
    ///   if it is the last batch or if it is not possible to create a batch of the
    ///   requested size.
    ///
    ///   For example, if the batch size is 1024 and one of the columns is a string
    ///   column then there may be some ranges of 1024 rows that contain more than
    ///   2^31 bytes of string data (which is the maximum size of a string column
    ///   in Arrow).  In this case smaller batches may be emitted.
    /// * `batch_readahead` - The number of batches to read ahead.  This controls the
    ///   amount of CPU parallelism of the read.  In other words it controls how many
    ///   batches will be decoded in parallel.  It has no effect on the I/O parallelism
    ///   of the read (how many I/O requests are in flight at once).
    ///
    ///   This parameter is also related to backpressure.  If the consumer of the
    ///   stream is slow then the reader will accumulate decoded batches in RAM.
    /// * `projection` - A projection to apply to the read.  This controls which columns
    ///   are read from the file.  The projection is NOT applied on top of the base
    ///   projection.  The projection is applied directly to the file schema.
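    ///
    /// A sketch of a projected read (the column name is illustrative only):
    ///
    /// ```ignore
    /// let projection = ReaderProjection::from_column_names(
    ///     reader.metadata().version(),
    ///     reader.schema(),
    ///     &["x"],
    /// )?;
    /// let stream = reader.read_stream_projected(
    ///     ReadBatchParams::RangeFull,
    ///     /*batch_size=*/ 1024,
    ///     /*batch_readahead=*/ 16,
    ///     projection,
    ///     FilterExpression::no_filter(),
    /// )?;
    /// ```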
    pub fn read_stream_projected(
        &self,
        params: ReadBatchParams,
        batch_size: u32,
        batch_readahead: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
        let arrow_schema = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
        let tasks_stream = self.read_tasks(params, batch_size, Some(projection), filter)?;
        let batch_stream = tasks_stream
            .map(|task| task.task)
            .buffered(batch_readahead as usize)
            .boxed();
        Ok(Box::pin(RecordBatchStreamAdapter::new(
            arrow_schema,
            batch_stream,
        )))
    }

    fn take_rows_blocking(
        &self,
        indices: Vec<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
        let column_infos = self.collect_columns_from_projection(&projection)?;
        debug!(
            "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
            indices.len(),
            indices[0],
            indices[indices.len() - 1],
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            io: self.scheduler.clone(),
            decoder_config: self.options.decoder_config.clone(),
        };

        let requested_rows = RequestedRows::Indices(indices);

        schedule_and_decode_blocking(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        )
    }

    fn read_ranges_blocking(
        &self,
        ranges: Vec<Range<u64>>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
        let column_infos = self.collect_columns_from_projection(&projection)?;
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        debug!(
            "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
            ranges.len(),
            num_rows,
            ranges[0].start,
            ranges[ranges.len() - 1].end,
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            io: self.scheduler.clone(),
            decoder_config: self.options.decoder_config.clone(),
        };

        let requested_rows = RequestedRows::Ranges(ranges);

        schedule_and_decode_blocking(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        )
    }

    fn read_range_blocking(
        &self,
        range: Range<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
        let column_infos = self.collect_columns_from_projection(&projection)?;
        let num_rows = self.num_rows;

        debug!(
            "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
            range,
            batch_size,
            num_rows,
            column_infos.len(),
            projection.schema.fields.len(),
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            io: self.scheduler.clone(),
            decoder_config: self.options.decoder_config.clone(),
        };

        let requested_rows = RequestedRows::Ranges(vec![range]);

        schedule_and_decode_blocking(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        )
    }

    /// Read data from the file as an iterator of record batches
    ///
    /// This is a blocking variant of [`Self::read_stream_projected`] that runs entirely in the
    /// calling thread.  It will block on I/O if the decode is faster than the I/O.  It is useful
    /// for benchmarking and potentially for "take"ing small batches from fast disks.
    ///
    /// Large scans of in-memory data will still benefit from threading (and should therefore not
    /// use this method) because we can parallelize the decode.
    ///
    /// Note: calling this from within a tokio runtime will panic.  It is acceptable to call this
    /// from a spawn_blocking context.
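    ///
    /// A sketch of driving this from an async context via `tokio::task::spawn_blocking`
    /// (assuming the reader is wrapped in an `Arc` so it can be moved into the closure):
    ///
    /// ```ignore
    /// let reader = reader.clone(); // Arc<FileReader>
    /// let batch_reader = tokio::task::spawn_blocking(move || {
    ///     reader.read_stream_projected_blocking(
    ///         ReadBatchParams::RangeFull,
    ///         /*batch_size=*/ 1024,
    ///         None,
    ///         FilterExpression::no_filter(),
    ///     )
    /// })
    /// .await??;
    /// ```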
1292    pub fn read_stream_projected_blocking(
1293        &self,
1294        params: ReadBatchParams,
1295        batch_size: u32,
1296        projection: Option<ReaderProjection>,
1297        filter: FilterExpression,
1298    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1299        let projection = projection.unwrap_or_else(|| self.base_projection.clone());
1300        Self::validate_projection(&projection, &self.metadata)?;
1301        let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
1302            if bound > self.num_rows || bound == self.num_rows && inclusive {
1303                Err(Error::invalid_input(
1304                    format!(
1305                        "cannot read {:?} from file with {} rows",
1306                        params, self.num_rows
1307                    ),
1308                    location!(),
1309                ))
1310            } else {
1311                Ok(())
1312            }
1313        };
1314        match &params {
1315            ReadBatchParams::Indices(indices) => {
1316                for idx in indices {
1317                    match idx {
1318                        None => {
1319                            return Err(Error::invalid_input(
1320                                "Null value in indices array",
1321                                location!(),
1322                            ));
1323                        }
1324                        Some(idx) => {
1325                            verify_bound(&params, idx as u64, true)?;
1326                        }
1327                    }
1328                }
1329                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
1330                self.take_rows_blocking(indices, batch_size, projection, filter)
1331            }
1332            ReadBatchParams::Range(range) => {
1333                verify_bound(&params, range.end as u64, false)?;
1334                self.read_range_blocking(
1335                    range.start as u64..range.end as u64,
1336                    batch_size,
1337                    projection,
1338                    filter,
1339                )
1340            }
1341            ReadBatchParams::Ranges(ranges) => {
1342                let mut ranges_u64 = Vec::with_capacity(ranges.len());
1343                for range in ranges.as_ref() {
1344                    verify_bound(&params, range.end, false)?;
1345                    ranges_u64.push(range.start..range.end);
1346                }
1347                self.read_ranges_blocking(ranges_u64, batch_size, projection, filter)
1348            }
1349            ReadBatchParams::RangeFrom(range) => {
1350                verify_bound(&params, range.start as u64, true)?;
1351                self.read_range_blocking(
1352                    range.start as u64..self.num_rows,
1353                    batch_size,
1354                    projection,
1355                    filter,
1356                )
1357            }
1358            ReadBatchParams::RangeTo(range) => {
1359                verify_bound(&params, range.end as u64, false)?;
1360                self.read_range_blocking(0..range.end as u64, batch_size, projection, filter)
1361            }
1362            ReadBatchParams::RangeFull => {
1363                self.read_range_blocking(0..self.num_rows, batch_size, projection, filter)
1364            }
1365        }
1366    }
1367
1368    /// Reads data from the file as a stream of record batches
1369    ///
1370    /// This is similar to [`Self::read_stream_projected`] but uses the base projection
1371    /// provided when the file was opened (or reads all columns if the file was
1372    /// opened without a base projection)
1373    pub fn read_stream(
1374        &self,
1375        params: ReadBatchParams,
1376        batch_size: u32,
1377        batch_readahead: u32,
1378        filter: FilterExpression,
1379    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1380        self.read_stream_projected(
1381            params,
1382            batch_size,
1383            batch_readahead,
1384            self.base_projection.clone(),
1385            filter,
1386        )
1387    }
1388
1389    pub fn schema(&self) -> &Arc<Schema> {
1390        &self.metadata.file_schema
1391    }
1392}
1393
1394/// Inspects a page and returns a String describing the page's encoding
1395pub fn describe_encoding(page: &pbfile::column_metadata::Page) -> String {
1396    if let Some(encoding) = &page.encoding {
1397        if let Some(style) = &encoding.location {
1398            match style {
1399                pbfile::encoding::Location::Indirect(indirect) => {
1400                    format!(
1401                        "IndirectEncoding(pos={},size={})",
1402                        indirect.buffer_location, indirect.buffer_length
1403                    )
1404                }
1405                pbfile::encoding::Location::Direct(direct) => {
1406                    let encoding_any =
1407                        prost_types::Any::decode(Bytes::from(direct.encoding.clone()))
1408                            .expect("failed to deserialize encoding as protobuf");
1409                    if encoding_any.type_url == "/lance.encodings.ArrayEncoding" {
1410                        let encoding = encoding_any.to_msg::<pbenc::ArrayEncoding>();
1411                        match encoding {
1412                            Ok(encoding) => {
1413                                format!("{:#?}", encoding)
1414                            }
1415                            Err(err) => {
1416                                format!("Unsupported(decode_err={})", err)
1417                            }
1418                        }
1419                    } else if encoding_any.type_url == "/lance.encodings21.PageLayout" {
1420                        let encoding = encoding_any.to_msg::<pbenc21::PageLayout>();
1421                        match encoding {
1422                            Ok(encoding) => {
1423                                format!("{:#?}", encoding)
1424                            }
1425                            Err(err) => {
1426                                format!("Unsupported(decode_err={})", err)
1427                            }
1428                        }
1429                    } else {
1430                        format!("Unrecognized(type_url={})", encoding_any.type_url)
1431                    }
1432                }
1433                pbfile::encoding::Location::None(_) => "NoEncodingDescription".to_string(),
1434            }
1435        } else {
1436            "MISSING STYLE".to_string()
1437        }
1438    } else {
1439        "MISSING".to_string()
1440    }
1441}
1442
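/// Extension trait for decoding an [`EncodedBatch`] from encoded Lance bytes.
///
/// `try_from_self_described_lance` expects the schema to be stored in the file's
/// global buffers, while `try_from_mini_lance` requires the caller to supply the
/// schema.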
1443pub trait EncodedBatchReaderExt {
1444    fn try_from_mini_lance(
1445        bytes: Bytes,
1446        schema: &Schema,
1447        version: LanceFileVersion,
1448    ) -> Result<Self>
1449    where
1450        Self: Sized;
1451    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1452    where
1453        Self: Sized;
1454}
1455
1456impl EncodedBatchReaderExt for EncodedBatch {
1457    fn try_from_mini_lance(
1458        bytes: Bytes,
1459        schema: &Schema,
1460        file_version: LanceFileVersion,
1461    ) -> Result<Self>
1462    where
1463        Self: Sized,
1464    {
1465        let projection = ReaderProjection::from_whole_schema(schema, file_version);
1466        let footer = FileReader::decode_footer(&bytes)?;
1467
1468        // Next, read the metadata for the columns
1469        // This is both the column metadata and the CMO table
1470        let column_metadata_start = footer.column_meta_start as usize;
1471        let column_metadata_end = footer.global_buff_offsets_start as usize;
1472        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1473        let column_metadatas =
1474            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1475
1476        let file_version = LanceFileVersion::try_from_major_minor(
1477            footer.major_version as u32,
1478            footer.minor_version as u32,
1479        )?;
1480
1481        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1482
1483        Ok(Self {
1484            data: bytes,
1485            num_rows: page_table
1486                .first()
1487                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1488                .unwrap_or(0),
1489            page_table,
1490            top_level_columns: projection.column_indices,
1491            schema: Arc::new(schema.clone()),
1492        })
1493    }
1494
1495    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1496    where
1497        Self: Sized,
1498    {
1499        let footer = FileReader::decode_footer(&bytes)?;
1500        let file_version = LanceFileVersion::try_from_major_minor(
1501            footer.major_version as u32,
1502            footer.minor_version as u32,
1503        )?;
1504
1505        let gbo_table = FileReader::do_decode_gbo_table(
1506            &bytes.slice(footer.global_buff_offsets_start as usize..),
1507            &footer,
1508            file_version,
1509        )?;
1510        if gbo_table.is_empty() {
1511            return Err(Error::Internal {
1512                message: "File did not contain any global buffers, schema expected".to_string(),
1513                location: location!(),
1514            });
1515        }
1516        let schema_start = gbo_table[0].position as usize;
1517        let schema_size = gbo_table[0].size as usize;
1518
1519        let schema_bytes = bytes.slice(schema_start..(schema_start + schema_size));
1520        let (_, schema) = FileReader::decode_schema(schema_bytes)?;
1521        let projection = ReaderProjection::from_whole_schema(&schema, file_version);
1522
1523        // Next, read the metadata for the columns
1524        // This is both the column metadata and the CMO table
1525        let column_metadata_start = footer.column_meta_start as usize;
1526        let column_metadata_end = footer.global_buff_offsets_start as usize;
1527        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1528        let column_metadatas =
1529            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1530
1531        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1532
1533        Ok(Self {
1534            data: bytes,
1535            num_rows: page_table
1536                .first()
1537                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1538                .unwrap_or(0),
1539            page_table,
1540            top_level_columns: projection.column_indices,
1541            schema: Arc::new(schema),
1542        })
1543    }
1544}
1545
1546#[cfg(test)]
1547pub mod tests {
1548    use std::{collections::BTreeMap, pin::Pin, sync::Arc};
1549
1550    use arrow_array::{
1551        types::{Float64Type, Int32Type},
1552        RecordBatch, UInt32Array,
1553    };
1554    use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema};
1555    use bytes::Bytes;
1556    use futures::{prelude::stream::TryStreamExt, StreamExt};
1557    use lance_arrow::RecordBatchExt;
1558    use lance_core::{datatypes::Schema, ArrowResult};
1559    use lance_datagen::{array, gen_batch, BatchCount, ByteCount, RowCount};
1560    use lance_encoding::{
1561        decoder::{decode_batch, DecodeBatchScheduler, DecoderPlugins, FilterExpression},
1562        encoder::{default_encoding_strategy, encode_batch, EncodedBatch, EncodingOptions},
1563        version::LanceFileVersion,
1564    };
1565    use lance_io::{stream::RecordBatchStream, utils::CachedFileSize};
1566    use log::debug;
1567    use rstest::rstest;
1568    use tokio::sync::mpsc;
1569
1570    use crate::v2::{
1571        reader::{EncodedBatchReaderExt, FileReader, FileReaderOptions, ReaderProjection},
1572        testing::{test_cache, write_lance_file, FsFixture, WrittenFile},
1573        writer::{EncodedBatchWriteExt, FileWriter, FileWriterOptions},
1574    };
1575    use lance_encoding::decoder::DecoderConfig;
1576
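    // Writes a small test file (100 batches of 1000 rows) with a mix of primitive,
    // struct, list, and binary columns ("score", "location", "categories", "binary",
    // plus "large_bin" for versions <= 2.0).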
1577    async fn create_some_file(fs: &FsFixture, version: LanceFileVersion) -> WrittenFile {
1578        let location_type = DataType::Struct(Fields::from(vec![
1579            Field::new("x", DataType::Float64, true),
1580            Field::new("y", DataType::Float64, true),
1581        ]));
1582        let categories_type = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)));
1583
1584        let mut reader = gen_batch()
1585            .col("score", array::rand::<Float64Type>())
1586            .col("location", array::rand_type(&location_type))
1587            .col("categories", array::rand_type(&categories_type))
1588            .col("binary", array::rand_type(&DataType::Binary));
1589        if version <= LanceFileVersion::V2_0 {
1590            reader = reader.col("large_bin", array::rand_type(&DataType::LargeBinary));
1591        }
1592        let reader = reader.into_reader_rows(RowCount::from(1000), BatchCount::from(100));
1593
1594        write_lance_file(
1595            reader,
1596            fs,
1597            FileWriterOptions {
1598                format_version: Some(version),
1599                ..Default::default()
1600            },
1601        )
1602        .await
1603    }
1604
1605    type Transformer = Box<dyn Fn(&RecordBatch) -> RecordBatch>;
1606
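    // Compares the batches produced by `actual` against `expected`, slicing across
    // batch boundaries so the two sides can use different batch sizes. An optional
    // transform (e.g. a projection) is applied to each expected batch first.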
1607    async fn verify_expected(
1608        expected: &[RecordBatch],
1609        mut actual: Pin<Box<dyn RecordBatchStream>>,
1610        read_size: u32,
1611        transform: Option<Transformer>,
1612    ) {
1613        let mut remaining = expected.iter().map(|batch| batch.num_rows()).sum::<usize>() as u32;
1614        let mut expected_iter = expected.iter().map(|batch| {
1615            if let Some(transform) = &transform {
1616                transform(batch)
1617            } else {
1618                batch.clone()
1619            }
1620        });
1621        let mut next_expected = expected_iter.next().unwrap().clone();
1622        while let Some(actual) = actual.next().await {
1623            let mut actual = actual.unwrap();
1624            let mut rows_to_verify = actual.num_rows() as u32;
1625            let expected_length = remaining.min(read_size);
1626            assert_eq!(expected_length, rows_to_verify);
1627
1628            while rows_to_verify > 0 {
1629                let next_slice_len = (next_expected.num_rows() as u32).min(rows_to_verify);
1630                assert_eq!(
1631                    next_expected.slice(0, next_slice_len as usize),
1632                    actual.slice(0, next_slice_len as usize)
1633                );
1634                remaining -= next_slice_len;
1635                rows_to_verify -= next_slice_len;
1636                if remaining > 0 {
1637                    if next_slice_len == next_expected.num_rows() as u32 {
1638                        next_expected = expected_iter.next().unwrap().clone();
1639                    } else {
1640                        next_expected = next_expected.slice(
1641                            next_slice_len as usize,
1642                            next_expected.num_rows() - next_slice_len as usize,
1643                        );
1644                    }
1645                }
1646                if rows_to_verify > 0 {
1647                    actual = actual.slice(
1648                        next_slice_len as usize,
1649                        actual.num_rows() - next_slice_len as usize,
1650                    );
1651                }
1652            }
1653        }
1654        assert_eq!(remaining, 0);
1655    }
1656
1657    #[tokio::test]
1658    async fn test_round_trip() {
1659        let fs = FsFixture::default();
1660
1661        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1662
1663        for read_size in [32, 1024, 1024 * 1024] {
1664            let file_scheduler = fs
1665                .scheduler
1666                .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1667                .await
1668                .unwrap();
1669            let file_reader = FileReader::try_open(
1670                file_scheduler,
1671                None,
1672                Arc::<DecoderPlugins>::default(),
1673                &test_cache(),
1674                FileReaderOptions::default(),
1675            )
1676            .await
1677            .unwrap();
1678
1679            let schema = file_reader.schema();
1680            assert_eq!(schema.metadata.get("foo").unwrap(), "bar");
1681
1682            let batch_stream = file_reader
1683                .read_stream(
1684                    lance_io::ReadBatchParams::RangeFull,
1685                    read_size,
1686                    16,
1687                    FilterExpression::no_filter(),
1688                )
1689                .unwrap();
1690
1691            verify_expected(&data, batch_stream, read_size, None).await;
1692        }
1693    }
1694
1695    #[rstest]
1696    #[test_log::test(tokio::test)]
1697    async fn test_encoded_batch_round_trip(
1698        // TODO: Add V2_1 (currently fails)
1699        #[values(LanceFileVersion::V2_0)] version: LanceFileVersion,
1700    ) {
1701        let data = gen_batch()
1702            .col("x", array::rand::<Int32Type>())
1703            .col("y", array::rand_utf8(ByteCount::from(16), false))
1704            .into_batch_rows(RowCount::from(10000))
1705            .unwrap();
1706
1707        let lance_schema = Arc::new(Schema::try_from(data.schema().as_ref()).unwrap());
1708
1709        let encoding_options = EncodingOptions {
1710            cache_bytes_per_column: 4096,
1711            max_page_bytes: 32 * 1024 * 1024,
1712            keep_original_array: true,
1713            buffer_alignment: 64,
1714        };
1715
1716        let encoding_strategy = default_encoding_strategy(version);
1717
1718        let encoded_batch = encode_batch(
1719            &data,
1720            lance_schema.clone(),
1721            encoding_strategy.as_ref(),
1722            &encoding_options,
1723        )
1724        .await
1725        .unwrap();
1726
1727        // Test the self-described format (schema embedded in a global buffer)
1728        let bytes = encoded_batch.try_to_self_described_lance(version).unwrap();
1729
1730        let decoded_batch = EncodedBatch::try_from_self_described_lance(bytes).unwrap();
1731
1732        let decoded = decode_batch(
1733            &decoded_batch,
1734            &FilterExpression::no_filter(),
1735            Arc::<DecoderPlugins>::default(),
1736            false,
1737            LanceFileVersion::default(),
1738            None,
1739        )
1740        .await
1741        .unwrap();
1742
1743        assert_eq!(data, decoded);
1744
1745        // Test the mini format (no embedded schema; the caller supplies it)
1746        let bytes = encoded_batch.try_to_mini_lance(version).unwrap();
1747        let decoded_batch =
1748            EncodedBatch::try_from_mini_lance(bytes, lance_schema.as_ref(), LanceFileVersion::V2_0)
1749                .unwrap();
1750        let decoded = decode_batch(
1751            &decoded_batch,
1752            &FilterExpression::no_filter(),
1753            Arc::<DecoderPlugins>::default(),
1754            false,
1755            LanceFileVersion::default(),
1756            None,
1757        )
1758        .await
1759        .unwrap();
1760
1761        assert_eq!(data, decoded);
1762    }
1763
1764    #[rstest]
1765    #[test_log::test(tokio::test)]
1766    async fn test_projection(
1767        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
1768    ) {
1769        let fs = FsFixture::default();
1770
1771        let written_file = create_some_file(&fs, version).await;
1772        let file_scheduler = fs
1773            .scheduler
1774            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1775            .await
1776            .unwrap();
1777
1778        let field_id_mapping = written_file
1779            .field_id_mapping
1780            .iter()
1781            .copied()
1782            .collect::<BTreeMap<_, _>>();
1783
1784        let empty_projection = ReaderProjection {
1785            column_indices: Vec::default(),
1786            schema: Arc::new(Schema::default()),
1787        };
1788
1789        for columns in [
1790            vec!["score"],
1791            vec!["location"],
1792            vec!["categories"],
1793            vec!["location.x"],
1794            vec!["score", "categories"],
1795            vec!["score", "location"],
1796            vec!["location", "categories"],
1797            vec!["location.y", "score", "categories"],
1798        ] {
1799            debug!("Testing round trip with projection {:?}", columns);
1800            for use_field_ids in [true, false] {
1801                // We can specify the projection as part of the read operation via read_stream_projected
1802                let file_reader = FileReader::try_open(
1803                    file_scheduler.clone(),
1804                    None,
1805                    Arc::<DecoderPlugins>::default(),
1806                    &test_cache(),
1807                    FileReaderOptions::default(),
1808                )
1809                .await
1810                .unwrap();
1811
1812                let projected_schema = written_file.schema.project(&columns).unwrap();
1813                let projection = if use_field_ids {
1814                    ReaderProjection::from_field_ids(
1815                        file_reader.metadata.version(),
1816                        &projected_schema,
1817                        &field_id_mapping,
1818                    )
1819                    .unwrap()
1820                } else {
1821                    ReaderProjection::from_column_names(
1822                        file_reader.metadata.version(),
1823                        &written_file.schema,
1824                        &columns,
1825                    )
1826                    .unwrap()
1827                };
1828
1829                let batch_stream = file_reader
1830                    .read_stream_projected(
1831                        lance_io::ReadBatchParams::RangeFull,
1832                        1024,
1833                        16,
1834                        projection.clone(),
1835                        FilterExpression::no_filter(),
1836                    )
1837                    .unwrap();
1838
1839                let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1840                verify_expected(
1841                    &written_file.data,
1842                    batch_stream,
1843                    1024,
1844                    Some(Box::new(move |batch: &RecordBatch| {
1845                        batch.project_by_schema(&projection_arrow).unwrap()
1846                    })),
1847                )
1848                .await;
1849
1850                // We can also specify the projection as a base projection when we open the file
1851                let file_reader = FileReader::try_open(
1852                    file_scheduler.clone(),
1853                    Some(projection.clone()),
1854                    Arc::<DecoderPlugins>::default(),
1855                    &test_cache(),
1856                    FileReaderOptions::default(),
1857                )
1858                .await
1859                .unwrap();
1860
1861                let batch_stream = file_reader
1862                    .read_stream(
1863                        lance_io::ReadBatchParams::RangeFull,
1864                        1024,
1865                        16,
1866                        FilterExpression::no_filter(),
1867                    )
1868                    .unwrap();
1869
1870                let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1871                verify_expected(
1872                    &written_file.data,
1873                    batch_stream,
1874                    1024,
1875                    Some(Box::new(move |batch: &RecordBatch| {
1876                        batch.project_by_schema(&projection_arrow).unwrap()
1877                    })),
1878                )
1879                .await;
1880
1881                assert!(file_reader
1882                    .read_stream_projected(
1883                        lance_io::ReadBatchParams::RangeFull,
1884                        1024,
1885                        16,
1886                        empty_projection.clone(),
1887                        FilterExpression::no_filter(),
1888                    )
1889                    .is_err());
1890            }
1891        }
1892
1893        assert!(FileReader::try_open(
1894            file_scheduler.clone(),
1895            Some(empty_projection),
1896            Arc::<DecoderPlugins>::default(),
1897            &test_cache(),
1898            FileReaderOptions::default(),
1899        )
1900        .await
1901        .is_err());
1902
1903        let arrow_schema = ArrowSchema::new(vec![
1904            Field::new("x", DataType::Int32, true),
1905            Field::new("y", DataType::Int32, true),
1906        ]);
1907        let schema = Schema::try_from(&arrow_schema).unwrap();
1908
1909        let projection_with_dupes = ReaderProjection {
1910            column_indices: vec![0, 0],
1911            schema: Arc::new(schema),
1912        };
1913
1914        assert!(FileReader::try_open(
1915            file_scheduler.clone(),
1916            Some(projection_with_dupes),
1917            Arc::<DecoderPlugins>::default(),
1918            &test_cache(),
1919            FileReaderOptions::default(),
1920        )
1921        .await
1922        .is_err());
1923    }
1924
1925    #[test_log::test(tokio::test)]
1926    async fn test_compressing_buffer() {
1927        let fs = FsFixture::default();
1928
1929        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
1930        let file_scheduler = fs
1931            .scheduler
1932            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1933            .await
1934            .unwrap();
1935
1936        // We can specify the projection as part of the read operation via read_stream_projected
1937        let file_reader = FileReader::try_open(
1938            file_scheduler.clone(),
1939            None,
1940            Arc::<DecoderPlugins>::default(),
1941            &test_cache(),
1942            FileReaderOptions::default(),
1943        )
1944        .await
1945        .unwrap();
1946
1947        let mut projection = written_file.schema.project(&["score"]).unwrap();
1948        for field in projection.fields.iter_mut() {
1949            field
1950                .metadata
1951                .insert("lance:compression".to_string(), "zstd".to_string());
1952        }
1953        let projection = ReaderProjection {
1954            column_indices: projection.fields.iter().map(|f| f.id as u32).collect(),
1955            schema: Arc::new(projection),
1956        };
1957
1958        let batch_stream = file_reader
1959            .read_stream_projected(
1960                lance_io::ReadBatchParams::RangeFull,
1961                1024,
1962                16,
1963                projection.clone(),
1964                FilterExpression::no_filter(),
1965            )
1966            .unwrap();
1967
1968        let projection_arrow = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
1969        verify_expected(
1970            &written_file.data,
1971            batch_stream,
1972            1024,
1973            Some(Box::new(move |batch: &RecordBatch| {
1974                batch.project_by_schema(&projection_arrow).unwrap()
1975            })),
1976        )
1977        .await;
1978    }
1979
1980    #[tokio::test]
1981    async fn test_read_all() {
1982        let fs = FsFixture::default();
1983        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1984        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
1985
1986        let file_scheduler = fs
1987            .scheduler
1988            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1989            .await
1990            .unwrap();
1991        let file_reader = FileReader::try_open(
1992            file_scheduler.clone(),
1993            None,
1994            Arc::<DecoderPlugins>::default(),
1995            &test_cache(),
1996            FileReaderOptions::default(),
1997        )
1998        .await
1999        .unwrap();
2000
2001        let batches = file_reader
2002            .read_stream(
2003                lance_io::ReadBatchParams::RangeFull,
2004                total_rows as u32,
2005                16,
2006                FilterExpression::no_filter(),
2007            )
2008            .unwrap()
2009            .try_collect::<Vec<_>>()
2010            .await
2011            .unwrap();
2012        assert_eq!(batches.len(), 1);
2013        assert_eq!(batches[0].num_rows(), total_rows);
2014    }
2015
2016    #[tokio::test]
2017    async fn test_blocking_take() {
2018        let fs = FsFixture::default();
2019        let WrittenFile { data, schema, .. } = create_some_file(&fs, LanceFileVersion::V2_1).await;
2020        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2021
2022        let file_scheduler = fs
2023            .scheduler
2024            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2025            .await
2026            .unwrap();
2027        let file_reader = FileReader::try_open(
2028            file_scheduler.clone(),
2029            Some(
2030                ReaderProjection::from_column_names(LanceFileVersion::V2_1, &schema, &["score"])
2031                    .unwrap(),
2032            ),
2033            Arc::<DecoderPlugins>::default(),
2034            &test_cache(),
2035            FileReaderOptions::default(),
2036        )
2037        .await
2038        .unwrap();
2039
2040        let batches = tokio::task::spawn_blocking(move || {
2041            file_reader
2042                .read_stream_projected_blocking(
2043                    lance_io::ReadBatchParams::Indices(UInt32Array::from(vec![0, 1, 2, 3, 4])),
2044                    total_rows as u32,
2045                    None,
2046                    FilterExpression::no_filter(),
2047                )
2048                .unwrap()
2049                .collect::<ArrowResult<Vec<_>>>()
2050                .unwrap()
2051        })
2052        .await
2053        .unwrap();
2054
2055        assert_eq!(batches.len(), 1);
2056        assert_eq!(batches[0].num_rows(), 5);
2057        assert_eq!(batches[0].num_columns(), 1);
2058    }
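
    // A hedged companion sketch to `test_blocking_take` (not from the original
    // suite): the same take-style read issued through the async `read_stream`
    // path, assuming `ReadBatchParams::Indices` is accepted there as well.
    #[tokio::test]
    async fn test_async_take_sketch() {
        let fs = FsFixture::default();
        create_some_file(&fs, LanceFileVersion::V2_0).await;

        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let batches = file_reader
            .read_stream(
                lance_io::ReadBatchParams::Indices(UInt32Array::from(vec![0, 100, 5000])),
                1024,
                16,
                FilterExpression::no_filter(),
            )
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 3);
    }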
2059
2060    #[tokio::test(flavor = "multi_thread")]
2061    async fn test_drop_in_progress() {
2062        let fs = FsFixture::default();
2063        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
2064        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2065
2066        let file_scheduler = fs
2067            .scheduler
2068            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2069            .await
2070            .unwrap();
2071        let file_reader = FileReader::try_open(
2072            file_scheduler.clone(),
2073            None,
2074            Arc::<DecoderPlugins>::default(),
2075            &test_cache(),
2076            FileReaderOptions::default(),
2077        )
2078        .await
2079        .unwrap();
2080
2081        let mut batches = file_reader
2082            .read_stream(
2083                lance_io::ReadBatchParams::RangeFull,
2084                (total_rows / 10) as u32,
2085                16,
2086                FilterExpression::no_filter(),
2087            )
2088            .unwrap();
2089
2090        drop(file_reader);
2091
2092        let batch = batches.next().await.unwrap().unwrap();
2093        assert!(batch.num_rows() > 0);
2094
2095        // Drop in-progress scan
2096        drop(batches);
2097    }
2098
2099    #[tokio::test]
2100    async fn drop_while_scheduling() {
2101        // This is a bit of a white-box test that pokes at the internals.  We want to
2102        // test the case where the read stream is dropped before the scheduling
2103        // thread finishes.  We can't do that in a black-box fashion because the
2104        // scheduling thread runs in the background and there is no easy way to
2105        // pause / gate it.
2106
2107        // It's a regression test for a bug where the scheduling thread would panic
2108        // if the stream was dropped before it finished.
2109
2110        let fs = FsFixture::default();
2111        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
2112        let total_rows = written_file
2113            .data
2114            .iter()
2115            .map(|batch| batch.num_rows())
2116            .sum::<usize>();
2117
2118        let file_scheduler = fs
2119            .scheduler
2120            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2121            .await
2122            .unwrap();
2123        let file_reader = FileReader::try_open(
2124            file_scheduler.clone(),
2125            None,
2126            Arc::<DecoderPlugins>::default(),
2127            &test_cache(),
2128            FileReaderOptions::default(),
2129        )
2130        .await
2131        .unwrap();
2132
2133        let projection =
2134            ReaderProjection::from_whole_schema(&written_file.schema, LanceFileVersion::V2_0);
2135        let column_infos = file_reader
2136            .collect_columns_from_projection(&projection)
2137            .unwrap();
2138        let mut decode_scheduler = DecodeBatchScheduler::try_new(
2139            &projection.schema,
2140            &projection.column_indices,
2141            &column_infos,
2142            &vec![],
2143            total_rows as u64,
2144            Arc::<DecoderPlugins>::default(),
2145            file_reader.scheduler.clone(),
2146            test_cache(),
2147            &FilterExpression::no_filter(),
2148            &DecoderConfig::default(),
2149        )
2150        .await
2151        .unwrap();
2152
2153        let range = 0..total_rows as u64;
2154
2155        let (tx, rx) = mpsc::unbounded_channel();
2156
2157        // Simulate the stream / decoder being dropped
2158        drop(rx);
2159
2160        // Scheduling should not panic
2161        decode_scheduler.schedule_range(
2162            range,
2163            &FilterExpression::no_filter(),
2164            tx,
2165            file_reader.scheduler.clone(),
2166        )
2167    }
2168
2169    #[tokio::test]
2170    async fn test_read_empty_range() {
2171        let fs = FsFixture::default();
2172        create_some_file(&fs, LanceFileVersion::V2_0).await;
2173
2174        let file_scheduler = fs
2175            .scheduler
2176            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2177            .await
2178            .unwrap();
2179        let file_reader = FileReader::try_open(
2180            file_scheduler.clone(),
2181            None,
2182            Arc::<DecoderPlugins>::default(),
2183            &test_cache(),
2184            FileReaderOptions::default(),
2185        )
2186        .await
2187        .unwrap();
2188
2189        // Empty range, no data
2190        let batches = file_reader
2191            .read_stream(
2192                lance_io::ReadBatchParams::Range(0..0),
2193                1024,
2194                16,
2195                FilterExpression::no_filter(),
2196            )
2197            .unwrap()
2198            .try_collect::<Vec<_>>()
2199            .await
2200            .unwrap();
2201
2202        assert_eq!(batches.len(), 0);
2203
2204        // Some ranges empty
2205        let batches = file_reader
2206            .read_stream(
2207                lance_io::ReadBatchParams::Ranges(Arc::new([0..1, 2..2])),
2208                1024,
2209                16,
2210                FilterExpression::no_filter(),
2211            )
2212            .unwrap()
2213            .try_collect::<Vec<_>>()
2214            .await
2215            .unwrap();
2216        assert_eq!(batches.len(), 1);
2217    }
2218
2219    #[tokio::test]
2220    async fn test_global_buffers() {
2221        let fs = FsFixture::default();
2222
2223        let lance_schema =
2224            lance_core::datatypes::Schema::try_from(&ArrowSchema::new(vec![Field::new(
2225                "foo",
2226                DataType::Int32,
2227                true,
2228            )]))
2229            .unwrap();
2230
2231        let mut file_writer = FileWriter::try_new(
2232            fs.object_store.create(&fs.tmp_path).await.unwrap(),
2233            lance_schema.clone(),
2234            FileWriterOptions::default(),
2235        )
2236        .unwrap();
2237
2238        let test_bytes = Bytes::from_static(b"hello");
2239
2240        let buf_index = file_writer
2241            .add_global_buffer(test_bytes.clone())
2242            .await
2243            .unwrap();
2244
2245        assert_eq!(buf_index, 1);
2246
2247        file_writer.finish().await.unwrap();
2248
2249        let file_scheduler = fs
2250            .scheduler
2251            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2252            .await
2253            .unwrap();
2254        let file_reader = FileReader::try_open(
2255            file_scheduler.clone(),
2256            None,
2257            Arc::<DecoderPlugins>::default(),
2258            &test_cache(),
2259            FileReaderOptions::default(),
2260        )
2261        .await
2262        .unwrap();
2263
2264        let buf = file_reader.read_global_buffer(1).await.unwrap();
2265        assert_eq!(buf, test_bytes);
2266    }
2267}