Skip to main content

lance_file/
reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    collections::{BTreeMap, BTreeSet},
6    io::Cursor,
7    ops::Range,
8    pin::Pin,
9    sync::Arc,
10};
11
12use arrow_array::RecordBatchReader;
13use arrow_schema::Schema as ArrowSchema;
14use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
15use bytes::{Bytes, BytesMut};
16use deepsize::{Context, DeepSizeOf};
17use futures::{stream::BoxStream, Stream, StreamExt};
18use lance_encoding::{
19    decoder::{
20        schedule_and_decode, schedule_and_decode_blocking, ColumnInfo, DecoderConfig,
21        DecoderPlugins, FilterExpression, PageEncoding, PageInfo, ReadBatchTask, RequestedRows,
22        SchedulerDecoderConfig,
23    },
24    encoder::EncodedBatch,
25    version::LanceFileVersion,
26    EncodingsIo,
27};
28use log::debug;
29use object_store::path::Path;
30use prost::{Message, Name};
31use snafu::location;
32
33use lance_core::{
34    cache::LanceCache,
35    datatypes::{Field, Schema},
36    Error, Result,
37};
38use lance_encoding::format::pb as pbenc;
39use lance_encoding::format::pb21 as pbenc21;
40use lance_io::{
41    scheduler::FileScheduler,
42    stream::{RecordBatchStream, RecordBatchStreamAdapter},
43    ReadBatchParams,
44};
45
46use crate::{
47    datatypes::{Fields, FieldsWithMeta},
48    format::{pb, pbfile, MAGIC, MAJOR_VERSION, MINOR_VERSION},
49    io::LanceEncodingsIo,
50    writer::PAGE_BUFFER_ALIGNMENT,
51};
52
/// Default size, in bytes, of a single read chunk (8 MiB)
///
/// A page larger than this is split into several chunks when it is read.
pub const DEFAULT_READ_CHUNK_SIZE: u64 = 8 << 20;
56
57// For now, we don't use global buffers for anything other than schema.  If we
58// use these later we should make them lazily loaded and then cached once loaded.
59//
60// We store their position / length for debugging purposes
61#[derive(Debug, DeepSizeOf)]
62pub struct BufferDescriptor {
63    pub position: u64,
64    pub size: u64,
65}
66
67/// Statistics summarize some of the file metadata for quick summary info
68#[derive(Debug)]
69pub struct FileStatistics {
70    /// Statistics about each of the columns in the file
71    pub columns: Vec<ColumnStatistics>,
72}
73
/// Summary information for a single column
#[derive(Debug)]
pub struct ColumnStatistics {
    /// How many pages the column has
    pub num_pages: usize,
    /// The total number of data & metadata bytes in the column
    ///
    /// This is the size on disk (i.e. after compression)
    pub size_bytes: u64,
}
84
85// TODO: Caching
86#[derive(Debug)]
87pub struct CachedFileMetadata {
88    /// The schema of the file
89    pub file_schema: Arc<Schema>,
90    /// The column metadatas
91    pub column_metadatas: Vec<pbfile::ColumnMetadata>,
92    pub column_infos: Vec<Arc<ColumnInfo>>,
93    /// The number of rows in the file
94    pub num_rows: u64,
95    pub file_buffers: Vec<BufferDescriptor>,
96    /// The number of bytes contained in the data page section of the file
97    pub num_data_bytes: u64,
98    /// The number of bytes contained in the column metadata (not including buffers
99    /// referenced by the metadata)
100    pub num_column_metadata_bytes: u64,
101    /// The number of bytes contained in global buffers
102    pub num_global_buffer_bytes: u64,
103    /// The number of bytes contained in the CMO and GBO tables
104    pub num_footer_bytes: u64,
105    pub major_version: u16,
106    pub minor_version: u16,
107}
108
109impl DeepSizeOf for CachedFileMetadata {
110    // TODO: include size for `column_metadatas` and `column_infos`.
111    fn deep_size_of_children(&self, context: &mut Context) -> usize {
112        self.file_schema.deep_size_of_children(context)
113            + self
114                .file_buffers
115                .iter()
116                .map(|file_buffer| file_buffer.deep_size_of_children(context))
117                .sum::<usize>()
118    }
119}
120
121impl CachedFileMetadata {
122    pub fn version(&self) -> LanceFileVersion {
123        match (self.major_version, self.minor_version) {
124            (0, 3) => LanceFileVersion::V2_0,
125            (2, 0) => LanceFileVersion::V2_0,
126            (2, 1) => LanceFileVersion::V2_1,
127            (2, 2) => LanceFileVersion::V2_2,
128            _ => panic!(
129                "Unsupported version: {}.{}",
130                self.major_version, self.minor_version
131            ),
132        }
133    }
134}
135
136/// Selecting columns from a lance file requires specifying both the
137/// index of the column and the data type of the column
138///
139/// Partly, this is because it is not strictly required that columns
140/// be read into the same type.  For example, a string column may be
141/// read as a string, large_string or string_view type.
142///
143/// A read will only succeed if the decoder for a column is capable
144/// of decoding into the requested type.
145///
146/// Note that this should generally be limited to different in-memory
147/// representations of the same semantic type.  An encoding could
148/// theoretically support "casting" (e.g. int to string, etc.) but
149/// there is little advantage in doing so here.
150///
151/// Note: in order to specify a projection the user will need some way
152/// to figure out the column indices.  In the table format we do this
153/// using field IDs and keeping track of the field id->column index mapping.
154///
155/// If users are not using the table format then they will need to figure
156/// out some way to do this themselves.
157#[derive(Debug, Clone)]
158pub struct ReaderProjection {
159    /// The data types (schema) of the selected columns.  The names
160    /// of the schema are arbitrary and ignored.
161    pub schema: Arc<Schema>,
162    /// The indices of the columns to load.
163    ///
164    /// The content of this vector depends on the file version.
165    ///
166    /// In Lance File Version 2.0 we need ids for structural fields as
167    /// well as leaf fields:
168    ///
169    ///   - Primitive: the index of the column in the schema
170    ///   - List: the index of the list column in the schema
171    ///     followed by the column indices of the children
172    ///   - FixedSizeList (of primitive): the index of the column in the schema
173    ///     (this case is not nested)
174    ///   - FixedSizeList (of non-primitive): not yet implemented
175    ///   - Dictionary: same as primitive
176    ///   - Struct: the index of the struct column in the schema
177    ///     followed by the column indices of the children
178    ///
179    ///   In other words, this should be a DFS listing of the desired schema.
180    ///
181    /// In Lance File Version 2.1 we only need ids for leaf fields.  Any structural
182    /// fields are completely transparent.
183    ///
184    /// For example, if the goal is to load:
185    ///
186    ///   x: int32
187    ///   y: `struct<z: int32, w: string>`
188    ///   z: `list<int32>`
189    ///
190    /// and the schema originally used to store the data was:
191    ///
192    ///   a: `struct<x: int32>`
193    ///   b: int64
194    ///   y: `struct<z: int32, c: int64, w: string>`
195    ///   z: `list<int32>`
196    ///
197    /// Then the column_indices should be:
198    ///
199    /// - 2.0: [1, 3, 4, 6, 7, 8]
200    /// - 2.1: [0, 2, 4, 5]
201    pub column_indices: Vec<u32>,
202}
203
204impl ReaderProjection {
205    fn from_field_ids_helper<'a>(
206        file_version: LanceFileVersion,
207        fields: impl Iterator<Item = &'a Field>,
208        field_id_to_column_index: &BTreeMap<u32, u32>,
209        column_indices: &mut Vec<u32>,
210    ) -> Result<()> {
211        for field in fields {
212            let is_structural = file_version >= LanceFileVersion::V2_1;
213            // In the 2.0 system we needed ids for intermediate fields.  In 2.1+
214            // we only need ids for leaf fields.
215            if !is_structural
216                || field.children.is_empty()
217                || field.is_blob()
218                || field.is_packed_struct()
219            {
220                if let Some(column_idx) = field_id_to_column_index.get(&(field.id as u32)).copied()
221                {
222                    column_indices.push(column_idx);
223                }
224            }
225            // Don't recurse into children if the field is a blob or packed struct in 2.1
226            if !is_structural || (!field.is_blob() && !field.is_packed_struct()) {
227                Self::from_field_ids_helper(
228                    file_version,
229                    field.children.iter(),
230                    field_id_to_column_index,
231                    column_indices,
232                )?;
233            }
234        }
235        Ok(())
236    }
237
238    /// Creates a projection using a mapping from field IDs to column indices
239    ///
240    /// You can obtain such a mapping when the file is written using the
241    /// [`crate::writer::FileWriter::field_id_to_column_indices`] method.
242    pub fn from_field_ids(
243        file_version: LanceFileVersion,
244        schema: &Schema,
245        field_id_to_column_index: &BTreeMap<u32, u32>,
246    ) -> Result<Self> {
247        let mut column_indices = Vec::new();
248        Self::from_field_ids_helper(
249            file_version,
250            schema.fields.iter(),
251            field_id_to_column_index,
252            &mut column_indices,
253        )?;
254        let projection = Self {
255            schema: Arc::new(schema.clone()),
256            column_indices,
257        };
258        Ok(projection)
259    }
260
261    /// Creates a projection that reads the entire file
262    ///
263    /// If the schema provided is not the schema of the entire file then
264    /// the projection will be invalid and the read will fail.
265    /// If the field is a `struct datatype` with `packed` set to true in the field metadata,
266    /// the whole struct has one column index.
267    /// To support nested `packed-struct encoding`, this method need to be further adjusted.
268    pub fn from_whole_schema(schema: &Schema, version: LanceFileVersion) -> Self {
269        let schema = Arc::new(schema.clone());
270        let is_structural = version >= LanceFileVersion::V2_1;
271        let mut column_indices = vec![];
272        let mut curr_column_idx = 0;
273        let mut packed_struct_fields_num = 0;
274        for field in schema.fields_pre_order() {
275            if packed_struct_fields_num > 0 {
276                packed_struct_fields_num -= 1;
277                continue;
278            }
279            if field.is_packed_struct() {
280                column_indices.push(curr_column_idx);
281                curr_column_idx += 1;
282                packed_struct_fields_num = field.children.len();
283            } else if field.children.is_empty() || !is_structural {
284                column_indices.push(curr_column_idx);
285                curr_column_idx += 1;
286            }
287        }
288        Self {
289            schema,
290            column_indices,
291        }
292    }
293
294    /// Creates a projection that reads the specified columns provided by name
295    ///
296    /// The syntax for column names is the same as [`lance_core::datatypes::Schema::project`]
297    ///
298    /// If the schema provided is not the schema of the entire file then
299    /// the projection will be invalid and the read will fail.
300    pub fn from_column_names(
301        file_version: LanceFileVersion,
302        schema: &Schema,
303        column_names: &[&str],
304    ) -> Result<Self> {
305        let field_id_to_column_index = schema
306            .fields_pre_order()
307            // In the 2.0 system we needed ids for intermediate fields.  In 2.1+
308            // we only need ids for leaf fields.
309            .filter(|field| {
310                file_version < LanceFileVersion::V2_1 || field.is_leaf() || field.is_packed_struct()
311            })
312            .enumerate()
313            .map(|(idx, field)| (field.id as u32, idx as u32))
314            .collect::<BTreeMap<_, _>>();
315        let projected = schema.project(column_names)?;
316        let mut column_indices = Vec::new();
317        Self::from_field_ids_helper(
318            file_version,
319            projected.fields.iter(),
320            &field_id_to_column_index,
321            &mut column_indices,
322        )?;
323        Ok(Self {
324            schema: Arc::new(projected),
325            column_indices,
326        })
327    }
328}
329
330/// File Reader Options that can control reading behaviors, such as whether to enable caching on repetition indices
331#[derive(Clone, Debug)]
332pub struct FileReaderOptions {
333    pub decoder_config: DecoderConfig,
334    /// Size of chunks when reading large pages. Pages larger than this
335    /// will be read in multiple chunks to control memory usage.
336    /// Default: 8MB (DEFAULT_READ_CHUNK_SIZE)
337    pub read_chunk_size: u64,
338}
339
340impl Default for FileReaderOptions {
341    fn default() -> Self {
342        Self {
343            decoder_config: DecoderConfig::default(),
344            read_chunk_size: DEFAULT_READ_CHUNK_SIZE,
345        }
346    }
347}
348
349#[derive(Debug)]
350pub struct FileReader {
351    scheduler: Arc<dyn EncodingsIo>,
352    // The default projection to be applied to all reads
353    base_projection: ReaderProjection,
354    num_rows: u64,
355    metadata: Arc<CachedFileMetadata>,
356    decoder_plugins: Arc<DecoderPlugins>,
357    cache: Arc<LanceCache>,
358    options: FileReaderOptions,
359}
/// The decoded contents of the fixed-size footer at the end of a file
///
/// The fields are listed in the order in which they are stored in the footer.
#[derive(Debug)]
struct Footer {
    /// Position at which the column metadata section starts
    column_meta_start: u64,
    // We don't use this today because we always load metadata for every column
    // and don't yet support "metadata projection"
    #[allow(dead_code)]
    column_meta_offsets_start: u64,
    /// Position at which the global buffer offsets (GBO) table starts
    global_buff_offsets_start: u64,
    /// Number of entries in the global buffer offsets table
    num_global_buffers: u32,
    /// Number of columns (and of entries in the column metadata offsets table)
    num_columns: u32,
    /// Major version of the file format
    major_version: u16,
    /// Minor version of the file format
    minor_version: u16,
}
374
// Size of the footer in bytes: three u64 positions, two u32 counts, two u16
// version numbers and the 4 byte magic.
const FOOTER_LEN: usize = (3 * 8) + (2 * 4) + (2 * 2) + 4;
376
377impl FileReader {
378    pub fn with_scheduler(&self, scheduler: Arc<dyn EncodingsIo>) -> Self {
379        Self {
380            scheduler,
381            base_projection: self.base_projection.clone(),
382            cache: self.cache.clone(),
383            decoder_plugins: self.decoder_plugins.clone(),
384            metadata: self.metadata.clone(),
385            options: self.options.clone(),
386            num_rows: self.num_rows,
387        }
388    }
389
390    pub fn num_rows(&self) -> u64 {
391        self.num_rows
392    }
393
394    pub fn metadata(&self) -> &Arc<CachedFileMetadata> {
395        &self.metadata
396    }
397
398    pub fn file_statistics(&self) -> FileStatistics {
399        let column_metadatas = &self.metadata().column_metadatas;
400
401        let column_stats = column_metadatas
402            .iter()
403            .map(|col_metadata| {
404                let num_pages = col_metadata.pages.len();
405                let size_bytes = col_metadata
406                    .pages
407                    .iter()
408                    .map(|page| page.buffer_sizes.iter().sum::<u64>())
409                    .sum::<u64>();
410                ColumnStatistics {
411                    num_pages,
412                    size_bytes,
413                }
414            })
415            .collect();
416
417        FileStatistics {
418            columns: column_stats,
419        }
420    }
421
422    pub async fn read_global_buffer(&self, index: u32) -> Result<Bytes> {
423        let buffer_desc = self.metadata.file_buffers.get(index as usize).ok_or_else(||Error::invalid_input(format!("request for global buffer at index {} but there were only {} global buffers in the file", index, self.metadata.file_buffers.len()), location!()))?;
424        self.scheduler
425            .submit_single(
426                buffer_desc.position..buffer_desc.position + buffer_desc.size,
427                0,
428            )
429            .await
430    }
431
432    async fn read_tail(scheduler: &FileScheduler) -> Result<(Bytes, u64)> {
433        let file_size = scheduler.reader().size().await? as u64;
434        let begin = if file_size < scheduler.reader().block_size() as u64 {
435            0
436        } else {
437            file_size - scheduler.reader().block_size() as u64
438        };
439        let tail_bytes = scheduler.submit_single(begin..file_size, 0).await?;
440        Ok((tail_bytes, file_size))
441    }
442
443    // Checks to make sure the footer is written correctly and returns the
444    // position of the file descriptor (which comes from the footer)
445    fn decode_footer(footer_bytes: &Bytes) -> Result<Footer> {
446        let len = footer_bytes.len();
447        if len < FOOTER_LEN {
448            return Err(Error::invalid_input(
449                format!(
450                    "does not have sufficient data, len: {}, bytes: {:?}",
451                    len, footer_bytes
452                ),
453                location!(),
454            ));
455        }
456        let mut cursor = Cursor::new(footer_bytes.slice(len - FOOTER_LEN..));
457
458        let column_meta_start = cursor.read_u64::<LittleEndian>()?;
459        let column_meta_offsets_start = cursor.read_u64::<LittleEndian>()?;
460        let global_buff_offsets_start = cursor.read_u64::<LittleEndian>()?;
461        let num_global_buffers = cursor.read_u32::<LittleEndian>()?;
462        let num_columns = cursor.read_u32::<LittleEndian>()?;
463        let major_version = cursor.read_u16::<LittleEndian>()?;
464        let minor_version = cursor.read_u16::<LittleEndian>()?;
465
466        if major_version == MAJOR_VERSION as u16 && minor_version == MINOR_VERSION as u16 {
467            return Err(Error::version_conflict(
468                "Attempt to use the lance v2 reader to read a legacy file".to_string(),
469                major_version,
470                minor_version,
471                location!(),
472            ));
473        }
474
475        let magic_bytes = footer_bytes.slice(len - 4..);
476        if magic_bytes.as_ref() != MAGIC {
477            return Err(Error::invalid_input(
478                format!(
479                    "file does not appear to be a Lance file (invalid magic: {:?})",
480                    MAGIC
481                ),
482                location!(),
483            ));
484        }
485        Ok(Footer {
486            column_meta_start,
487            column_meta_offsets_start,
488            global_buff_offsets_start,
489            num_global_buffers,
490            num_columns,
491            major_version,
492            minor_version,
493        })
494    }
495
496    // TODO: Once we have coalesced I/O we should only read the column metadatas that we need
497    fn read_all_column_metadata(
498        column_metadata_bytes: Bytes,
499        footer: &Footer,
500    ) -> Result<Vec<pbfile::ColumnMetadata>> {
501        let column_metadata_start = footer.column_meta_start;
502        // cmo == column_metadata_offsets
503        let cmo_table_size = 16 * footer.num_columns as usize;
504        let cmo_table = column_metadata_bytes.slice(column_metadata_bytes.len() - cmo_table_size..);
505
506        (0..footer.num_columns)
507            .map(|col_idx| {
508                let offset = (col_idx * 16) as usize;
509                let position = LittleEndian::read_u64(&cmo_table[offset..offset + 8]);
510                let length = LittleEndian::read_u64(&cmo_table[offset + 8..offset + 16]);
511                let normalized_position = (position - column_metadata_start) as usize;
512                let normalized_end = normalized_position + (length as usize);
513                Ok(pbfile::ColumnMetadata::decode(
514                    &column_metadata_bytes[normalized_position..normalized_end],
515                )?)
516            })
517            .collect::<Result<Vec<_>>>()
518    }
519
520    async fn optimistic_tail_read(
521        data: &Bytes,
522        start_pos: u64,
523        scheduler: &FileScheduler,
524        file_len: u64,
525    ) -> Result<Bytes> {
526        let num_bytes_needed = (file_len - start_pos) as usize;
527        if data.len() >= num_bytes_needed {
528            Ok(data.slice((data.len() - num_bytes_needed)..))
529        } else {
530            let num_bytes_missing = (num_bytes_needed - data.len()) as u64;
531            let start = file_len - num_bytes_needed as u64;
532            let missing_bytes = scheduler
533                .submit_single(start..start + num_bytes_missing, 0)
534                .await?;
535            let mut combined = BytesMut::with_capacity(data.len() + num_bytes_missing as usize);
536            combined.extend(missing_bytes);
537            combined.extend(data);
538            Ok(combined.freeze())
539        }
540    }
541
542    fn do_decode_gbo_table(
543        gbo_bytes: &Bytes,
544        footer: &Footer,
545        version: LanceFileVersion,
546    ) -> Result<Vec<BufferDescriptor>> {
547        let mut global_bufs_cursor = Cursor::new(gbo_bytes);
548
549        let mut global_buffers = Vec::with_capacity(footer.num_global_buffers as usize);
550        for _ in 0..footer.num_global_buffers {
551            let buf_pos = global_bufs_cursor.read_u64::<LittleEndian>()?;
552            assert!(
553                version < LanceFileVersion::V2_1 || buf_pos % PAGE_BUFFER_ALIGNMENT as u64 == 0
554            );
555            let buf_size = global_bufs_cursor.read_u64::<LittleEndian>()?;
556            global_buffers.push(BufferDescriptor {
557                position: buf_pos,
558                size: buf_size,
559            });
560        }
561
562        Ok(global_buffers)
563    }
564
565    async fn decode_gbo_table(
566        tail_bytes: &Bytes,
567        file_len: u64,
568        scheduler: &FileScheduler,
569        footer: &Footer,
570        version: LanceFileVersion,
571    ) -> Result<Vec<BufferDescriptor>> {
572        // This could, in theory, trigger another IOP but the GBO table should never be large
573        // enough for that to happen
574        let gbo_bytes = Self::optimistic_tail_read(
575            tail_bytes,
576            footer.global_buff_offsets_start,
577            scheduler,
578            file_len,
579        )
580        .await?;
581        Self::do_decode_gbo_table(&gbo_bytes, footer, version)
582    }
583
584    fn decode_schema(schema_bytes: Bytes) -> Result<(u64, lance_core::datatypes::Schema)> {
585        let file_descriptor = pb::FileDescriptor::decode(schema_bytes)?;
586        let pb_schema = file_descriptor.schema.unwrap();
587        let num_rows = file_descriptor.length;
588        let fields_with_meta = FieldsWithMeta {
589            fields: Fields(pb_schema.fields),
590            metadata: pb_schema.metadata,
591        };
592        let schema = lance_core::datatypes::Schema::from(fields_with_meta);
593        Ok((num_rows, schema))
594    }
595
596    // TODO: Support late projection.  Currently, if we want to perform a
597    // projected read of a file, we load all of the column metadata, and then
598    // only read the column data that is requested.  This is fine for most cases.
599    //
600    // However, if there are many columns then loading all of the column metadata
601    // may be expensive.  We should support a mode where we only load the column
602    // metadata for the columns that are requested (the file format supports this).
603    //
604    // The main challenge is that we either need to ignore the column metadata cache
605    // or have a more sophisticated cache that can cache per-column metadata.
606    //
607    // Also, if the number of columns is fairly small, it's faster to read them as a
608    // single IOP, but we can fix this through coalescing.
609    pub async fn read_all_metadata(scheduler: &FileScheduler) -> Result<CachedFileMetadata> {
610        // 1. read the footer
611        let (tail_bytes, file_len) = Self::read_tail(scheduler).await?;
612        let footer = Self::decode_footer(&tail_bytes)?;
613
614        let file_version = LanceFileVersion::try_from_major_minor(
615            footer.major_version as u32,
616            footer.minor_version as u32,
617        )?;
618
619        let gbo_table =
620            Self::decode_gbo_table(&tail_bytes, file_len, scheduler, &footer, file_version).await?;
621        if gbo_table.is_empty() {
622            return Err(Error::Internal {
623                message: "File did not contain any global buffers, schema expected".to_string(),
624                location: location!(),
625            });
626        }
627        let schema_start = gbo_table[0].position;
628        let schema_size = gbo_table[0].size;
629
630        let num_footer_bytes = file_len - schema_start;
631
632        // By default we read all column metadatas.  We do NOT read the column metadata buffers
633        // at this point.  We only want to read the column metadata for columns we are actually loading.
634        let all_metadata_bytes =
635            Self::optimistic_tail_read(&tail_bytes, schema_start, scheduler, file_len).await?;
636
637        let schema_bytes = all_metadata_bytes.slice(0..schema_size as usize);
638        let (num_rows, schema) = Self::decode_schema(schema_bytes)?;
639
640        // Next, read the metadata for the columns
641        // This is both the column metadata and the CMO table
642        let column_metadata_start = (footer.column_meta_start - schema_start) as usize;
643        let column_metadata_end = (footer.global_buff_offsets_start - schema_start) as usize;
644        let column_metadata_bytes =
645            all_metadata_bytes.slice(column_metadata_start..column_metadata_end);
646        let column_metadatas = Self::read_all_column_metadata(column_metadata_bytes, &footer)?;
647
648        let num_global_buffer_bytes = gbo_table.iter().map(|buf| buf.size).sum::<u64>();
649        let num_data_bytes = footer.column_meta_start - num_global_buffer_bytes;
650        let num_column_metadata_bytes = footer.global_buff_offsets_start - footer.column_meta_start;
651
652        let column_infos = Self::meta_to_col_infos(column_metadatas.as_slice(), file_version);
653
654        Ok(CachedFileMetadata {
655            file_schema: Arc::new(schema),
656            column_metadatas,
657            column_infos,
658            num_rows,
659            num_data_bytes,
660            num_column_metadata_bytes,
661            num_global_buffer_bytes,
662            num_footer_bytes,
663            file_buffers: gbo_table,
664            major_version: footer.major_version,
665            minor_version: footer.minor_version,
666        })
667    }
668
669    fn fetch_encoding<M: Default + Name + Sized>(encoding: &pbfile::Encoding) -> M {
670        match &encoding.location {
671            Some(pbfile::encoding::Location::Indirect(_)) => todo!(),
672            Some(pbfile::encoding::Location::Direct(encoding)) => {
673                let encoding_buf = Bytes::from(encoding.encoding.clone());
674                let encoding_any = prost_types::Any::decode(encoding_buf).unwrap();
675                encoding_any.to_msg::<M>().unwrap()
676            }
677            Some(pbfile::encoding::Location::None(_)) => panic!(),
678            None => panic!(),
679        }
680    }
681
682    fn meta_to_col_infos(
683        column_metadatas: &[pbfile::ColumnMetadata],
684        file_version: LanceFileVersion,
685    ) -> Vec<Arc<ColumnInfo>> {
686        column_metadatas
687            .iter()
688            .enumerate()
689            .map(|(col_idx, col_meta)| {
690                let page_infos = col_meta
691                    .pages
692                    .iter()
693                    .map(|page| {
694                        let num_rows = page.length;
695                        let encoding = match file_version {
696                            LanceFileVersion::V2_0 => {
697                                PageEncoding::Legacy(Self::fetch_encoding::<pbenc::ArrayEncoding>(
698                                    page.encoding.as_ref().unwrap(),
699                                ))
700                            }
701                            _ => PageEncoding::Structural(Self::fetch_encoding::<
702                                pbenc21::PageLayout,
703                            >(
704                                page.encoding.as_ref().unwrap()
705                            )),
706                        };
707                        let buffer_offsets_and_sizes = Arc::from(
708                            page.buffer_offsets
709                                .iter()
710                                .zip(page.buffer_sizes.iter())
711                                .map(|(offset, size)| {
712                                    // Starting with version 2.1 we can assert that page buffers are aligned
713                                    assert!(
714                                        file_version < LanceFileVersion::V2_1
715                                            || offset % PAGE_BUFFER_ALIGNMENT as u64 == 0
716                                    );
717                                    (*offset, *size)
718                                })
719                                .collect::<Vec<_>>(),
720                        );
721                        PageInfo {
722                            buffer_offsets_and_sizes,
723                            encoding,
724                            num_rows,
725                            priority: page.priority,
726                        }
727                    })
728                    .collect::<Vec<_>>();
729                let buffer_offsets_and_sizes = Arc::from(
730                    col_meta
731                        .buffer_offsets
732                        .iter()
733                        .zip(col_meta.buffer_sizes.iter())
734                        .map(|(offset, size)| (*offset, *size))
735                        .collect::<Vec<_>>(),
736                );
737                Arc::new(ColumnInfo {
738                    index: col_idx as u32,
739                    page_infos: Arc::from(page_infos),
740                    buffer_offsets_and_sizes,
741                    encoding: Self::fetch_encoding(col_meta.encoding.as_ref().unwrap()),
742                })
743            })
744            .collect::<Vec<_>>()
745    }
746
747    fn validate_projection(
748        projection: &ReaderProjection,
749        metadata: &CachedFileMetadata,
750    ) -> Result<()> {
751        if projection.schema.fields.is_empty() {
752            return Err(Error::invalid_input(
753                "Attempt to read zero columns from the file, at least one column must be specified"
754                    .to_string(),
755                location!(),
756            ));
757        }
758        let mut column_indices_seen = BTreeSet::new();
759        for column_index in &projection.column_indices {
760            if !column_indices_seen.insert(*column_index) {
761                return Err(Error::invalid_input(
762                    format!(
763                        "The projection specified the column index {} more than once",
764                        column_index
765                    ),
766                    location!(),
767                ));
768            }
769            if *column_index >= metadata.column_infos.len() as u32 {
770                return Err(Error::invalid_input(format!("The projection specified the column index {} but there are only {} columns in the file", column_index, metadata.column_infos.len()), location!()));
771            }
772        }
773        Ok(())
774    }
775
776    /// Opens a new file reader without any pre-existing knowledge
777    ///
778    /// This will read the file schema from the file itself and thus requires a bit more I/O
779    ///
780    /// A `base_projection` can also be provided.  If provided, then the projection will apply
781    /// to all reads from the file that do not specify their own projection.
782    pub async fn try_open(
783        scheduler: FileScheduler,
784        base_projection: Option<ReaderProjection>,
785        decoder_plugins: Arc<DecoderPlugins>,
786        cache: &LanceCache,
787        options: FileReaderOptions,
788    ) -> Result<Self> {
789        let file_metadata = Arc::new(Self::read_all_metadata(&scheduler).await?);
790        let path = scheduler.reader().path().clone();
791
792        // Create LanceEncodingsIo with read chunk size from options
793        let encodings_io =
794            LanceEncodingsIo::new(scheduler).with_read_chunk_size(options.read_chunk_size);
795
796        Self::try_open_with_file_metadata(
797            Arc::new(encodings_io),
798            path,
799            base_projection,
800            decoder_plugins,
801            file_metadata,
802            cache,
803            options,
804        )
805        .await
806    }
807
808    /// Same as `try_open` but with the file metadata already loaded.
809    ///
810    /// This method also can accept any kind of `EncodingsIo` implementation allowing
811    /// for custom strategies to be used for I/O scheduling (e.g. for takes on fast
812    /// disks it may be better to avoid asynchronous overhead).
813    pub async fn try_open_with_file_metadata(
814        scheduler: Arc<dyn EncodingsIo>,
815        path: Path,
816        base_projection: Option<ReaderProjection>,
817        decoder_plugins: Arc<DecoderPlugins>,
818        file_metadata: Arc<CachedFileMetadata>,
819        cache: &LanceCache,
820        options: FileReaderOptions,
821    ) -> Result<Self> {
822        let cache = Arc::new(cache.with_key_prefix(path.as_ref()));
823
824        if let Some(base_projection) = base_projection.as_ref() {
825            Self::validate_projection(base_projection, &file_metadata)?;
826        }
827        let num_rows = file_metadata.num_rows;
828        Ok(Self {
829            scheduler,
830            base_projection: base_projection.unwrap_or(ReaderProjection::from_whole_schema(
831                file_metadata.file_schema.as_ref(),
832                file_metadata.version(),
833            )),
834            num_rows,
835            metadata: file_metadata,
836            decoder_plugins,
837            cache,
838            options,
839        })
840    }
841
842    // The actual decoder needs all the column infos that make up a type.  In other words, if
843    // the first type in the schema is Struct<i32, i32> then the decoder will need 3 column infos.
844    //
845    // This is a file reader concern because the file reader needs to support late projection of columns
846    // and so it will need to figure this out anyways.
847    //
848    // It's a bit of a tricky process though because the number of column infos may depend on the
849    // encoding.  Considering the above example, if we wrote it with a packed encoding, then there would
850    // only be a single column in the file (and not 3).
851    //
852    // At the moment this method words because our rules are simple and we just repeat them here.  See
853    // Self::default_projection for a similar problem.  In the future this is something the encodings
854    // registry will need to figure out.
855    fn collect_columns_from_projection(
856        &self,
857        _projection: &ReaderProjection,
858    ) -> Result<Vec<Arc<ColumnInfo>>> {
859        Ok(self.metadata.column_infos.clone())
860    }
861
862    #[allow(clippy::too_many_arguments)]
863    fn do_read_range(
864        column_infos: Vec<Arc<ColumnInfo>>,
865        io: Arc<dyn EncodingsIo>,
866        cache: Arc<LanceCache>,
867        num_rows: u64,
868        decoder_plugins: Arc<DecoderPlugins>,
869        range: Range<u64>,
870        batch_size: u32,
871        projection: ReaderProjection,
872        filter: FilterExpression,
873        decoder_config: DecoderConfig,
874    ) -> Result<BoxStream<'static, ReadBatchTask>> {
875        debug!(
876            "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
877            range,
878            batch_size,
879            num_rows,
880            column_infos.len(),
881            projection.schema.fields.len(),
882        );
883
884        let config = SchedulerDecoderConfig {
885            batch_size,
886            cache,
887            decoder_plugins,
888            io,
889            decoder_config,
890        };
891
892        let requested_rows = RequestedRows::Ranges(vec![range]);
893
894        Ok(schedule_and_decode(
895            column_infos,
896            requested_rows,
897            filter,
898            projection.column_indices,
899            projection.schema,
900            config,
901        ))
902    }
903
904    fn read_range(
905        &self,
906        range: Range<u64>,
907        batch_size: u32,
908        projection: ReaderProjection,
909        filter: FilterExpression,
910    ) -> Result<BoxStream<'static, ReadBatchTask>> {
911        // Create and initialize the stream
912        Self::do_read_range(
913            self.collect_columns_from_projection(&projection)?,
914            self.scheduler.clone(),
915            self.cache.clone(),
916            self.num_rows,
917            self.decoder_plugins.clone(),
918            range,
919            batch_size,
920            projection,
921            filter,
922            self.options.decoder_config.clone(),
923        )
924    }
925
926    #[allow(clippy::too_many_arguments)]
927    fn do_take_rows(
928        column_infos: Vec<Arc<ColumnInfo>>,
929        io: Arc<dyn EncodingsIo>,
930        cache: Arc<LanceCache>,
931        decoder_plugins: Arc<DecoderPlugins>,
932        indices: Vec<u64>,
933        batch_size: u32,
934        projection: ReaderProjection,
935        filter: FilterExpression,
936        decoder_config: DecoderConfig,
937    ) -> Result<BoxStream<'static, ReadBatchTask>> {
938        debug!(
939            "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
940            indices.len(),
941            indices[0],
942            indices[indices.len() - 1],
943            batch_size,
944            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
945        );
946
947        let config = SchedulerDecoderConfig {
948            batch_size,
949            cache,
950            decoder_plugins,
951            io,
952            decoder_config,
953        };
954
955        let requested_rows = RequestedRows::Indices(indices);
956
957        Ok(schedule_and_decode(
958            column_infos,
959            requested_rows,
960            filter,
961            projection.column_indices,
962            projection.schema,
963            config,
964        ))
965    }
966
967    fn take_rows(
968        &self,
969        indices: Vec<u64>,
970        batch_size: u32,
971        projection: ReaderProjection,
972    ) -> Result<BoxStream<'static, ReadBatchTask>> {
973        // Create and initialize the stream
974        Self::do_take_rows(
975            self.collect_columns_from_projection(&projection)?,
976            self.scheduler.clone(),
977            self.cache.clone(),
978            self.decoder_plugins.clone(),
979            indices,
980            batch_size,
981            projection,
982            FilterExpression::no_filter(),
983            self.options.decoder_config.clone(),
984        )
985    }
986
987    #[allow(clippy::too_many_arguments)]
988    fn do_read_ranges(
989        column_infos: Vec<Arc<ColumnInfo>>,
990        io: Arc<dyn EncodingsIo>,
991        cache: Arc<LanceCache>,
992        decoder_plugins: Arc<DecoderPlugins>,
993        ranges: Vec<Range<u64>>,
994        batch_size: u32,
995        projection: ReaderProjection,
996        filter: FilterExpression,
997        decoder_config: DecoderConfig,
998    ) -> Result<BoxStream<'static, ReadBatchTask>> {
999        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1000        debug!(
1001            "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
1002            ranges.len(),
1003            num_rows,
1004            ranges[0].start,
1005            ranges[ranges.len() - 1].end,
1006            batch_size,
1007            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1008        );
1009
1010        let config = SchedulerDecoderConfig {
1011            batch_size,
1012            cache,
1013            decoder_plugins,
1014            io,
1015            decoder_config,
1016        };
1017
1018        let requested_rows = RequestedRows::Ranges(ranges);
1019
1020        Ok(schedule_and_decode(
1021            column_infos,
1022            requested_rows,
1023            filter,
1024            projection.column_indices,
1025            projection.schema,
1026            config,
1027        ))
1028    }
1029
1030    fn read_ranges(
1031        &self,
1032        ranges: Vec<Range<u64>>,
1033        batch_size: u32,
1034        projection: ReaderProjection,
1035        filter: FilterExpression,
1036    ) -> Result<BoxStream<'static, ReadBatchTask>> {
1037        Self::do_read_ranges(
1038            self.collect_columns_from_projection(&projection)?,
1039            self.scheduler.clone(),
1040            self.cache.clone(),
1041            self.decoder_plugins.clone(),
1042            ranges,
1043            batch_size,
1044            projection,
1045            filter,
1046            self.options.decoder_config.clone(),
1047        )
1048    }
1049
1050    /// Creates a stream of "read tasks" to read the data from the file
1051    ///
1052    /// The arguments are similar to [`Self::read_stream_projected`] but instead of returning a stream
1053    /// of record batches it returns a stream of "read tasks".
1054    ///
1055    /// The tasks should be consumed with some kind of `buffered` argument if CPU parallelism is desired.
1056    ///
1057    /// Note that "read task" is probably a bit imprecise.  The tasks are actually "decode tasks".  The
1058    /// reading happens asynchronously in the background.  In other words, a single read task may map to
1059    /// multiple I/O operations or a single I/O operation may map to multiple read tasks.
1060    pub fn read_tasks(
1061        &self,
1062        params: ReadBatchParams,
1063        batch_size: u32,
1064        projection: Option<ReaderProjection>,
1065        filter: FilterExpression,
1066    ) -> Result<Pin<Box<dyn Stream<Item = ReadBatchTask> + Send>>> {
1067        let projection = projection.unwrap_or_else(|| self.base_projection.clone());
1068        Self::validate_projection(&projection, &self.metadata)?;
1069        let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
1070            if bound > self.num_rows || bound == self.num_rows && inclusive {
1071                Err(Error::invalid_input(
1072                    format!(
1073                        "cannot read {:?} from file with {} rows",
1074                        params, self.num_rows
1075                    ),
1076                    location!(),
1077                ))
1078            } else {
1079                Ok(())
1080            }
1081        };
1082        match &params {
1083            ReadBatchParams::Indices(indices) => {
1084                for idx in indices {
1085                    match idx {
1086                        None => {
1087                            return Err(Error::invalid_input(
1088                                "Null value in indices array",
1089                                location!(),
1090                            ));
1091                        }
1092                        Some(idx) => {
1093                            verify_bound(&params, idx as u64, true)?;
1094                        }
1095                    }
1096                }
1097                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
1098                self.take_rows(indices, batch_size, projection)
1099            }
1100            ReadBatchParams::Range(range) => {
1101                verify_bound(&params, range.end as u64, false)?;
1102                self.read_range(
1103                    range.start as u64..range.end as u64,
1104                    batch_size,
1105                    projection,
1106                    filter,
1107                )
1108            }
1109            ReadBatchParams::Ranges(ranges) => {
1110                let mut ranges_u64 = Vec::with_capacity(ranges.len());
1111                for range in ranges.as_ref() {
1112                    verify_bound(&params, range.end, false)?;
1113                    ranges_u64.push(range.start..range.end);
1114                }
1115                self.read_ranges(ranges_u64, batch_size, projection, filter)
1116            }
1117            ReadBatchParams::RangeFrom(range) => {
1118                verify_bound(&params, range.start as u64, true)?;
1119                self.read_range(
1120                    range.start as u64..self.num_rows,
1121                    batch_size,
1122                    projection,
1123                    filter,
1124                )
1125            }
1126            ReadBatchParams::RangeTo(range) => {
1127                verify_bound(&params, range.end as u64, false)?;
1128                self.read_range(0..range.end as u64, batch_size, projection, filter)
1129            }
1130            ReadBatchParams::RangeFull => {
1131                self.read_range(0..self.num_rows, batch_size, projection, filter)
1132            }
1133        }
1134    }
1135
1136    /// Reads data from the file as a stream of record batches
1137    ///
1138    /// * `params` - Specifies the range (or indices) of data to read
1139    /// * `batch_size` - The maximum size of a single batch.  A batch may be smaller
1140    ///   if it is the last batch or if it is not possible to create a batch of the
1141    ///   requested size.
1142    ///
1143    ///   For example, if the batch size is 1024 and one of the columns is a string
1144    ///   column then there may be some ranges of 1024 rows that contain more than
1145    ///   2^31 bytes of string data (which is the maximum size of a string column
1146    ///   in Arrow).  In this case smaller batches may be emitted.
1147    /// * `batch_readahead` - The number of batches to read ahead.  This controls the
1148    ///   amount of CPU parallelism of the read.  In other words it controls how many
1149    ///   batches will be decoded in parallel.  It has no effect on the I/O parallelism
1150    ///   of the read (how many I/O requests are in flight at once).
1151    ///
1152    ///   This parameter also is also related to backpressure.  If the consumer of the
1153    ///   stream is slow then the reader will build up RAM.
1154    /// * `projection` - A projection to apply to the read.  This controls which columns
1155    ///   are read from the file.  The projection is NOT applied on top of the base
1156    ///   projection.  The projection is applied directly to the file schema.
1157    pub fn read_stream_projected(
1158        &self,
1159        params: ReadBatchParams,
1160        batch_size: u32,
1161        batch_readahead: u32,
1162        projection: ReaderProjection,
1163        filter: FilterExpression,
1164    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1165        let arrow_schema = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
1166        let tasks_stream = self.read_tasks(params, batch_size, Some(projection), filter)?;
1167        let batch_stream = tasks_stream
1168            .map(|task| task.task)
1169            .buffered(batch_readahead as usize)
1170            .boxed();
1171        Ok(Box::pin(RecordBatchStreamAdapter::new(
1172            arrow_schema,
1173            batch_stream,
1174        )))
1175    }
1176
1177    fn take_rows_blocking(
1178        &self,
1179        indices: Vec<u64>,
1180        batch_size: u32,
1181        projection: ReaderProjection,
1182        filter: FilterExpression,
1183    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1184        let column_infos = self.collect_columns_from_projection(&projection)?;
1185        debug!(
1186            "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
1187            indices.len(),
1188            indices[0],
1189            indices[indices.len() - 1],
1190            batch_size,
1191            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1192        );
1193
1194        let config = SchedulerDecoderConfig {
1195            batch_size,
1196            cache: self.cache.clone(),
1197            decoder_plugins: self.decoder_plugins.clone(),
1198            io: self.scheduler.clone(),
1199            decoder_config: self.options.decoder_config.clone(),
1200        };
1201
1202        let requested_rows = RequestedRows::Indices(indices);
1203
1204        schedule_and_decode_blocking(
1205            column_infos,
1206            requested_rows,
1207            filter,
1208            projection.column_indices,
1209            projection.schema,
1210            config,
1211        )
1212    }
1213
1214    fn read_ranges_blocking(
1215        &self,
1216        ranges: Vec<Range<u64>>,
1217        batch_size: u32,
1218        projection: ReaderProjection,
1219        filter: FilterExpression,
1220    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1221        let column_infos = self.collect_columns_from_projection(&projection)?;
1222        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1223        debug!(
1224            "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
1225            ranges.len(),
1226            num_rows,
1227            ranges[0].start,
1228            ranges[ranges.len() - 1].end,
1229            batch_size,
1230            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1231        );
1232
1233        let config = SchedulerDecoderConfig {
1234            batch_size,
1235            cache: self.cache.clone(),
1236            decoder_plugins: self.decoder_plugins.clone(),
1237            io: self.scheduler.clone(),
1238            decoder_config: self.options.decoder_config.clone(),
1239        };
1240
1241        let requested_rows = RequestedRows::Ranges(ranges);
1242
1243        schedule_and_decode_blocking(
1244            column_infos,
1245            requested_rows,
1246            filter,
1247            projection.column_indices,
1248            projection.schema,
1249            config,
1250        )
1251    }
1252
1253    fn read_range_blocking(
1254        &self,
1255        range: Range<u64>,
1256        batch_size: u32,
1257        projection: ReaderProjection,
1258        filter: FilterExpression,
1259    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1260        let column_infos = self.collect_columns_from_projection(&projection)?;
1261        let num_rows = self.num_rows;
1262
1263        debug!(
1264            "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
1265            range,
1266            batch_size,
1267            num_rows,
1268            column_infos.len(),
1269            projection.schema.fields.len(),
1270        );
1271
1272        let config = SchedulerDecoderConfig {
1273            batch_size,
1274            cache: self.cache.clone(),
1275            decoder_plugins: self.decoder_plugins.clone(),
1276            io: self.scheduler.clone(),
1277            decoder_config: self.options.decoder_config.clone(),
1278        };
1279
1280        let requested_rows = RequestedRows::Ranges(vec![range]);
1281
1282        schedule_and_decode_blocking(
1283            column_infos,
1284            requested_rows,
1285            filter,
1286            projection.column_indices,
1287            projection.schema,
1288            config,
1289        )
1290    }
1291
1292    /// Read data from the file as an iterator of record batches
1293    ///
1294    /// This is a blocking variant of [`Self::read_stream_projected`] that runs entirely in the
1295    /// calling thread.  It will block on I/O if the decode is faster than the I/O.  It is useful
1296    /// for benchmarking and potentially from "take"ing small batches from fast disks.
1297    ///
1298    /// Large scans of in-memory data will still benefit from threading (and should therefore not
1299    /// use this method) because we can parallelize the decode.
1300    ///
1301    /// Note: calling this from within a tokio runtime will panic.  It is acceptable to call this
1302    /// from a spawn_blocking context.
1303    pub fn read_stream_projected_blocking(
1304        &self,
1305        params: ReadBatchParams,
1306        batch_size: u32,
1307        projection: Option<ReaderProjection>,
1308        filter: FilterExpression,
1309    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1310        let projection = projection.unwrap_or_else(|| self.base_projection.clone());
1311        Self::validate_projection(&projection, &self.metadata)?;
1312        let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
1313            if bound > self.num_rows || bound == self.num_rows && inclusive {
1314                Err(Error::invalid_input(
1315                    format!(
1316                        "cannot read {:?} from file with {} rows",
1317                        params, self.num_rows
1318                    ),
1319                    location!(),
1320                ))
1321            } else {
1322                Ok(())
1323            }
1324        };
1325        match &params {
1326            ReadBatchParams::Indices(indices) => {
1327                for idx in indices {
1328                    match idx {
1329                        None => {
1330                            return Err(Error::invalid_input(
1331                                "Null value in indices array",
1332                                location!(),
1333                            ));
1334                        }
1335                        Some(idx) => {
1336                            verify_bound(&params, idx as u64, true)?;
1337                        }
1338                    }
1339                }
1340                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
1341                self.take_rows_blocking(indices, batch_size, projection, filter)
1342            }
1343            ReadBatchParams::Range(range) => {
1344                verify_bound(&params, range.end as u64, false)?;
1345                self.read_range_blocking(
1346                    range.start as u64..range.end as u64,
1347                    batch_size,
1348                    projection,
1349                    filter,
1350                )
1351            }
1352            ReadBatchParams::Ranges(ranges) => {
1353                let mut ranges_u64 = Vec::with_capacity(ranges.len());
1354                for range in ranges.as_ref() {
1355                    verify_bound(&params, range.end, false)?;
1356                    ranges_u64.push(range.start..range.end);
1357                }
1358                self.read_ranges_blocking(ranges_u64, batch_size, projection, filter)
1359            }
1360            ReadBatchParams::RangeFrom(range) => {
1361                verify_bound(&params, range.start as u64, true)?;
1362                self.read_range_blocking(
1363                    range.start as u64..self.num_rows,
1364                    batch_size,
1365                    projection,
1366                    filter,
1367                )
1368            }
1369            ReadBatchParams::RangeTo(range) => {
1370                verify_bound(&params, range.end as u64, false)?;
1371                self.read_range_blocking(0..range.end as u64, batch_size, projection, filter)
1372            }
1373            ReadBatchParams::RangeFull => {
1374                self.read_range_blocking(0..self.num_rows, batch_size, projection, filter)
1375            }
1376        }
1377    }
1378
1379    /// Reads data from the file as a stream of record batches
1380    ///
1381    /// This is similar to [`Self::read_stream_projected`] but uses the base projection
1382    /// provided when the file was opened (or reads all columns if the file was
1383    /// opened without a base projection)
1384    pub fn read_stream(
1385        &self,
1386        params: ReadBatchParams,
1387        batch_size: u32,
1388        batch_readahead: u32,
1389        filter: FilterExpression,
1390    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1391        self.read_stream_projected(
1392            params,
1393            batch_size,
1394            batch_readahead,
1395            self.base_projection.clone(),
1396            filter,
1397        )
1398    }
1399
1400    pub fn schema(&self) -> &Arc<Schema> {
1401        &self.metadata.file_schema
1402    }
1403}
1404
1405/// Inspects a page and returns a String describing the page's encoding
1406pub fn describe_encoding(page: &pbfile::column_metadata::Page) -> String {
1407    if let Some(encoding) = &page.encoding {
1408        if let Some(style) = &encoding.location {
1409            match style {
1410                pbfile::encoding::Location::Indirect(indirect) => {
1411                    format!(
1412                        "IndirectEncoding(pos={},size={})",
1413                        indirect.buffer_location, indirect.buffer_length
1414                    )
1415                }
1416                pbfile::encoding::Location::Direct(direct) => {
1417                    let encoding_any =
1418                        prost_types::Any::decode(Bytes::from(direct.encoding.clone()))
1419                            .expect("failed to deserialize encoding as protobuf");
1420                    if encoding_any.type_url == "/lance.encodings.ArrayEncoding" {
1421                        let encoding = encoding_any.to_msg::<pbenc::ArrayEncoding>();
1422                        match encoding {
1423                            Ok(encoding) => {
1424                                format!("{:#?}", encoding)
1425                            }
1426                            Err(err) => {
1427                                format!("Unsupported(decode_err={})", err)
1428                            }
1429                        }
1430                    } else if encoding_any.type_url == "/lance.encodings21.PageLayout" {
1431                        let encoding = encoding_any.to_msg::<pbenc21::PageLayout>();
1432                        match encoding {
1433                            Ok(encoding) => {
1434                                format!("{:#?}", encoding)
1435                            }
1436                            Err(err) => {
1437                                format!("Unsupported(decode_err={})", err)
1438                            }
1439                        }
1440                    } else {
1441                        format!("Unrecognized(type_url={})", encoding_any.type_url)
1442                    }
1443                }
1444                pbfile::encoding::Location::None(_) => "NoEncodingDescription".to_string(),
1445            }
1446        } else {
1447            "MISSING STYLE".to_string()
1448        }
1449    } else {
1450        "MISSING".to_string()
1451    }
1452}
1453
1454pub trait EncodedBatchReaderExt {
1455    fn try_from_mini_lance(
1456        bytes: Bytes,
1457        schema: &Schema,
1458        version: LanceFileVersion,
1459    ) -> Result<Self>
1460    where
1461        Self: Sized;
1462    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1463    where
1464        Self: Sized;
1465}
1466
1467impl EncodedBatchReaderExt for EncodedBatch {
1468    fn try_from_mini_lance(
1469        bytes: Bytes,
1470        schema: &Schema,
1471        file_version: LanceFileVersion,
1472    ) -> Result<Self>
1473    where
1474        Self: Sized,
1475    {
1476        let projection = ReaderProjection::from_whole_schema(schema, file_version);
1477        let footer = FileReader::decode_footer(&bytes)?;
1478
1479        // Next, read the metadata for the columns
1480        // This is both the column metadata and the CMO table
1481        let column_metadata_start = footer.column_meta_start as usize;
1482        let column_metadata_end = footer.global_buff_offsets_start as usize;
1483        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1484        let column_metadatas =
1485            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1486
1487        let file_version = LanceFileVersion::try_from_major_minor(
1488            footer.major_version as u32,
1489            footer.minor_version as u32,
1490        )?;
1491
1492        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1493
1494        Ok(Self {
1495            data: bytes,
1496            num_rows: page_table
1497                .first()
1498                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1499                .unwrap_or(0),
1500            page_table,
1501            top_level_columns: projection.column_indices,
1502            schema: Arc::new(schema.clone()),
1503        })
1504    }
1505
1506    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1507    where
1508        Self: Sized,
1509    {
1510        let footer = FileReader::decode_footer(&bytes)?;
1511        let file_version = LanceFileVersion::try_from_major_minor(
1512            footer.major_version as u32,
1513            footer.minor_version as u32,
1514        )?;
1515
1516        let gbo_table = FileReader::do_decode_gbo_table(
1517            &bytes.slice(footer.global_buff_offsets_start as usize..),
1518            &footer,
1519            file_version,
1520        )?;
1521        if gbo_table.is_empty() {
1522            return Err(Error::Internal {
1523                message: "File did not contain any global buffers, schema expected".to_string(),
1524                location: location!(),
1525            });
1526        }
1527        let schema_start = gbo_table[0].position as usize;
1528        let schema_size = gbo_table[0].size as usize;
1529
1530        let schema_bytes = bytes.slice(schema_start..(schema_start + schema_size));
1531        let (_, schema) = FileReader::decode_schema(schema_bytes)?;
1532        let projection = ReaderProjection::from_whole_schema(&schema, file_version);
1533
1534        // Next, read the metadata for the columns
1535        // This is both the column metadata and the CMO table
1536        let column_metadata_start = footer.column_meta_start as usize;
1537        let column_metadata_end = footer.global_buff_offsets_start as usize;
1538        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1539        let column_metadatas =
1540            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1541
1542        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1543
1544        Ok(Self {
1545            data: bytes,
1546            num_rows: page_table
1547                .first()
1548                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1549                .unwrap_or(0),
1550            page_table,
1551            top_level_columns: projection.column_indices,
1552            schema: Arc::new(schema),
1553        })
1554    }
1555}
1556
1557#[cfg(test)]
1558pub mod tests {
1559    use std::{collections::BTreeMap, pin::Pin, sync::Arc};
1560
1561    use arrow_array::{
1562        types::{Float64Type, Int32Type},
1563        RecordBatch, UInt32Array,
1564    };
1565    use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema};
1566    use bytes::Bytes;
1567    use futures::{prelude::stream::TryStreamExt, StreamExt};
1568    use lance_arrow::RecordBatchExt;
1569    use lance_core::{datatypes::Schema, ArrowResult};
1570    use lance_datagen::{array, gen_batch, BatchCount, ByteCount, RowCount};
1571    use lance_encoding::{
1572        decoder::{decode_batch, DecodeBatchScheduler, DecoderPlugins, FilterExpression},
1573        encoder::{default_encoding_strategy, encode_batch, EncodedBatch, EncodingOptions},
1574        version::LanceFileVersion,
1575    };
1576    use lance_io::{stream::RecordBatchStream, utils::CachedFileSize};
1577    use log::debug;
1578    use rstest::rstest;
1579    use tokio::sync::mpsc;
1580
1581    use crate::reader::{EncodedBatchReaderExt, FileReader, FileReaderOptions, ReaderProjection};
1582    use crate::testing::{test_cache, write_lance_file, FsFixture, WrittenFile};
1583    use crate::writer::{EncodedBatchWriteExt, FileWriter, FileWriterOptions};
1584    use lance_encoding::decoder::DecoderConfig;
1585
1586    async fn create_some_file(fs: &FsFixture, version: LanceFileVersion) -> WrittenFile {
1587        let location_type = DataType::Struct(Fields::from(vec![
1588            Field::new("x", DataType::Float64, true),
1589            Field::new("y", DataType::Float64, true),
1590        ]));
1591        let categories_type = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)));
1592
1593        let mut reader = gen_batch()
1594            .col("score", array::rand::<Float64Type>())
1595            .col("location", array::rand_type(&location_type))
1596            .col("categories", array::rand_type(&categories_type))
1597            .col("binary", array::rand_type(&DataType::Binary));
1598        if version <= LanceFileVersion::V2_0 {
1599            reader = reader.col("large_bin", array::rand_type(&DataType::LargeBinary));
1600        }
1601        let reader = reader.into_reader_rows(RowCount::from(1000), BatchCount::from(100));
1602
1603        write_lance_file(
1604            reader,
1605            fs,
1606            FileWriterOptions {
1607                format_version: Some(version),
1608                ..Default::default()
1609            },
1610        )
1611        .await
1612    }
1613
1614    type Transformer = Box<dyn Fn(&RecordBatch) -> RecordBatch>;
1615
1616    async fn verify_expected(
1617        expected: &[RecordBatch],
1618        mut actual: Pin<Box<dyn RecordBatchStream>>,
1619        read_size: u32,
1620        transform: Option<Transformer>,
1621    ) {
1622        let mut remaining = expected.iter().map(|batch| batch.num_rows()).sum::<usize>() as u32;
1623        let mut expected_iter = expected.iter().map(|batch| {
1624            if let Some(transform) = &transform {
1625                transform(batch)
1626            } else {
1627                batch.clone()
1628            }
1629        });
1630        let mut next_expected = expected_iter.next().unwrap().clone();
1631        while let Some(actual) = actual.next().await {
1632            let mut actual = actual.unwrap();
1633            let mut rows_to_verify = actual.num_rows() as u32;
1634            let expected_length = remaining.min(read_size);
1635            assert_eq!(expected_length, rows_to_verify);
1636
1637            while rows_to_verify > 0 {
1638                let next_slice_len = (next_expected.num_rows() as u32).min(rows_to_verify);
1639                assert_eq!(
1640                    next_expected.slice(0, next_slice_len as usize),
1641                    actual.slice(0, next_slice_len as usize)
1642                );
1643                remaining -= next_slice_len;
1644                rows_to_verify -= next_slice_len;
1645                if remaining > 0 {
1646                    if next_slice_len == next_expected.num_rows() as u32 {
1647                        next_expected = expected_iter.next().unwrap().clone();
1648                    } else {
1649                        next_expected = next_expected.slice(
1650                            next_slice_len as usize,
1651                            next_expected.num_rows() - next_slice_len as usize,
1652                        );
1653                    }
1654                }
1655                if rows_to_verify > 0 {
1656                    actual = actual.slice(
1657                        next_slice_len as usize,
1658                        actual.num_rows() - next_slice_len as usize,
1659                    );
1660                }
1661            }
1662        }
1663        assert_eq!(remaining, 0);
1664    }
1665
1666    #[tokio::test]
1667    async fn test_round_trip() {
1668        let fs = FsFixture::default();
1669
1670        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1671
1672        for read_size in [32, 1024, 1024 * 1024] {
1673            let file_scheduler = fs
1674                .scheduler
1675                .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1676                .await
1677                .unwrap();
1678            let file_reader = FileReader::try_open(
1679                file_scheduler,
1680                None,
1681                Arc::<DecoderPlugins>::default(),
1682                &test_cache(),
1683                FileReaderOptions::default(),
1684            )
1685            .await
1686            .unwrap();
1687
1688            let schema = file_reader.schema();
1689            assert_eq!(schema.metadata.get("foo").unwrap(), "bar");
1690
1691            let batch_stream = file_reader
1692                .read_stream(
1693                    lance_io::ReadBatchParams::RangeFull,
1694                    read_size,
1695                    16,
1696                    FilterExpression::no_filter(),
1697                )
1698                .unwrap();
1699
1700            verify_expected(&data, batch_stream, read_size, None).await;
1701        }
1702    }
1703
1704    #[rstest]
1705    #[test_log::test(tokio::test)]
1706    async fn test_encoded_batch_round_trip(
1707        // TODO: Add V2_1 (currently fails)
1708        #[values(LanceFileVersion::V2_0)] version: LanceFileVersion,
1709    ) {
1710        let data = gen_batch()
1711            .col("x", array::rand::<Int32Type>())
1712            .col("y", array::rand_utf8(ByteCount::from(16), false))
1713            .into_batch_rows(RowCount::from(10000))
1714            .unwrap();
1715
1716        let lance_schema = Arc::new(Schema::try_from(data.schema().as_ref()).unwrap());
1717
1718        let encoding_options = EncodingOptions {
1719            cache_bytes_per_column: 4096,
1720            max_page_bytes: 32 * 1024 * 1024,
1721            keep_original_array: true,
1722            buffer_alignment: 64,
1723            version,
1724        };
1725
1726        let encoding_strategy = default_encoding_strategy(version);
1727
1728        let encoded_batch = encode_batch(
1729            &data,
1730            lance_schema.clone(),
1731            encoding_strategy.as_ref(),
1732            &encoding_options,
1733        )
1734        .await
1735        .unwrap();
1736
1737        // Test self described
1738        let bytes = encoded_batch.try_to_self_described_lance(version).unwrap();
1739
1740        let decoded_batch = EncodedBatch::try_from_self_described_lance(bytes).unwrap();
1741
1742        let decoded = decode_batch(
1743            &decoded_batch,
1744            &FilterExpression::no_filter(),
1745            Arc::<DecoderPlugins>::default(),
1746            false,
1747            version,
1748            None,
1749        )
1750        .await
1751        .unwrap();
1752
1753        assert_eq!(data, decoded);
1754
1755        // Test mini
1756        let bytes = encoded_batch.try_to_mini_lance(version).unwrap();
1757        let decoded_batch =
1758            EncodedBatch::try_from_mini_lance(bytes, lance_schema.as_ref(), LanceFileVersion::V2_0)
1759                .unwrap();
1760        let decoded = decode_batch(
1761            &decoded_batch,
1762            &FilterExpression::no_filter(),
1763            Arc::<DecoderPlugins>::default(),
1764            false,
1765            version,
1766            None,
1767        )
1768        .await
1769        .unwrap();
1770
1771        assert_eq!(data, decoded);
1772    }
1773
1774    #[rstest]
1775    #[test_log::test(tokio::test)]
1776    async fn test_projection(
1777        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
1778    ) {
1779        let fs = FsFixture::default();
1780
1781        let written_file = create_some_file(&fs, version).await;
1782        let file_scheduler = fs
1783            .scheduler
1784            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1785            .await
1786            .unwrap();
1787
1788        let field_id_mapping = written_file
1789            .field_id_mapping
1790            .iter()
1791            .copied()
1792            .collect::<BTreeMap<_, _>>();
1793
1794        let empty_projection = ReaderProjection {
1795            column_indices: Vec::default(),
1796            schema: Arc::new(Schema::default()),
1797        };
1798
1799        for columns in [
1800            vec!["score"],
1801            vec!["location"],
1802            vec!["categories"],
1803            vec!["score.x"],
1804            vec!["score", "categories"],
1805            vec!["score", "location"],
1806            vec!["location", "categories"],
1807            vec!["score.y", "location", "categories"],
1808        ] {
1809            debug!("Testing round trip with projection {:?}", columns);
1810            for use_field_ids in [true, false] {
1811                // We can specify the projection as part of the read operation via read_stream_projected
1812                let file_reader = FileReader::try_open(
1813                    file_scheduler.clone(),
1814                    None,
1815                    Arc::<DecoderPlugins>::default(),
1816                    &test_cache(),
1817                    FileReaderOptions::default(),
1818                )
1819                .await
1820                .unwrap();
1821
1822                let projected_schema = written_file.schema.project(&columns).unwrap();
1823                let projection = if use_field_ids {
1824                    ReaderProjection::from_field_ids(
1825                        file_reader.metadata.version(),
1826                        &projected_schema,
1827                        &field_id_mapping,
1828                    )
1829                    .unwrap()
1830                } else {
1831                    ReaderProjection::from_column_names(
1832                        file_reader.metadata.version(),
1833                        &written_file.schema,
1834                        &columns,
1835                    )
1836                    .unwrap()
1837                };
1838
1839                let batch_stream = file_reader
1840                    .read_stream_projected(
1841                        lance_io::ReadBatchParams::RangeFull,
1842                        1024,
1843                        16,
1844                        projection.clone(),
1845                        FilterExpression::no_filter(),
1846                    )
1847                    .unwrap();
1848
1849                let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1850                verify_expected(
1851                    &written_file.data,
1852                    batch_stream,
1853                    1024,
1854                    Some(Box::new(move |batch: &RecordBatch| {
1855                        batch.project_by_schema(&projection_arrow).unwrap()
1856                    })),
1857                )
1858                .await;
1859
1860                // We can also specify the projection as a base projection when we open the file
1861                let file_reader = FileReader::try_open(
1862                    file_scheduler.clone(),
1863                    Some(projection.clone()),
1864                    Arc::<DecoderPlugins>::default(),
1865                    &test_cache(),
1866                    FileReaderOptions::default(),
1867                )
1868                .await
1869                .unwrap();
1870
1871                let batch_stream = file_reader
1872                    .read_stream(
1873                        lance_io::ReadBatchParams::RangeFull,
1874                        1024,
1875                        16,
1876                        FilterExpression::no_filter(),
1877                    )
1878                    .unwrap();
1879
1880                let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1881                verify_expected(
1882                    &written_file.data,
1883                    batch_stream,
1884                    1024,
1885                    Some(Box::new(move |batch: &RecordBatch| {
1886                        batch.project_by_schema(&projection_arrow).unwrap()
1887                    })),
1888                )
1889                .await;
1890
1891                assert!(file_reader
1892                    .read_stream_projected(
1893                        lance_io::ReadBatchParams::RangeFull,
1894                        1024,
1895                        16,
1896                        empty_projection.clone(),
1897                        FilterExpression::no_filter(),
1898                    )
1899                    .is_err());
1900            }
1901        }
1902
1903        assert!(FileReader::try_open(
1904            file_scheduler.clone(),
1905            Some(empty_projection),
1906            Arc::<DecoderPlugins>::default(),
1907            &test_cache(),
1908            FileReaderOptions::default(),
1909        )
1910        .await
1911        .is_err());
1912
1913        let arrow_schema = ArrowSchema::new(vec![
1914            Field::new("x", DataType::Int32, true),
1915            Field::new("y", DataType::Int32, true),
1916        ]);
1917        let schema = Schema::try_from(&arrow_schema).unwrap();
1918
1919        let projection_with_dupes = ReaderProjection {
1920            column_indices: vec![0, 0],
1921            schema: Arc::new(schema),
1922        };
1923
1924        assert!(FileReader::try_open(
1925            file_scheduler.clone(),
1926            Some(projection_with_dupes),
1927            Arc::<DecoderPlugins>::default(),
1928            &test_cache(),
1929            FileReaderOptions::default(),
1930        )
1931        .await
1932        .is_err());
1933    }
1934
1935    #[test_log::test(tokio::test)]
1936    async fn test_compressing_buffer() {
1937        let fs = FsFixture::default();
1938
1939        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
1940        let file_scheduler = fs
1941            .scheduler
1942            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1943            .await
1944            .unwrap();
1945
1946        // We can specify the projection as part of the read operation via read_stream_projected
1947        let file_reader = FileReader::try_open(
1948            file_scheduler.clone(),
1949            None,
1950            Arc::<DecoderPlugins>::default(),
1951            &test_cache(),
1952            FileReaderOptions::default(),
1953        )
1954        .await
1955        .unwrap();
1956
1957        let mut projection = written_file.schema.project(&["score"]).unwrap();
1958        for field in projection.fields.iter_mut() {
1959            field
1960                .metadata
1961                .insert("lance:compression".to_string(), "zstd".to_string());
1962        }
1963        let projection = ReaderProjection {
1964            column_indices: projection.fields.iter().map(|f| f.id as u32).collect(),
1965            schema: Arc::new(projection),
1966        };
1967
1968        let batch_stream = file_reader
1969            .read_stream_projected(
1970                lance_io::ReadBatchParams::RangeFull,
1971                1024,
1972                16,
1973                projection.clone(),
1974                FilterExpression::no_filter(),
1975            )
1976            .unwrap();
1977
1978        let projection_arrow = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
1979        verify_expected(
1980            &written_file.data,
1981            batch_stream,
1982            1024,
1983            Some(Box::new(move |batch: &RecordBatch| {
1984                batch.project_by_schema(&projection_arrow).unwrap()
1985            })),
1986        )
1987        .await;
1988    }
1989
1990    #[tokio::test]
1991    async fn test_read_all() {
1992        let fs = FsFixture::default();
1993        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1994        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
1995
1996        let file_scheduler = fs
1997            .scheduler
1998            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1999            .await
2000            .unwrap();
2001        let file_reader = FileReader::try_open(
2002            file_scheduler.clone(),
2003            None,
2004            Arc::<DecoderPlugins>::default(),
2005            &test_cache(),
2006            FileReaderOptions::default(),
2007        )
2008        .await
2009        .unwrap();
2010
2011        let batches = file_reader
2012            .read_stream(
2013                lance_io::ReadBatchParams::RangeFull,
2014                total_rows as u32,
2015                16,
2016                FilterExpression::no_filter(),
2017            )
2018            .unwrap()
2019            .try_collect::<Vec<_>>()
2020            .await
2021            .unwrap();
2022        assert_eq!(batches.len(), 1);
2023        assert_eq!(batches[0].num_rows(), total_rows);
2024    }
2025
2026    #[rstest]
2027    #[tokio::test]
2028    async fn test_blocking_take(
2029        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
2030    ) {
2031        let fs = FsFixture::default();
2032        let WrittenFile { data, schema, .. } = create_some_file(&fs, version).await;
2033        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2034
2035        let file_scheduler = fs
2036            .scheduler
2037            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2038            .await
2039            .unwrap();
2040        let file_reader = FileReader::try_open(
2041            file_scheduler.clone(),
2042            Some(ReaderProjection::from_column_names(version, &schema, &["score"]).unwrap()),
2043            Arc::<DecoderPlugins>::default(),
2044            &test_cache(),
2045            FileReaderOptions::default(),
2046        )
2047        .await
2048        .unwrap();
2049
2050        let batches = tokio::task::spawn_blocking(move || {
2051            file_reader
2052                .read_stream_projected_blocking(
2053                    lance_io::ReadBatchParams::Indices(UInt32Array::from(vec![0, 1, 2, 3, 4])),
2054                    total_rows as u32,
2055                    None,
2056                    FilterExpression::no_filter(),
2057                )
2058                .unwrap()
2059                .collect::<ArrowResult<Vec<_>>>()
2060                .unwrap()
2061        })
2062        .await
2063        .unwrap();
2064
2065        assert_eq!(batches.len(), 1);
2066        assert_eq!(batches[0].num_rows(), 5);
2067        assert_eq!(batches[0].num_columns(), 1);
2068    }
2069
2070    #[tokio::test(flavor = "multi_thread")]
2071    async fn test_drop_in_progress() {
2072        let fs = FsFixture::default();
2073        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
2074        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2075
2076        let file_scheduler = fs
2077            .scheduler
2078            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2079            .await
2080            .unwrap();
2081        let file_reader = FileReader::try_open(
2082            file_scheduler.clone(),
2083            None,
2084            Arc::<DecoderPlugins>::default(),
2085            &test_cache(),
2086            FileReaderOptions::default(),
2087        )
2088        .await
2089        .unwrap();
2090
2091        let mut batches = file_reader
2092            .read_stream(
2093                lance_io::ReadBatchParams::RangeFull,
2094                (total_rows / 10) as u32,
2095                16,
2096                FilterExpression::no_filter(),
2097            )
2098            .unwrap();
2099
2100        drop(file_reader);
2101
2102        let batch = batches.next().await.unwrap().unwrap();
2103        assert!(batch.num_rows() > 0);
2104
2105        // Drop in-progress scan
2106        drop(batches);
2107    }
2108
2109    #[tokio::test]
2110    async fn drop_while_scheduling() {
2111        // This is a bit of a white-box test, pokes at the internals.  We want to
2112        // test the case where the read stream is dropped before the scheduling
2113        // thread finishes.  We can't do that in a black-box fashion because the
2114        // scheduling thread runs in the background and there is no easy way to
2115        // pause / gate it.
2116
2117        // It's a regression for a bug where the scheduling thread would panic
2118        // if the stream was dropped before it finished.
2119
2120        let fs = FsFixture::default();
2121        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
2122        let total_rows = written_file
2123            .data
2124            .iter()
2125            .map(|batch| batch.num_rows())
2126            .sum::<usize>();
2127
2128        let file_scheduler = fs
2129            .scheduler
2130            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2131            .await
2132            .unwrap();
2133        let file_reader = FileReader::try_open(
2134            file_scheduler.clone(),
2135            None,
2136            Arc::<DecoderPlugins>::default(),
2137            &test_cache(),
2138            FileReaderOptions::default(),
2139        )
2140        .await
2141        .unwrap();
2142
2143        let projection =
2144            ReaderProjection::from_whole_schema(&written_file.schema, LanceFileVersion::V2_0);
2145        let column_infos = file_reader
2146            .collect_columns_from_projection(&projection)
2147            .unwrap();
2148        let mut decode_scheduler = DecodeBatchScheduler::try_new(
2149            &projection.schema,
2150            &projection.column_indices,
2151            &column_infos,
2152            &vec![],
2153            total_rows as u64,
2154            Arc::<DecoderPlugins>::default(),
2155            file_reader.scheduler.clone(),
2156            test_cache(),
2157            &FilterExpression::no_filter(),
2158            &DecoderConfig::default(),
2159        )
2160        .await
2161        .unwrap();
2162
2163        let range = 0..total_rows as u64;
2164
2165        let (tx, rx) = mpsc::unbounded_channel();
2166
2167        // Simulate the stream / decoder being dropped
2168        drop(rx);
2169
2170        // Scheduling should not panic
2171        decode_scheduler.schedule_range(
2172            range,
2173            &FilterExpression::no_filter(),
2174            tx,
2175            file_reader.scheduler.clone(),
2176        )
2177    }
2178
2179    #[tokio::test]
2180    async fn test_read_empty_range() {
2181        let fs = FsFixture::default();
2182        create_some_file(&fs, LanceFileVersion::V2_0).await;
2183
2184        let file_scheduler = fs
2185            .scheduler
2186            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2187            .await
2188            .unwrap();
2189        let file_reader = FileReader::try_open(
2190            file_scheduler.clone(),
2191            None,
2192            Arc::<DecoderPlugins>::default(),
2193            &test_cache(),
2194            FileReaderOptions::default(),
2195        )
2196        .await
2197        .unwrap();
2198
2199        // All ranges empty, no data
2200        let batches = file_reader
2201            .read_stream(
2202                lance_io::ReadBatchParams::Range(0..0),
2203                1024,
2204                16,
2205                FilterExpression::no_filter(),
2206            )
2207            .unwrap()
2208            .try_collect::<Vec<_>>()
2209            .await
2210            .unwrap();
2211
2212        assert_eq!(batches.len(), 0);
2213
2214        // Some ranges empty
2215        let batches = file_reader
2216            .read_stream(
2217                lance_io::ReadBatchParams::Ranges(Arc::new([0..1, 2..2])),
2218                1024,
2219                16,
2220                FilterExpression::no_filter(),
2221            )
2222            .unwrap()
2223            .try_collect::<Vec<_>>()
2224            .await
2225            .unwrap();
2226        assert_eq!(batches.len(), 1);
2227    }
2228
2229    #[tokio::test]
2230    async fn test_global_buffers() {
2231        let fs = FsFixture::default();
2232
2233        let lance_schema =
2234            lance_core::datatypes::Schema::try_from(&ArrowSchema::new(vec![Field::new(
2235                "foo",
2236                DataType::Int32,
2237                true,
2238            )]))
2239            .unwrap();
2240
2241        let mut file_writer = FileWriter::try_new(
2242            fs.object_store.create(&fs.tmp_path).await.unwrap(),
2243            lance_schema.clone(),
2244            FileWriterOptions::default(),
2245        )
2246        .unwrap();
2247
2248        let test_bytes = Bytes::from_static(b"hello");
2249
2250        let buf_index = file_writer
2251            .add_global_buffer(test_bytes.clone())
2252            .await
2253            .unwrap();
2254
2255        assert_eq!(buf_index, 1);
2256
2257        file_writer.finish().await.unwrap();
2258
2259        let file_scheduler = fs
2260            .scheduler
2261            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2262            .await
2263            .unwrap();
2264        let file_reader = FileReader::try_open(
2265            file_scheduler.clone(),
2266            None,
2267            Arc::<DecoderPlugins>::default(),
2268            &test_cache(),
2269            FileReaderOptions::default(),
2270        )
2271        .await
2272        .unwrap();
2273
2274        let buf = file_reader.read_global_buffer(1).await.unwrap();
2275        assert_eq!(buf, test_bytes);
2276    }
2277}