lance_file/
reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    collections::{BTreeMap, BTreeSet},
6    io::Cursor,
7    ops::Range,
8    pin::Pin,
9    sync::Arc,
10};
11
12use arrow_array::RecordBatchReader;
13use arrow_schema::Schema as ArrowSchema;
14use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
15use bytes::{Bytes, BytesMut};
16use deepsize::{Context, DeepSizeOf};
17use futures::{stream::BoxStream, Stream, StreamExt};
18use lance_encoding::{
19    decoder::{
20        schedule_and_decode, schedule_and_decode_blocking, ColumnInfo, DecoderConfig,
21        DecoderPlugins, FilterExpression, PageEncoding, PageInfo, ReadBatchTask, RequestedRows,
22        SchedulerDecoderConfig,
23    },
24    encoder::EncodedBatch,
25    version::LanceFileVersion,
26    EncodingsIo,
27};
28use log::debug;
29use object_store::path::Path;
30use prost::{Message, Name};
31use snafu::location;
32
33use lance_core::{
34    cache::LanceCache,
35    datatypes::{Field, Schema},
36    Error, Result,
37};
38use lance_encoding::format::pb as pbenc;
39use lance_encoding::format::pb21 as pbenc21;
40use lance_io::{
41    scheduler::FileScheduler,
42    stream::{RecordBatchStream, RecordBatchStreamAdapter},
43    ReadBatchParams,
44};
45
46use crate::{
47    datatypes::{Fields, FieldsWithMeta},
48    format::{pb, pbfile, MAGIC, MAJOR_VERSION, MINOR_VERSION},
49    io::LanceEncodingsIo,
50    writer::PAGE_BUFFER_ALIGNMENT,
51};
52
53/// Default chunk size for reading large pages (8MiB)
54/// Pages larger than this will be split into multiple chunks during read
55pub const DEFAULT_READ_CHUNK_SIZE: u64 = 8 * 1024 * 1024;
56
57// For now, we don't use global buffers for anything other than schema.  If we
58// use these later we should make them lazily loaded and then cached once loaded.
59//
60// We store their position / length for debugging purposes
61#[derive(Debug, DeepSizeOf)]
62pub struct BufferDescriptor {
63    pub position: u64,
64    pub size: u64,
65}
66
67/// Statistics summarize some of the file metadata for quick summary info
68#[derive(Debug)]
69pub struct FileStatistics {
70    /// Statistics about each of the columns in the file
71    pub columns: Vec<ColumnStatistics>,
72}
73
74/// Summary information describing a column
75#[derive(Debug)]
76pub struct ColumnStatistics {
77    /// The number of pages in the column
78    pub num_pages: usize,
79    /// The total number of data & metadata bytes in the column
80    ///
81    /// This is the compressed on-disk size
82    pub size_bytes: u64,
83}
84
85// TODO: Caching
86#[derive(Debug)]
87pub struct CachedFileMetadata {
88    /// The schema of the file
89    pub file_schema: Arc<Schema>,
90    /// The column metadatas
91    pub column_metadatas: Vec<pbfile::ColumnMetadata>,
92    pub column_infos: Vec<Arc<ColumnInfo>>,
93    /// The number of rows in the file
94    pub num_rows: u64,
95    pub file_buffers: Vec<BufferDescriptor>,
96    /// The number of bytes contained in the data page section of the file
97    pub num_data_bytes: u64,
98    /// The number of bytes contained in the column metadata (not including buffers
99    /// referenced by the metadata)
100    pub num_column_metadata_bytes: u64,
101    /// The number of bytes contained in global buffers
102    pub num_global_buffer_bytes: u64,
103    /// The number of bytes contained in the CMO and GBO tables
104    pub num_footer_bytes: u64,
105    pub major_version: u16,
106    pub minor_version: u16,
107}
108
109impl DeepSizeOf for CachedFileMetadata {
110    // TODO: include size for `column_metadatas` and `column_infos`.
111    fn deep_size_of_children(&self, context: &mut Context) -> usize {
112        self.file_schema.deep_size_of_children(context)
113            + self
114                .file_buffers
115                .iter()
116                .map(|file_buffer| file_buffer.deep_size_of_children(context))
117                .sum::<usize>()
118    }
119}
120
121impl CachedFileMetadata {
122    pub fn version(&self) -> LanceFileVersion {
123        match (self.major_version, self.minor_version) {
124            (0, 3) => LanceFileVersion::V2_0,
125            (2, 0) => LanceFileVersion::V2_0,
126            (2, 1) => LanceFileVersion::V2_1,
127            (2, 2) => LanceFileVersion::V2_2,
128            _ => panic!(
129                "Unsupported version: {}.{}",
130                self.major_version, self.minor_version
131            ),
132        }
133    }
134}
135
136/// Selecting columns from a lance file requires specifying both the
137/// index of the column and the data type of the column
138///
139/// Partly, this is because it is not strictly required that columns
140/// be read into the same type.  For example, a string column may be
141/// read as a string, large_string or string_view type.
142///
143/// A read will only succeed if the decoder for a column is capable
144/// of decoding into the requested type.
145///
146/// Note that this should generally be limited to different in-memory
147/// representations of the same semantic type.  An encoding could
148/// theoretically support "casting" (e.g. int to string, etc.) but
149/// there is little advantage in doing so here.
150///
151/// Note: in order to specify a projection the user will need some way
152/// to figure out the column indices.  In the table format we do this
153/// using field IDs and keeping track of the field id->column index mapping.
154///
155/// If users are not using the table format then they will need to figure
156/// out some way to do this themselves.
157#[derive(Debug, Clone)]
158pub struct ReaderProjection {
159    /// The data types (schema) of the selected columns.  The names
160    /// of the schema are arbitrary and ignored.
161    pub schema: Arc<Schema>,
162    /// The indices of the columns to load.
163    ///
164    /// The content of this vector depends on the file version.
165    ///
166    /// In Lance File Version 2.0 we need ids for structural fields as
167    /// well as leaf fields:
168    ///
169    ///   - Primitive: the index of the column in the schema
170    ///   - List: the index of the list column in the schema
171    ///     followed by the column indices of the children
172    ///   - FixedSizeList (of primitive): the index of the column in the schema
173    ///     (this case is not nested)
174    ///   - FixedSizeList (of non-primitive): not yet implemented
175    ///   - Dictionary: same as primitive
176    ///   - Struct: the index of the struct column in the schema
177    ///     followed by the column indices of the children
178    ///
179    ///   In other words, this should be a DFS listing of the desired schema.
180    ///
181    /// In Lance File Version 2.1 we only need ids for leaf fields.  Any structural
182    /// fields are completely transparent.
183    ///
184    /// For example, if the goal is to load:
185    ///
186    ///   x: int32
187    ///   y: struct<z: int32, w: string>
188    ///   z: list<int32>
189    ///
190    /// and the schema originally used to store the data was:
191    ///
192    ///   a: struct<x: int32>
193    ///   b: int64
194    ///   y: struct<z: int32, c: int64, w: string>
195    ///   z: list<int32>
196    ///
197    /// Then the column_indices should be:
198    ///
199    /// - 2.0: [1, 3, 4, 6, 7, 8]
200    /// - 2.1: [0, 2, 4, 5]
201    pub column_indices: Vec<u32>,
202}
203
204impl ReaderProjection {
205    fn from_field_ids_helper<'a>(
206        file_version: LanceFileVersion,
207        fields: impl Iterator<Item = &'a Field>,
208        field_id_to_column_index: &BTreeMap<u32, u32>,
209        column_indices: &mut Vec<u32>,
210    ) -> Result<()> {
211        for field in fields {
212            let is_structural = file_version >= LanceFileVersion::V2_1;
213            // In the 2.0 system we needed ids for intermediate fields.  In 2.1+
214            // we only need ids for leaf fields.
215            if !is_structural
216                || field.children.is_empty()
217                || field.is_blob()
218                || field.is_packed_struct()
219            {
220                if let Some(column_idx) = field_id_to_column_index.get(&(field.id as u32)).copied()
221                {
222                    column_indices.push(column_idx);
223                }
224            }
225            // Don't recurse into children if the field is a blob or packed struct in 2.1
226            if !is_structural || (!field.is_blob() && !field.is_packed_struct()) {
227                Self::from_field_ids_helper(
228                    file_version,
229                    field.children.iter(),
230                    field_id_to_column_index,
231                    column_indices,
232                )?;
233            }
234        }
235        Ok(())
236    }
237
238    /// Creates a projection using a mapping from field IDs to column indices
239    ///
240    /// You can obtain such a mapping when the file is written using the
241    /// [`crate::writer::FileWriter::field_id_to_column_indices`] method.
242    pub fn from_field_ids(
243        file_version: LanceFileVersion,
244        schema: &Schema,
245        field_id_to_column_index: &BTreeMap<u32, u32>,
246    ) -> Result<Self> {
247        let mut column_indices = Vec::new();
248        Self::from_field_ids_helper(
249            file_version,
250            schema.fields.iter(),
251            field_id_to_column_index,
252            &mut column_indices,
253        )?;
254        Ok(Self {
255            schema: Arc::new(schema.clone()),
256            column_indices,
257        })
258    }
259
260    /// Creates a projection that reads the entire file
261    ///
262    /// If the schema provided is not the schema of the entire file then
263    /// the projection will be invalid and the read will fail.
264    /// If the field is a `struct datatype` with `packed` set to true in the field metadata,
265    /// the whole struct has one column index.
266    /// To support nested `packed-struct encoding`, this method need to be further adjusted.
267    pub fn from_whole_schema(schema: &Schema, version: LanceFileVersion) -> Self {
268        let schema = Arc::new(schema.clone());
269        let is_structural = version >= LanceFileVersion::V2_1;
270        let mut column_indices = vec![];
271        let mut curr_column_idx = 0;
272        let mut packed_struct_fields_num = 0;
273        for field in schema.fields_pre_order() {
274            if packed_struct_fields_num > 0 {
275                packed_struct_fields_num -= 1;
276                continue;
277            }
278            if field.is_packed_struct() {
279                column_indices.push(curr_column_idx);
280                curr_column_idx += 1;
281                packed_struct_fields_num = field.children.len();
282            } else if field.children.is_empty() || !is_structural {
283                column_indices.push(curr_column_idx);
284                curr_column_idx += 1;
285            }
286        }
287        Self {
288            schema,
289            column_indices,
290        }
291    }
292
293    /// Creates a projection that reads the specified columns provided by name
294    ///
295    /// The syntax for column names is the same as [`lance_core::datatypes::Schema::project`]
296    ///
297    /// If the schema provided is not the schema of the entire file then
298    /// the projection will be invalid and the read will fail.
299    pub fn from_column_names(
300        file_version: LanceFileVersion,
301        schema: &Schema,
302        column_names: &[&str],
303    ) -> Result<Self> {
304        let field_id_to_column_index = schema
305            .fields_pre_order()
306            // In the 2.0 system we needed ids for intermediate fields.  In 2.1+
307            // we only need ids for leaf fields.
308            .filter(|field| {
309                file_version < LanceFileVersion::V2_1 || field.is_leaf() || field.is_packed_struct()
310            })
311            .enumerate()
312            .map(|(idx, field)| (field.id as u32, idx as u32))
313            .collect::<BTreeMap<_, _>>();
314        let projected = schema.project(column_names)?;
315        let mut column_indices = Vec::new();
316        Self::from_field_ids_helper(
317            file_version,
318            projected.fields.iter(),
319            &field_id_to_column_index,
320            &mut column_indices,
321        )?;
322        Ok(Self {
323            schema: Arc::new(projected),
324            column_indices,
325        })
326    }
327}
328
329/// File Reader Options that can control reading behaviors, such as whether to enable caching on repetition indices
330#[derive(Clone, Debug)]
331pub struct FileReaderOptions {
332    pub decoder_config: DecoderConfig,
333    /// Size of chunks when reading large pages. Pages larger than this
334    /// will be read in multiple chunks to control memory usage.
335    /// Default: 8MB (DEFAULT_READ_CHUNK_SIZE)
336    pub read_chunk_size: u64,
337}
338
339impl Default for FileReaderOptions {
340    fn default() -> Self {
341        Self {
342            decoder_config: DecoderConfig::default(),
343            read_chunk_size: DEFAULT_READ_CHUNK_SIZE,
344        }
345    }
346}
347
348#[derive(Debug)]
349pub struct FileReader {
350    scheduler: Arc<dyn EncodingsIo>,
351    // The default projection to be applied to all reads
352    base_projection: ReaderProjection,
353    num_rows: u64,
354    metadata: Arc<CachedFileMetadata>,
355    decoder_plugins: Arc<DecoderPlugins>,
356    cache: Arc<LanceCache>,
357    options: FileReaderOptions,
358}
359#[derive(Debug)]
360struct Footer {
361    #[allow(dead_code)]
362    column_meta_start: u64,
363    // We don't use this today because we always load metadata for every column
364    // and don't yet support "metadata projection"
365    #[allow(dead_code)]
366    column_meta_offsets_start: u64,
367    global_buff_offsets_start: u64,
368    num_global_buffers: u32,
369    num_columns: u32,
370    major_version: u16,
371    minor_version: u16,
372}
373
374const FOOTER_LEN: usize = 40;
375
376impl FileReader {
377    pub fn with_scheduler(&self, scheduler: Arc<dyn EncodingsIo>) -> Self {
378        Self {
379            scheduler,
380            base_projection: self.base_projection.clone(),
381            cache: self.cache.clone(),
382            decoder_plugins: self.decoder_plugins.clone(),
383            metadata: self.metadata.clone(),
384            options: self.options.clone(),
385            num_rows: self.num_rows,
386        }
387    }
388
389    pub fn num_rows(&self) -> u64 {
390        self.num_rows
391    }
392
393    pub fn metadata(&self) -> &Arc<CachedFileMetadata> {
394        &self.metadata
395    }
396
397    pub fn file_statistics(&self) -> FileStatistics {
398        let column_metadatas = &self.metadata().column_metadatas;
399
400        let column_stats = column_metadatas
401            .iter()
402            .map(|col_metadata| {
403                let num_pages = col_metadata.pages.len();
404                let size_bytes = col_metadata
405                    .pages
406                    .iter()
407                    .map(|page| page.buffer_sizes.iter().sum::<u64>())
408                    .sum::<u64>();
409                ColumnStatistics {
410                    num_pages,
411                    size_bytes,
412                }
413            })
414            .collect();
415
416        FileStatistics {
417            columns: column_stats,
418        }
419    }
420
421    pub async fn read_global_buffer(&self, index: u32) -> Result<Bytes> {
422        let buffer_desc = self.metadata.file_buffers.get(index as usize).ok_or_else(||Error::invalid_input(format!("request for global buffer at index {} but there were only {} global buffers in the file", index, self.metadata.file_buffers.len()), location!()))?;
423        self.scheduler
424            .submit_single(
425                buffer_desc.position..buffer_desc.position + buffer_desc.size,
426                0,
427            )
428            .await
429    }
430
431    async fn read_tail(scheduler: &FileScheduler) -> Result<(Bytes, u64)> {
432        let file_size = scheduler.reader().size().await? as u64;
433        let begin = if file_size < scheduler.reader().block_size() as u64 {
434            0
435        } else {
436            file_size - scheduler.reader().block_size() as u64
437        };
438        let tail_bytes = scheduler.submit_single(begin..file_size, 0).await?;
439        Ok((tail_bytes, file_size))
440    }
441
442    // Checks to make sure the footer is written correctly and returns the
443    // position of the file descriptor (which comes from the footer)
444    fn decode_footer(footer_bytes: &Bytes) -> Result<Footer> {
445        let len = footer_bytes.len();
446        if len < FOOTER_LEN {
447            return Err(Error::io(
448                format!(
449                    "does not have sufficient data, len: {}, bytes: {:?}",
450                    len, footer_bytes
451                ),
452                location!(),
453            ));
454        }
455        let mut cursor = Cursor::new(footer_bytes.slice(len - FOOTER_LEN..));
456
457        let column_meta_start = cursor.read_u64::<LittleEndian>()?;
458        let column_meta_offsets_start = cursor.read_u64::<LittleEndian>()?;
459        let global_buff_offsets_start = cursor.read_u64::<LittleEndian>()?;
460        let num_global_buffers = cursor.read_u32::<LittleEndian>()?;
461        let num_columns = cursor.read_u32::<LittleEndian>()?;
462        let major_version = cursor.read_u16::<LittleEndian>()?;
463        let minor_version = cursor.read_u16::<LittleEndian>()?;
464
465        if major_version == MAJOR_VERSION as u16 && minor_version == MINOR_VERSION as u16 {
466            return Err(Error::version_conflict(
467                "Attempt to use the lance v2 reader to read a legacy file".to_string(),
468                major_version,
469                minor_version,
470                location!(),
471            ));
472        }
473
474        let magic_bytes = footer_bytes.slice(len - 4..);
475        if magic_bytes.as_ref() != MAGIC {
476            return Err(Error::io(
477                format!(
478                    "file does not appear to be a Lance file (invalid magic: {:?})",
479                    MAGIC
480                ),
481                location!(),
482            ));
483        }
484        Ok(Footer {
485            column_meta_start,
486            column_meta_offsets_start,
487            global_buff_offsets_start,
488            num_global_buffers,
489            num_columns,
490            major_version,
491            minor_version,
492        })
493    }
494
495    // TODO: Once we have coalesced I/O we should only read the column metadatas that we need
496    fn read_all_column_metadata(
497        column_metadata_bytes: Bytes,
498        footer: &Footer,
499    ) -> Result<Vec<pbfile::ColumnMetadata>> {
500        let column_metadata_start = footer.column_meta_start;
501        // cmo == column_metadata_offsets
502        let cmo_table_size = 16 * footer.num_columns as usize;
503        let cmo_table = column_metadata_bytes.slice(column_metadata_bytes.len() - cmo_table_size..);
504
505        (0..footer.num_columns)
506            .map(|col_idx| {
507                let offset = (col_idx * 16) as usize;
508                let position = LittleEndian::read_u64(&cmo_table[offset..offset + 8]);
509                let length = LittleEndian::read_u64(&cmo_table[offset + 8..offset + 16]);
510                let normalized_position = (position - column_metadata_start) as usize;
511                let normalized_end = normalized_position + (length as usize);
512                Ok(pbfile::ColumnMetadata::decode(
513                    &column_metadata_bytes[normalized_position..normalized_end],
514                )?)
515            })
516            .collect::<Result<Vec<_>>>()
517    }
518
519    async fn optimistic_tail_read(
520        data: &Bytes,
521        start_pos: u64,
522        scheduler: &FileScheduler,
523        file_len: u64,
524    ) -> Result<Bytes> {
525        let num_bytes_needed = (file_len - start_pos) as usize;
526        if data.len() >= num_bytes_needed {
527            Ok(data.slice((data.len() - num_bytes_needed)..))
528        } else {
529            let num_bytes_missing = (num_bytes_needed - data.len()) as u64;
530            let start = file_len - num_bytes_needed as u64;
531            let missing_bytes = scheduler
532                .submit_single(start..start + num_bytes_missing, 0)
533                .await?;
534            let mut combined = BytesMut::with_capacity(data.len() + num_bytes_missing as usize);
535            combined.extend(missing_bytes);
536            combined.extend(data);
537            Ok(combined.freeze())
538        }
539    }
540
541    fn do_decode_gbo_table(
542        gbo_bytes: &Bytes,
543        footer: &Footer,
544        version: LanceFileVersion,
545    ) -> Result<Vec<BufferDescriptor>> {
546        let mut global_bufs_cursor = Cursor::new(gbo_bytes);
547
548        let mut global_buffers = Vec::with_capacity(footer.num_global_buffers as usize);
549        for _ in 0..footer.num_global_buffers {
550            let buf_pos = global_bufs_cursor.read_u64::<LittleEndian>()?;
551            assert!(
552                version < LanceFileVersion::V2_1 || buf_pos % PAGE_BUFFER_ALIGNMENT as u64 == 0
553            );
554            let buf_size = global_bufs_cursor.read_u64::<LittleEndian>()?;
555            global_buffers.push(BufferDescriptor {
556                position: buf_pos,
557                size: buf_size,
558            });
559        }
560
561        Ok(global_buffers)
562    }
563
564    async fn decode_gbo_table(
565        tail_bytes: &Bytes,
566        file_len: u64,
567        scheduler: &FileScheduler,
568        footer: &Footer,
569        version: LanceFileVersion,
570    ) -> Result<Vec<BufferDescriptor>> {
571        // This could, in theory, trigger another IOP but the GBO table should never be large
572        // enough for that to happen
573        let gbo_bytes = Self::optimistic_tail_read(
574            tail_bytes,
575            footer.global_buff_offsets_start,
576            scheduler,
577            file_len,
578        )
579        .await?;
580        Self::do_decode_gbo_table(&gbo_bytes, footer, version)
581    }
582
583    fn decode_schema(schema_bytes: Bytes) -> Result<(u64, lance_core::datatypes::Schema)> {
584        let file_descriptor = pb::FileDescriptor::decode(schema_bytes)?;
585        let pb_schema = file_descriptor.schema.unwrap();
586        let num_rows = file_descriptor.length;
587        let fields_with_meta = FieldsWithMeta {
588            fields: Fields(pb_schema.fields),
589            metadata: pb_schema.metadata,
590        };
591        let schema = lance_core::datatypes::Schema::from(fields_with_meta);
592        Ok((num_rows, schema))
593    }
594
595    // TODO: Support late projection.  Currently, if we want to perform a
596    // projected read of a file, we load all of the column metadata, and then
597    // only read the column data that is requested.  This is fine for most cases.
598    //
599    // However, if there are many columns then loading all of the column metadata
600    // may be expensive.  We should support a mode where we only load the column
601    // metadata for the columns that are requested (the file format supports this).
602    //
603    // The main challenge is that we either need to ignore the column metadata cache
604    // or have a more sophisticated cache that can cache per-column metadata.
605    //
606    // Also, if the number of columns is fairly small, it's faster to read them as a
607    // single IOP, but we can fix this through coalescing.
608    pub async fn read_all_metadata(scheduler: &FileScheduler) -> Result<CachedFileMetadata> {
609        // 1. read the footer
610        let (tail_bytes, file_len) = Self::read_tail(scheduler).await?;
611        let footer = Self::decode_footer(&tail_bytes)?;
612
613        let file_version = LanceFileVersion::try_from_major_minor(
614            footer.major_version as u32,
615            footer.minor_version as u32,
616        )?;
617
618        let gbo_table =
619            Self::decode_gbo_table(&tail_bytes, file_len, scheduler, &footer, file_version).await?;
620        if gbo_table.is_empty() {
621            return Err(Error::Internal {
622                message: "File did not contain any global buffers, schema expected".to_string(),
623                location: location!(),
624            });
625        }
626        let schema_start = gbo_table[0].position;
627        let schema_size = gbo_table[0].size;
628
629        let num_footer_bytes = file_len - schema_start;
630
631        // By default we read all column metadatas.  We do NOT read the column metadata buffers
632        // at this point.  We only want to read the column metadata for columns we are actually loading.
633        let all_metadata_bytes =
634            Self::optimistic_tail_read(&tail_bytes, schema_start, scheduler, file_len).await?;
635
636        let schema_bytes = all_metadata_bytes.slice(0..schema_size as usize);
637        let (num_rows, schema) = Self::decode_schema(schema_bytes)?;
638
639        // Next, read the metadata for the columns
640        // This is both the column metadata and the CMO table
641        let column_metadata_start = (footer.column_meta_start - schema_start) as usize;
642        let column_metadata_end = (footer.global_buff_offsets_start - schema_start) as usize;
643        let column_metadata_bytes =
644            all_metadata_bytes.slice(column_metadata_start..column_metadata_end);
645        let column_metadatas = Self::read_all_column_metadata(column_metadata_bytes, &footer)?;
646
647        let num_global_buffer_bytes = gbo_table.iter().map(|buf| buf.size).sum::<u64>();
648        let num_data_bytes = footer.column_meta_start - num_global_buffer_bytes;
649        let num_column_metadata_bytes = footer.global_buff_offsets_start - footer.column_meta_start;
650
651        let column_infos = Self::meta_to_col_infos(column_metadatas.as_slice(), file_version);
652
653        Ok(CachedFileMetadata {
654            file_schema: Arc::new(schema),
655            column_metadatas,
656            column_infos,
657            num_rows,
658            num_data_bytes,
659            num_column_metadata_bytes,
660            num_global_buffer_bytes,
661            num_footer_bytes,
662            file_buffers: gbo_table,
663            major_version: footer.major_version,
664            minor_version: footer.minor_version,
665        })
666    }
667
668    fn fetch_encoding<M: Default + Name + Sized>(encoding: &pbfile::Encoding) -> M {
669        match &encoding.location {
670            Some(pbfile::encoding::Location::Indirect(_)) => todo!(),
671            Some(pbfile::encoding::Location::Direct(encoding)) => {
672                let encoding_buf = Bytes::from(encoding.encoding.clone());
673                let encoding_any = prost_types::Any::decode(encoding_buf).unwrap();
674                encoding_any.to_msg::<M>().unwrap()
675            }
676            Some(pbfile::encoding::Location::None(_)) => panic!(),
677            None => panic!(),
678        }
679    }
680
681    fn meta_to_col_infos(
682        column_metadatas: &[pbfile::ColumnMetadata],
683        file_version: LanceFileVersion,
684    ) -> Vec<Arc<ColumnInfo>> {
685        column_metadatas
686            .iter()
687            .enumerate()
688            .map(|(col_idx, col_meta)| {
689                let page_infos = col_meta
690                    .pages
691                    .iter()
692                    .map(|page| {
693                        let num_rows = page.length;
694                        let encoding = match file_version {
695                            LanceFileVersion::V2_0 => {
696                                PageEncoding::Legacy(Self::fetch_encoding::<pbenc::ArrayEncoding>(
697                                    page.encoding.as_ref().unwrap(),
698                                ))
699                            }
700                            _ => PageEncoding::Structural(Self::fetch_encoding::<
701                                pbenc21::PageLayout,
702                            >(
703                                page.encoding.as_ref().unwrap()
704                            )),
705                        };
706                        let buffer_offsets_and_sizes = Arc::from(
707                            page.buffer_offsets
708                                .iter()
709                                .zip(page.buffer_sizes.iter())
710                                .map(|(offset, size)| {
711                                    // Starting with version 2.1 we can assert that page buffers are aligned
712                                    assert!(
713                                        file_version < LanceFileVersion::V2_1
714                                            || offset % PAGE_BUFFER_ALIGNMENT as u64 == 0
715                                    );
716                                    (*offset, *size)
717                                })
718                                .collect::<Vec<_>>(),
719                        );
720                        PageInfo {
721                            buffer_offsets_and_sizes,
722                            encoding,
723                            num_rows,
724                            priority: page.priority,
725                        }
726                    })
727                    .collect::<Vec<_>>();
728                let buffer_offsets_and_sizes = Arc::from(
729                    col_meta
730                        .buffer_offsets
731                        .iter()
732                        .zip(col_meta.buffer_sizes.iter())
733                        .map(|(offset, size)| (*offset, *size))
734                        .collect::<Vec<_>>(),
735                );
736                Arc::new(ColumnInfo {
737                    index: col_idx as u32,
738                    page_infos: Arc::from(page_infos),
739                    buffer_offsets_and_sizes,
740                    encoding: Self::fetch_encoding(col_meta.encoding.as_ref().unwrap()),
741                })
742            })
743            .collect::<Vec<_>>()
744    }
745
746    fn validate_projection(
747        projection: &ReaderProjection,
748        metadata: &CachedFileMetadata,
749    ) -> Result<()> {
750        if projection.schema.fields.is_empty() {
751            return Err(Error::invalid_input(
752                "Attempt to read zero columns from the file, at least one column must be specified"
753                    .to_string(),
754                location!(),
755            ));
756        }
757        let mut column_indices_seen = BTreeSet::new();
758        for column_index in &projection.column_indices {
759            if !column_indices_seen.insert(*column_index) {
760                return Err(Error::invalid_input(
761                    format!(
762                        "The projection specified the column index {} more than once",
763                        column_index
764                    ),
765                    location!(),
766                ));
767            }
768            if *column_index >= metadata.column_infos.len() as u32 {
769                return Err(Error::invalid_input(format!("The projection specified the column index {} but there are only {} columns in the file", column_index, metadata.column_infos.len()), location!()));
770            }
771        }
772        Ok(())
773    }
774
775    /// Opens a new file reader without any pre-existing knowledge
776    ///
777    /// This will read the file schema from the file itself and thus requires a bit more I/O
778    ///
779    /// A `base_projection` can also be provided.  If provided, then the projection will apply
780    /// to all reads from the file that do not specify their own projection.
781    pub async fn try_open(
782        scheduler: FileScheduler,
783        base_projection: Option<ReaderProjection>,
784        decoder_plugins: Arc<DecoderPlugins>,
785        cache: &LanceCache,
786        options: FileReaderOptions,
787    ) -> Result<Self> {
788        let file_metadata = Arc::new(Self::read_all_metadata(&scheduler).await?);
789        let path = scheduler.reader().path().clone();
790
791        // Create LanceEncodingsIo with read chunk size from options
792        let encodings_io =
793            LanceEncodingsIo::new(scheduler).with_read_chunk_size(options.read_chunk_size);
794
795        Self::try_open_with_file_metadata(
796            Arc::new(encodings_io),
797            path,
798            base_projection,
799            decoder_plugins,
800            file_metadata,
801            cache,
802            options,
803        )
804        .await
805    }
806
807    /// Same as `try_open` but with the file metadata already loaded.
808    ///
809    /// This method also can accept any kind of `EncodingsIo` implementation allowing
810    /// for custom strategies to be used for I/O scheduling (e.g. for takes on fast
811    /// disks it may be better to avoid asynchronous overhead).
812    pub async fn try_open_with_file_metadata(
813        scheduler: Arc<dyn EncodingsIo>,
814        path: Path,
815        base_projection: Option<ReaderProjection>,
816        decoder_plugins: Arc<DecoderPlugins>,
817        file_metadata: Arc<CachedFileMetadata>,
818        cache: &LanceCache,
819        options: FileReaderOptions,
820    ) -> Result<Self> {
821        let cache = Arc::new(cache.with_key_prefix(path.as_ref()));
822
823        if let Some(base_projection) = base_projection.as_ref() {
824            Self::validate_projection(base_projection, &file_metadata)?;
825        }
826        let num_rows = file_metadata.num_rows;
827        Ok(Self {
828            scheduler,
829            base_projection: base_projection.unwrap_or(ReaderProjection::from_whole_schema(
830                file_metadata.file_schema.as_ref(),
831                file_metadata.version(),
832            )),
833            num_rows,
834            metadata: file_metadata,
835            decoder_plugins,
836            cache,
837            options,
838        })
839    }
840
841    // The actual decoder needs all the column infos that make up a type.  In other words, if
842    // the first type in the schema is Struct<i32, i32> then the decoder will need 3 column infos.
843    //
844    // This is a file reader concern because the file reader needs to support late projection of columns
845    // and so it will need to figure this out anyways.
846    //
847    // It's a bit of a tricky process though because the number of column infos may depend on the
848    // encoding.  Considering the above example, if we wrote it with a packed encoding, then there would
849    // only be a single column in the file (and not 3).
850    //
851    // At the moment this method words because our rules are simple and we just repeat them here.  See
852    // Self::default_projection for a similar problem.  In the future this is something the encodings
853    // registry will need to figure out.
854    fn collect_columns_from_projection(
855        &self,
856        _projection: &ReaderProjection,
857    ) -> Result<Vec<Arc<ColumnInfo>>> {
858        Ok(self.metadata.column_infos.to_vec())
859    }
860
861    #[allow(clippy::too_many_arguments)]
862    fn do_read_range(
863        column_infos: Vec<Arc<ColumnInfo>>,
864        io: Arc<dyn EncodingsIo>,
865        cache: Arc<LanceCache>,
866        num_rows: u64,
867        decoder_plugins: Arc<DecoderPlugins>,
868        range: Range<u64>,
869        batch_size: u32,
870        projection: ReaderProjection,
871        filter: FilterExpression,
872        decoder_config: DecoderConfig,
873    ) -> Result<BoxStream<'static, ReadBatchTask>> {
874        debug!(
875            "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
876            range,
877            batch_size,
878            num_rows,
879            column_infos.len(),
880            projection.schema.fields.len(),
881        );
882
883        let config = SchedulerDecoderConfig {
884            batch_size,
885            cache,
886            decoder_plugins,
887            io,
888            decoder_config,
889        };
890
891        let requested_rows = RequestedRows::Ranges(vec![range]);
892
893        Ok(schedule_and_decode(
894            column_infos,
895            requested_rows,
896            filter,
897            projection.column_indices,
898            projection.schema,
899            config,
900        ))
901    }
902
903    fn read_range(
904        &self,
905        range: Range<u64>,
906        batch_size: u32,
907        projection: ReaderProjection,
908        filter: FilterExpression,
909    ) -> Result<BoxStream<'static, ReadBatchTask>> {
910        // Create and initialize the stream
911        Self::do_read_range(
912            self.collect_columns_from_projection(&projection)?,
913            self.scheduler.clone(),
914            self.cache.clone(),
915            self.num_rows,
916            self.decoder_plugins.clone(),
917            range,
918            batch_size,
919            projection,
920            filter,
921            self.options.decoder_config.clone(),
922        )
923    }
924
925    #[allow(clippy::too_many_arguments)]
926    fn do_take_rows(
927        column_infos: Vec<Arc<ColumnInfo>>,
928        io: Arc<dyn EncodingsIo>,
929        cache: Arc<LanceCache>,
930        decoder_plugins: Arc<DecoderPlugins>,
931        indices: Vec<u64>,
932        batch_size: u32,
933        projection: ReaderProjection,
934        filter: FilterExpression,
935        decoder_config: DecoderConfig,
936    ) -> Result<BoxStream<'static, ReadBatchTask>> {
937        debug!(
938            "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
939            indices.len(),
940            indices[0],
941            indices[indices.len() - 1],
942            batch_size,
943            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
944        );
945
946        let config = SchedulerDecoderConfig {
947            batch_size,
948            cache,
949            decoder_plugins,
950            io,
951            decoder_config,
952        };
953
954        let requested_rows = RequestedRows::Indices(indices);
955
956        Ok(schedule_and_decode(
957            column_infos,
958            requested_rows,
959            filter,
960            projection.column_indices,
961            projection.schema,
962            config,
963        ))
964    }
965
966    fn take_rows(
967        &self,
968        indices: Vec<u64>,
969        batch_size: u32,
970        projection: ReaderProjection,
971    ) -> Result<BoxStream<'static, ReadBatchTask>> {
972        // Create and initialize the stream
973        Self::do_take_rows(
974            self.collect_columns_from_projection(&projection)?,
975            self.scheduler.clone(),
976            self.cache.clone(),
977            self.decoder_plugins.clone(),
978            indices,
979            batch_size,
980            projection,
981            FilterExpression::no_filter(),
982            self.options.decoder_config.clone(),
983        )
984    }
985
986    #[allow(clippy::too_many_arguments)]
987    fn do_read_ranges(
988        column_infos: Vec<Arc<ColumnInfo>>,
989        io: Arc<dyn EncodingsIo>,
990        cache: Arc<LanceCache>,
991        decoder_plugins: Arc<DecoderPlugins>,
992        ranges: Vec<Range<u64>>,
993        batch_size: u32,
994        projection: ReaderProjection,
995        filter: FilterExpression,
996        decoder_config: DecoderConfig,
997    ) -> Result<BoxStream<'static, ReadBatchTask>> {
998        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
999        debug!(
1000            "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
1001            ranges.len(),
1002            num_rows,
1003            ranges[0].start,
1004            ranges[ranges.len() - 1].end,
1005            batch_size,
1006            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1007        );
1008
1009        let config = SchedulerDecoderConfig {
1010            batch_size,
1011            cache,
1012            decoder_plugins,
1013            io,
1014            decoder_config,
1015        };
1016
1017        let requested_rows = RequestedRows::Ranges(ranges);
1018
1019        Ok(schedule_and_decode(
1020            column_infos,
1021            requested_rows,
1022            filter,
1023            projection.column_indices,
1024            projection.schema,
1025            config,
1026        ))
1027    }
1028
1029    fn read_ranges(
1030        &self,
1031        ranges: Vec<Range<u64>>,
1032        batch_size: u32,
1033        projection: ReaderProjection,
1034        filter: FilterExpression,
1035    ) -> Result<BoxStream<'static, ReadBatchTask>> {
1036        Self::do_read_ranges(
1037            self.collect_columns_from_projection(&projection)?,
1038            self.scheduler.clone(),
1039            self.cache.clone(),
1040            self.decoder_plugins.clone(),
1041            ranges,
1042            batch_size,
1043            projection,
1044            filter,
1045            self.options.decoder_config.clone(),
1046        )
1047    }
1048
1049    /// Creates a stream of "read tasks" to read the data from the file
1050    ///
1051    /// The arguments are similar to [`Self::read_stream_projected`] but instead of returning a stream
1052    /// of record batches it returns a stream of "read tasks".
1053    ///
1054    /// The tasks should be consumed with some kind of `buffered` argument if CPU parallelism is desired.
1055    ///
1056    /// Note that "read task" is probably a bit imprecise.  The tasks are actually "decode tasks".  The
1057    /// reading happens asynchronously in the background.  In other words, a single read task may map to
1058    /// multiple I/O operations or a single I/O operation may map to multiple read tasks.
1059    pub fn read_tasks(
1060        &self,
1061        params: ReadBatchParams,
1062        batch_size: u32,
1063        projection: Option<ReaderProjection>,
1064        filter: FilterExpression,
1065    ) -> Result<Pin<Box<dyn Stream<Item = ReadBatchTask> + Send>>> {
1066        let projection = projection.unwrap_or_else(|| self.base_projection.clone());
1067        Self::validate_projection(&projection, &self.metadata)?;
1068        let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
1069            if bound > self.num_rows || bound == self.num_rows && inclusive {
1070                Err(Error::invalid_input(
1071                    format!(
1072                        "cannot read {:?} from file with {} rows",
1073                        params, self.num_rows
1074                    ),
1075                    location!(),
1076                ))
1077            } else {
1078                Ok(())
1079            }
1080        };
1081        match &params {
1082            ReadBatchParams::Indices(indices) => {
1083                for idx in indices {
1084                    match idx {
1085                        None => {
1086                            return Err(Error::invalid_input(
1087                                "Null value in indices array",
1088                                location!(),
1089                            ));
1090                        }
1091                        Some(idx) => {
1092                            verify_bound(&params, idx as u64, true)?;
1093                        }
1094                    }
1095                }
1096                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
1097                self.take_rows(indices, batch_size, projection)
1098            }
1099            ReadBatchParams::Range(range) => {
1100                verify_bound(&params, range.end as u64, false)?;
1101                self.read_range(
1102                    range.start as u64..range.end as u64,
1103                    batch_size,
1104                    projection,
1105                    filter,
1106                )
1107            }
1108            ReadBatchParams::Ranges(ranges) => {
1109                let mut ranges_u64 = Vec::with_capacity(ranges.len());
1110                for range in ranges.as_ref() {
1111                    verify_bound(&params, range.end, false)?;
1112                    ranges_u64.push(range.start..range.end);
1113                }
1114                self.read_ranges(ranges_u64, batch_size, projection, filter)
1115            }
1116            ReadBatchParams::RangeFrom(range) => {
1117                verify_bound(&params, range.start as u64, true)?;
1118                self.read_range(
1119                    range.start as u64..self.num_rows,
1120                    batch_size,
1121                    projection,
1122                    filter,
1123                )
1124            }
1125            ReadBatchParams::RangeTo(range) => {
1126                verify_bound(&params, range.end as u64, false)?;
1127                self.read_range(0..range.end as u64, batch_size, projection, filter)
1128            }
1129            ReadBatchParams::RangeFull => {
1130                self.read_range(0..self.num_rows, batch_size, projection, filter)
1131            }
1132        }
1133    }
1134
1135    /// Reads data from the file as a stream of record batches
1136    ///
1137    /// * `params` - Specifies the range (or indices) of data to read
1138    /// * `batch_size` - The maximum size of a single batch.  A batch may be smaller
1139    ///   if it is the last batch or if it is not possible to create a batch of the
1140    ///   requested size.
1141    ///
1142    ///   For example, if the batch size is 1024 and one of the columns is a string
1143    ///   column then there may be some ranges of 1024 rows that contain more than
1144    ///   2^31 bytes of string data (which is the maximum size of a string column
1145    ///   in Arrow).  In this case smaller batches may be emitted.
1146    /// * `batch_readahead` - The number of batches to read ahead.  This controls the
1147    ///   amount of CPU parallelism of the read.  In other words it controls how many
1148    ///   batches will be decoded in parallel.  It has no effect on the I/O parallelism
1149    ///   of the read (how many I/O requests are in flight at once).
1150    ///
1151    ///   This parameter also is also related to backpressure.  If the consumer of the
1152    ///   stream is slow then the reader will build up RAM.
1153    /// * `projection` - A projection to apply to the read.  This controls which columns
1154    ///   are read from the file.  The projection is NOT applied on top of the base
1155    ///   projection.  The projection is applied directly to the file schema.
1156    pub fn read_stream_projected(
1157        &self,
1158        params: ReadBatchParams,
1159        batch_size: u32,
1160        batch_readahead: u32,
1161        projection: ReaderProjection,
1162        filter: FilterExpression,
1163    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1164        let arrow_schema = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
1165        let tasks_stream = self.read_tasks(params, batch_size, Some(projection), filter)?;
1166        let batch_stream = tasks_stream
1167            .map(|task| task.task)
1168            .buffered(batch_readahead as usize)
1169            .boxed();
1170        Ok(Box::pin(RecordBatchStreamAdapter::new(
1171            arrow_schema,
1172            batch_stream,
1173        )))
1174    }
1175
1176    fn take_rows_blocking(
1177        &self,
1178        indices: Vec<u64>,
1179        batch_size: u32,
1180        projection: ReaderProjection,
1181        filter: FilterExpression,
1182    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1183        let column_infos = self.collect_columns_from_projection(&projection)?;
1184        debug!(
1185            "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
1186            indices.len(),
1187            indices[0],
1188            indices[indices.len() - 1],
1189            batch_size,
1190            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1191        );
1192
1193        let config = SchedulerDecoderConfig {
1194            batch_size,
1195            cache: self.cache.clone(),
1196            decoder_plugins: self.decoder_plugins.clone(),
1197            io: self.scheduler.clone(),
1198            decoder_config: self.options.decoder_config.clone(),
1199        };
1200
1201        let requested_rows = RequestedRows::Indices(indices);
1202
1203        schedule_and_decode_blocking(
1204            column_infos,
1205            requested_rows,
1206            filter,
1207            projection.column_indices,
1208            projection.schema,
1209            config,
1210        )
1211    }
1212
1213    fn read_ranges_blocking(
1214        &self,
1215        ranges: Vec<Range<u64>>,
1216        batch_size: u32,
1217        projection: ReaderProjection,
1218        filter: FilterExpression,
1219    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1220        let column_infos = self.collect_columns_from_projection(&projection)?;
1221        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1222        debug!(
1223            "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
1224            ranges.len(),
1225            num_rows,
1226            ranges[0].start,
1227            ranges[ranges.len() - 1].end,
1228            batch_size,
1229            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1230        );
1231
1232        let config = SchedulerDecoderConfig {
1233            batch_size,
1234            cache: self.cache.clone(),
1235            decoder_plugins: self.decoder_plugins.clone(),
1236            io: self.scheduler.clone(),
1237            decoder_config: self.options.decoder_config.clone(),
1238        };
1239
1240        let requested_rows = RequestedRows::Ranges(ranges);
1241
1242        schedule_and_decode_blocking(
1243            column_infos,
1244            requested_rows,
1245            filter,
1246            projection.column_indices,
1247            projection.schema,
1248            config,
1249        )
1250    }
1251
1252    fn read_range_blocking(
1253        &self,
1254        range: Range<u64>,
1255        batch_size: u32,
1256        projection: ReaderProjection,
1257        filter: FilterExpression,
1258    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1259        let column_infos = self.collect_columns_from_projection(&projection)?;
1260        let num_rows = self.num_rows;
1261
1262        debug!(
1263            "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
1264            range,
1265            batch_size,
1266            num_rows,
1267            column_infos.len(),
1268            projection.schema.fields.len(),
1269        );
1270
1271        let config = SchedulerDecoderConfig {
1272            batch_size,
1273            cache: self.cache.clone(),
1274            decoder_plugins: self.decoder_plugins.clone(),
1275            io: self.scheduler.clone(),
1276            decoder_config: self.options.decoder_config.clone(),
1277        };
1278
1279        let requested_rows = RequestedRows::Ranges(vec![range]);
1280
1281        schedule_and_decode_blocking(
1282            column_infos,
1283            requested_rows,
1284            filter,
1285            projection.column_indices,
1286            projection.schema,
1287            config,
1288        )
1289    }
1290
1291    /// Read data from the file as an iterator of record batches
1292    ///
1293    /// This is a blocking variant of [`Self::read_stream_projected`] that runs entirely in the
1294    /// calling thread.  It will block on I/O if the decode is faster than the I/O.  It is useful
1295    /// for benchmarking and potentially from "take"ing small batches from fast disks.
1296    ///
1297    /// Large scans of in-memory data will still benefit from threading (and should therefore not
1298    /// use this method) because we can parallelize the decode.
1299    ///
1300    /// Note: calling this from within a tokio runtime will panic.  It is acceptable to call this
1301    /// from a spawn_blocking context.
1302    pub fn read_stream_projected_blocking(
1303        &self,
1304        params: ReadBatchParams,
1305        batch_size: u32,
1306        projection: Option<ReaderProjection>,
1307        filter: FilterExpression,
1308    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1309        let projection = projection.unwrap_or_else(|| self.base_projection.clone());
1310        Self::validate_projection(&projection, &self.metadata)?;
1311        let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
1312            if bound > self.num_rows || bound == self.num_rows && inclusive {
1313                Err(Error::invalid_input(
1314                    format!(
1315                        "cannot read {:?} from file with {} rows",
1316                        params, self.num_rows
1317                    ),
1318                    location!(),
1319                ))
1320            } else {
1321                Ok(())
1322            }
1323        };
1324        match &params {
1325            ReadBatchParams::Indices(indices) => {
1326                for idx in indices {
1327                    match idx {
1328                        None => {
1329                            return Err(Error::invalid_input(
1330                                "Null value in indices array",
1331                                location!(),
1332                            ));
1333                        }
1334                        Some(idx) => {
1335                            verify_bound(&params, idx as u64, true)?;
1336                        }
1337                    }
1338                }
1339                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
1340                self.take_rows_blocking(indices, batch_size, projection, filter)
1341            }
1342            ReadBatchParams::Range(range) => {
1343                verify_bound(&params, range.end as u64, false)?;
1344                self.read_range_blocking(
1345                    range.start as u64..range.end as u64,
1346                    batch_size,
1347                    projection,
1348                    filter,
1349                )
1350            }
1351            ReadBatchParams::Ranges(ranges) => {
1352                let mut ranges_u64 = Vec::with_capacity(ranges.len());
1353                for range in ranges.as_ref() {
1354                    verify_bound(&params, range.end, false)?;
1355                    ranges_u64.push(range.start..range.end);
1356                }
1357                self.read_ranges_blocking(ranges_u64, batch_size, projection, filter)
1358            }
1359            ReadBatchParams::RangeFrom(range) => {
1360                verify_bound(&params, range.start as u64, true)?;
1361                self.read_range_blocking(
1362                    range.start as u64..self.num_rows,
1363                    batch_size,
1364                    projection,
1365                    filter,
1366                )
1367            }
1368            ReadBatchParams::RangeTo(range) => {
1369                verify_bound(&params, range.end as u64, false)?;
1370                self.read_range_blocking(0..range.end as u64, batch_size, projection, filter)
1371            }
1372            ReadBatchParams::RangeFull => {
1373                self.read_range_blocking(0..self.num_rows, batch_size, projection, filter)
1374            }
1375        }
1376    }
1377
1378    /// Reads data from the file as a stream of record batches
1379    ///
1380    /// This is similar to [`Self::read_stream_projected`] but uses the base projection
1381    /// provided when the file was opened (or reads all columns if the file was
1382    /// opened without a base projection)
1383    pub fn read_stream(
1384        &self,
1385        params: ReadBatchParams,
1386        batch_size: u32,
1387        batch_readahead: u32,
1388        filter: FilterExpression,
1389    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1390        self.read_stream_projected(
1391            params,
1392            batch_size,
1393            batch_readahead,
1394            self.base_projection.clone(),
1395            filter,
1396        )
1397    }
1398
    /// Returns the full schema of the file (not restricted to the base projection)
    pub fn schema(&self) -> &Arc<Schema> {
        &self.metadata.file_schema
    }
1402}
1403
1404/// Inspects a page and returns a String describing the page's encoding
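///
/// # Example
///
/// A minimal sketch; `column_meta` is assumed to be a [`pbfile::ColumnMetadata`]
/// obtained from the file metadata, and the `pages` field name is taken from the
/// protobuf definition (an assumption here):
///
/// ```ignore
/// for (page_idx, page) in column_meta.pages.iter().enumerate() {
///     println!("page {}: {}", page_idx, describe_encoding(page));
/// }
/// ```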
1405pub fn describe_encoding(page: &pbfile::column_metadata::Page) -> String {
1406    if let Some(encoding) = &page.encoding {
1407        if let Some(style) = &encoding.location {
1408            match style {
1409                pbfile::encoding::Location::Indirect(indirect) => {
1410                    format!(
1411                        "IndirectEncoding(pos={},size={})",
1412                        indirect.buffer_location, indirect.buffer_length
1413                    )
1414                }
1415                pbfile::encoding::Location::Direct(direct) => {
1416                    let encoding_any =
1417                        prost_types::Any::decode(Bytes::from(direct.encoding.clone()))
1418                            .expect("failed to deserialize encoding as protobuf");
1419                    if encoding_any.type_url == "/lance.encodings.ArrayEncoding" {
1420                        let encoding = encoding_any.to_msg::<pbenc::ArrayEncoding>();
1421                        match encoding {
1422                            Ok(encoding) => {
1423                                format!("{:#?}", encoding)
1424                            }
1425                            Err(err) => {
1426                                format!("Unsupported(decode_err={})", err)
1427                            }
1428                        }
1429                    } else if encoding_any.type_url == "/lance.encodings21.PageLayout" {
1430                        let encoding = encoding_any.to_msg::<pbenc21::PageLayout>();
1431                        match encoding {
1432                            Ok(encoding) => {
1433                                format!("{:#?}", encoding)
1434                            }
1435                            Err(err) => {
1436                                format!("Unsupported(decode_err={})", err)
1437                            }
1438                        }
1439                    } else {
1440                        format!("Unrecognized(type_url={})", encoding_any.type_url)
1441                    }
1442                }
1443                pbfile::encoding::Location::None(_) => "NoEncodingDescription".to_string(),
1444            }
1445        } else {
1446            "MISSING STYLE".to_string()
1447        }
1448    } else {
1449        "MISSING".to_string()
1450    }
1451}
1452
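/// Extension trait for reconstructing an [`EncodedBatch`] from serialized Lance bytes
///
/// # Example
///
/// A minimal sketch of the self-described round trip, mirroring the tests below; it
/// assumes `encoded_batch` was produced by `encode_batch` and that the corresponding
/// writer extension (`EncodedBatchWriteExt`) is in scope:
///
/// ```ignore
/// let bytes = encoded_batch.try_to_self_described_lance(version)?;
/// let decoded = EncodedBatch::try_from_self_described_lance(bytes)?;
/// assert_eq!(decoded.num_rows, encoded_batch.num_rows);
/// ```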
pub trait EncodedBatchReaderExt {
    /// Reconstructs an encoded batch from "mini lance" bytes, which contain data
    /// pages, column metadata, and a footer but no schema; the schema and file
    /// version must be supplied by the caller
    fn try_from_mini_lance(
        bytes: Bytes,
        schema: &Schema,
        version: LanceFileVersion,
    ) -> Result<Self>
    where
        Self: Sized;
    /// Reconstructs an encoded batch from self-described bytes, reading the schema
    /// from the file's first global buffer
    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
    where
        Self: Sized;
}
1465
1466impl EncodedBatchReaderExt for EncodedBatch {
1467    fn try_from_mini_lance(
1468        bytes: Bytes,
1469        schema: &Schema,
1470        file_version: LanceFileVersion,
1471    ) -> Result<Self>
1472    where
1473        Self: Sized,
1474    {
1475        let projection = ReaderProjection::from_whole_schema(schema, file_version);
1476        let footer = FileReader::decode_footer(&bytes)?;
1477
1478        // Next, read the metadata for the columns
1479        // This is both the column metadata and the CMO table
1480        let column_metadata_start = footer.column_meta_start as usize;
1481        let column_metadata_end = footer.global_buff_offsets_start as usize;
1482        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1483        let column_metadatas =
1484            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1485
1486        let file_version = LanceFileVersion::try_from_major_minor(
1487            footer.major_version as u32,
1488            footer.minor_version as u32,
1489        )?;
1490
1491        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1492
1493        Ok(Self {
1494            data: bytes,
1495            num_rows: page_table
1496                .first()
1497                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1498                .unwrap_or(0),
1499            page_table,
1500            top_level_columns: projection.column_indices,
1501            schema: Arc::new(schema.clone()),
1502        })
1503    }
1504
1505    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1506    where
1507        Self: Sized,
1508    {
1509        let footer = FileReader::decode_footer(&bytes)?;
1510        let file_version = LanceFileVersion::try_from_major_minor(
1511            footer.major_version as u32,
1512            footer.minor_version as u32,
1513        )?;
1514
1515        let gbo_table = FileReader::do_decode_gbo_table(
1516            &bytes.slice(footer.global_buff_offsets_start as usize..),
1517            &footer,
1518            file_version,
1519        )?;
1520        if gbo_table.is_empty() {
1521            return Err(Error::Internal {
1522                message: "File did not contain any global buffers, schema expected".to_string(),
1523                location: location!(),
1524            });
1525        }
1526        let schema_start = gbo_table[0].position as usize;
1527        let schema_size = gbo_table[0].size as usize;
1528
1529        let schema_bytes = bytes.slice(schema_start..(schema_start + schema_size));
1530        let (_, schema) = FileReader::decode_schema(schema_bytes)?;
1531        let projection = ReaderProjection::from_whole_schema(&schema, file_version);
1532
1533        // Next, read the metadata for the columns
1534        // This is both the column metadata and the CMO table
1535        let column_metadata_start = footer.column_meta_start as usize;
1536        let column_metadata_end = footer.global_buff_offsets_start as usize;
1537        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1538        let column_metadatas =
1539            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1540
1541        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1542
1543        Ok(Self {
1544            data: bytes,
1545            num_rows: page_table
1546                .first()
1547                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1548                .unwrap_or(0),
1549            page_table,
1550            top_level_columns: projection.column_indices,
1551            schema: Arc::new(schema),
1552        })
1553    }
1554}
1555
1556#[cfg(test)]
1557pub mod tests {
1558    use std::{collections::BTreeMap, pin::Pin, sync::Arc};
1559
1560    use arrow_array::{
1561        types::{Float64Type, Int32Type},
1562        RecordBatch, UInt32Array,
1563    };
1564    use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema};
1565    use bytes::Bytes;
1566    use futures::{prelude::stream::TryStreamExt, StreamExt};
1567    use lance_arrow::RecordBatchExt;
1568    use lance_core::{datatypes::Schema, ArrowResult};
1569    use lance_datagen::{array, gen_batch, BatchCount, ByteCount, RowCount};
1570    use lance_encoding::{
1571        decoder::{decode_batch, DecodeBatchScheduler, DecoderPlugins, FilterExpression},
1572        encoder::{default_encoding_strategy, encode_batch, EncodedBatch, EncodingOptions},
1573        version::LanceFileVersion,
1574    };
1575    use lance_io::{stream::RecordBatchStream, utils::CachedFileSize};
1576    use log::debug;
1577    use rstest::rstest;
1578    use tokio::sync::mpsc;
1579
1580    use crate::reader::{EncodedBatchReaderExt, FileReader, FileReaderOptions, ReaderProjection};
1581    use crate::testing::{test_cache, write_lance_file, FsFixture, WrittenFile};
1582    use crate::writer::{EncodedBatchWriteExt, FileWriter, FileWriterOptions};
1583    use lance_encoding::decoder::DecoderConfig;
1584
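    /// Writes a test file of 100 batches x 1000 rows with a mix of primitive, struct,
    /// list, and binary columns (a LargeBinary column is only included for file
    /// versions up to 2.0)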
1585    async fn create_some_file(fs: &FsFixture, version: LanceFileVersion) -> WrittenFile {
1586        let location_type = DataType::Struct(Fields::from(vec![
1587            Field::new("x", DataType::Float64, true),
1588            Field::new("y", DataType::Float64, true),
1589        ]));
1590        let categories_type = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)));
1591
1592        let mut reader = gen_batch()
1593            .col("score", array::rand::<Float64Type>())
1594            .col("location", array::rand_type(&location_type))
1595            .col("categories", array::rand_type(&categories_type))
1596            .col("binary", array::rand_type(&DataType::Binary));
1597        if version <= LanceFileVersion::V2_0 {
1598            reader = reader.col("large_bin", array::rand_type(&DataType::LargeBinary));
1599        }
1600        let reader = reader.into_reader_rows(RowCount::from(1000), BatchCount::from(100));
1601
1602        write_lance_file(
1603            reader,
1604            fs,
1605            FileWriterOptions {
1606                format_version: Some(version),
1607                ..Default::default()
1608            },
1609        )
1610        .await
1611    }
1612
1613    type Transformer = Box<dyn Fn(&RecordBatch) -> RecordBatch>;
1614
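    /// Asserts that `actual` yields exactly the rows of `expected`, in order, re-chunked
    /// into batches of `read_size` rows (the last batch may be smaller), optionally
    /// applying `transform` to each expected batch before comparison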
1615    async fn verify_expected(
1616        expected: &[RecordBatch],
1617        mut actual: Pin<Box<dyn RecordBatchStream>>,
1618        read_size: u32,
1619        transform: Option<Transformer>,
1620    ) {
1621        let mut remaining = expected.iter().map(|batch| batch.num_rows()).sum::<usize>() as u32;
1622        let mut expected_iter = expected.iter().map(|batch| {
1623            if let Some(transform) = &transform {
1624                transform(batch)
1625            } else {
1626                batch.clone()
1627            }
1628        });
1629        let mut next_expected = expected_iter.next().unwrap().clone();
1630        while let Some(actual) = actual.next().await {
1631            let mut actual = actual.unwrap();
1632            let mut rows_to_verify = actual.num_rows() as u32;
1633            let expected_length = remaining.min(read_size);
1634            assert_eq!(expected_length, rows_to_verify);
1635
1636            while rows_to_verify > 0 {
1637                let next_slice_len = (next_expected.num_rows() as u32).min(rows_to_verify);
1638                assert_eq!(
1639                    next_expected.slice(0, next_slice_len as usize),
1640                    actual.slice(0, next_slice_len as usize)
1641                );
1642                remaining -= next_slice_len;
1643                rows_to_verify -= next_slice_len;
1644                if remaining > 0 {
1645                    if next_slice_len == next_expected.num_rows() as u32 {
1646                        next_expected = expected_iter.next().unwrap().clone();
1647                    } else {
1648                        next_expected = next_expected.slice(
1649                            next_slice_len as usize,
1650                            next_expected.num_rows() - next_slice_len as usize,
1651                        );
1652                    }
1653                }
1654                if rows_to_verify > 0 {
1655                    actual = actual.slice(
1656                        next_slice_len as usize,
1657                        actual.num_rows() - next_slice_len as usize,
1658                    );
1659                }
1660            }
1661        }
1662        assert_eq!(remaining, 0);
1663    }
1664
1665    #[tokio::test]
1666    async fn test_round_trip() {
1667        let fs = FsFixture::default();
1668
1669        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1670
1671        for read_size in [32, 1024, 1024 * 1024] {
1672            let file_scheduler = fs
1673                .scheduler
1674                .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1675                .await
1676                .unwrap();
1677            let file_reader = FileReader::try_open(
1678                file_scheduler,
1679                None,
1680                Arc::<DecoderPlugins>::default(),
1681                &test_cache(),
1682                FileReaderOptions::default(),
1683            )
1684            .await
1685            .unwrap();
1686
1687            let schema = file_reader.schema();
1688            assert_eq!(schema.metadata.get("foo").unwrap(), "bar");
1689
1690            let batch_stream = file_reader
1691                .read_stream(
1692                    lance_io::ReadBatchParams::RangeFull,
1693                    read_size,
1694                    16,
1695                    FilterExpression::no_filter(),
1696                )
1697                .unwrap();
1698
1699            verify_expected(&data, batch_stream, read_size, None).await;
1700        }
1701    }
1702
1703    #[rstest]
1704    #[test_log::test(tokio::test)]
1705    async fn test_encoded_batch_round_trip(
1706        // TODO: Add V2_1 (currently fails)
1707        #[values(LanceFileVersion::V2_0)] version: LanceFileVersion,
1708    ) {
1709        let data = gen_batch()
1710            .col("x", array::rand::<Int32Type>())
1711            .col("y", array::rand_utf8(ByteCount::from(16), false))
1712            .into_batch_rows(RowCount::from(10000))
1713            .unwrap();
1714
1715        let lance_schema = Arc::new(Schema::try_from(data.schema().as_ref()).unwrap());
1716
1717        let encoding_options = EncodingOptions {
1718            cache_bytes_per_column: 4096,
1719            max_page_bytes: 32 * 1024 * 1024,
1720            keep_original_array: true,
1721            buffer_alignment: 64,
1722        };
1723
1724        let encoding_strategy = default_encoding_strategy(version);
1725
1726        let encoded_batch = encode_batch(
1727            &data,
1728            lance_schema.clone(),
1729            encoding_strategy.as_ref(),
1730            &encoding_options,
1731        )
1732        .await
1733        .unwrap();
1734
1735        // Test self described
1736        let bytes = encoded_batch.try_to_self_described_lance(version).unwrap();
1737
1738        let decoded_batch = EncodedBatch::try_from_self_described_lance(bytes).unwrap();
1739
1740        let decoded = decode_batch(
1741            &decoded_batch,
1742            &FilterExpression::no_filter(),
1743            Arc::<DecoderPlugins>::default(),
1744            false,
1745            version,
1746            None,
1747        )
1748        .await
1749        .unwrap();
1750
1751        assert_eq!(data, decoded);
1752
1753        // Test mini
1754        let bytes = encoded_batch.try_to_mini_lance(version).unwrap();
        let decoded_batch =
            EncodedBatch::try_from_mini_lance(bytes, lance_schema.as_ref(), version).unwrap();
1758        let decoded = decode_batch(
1759            &decoded_batch,
1760            &FilterExpression::no_filter(),
1761            Arc::<DecoderPlugins>::default(),
1762            false,
1763            version,
1764            None,
1765        )
1766        .await
1767        .unwrap();
1768
1769        assert_eq!(data, decoded);
1770    }
1771
1772    #[rstest]
1773    #[test_log::test(tokio::test)]
1774    async fn test_projection(
1775        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
1776    ) {
1777        let fs = FsFixture::default();
1778
1779        let written_file = create_some_file(&fs, version).await;
1780        let file_scheduler = fs
1781            .scheduler
1782            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1783            .await
1784            .unwrap();
1785
1786        let field_id_mapping = written_file
1787            .field_id_mapping
1788            .iter()
1789            .copied()
1790            .collect::<BTreeMap<_, _>>();
1791
1792        let empty_projection = ReaderProjection {
1793            column_indices: Vec::default(),
1794            schema: Arc::new(Schema::default()),
1795        };
1796
1797        for columns in [
1798            vec!["score"],
1799            vec!["location"],
1800            vec!["categories"],
            vec!["location.x"],
1802            vec!["score", "categories"],
1803            vec!["score", "location"],
1804            vec!["location", "categories"],
            vec!["score", "location.y", "categories"],
1806        ] {
1807            debug!("Testing round trip with projection {:?}", columns);
1808            for use_field_ids in [true, false] {
1809                // We can specify the projection as part of the read operation via read_stream_projected
1810                let file_reader = FileReader::try_open(
1811                    file_scheduler.clone(),
1812                    None,
1813                    Arc::<DecoderPlugins>::default(),
1814                    &test_cache(),
1815                    FileReaderOptions::default(),
1816                )
1817                .await
1818                .unwrap();
1819
1820                let projected_schema = written_file.schema.project(&columns).unwrap();
1821                let projection = if use_field_ids {
1822                    ReaderProjection::from_field_ids(
1823                        file_reader.metadata.version(),
1824                        &projected_schema,
1825                        &field_id_mapping,
1826                    )
1827                    .unwrap()
1828                } else {
1829                    ReaderProjection::from_column_names(
1830                        file_reader.metadata.version(),
1831                        &written_file.schema,
1832                        &columns,
1833                    )
1834                    .unwrap()
1835                };
1836
1837                let batch_stream = file_reader
1838                    .read_stream_projected(
1839                        lance_io::ReadBatchParams::RangeFull,
1840                        1024,
1841                        16,
1842                        projection.clone(),
1843                        FilterExpression::no_filter(),
1844                    )
1845                    .unwrap();
1846
1847                let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1848                verify_expected(
1849                    &written_file.data,
1850                    batch_stream,
1851                    1024,
1852                    Some(Box::new(move |batch: &RecordBatch| {
1853                        batch.project_by_schema(&projection_arrow).unwrap()
1854                    })),
1855                )
1856                .await;
1857
1858                // We can also specify the projection as a base projection when we open the file
1859                let file_reader = FileReader::try_open(
1860                    file_scheduler.clone(),
1861                    Some(projection.clone()),
1862                    Arc::<DecoderPlugins>::default(),
1863                    &test_cache(),
1864                    FileReaderOptions::default(),
1865                )
1866                .await
1867                .unwrap();
1868
1869                let batch_stream = file_reader
1870                    .read_stream(
1871                        lance_io::ReadBatchParams::RangeFull,
1872                        1024,
1873                        16,
1874                        FilterExpression::no_filter(),
1875                    )
1876                    .unwrap();
1877
1878                let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1879                verify_expected(
1880                    &written_file.data,
1881                    batch_stream,
1882                    1024,
1883                    Some(Box::new(move |batch: &RecordBatch| {
1884                        batch.project_by_schema(&projection_arrow).unwrap()
1885                    })),
1886                )
1887                .await;
1888
1889                assert!(file_reader
1890                    .read_stream_projected(
1891                        lance_io::ReadBatchParams::RangeFull,
1892                        1024,
1893                        16,
1894                        empty_projection.clone(),
1895                        FilterExpression::no_filter(),
1896                    )
1897                    .is_err());
1898            }
1899        }
1900
1901        assert!(FileReader::try_open(
1902            file_scheduler.clone(),
1903            Some(empty_projection),
1904            Arc::<DecoderPlugins>::default(),
1905            &test_cache(),
1906            FileReaderOptions::default(),
1907        )
1908        .await
1909        .is_err());
1910
1911        let arrow_schema = ArrowSchema::new(vec![
1912            Field::new("x", DataType::Int32, true),
1913            Field::new("y", DataType::Int32, true),
1914        ]);
1915        let schema = Schema::try_from(&arrow_schema).unwrap();
1916
1917        let projection_with_dupes = ReaderProjection {
1918            column_indices: vec![0, 0],
1919            schema: Arc::new(schema),
1920        };
1921
1922        assert!(FileReader::try_open(
1923            file_scheduler.clone(),
1924            Some(projection_with_dupes),
1925            Arc::<DecoderPlugins>::default(),
1926            &test_cache(),
1927            FileReaderOptions::default(),
1928        )
1929        .await
1930        .is_err());
1931    }
1932
1933    #[test_log::test(tokio::test)]
1934    async fn test_compressing_buffer() {
1935        let fs = FsFixture::default();
1936
1937        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
1938        let file_scheduler = fs
1939            .scheduler
1940            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1941            .await
1942            .unwrap();
1943
1944        // We can specify the projection as part of the read operation via read_stream_projected
1945        let file_reader = FileReader::try_open(
1946            file_scheduler.clone(),
1947            None,
1948            Arc::<DecoderPlugins>::default(),
1949            &test_cache(),
1950            FileReaderOptions::default(),
1951        )
1952        .await
1953        .unwrap();
1954
1955        let mut projection = written_file.schema.project(&["score"]).unwrap();
1956        for field in projection.fields.iter_mut() {
1957            field
1958                .metadata
1959                .insert("lance:compression".to_string(), "zstd".to_string());
1960        }
1961        let projection = ReaderProjection {
1962            column_indices: projection.fields.iter().map(|f| f.id as u32).collect(),
1963            schema: Arc::new(projection),
1964        };
1965
1966        let batch_stream = file_reader
1967            .read_stream_projected(
1968                lance_io::ReadBatchParams::RangeFull,
1969                1024,
1970                16,
1971                projection.clone(),
1972                FilterExpression::no_filter(),
1973            )
1974            .unwrap();
1975
1976        let projection_arrow = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
1977        verify_expected(
1978            &written_file.data,
1979            batch_stream,
1980            1024,
1981            Some(Box::new(move |batch: &RecordBatch| {
1982                batch.project_by_schema(&projection_arrow).unwrap()
1983            })),
1984        )
1985        .await;
1986    }
1987
1988    #[tokio::test]
1989    async fn test_read_all() {
1990        let fs = FsFixture::default();
1991        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1992        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
1993
1994        let file_scheduler = fs
1995            .scheduler
1996            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1997            .await
1998            .unwrap();
1999        let file_reader = FileReader::try_open(
2000            file_scheduler.clone(),
2001            None,
2002            Arc::<DecoderPlugins>::default(),
2003            &test_cache(),
2004            FileReaderOptions::default(),
2005        )
2006        .await
2007        .unwrap();
2008
2009        let batches = file_reader
2010            .read_stream(
2011                lance_io::ReadBatchParams::RangeFull,
2012                total_rows as u32,
2013                16,
2014                FilterExpression::no_filter(),
2015            )
2016            .unwrap()
2017            .try_collect::<Vec<_>>()
2018            .await
2019            .unwrap();
2020        assert_eq!(batches.len(), 1);
2021        assert_eq!(batches[0].num_rows(), total_rows);
2022    }
2023
2024    #[rstest]
2025    #[tokio::test]
2026    async fn test_blocking_take(
2027        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
2028    ) {
2029        let fs = FsFixture::default();
2030        let WrittenFile { data, schema, .. } = create_some_file(&fs, version).await;
2031        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2032
2033        let file_scheduler = fs
2034            .scheduler
2035            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2036            .await
2037            .unwrap();
2038        let file_reader = FileReader::try_open(
2039            file_scheduler.clone(),
2040            Some(ReaderProjection::from_column_names(version, &schema, &["score"]).unwrap()),
2041            Arc::<DecoderPlugins>::default(),
2042            &test_cache(),
2043            FileReaderOptions::default(),
2044        )
2045        .await
2046        .unwrap();
2047
2048        let batches = tokio::task::spawn_blocking(move || {
2049            file_reader
2050                .read_stream_projected_blocking(
2051                    lance_io::ReadBatchParams::Indices(UInt32Array::from(vec![0, 1, 2, 3, 4])),
2052                    total_rows as u32,
2053                    None,
2054                    FilterExpression::no_filter(),
2055                )
2056                .unwrap()
2057                .collect::<ArrowResult<Vec<_>>>()
2058                .unwrap()
2059        })
2060        .await
2061        .unwrap();
2062
2063        assert_eq!(batches.len(), 1);
2064        assert_eq!(batches[0].num_rows(), 5);
2065        assert_eq!(batches[0].num_columns(), 1);
2066    }
2067
2068    #[tokio::test(flavor = "multi_thread")]
2069    async fn test_drop_in_progress() {
2070        let fs = FsFixture::default();
2071        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
2072        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2073
2074        let file_scheduler = fs
2075            .scheduler
2076            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2077            .await
2078            .unwrap();
2079        let file_reader = FileReader::try_open(
2080            file_scheduler.clone(),
2081            None,
2082            Arc::<DecoderPlugins>::default(),
2083            &test_cache(),
2084            FileReaderOptions::default(),
2085        )
2086        .await
2087        .unwrap();
2088
2089        let mut batches = file_reader
2090            .read_stream(
2091                lance_io::ReadBatchParams::RangeFull,
2092                (total_rows / 10) as u32,
2093                16,
2094                FilterExpression::no_filter(),
2095            )
2096            .unwrap();
2097
2098        drop(file_reader);
2099
2100        let batch = batches.next().await.unwrap().unwrap();
2101        assert!(batch.num_rows() > 0);
2102
2103        // Drop in-progress scan
2104        drop(batches);
2105    }
2106
2107    #[tokio::test]
2108    async fn drop_while_scheduling() {
        // This is a bit of a white-box test that pokes at the internals.  We want to
2110        // test the case where the read stream is dropped before the scheduling
2111        // thread finishes.  We can't do that in a black-box fashion because the
2112        // scheduling thread runs in the background and there is no easy way to
2113        // pause / gate it.
2114
        // It's a regression test for a bug where the scheduling thread would panic
2116        // if the stream was dropped before it finished.
2117
2118        let fs = FsFixture::default();
2119        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
2120        let total_rows = written_file
2121            .data
2122            .iter()
2123            .map(|batch| batch.num_rows())
2124            .sum::<usize>();
2125
2126        let file_scheduler = fs
2127            .scheduler
2128            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2129            .await
2130            .unwrap();
2131        let file_reader = FileReader::try_open(
2132            file_scheduler.clone(),
2133            None,
2134            Arc::<DecoderPlugins>::default(),
2135            &test_cache(),
2136            FileReaderOptions::default(),
2137        )
2138        .await
2139        .unwrap();
2140
2141        let projection =
2142            ReaderProjection::from_whole_schema(&written_file.schema, LanceFileVersion::V2_0);
2143        let column_infos = file_reader
2144            .collect_columns_from_projection(&projection)
2145            .unwrap();
2146        let mut decode_scheduler = DecodeBatchScheduler::try_new(
2147            &projection.schema,
2148            &projection.column_indices,
2149            &column_infos,
2150            &vec![],
2151            total_rows as u64,
2152            Arc::<DecoderPlugins>::default(),
2153            file_reader.scheduler.clone(),
2154            test_cache(),
2155            &FilterExpression::no_filter(),
2156            &DecoderConfig::default(),
2157        )
2158        .await
2159        .unwrap();
2160
2161        let range = 0..total_rows as u64;
2162
2163        let (tx, rx) = mpsc::unbounded_channel();
2164
2165        // Simulate the stream / decoder being dropped
2166        drop(rx);
2167
2168        // Scheduling should not panic
2169        decode_scheduler.schedule_range(
2170            range,
2171            &FilterExpression::no_filter(),
2172            tx,
2173            file_reader.scheduler.clone(),
2174        )
2175    }
2176
2177    #[tokio::test]
2178    async fn test_read_empty_range() {
2179        let fs = FsFixture::default();
2180        create_some_file(&fs, LanceFileVersion::V2_0).await;
2181
2182        let file_scheduler = fs
2183            .scheduler
2184            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2185            .await
2186            .unwrap();
2187        let file_reader = FileReader::try_open(
2188            file_scheduler.clone(),
2189            None,
2190            Arc::<DecoderPlugins>::default(),
2191            &test_cache(),
2192            FileReaderOptions::default(),
2193        )
2194        .await
2195        .unwrap();
2196
2197        // All ranges empty, no data
2198        let batches = file_reader
2199            .read_stream(
2200                lance_io::ReadBatchParams::Range(0..0),
2201                1024,
2202                16,
2203                FilterExpression::no_filter(),
2204            )
2205            .unwrap()
2206            .try_collect::<Vec<_>>()
2207            .await
2208            .unwrap();
2209
2210        assert_eq!(batches.len(), 0);
2211
2212        // Some ranges empty
2213        let batches = file_reader
2214            .read_stream(
2215                lance_io::ReadBatchParams::Ranges(Arc::new([0..1, 2..2])),
2216                1024,
2217                16,
2218                FilterExpression::no_filter(),
2219            )
2220            .unwrap()
2221            .try_collect::<Vec<_>>()
2222            .await
2223            .unwrap();
2224        assert_eq!(batches.len(), 1);
2225    }
2226
2227    #[tokio::test]
2228    async fn test_global_buffers() {
2229        let fs = FsFixture::default();
2230
2231        let lance_schema =
2232            lance_core::datatypes::Schema::try_from(&ArrowSchema::new(vec![Field::new(
2233                "foo",
2234                DataType::Int32,
2235                true,
2236            )]))
2237            .unwrap();
2238
2239        let mut file_writer = FileWriter::try_new(
2240            fs.object_store.create(&fs.tmp_path).await.unwrap(),
2241            lance_schema.clone(),
2242            FileWriterOptions::default(),
2243        )
2244        .unwrap();
2245
2246        let test_bytes = Bytes::from_static(b"hello");
2247
2248        let buf_index = file_writer
2249            .add_global_buffer(test_bytes.clone())
2250            .await
2251            .unwrap();
2252
2253        assert_eq!(buf_index, 1);
2254
2255        file_writer.finish().await.unwrap();
2256
2257        let file_scheduler = fs
2258            .scheduler
2259            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2260            .await
2261            .unwrap();
2262        let file_reader = FileReader::try_open(
2263            file_scheduler.clone(),
2264            None,
2265            Arc::<DecoderPlugins>::default(),
2266            &test_cache(),
2267            FileReaderOptions::default(),
2268        )
2269        .await
2270        .unwrap();
2271
2272        let buf = file_reader.read_global_buffer(1).await.unwrap();
2273        assert_eq!(buf, test_bytes);
2274    }
2275}