Skip to main content

lance_file/
reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    collections::{BTreeMap, BTreeSet},
6    io::Cursor,
7    ops::Range,
8    pin::Pin,
9    sync::Arc,
10};
11
12use arrow_array::RecordBatchReader;
13use arrow_schema::Schema as ArrowSchema;
14use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
15use bytes::{Bytes, BytesMut};
16use deepsize::{Context, DeepSizeOf};
17use futures::{Stream, StreamExt, stream::BoxStream};
18use lance_encoding::{
19    EncodingsIo,
20    decoder::{
21        ColumnInfo, DecoderConfig, DecoderPlugins, FilterExpression, PageEncoding, PageInfo,
22        ReadBatchTask, RequestedRows, SchedulerDecoderConfig, schedule_and_decode,
23        schedule_and_decode_blocking,
24    },
25    encoder::EncodedBatch,
26    version::LanceFileVersion,
27};
28use log::debug;
29use object_store::path::Path;
30use prost::{Message, Name};
31
32use lance_core::{
33    Error, Result,
34    cache::LanceCache,
35    datatypes::{Field, Schema},
36};
37use lance_encoding::format::pb as pbenc;
38use lance_encoding::format::pb21 as pbenc21;
39use lance_io::{
40    ReadBatchParams,
41    scheduler::FileScheduler,
42    stream::{RecordBatchStream, RecordBatchStreamAdapter},
43};
44
45use crate::{
46    datatypes::{Fields, FieldsWithMeta},
47    format::{MAGIC, MAJOR_VERSION, MINOR_VERSION, pb, pbfile},
48    io::LanceEncodingsIo,
49    writer::PAGE_BUFFER_ALIGNMENT,
50};
51
/// Default chunk size for reading large pages (8MiB)
///
/// Pages larger than this will be split into multiple chunks during read.
/// Can be overridden via `FileReaderOptions::read_chunk_size`.
pub const DEFAULT_READ_CHUNK_SIZE: u64 = 8 * 1024 * 1024;
55
// For now, we don't use global buffers for anything other than schema.  If we
// use these later we should make them lazily loaded and then cached once loaded.
//
// We store their position / length for debugging purposes
/// Location of a buffer within the file
#[derive(Debug, DeepSizeOf)]
pub struct BufferDescriptor {
    /// Absolute byte offset of the buffer within the file
    pub position: u64,
    /// Size of the buffer, in bytes
    pub size: u64,
}
65
/// Statistics summarize some of the file metadata for quick summary info
#[derive(Debug)]
pub struct FileStatistics {
    /// Statistics about each of the columns in the file
    ///
    /// One entry per column, in the same order as the file's column metadata.
    pub columns: Vec<ColumnStatistics>,
}
72
/// Summary information describing a column
#[derive(Debug)]
pub struct ColumnStatistics {
    /// The number of pages in the column
    pub num_pages: usize,
    /// The total number of data & metadata bytes in the column
    ///
    /// This is the compressed on-disk size
    // NOTE(review): as computed by `FileReader::file_statistics` this is the
    // sum of page buffer sizes only; column-level metadata buffers do not
    // appear to be included — confirm before relying on this for accounting.
    pub size_bytes: u64,
}
83
// TODO: Caching
/// File metadata decoded once per file and then kept in memory
#[derive(Debug)]
pub struct CachedFileMetadata {
    /// The schema of the file
    pub file_schema: Arc<Schema>,
    /// The column metadatas
    pub column_metadatas: Vec<pbfile::ColumnMetadata>,
    /// Decoder-ready column information derived from `column_metadatas`
    pub column_infos: Vec<Arc<ColumnInfo>>,
    /// The number of rows in the file
    pub num_rows: u64,
    /// Position / size of each global buffer (the first one holds the schema)
    pub file_buffers: Vec<BufferDescriptor>,
    /// The number of bytes contained in the data page section of the file
    pub num_data_bytes: u64,
    /// The number of bytes contained in the column metadata (not including buffers
    /// referenced by the metadata)
    pub num_column_metadata_bytes: u64,
    /// The number of bytes contained in global buffers
    pub num_global_buffer_bytes: u64,
    /// The number of bytes contained in the CMO and GBO tables
    // NOTE(review): `read_all_metadata` computes this as everything from the
    // start of the schema buffer to the end of the file, which also covers the
    // schema and column metadata — confirm which definition is intended.
    pub num_footer_bytes: u64,
    /// Major version read from the footer
    pub major_version: u16,
    /// Minor version read from the footer
    pub minor_version: u16,
}
107
108impl DeepSizeOf for CachedFileMetadata {
109    // TODO: include size for `column_metadatas` and `column_infos`.
110    fn deep_size_of_children(&self, context: &mut Context) -> usize {
111        self.file_schema.deep_size_of_children(context)
112            + self
113                .file_buffers
114                .iter()
115                .map(|file_buffer| file_buffer.deep_size_of_children(context))
116                .sum::<usize>()
117    }
118}
119
120impl CachedFileMetadata {
121    pub fn version(&self) -> LanceFileVersion {
122        match (self.major_version, self.minor_version) {
123            (0, 3) => LanceFileVersion::V2_0,
124            (2, 0) => LanceFileVersion::V2_0,
125            (2, 1) => LanceFileVersion::V2_1,
126            (2, 2) => LanceFileVersion::V2_2,
127            (2, 3) => LanceFileVersion::V2_3,
128            _ => panic!(
129                "Unsupported version: {}.{}",
130                self.major_version, self.minor_version
131            ),
132        }
133    }
134}
135
/// Selecting columns from a lance file requires specifying both the
/// index of the column and the data type of the column
///
/// Partly, this is because it is not strictly required that columns
/// be read into the same type.  For example, a string column may be
/// read as a string, large_string or string_view type.
///
/// A read will only succeed if the decoder for a column is capable
/// of decoding into the requested type.
///
/// Note that this should generally be limited to different in-memory
/// representations of the same semantic type.  An encoding could
/// theoretically support "casting" (e.g. int to string, etc.) but
/// there is little advantage in doing so here.
///
/// Note: in order to specify a projection the user will need some way
/// to figure out the column indices.  In the table format we do this
/// using field IDs and keeping track of the field id->column index mapping.
///
/// If users are not using the table format then they will need to figure
/// out some way to do this themselves.
#[derive(Debug, Clone)]
pub struct ReaderProjection {
    /// The data types (schema) of the selected columns.  The names
    /// of the schema are arbitrary and ignored.
    pub schema: Arc<Schema>,
    /// The indices of the columns to load.
    ///
    /// Use [`Self::from_field_ids`] or [`Self::from_column_names`] to
    /// compute this automatically.
    ///
    /// The content of this vector depends on the file version.
    ///
    /// In Lance File Version 2.0 we need ids for structural fields as
    /// well as leaf fields:
    ///
    ///   - Primitive: the index of the column in the schema
    ///   - List: the index of the list column in the schema
    ///     followed by the column indices of the children
    ///   - FixedSizeList (of primitive): the index of the column in the schema
    ///     (this case is not nested)
    ///   - FixedSizeList (of non-primitive): not yet implemented
    ///   - Dictionary: same as primitive
    ///   - Struct: the index of the struct column in the schema
    ///     followed by the column indices of the children
    ///
    ///   In other words, this should be a DFS listing of the desired schema.
    ///
    /// In Lance File Version 2.1 we only need ids for leaf fields.  Any structural
    /// fields are completely transparent.
    ///
    /// For example, if the goal is to load:
    ///
    ///   x: int32
    ///   y: `struct<z: int32, w: string>`
    ///   z: `list<int32>`
    ///
    /// and the schema originally used to store the data was:
    ///
    ///   a: `struct<x: int32>`
    ///   b: int64
    ///   y: `struct<z: int32, c: int64, w: string>`
    ///   z: `list<int32>`
    ///
    /// Then the column_indices should be:
    ///
    /// - 2.0: [1, 3, 4, 6, 7, 8]
    /// - 2.1: [0, 2, 4, 5]
    pub column_indices: Vec<u32>,
}
203
204impl ReaderProjection {
205    fn from_field_ids_helper<'a>(
206        file_version: LanceFileVersion,
207        fields: impl Iterator<Item = &'a Field>,
208        field_id_to_column_index: &BTreeMap<u32, u32>,
209        column_indices: &mut Vec<u32>,
210    ) -> Result<()> {
211        for field in fields {
212            let is_structural = file_version >= LanceFileVersion::V2_1;
213            // In the 2.0 system we needed ids for intermediate fields.  In 2.1+
214            // we only need ids for leaf fields.
215            if (!is_structural
216                || field.children.is_empty()
217                || field.is_blob()
218                || field.is_packed_struct())
219                && let Some(column_idx) = field_id_to_column_index.get(&(field.id as u32)).copied()
220            {
221                column_indices.push(column_idx);
222            }
223            // Don't recurse into children if the field is a blob or packed struct in 2.1
224            if !is_structural || (!field.is_blob() && !field.is_packed_struct()) {
225                Self::from_field_ids_helper(
226                    file_version,
227                    field.children.iter(),
228                    field_id_to_column_index,
229                    column_indices,
230                )?;
231            }
232        }
233        Ok(())
234    }
235
236    /// Creates a projection using a mapping from field IDs to column indices
237    ///
238    /// You can obtain such a mapping when the file is written using the
239    /// [`crate::writer::FileWriter::field_id_to_column_indices`] method.
240    pub fn from_field_ids(
241        file_version: LanceFileVersion,
242        schema: &Schema,
243        field_id_to_column_index: &BTreeMap<u32, u32>,
244    ) -> Result<Self> {
245        let mut column_indices = Vec::new();
246        Self::from_field_ids_helper(
247            file_version,
248            schema.fields.iter(),
249            field_id_to_column_index,
250            &mut column_indices,
251        )?;
252        let projection = Self {
253            schema: Arc::new(schema.clone()),
254            column_indices,
255        };
256        Ok(projection)
257    }
258
259    /// Creates a projection that reads the entire file
260    ///
261    /// If the schema provided is not the schema of the entire file then
262    /// the projection will be invalid and the read will fail.
263    /// If the field is a `struct datatype` with `packed` set to true in the field metadata,
264    /// the whole struct has one column index.
265    /// To support nested `packed-struct encoding`, this method need to be further adjusted.
266    pub fn from_whole_schema(schema: &Schema, version: LanceFileVersion) -> Self {
267        let schema = Arc::new(schema.clone());
268        let is_structural = version >= LanceFileVersion::V2_1;
269        let mut column_indices = vec![];
270        let mut curr_column_idx = 0;
271        let mut packed_struct_fields_num = 0;
272        for field in schema.fields_pre_order() {
273            if packed_struct_fields_num > 0 {
274                packed_struct_fields_num -= 1;
275                continue;
276            }
277            if field.is_packed_struct() {
278                column_indices.push(curr_column_idx);
279                curr_column_idx += 1;
280                packed_struct_fields_num = field.children.len();
281            } else if field.children.is_empty() || !is_structural {
282                column_indices.push(curr_column_idx);
283                curr_column_idx += 1;
284            }
285        }
286        Self {
287            schema,
288            column_indices,
289        }
290    }
291
292    /// Creates a projection that reads the specified columns provided by name
293    ///
294    /// The syntax for column names is the same as [`lance_core::datatypes::Schema::project`]
295    ///
296    /// If the schema provided is not the schema of the entire file then
297    /// the projection will be invalid and the read will fail.
298    pub fn from_column_names(
299        file_version: LanceFileVersion,
300        schema: &Schema,
301        column_names: &[&str],
302    ) -> Result<Self> {
303        let field_id_to_column_index = schema
304            .fields_pre_order()
305            // In the 2.0 system we needed ids for intermediate fields.  In 2.1+
306            // we only need ids for leaf fields.
307            .filter(|field| {
308                file_version < LanceFileVersion::V2_1 || field.is_leaf() || field.is_packed_struct()
309            })
310            .enumerate()
311            .map(|(idx, field)| (field.id as u32, idx as u32))
312            .collect::<BTreeMap<_, _>>();
313        let projected = schema.project(column_names)?;
314        let mut column_indices = Vec::new();
315        Self::from_field_ids_helper(
316            file_version,
317            projected.fields.iter(),
318            &field_id_to_column_index,
319            &mut column_indices,
320        )?;
321        Ok(Self {
322            schema: Arc::new(projected),
323            column_indices,
324        })
325    }
326}
327
/// File Reader Options that can control reading behaviors, such as whether to enable caching on repetition indices
#[derive(Clone, Debug)]
pub struct FileReaderOptions {
    /// Configuration forwarded to the decoder
    pub decoder_config: DecoderConfig,
    /// Size of chunks when reading large pages. Pages larger than this
    /// will be read in multiple chunks to control memory usage.
    /// Default: 8MB (DEFAULT_READ_CHUNK_SIZE)
    pub read_chunk_size: u64,
}
337
338impl Default for FileReaderOptions {
339    fn default() -> Self {
340        Self {
341            decoder_config: DecoderConfig::default(),
342            read_chunk_size: DEFAULT_READ_CHUNK_SIZE,
343        }
344    }
345}
346
/// A reader for Lance v2+ files
#[derive(Debug)]
pub struct FileReader {
    /// I/O scheduler used to submit reads
    scheduler: Arc<dyn EncodingsIo>,
    // The default projection to be applied to all reads
    base_projection: ReaderProjection,
    /// Total number of rows in the file (cached from `metadata.num_rows`)
    num_rows: u64,
    /// Metadata decoded from the file footer and global buffers
    metadata: Arc<CachedFileMetadata>,
    /// Plugins used when decoding pages
    decoder_plugins: Arc<DecoderPlugins>,
    /// Cache scoped to this file (prefixed by the file path)
    cache: Arc<LanceCache>,
    /// Options controlling read behavior (e.g. read chunk size)
    options: FileReaderOptions,
}
/// The fixed-size footer found at the very end of every Lance v2 file
#[derive(Debug)]
struct Footer {
    /// File offset where the column metadata section begins
    #[allow(dead_code)]
    column_meta_start: u64,
    // We don't use this today because we always load metadata for every column
    // and don't yet support "metadata projection"
    #[allow(dead_code)]
    column_meta_offsets_start: u64,
    /// File offset of the global buffer offsets (GBO) table
    global_buff_offsets_start: u64,
    /// Number of entries in the GBO table
    num_global_buffers: u32,
    /// Number of columns (entries in the CMO table)
    num_columns: u32,
    major_version: u16,
    minor_version: u16,
}

/// Size of the footer in bytes:
/// 3 x u64 offsets + 2 x u32 counts + 2 x u16 versions + 4-byte magic = 40
const FOOTER_LEN: usize = 40;
374
375impl FileReader {
376    pub fn with_scheduler(&self, scheduler: Arc<dyn EncodingsIo>) -> Self {
377        Self {
378            scheduler,
379            base_projection: self.base_projection.clone(),
380            cache: self.cache.clone(),
381            decoder_plugins: self.decoder_plugins.clone(),
382            metadata: self.metadata.clone(),
383            options: self.options.clone(),
384            num_rows: self.num_rows,
385        }
386    }
387
    /// The number of rows in the file
    pub fn num_rows(&self) -> u64 {
        self.num_rows
    }

    /// The cached file metadata (schema, column infos, byte accounting)
    pub fn metadata(&self) -> &Arc<CachedFileMetadata> {
        &self.metadata
    }
395
396    pub fn file_statistics(&self) -> FileStatistics {
397        let column_metadatas = &self.metadata().column_metadatas;
398
399        let column_stats = column_metadatas
400            .iter()
401            .map(|col_metadata| {
402                let num_pages = col_metadata.pages.len();
403                let size_bytes = col_metadata
404                    .pages
405                    .iter()
406                    .map(|page| page.buffer_sizes.iter().sum::<u64>())
407                    .sum::<u64>();
408                ColumnStatistics {
409                    num_pages,
410                    size_bytes,
411                }
412            })
413            .collect();
414
415        FileStatistics {
416            columns: column_stats,
417        }
418    }
419
420    pub async fn read_global_buffer(&self, index: u32) -> Result<Bytes> {
421        let buffer_desc = self.metadata.file_buffers.get(index as usize).ok_or_else(||Error::invalid_input(format!("request for global buffer at index {} but there were only {} global buffers in the file", index, self.metadata.file_buffers.len())))?;
422        self.scheduler
423            .submit_single(
424                buffer_desc.position..buffer_desc.position + buffer_desc.size,
425                0,
426            )
427            .await
428    }
429
430    async fn read_tail(scheduler: &FileScheduler) -> Result<(Bytes, u64)> {
431        let file_size = scheduler.reader().size().await? as u64;
432        let begin = if file_size < scheduler.reader().block_size() as u64 {
433            0
434        } else {
435            file_size - scheduler.reader().block_size() as u64
436        };
437        let tail_bytes = scheduler.submit_single(begin..file_size, 0).await?;
438        Ok((tail_bytes, file_size))
439    }
440
441    // Checks to make sure the footer is written correctly and returns the
442    // position of the file descriptor (which comes from the footer)
443    fn decode_footer(footer_bytes: &Bytes) -> Result<Footer> {
444        let len = footer_bytes.len();
445        if len < FOOTER_LEN {
446            return Err(Error::invalid_input(format!(
447                "does not have sufficient data, len: {}, bytes: {:?}",
448                len, footer_bytes
449            )));
450        }
451        let mut cursor = Cursor::new(footer_bytes.slice(len - FOOTER_LEN..));
452
453        let column_meta_start = cursor.read_u64::<LittleEndian>()?;
454        let column_meta_offsets_start = cursor.read_u64::<LittleEndian>()?;
455        let global_buff_offsets_start = cursor.read_u64::<LittleEndian>()?;
456        let num_global_buffers = cursor.read_u32::<LittleEndian>()?;
457        let num_columns = cursor.read_u32::<LittleEndian>()?;
458        let major_version = cursor.read_u16::<LittleEndian>()?;
459        let minor_version = cursor.read_u16::<LittleEndian>()?;
460
461        if major_version == MAJOR_VERSION as u16 && minor_version == MINOR_VERSION as u16 {
462            return Err(Error::version_conflict(
463                "Attempt to use the lance v2 reader to read a legacy file".to_string(),
464                major_version,
465                minor_version,
466            ));
467        }
468
469        let magic_bytes = footer_bytes.slice(len - 4..);
470        if magic_bytes.as_ref() != MAGIC {
471            return Err(Error::invalid_input(format!(
472                "file does not appear to be a Lance file (invalid magic: {:?})",
473                MAGIC
474            )));
475        }
476        Ok(Footer {
477            column_meta_start,
478            column_meta_offsets_start,
479            global_buff_offsets_start,
480            num_global_buffers,
481            num_columns,
482            major_version,
483            minor_version,
484        })
485    }
486
487    // TODO: Once we have coalesced I/O we should only read the column metadatas that we need
488    fn read_all_column_metadata(
489        column_metadata_bytes: Bytes,
490        footer: &Footer,
491    ) -> Result<Vec<pbfile::ColumnMetadata>> {
492        let column_metadata_start = footer.column_meta_start;
493        // cmo == column_metadata_offsets
494        let cmo_table_size = 16 * footer.num_columns as usize;
495        let cmo_table = column_metadata_bytes.slice(column_metadata_bytes.len() - cmo_table_size..);
496
497        (0..footer.num_columns)
498            .map(|col_idx| {
499                let offset = (col_idx * 16) as usize;
500                let position = LittleEndian::read_u64(&cmo_table[offset..offset + 8]);
501                let length = LittleEndian::read_u64(&cmo_table[offset + 8..offset + 16]);
502                let normalized_position = (position - column_metadata_start) as usize;
503                let normalized_end = normalized_position + (length as usize);
504                Ok(pbfile::ColumnMetadata::decode(
505                    &column_metadata_bytes[normalized_position..normalized_end],
506                )?)
507            })
508            .collect::<Result<Vec<_>>>()
509    }
510
511    async fn optimistic_tail_read(
512        data: &Bytes,
513        start_pos: u64,
514        scheduler: &FileScheduler,
515        file_len: u64,
516    ) -> Result<Bytes> {
517        let num_bytes_needed = (file_len - start_pos) as usize;
518        if data.len() >= num_bytes_needed {
519            Ok(data.slice((data.len() - num_bytes_needed)..))
520        } else {
521            let num_bytes_missing = (num_bytes_needed - data.len()) as u64;
522            let start = file_len - num_bytes_needed as u64;
523            let missing_bytes = scheduler
524                .submit_single(start..start + num_bytes_missing, 0)
525                .await?;
526            let mut combined = BytesMut::with_capacity(data.len() + num_bytes_missing as usize);
527            combined.extend(missing_bytes);
528            combined.extend(data);
529            Ok(combined.freeze())
530        }
531    }
532
533    fn do_decode_gbo_table(
534        gbo_bytes: &Bytes,
535        footer: &Footer,
536        version: LanceFileVersion,
537    ) -> Result<Vec<BufferDescriptor>> {
538        let mut global_bufs_cursor = Cursor::new(gbo_bytes);
539
540        let mut global_buffers = Vec::with_capacity(footer.num_global_buffers as usize);
541        for _ in 0..footer.num_global_buffers {
542            let buf_pos = global_bufs_cursor.read_u64::<LittleEndian>()?;
543            assert!(
544                version < LanceFileVersion::V2_1 || buf_pos % PAGE_BUFFER_ALIGNMENT as u64 == 0
545            );
546            let buf_size = global_bufs_cursor.read_u64::<LittleEndian>()?;
547            global_buffers.push(BufferDescriptor {
548                position: buf_pos,
549                size: buf_size,
550            });
551        }
552
553        Ok(global_buffers)
554    }
555
    /// Locates the GBO table at `footer.global_buff_offsets_start` (usually
    /// already present in `tail_bytes`) and decodes it
    async fn decode_gbo_table(
        tail_bytes: &Bytes,
        file_len: u64,
        scheduler: &FileScheduler,
        footer: &Footer,
        version: LanceFileVersion,
    ) -> Result<Vec<BufferDescriptor>> {
        // This could, in theory, trigger another IOP but the GBO table should never be large
        // enough for that to happen
        let gbo_bytes = Self::optimistic_tail_read(
            tail_bytes,
            footer.global_buff_offsets_start,
            scheduler,
            file_len,
        )
        .await?;
        Self::do_decode_gbo_table(&gbo_bytes, footer, version)
    }
574
575    fn decode_schema(schema_bytes: Bytes) -> Result<(u64, lance_core::datatypes::Schema)> {
576        let file_descriptor = pb::FileDescriptor::decode(schema_bytes)?;
577        let pb_schema = file_descriptor.schema.unwrap();
578        let num_rows = file_descriptor.length;
579        let fields_with_meta = FieldsWithMeta {
580            fields: Fields(pb_schema.fields),
581            metadata: pb_schema.metadata,
582        };
583        let schema = lance_core::datatypes::Schema::from(fields_with_meta);
584        Ok((num_rows, schema))
585    }
586
    // TODO: Support late projection.  Currently, if we want to perform a
    // projected read of a file, we load all of the column metadata, and then
    // only read the column data that is requested.  This is fine for most cases.
    //
    // However, if there are many columns then loading all of the column metadata
    // may be expensive.  We should support a mode where we only load the column
    // metadata for the columns that are requested (the file format supports this).
    //
    // The main challenge is that we either need to ignore the column metadata cache
    // or have a more sophisticated cache that can cache per-column metadata.
    //
    // Also, if the number of columns is fairly small, it's faster to read them as a
    // single IOP, but we can fix this through coalescing.
    /// Reads and decodes all file metadata: footer, schema, GBO table, and the
    /// metadata for every column
    pub async fn read_all_metadata(scheduler: &FileScheduler) -> Result<CachedFileMetadata> {
        // 1. read the footer
        let (tail_bytes, file_len) = Self::read_tail(scheduler).await?;
        let footer = Self::decode_footer(&tail_bytes)?;

        let file_version = LanceFileVersion::try_from_major_minor(
            footer.major_version as u32,
            footer.minor_version as u32,
        )?;

        // 2. read the global buffer offsets table; the schema is stored in the
        //    first global buffer
        let gbo_table =
            Self::decode_gbo_table(&tail_bytes, file_len, scheduler, &footer, file_version).await?;
        if gbo_table.is_empty() {
            return Err(Error::internal(
                "File did not contain any global buffers, schema expected".to_string(),
            ));
        }
        let schema_start = gbo_table[0].position;
        let schema_size = gbo_table[0].size;

        // Everything from the schema buffer to the end of the file is counted
        // as footer bytes for statistics purposes
        let num_footer_bytes = file_len - schema_start;

        // By default we read all column metadatas.  We do NOT read the column metadata buffers
        // at this point.  We only want to read the column metadata for columns we are actually loading.
        let all_metadata_bytes =
            Self::optimistic_tail_read(&tail_bytes, schema_start, scheduler, file_len).await?;

        let schema_bytes = all_metadata_bytes.slice(0..schema_size as usize);
        let (num_rows, schema) = Self::decode_schema(schema_bytes)?;

        // Next, read the metadata for the columns
        // This is both the column metadata and the CMO table
        // (offsets below are rebased onto `all_metadata_bytes`, which starts
        // at `schema_start`)
        let column_metadata_start = (footer.column_meta_start - schema_start) as usize;
        let column_metadata_end = (footer.global_buff_offsets_start - schema_start) as usize;
        let column_metadata_bytes =
            all_metadata_bytes.slice(column_metadata_start..column_metadata_end);
        let column_metadatas = Self::read_all_column_metadata(column_metadata_bytes, &footer)?;

        // Byte accounting for statistics / debugging
        let num_global_buffer_bytes = gbo_table.iter().map(|buf| buf.size).sum::<u64>();
        // NOTE(review): this assumes all global buffers live before
        // `column_meta_start` — confirm against the writer's layout.
        let num_data_bytes = footer.column_meta_start - num_global_buffer_bytes;
        let num_column_metadata_bytes = footer.global_buff_offsets_start - footer.column_meta_start;

        let column_infos = Self::meta_to_col_infos(column_metadatas.as_slice(), file_version);

        Ok(CachedFileMetadata {
            file_schema: Arc::new(schema),
            column_metadatas,
            column_infos,
            num_rows,
            num_data_bytes,
            num_column_metadata_bytes,
            num_global_buffer_bytes,
            num_footer_bytes,
            file_buffers: gbo_table,
            major_version: footer.major_version,
            minor_version: footer.minor_version,
        })
    }
658
659    fn fetch_encoding<M: Default + Name + Sized>(encoding: &pbfile::Encoding) -> M {
660        match &encoding.location {
661            Some(pbfile::encoding::Location::Indirect(_)) => todo!(),
662            Some(pbfile::encoding::Location::Direct(encoding)) => {
663                let encoding_buf = Bytes::from(encoding.encoding.clone());
664                let encoding_any = prost_types::Any::decode(encoding_buf).unwrap();
665                encoding_any.to_msg::<M>().unwrap()
666            }
667            Some(pbfile::encoding::Location::None(_)) => panic!(),
668            None => panic!(),
669        }
670    }
671
    /// Converts the protobuf column metadata into the decoder's [`ColumnInfo`]
    /// representation
    ///
    /// For 2.0 files each page encoding is a legacy `ArrayEncoding`; for all
    /// later versions it is a structural `PageLayout`.
    fn meta_to_col_infos(
        column_metadatas: &[pbfile::ColumnMetadata],
        file_version: LanceFileVersion,
    ) -> Vec<Arc<ColumnInfo>> {
        column_metadatas
            .iter()
            .enumerate()
            .map(|(col_idx, col_meta)| {
                let page_infos = col_meta
                    .pages
                    .iter()
                    .map(|page| {
                        let num_rows = page.length;
                        // The protobuf type of the page encoding depends on the
                        // file version
                        let encoding = match file_version {
                            LanceFileVersion::V2_0 => {
                                PageEncoding::Legacy(Self::fetch_encoding::<pbenc::ArrayEncoding>(
                                    page.encoding.as_ref().unwrap(),
                                ))
                            }
                            _ => PageEncoding::Structural(Self::fetch_encoding::<
                                pbenc21::PageLayout,
                            >(
                                page.encoding.as_ref().unwrap()
                            )),
                        };
                        let buffer_offsets_and_sizes = Arc::from(
                            page.buffer_offsets
                                .iter()
                                .zip(page.buffer_sizes.iter())
                                .map(|(offset, size)| {
                                    // Starting with version 2.1 we can assert that page buffers are aligned
                                    assert!(
                                        file_version < LanceFileVersion::V2_1
                                            || offset % PAGE_BUFFER_ALIGNMENT as u64 == 0
                                    );
                                    (*offset, *size)
                                })
                                .collect::<Vec<_>>(),
                        );
                        PageInfo {
                            buffer_offsets_and_sizes,
                            encoding,
                            num_rows,
                            priority: page.priority,
                        }
                    })
                    .collect::<Vec<_>>();
                // Column-level buffers declared by the metadata (contents are
                // encoding-specific)
                let buffer_offsets_and_sizes = Arc::from(
                    col_meta
                        .buffer_offsets
                        .iter()
                        .zip(col_meta.buffer_sizes.iter())
                        .map(|(offset, size)| (*offset, *size))
                        .collect::<Vec<_>>(),
                );
                Arc::new(ColumnInfo {
                    index: col_idx as u32,
                    page_infos: Arc::from(page_infos),
                    buffer_offsets_and_sizes,
                    encoding: Self::fetch_encoding(col_meta.encoding.as_ref().unwrap()),
                })
            })
            .collect::<Vec<_>>()
    }
736
737    fn validate_projection(
738        projection: &ReaderProjection,
739        metadata: &CachedFileMetadata,
740    ) -> Result<()> {
741        if projection.schema.fields.is_empty() {
742            return Err(Error::invalid_input(
743                "Attempt to read zero columns from the file, at least one column must be specified"
744                    .to_string(),
745            ));
746        }
747        let mut column_indices_seen = BTreeSet::new();
748        for column_index in &projection.column_indices {
749            if !column_indices_seen.insert(*column_index) {
750                return Err(Error::invalid_input(format!(
751                    "The projection specified the column index {} more than once",
752                    column_index
753                )));
754            }
755            if *column_index >= metadata.column_infos.len() as u32 {
756                return Err(Error::invalid_input(format!(
757                    "The projection specified the column index {} but there are only {} columns in the file",
758                    column_index,
759                    metadata.column_infos.len()
760                )));
761            }
762        }
763        Ok(())
764    }
765
766    /// Opens a new file reader without any pre-existing knowledge
767    ///
768    /// This will read the file schema from the file itself and thus requires a bit more I/O
769    ///
770    /// A `base_projection` can also be provided.  If provided, then the projection will apply
771    /// to all reads from the file that do not specify their own projection.
772    pub async fn try_open(
773        scheduler: FileScheduler,
774        base_projection: Option<ReaderProjection>,
775        decoder_plugins: Arc<DecoderPlugins>,
776        cache: &LanceCache,
777        options: FileReaderOptions,
778    ) -> Result<Self> {
779        let file_metadata = Arc::new(Self::read_all_metadata(&scheduler).await?);
780        let path = scheduler.reader().path().clone();
781
782        // Create LanceEncodingsIo with read chunk size from options
783        let encodings_io =
784            LanceEncodingsIo::new(scheduler).with_read_chunk_size(options.read_chunk_size);
785
786        Self::try_open_with_file_metadata(
787            Arc::new(encodings_io),
788            path,
789            base_projection,
790            decoder_plugins,
791            file_metadata,
792            cache,
793            options,
794        )
795        .await
796    }
797
798    /// Same as `try_open` but with the file metadata already loaded.
799    ///
800    /// This method also can accept any kind of `EncodingsIo` implementation allowing
801    /// for custom strategies to be used for I/O scheduling (e.g. for takes on fast
802    /// disks it may be better to avoid asynchronous overhead).
803    pub async fn try_open_with_file_metadata(
804        scheduler: Arc<dyn EncodingsIo>,
805        path: Path,
806        base_projection: Option<ReaderProjection>,
807        decoder_plugins: Arc<DecoderPlugins>,
808        file_metadata: Arc<CachedFileMetadata>,
809        cache: &LanceCache,
810        options: FileReaderOptions,
811    ) -> Result<Self> {
812        let cache = Arc::new(cache.with_key_prefix(path.as_ref()));
813
814        if let Some(base_projection) = base_projection.as_ref() {
815            Self::validate_projection(base_projection, &file_metadata)?;
816        }
817        let num_rows = file_metadata.num_rows;
818        Ok(Self {
819            scheduler,
820            base_projection: base_projection.unwrap_or(ReaderProjection::from_whole_schema(
821                file_metadata.file_schema.as_ref(),
822                file_metadata.version(),
823            )),
824            num_rows,
825            metadata: file_metadata,
826            decoder_plugins,
827            cache,
828            options,
829        })
830    }
831
    // The actual decoder needs all the column infos that make up a type.  In other words, if
    // the first type in the schema is Struct<i32, i32> then the decoder will need 3 column infos.
    //
    // This is a file reader concern because the file reader needs to support late projection of columns
    // and so it will need to figure this out anyways.
    //
    // It's a bit of a tricky process though because the number of column infos may depend on the
    // encoding.  Considering the above example, if we wrote it with a packed encoding, then there would
    // only be a single column in the file (and not 3).
    //
    // At the moment this method works because our rules are simple and we just repeat them here.  See
    // Self::default_projection for a similar problem.  In the future this is something the encodings
    // registry will need to figure out.
    fn collect_columns_from_projection(
        &self,
        _projection: &ReaderProjection,
    ) -> Result<Vec<Arc<ColumnInfo>>> {
        // NOTE(review): the projection is currently ignored and every column info
        // in the file is returned; presumably the scheduler/decoder narrows to
        // the needed columns via `projection.column_indices` — confirm.
        Ok(self.metadata.column_infos.clone())
    }
851
852    #[allow(clippy::too_many_arguments)]
853    fn do_read_range(
854        column_infos: Vec<Arc<ColumnInfo>>,
855        io: Arc<dyn EncodingsIo>,
856        cache: Arc<LanceCache>,
857        num_rows: u64,
858        decoder_plugins: Arc<DecoderPlugins>,
859        range: Range<u64>,
860        batch_size: u32,
861        projection: ReaderProjection,
862        filter: FilterExpression,
863        decoder_config: DecoderConfig,
864    ) -> Result<BoxStream<'static, ReadBatchTask>> {
865        debug!(
866            "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
867            range,
868            batch_size,
869            num_rows,
870            column_infos.len(),
871            projection.schema.fields.len(),
872        );
873
874        let config = SchedulerDecoderConfig {
875            batch_size,
876            cache,
877            decoder_plugins,
878            io,
879            decoder_config,
880        };
881
882        let requested_rows = RequestedRows::Ranges(vec![range]);
883
884        Ok(schedule_and_decode(
885            column_infos,
886            requested_rows,
887            filter,
888            projection.column_indices,
889            projection.schema,
890            config,
891        ))
892    }
893
894    fn read_range(
895        &self,
896        range: Range<u64>,
897        batch_size: u32,
898        projection: ReaderProjection,
899        filter: FilterExpression,
900    ) -> Result<BoxStream<'static, ReadBatchTask>> {
901        // Create and initialize the stream
902        Self::do_read_range(
903            self.collect_columns_from_projection(&projection)?,
904            self.scheduler.clone(),
905            self.cache.clone(),
906            self.num_rows,
907            self.decoder_plugins.clone(),
908            range,
909            batch_size,
910            projection,
911            filter,
912            self.options.decoder_config.clone(),
913        )
914    }
915
916    #[allow(clippy::too_many_arguments)]
917    fn do_take_rows(
918        column_infos: Vec<Arc<ColumnInfo>>,
919        io: Arc<dyn EncodingsIo>,
920        cache: Arc<LanceCache>,
921        decoder_plugins: Arc<DecoderPlugins>,
922        indices: Vec<u64>,
923        batch_size: u32,
924        projection: ReaderProjection,
925        filter: FilterExpression,
926        decoder_config: DecoderConfig,
927    ) -> Result<BoxStream<'static, ReadBatchTask>> {
928        debug!(
929            "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
930            indices.len(),
931            indices[0],
932            indices[indices.len() - 1],
933            batch_size,
934            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
935        );
936
937        let config = SchedulerDecoderConfig {
938            batch_size,
939            cache,
940            decoder_plugins,
941            io,
942            decoder_config,
943        };
944
945        let requested_rows = RequestedRows::Indices(indices);
946
947        Ok(schedule_and_decode(
948            column_infos,
949            requested_rows,
950            filter,
951            projection.column_indices,
952            projection.schema,
953            config,
954        ))
955    }
956
957    fn take_rows(
958        &self,
959        indices: Vec<u64>,
960        batch_size: u32,
961        projection: ReaderProjection,
962    ) -> Result<BoxStream<'static, ReadBatchTask>> {
963        // Create and initialize the stream
964        Self::do_take_rows(
965            self.collect_columns_from_projection(&projection)?,
966            self.scheduler.clone(),
967            self.cache.clone(),
968            self.decoder_plugins.clone(),
969            indices,
970            batch_size,
971            projection,
972            FilterExpression::no_filter(),
973            self.options.decoder_config.clone(),
974        )
975    }
976
977    #[allow(clippy::too_many_arguments)]
978    fn do_read_ranges(
979        column_infos: Vec<Arc<ColumnInfo>>,
980        io: Arc<dyn EncodingsIo>,
981        cache: Arc<LanceCache>,
982        decoder_plugins: Arc<DecoderPlugins>,
983        ranges: Vec<Range<u64>>,
984        batch_size: u32,
985        projection: ReaderProjection,
986        filter: FilterExpression,
987        decoder_config: DecoderConfig,
988    ) -> Result<BoxStream<'static, ReadBatchTask>> {
989        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
990        debug!(
991            "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
992            ranges.len(),
993            num_rows,
994            ranges[0].start,
995            ranges[ranges.len() - 1].end,
996            batch_size,
997            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
998        );
999
1000        let config = SchedulerDecoderConfig {
1001            batch_size,
1002            cache,
1003            decoder_plugins,
1004            io,
1005            decoder_config,
1006        };
1007
1008        let requested_rows = RequestedRows::Ranges(ranges);
1009
1010        Ok(schedule_and_decode(
1011            column_infos,
1012            requested_rows,
1013            filter,
1014            projection.column_indices,
1015            projection.schema,
1016            config,
1017        ))
1018    }
1019
1020    fn read_ranges(
1021        &self,
1022        ranges: Vec<Range<u64>>,
1023        batch_size: u32,
1024        projection: ReaderProjection,
1025        filter: FilterExpression,
1026    ) -> Result<BoxStream<'static, ReadBatchTask>> {
1027        Self::do_read_ranges(
1028            self.collect_columns_from_projection(&projection)?,
1029            self.scheduler.clone(),
1030            self.cache.clone(),
1031            self.decoder_plugins.clone(),
1032            ranges,
1033            batch_size,
1034            projection,
1035            filter,
1036            self.options.decoder_config.clone(),
1037        )
1038    }
1039
    /// Creates a stream of "read tasks" to read the data from the file
    ///
    /// The arguments are similar to [`Self::read_stream_projected`] but instead of returning a stream
    /// of record batches it returns a stream of "read tasks".
    ///
    /// The tasks should be consumed with some kind of `buffered` argument if CPU parallelism is desired.
    ///
    /// Note that "read task" is probably a bit imprecise.  The tasks are actually "decode tasks".  The
    /// reading happens asynchronously in the background.  In other words, a single read task may map to
    /// multiple I/O operations or a single I/O operation may map to multiple read tasks.
    pub fn read_tasks(
        &self,
        params: ReadBatchParams,
        batch_size: u32,
        projection: Option<ReaderProjection>,
        filter: FilterExpression,
    ) -> Result<Pin<Box<dyn Stream<Item = ReadBatchTask> + Send>>> {
        // Fall back to the reader's base projection when none is given
        let projection = projection.unwrap_or_else(|| self.base_projection.clone());
        Self::validate_projection(&projection, &self.metadata)?;
        // Checks that a requested bound fits in the file.  With `inclusive` the
        // bound itself must be a valid row index (used for row indices and range
        // starts); otherwise the bound may equal `num_rows` (used for exclusive
        // range ends).
        let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
            if bound > self.num_rows || bound == self.num_rows && inclusive {
                Err(Error::invalid_input(format!(
                    "cannot read {:?} from file with {} rows",
                    params, self.num_rows
                )))
            } else {
                Ok(())
            }
        };
        match &params {
            ReadBatchParams::Indices(indices) => {
                // Validate every index (no nulls, all in bounds) before converting
                for idx in indices {
                    match idx {
                        None => {
                            return Err(Error::invalid_input("Null value in indices array"));
                        }
                        Some(idx) => {
                            verify_bound(&params, idx as u64, true)?;
                        }
                    }
                }
                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
                // NOTE(review): `filter` is not forwarded on this path —
                // `take_rows` always schedules with `FilterExpression::no_filter()`;
                // confirm this is intended.
                self.take_rows(indices, batch_size, projection)
            }
            ReadBatchParams::Range(range) => {
                // Exclusive end bound: may equal num_rows
                verify_bound(&params, range.end as u64, false)?;
                self.read_range(
                    range.start as u64..range.end as u64,
                    batch_size,
                    projection,
                    filter,
                )
            }
            ReadBatchParams::Ranges(ranges) => {
                // Multiple ranges: validate each exclusive end bound
                let mut ranges_u64 = Vec::with_capacity(ranges.len());
                for range in ranges.as_ref() {
                    verify_bound(&params, range.end, false)?;
                    ranges_u64.push(range.start..range.end);
                }
                self.read_ranges(ranges_u64, batch_size, projection, filter)
            }
            ReadBatchParams::RangeFrom(range) => {
                // Open-ended: read from `start` through the end of the file
                verify_bound(&params, range.start as u64, true)?;
                self.read_range(
                    range.start as u64..self.num_rows,
                    batch_size,
                    projection,
                    filter,
                )
            }
            ReadBatchParams::RangeTo(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range(0..range.end as u64, batch_size, projection, filter)
            }
            ReadBatchParams::RangeFull => {
                // Read every row in the file
                self.read_range(0..self.num_rows, batch_size, projection, filter)
            }
        }
    }
1119
1120    /// Reads data from the file as a stream of record batches
1121    ///
1122    /// * `params` - Specifies the range (or indices) of data to read
1123    /// * `batch_size` - The maximum size of a single batch.  A batch may be smaller
1124    ///   if it is the last batch or if it is not possible to create a batch of the
1125    ///   requested size.
1126    ///
1127    ///   For example, if the batch size is 1024 and one of the columns is a string
1128    ///   column then there may be some ranges of 1024 rows that contain more than
1129    ///   2^31 bytes of string data (which is the maximum size of a string column
1130    ///   in Arrow).  In this case smaller batches may be emitted.
1131    /// * `batch_readahead` - The number of batches to read ahead.  This controls the
1132    ///   amount of CPU parallelism of the read.  In other words it controls how many
1133    ///   batches will be decoded in parallel.  It has no effect on the I/O parallelism
1134    ///   of the read (how many I/O requests are in flight at once).
1135    ///
1136    ///   This parameter also is also related to backpressure.  If the consumer of the
1137    ///   stream is slow then the reader will build up RAM.
1138    /// * `projection` - A projection to apply to the read.  This controls which columns
1139    ///   are read from the file.  The projection is NOT applied on top of the base
1140    ///   projection.  The projection is applied directly to the file schema.
1141    pub fn read_stream_projected(
1142        &self,
1143        params: ReadBatchParams,
1144        batch_size: u32,
1145        batch_readahead: u32,
1146        projection: ReaderProjection,
1147        filter: FilterExpression,
1148    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1149        let arrow_schema = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
1150        let tasks_stream = self.read_tasks(params, batch_size, Some(projection), filter)?;
1151        let batch_stream = tasks_stream
1152            .map(|task| task.task)
1153            .buffered(batch_readahead as usize)
1154            .boxed();
1155        Ok(Box::pin(RecordBatchStreamAdapter::new(
1156            arrow_schema,
1157            batch_stream,
1158        )))
1159    }
1160
1161    fn take_rows_blocking(
1162        &self,
1163        indices: Vec<u64>,
1164        batch_size: u32,
1165        projection: ReaderProjection,
1166        filter: FilterExpression,
1167    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1168        let column_infos = self.collect_columns_from_projection(&projection)?;
1169        debug!(
1170            "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
1171            indices.len(),
1172            indices[0],
1173            indices[indices.len() - 1],
1174            batch_size,
1175            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1176        );
1177
1178        let config = SchedulerDecoderConfig {
1179            batch_size,
1180            cache: self.cache.clone(),
1181            decoder_plugins: self.decoder_plugins.clone(),
1182            io: self.scheduler.clone(),
1183            decoder_config: self.options.decoder_config.clone(),
1184        };
1185
1186        let requested_rows = RequestedRows::Indices(indices);
1187
1188        schedule_and_decode_blocking(
1189            column_infos,
1190            requested_rows,
1191            filter,
1192            projection.column_indices,
1193            projection.schema,
1194            config,
1195        )
1196    }
1197
1198    fn read_ranges_blocking(
1199        &self,
1200        ranges: Vec<Range<u64>>,
1201        batch_size: u32,
1202        projection: ReaderProjection,
1203        filter: FilterExpression,
1204    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1205        let column_infos = self.collect_columns_from_projection(&projection)?;
1206        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1207        debug!(
1208            "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
1209            ranges.len(),
1210            num_rows,
1211            ranges[0].start,
1212            ranges[ranges.len() - 1].end,
1213            batch_size,
1214            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1215        );
1216
1217        let config = SchedulerDecoderConfig {
1218            batch_size,
1219            cache: self.cache.clone(),
1220            decoder_plugins: self.decoder_plugins.clone(),
1221            io: self.scheduler.clone(),
1222            decoder_config: self.options.decoder_config.clone(),
1223        };
1224
1225        let requested_rows = RequestedRows::Ranges(ranges);
1226
1227        schedule_and_decode_blocking(
1228            column_infos,
1229            requested_rows,
1230            filter,
1231            projection.column_indices,
1232            projection.schema,
1233            config,
1234        )
1235    }
1236
1237    fn read_range_blocking(
1238        &self,
1239        range: Range<u64>,
1240        batch_size: u32,
1241        projection: ReaderProjection,
1242        filter: FilterExpression,
1243    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1244        let column_infos = self.collect_columns_from_projection(&projection)?;
1245        let num_rows = self.num_rows;
1246
1247        debug!(
1248            "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
1249            range,
1250            batch_size,
1251            num_rows,
1252            column_infos.len(),
1253            projection.schema.fields.len(),
1254        );
1255
1256        let config = SchedulerDecoderConfig {
1257            batch_size,
1258            cache: self.cache.clone(),
1259            decoder_plugins: self.decoder_plugins.clone(),
1260            io: self.scheduler.clone(),
1261            decoder_config: self.options.decoder_config.clone(),
1262        };
1263
1264        let requested_rows = RequestedRows::Ranges(vec![range]);
1265
1266        schedule_and_decode_blocking(
1267            column_infos,
1268            requested_rows,
1269            filter,
1270            projection.column_indices,
1271            projection.schema,
1272            config,
1273        )
1274    }
1275
    /// Read data from the file as an iterator of record batches
    ///
    /// This is a blocking variant of [`Self::read_stream_projected`] that runs entirely in the
    /// calling thread.  It will block on I/O if the decode is faster than the I/O.  It is useful
    /// for benchmarking and potentially from "take"ing small batches from fast disks.
    ///
    /// Large scans of in-memory data will still benefit from threading (and should therefore not
    /// use this method) because we can parallelize the decode.
    ///
    /// Note: calling this from within a tokio runtime will panic.  It is acceptable to call this
    /// from a spawn_blocking context.
    pub fn read_stream_projected_blocking(
        &self,
        params: ReadBatchParams,
        batch_size: u32,
        projection: Option<ReaderProjection>,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
        // Fall back to the reader's base projection when none is given
        let projection = projection.unwrap_or_else(|| self.base_projection.clone());
        Self::validate_projection(&projection, &self.metadata)?;
        // Checks that a requested bound fits in the file.  With `inclusive` the
        // bound itself must be a valid row index (used for row indices and range
        // starts); otherwise the bound may equal `num_rows` (used for exclusive
        // range ends).
        let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
            if bound > self.num_rows || bound == self.num_rows && inclusive {
                Err(Error::invalid_input(format!(
                    "cannot read {:?} from file with {} rows",
                    params, self.num_rows
                )))
            } else {
                Ok(())
            }
        };
        match &params {
            ReadBatchParams::Indices(indices) => {
                // Validate every index (no nulls, all in bounds) before converting
                for idx in indices {
                    match idx {
                        None => {
                            return Err(Error::invalid_input("Null value in indices array"));
                        }
                        Some(idx) => {
                            verify_bound(&params, idx as u64, true)?;
                        }
                    }
                }
                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
                self.take_rows_blocking(indices, batch_size, projection, filter)
            }
            ReadBatchParams::Range(range) => {
                // Exclusive end bound: may equal num_rows
                verify_bound(&params, range.end as u64, false)?;
                self.read_range_blocking(
                    range.start as u64..range.end as u64,
                    batch_size,
                    projection,
                    filter,
                )
            }
            ReadBatchParams::Ranges(ranges) => {
                // Multiple ranges: validate each exclusive end bound
                let mut ranges_u64 = Vec::with_capacity(ranges.len());
                for range in ranges.as_ref() {
                    verify_bound(&params, range.end, false)?;
                    ranges_u64.push(range.start..range.end);
                }
                self.read_ranges_blocking(ranges_u64, batch_size, projection, filter)
            }
            ReadBatchParams::RangeFrom(range) => {
                // Open-ended: read from `start` through the end of the file
                verify_bound(&params, range.start as u64, true)?;
                self.read_range_blocking(
                    range.start as u64..self.num_rows,
                    batch_size,
                    projection,
                    filter,
                )
            }
            ReadBatchParams::RangeTo(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range_blocking(0..range.end as u64, batch_size, projection, filter)
            }
            ReadBatchParams::RangeFull => {
                // Read every row in the file
                self.read_range_blocking(0..self.num_rows, batch_size, projection, filter)
            }
        }
    }
1356
1357    /// Reads data from the file as a stream of record batches
1358    ///
1359    /// This is similar to [`Self::read_stream_projected`] but uses the base projection
1360    /// provided when the file was opened (or reads all columns if the file was
1361    /// opened without a base projection)
1362    pub fn read_stream(
1363        &self,
1364        params: ReadBatchParams,
1365        batch_size: u32,
1366        batch_readahead: u32,
1367        filter: FilterExpression,
1368    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1369        self.read_stream_projected(
1370            params,
1371            batch_size,
1372            batch_readahead,
1373            self.base_projection.clone(),
1374            filter,
1375        )
1376    }
1377
    /// Returns the full schema of the file, as recorded in the file metadata
    pub fn schema(&self) -> &Arc<Schema> {
        &self.metadata.file_schema
    }
1381}
1382
1383/// Inspects a page and returns a String describing the page's encoding
1384pub fn describe_encoding(page: &pbfile::column_metadata::Page) -> String {
1385    if let Some(encoding) = &page.encoding {
1386        if let Some(style) = &encoding.location {
1387            match style {
1388                pbfile::encoding::Location::Indirect(indirect) => {
1389                    format!(
1390                        "IndirectEncoding(pos={},size={})",
1391                        indirect.buffer_location, indirect.buffer_length
1392                    )
1393                }
1394                pbfile::encoding::Location::Direct(direct) => {
1395                    let encoding_any =
1396                        prost_types::Any::decode(Bytes::from(direct.encoding.clone()))
1397                            .expect("failed to deserialize encoding as protobuf");
1398                    if encoding_any.type_url == "/lance.encodings.ArrayEncoding" {
1399                        let encoding = encoding_any.to_msg::<pbenc::ArrayEncoding>();
1400                        match encoding {
1401                            Ok(encoding) => {
1402                                format!("{:#?}", encoding)
1403                            }
1404                            Err(err) => {
1405                                format!("Unsupported(decode_err={})", err)
1406                            }
1407                        }
1408                    } else if encoding_any.type_url == "/lance.encodings21.PageLayout" {
1409                        let encoding = encoding_any.to_msg::<pbenc21::PageLayout>();
1410                        match encoding {
1411                            Ok(encoding) => {
1412                                format!("{:#?}", encoding)
1413                            }
1414                            Err(err) => {
1415                                format!("Unsupported(decode_err={})", err)
1416                            }
1417                        }
1418                    } else {
1419                        format!("Unrecognized(type_url={})", encoding_any.type_url)
1420                    }
1421                }
1422                pbfile::encoding::Location::None(_) => "NoEncodingDescription".to_string(),
1423            }
1424        } else {
1425            "MISSING STYLE".to_string()
1426        }
1427    } else {
1428        "MISSING".to_string()
1429    }
1430}
1431
/// Extension methods for constructing an [`EncodedBatch`] from serialized Lance data
pub trait EncodedBatchReaderExt {
    /// Parses bytes in the "mini lance" layout, where the schema is not stored
    /// in the data and must be supplied by the caller along with the file version
    fn try_from_mini_lance(
        bytes: Bytes,
        schema: &Schema,
        version: LanceFileVersion,
    ) -> Result<Self>
    where
        Self: Sized;
    /// Parses bytes from a self-describing Lance file, reading the schema from
    /// the file's global buffers
    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
    where
        Self: Sized;
}
1444
impl EncodedBatchReaderExt for EncodedBatch {
    fn try_from_mini_lance(
        bytes: Bytes,
        schema: &Schema,
        file_version: LanceFileVersion,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        // Project the entire caller-supplied schema; the caller-provided version
        // determines the projection's column layout
        let projection = ReaderProjection::from_whole_schema(schema, file_version);
        let footer = FileReader::decode_footer(&bytes)?;

        // Next, read the metadata for the columns
        // This is both the column metadata and the CMO table
        let column_metadata_start = footer.column_meta_start as usize;
        let column_metadata_end = footer.global_buff_offsets_start as usize;
        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
        let column_metadatas =
            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;

        // NOTE(review): this shadows the caller-provided `file_version` with the
        // version recorded in the footer; the caller's version is only used for
        // the projection above — confirm this is intended.
        let file_version = LanceFileVersion::try_from_major_minor(
            footer.major_version as u32,
            footer.minor_version as u32,
        )?;

        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);

        Ok(Self {
            data: bytes,
            // Row count is taken from the first column's pages (assumes all
            // columns cover the same rows); 0 when there are no columns
            num_rows: page_table
                .first()
                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
                .unwrap_or(0),
            page_table,
            top_level_columns: projection.column_indices,
            schema: Arc::new(schema.clone()),
        })
    }

    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
    where
        Self: Sized,
    {
        let footer = FileReader::decode_footer(&bytes)?;
        let file_version = LanceFileVersion::try_from_major_minor(
            footer.major_version as u32,
            footer.minor_version as u32,
        )?;

        // The schema is stored in the first global buffer; decode the global
        // buffer offsets (GBO) table to locate it
        let gbo_table = FileReader::do_decode_gbo_table(
            &bytes.slice(footer.global_buff_offsets_start as usize..),
            &footer,
            file_version,
        )?;
        if gbo_table.is_empty() {
            return Err(Error::internal(
                "File did not contain any global buffers, schema expected".to_string(),
            ));
        }
        let schema_start = gbo_table[0].position as usize;
        let schema_size = gbo_table[0].size as usize;

        let schema_bytes = bytes.slice(schema_start..(schema_start + schema_size));
        let (_, schema) = FileReader::decode_schema(schema_bytes)?;
        let projection = ReaderProjection::from_whole_schema(&schema, file_version);

        // Next, read the metadata for the columns
        // This is both the column metadata and the CMO table
        let column_metadata_start = footer.column_meta_start as usize;
        let column_metadata_end = footer.global_buff_offsets_start as usize;
        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
        let column_metadatas =
            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;

        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);

        Ok(Self {
            data: bytes,
            // Row count is taken from the first column's pages (assumes all
            // columns cover the same rows); 0 when there are no columns
            num_rows: page_table
                .first()
                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
                .unwrap_or(0),
            page_table,
            top_level_columns: projection.column_indices,
            schema: Arc::new(schema),
        })
    }
}
1533
1534#[cfg(test)]
1535pub mod tests {
1536    use std::{collections::BTreeMap, pin::Pin, sync::Arc};
1537
1538    use arrow_array::{
1539        RecordBatch, UInt32Array,
1540        types::{Float64Type, Int32Type},
1541    };
1542    use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema};
1543    use bytes::Bytes;
1544    use futures::{StreamExt, prelude::stream::TryStreamExt};
1545    use lance_arrow::RecordBatchExt;
1546    use lance_core::{ArrowResult, datatypes::Schema};
1547    use lance_datagen::{BatchCount, ByteCount, RowCount, array, gen_batch};
1548    use lance_encoding::{
1549        decoder::{DecodeBatchScheduler, DecoderPlugins, FilterExpression, decode_batch},
1550        encoder::{EncodedBatch, EncodingOptions, default_encoding_strategy, encode_batch},
1551        version::LanceFileVersion,
1552    };
1553    use lance_io::{stream::RecordBatchStream, utils::CachedFileSize};
1554    use log::debug;
1555    use rstest::rstest;
1556    use tokio::sync::mpsc;
1557
1558    use crate::reader::{EncodedBatchReaderExt, FileReader, FileReaderOptions, ReaderProjection};
1559    use crate::testing::{FsFixture, WrittenFile, test_cache, write_lance_file};
1560    use crate::writer::{EncodedBatchWriteExt, FileWriter, FileWriterOptions};
1561    use lance_encoding::decoder::DecoderConfig;
1562
1563    async fn create_some_file(fs: &FsFixture, version: LanceFileVersion) -> WrittenFile {
1564        let location_type = DataType::Struct(Fields::from(vec![
1565            Field::new("x", DataType::Float64, true),
1566            Field::new("y", DataType::Float64, true),
1567        ]));
1568        let categories_type = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)));
1569
1570        let mut reader = gen_batch()
1571            .col("score", array::rand::<Float64Type>())
1572            .col("location", array::rand_type(&location_type))
1573            .col("categories", array::rand_type(&categories_type))
1574            .col("binary", array::rand_type(&DataType::Binary));
1575        if version <= LanceFileVersion::V2_0 {
1576            reader = reader.col("large_bin", array::rand_type(&DataType::LargeBinary));
1577        }
1578        let reader = reader.into_reader_rows(RowCount::from(1000), BatchCount::from(100));
1579
1580        write_lance_file(
1581            reader,
1582            fs,
1583            FileWriterOptions {
1584                format_version: Some(version),
1585                ..Default::default()
1586            },
1587        )
1588        .await
1589    }
1590
    /// Optional per-batch transformation applied to the expected batches before
    /// comparison (e.g. projecting them down to match a projected read).
    type Transformer = Box<dyn Fn(&RecordBatch) -> RecordBatch>;
1592
    /// Asserts that the batches produced by `actual` contain exactly the rows
    /// of `expected`, regardless of how either side is chunked.
    ///
    /// `read_size` is the batch size the stream was opened with; every batch
    /// must contain `min(remaining, read_size)` rows.  If `transform` is given
    /// it is applied to each expected batch before comparison.
    async fn verify_expected(
        expected: &[RecordBatch],
        mut actual: Pin<Box<dyn RecordBatchStream>>,
        read_size: u32,
        transform: Option<Transformer>,
    ) {
        let mut remaining = expected.iter().map(|batch| batch.num_rows()).sum::<usize>() as u32;
        let mut expected_iter = expected.iter().map(|batch| {
            if let Some(transform) = &transform {
                transform(batch)
            } else {
                batch.clone()
            }
        });
        let mut next_expected = expected_iter.next().unwrap().clone();
        while let Some(actual) = actual.next().await {
            let mut actual = actual.unwrap();
            let mut rows_to_verify = actual.num_rows() as u32;
            // Every batch must be "full" (read_size rows) until the data runs out
            let expected_length = remaining.min(read_size);
            assert_eq!(expected_length, rows_to_verify);

            while rows_to_verify > 0 {
                // Compare the largest prefix that fits in both the current
                // expected batch and the current actual batch
                let next_slice_len = (next_expected.num_rows() as u32).min(rows_to_verify);
                assert_eq!(
                    next_expected.slice(0, next_slice_len as usize),
                    actual.slice(0, next_slice_len as usize)
                );
                remaining -= next_slice_len;
                rows_to_verify -= next_slice_len;
                if remaining > 0 {
                    // Advance the expected cursor: either move to the next
                    // expected batch or keep the unverified tail of this one
                    if next_slice_len == next_expected.num_rows() as u32 {
                        next_expected = expected_iter.next().unwrap().clone();
                    } else {
                        next_expected = next_expected.slice(
                            next_slice_len as usize,
                            next_expected.num_rows() - next_slice_len as usize,
                        );
                    }
                }
                if rows_to_verify > 0 {
                    // Drop the verified prefix from the actual batch as well
                    actual = actual.slice(
                        next_slice_len as usize,
                        actual.num_rows() - next_slice_len as usize,
                    );
                }
            }
        }
        // All expected rows must have been produced by the stream
        assert_eq!(remaining, 0);
    }
1642
1643    #[tokio::test]
1644    async fn test_round_trip() {
1645        let fs = FsFixture::default();
1646
1647        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1648
1649        for read_size in [32, 1024, 1024 * 1024] {
1650            let file_scheduler = fs
1651                .scheduler
1652                .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1653                .await
1654                .unwrap();
1655            let file_reader = FileReader::try_open(
1656                file_scheduler,
1657                None,
1658                Arc::<DecoderPlugins>::default(),
1659                &test_cache(),
1660                FileReaderOptions::default(),
1661            )
1662            .await
1663            .unwrap();
1664
1665            let schema = file_reader.schema();
1666            assert_eq!(schema.metadata.get("foo").unwrap(), "bar");
1667
1668            let batch_stream = file_reader
1669                .read_stream(
1670                    lance_io::ReadBatchParams::RangeFull,
1671                    read_size,
1672                    16,
1673                    FilterExpression::no_filter(),
1674                )
1675                .unwrap();
1676
1677            verify_expected(&data, batch_stream, read_size, None).await;
1678        }
1679    }
1680
1681    #[rstest]
1682    #[test_log::test(tokio::test)]
1683    async fn test_encoded_batch_round_trip(
1684        // TODO: Add V2_1 (currently fails)
1685        #[values(LanceFileVersion::V2_0)] version: LanceFileVersion,
1686    ) {
1687        let data = gen_batch()
1688            .col("x", array::rand::<Int32Type>())
1689            .col("y", array::rand_utf8(ByteCount::from(16), false))
1690            .into_batch_rows(RowCount::from(10000))
1691            .unwrap();
1692
1693        let lance_schema = Arc::new(Schema::try_from(data.schema().as_ref()).unwrap());
1694
1695        let encoding_options = EncodingOptions {
1696            cache_bytes_per_column: 4096,
1697            max_page_bytes: 32 * 1024 * 1024,
1698            keep_original_array: true,
1699            buffer_alignment: 64,
1700            version,
1701        };
1702
1703        let encoding_strategy = default_encoding_strategy(version);
1704
1705        let encoded_batch = encode_batch(
1706            &data,
1707            lance_schema.clone(),
1708            encoding_strategy.as_ref(),
1709            &encoding_options,
1710        )
1711        .await
1712        .unwrap();
1713
1714        // Test self described
1715        let bytes = encoded_batch.try_to_self_described_lance(version).unwrap();
1716
1717        let decoded_batch = EncodedBatch::try_from_self_described_lance(bytes).unwrap();
1718
1719        let decoded = decode_batch(
1720            &decoded_batch,
1721            &FilterExpression::no_filter(),
1722            Arc::<DecoderPlugins>::default(),
1723            false,
1724            version,
1725            None,
1726        )
1727        .await
1728        .unwrap();
1729
1730        assert_eq!(data, decoded);
1731
1732        // Test mini
1733        let bytes = encoded_batch.try_to_mini_lance(version).unwrap();
1734        let decoded_batch =
1735            EncodedBatch::try_from_mini_lance(bytes, lance_schema.as_ref(), LanceFileVersion::V2_0)
1736                .unwrap();
1737        let decoded = decode_batch(
1738            &decoded_batch,
1739            &FilterExpression::no_filter(),
1740            Arc::<DecoderPlugins>::default(),
1741            false,
1742            version,
1743            None,
1744        )
1745        .await
1746        .unwrap();
1747
1748        assert_eq!(data, decoded);
1749    }
1750
    /// Exercises column projection through both supported mechanisms (a
    /// per-read projection via `read_stream_projected` and a base projection
    /// supplied at `try_open` time), built from either column names or field
    /// ids.  Also verifies that empty and duplicate projections are rejected.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_projection(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1, LanceFileVersion::V2_2)]
        version: LanceFileVersion,
    ) {
        let fs = FsFixture::default();

        let written_file = create_some_file(&fs, version).await;
        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();

        let field_id_mapping = written_file
            .field_id_mapping
            .iter()
            .copied()
            .collect::<BTreeMap<_, _>>();

        // A projection with no columns — must be rejected everywhere below
        let empty_projection = ReaderProjection {
            column_indices: Vec::default(),
            schema: Arc::new(Schema::default()),
        };

        // Mix of top-level and nested (dotted) column selections
        for columns in [
            vec!["score"],
            vec!["location"],
            vec!["categories"],
            vec!["score.x"],
            vec!["score", "categories"],
            vec!["score", "location"],
            vec!["location", "categories"],
            vec!["score.y", "location", "categories"],
        ] {
            debug!("Testing round trip with projection {:?}", columns);
            for use_field_ids in [true, false] {
                // We can specify the projection as part of the read operation via read_stream_projected
                let file_reader = FileReader::try_open(
                    file_scheduler.clone(),
                    None,
                    Arc::<DecoderPlugins>::default(),
                    &test_cache(),
                    FileReaderOptions::default(),
                )
                .await
                .unwrap();

                // Build the same projection two ways: from field ids or names
                let projected_schema = written_file.schema.project(&columns).unwrap();
                let projection = if use_field_ids {
                    ReaderProjection::from_field_ids(
                        file_reader.metadata.version(),
                        &projected_schema,
                        &field_id_mapping,
                    )
                    .unwrap()
                } else {
                    ReaderProjection::from_column_names(
                        file_reader.metadata.version(),
                        &written_file.schema,
                        &columns,
                    )
                    .unwrap()
                };

                let batch_stream = file_reader
                    .read_stream_projected(
                        lance_io::ReadBatchParams::RangeFull,
                        1024,
                        16,
                        projection.clone(),
                        FilterExpression::no_filter(),
                    )
                    .unwrap();

                // Expected data is the written batches projected the same way
                let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
                verify_expected(
                    &written_file.data,
                    batch_stream,
                    1024,
                    Some(Box::new(move |batch: &RecordBatch| {
                        batch.project_by_schema(&projection_arrow).unwrap()
                    })),
                )
                .await;

                // We can also specify the projection as a base projection when we open the file
                let file_reader = FileReader::try_open(
                    file_scheduler.clone(),
                    Some(projection.clone()),
                    Arc::<DecoderPlugins>::default(),
                    &test_cache(),
                    FileReaderOptions::default(),
                )
                .await
                .unwrap();

                let batch_stream = file_reader
                    .read_stream(
                        lance_io::ReadBatchParams::RangeFull,
                        1024,
                        16,
                        FilterExpression::no_filter(),
                    )
                    .unwrap();

                let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
                verify_expected(
                    &written_file.data,
                    batch_stream,
                    1024,
                    Some(Box::new(move |batch: &RecordBatch| {
                        batch.project_by_schema(&projection_arrow).unwrap()
                    })),
                )
                .await;

                // An empty projection on a per-read basis is an error
                assert!(
                    file_reader
                        .read_stream_projected(
                            lance_io::ReadBatchParams::RangeFull,
                            1024,
                            16,
                            empty_projection.clone(),
                            FilterExpression::no_filter(),
                        )
                        .is_err()
                );
            }
        }

        // An empty base projection is also an error at open time
        assert!(
            FileReader::try_open(
                file_scheduler.clone(),
                Some(empty_projection),
                Arc::<DecoderPlugins>::default(),
                &test_cache(),
                FileReaderOptions::default(),
            )
            .await
            .is_err()
        );

        let arrow_schema = ArrowSchema::new(vec![
            Field::new("x", DataType::Int32, true),
            Field::new("y", DataType::Int32, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        // Selecting the same column index twice must be rejected
        let projection_with_dupes = ReaderProjection {
            column_indices: vec![0, 0],
            schema: Arc::new(schema),
        };

        assert!(
            FileReader::try_open(
                file_scheduler.clone(),
                Some(projection_with_dupes),
                Arc::<DecoderPlugins>::default(),
                &test_cache(),
                FileReaderOptions::default(),
            )
            .await
            .is_err()
        );
    }
1918
    /// Reads a projection whose fields carry a `lance:compression` metadata
    /// entry.
    ///
    /// NOTE(review): the compression hint is attached to the *read-time*
    /// projection schema here (the file itself was written with default
    /// options); presumably this verifies that the reader tolerates such
    /// metadata on a projection — confirm intent.
    #[test_log::test(tokio::test)]
    async fn test_compressing_buffer() {
        let fs = FsFixture::default();

        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();

        // We can specify the projection as part of the read operation via read_stream_projected
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        // Project only "score" and tag every projected field with a
        // compression request via field metadata
        let mut projection = written_file.schema.project(&["score"]).unwrap();
        for field in projection.fields.iter_mut() {
            field
                .metadata
                .insert("lance:compression".to_string(), "zstd".to_string());
        }
        let projection = ReaderProjection {
            column_indices: projection.fields.iter().map(|f| f.id as u32).collect(),
            schema: Arc::new(projection),
        };

        let batch_stream = file_reader
            .read_stream_projected(
                lance_io::ReadBatchParams::RangeFull,
                1024,
                16,
                projection.clone(),
                FilterExpression::no_filter(),
            )
            .unwrap();

        // Expected data is the written batches projected down to "score"
        let projection_arrow = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
        verify_expected(
            &written_file.data,
            batch_stream,
            1024,
            Some(Box::new(move |batch: &RecordBatch| {
                batch.project_by_schema(&projection_arrow).unwrap()
            })),
        )
        .await;
    }
1973
1974    #[tokio::test]
1975    async fn test_read_all() {
1976        let fs = FsFixture::default();
1977        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1978        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
1979
1980        let file_scheduler = fs
1981            .scheduler
1982            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1983            .await
1984            .unwrap();
1985        let file_reader = FileReader::try_open(
1986            file_scheduler.clone(),
1987            None,
1988            Arc::<DecoderPlugins>::default(),
1989            &test_cache(),
1990            FileReaderOptions::default(),
1991        )
1992        .await
1993        .unwrap();
1994
1995        let batches = file_reader
1996            .read_stream(
1997                lance_io::ReadBatchParams::RangeFull,
1998                total_rows as u32,
1999                16,
2000                FilterExpression::no_filter(),
2001            )
2002            .unwrap()
2003            .try_collect::<Vec<_>>()
2004            .await
2005            .unwrap();
2006        assert_eq!(batches.len(), 1);
2007        assert_eq!(batches[0].num_rows(), total_rows);
2008    }
2009
2010    #[rstest]
2011    #[tokio::test]
2012    async fn test_blocking_take(
2013        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1, LanceFileVersion::V2_2)]
2014        version: LanceFileVersion,
2015    ) {
2016        let fs = FsFixture::default();
2017        let WrittenFile { data, schema, .. } = create_some_file(&fs, version).await;
2018        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2019
2020        let file_scheduler = fs
2021            .scheduler
2022            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2023            .await
2024            .unwrap();
2025        let file_reader = FileReader::try_open(
2026            file_scheduler.clone(),
2027            Some(ReaderProjection::from_column_names(version, &schema, &["score"]).unwrap()),
2028            Arc::<DecoderPlugins>::default(),
2029            &test_cache(),
2030            FileReaderOptions::default(),
2031        )
2032        .await
2033        .unwrap();
2034
2035        let batches = tokio::task::spawn_blocking(move || {
2036            file_reader
2037                .read_stream_projected_blocking(
2038                    lance_io::ReadBatchParams::Indices(UInt32Array::from(vec![0, 1, 2, 3, 4])),
2039                    total_rows as u32,
2040                    None,
2041                    FilterExpression::no_filter(),
2042                )
2043                .unwrap()
2044                .collect::<ArrowResult<Vec<_>>>()
2045                .unwrap()
2046        })
2047        .await
2048        .unwrap();
2049
2050        assert_eq!(batches.len(), 1);
2051        assert_eq!(batches[0].num_rows(), 5);
2052        assert_eq!(batches[0].num_columns(), 1);
2053    }
2054
2055    #[tokio::test(flavor = "multi_thread")]
2056    async fn test_drop_in_progress() {
2057        let fs = FsFixture::default();
2058        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
2059        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2060
2061        let file_scheduler = fs
2062            .scheduler
2063            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2064            .await
2065            .unwrap();
2066        let file_reader = FileReader::try_open(
2067            file_scheduler.clone(),
2068            None,
2069            Arc::<DecoderPlugins>::default(),
2070            &test_cache(),
2071            FileReaderOptions::default(),
2072        )
2073        .await
2074        .unwrap();
2075
2076        let mut batches = file_reader
2077            .read_stream(
2078                lance_io::ReadBatchParams::RangeFull,
2079                (total_rows / 10) as u32,
2080                16,
2081                FilterExpression::no_filter(),
2082            )
2083            .unwrap();
2084
2085        drop(file_reader);
2086
2087        let batch = batches.next().await.unwrap().unwrap();
2088        assert!(batch.num_rows() > 0);
2089
2090        // Drop in-progress scan
2091        drop(batches);
2092    }
2093
    /// Regression test: scheduling must not panic when the consuming stream
    /// has already been dropped.
    #[tokio::test]
    async fn drop_while_scheduling() {
        // This is a bit of a white-box test, pokes at the internals.  We want to
        // test the case where the read stream is dropped before the scheduling
        // thread finishes.  We can't do that in a black-box fashion because the
        // scheduling thread runs in the background and there is no easy way to
        // pause / gate it.

        // It's a regression for a bug where the scheduling thread would panic
        // if the stream was dropped before it finished.

        let fs = FsFixture::default();
        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
        let total_rows = written_file
            .data
            .iter()
            .map(|batch| batch.num_rows())
            .sum::<usize>();

        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        // Build the decode scheduler by hand (rather than via read_stream) so
        // we control exactly when the receiving end disappears
        let projection =
            ReaderProjection::from_whole_schema(&written_file.schema, LanceFileVersion::V2_0);
        let column_infos = file_reader
            .collect_columns_from_projection(&projection)
            .unwrap();
        let mut decode_scheduler = DecodeBatchScheduler::try_new(
            &projection.schema,
            &projection.column_indices,
            &column_infos,
            &vec![],
            total_rows as u64,
            Arc::<DecoderPlugins>::default(),
            file_reader.scheduler.clone(),
            test_cache(),
            &FilterExpression::no_filter(),
            &DecoderConfig::default(),
        )
        .await
        .unwrap();

        let range = 0..total_rows as u64;

        let (tx, rx) = mpsc::unbounded_channel();

        // Simulate the stream / decoder being dropped
        drop(rx);

        // Scheduling should not panic
        decode_scheduler.schedule_range(
            range,
            &FilterExpression::no_filter(),
            tx,
            file_reader.scheduler.clone(),
        )
    }
2163
2164    #[tokio::test]
2165    async fn test_read_empty_range() {
2166        let fs = FsFixture::default();
2167        create_some_file(&fs, LanceFileVersion::V2_0).await;
2168
2169        let file_scheduler = fs
2170            .scheduler
2171            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2172            .await
2173            .unwrap();
2174        let file_reader = FileReader::try_open(
2175            file_scheduler.clone(),
2176            None,
2177            Arc::<DecoderPlugins>::default(),
2178            &test_cache(),
2179            FileReaderOptions::default(),
2180        )
2181        .await
2182        .unwrap();
2183
2184        // All ranges empty, no data
2185        let batches = file_reader
2186            .read_stream(
2187                lance_io::ReadBatchParams::Range(0..0),
2188                1024,
2189                16,
2190                FilterExpression::no_filter(),
2191            )
2192            .unwrap()
2193            .try_collect::<Vec<_>>()
2194            .await
2195            .unwrap();
2196
2197        assert_eq!(batches.len(), 0);
2198
2199        // Some ranges empty
2200        let batches = file_reader
2201            .read_stream(
2202                lance_io::ReadBatchParams::Ranges(Arc::new([0..1, 2..2])),
2203                1024,
2204                16,
2205                FilterExpression::no_filter(),
2206            )
2207            .unwrap()
2208            .try_collect::<Vec<_>>()
2209            .await
2210            .unwrap();
2211        assert_eq!(batches.len(), 1);
2212    }
2213
2214    #[tokio::test]
2215    async fn test_global_buffers() {
2216        let fs = FsFixture::default();
2217
2218        let lance_schema =
2219            lance_core::datatypes::Schema::try_from(&ArrowSchema::new(vec![Field::new(
2220                "foo",
2221                DataType::Int32,
2222                true,
2223            )]))
2224            .unwrap();
2225
2226        let mut file_writer = FileWriter::try_new(
2227            fs.object_store.create(&fs.tmp_path).await.unwrap(),
2228            lance_schema.clone(),
2229            FileWriterOptions::default(),
2230        )
2231        .unwrap();
2232
2233        let test_bytes = Bytes::from_static(b"hello");
2234
2235        let buf_index = file_writer
2236            .add_global_buffer(test_bytes.clone())
2237            .await
2238            .unwrap();
2239
2240        assert_eq!(buf_index, 1);
2241
2242        file_writer.finish().await.unwrap();
2243
2244        let file_scheduler = fs
2245            .scheduler
2246            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2247            .await
2248            .unwrap();
2249        let file_reader = FileReader::try_open(
2250            file_scheduler.clone(),
2251            None,
2252            Arc::<DecoderPlugins>::default(),
2253            &test_cache(),
2254            FileReaderOptions::default(),
2255        )
2256        .await
2257        .unwrap();
2258
2259        let buf = file_reader.read_global_buffer(1).await.unwrap();
2260        assert_eq!(buf, test_bytes);
2261    }
2262}