lance_file/v2/reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    collections::{BTreeMap, BTreeSet},
6    io::Cursor,
7    ops::Range,
8    pin::Pin,
9    sync::Arc,
10};
11
12use arrow_array::RecordBatchReader;
13use arrow_schema::Schema as ArrowSchema;
14use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
15use bytes::{Bytes, BytesMut};
16use deepsize::{Context, DeepSizeOf};
17use futures::{stream::BoxStream, Stream, StreamExt};
18use lance_encoding::{
19    decoder::{
20        schedule_and_decode, schedule_and_decode_blocking, ColumnInfo, DecoderConfig,
21        DecoderPlugins, FilterExpression, PageEncoding, PageInfo, ReadBatchTask, RequestedRows,
22        SchedulerDecoderConfig,
23    },
24    encoder::EncodedBatch,
25    version::LanceFileVersion,
26    EncodingsIo,
27};
28use log::debug;
29use object_store::path::Path;
30use prost::{Message, Name};
31use snafu::location;
32
33use lance_core::{
34    cache::LanceCache,
35    datatypes::{Field, Schema},
36    Error, Result,
37};
38use lance_encoding::format::pb as pbenc;
39use lance_encoding::format::pb21 as pbenc21;
40use lance_io::{
41    scheduler::FileScheduler,
42    stream::{RecordBatchStream, RecordBatchStreamAdapter},
43    ReadBatchParams,
44};
45
46use crate::{
47    datatypes::{Fields, FieldsWithMeta},
48    format::{pb, pbfile, MAGIC, MAJOR_VERSION, MINOR_VERSION},
49    v2::writer::PAGE_BUFFER_ALIGNMENT,
50};
51
52use super::io::LanceEncodingsIo;
53
54/// Default chunk size for reading large pages (8MiB)
55/// Pages larger than this will be split into multiple chunks during read
56pub const DEFAULT_READ_CHUNK_SIZE: u64 = 8 * 1024 * 1024;
57
58// For now, we don't use global buffers for anything other than schema.  If we
59// use these later we should make them lazily loaded and then cached once loaded.
60//
61// We store their position / length for debugging purposes
62#[derive(Debug, DeepSizeOf)]
63pub struct BufferDescriptor {
64    pub position: u64,
65    pub size: u64,
66}
67
68/// Statistics summarize some of the file metadata for quick summary info
69#[derive(Debug)]
70pub struct FileStatistics {
71    /// Statistics about each of the columns in the file
72    pub columns: Vec<ColumnStatistics>,
73}
74
75/// Summary information describing a column
76#[derive(Debug)]
77pub struct ColumnStatistics {
78    /// The number of pages in the column
79    pub num_pages: usize,
80    /// The total number of data & metadata bytes in the column
81    ///
82    /// This is the compressed on-disk size
83    pub size_bytes: u64,
84}
85
86// TODO: Caching
87#[derive(Debug)]
88pub struct CachedFileMetadata {
89    /// The schema of the file
90    pub file_schema: Arc<Schema>,
91    /// The column metadatas
92    pub column_metadatas: Vec<pbfile::ColumnMetadata>,
93    pub column_infos: Vec<Arc<ColumnInfo>>,
94    /// The number of rows in the file
95    pub num_rows: u64,
96    pub file_buffers: Vec<BufferDescriptor>,
97    /// The number of bytes contained in the data page section of the file
98    pub num_data_bytes: u64,
99    /// The number of bytes contained in the column metadata (not including buffers
100    /// referenced by the metadata)
101    pub num_column_metadata_bytes: u64,
102    /// The number of bytes contained in global buffers
103    pub num_global_buffer_bytes: u64,
104    /// The number of bytes contained in the CMO and GBO tables
105    pub num_footer_bytes: u64,
106    pub major_version: u16,
107    pub minor_version: u16,
108}
109
110impl DeepSizeOf for CachedFileMetadata {
111    // TODO: include size for `column_metadatas` and `column_infos`.
112    fn deep_size_of_children(&self, context: &mut Context) -> usize {
113        self.file_schema.deep_size_of_children(context)
114            + self
115                .file_buffers
116                .iter()
117                .map(|file_buffer| file_buffer.deep_size_of_children(context))
118                .sum::<usize>()
119    }
120}
121
122impl CachedFileMetadata {
123    pub fn version(&self) -> LanceFileVersion {
124        match (self.major_version, self.minor_version) {
125            (0, 3) => LanceFileVersion::V2_0,
126            (2, 1) => LanceFileVersion::V2_1,
127            (2, 2) => LanceFileVersion::V2_2,
128            _ => panic!(
129                "Unsupported version: {}.{}",
130                self.major_version, self.minor_version
131            ),
132        }
133    }
134}
135
136/// Selecting columns from a lance file requires specifying both the
137/// index of the column and the data type of the column
138///
139/// Partly, this is because columns are not strictly required to be read
140/// back as the same type they were written with.  For example, a string
141/// column may be read as a string, large_string or string_view type.
142///
143/// A read will only succeed if the decoder for a column is capable
144/// of decoding into the requested type.
145///
146/// Note that this should generally be limited to different in-memory
147/// representations of the same semantic type.  An encoding could
148/// theoretically support "casting" (e.g. int to string, etc.) but
149/// there is little advantage in doing so here.
150///
151/// Note: in order to specify a projection the user will need some way
152/// to figure out the column indices.  In the table format we do this
153/// using field IDs and keeping track of the field id->column index mapping.
154///
155/// If users are not using the table format then they will need to figure
156/// out some way to do this themselves.
157#[derive(Debug, Clone)]
158pub struct ReaderProjection {
159    /// The data types (schema) of the selected columns.  The names
160    /// of the schema are arbitrary and ignored.
161    pub schema: Arc<Schema>,
162    /// The indices of the columns to load.
163    ///
164    /// The content of this vector depends on the file version.
165    ///
166    /// In Lance File Version 2.0 we need ids for structural fields as
167    /// well as leaf fields:
168    ///
169    ///   - Primitive: the index of the column in the schema
170    ///   - List: the index of the list column in the schema
171    ///     followed by the column indices of the children
172    ///   - FixedSizeList (of primitive): the index of the column in the schema
173    ///     (this case is not nested)
174    ///   - FixedSizeList (of non-primitive): not yet implemented
175    ///   - Dictionary: same as primitive
176    ///   - Struct: the index of the struct column in the schema
177    ///     followed by the column indices of the children
178    ///
179    ///   In other words, this should be a DFS listing of the desired schema.
180    ///
181    /// In Lance File Version 2.1 we only need ids for leaf fields.  Any structural
182    /// fields are completely transparent.
183    ///
184    /// For example, if the goal is to load:
185    ///
186    ///   x: int32
187    ///   y: struct<z: int32, w: string>
188    ///   z: list<int32>
189    ///
190    /// and the schema originally used to store the data was:
191    ///
192    ///   a: struct<x: int32>
193    ///   b: int64
194    ///   y: struct<z: int32, c: int64, w: string>
195    ///   z: list<int32>
196    ///
197    /// Then the column_indices should be:
198    ///
199    /// - 2.0: [1, 3, 4, 6, 7, 8]
200    /// - 2.1: [0, 2, 4, 5]
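    ///
    /// A minimal sketch (not run as a doctest) of building the 2.1 projection above by
    /// hand, assuming `projected_schema` is a hypothetical `Schema` describing the
    /// selected fields:
    ///
    /// ```ignore
    /// let projection = ReaderProjection {
    ///     schema: Arc::new(projected_schema),
    ///     column_indices: vec![0, 2, 4, 5],
    /// };
    /// ```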
201    pub column_indices: Vec<u32>,
202}
203
204impl ReaderProjection {
205    fn from_field_ids_helper<'a>(
206        file_version: LanceFileVersion,
207        fields: impl Iterator<Item = &'a Field>,
208        field_id_to_column_index: &BTreeMap<u32, u32>,
209        column_indices: &mut Vec<u32>,
210    ) -> Result<()> {
211        for field in fields {
212            let is_structural = file_version >= LanceFileVersion::V2_1;
213            // In the 2.0 system we needed ids for intermediate fields.  In 2.1+
214            // we only need ids for leaf fields.
215            if !is_structural
216                || field.children.is_empty()
217                || field.is_blob()
218                || field.is_packed_struct()
219            {
220                if let Some(column_idx) = field_id_to_column_index.get(&(field.id as u32)).copied()
221                {
222                    column_indices.push(column_idx);
223                }
224            }
225            // Don't recurse into children if the field is a blob or packed struct in 2.1
226            if !is_structural || (!field.is_blob() && !field.is_packed_struct()) {
227                Self::from_field_ids_helper(
228                    file_version,
229                    field.children.iter(),
230                    field_id_to_column_index,
231                    column_indices,
232                )?;
233            }
234        }
235        Ok(())
236    }
237
238    /// Creates a projection using a mapping from field IDs to column indices
239    ///
240    /// You can obtain such a mapping when the file is written using the
241    /// [`crate::v2::writer::FileWriter::field_id_to_column_indices`] method.
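    ///
    /// A minimal sketch (not run as a doctest), assuming `schema` is the desired
    /// projection schema and `mapping` is a hypothetical `BTreeMap<u32, u32>` captured
    /// from the writer at write time:
    ///
    /// ```ignore
    /// let projection = ReaderProjection::from_field_ids(
    ///     LanceFileVersion::V2_1,
    ///     &schema,
    ///     &mapping,
    /// )?;
    /// ```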
242    pub fn from_field_ids(
243        file_version: LanceFileVersion,
244        schema: &Schema,
245        field_id_to_column_index: &BTreeMap<u32, u32>,
246    ) -> Result<Self> {
247        let mut column_indices = Vec::new();
248        Self::from_field_ids_helper(
249            file_version,
250            schema.fields.iter(),
251            field_id_to_column_index,
252            &mut column_indices,
253        )?;
254        Ok(Self {
255            schema: Arc::new(schema.clone()),
256            column_indices,
257        })
258    }
259
260    /// Creates a projection that reads the entire file
261    ///
262    /// If the schema provided is not the schema of the entire file then
263    /// the projection will be invalid and the read will fail.
264    /// If a field is a struct with `packed` set to true in the field metadata,
265    /// the whole struct occupies a single column index.
266    /// To support nested packed-struct encoding, this method needs to be further adjusted.
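    ///
    /// A minimal sketch (not run as a doctest), assuming `file_schema` is the schema the
    /// file was written with:
    ///
    /// ```ignore
    /// let projection = ReaderProjection::from_whole_schema(&file_schema, LanceFileVersion::V2_1);
    /// ```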
267    pub fn from_whole_schema(schema: &Schema, version: LanceFileVersion) -> Self {
268        let schema = Arc::new(schema.clone());
269        let is_structural = version >= LanceFileVersion::V2_1;
270        let mut column_indices = vec![];
271        let mut curr_column_idx = 0;
272        let mut packed_struct_fields_num = 0;
273        for field in schema.fields_pre_order() {
274            if packed_struct_fields_num > 0 {
275                packed_struct_fields_num -= 1;
276                continue;
277            }
278            if field.is_packed_struct() {
279                column_indices.push(curr_column_idx);
280                curr_column_idx += 1;
281                packed_struct_fields_num = field.children.len();
282            } else if field.children.is_empty() || !is_structural {
283                column_indices.push(curr_column_idx);
284                curr_column_idx += 1;
285            }
286        }
287        Self {
288            schema,
289            column_indices,
290        }
291    }
292
293    /// Creates a projection that reads only the columns specified by name
294    ///
295    /// The syntax for column names is the same as [`lance_core::datatypes::Schema::project`]
296    ///
297    /// If the schema provided is not the schema of the entire file then
298    /// the projection will be invalid and the read will fail.
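    ///
    /// A minimal sketch (not run as a doctest), assuming `file_schema` is the full file
    /// schema and the named columns exist in it:
    ///
    /// ```ignore
    /// let projection = ReaderProjection::from_column_names(
    ///     LanceFileVersion::V2_1,
    ///     &file_schema,
    ///     &["x", "y"],
    /// )?;
    /// ```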
299    pub fn from_column_names(
300        file_version: LanceFileVersion,
301        schema: &Schema,
302        column_names: &[&str],
303    ) -> Result<Self> {
304        let field_id_to_column_index = schema
305            .fields_pre_order()
306            // In the 2.0 system we needed ids for intermediate fields.  In 2.1+
307            // we only need ids for leaf fields.
308            .filter(|field| {
309                file_version < LanceFileVersion::V2_1 || field.is_leaf() || field.is_packed_struct()
310            })
311            .enumerate()
312            .map(|(idx, field)| (field.id as u32, idx as u32))
313            .collect::<BTreeMap<_, _>>();
314        let projected = schema.project(column_names)?;
315        let mut column_indices = Vec::new();
316        Self::from_field_ids_helper(
317            file_version,
318            projected.fields.iter(),
319            &field_id_to_column_index,
320            &mut column_indices,
321        )?;
322        Ok(Self {
323            schema: Arc::new(projected),
324            column_indices,
325        })
326    }
327}
328
329/// File reader options that control reading behavior, such as whether to enable caching of repetition indices
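///
/// A minimal sketch (not run as a doctest) of overriding the read chunk size while
/// keeping the default decoder configuration:
///
/// ```ignore
/// let options = FileReaderOptions {
///     read_chunk_size: 16 * 1024 * 1024, // 16MiB chunks instead of the 8MiB default
///     ..FileReaderOptions::default()
/// };
/// ```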
330#[derive(Clone, Debug)]
331pub struct FileReaderOptions {
332    pub decoder_config: DecoderConfig,
333    /// Size of chunks when reading large pages. Pages larger than this
334    /// will be read in multiple chunks to control memory usage.
335    /// Default: 8MiB (DEFAULT_READ_CHUNK_SIZE)
336    pub read_chunk_size: u64,
337}
338
339impl Default for FileReaderOptions {
340    fn default() -> Self {
341        Self {
342            decoder_config: DecoderConfig::default(),
343            read_chunk_size: DEFAULT_READ_CHUNK_SIZE,
344        }
345    }
346}
347
348#[derive(Debug)]
349pub struct FileReader {
350    scheduler: Arc<dyn EncodingsIo>,
351    // The default projection to be applied to all reads
352    base_projection: ReaderProjection,
353    num_rows: u64,
354    metadata: Arc<CachedFileMetadata>,
355    decoder_plugins: Arc<DecoderPlugins>,
356    cache: Arc<LanceCache>,
357    options: FileReaderOptions,
358}
359#[derive(Debug)]
360struct Footer {
361    #[allow(dead_code)]
362    column_meta_start: u64,
363    // We don't use this today because we always load metadata for every column
364    // and don't yet support "metadata projection"
365    #[allow(dead_code)]
366    column_meta_offsets_start: u64,
367    global_buff_offsets_start: u64,
368    num_global_buffers: u32,
369    num_columns: u32,
370    major_version: u16,
371    minor_version: u16,
372}
373
374const FOOTER_LEN: usize = 40;
375
376impl FileReader {
377    pub fn with_scheduler(&self, scheduler: Arc<dyn EncodingsIo>) -> Self {
378        Self {
379            scheduler,
380            base_projection: self.base_projection.clone(),
381            cache: self.cache.clone(),
382            decoder_plugins: self.decoder_plugins.clone(),
383            metadata: self.metadata.clone(),
384            options: self.options.clone(),
385            num_rows: self.num_rows,
386        }
387    }
388
389    pub fn num_rows(&self) -> u64 {
390        self.num_rows
391    }
392
393    pub fn metadata(&self) -> &Arc<CachedFileMetadata> {
394        &self.metadata
395    }
396
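    /// Returns per-column summary statistics (page counts and compressed on-disk sizes)
    ///
    /// A minimal sketch (not run as a doctest) of computing the total compressed size of
    /// all columns from the returned statistics:
    ///
    /// ```ignore
    /// let stats = reader.file_statistics();
    /// let total_bytes: u64 = stats.columns.iter().map(|col| col.size_bytes).sum();
    /// ```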
397    pub fn file_statistics(&self) -> FileStatistics {
398        let column_metadatas = &self.metadata().column_metadatas;
399
400        let column_stats = column_metadatas
401            .iter()
402            .map(|col_metadata| {
403                let num_pages = col_metadata.pages.len();
404                let size_bytes = col_metadata
405                    .pages
406                    .iter()
407                    .map(|page| page.buffer_sizes.iter().sum::<u64>())
408                    .sum::<u64>();
409                ColumnStatistics {
410                    num_pages,
411                    size_bytes,
412                }
413            })
414            .collect();
415
416        FileStatistics {
417            columns: column_stats,
418        }
419    }
420
421    pub async fn read_global_buffer(&self, index: u32) -> Result<Bytes> {
422        let buffer_desc = self.metadata.file_buffers.get(index as usize).ok_or_else(||Error::invalid_input(format!("request for global buffer at index {} but there were only {} global buffers in the file", index, self.metadata.file_buffers.len()), location!()))?;
423        self.scheduler
424            .submit_single(
425                buffer_desc.position..buffer_desc.position + buffer_desc.size,
426                0,
427            )
428            .await
429    }
430
431    async fn read_tail(scheduler: &FileScheduler) -> Result<(Bytes, u64)> {
432        let file_size = scheduler.reader().size().await? as u64;
433        let begin = if file_size < scheduler.reader().block_size() as u64 {
434            0
435        } else {
436            file_size - scheduler.reader().block_size() as u64
437        };
438        let tail_bytes = scheduler.submit_single(begin..file_size, 0).await?;
439        Ok((tail_bytes, file_size))
440    }
441
442    // Checks that the footer is well-formed and decodes it, returning the table
443    // positions, buffer/column counts, and version information stored there
444    fn decode_footer(footer_bytes: &Bytes) -> Result<Footer> {
445        let len = footer_bytes.len();
446        if len < FOOTER_LEN {
447            return Err(Error::io(
448                format!(
449                    "does not have sufficient data, len: {}, bytes: {:?}",
450                    len, footer_bytes
451                ),
452                location!(),
453            ));
454        }
455        let mut cursor = Cursor::new(footer_bytes.slice(len - FOOTER_LEN..));
456
457        let column_meta_start = cursor.read_u64::<LittleEndian>()?;
458        let column_meta_offsets_start = cursor.read_u64::<LittleEndian>()?;
459        let global_buff_offsets_start = cursor.read_u64::<LittleEndian>()?;
460        let num_global_buffers = cursor.read_u32::<LittleEndian>()?;
461        let num_columns = cursor.read_u32::<LittleEndian>()?;
462        let major_version = cursor.read_u16::<LittleEndian>()?;
463        let minor_version = cursor.read_u16::<LittleEndian>()?;
464
465        if major_version == MAJOR_VERSION as u16 && minor_version == MINOR_VERSION as u16 {
466            return Err(Error::version_conflict(
467                "Attempt to use the lance v2 reader to read a legacy file".to_string(),
468                major_version,
469                minor_version,
470                location!(),
471            ));
472        }
473
474        let magic_bytes = footer_bytes.slice(len - 4..);
475        if magic_bytes.as_ref() != MAGIC {
476            return Err(Error::io(
477                format!(
478                    "file does not appear to be a Lance file (invalid magic: {:?})",
479                    MAGIC
480                ),
481                location!(),
482            ));
483        }
484        Ok(Footer {
485            column_meta_start,
486            column_meta_offsets_start,
487            global_buff_offsets_start,
488            num_global_buffers,
489            num_columns,
490            major_version,
491            minor_version,
492        })
493    }
494
495    // TODO: Once we have coalesced I/O we should only read the column metadatas that we need
496    fn read_all_column_metadata(
497        column_metadata_bytes: Bytes,
498        footer: &Footer,
499    ) -> Result<Vec<pbfile::ColumnMetadata>> {
500        let column_metadata_start = footer.column_meta_start;
501        // cmo == column_metadata_offsets
502        let cmo_table_size = 16 * footer.num_columns as usize;
503        let cmo_table = column_metadata_bytes.slice(column_metadata_bytes.len() - cmo_table_size..);
504
505        (0..footer.num_columns)
506            .map(|col_idx| {
507                let offset = (col_idx * 16) as usize;
508                let position = LittleEndian::read_u64(&cmo_table[offset..offset + 8]);
509                let length = LittleEndian::read_u64(&cmo_table[offset + 8..offset + 16]);
510                let normalized_position = (position - column_metadata_start) as usize;
511                let normalized_end = normalized_position + (length as usize);
512                Ok(pbfile::ColumnMetadata::decode(
513                    &column_metadata_bytes[normalized_position..normalized_end],
514                )?)
515            })
516            .collect::<Result<Vec<_>>>()
517    }
518
519    async fn optimistic_tail_read(
520        data: &Bytes,
521        start_pos: u64,
522        scheduler: &FileScheduler,
523        file_len: u64,
524    ) -> Result<Bytes> {
525        let num_bytes_needed = (file_len - start_pos) as usize;
526        if data.len() >= num_bytes_needed {
527            Ok(data.slice((data.len() - num_bytes_needed)..))
528        } else {
529            let num_bytes_missing = (num_bytes_needed - data.len()) as u64;
530            let start = file_len - num_bytes_needed as u64;
531            let missing_bytes = scheduler
532                .submit_single(start..start + num_bytes_missing, 0)
533                .await?;
534            let mut combined = BytesMut::with_capacity(data.len() + num_bytes_missing as usize);
535            combined.extend(missing_bytes);
536            combined.extend(data);
537            Ok(combined.freeze())
538        }
539    }
540
541    fn do_decode_gbo_table(
542        gbo_bytes: &Bytes,
543        footer: &Footer,
544        version: LanceFileVersion,
545    ) -> Result<Vec<BufferDescriptor>> {
546        let mut global_bufs_cursor = Cursor::new(gbo_bytes);
547
548        let mut global_buffers = Vec::with_capacity(footer.num_global_buffers as usize);
549        for _ in 0..footer.num_global_buffers {
550            let buf_pos = global_bufs_cursor.read_u64::<LittleEndian>()?;
551            assert!(
552                version < LanceFileVersion::V2_1 || buf_pos % PAGE_BUFFER_ALIGNMENT as u64 == 0
553            );
554            let buf_size = global_bufs_cursor.read_u64::<LittleEndian>()?;
555            global_buffers.push(BufferDescriptor {
556                position: buf_pos,
557                size: buf_size,
558            });
559        }
560
561        Ok(global_buffers)
562    }
563
564    async fn decode_gbo_table(
565        tail_bytes: &Bytes,
566        file_len: u64,
567        scheduler: &FileScheduler,
568        footer: &Footer,
569        version: LanceFileVersion,
570    ) -> Result<Vec<BufferDescriptor>> {
571        // This could, in theory, trigger another IOP but the GBO table should never be large
572        // enough for that to happen
573        let gbo_bytes = Self::optimistic_tail_read(
574            tail_bytes,
575            footer.global_buff_offsets_start,
576            scheduler,
577            file_len,
578        )
579        .await?;
580        Self::do_decode_gbo_table(&gbo_bytes, footer, version)
581    }
582
583    fn decode_schema(schema_bytes: Bytes) -> Result<(u64, lance_core::datatypes::Schema)> {
584        let file_descriptor = pb::FileDescriptor::decode(schema_bytes)?;
585        let pb_schema = file_descriptor.schema.unwrap();
586        let num_rows = file_descriptor.length;
587        let fields_with_meta = FieldsWithMeta {
588            fields: Fields(pb_schema.fields),
589            metadata: pb_schema.metadata,
590        };
591        let schema = lance_core::datatypes::Schema::from(fields_with_meta);
592        Ok((num_rows, schema))
593    }
594
595    // TODO: Support late projection.  Currently, if we want to perform a
596    // projected read of a file, we load all of the column metadata, and then
597    // only read the column data that is requested.  This is fine for most cases.
598    //
599    // However, if there are many columns then loading all of the column metadata
600    // may be expensive.  We should support a mode where we only load the column
601    // metadata for the columns that are requested (the file format supports this).
602    //
603    // The main challenge is that we either need to ignore the column metadata cache
604    // or have a more sophisticated cache that can cache per-column metadata.
605    //
606    // Also, if the number of columns is fairly small, it's faster to read them as a
607    // single IOP, but we can fix this through coalescing.
608    pub async fn read_all_metadata(scheduler: &FileScheduler) -> Result<CachedFileMetadata> {
609        // 1. read the footer
610        let (tail_bytes, file_len) = Self::read_tail(scheduler).await?;
611        let footer = Self::decode_footer(&tail_bytes)?;
612
613        let file_version = LanceFileVersion::try_from_major_minor(
614            footer.major_version as u32,
615            footer.minor_version as u32,
616        )?;
617
618        let gbo_table =
619            Self::decode_gbo_table(&tail_bytes, file_len, scheduler, &footer, file_version).await?;
620        if gbo_table.is_empty() {
621            return Err(Error::Internal {
622                message: "File did not contain any global buffers, schema expected".to_string(),
623                location: location!(),
624            });
625        }
626        let schema_start = gbo_table[0].position;
627        let schema_size = gbo_table[0].size;
628
629        let num_footer_bytes = file_len - schema_start;
630
631        // By default we read all column metadatas.  We do NOT read the column metadata buffers
632        // at this point.  We only want to read the column metadata for columns we are actually loading.
633        let all_metadata_bytes =
634            Self::optimistic_tail_read(&tail_bytes, schema_start, scheduler, file_len).await?;
635
636        let schema_bytes = all_metadata_bytes.slice(0..schema_size as usize);
637        let (num_rows, schema) = Self::decode_schema(schema_bytes)?;
638
639        // Next, read the metadata for the columns
640        // This is both the column metadata and the CMO table
641        let column_metadata_start = (footer.column_meta_start - schema_start) as usize;
642        let column_metadata_end = (footer.global_buff_offsets_start - schema_start) as usize;
643        let column_metadata_bytes =
644            all_metadata_bytes.slice(column_metadata_start..column_metadata_end);
645        let column_metadatas = Self::read_all_column_metadata(column_metadata_bytes, &footer)?;
646
647        let num_global_buffer_bytes = gbo_table.iter().map(|buf| buf.size).sum::<u64>();
648        let num_data_bytes = footer.column_meta_start - num_global_buffer_bytes;
649        let num_column_metadata_bytes = footer.global_buff_offsets_start - footer.column_meta_start;
650
651        let column_infos = Self::meta_to_col_infos(column_metadatas.as_slice(), file_version);
652
653        Ok(CachedFileMetadata {
654            file_schema: Arc::new(schema),
655            column_metadatas,
656            column_infos,
657            num_rows,
658            num_data_bytes,
659            num_column_metadata_bytes,
660            num_global_buffer_bytes,
661            num_footer_bytes,
662            file_buffers: gbo_table,
663            major_version: footer.major_version,
664            minor_version: footer.minor_version,
665        })
666    }
667
668    fn fetch_encoding<M: Default + Name + Sized>(encoding: &pbfile::Encoding) -> M {
669        match &encoding.location {
670            Some(pbfile::encoding::Location::Indirect(_)) => todo!(),
671            Some(pbfile::encoding::Location::Direct(encoding)) => {
672                let encoding_buf = Bytes::from(encoding.encoding.clone());
673                let encoding_any = prost_types::Any::decode(encoding_buf).unwrap();
674                encoding_any.to_msg::<M>().unwrap()
675            }
676            Some(pbfile::encoding::Location::None(_)) => panic!(),
677            None => panic!(),
678        }
679    }
680
681    fn meta_to_col_infos(
682        column_metadatas: &[pbfile::ColumnMetadata],
683        file_version: LanceFileVersion,
684    ) -> Vec<Arc<ColumnInfo>> {
685        column_metadatas
686            .iter()
687            .enumerate()
688            .map(|(col_idx, col_meta)| {
689                let page_infos = col_meta
690                    .pages
691                    .iter()
692                    .map(|page| {
693                        let num_rows = page.length;
694                        let encoding = match file_version {
695                            LanceFileVersion::V2_0 => {
696                                PageEncoding::Legacy(Self::fetch_encoding::<pbenc::ArrayEncoding>(
697                                    page.encoding.as_ref().unwrap(),
698                                ))
699                            }
700                            _ => PageEncoding::Structural(Self::fetch_encoding::<
701                                pbenc21::PageLayout,
702                            >(
703                                page.encoding.as_ref().unwrap()
704                            )),
705                        };
706                        let buffer_offsets_and_sizes = Arc::from(
707                            page.buffer_offsets
708                                .iter()
709                                .zip(page.buffer_sizes.iter())
710                                .map(|(offset, size)| {
711                                    // Starting with version 2.1 we can assert that page buffers are aligned
712                                    assert!(
713                                        file_version < LanceFileVersion::V2_1
714                                            || offset % PAGE_BUFFER_ALIGNMENT as u64 == 0
715                                    );
716                                    (*offset, *size)
717                                })
718                                .collect::<Vec<_>>(),
719                        );
720                        PageInfo {
721                            buffer_offsets_and_sizes,
722                            encoding,
723                            num_rows,
724                            priority: page.priority,
725                        }
726                    })
727                    .collect::<Vec<_>>();
728                let buffer_offsets_and_sizes = Arc::from(
729                    col_meta
730                        .buffer_offsets
731                        .iter()
732                        .zip(col_meta.buffer_sizes.iter())
733                        .map(|(offset, size)| (*offset, *size))
734                        .collect::<Vec<_>>(),
735                );
736                Arc::new(ColumnInfo {
737                    index: col_idx as u32,
738                    page_infos: Arc::from(page_infos),
739                    buffer_offsets_and_sizes,
740                    encoding: Self::fetch_encoding(col_meta.encoding.as_ref().unwrap()),
741                })
742            })
743            .collect::<Vec<_>>()
744    }
745
746    fn validate_projection(
747        projection: &ReaderProjection,
748        metadata: &CachedFileMetadata,
749    ) -> Result<()> {
750        if projection.schema.fields.is_empty() {
751            return Err(Error::invalid_input(
752                "Attempt to read zero columns from the file, at least one column must be specified"
753                    .to_string(),
754                location!(),
755            ));
756        }
757        let mut column_indices_seen = BTreeSet::new();
758        for column_index in &projection.column_indices {
759            if !column_indices_seen.insert(*column_index) {
760                return Err(Error::invalid_input(
761                    format!(
762                        "The projection specified the column index {} more than once",
763                        column_index
764                    ),
765                    location!(),
766                ));
767            }
768            if *column_index >= metadata.column_infos.len() as u32 {
769                return Err(Error::invalid_input(format!("The projection specified the column index {} but there are only {} columns in the file", column_index, metadata.column_infos.len()), location!()));
770            }
771        }
772        Ok(())
773    }
774
775    /// Opens a new file reader without any pre-existing knowledge
776    ///
777    /// This will read the file schema from the file itself and thus requires a bit more I/O
778    ///
779    /// A `base_projection` can also be provided.  If provided, then the projection will apply
780    /// to all reads from the file that do not specify their own projection.
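    ///
    /// A minimal sketch (not run as a doctest), assuming `scheduler`, `decoder_plugins`,
    /// and `cache` already exist:
    ///
    /// ```ignore
    /// let reader = FileReader::try_open(
    ///     scheduler,                    // FileScheduler for the target file
    ///     None,                         // no base projection: default to the whole schema
    ///     decoder_plugins,              // Arc<DecoderPlugins>
    ///     &cache,                       // LanceCache
    ///     FileReaderOptions::default(),
    /// )
    /// .await?;
    /// ```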
781    pub async fn try_open(
782        scheduler: FileScheduler,
783        base_projection: Option<ReaderProjection>,
784        decoder_plugins: Arc<DecoderPlugins>,
785        cache: &LanceCache,
786        options: FileReaderOptions,
787    ) -> Result<Self> {
788        let file_metadata = Arc::new(Self::read_all_metadata(&scheduler).await?);
789        let path = scheduler.reader().path().clone();
790
791        // Create LanceEncodingsIo with read chunk size from options
792        let encodings_io =
793            LanceEncodingsIo::new(scheduler).with_read_chunk_size(options.read_chunk_size);
794
795        Self::try_open_with_file_metadata(
796            Arc::new(encodings_io),
797            path,
798            base_projection,
799            decoder_plugins,
800            file_metadata,
801            cache,
802            options,
803        )
804        .await
805    }
806
807    /// Same as `try_open` but with the file metadata already loaded.
808    ///
809    /// This method can also accept any kind of `EncodingsIo` implementation, allowing
810    /// for custom strategies to be used for I/O scheduling (e.g. for takes on fast
811    /// disks it may be better to avoid asynchronous overhead).
812    pub async fn try_open_with_file_metadata(
813        scheduler: Arc<dyn EncodingsIo>,
814        path: Path,
815        base_projection: Option<ReaderProjection>,
816        decoder_plugins: Arc<DecoderPlugins>,
817        file_metadata: Arc<CachedFileMetadata>,
818        cache: &LanceCache,
819        options: FileReaderOptions,
820    ) -> Result<Self> {
821        let cache = Arc::new(cache.with_key_prefix(path.as_ref()));
822
823        if let Some(base_projection) = base_projection.as_ref() {
824            Self::validate_projection(base_projection, &file_metadata)?;
825        }
826        let num_rows = file_metadata.num_rows;
827        Ok(Self {
828            scheduler,
829            base_projection: base_projection.unwrap_or(ReaderProjection::from_whole_schema(
830                file_metadata.file_schema.as_ref(),
831                file_metadata.version(),
832            )),
833            num_rows,
834            metadata: file_metadata,
835            decoder_plugins,
836            cache,
837            options,
838        })
839    }
840
841    // The actual decoder needs all the column infos that make up a type.  In other words, if
842    // the first type in the schema is Struct<i32, i32> then the decoder will need 3 column infos.
843    //
844    // This is a file reader concern because the file reader needs to support late projection of columns
845    // and so it will need to figure this out anyways.
846    //
847    // It's a bit of a tricky process though because the number of column infos may depend on the
848    // encoding.  Considering the above example, if we wrote it with a packed encoding, then there would
849    // only be a single column in the file (and not 3).
850    //
851    // At the moment this method works because our rules are simple and we just repeat them here.  See
852    // Self::default_projection for a similar problem.  In the future this is something the encodings
853    // registry will need to figure out.
854    fn collect_columns_from_projection(
855        &self,
856        _projection: &ReaderProjection,
857    ) -> Result<Vec<Arc<ColumnInfo>>> {
858        Ok(self.metadata.column_infos.to_vec())
859    }
860
861    #[allow(clippy::too_many_arguments)]
862    fn do_read_range(
863        column_infos: Vec<Arc<ColumnInfo>>,
864        io: Arc<dyn EncodingsIo>,
865        cache: Arc<LanceCache>,
866        num_rows: u64,
867        decoder_plugins: Arc<DecoderPlugins>,
868        range: Range<u64>,
869        batch_size: u32,
870        projection: ReaderProjection,
871        filter: FilterExpression,
872        decoder_config: DecoderConfig,
873    ) -> Result<BoxStream<'static, ReadBatchTask>> {
874        debug!(
875            "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
876            range,
877            batch_size,
878            num_rows,
879            column_infos.len(),
880            projection.schema.fields.len(),
881        );
882
883        let config = SchedulerDecoderConfig {
884            batch_size,
885            cache,
886            decoder_plugins,
887            io,
888            decoder_config,
889        };
890
891        let requested_rows = RequestedRows::Ranges(vec![range]);
892
893        Ok(schedule_and_decode(
894            column_infos,
895            requested_rows,
896            filter,
897            projection.column_indices,
898            projection.schema,
899            config,
900        ))
901    }
902
903    fn read_range(
904        &self,
905        range: Range<u64>,
906        batch_size: u32,
907        projection: ReaderProjection,
908        filter: FilterExpression,
909    ) -> Result<BoxStream<'static, ReadBatchTask>> {
910        // Create and initialize the stream
911        Self::do_read_range(
912            self.collect_columns_from_projection(&projection)?,
913            self.scheduler.clone(),
914            self.cache.clone(),
915            self.num_rows,
916            self.decoder_plugins.clone(),
917            range,
918            batch_size,
919            projection,
920            filter,
921            self.options.decoder_config.clone(),
922        )
923    }
924
925    #[allow(clippy::too_many_arguments)]
926    fn do_take_rows(
927        column_infos: Vec<Arc<ColumnInfo>>,
928        io: Arc<dyn EncodingsIo>,
929        cache: Arc<LanceCache>,
930        decoder_plugins: Arc<DecoderPlugins>,
931        indices: Vec<u64>,
932        batch_size: u32,
933        projection: ReaderProjection,
934        filter: FilterExpression,
935        decoder_config: DecoderConfig,
936    ) -> Result<BoxStream<'static, ReadBatchTask>> {
937        debug!(
938            "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
939            indices.len(),
940            indices[0],
941            indices[indices.len() - 1],
942            batch_size,
943            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
944        );
945
946        let config = SchedulerDecoderConfig {
947            batch_size,
948            cache,
949            decoder_plugins,
950            io,
951            decoder_config,
952        };
953
954        let requested_rows = RequestedRows::Indices(indices);
955
956        Ok(schedule_and_decode(
957            column_infos,
958            requested_rows,
959            filter,
960            projection.column_indices,
961            projection.schema,
962            config,
963        ))
964    }
965
966    fn take_rows(
967        &self,
968        indices: Vec<u64>,
969        batch_size: u32,
970        projection: ReaderProjection,
971    ) -> Result<BoxStream<'static, ReadBatchTask>> {
972        // Create and initialize the stream
973        Self::do_take_rows(
974            self.collect_columns_from_projection(&projection)?,
975            self.scheduler.clone(),
976            self.cache.clone(),
977            self.decoder_plugins.clone(),
978            indices,
979            batch_size,
980            projection,
981            FilterExpression::no_filter(),
982            self.options.decoder_config.clone(),
983        )
984    }
985
986    #[allow(clippy::too_many_arguments)]
987    fn do_read_ranges(
988        column_infos: Vec<Arc<ColumnInfo>>,
989        io: Arc<dyn EncodingsIo>,
990        cache: Arc<LanceCache>,
991        decoder_plugins: Arc<DecoderPlugins>,
992        ranges: Vec<Range<u64>>,
993        batch_size: u32,
994        projection: ReaderProjection,
995        filter: FilterExpression,
996        decoder_config: DecoderConfig,
997    ) -> Result<BoxStream<'static, ReadBatchTask>> {
998        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
999        debug!(
1000            "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
1001            ranges.len(),
1002            num_rows,
1003            ranges[0].start,
1004            ranges[ranges.len() - 1].end,
1005            batch_size,
1006            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1007        );
1008
1009        let config = SchedulerDecoderConfig {
1010            batch_size,
1011            cache,
1012            decoder_plugins,
1013            io,
1014            decoder_config,
1015        };
1016
1017        let requested_rows = RequestedRows::Ranges(ranges);
1018
1019        Ok(schedule_and_decode(
1020            column_infos,
1021            requested_rows,
1022            filter,
1023            projection.column_indices,
1024            projection.schema,
1025            config,
1026        ))
1027    }
1028
1029    fn read_ranges(
1030        &self,
1031        ranges: Vec<Range<u64>>,
1032        batch_size: u32,
1033        projection: ReaderProjection,
1034        filter: FilterExpression,
1035    ) -> Result<BoxStream<'static, ReadBatchTask>> {
1036        Self::do_read_ranges(
1037            self.collect_columns_from_projection(&projection)?,
1038            self.scheduler.clone(),
1039            self.cache.clone(),
1040            self.decoder_plugins.clone(),
1041            ranges,
1042            batch_size,
1043            projection,
1044            filter,
1045            self.options.decoder_config.clone(),
1046        )
1047    }
1048
1049    /// Creates a stream of "read tasks" to read the data from the file
1050    ///
1051    /// The arguments are similar to [`Self::read_stream_projected`] but instead of returning a stream
1052    /// of record batches it returns a stream of "read tasks".
1053    ///
1054    /// The tasks should be consumed with some kind of `buffered` stream combinator if CPU parallelism is desired.
1055    ///
1056    /// Note that "read task" is probably a bit imprecise.  The tasks are actually "decode tasks".  The
1057    /// reading happens asynchronously in the background.  In other words, a single read task may map to
1058    /// multiple I/O operations or a single I/O operation may map to multiple read tasks.
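    ///
    /// A minimal sketch (not run as a doctest) of driving the tasks with `futures::StreamExt`,
    /// assuming `reader` is an open `FileReader`:
    ///
    /// ```ignore
    /// let tasks = reader.read_tasks(
    ///     ReadBatchParams::RangeFull,
    ///     1024,
    ///     None,
    ///     FilterExpression::no_filter(),
    /// )?;
    /// // Decode up to 4 batches in parallel
    /// let mut batches = tasks.map(|task| task.task).buffered(4);
    /// while let Some(batch) = batches.next().await {
    ///     let batch = batch?;
    ///     // consume the RecordBatch
    /// }
    /// ```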
1059    pub fn read_tasks(
1060        &self,
1061        params: ReadBatchParams,
1062        batch_size: u32,
1063        projection: Option<ReaderProjection>,
1064        filter: FilterExpression,
1065    ) -> Result<Pin<Box<dyn Stream<Item = ReadBatchTask> + Send>>> {
1066        let projection = projection.unwrap_or_else(|| self.base_projection.clone());
1067        Self::validate_projection(&projection, &self.metadata)?;
1068        let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
1069            if bound > self.num_rows || bound == self.num_rows && inclusive {
1070                Err(Error::invalid_input(
1071                    format!(
1072                        "cannot read {:?} from file with {} rows",
1073                        params, self.num_rows
1074                    ),
1075                    location!(),
1076                ))
1077            } else {
1078                Ok(())
1079            }
1080        };
1081        match &params {
1082            ReadBatchParams::Indices(indices) => {
1083                for idx in indices {
1084                    match idx {
1085                        None => {
1086                            return Err(Error::invalid_input(
1087                                "Null value in indices array",
1088                                location!(),
1089                            ));
1090                        }
1091                        Some(idx) => {
1092                            verify_bound(&params, idx as u64, true)?;
1093                        }
1094                    }
1095                }
1096                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
1097                self.take_rows(indices, batch_size, projection)
1098            }
1099            ReadBatchParams::Range(range) => {
1100                verify_bound(&params, range.end as u64, false)?;
1101                self.read_range(
1102                    range.start as u64..range.end as u64,
1103                    batch_size,
1104                    projection,
1105                    filter,
1106                )
1107            }
1108            ReadBatchParams::Ranges(ranges) => {
1109                let mut ranges_u64 = Vec::with_capacity(ranges.len());
1110                for range in ranges.as_ref() {
1111                    verify_bound(&params, range.end, false)?;
1112                    ranges_u64.push(range.start..range.end);
1113                }
1114                self.read_ranges(ranges_u64, batch_size, projection, filter)
1115            }
1116            ReadBatchParams::RangeFrom(range) => {
1117                verify_bound(&params, range.start as u64, true)?;
1118                self.read_range(
1119                    range.start as u64..self.num_rows,
1120                    batch_size,
1121                    projection,
1122                    filter,
1123                )
1124            }
1125            ReadBatchParams::RangeTo(range) => {
1126                verify_bound(&params, range.end as u64, false)?;
1127                self.read_range(0..range.end as u64, batch_size, projection, filter)
1128            }
1129            ReadBatchParams::RangeFull => {
1130                self.read_range(0..self.num_rows, batch_size, projection, filter)
1131            }
1132        }
1133    }
1134
1135    /// Reads data from the file as a stream of record batches
1136    ///
1137    /// * `params` - Specifies the range (or indices) of data to read
1138    /// * `batch_size` - The maximum size of a single batch.  A batch may be smaller
1139    ///   if it is the last batch or if it is not possible to create a batch of the
1140    ///   requested size.
1141    ///
1142    ///   For example, if the batch size is 1024 and one of the columns is a string
1143    ///   column then there may be some ranges of 1024 rows that contain more than
1144    ///   2^31 bytes of string data (which is the maximum size of a string column
1145    ///   in Arrow).  In this case smaller batches may be emitted.
1146    /// * `batch_readahead` - The number of batches to read ahead.  This controls the
1147    ///   amount of CPU parallelism of the read.  In other words it controls how many
1148    ///   batches will be decoded in parallel.  It has no effect on the I/O parallelism
1149    ///   of the read (how many I/O requests are in flight at once).
1150    ///
1151    ///   This parameter is also related to backpressure.  If the consumer of the
1152    ///   stream is slow then the reader will accumulate decoded batches in RAM.
1153    /// * `projection` - A projection to apply to the read.  This controls which columns
1154    ///   are read from the file.  The projection is NOT applied on top of the base
1155    ///   projection.  The projection is applied directly to the file schema.
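    ///
    /// A minimal sketch (not run as a doctest) of streaming a single column, assuming
    /// `reader` is an open `FileReader` with a column named `"x"`:
    ///
    /// ```ignore
    /// let projection = ReaderProjection::from_column_names(
    ///     reader.metadata().version(),
    ///     reader.schema(),
    ///     &["x"],
    /// )?;
    /// let mut stream = reader.read_stream_projected(
    ///     ReadBatchParams::RangeFull,
    ///     1024,
    ///     16,
    ///     projection,
    ///     FilterExpression::no_filter(),
    /// )?;
    /// while let Some(batch) = stream.next().await {
    ///     let batch = batch?;
    /// }
    /// ```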
1156    pub fn read_stream_projected(
1157        &self,
1158        params: ReadBatchParams,
1159        batch_size: u32,
1160        batch_readahead: u32,
1161        projection: ReaderProjection,
1162        filter: FilterExpression,
1163    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1164        let arrow_schema = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
1165        let tasks_stream = self.read_tasks(params, batch_size, Some(projection), filter)?;
1166        let batch_stream = tasks_stream
1167            .map(|task| task.task)
1168            .buffered(batch_readahead as usize)
1169            .boxed();
1170        Ok(Box::pin(RecordBatchStreamAdapter::new(
1171            arrow_schema,
1172            batch_stream,
1173        )))
1174    }
1175
1176    fn take_rows_blocking(
1177        &self,
1178        indices: Vec<u64>,
1179        batch_size: u32,
1180        projection: ReaderProjection,
1181        filter: FilterExpression,
1182    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1183        let column_infos = self.collect_columns_from_projection(&projection)?;
1184        debug!(
1185            "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
1186            indices.len(),
1187            indices[0],
1188            indices[indices.len() - 1],
1189            batch_size,
1190            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1191        );
1192
1193        let config = SchedulerDecoderConfig {
1194            batch_size,
1195            cache: self.cache.clone(),
1196            decoder_plugins: self.decoder_plugins.clone(),
1197            io: self.scheduler.clone(),
1198            decoder_config: self.options.decoder_config.clone(),
1199        };
1200
1201        let requested_rows = RequestedRows::Indices(indices);
1202
1203        schedule_and_decode_blocking(
1204            column_infos,
1205            requested_rows,
1206            filter,
1207            projection.column_indices,
1208            projection.schema,
1209            config,
1210        )
1211    }
1212
1213    fn read_ranges_blocking(
1214        &self,
1215        ranges: Vec<Range<u64>>,
1216        batch_size: u32,
1217        projection: ReaderProjection,
1218        filter: FilterExpression,
1219    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1220        let column_infos = self.collect_columns_from_projection(&projection)?;
1221        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1222        debug!(
1223            "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
1224            ranges.len(),
1225            num_rows,
1226            ranges[0].start,
1227            ranges[ranges.len() - 1].end,
1228            batch_size,
1229            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1230        );
1231
1232        let config = SchedulerDecoderConfig {
1233            batch_size,
1234            cache: self.cache.clone(),
1235            decoder_plugins: self.decoder_plugins.clone(),
1236            io: self.scheduler.clone(),
1237            decoder_config: self.options.decoder_config.clone(),
1238        };
1239
1240        let requested_rows = RequestedRows::Ranges(ranges);
1241
1242        schedule_and_decode_blocking(
1243            column_infos,
1244            requested_rows,
1245            filter,
1246            projection.column_indices,
1247            projection.schema,
1248            config,
1249        )
1250    }
1251
1252    fn read_range_blocking(
1253        &self,
1254        range: Range<u64>,
1255        batch_size: u32,
1256        projection: ReaderProjection,
1257        filter: FilterExpression,
1258    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1259        let column_infos = self.collect_columns_from_projection(&projection)?;
1260        let num_rows = self.num_rows;
1261
1262        debug!(
1263            "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
1264            range,
1265            batch_size,
1266            num_rows,
1267            column_infos.len(),
1268            projection.schema.fields.len(),
1269        );
1270
1271        let config = SchedulerDecoderConfig {
1272            batch_size,
1273            cache: self.cache.clone(),
1274            decoder_plugins: self.decoder_plugins.clone(),
1275            io: self.scheduler.clone(),
1276            decoder_config: self.options.decoder_config.clone(),
1277        };
1278
1279        let requested_rows = RequestedRows::Ranges(vec![range]);
1280
1281        schedule_and_decode_blocking(
1282            column_infos,
1283            requested_rows,
1284            filter,
1285            projection.column_indices,
1286            projection.schema,
1287            config,
1288        )
1289    }
1290
1291    /// Read data from the file as an iterator of record batches
1292    ///
1293    /// This is a blocking variant of [`Self::read_stream_projected`] that runs entirely in the
1294    /// calling thread.  It will block on I/O if the decode is faster than the I/O.  It is useful
1295    /// for benchmarking and potentially for "take"ing small batches from fast disks.
1296    ///
1297    /// Large scans of in-memory data will still benefit from threading (and should therefore not
1298    /// use this method) because we can parallelize the decode.
1299    ///
1300    /// Note: calling this from within a tokio runtime will panic.  It is acceptable to call this
1301    /// from a spawn_blocking context.
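    ///
    /// A minimal sketch (not run as a doctest), assuming it is called from a
    /// `spawn_blocking` context (or a plain thread) with an open `reader`:
    ///
    /// ```ignore
    /// let batches = reader.read_stream_projected_blocking(
    ///     ReadBatchParams::RangeFull,
    ///     1024,
    ///     None,
    ///     FilterExpression::no_filter(),
    /// )?;
    /// for batch in batches {
    ///     let batch = batch?;
    /// }
    /// ```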
1302    pub fn read_stream_projected_blocking(
1303        &self,
1304        params: ReadBatchParams,
1305        batch_size: u32,
1306        projection: Option<ReaderProjection>,
1307        filter: FilterExpression,
1308    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1309        let projection = projection.unwrap_or_else(|| self.base_projection.clone());
1310        Self::validate_projection(&projection, &self.metadata)?;
1311        let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
1312            if bound > self.num_rows || bound == self.num_rows && inclusive {
1313                Err(Error::invalid_input(
1314                    format!(
1315                        "cannot read {:?} from file with {} rows",
1316                        params, self.num_rows
1317                    ),
1318                    location!(),
1319                ))
1320            } else {
1321                Ok(())
1322            }
1323        };
1324        match &params {
1325            ReadBatchParams::Indices(indices) => {
1326                for idx in indices {
1327                    match idx {
1328                        None => {
1329                            return Err(Error::invalid_input(
1330                                "Null value in indices array",
1331                                location!(),
1332                            ));
1333                        }
1334                        Some(idx) => {
1335                            verify_bound(&params, idx as u64, true)?;
1336                        }
1337                    }
1338                }
1339                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
1340                self.take_rows_blocking(indices, batch_size, projection, filter)
1341            }
1342            ReadBatchParams::Range(range) => {
1343                verify_bound(&params, range.end as u64, false)?;
1344                self.read_range_blocking(
1345                    range.start as u64..range.end as u64,
1346                    batch_size,
1347                    projection,
1348                    filter,
1349                )
1350            }
1351            ReadBatchParams::Ranges(ranges) => {
1352                let mut ranges_u64 = Vec::with_capacity(ranges.len());
1353                for range in ranges.as_ref() {
1354                    verify_bound(&params, range.end, false)?;
1355                    ranges_u64.push(range.start..range.end);
1356                }
1357                self.read_ranges_blocking(ranges_u64, batch_size, projection, filter)
1358            }
1359            ReadBatchParams::RangeFrom(range) => {
1360                verify_bound(&params, range.start as u64, true)?;
1361                self.read_range_blocking(
1362                    range.start as u64..self.num_rows,
1363                    batch_size,
1364                    projection,
1365                    filter,
1366                )
1367            }
1368            ReadBatchParams::RangeTo(range) => {
1369                verify_bound(&params, range.end as u64, false)?;
1370                self.read_range_blocking(0..range.end as u64, batch_size, projection, filter)
1371            }
1372            ReadBatchParams::RangeFull => {
1373                self.read_range_blocking(0..self.num_rows, batch_size, projection, filter)
1374            }
1375        }
1376    }
1377
1378    /// Reads data from the file as a stream of record batches
1379    ///
1380    /// This is similar to [`Self::read_stream_projected`] but uses the base projection
1381    /// provided when the file was opened (or reads all columns if the file was
1382    /// opened without a base projection)
1383    pub fn read_stream(
1384        &self,
1385        params: ReadBatchParams,
1386        batch_size: u32,
1387        batch_readahead: u32,
1388        filter: FilterExpression,
1389    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1390        self.read_stream_projected(
1391            params,
1392            batch_size,
1393            batch_readahead,
1394            self.base_projection.clone(),
1395            filter,
1396        )
1397    }
1398
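    /// Returns the full schema of the file (not limited by any base projection)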
1399    pub fn schema(&self) -> &Arc<Schema> {
1400        &self.metadata.file_schema
1401    }
1402}
1403
1404/// Inspects a page and returns a String describing the page's encoding
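///
/// A small sketch of how this might be used when dumping file metadata (assumes a
/// `pbfile::ColumnMetadata` value `col_meta` whose pages are stored in a repeated
/// `pages` field):
///
/// ```ignore
/// for page in &col_meta.pages {
///     println!("{}", describe_encoding(page));
/// }
/// ```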
1405pub fn describe_encoding(page: &pbfile::column_metadata::Page) -> String {
1406    if let Some(encoding) = &page.encoding {
1407        if let Some(style) = &encoding.location {
1408            match style {
1409                pbfile::encoding::Location::Indirect(indirect) => {
1410                    format!(
1411                        "IndirectEncoding(pos={},size={})",
1412                        indirect.buffer_location, indirect.buffer_length
1413                    )
1414                }
1415                pbfile::encoding::Location::Direct(direct) => {
1416                    let encoding_any =
1417                        prost_types::Any::decode(Bytes::from(direct.encoding.clone()))
1418                            .expect("failed to deserialize encoding as protobuf");
1419                    if encoding_any.type_url == "/lance.encodings.ArrayEncoding" {
1420                        let encoding = encoding_any.to_msg::<pbenc::ArrayEncoding>();
1421                        match encoding {
1422                            Ok(encoding) => {
1423                                format!("{:#?}", encoding)
1424                            }
1425                            Err(err) => {
1426                                format!("Unsupported(decode_err={})", err)
1427                            }
1428                        }
1429                    } else if encoding_any.type_url == "/lance.encodings21.PageLayout" {
1430                        let encoding = encoding_any.to_msg::<pbenc21::PageLayout>();
1431                        match encoding {
1432                            Ok(encoding) => {
1433                                format!("{:#?}", encoding)
1434                            }
1435                            Err(err) => {
1436                                format!("Unsupported(decode_err={})", err)
1437                            }
1438                        }
1439                    } else {
1440                        format!("Unrecognized(type_url={})", encoding_any.type_url)
1441                    }
1442                }
1443                pbfile::encoding::Location::None(_) => "NoEncodingDescription".to_string(),
1444            }
1445        } else {
1446            "MISSING STYLE".to_string()
1447        }
1448    } else {
1449        "MISSING".to_string()
1450    }
1451}
1452
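/// Extension trait for decoding an [`EncodedBatch`] from serialized Lance file bytes
///
/// `try_from_mini_lance` expects the caller to provide the schema (the bytes do not
/// carry one), while `try_from_self_described_lance` reads the schema from the file's
/// first global buffer.
///
/// A round-trip sketch, mirroring the test below (assumes `encoded_batch` and `version`
/// come from a prior call to `encode_batch`):
///
/// ```ignore
/// let bytes = encoded_batch.try_to_self_described_lance(version)?;
/// let decoded = EncodedBatch::try_from_self_described_lance(bytes)?;
/// ```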
1453pub trait EncodedBatchReaderExt {
1454    fn try_from_mini_lance(
1455        bytes: Bytes,
1456        schema: &Schema,
1457        version: LanceFileVersion,
1458    ) -> Result<Self>
1459    where
1460        Self: Sized;
1461    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1462    where
1463        Self: Sized;
1464}
1465
1466impl EncodedBatchReaderExt for EncodedBatch {
1467    fn try_from_mini_lance(
1468        bytes: Bytes,
1469        schema: &Schema,
1470        file_version: LanceFileVersion,
1471    ) -> Result<Self>
1472    where
1473        Self: Sized,
1474    {
1475        let projection = ReaderProjection::from_whole_schema(schema, file_version);
1476        let footer = FileReader::decode_footer(&bytes)?;
1477
1478        // Next, read the metadata for the columns
1479        // This is both the column metadata and the CMO table
1480        let column_metadata_start = footer.column_meta_start as usize;
1481        let column_metadata_end = footer.global_buff_offsets_start as usize;
1482        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1483        let column_metadatas =
1484            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1485
1486        let file_version = LanceFileVersion::try_from_major_minor(
1487            footer.major_version as u32,
1488            footer.minor_version as u32,
1489        )?;
1490
1491        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1492
1493        Ok(Self {
1494            data: bytes,
1495            num_rows: page_table
1496                .first()
1497                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1498                .unwrap_or(0),
1499            page_table,
1500            top_level_columns: projection.column_indices,
1501            schema: Arc::new(schema.clone()),
1502        })
1503    }
1504
1505    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1506    where
1507        Self: Sized,
1508    {
1509        let footer = FileReader::decode_footer(&bytes)?;
1510        let file_version = LanceFileVersion::try_from_major_minor(
1511            footer.major_version as u32,
1512            footer.minor_version as u32,
1513        )?;
1514
1515        let gbo_table = FileReader::do_decode_gbo_table(
1516            &bytes.slice(footer.global_buff_offsets_start as usize..),
1517            &footer,
1518            file_version,
1519        )?;
1520        if gbo_table.is_empty() {
1521            return Err(Error::Internal {
1522                message: "File did not contain any global buffers, schema expected".to_string(),
1523                location: location!(),
1524            });
1525        }
1526        let schema_start = gbo_table[0].position as usize;
1527        let schema_size = gbo_table[0].size as usize;
1528
1529        let schema_bytes = bytes.slice(schema_start..(schema_start + schema_size));
1530        let (_, schema) = FileReader::decode_schema(schema_bytes)?;
1531        let projection = ReaderProjection::from_whole_schema(&schema, file_version);
1532
1533        // Next, read the metadata for the columns
1534        // This is both the column metadata and the CMO table
1535        let column_metadata_start = footer.column_meta_start as usize;
1536        let column_metadata_end = footer.global_buff_offsets_start as usize;
1537        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1538        let column_metadatas =
1539            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1540
1541        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1542
1543        Ok(Self {
1544            data: bytes,
1545            num_rows: page_table
1546                .first()
1547                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1548                .unwrap_or(0),
1549            page_table,
1550            top_level_columns: projection.column_indices,
1551            schema: Arc::new(schema),
1552        })
1553    }
1554}
1555
1556#[cfg(test)]
1557pub mod tests {
1558    use std::{collections::BTreeMap, pin::Pin, sync::Arc};
1559
1560    use arrow_array::{
1561        types::{Float64Type, Int32Type},
1562        RecordBatch, UInt32Array,
1563    };
1564    use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema};
1565    use bytes::Bytes;
1566    use futures::{prelude::stream::TryStreamExt, StreamExt};
1567    use lance_arrow::RecordBatchExt;
1568    use lance_core::{datatypes::Schema, ArrowResult};
1569    use lance_datagen::{array, gen_batch, BatchCount, ByteCount, RowCount};
1570    use lance_encoding::{
1571        decoder::{decode_batch, DecodeBatchScheduler, DecoderPlugins, FilterExpression},
1572        encoder::{default_encoding_strategy, encode_batch, EncodedBatch, EncodingOptions},
1573        version::LanceFileVersion,
1574    };
1575    use lance_io::{stream::RecordBatchStream, utils::CachedFileSize};
1576    use log::debug;
1577    use rstest::rstest;
1578    use tokio::sync::mpsc;
1579
1580    use crate::v2::{
1581        reader::{EncodedBatchReaderExt, FileReader, FileReaderOptions, ReaderProjection},
1582        testing::{test_cache, write_lance_file, FsFixture, WrittenFile},
1583        writer::{EncodedBatchWriteExt, FileWriter, FileWriterOptions},
1584    };
1585    use lance_encoding::decoder::DecoderConfig;
1586
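    /// Writes a test file of 100 batches x 1000 rows containing scalar, struct, list,
    /// and binary columns so the round-trip tests cover a variety of encodings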
1587    async fn create_some_file(fs: &FsFixture, version: LanceFileVersion) -> WrittenFile {
1588        let location_type = DataType::Struct(Fields::from(vec![
1589            Field::new("x", DataType::Float64, true),
1590            Field::new("y", DataType::Float64, true),
1591        ]));
1592        let categories_type = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)));
1593
1594        let mut reader = gen_batch()
1595            .col("score", array::rand::<Float64Type>())
1596            .col("location", array::rand_type(&location_type))
1597            .col("categories", array::rand_type(&categories_type))
1598            .col("binary", array::rand_type(&DataType::Binary));
1599        if version <= LanceFileVersion::V2_0 {
1600            reader = reader.col("large_bin", array::rand_type(&DataType::LargeBinary));
1601        }
1602        let reader = reader.into_reader_rows(RowCount::from(1000), BatchCount::from(100));
1603
1604        write_lance_file(
1605            reader,
1606            fs,
1607            FileWriterOptions {
1608                format_version: Some(version),
1609                ..Default::default()
1610            },
1611        )
1612        .await
1613    }
1614
1615    type Transformer = Box<dyn Fn(&RecordBatch) -> RecordBatch>;
1616
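    /// Drains `actual` and asserts that it contains exactly the rows of `expected`,
    /// re-slicing across batch boundaries so the comparison is independent of `read_size`.
    /// If a `transform` is provided (e.g. a projection) it is applied to each expected
    /// batch before comparing.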
1617    async fn verify_expected(
1618        expected: &[RecordBatch],
1619        mut actual: Pin<Box<dyn RecordBatchStream>>,
1620        read_size: u32,
1621        transform: Option<Transformer>,
1622    ) {
1623        let mut remaining = expected.iter().map(|batch| batch.num_rows()).sum::<usize>() as u32;
1624        let mut expected_iter = expected.iter().map(|batch| {
1625            if let Some(transform) = &transform {
1626                transform(batch)
1627            } else {
1628                batch.clone()
1629            }
1630        });
1631        let mut next_expected = expected_iter.next().unwrap().clone();
1632        while let Some(actual) = actual.next().await {
1633            let mut actual = actual.unwrap();
1634            let mut rows_to_verify = actual.num_rows() as u32;
1635            let expected_length = remaining.min(read_size);
1636            assert_eq!(expected_length, rows_to_verify);
1637
1638            while rows_to_verify > 0 {
1639                let next_slice_len = (next_expected.num_rows() as u32).min(rows_to_verify);
1640                assert_eq!(
1641                    next_expected.slice(0, next_slice_len as usize),
1642                    actual.slice(0, next_slice_len as usize)
1643                );
1644                remaining -= next_slice_len;
1645                rows_to_verify -= next_slice_len;
1646                if remaining > 0 {
1647                    if next_slice_len == next_expected.num_rows() as u32 {
1648                        next_expected = expected_iter.next().unwrap().clone();
1649                    } else {
1650                        next_expected = next_expected.slice(
1651                            next_slice_len as usize,
1652                            next_expected.num_rows() - next_slice_len as usize,
1653                        );
1654                    }
1655                }
1656                if rows_to_verify > 0 {
1657                    actual = actual.slice(
1658                        next_slice_len as usize,
1659                        actual.num_rows() - next_slice_len as usize,
1660                    );
1661                }
1662            }
1663        }
1664        assert_eq!(remaining, 0);
1665    }
1666
1667    #[tokio::test]
1668    async fn test_round_trip() {
1669        let fs = FsFixture::default();
1670
1671        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1672
1673        for read_size in [32, 1024, 1024 * 1024] {
1674            let file_scheduler = fs
1675                .scheduler
1676                .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1677                .await
1678                .unwrap();
1679            let file_reader = FileReader::try_open(
1680                file_scheduler,
1681                None,
1682                Arc::<DecoderPlugins>::default(),
1683                &test_cache(),
1684                FileReaderOptions::default(),
1685            )
1686            .await
1687            .unwrap();
1688
1689            let schema = file_reader.schema();
1690            assert_eq!(schema.metadata.get("foo").unwrap(), "bar");
1691
1692            let batch_stream = file_reader
1693                .read_stream(
1694                    lance_io::ReadBatchParams::RangeFull,
1695                    read_size,
1696                    16,
1697                    FilterExpression::no_filter(),
1698                )
1699                .unwrap();
1700
1701            verify_expected(&data, batch_stream, read_size, None).await;
1702        }
1703    }
1704
1705    #[rstest]
1706    #[test_log::test(tokio::test)]
1707    async fn test_encoded_batch_round_trip(
1708        // TODO: Add V2_1 (currently fails)
1709        #[values(LanceFileVersion::V2_0)] version: LanceFileVersion,
1710    ) {
1711        let data = gen_batch()
1712            .col("x", array::rand::<Int32Type>())
1713            .col("y", array::rand_utf8(ByteCount::from(16), false))
1714            .into_batch_rows(RowCount::from(10000))
1715            .unwrap();
1716
1717        let lance_schema = Arc::new(Schema::try_from(data.schema().as_ref()).unwrap());
1718
1719        let encoding_options = EncodingOptions {
1720            cache_bytes_per_column: 4096,
1721            max_page_bytes: 32 * 1024 * 1024,
1722            keep_original_array: true,
1723            buffer_alignment: 64,
1724        };
1725
1726        let encoding_strategy = default_encoding_strategy(version);
1727
1728        let encoded_batch = encode_batch(
1729            &data,
1730            lance_schema.clone(),
1731            encoding_strategy.as_ref(),
1732            &encoding_options,
1733        )
1734        .await
1735        .unwrap();
1736
1737        // Test self described
1738        let bytes = encoded_batch.try_to_self_described_lance(version).unwrap();
1739
1740        let decoded_batch = EncodedBatch::try_from_self_described_lance(bytes).unwrap();
1741
1742        let decoded = decode_batch(
1743            &decoded_batch,
1744            &FilterExpression::no_filter(),
1745            Arc::<DecoderPlugins>::default(),
1746            false,
1747            version,
1748            None,
1749        )
1750        .await
1751        .unwrap();
1752
1753        assert_eq!(data, decoded);
1754
1755        // Test mini
1756        let bytes = encoded_batch.try_to_mini_lance(version).unwrap();
1757        let decoded_batch =
1758            EncodedBatch::try_from_mini_lance(bytes, lance_schema.as_ref(), version)
1759                .unwrap();
1760        let decoded = decode_batch(
1761            &decoded_batch,
1762            &FilterExpression::no_filter(),
1763            Arc::<DecoderPlugins>::default(),
1764            false,
1765            version,
1766            None,
1767        )
1768        .await
1769        .unwrap();
1770
1771        assert_eq!(data, decoded);
1772    }
1773
1774    #[rstest]
1775    #[test_log::test(tokio::test)]
1776    async fn test_projection(
1777        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
1778    ) {
1779        let fs = FsFixture::default();
1780
1781        let written_file = create_some_file(&fs, version).await;
1782        let file_scheduler = fs
1783            .scheduler
1784            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1785            .await
1786            .unwrap();
1787
1788        let field_id_mapping = written_file
1789            .field_id_mapping
1790            .iter()
1791            .copied()
1792            .collect::<BTreeMap<_, _>>();
1793
1794        let empty_projection = ReaderProjection {
1795            column_indices: Vec::default(),
1796            schema: Arc::new(Schema::default()),
1797        };
1798
1799        for columns in [
1800            vec!["score"],
1801            vec!["location"],
1802            vec!["categories"],
1803            vec!["location.x"],
1804            vec!["score", "categories"],
1805            vec!["score", "location"],
1806            vec!["location", "categories"],
1807            vec!["score", "location.y", "categories"],
1808        ] {
1809            debug!("Testing round trip with projection {:?}", columns);
1810            for use_field_ids in [true, false] {
1811                // We can specify the projection as part of the read operation via read_stream_projected
1812                let file_reader = FileReader::try_open(
1813                    file_scheduler.clone(),
1814                    None,
1815                    Arc::<DecoderPlugins>::default(),
1816                    &test_cache(),
1817                    FileReaderOptions::default(),
1818                )
1819                .await
1820                .unwrap();
1821
1822                let projected_schema = written_file.schema.project(&columns).unwrap();
1823                let projection = if use_field_ids {
1824                    ReaderProjection::from_field_ids(
1825                        file_reader.metadata.version(),
1826                        &projected_schema,
1827                        &field_id_mapping,
1828                    )
1829                    .unwrap()
1830                } else {
1831                    ReaderProjection::from_column_names(
1832                        file_reader.metadata.version(),
1833                        &written_file.schema,
1834                        &columns,
1835                    )
1836                    .unwrap()
1837                };
1838
1839                let batch_stream = file_reader
1840                    .read_stream_projected(
1841                        lance_io::ReadBatchParams::RangeFull,
1842                        1024,
1843                        16,
1844                        projection.clone(),
1845                        FilterExpression::no_filter(),
1846                    )
1847                    .unwrap();
1848
1849                let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1850                verify_expected(
1851                    &written_file.data,
1852                    batch_stream,
1853                    1024,
1854                    Some(Box::new(move |batch: &RecordBatch| {
1855                        batch.project_by_schema(&projection_arrow).unwrap()
1856                    })),
1857                )
1858                .await;
1859
1860                // We can also specify the projection as a base projection when we open the file
1861                let file_reader = FileReader::try_open(
1862                    file_scheduler.clone(),
1863                    Some(projection.clone()),
1864                    Arc::<DecoderPlugins>::default(),
1865                    &test_cache(),
1866                    FileReaderOptions::default(),
1867                )
1868                .await
1869                .unwrap();
1870
1871                let batch_stream = file_reader
1872                    .read_stream(
1873                        lance_io::ReadBatchParams::RangeFull,
1874                        1024,
1875                        16,
1876                        FilterExpression::no_filter(),
1877                    )
1878                    .unwrap();
1879
1880                let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1881                verify_expected(
1882                    &written_file.data,
1883                    batch_stream,
1884                    1024,
1885                    Some(Box::new(move |batch: &RecordBatch| {
1886                        batch.project_by_schema(&projection_arrow).unwrap()
1887                    })),
1888                )
1889                .await;
1890
1891                assert!(file_reader
1892                    .read_stream_projected(
1893                        lance_io::ReadBatchParams::RangeFull,
1894                        1024,
1895                        16,
1896                        empty_projection.clone(),
1897                        FilterExpression::no_filter(),
1898                    )
1899                    .is_err());
1900            }
1901        }
1902
1903        assert!(FileReader::try_open(
1904            file_scheduler.clone(),
1905            Some(empty_projection),
1906            Arc::<DecoderPlugins>::default(),
1907            &test_cache(),
1908            FileReaderOptions::default(),
1909        )
1910        .await
1911        .is_err());
1912
1913        let arrow_schema = ArrowSchema::new(vec![
1914            Field::new("x", DataType::Int32, true),
1915            Field::new("y", DataType::Int32, true),
1916        ]);
1917        let schema = Schema::try_from(&arrow_schema).unwrap();
1918
1919        let projection_with_dupes = ReaderProjection {
1920            column_indices: vec![0, 0],
1921            schema: Arc::new(schema),
1922        };
1923
1924        assert!(FileReader::try_open(
1925            file_scheduler.clone(),
1926            Some(projection_with_dupes),
1927            Arc::<DecoderPlugins>::default(),
1928            &test_cache(),
1929            FileReaderOptions::default(),
1930        )
1931        .await
1932        .is_err());
1933    }
1934
1935    #[test_log::test(tokio::test)]
1936    async fn test_compressing_buffer() {
1937        let fs = FsFixture::default();
1938
1939        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
1940        let file_scheduler = fs
1941            .scheduler
1942            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1943            .await
1944            .unwrap();
1945
1946        // We can specify the projection as part of the read operation via read_stream_projected
1947        let file_reader = FileReader::try_open(
1948            file_scheduler.clone(),
1949            None,
1950            Arc::<DecoderPlugins>::default(),
1951            &test_cache(),
1952            FileReaderOptions::default(),
1953        )
1954        .await
1955        .unwrap();
1956
1957        let mut projection = written_file.schema.project(&["score"]).unwrap();
1958        for field in projection.fields.iter_mut() {
1959            field
1960                .metadata
1961                .insert("lance:compression".to_string(), "zstd".to_string());
1962        }
1963        let projection = ReaderProjection {
1964            column_indices: projection.fields.iter().map(|f| f.id as u32).collect(),
1965            schema: Arc::new(projection),
1966        };
1967
1968        let batch_stream = file_reader
1969            .read_stream_projected(
1970                lance_io::ReadBatchParams::RangeFull,
1971                1024,
1972                16,
1973                projection.clone(),
1974                FilterExpression::no_filter(),
1975            )
1976            .unwrap();
1977
1978        let projection_arrow = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
1979        verify_expected(
1980            &written_file.data,
1981            batch_stream,
1982            1024,
1983            Some(Box::new(move |batch: &RecordBatch| {
1984                batch.project_by_schema(&projection_arrow).unwrap()
1985            })),
1986        )
1987        .await;
1988    }
1989
1990    #[tokio::test]
1991    async fn test_read_all() {
1992        let fs = FsFixture::default();
1993        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1994        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
1995
1996        let file_scheduler = fs
1997            .scheduler
1998            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1999            .await
2000            .unwrap();
2001        let file_reader = FileReader::try_open(
2002            file_scheduler.clone(),
2003            None,
2004            Arc::<DecoderPlugins>::default(),
2005            &test_cache(),
2006            FileReaderOptions::default(),
2007        )
2008        .await
2009        .unwrap();
2010
2011        let batches = file_reader
2012            .read_stream(
2013                lance_io::ReadBatchParams::RangeFull,
2014                total_rows as u32,
2015                16,
2016                FilterExpression::no_filter(),
2017            )
2018            .unwrap()
2019            .try_collect::<Vec<_>>()
2020            .await
2021            .unwrap();
2022        assert_eq!(batches.len(), 1);
2023        assert_eq!(batches[0].num_rows(), total_rows);
2024    }
2025
2026    #[rstest]
2027    #[tokio::test]
2028    async fn test_blocking_take(
2029        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
2030    ) {
2031        let fs = FsFixture::default();
2032        let WrittenFile { data, schema, .. } = create_some_file(&fs, version).await;
2033        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2034
2035        let file_scheduler = fs
2036            .scheduler
2037            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2038            .await
2039            .unwrap();
2040        let file_reader = FileReader::try_open(
2041            file_scheduler.clone(),
2042            Some(ReaderProjection::from_column_names(version, &schema, &["score"]).unwrap()),
2043            Arc::<DecoderPlugins>::default(),
2044            &test_cache(),
2045            FileReaderOptions::default(),
2046        )
2047        .await
2048        .unwrap();
2049
2050        let batches = tokio::task::spawn_blocking(move || {
2051            file_reader
2052                .read_stream_projected_blocking(
2053                    lance_io::ReadBatchParams::Indices(UInt32Array::from(vec![0, 1, 2, 3, 4])),
2054                    total_rows as u32,
2055                    None,
2056                    FilterExpression::no_filter(),
2057                )
2058                .unwrap()
2059                .collect::<ArrowResult<Vec<_>>>()
2060                .unwrap()
2061        })
2062        .await
2063        .unwrap();
2064
2065        assert_eq!(batches.len(), 1);
2066        assert_eq!(batches[0].num_rows(), 5);
2067        assert_eq!(batches[0].num_columns(), 1);
2068    }
2069
2070    #[tokio::test(flavor = "multi_thread")]
2071    async fn test_drop_in_progress() {
2072        let fs = FsFixture::default();
2073        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
2074        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2075
2076        let file_scheduler = fs
2077            .scheduler
2078            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2079            .await
2080            .unwrap();
2081        let file_reader = FileReader::try_open(
2082            file_scheduler.clone(),
2083            None,
2084            Arc::<DecoderPlugins>::default(),
2085            &test_cache(),
2086            FileReaderOptions::default(),
2087        )
2088        .await
2089        .unwrap();
2090
2091        let mut batches = file_reader
2092            .read_stream(
2093                lance_io::ReadBatchParams::RangeFull,
2094                (total_rows / 10) as u32,
2095                16,
2096                FilterExpression::no_filter(),
2097            )
2098            .unwrap();
2099
2100        drop(file_reader);
2101
2102        let batch = batches.next().await.unwrap().unwrap();
2103        assert!(batch.num_rows() > 0);
2104
2105        // Drop in-progress scan
2106        drop(batches);
2107    }
2108
2109    #[tokio::test]
2110    async fn drop_while_scheduling() {
2111        // This is a bit of a white-box test that pokes at the internals.  We want to
2112        // test the case where the read stream is dropped before the scheduling
2113        // thread finishes.  We can't do that in a black-box fashion because the
2114        // scheduling thread runs in the background and there is no easy way to
2115        // pause / gate it.
2116
2117        // It's a regression test for a bug where the scheduling thread would panic
2118        // if the stream was dropped before it finished.
2119
2120        let fs = FsFixture::default();
2121        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
2122        let total_rows = written_file
2123            .data
2124            .iter()
2125            .map(|batch| batch.num_rows())
2126            .sum::<usize>();
2127
2128        let file_scheduler = fs
2129            .scheduler
2130            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2131            .await
2132            .unwrap();
2133        let file_reader = FileReader::try_open(
2134            file_scheduler.clone(),
2135            None,
2136            Arc::<DecoderPlugins>::default(),
2137            &test_cache(),
2138            FileReaderOptions::default(),
2139        )
2140        .await
2141        .unwrap();
2142
2143        let projection =
2144            ReaderProjection::from_whole_schema(&written_file.schema, LanceFileVersion::V2_0);
2145        let column_infos = file_reader
2146            .collect_columns_from_projection(&projection)
2147            .unwrap();
2148        let mut decode_scheduler = DecodeBatchScheduler::try_new(
2149            &projection.schema,
2150            &projection.column_indices,
2151            &column_infos,
2152            &vec![],
2153            total_rows as u64,
2154            Arc::<DecoderPlugins>::default(),
2155            file_reader.scheduler.clone(),
2156            test_cache(),
2157            &FilterExpression::no_filter(),
2158            &DecoderConfig::default(),
2159        )
2160        .await
2161        .unwrap();
2162
2163        let range = 0..total_rows as u64;
2164
2165        let (tx, rx) = mpsc::unbounded_channel();
2166
2167        // Simulate the stream / decoder being dropped
2168        drop(rx);
2169
2170        // Scheduling should not panic
2171        decode_scheduler.schedule_range(
2172            range,
2173            &FilterExpression::no_filter(),
2174            tx,
2175            file_reader.scheduler.clone(),
2176        )
2177    }
2178
2179    #[tokio::test]
2180    async fn test_read_empty_range() {
2181        let fs = FsFixture::default();
2182        create_some_file(&fs, LanceFileVersion::V2_0).await;
2183
2184        let file_scheduler = fs
2185            .scheduler
2186            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2187            .await
2188            .unwrap();
2189        let file_reader = FileReader::try_open(
2190            file_scheduler.clone(),
2191            None,
2192            Arc::<DecoderPlugins>::default(),
2193            &test_cache(),
2194            FileReaderOptions::default(),
2195        )
2196        .await
2197        .unwrap();
2198
2199        // All ranges empty, no data
2200        let batches = file_reader
2201            .read_stream(
2202                lance_io::ReadBatchParams::Range(0..0),
2203                1024,
2204                16,
2205                FilterExpression::no_filter(),
2206            )
2207            .unwrap()
2208            .try_collect::<Vec<_>>()
2209            .await
2210            .unwrap();
2211
2212        assert_eq!(batches.len(), 0);
2213
2214        // Some ranges empty
2215        let batches = file_reader
2216            .read_stream(
2217                lance_io::ReadBatchParams::Ranges(Arc::new([0..1, 2..2])),
2218                1024,
2219                16,
2220                FilterExpression::no_filter(),
2221            )
2222            .unwrap()
2223            .try_collect::<Vec<_>>()
2224            .await
2225            .unwrap();
2226        assert_eq!(batches.len(), 1);
2227    }
2228
2229    #[tokio::test]
2230    async fn test_global_buffers() {
2231        let fs = FsFixture::default();
2232
2233        let lance_schema =
2234            lance_core::datatypes::Schema::try_from(&ArrowSchema::new(vec![Field::new(
2235                "foo",
2236                DataType::Int32,
2237                true,
2238            )]))
2239            .unwrap();
2240
2241        let mut file_writer = FileWriter::try_new(
2242            fs.object_store.create(&fs.tmp_path).await.unwrap(),
2243            lance_schema.clone(),
2244            FileWriterOptions::default(),
2245        )
2246        .unwrap();
2247
2248        let test_bytes = Bytes::from_static(b"hello");
2249
2250        let buf_index = file_writer
2251            .add_global_buffer(test_bytes.clone())
2252            .await
2253            .unwrap();
2254
2255        assert_eq!(buf_index, 1);
2256
2257        file_writer.finish().await.unwrap();
2258
2259        let file_scheduler = fs
2260            .scheduler
2261            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2262            .await
2263            .unwrap();
2264        let file_reader = FileReader::try_open(
2265            file_scheduler.clone(),
2266            None,
2267            Arc::<DecoderPlugins>::default(),
2268            &test_cache(),
2269            FileReaderOptions::default(),
2270        )
2271        .await
2272        .unwrap();
2273
2274        let buf = file_reader.read_global_buffer(1).await.unwrap();
2275        assert_eq!(buf, test_bytes);
2276    }
2277}