lance_file/v2/reader.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::{
    collections::{BTreeMap, BTreeSet},
    io::Cursor,
    ops::Range,
    pin::Pin,
    sync::Arc,
};

use arrow_array::RecordBatchReader;
use arrow_schema::Schema as ArrowSchema;
use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
use bytes::{Bytes, BytesMut};
use deepsize::{Context, DeepSizeOf};
use futures::{stream::BoxStream, Stream, StreamExt};
use lance_encoding::{
    decoder::{
        schedule_and_decode, schedule_and_decode_blocking, ColumnInfo, DecoderPlugins,
        FilterExpression, PageEncoding, PageInfo, ReadBatchTask, RequestedRows,
        SchedulerDecoderConfig,
    },
    encoder::EncodedBatch,
    version::LanceFileVersion,
    EncodingsIo,
};
use log::debug;
use object_store::path::Path;
use prost::{Message, Name};
use snafu::location;

use lance_core::{
    cache::LanceCache,
    datatypes::{Field, Schema},
    Error, Result,
};
use lance_encoding::format::pb as pbenc;
use lance_io::{
    scheduler::FileScheduler,
    stream::{RecordBatchStream, RecordBatchStreamAdapter},
    ReadBatchParams,
};

use crate::{
    datatypes::{Fields, FieldsWithMeta},
    format::{pb, pbfile, MAGIC, MAJOR_VERSION, MINOR_VERSION},
    v2::writer::PAGE_BUFFER_ALIGNMENT,
};

use super::io::LanceEncodingsIo;

// For now, we don't use global buffers for anything other than schema.  If we
// use these later we should make them lazily loaded and then cached once loaded.
//
// We store their position / length for debugging purposes
#[derive(Debug, DeepSizeOf)]
pub struct BufferDescriptor {
    pub position: u64,
    pub size: u64,
}

/// Statistics summarize some of the file metadata for quick summary info
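///
/// A minimal sketch of printing a summary (assumes `reader` is an open [`FileReader`]):
///
/// ```ignore
/// let stats = reader.file_statistics();
/// for (col_idx, col) in stats.columns.iter().enumerate() {
///     println!("column {col_idx}: {} pages, {} bytes", col.num_pages, col.size_bytes);
/// }
/// ```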
#[derive(Debug)]
pub struct FileStatistics {
    /// Statistics about each of the columns in the file
    pub columns: Vec<ColumnStatistics>,
}

/// Summary information describing a column
#[derive(Debug)]
pub struct ColumnStatistics {
    /// The number of pages in the column
    pub num_pages: usize,
    /// The total number of data & metadata bytes in the column
    ///
    /// This is the compressed on-disk size
    pub size_bytes: u64,
}

// TODO: Caching
#[derive(Debug)]
pub struct CachedFileMetadata {
    /// The schema of the file
    pub file_schema: Arc<Schema>,
    /// The column metadatas
    pub column_metadatas: Vec<pbfile::ColumnMetadata>,
    pub column_infos: Vec<Arc<ColumnInfo>>,
    /// The number of rows in the file
    pub num_rows: u64,
    pub file_buffers: Vec<BufferDescriptor>,
    /// The number of bytes contained in the data page section of the file
    pub num_data_bytes: u64,
    /// The number of bytes contained in the column metadata (not including buffers
    /// referenced by the metadata)
    pub num_column_metadata_bytes: u64,
    /// The number of bytes contained in global buffers
    pub num_global_buffer_bytes: u64,
    /// The number of bytes contained in the CMO (column metadata offsets) and
    /// GBO (global buffer offsets) tables
    pub num_footer_bytes: u64,
    pub major_version: u16,
    pub minor_version: u16,
}

impl DeepSizeOf for CachedFileMetadata {
    // TODO: include size for `column_metadatas` and `column_infos`.
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.file_schema.deep_size_of_children(context)
            + self
                .file_buffers
                .iter()
                .map(|file_buffer| file_buffer.deep_size_of_children(context))
                .sum::<usize>()
    }
}

impl CachedFileMetadata {
    pub fn version(&self) -> LanceFileVersion {
        match (self.major_version, self.minor_version) {
            (0, 3) => LanceFileVersion::V2_0,
            (2, 1) => LanceFileVersion::V2_1,
            _ => panic!(
                "Unsupported version: {}.{}",
                self.major_version, self.minor_version
            ),
        }
    }
}

/// Selecting columns from a lance file requires specifying both the
/// index of the column and the data type of the column
///
/// Partly, this is because a column is not strictly required to be read
/// back as the same type it was written with.  For example, a string column
/// may be read as a string, large_string or string_view type.
///
/// A read will only succeed if the decoder for a column is capable
/// of decoding into the requested type.
///
/// Note that this should generally be limited to different in-memory
/// representations of the same semantic type.  An encoding could
/// theoretically support "casting" (e.g. int to string, etc.) but
/// there is little advantage in doing so here.
///
/// Note: in order to specify a projection the user will need some way
/// to figure out the column indices.  In the table format we do this
/// using field IDs and keeping track of the field id -> column index mapping.
///
/// If users are not using the table format then they will need to figure
/// out some way to do this themselves.
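///
/// A minimal sketch of building a projection by hand (the schema and indices here
/// are hypothetical; in practice prefer [`Self::from_column_names`] or
/// [`Self::from_field_ids`], which compute the indices for you):
///
/// ```ignore
/// use std::sync::Arc;
///
/// // Select two leaf columns from a 2.1 file whose leaves happen to map to
/// // column indices 0 and 2.
/// let projection = ReaderProjection {
///     schema: Arc::new(file_schema.project(&["x", "y.z"])?),
///     column_indices: vec![0, 2],
/// };
/// ```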
#[derive(Debug, Clone)]
pub struct ReaderProjection {
    /// The data types (schema) of the selected columns.  The names
    /// of the schema are arbitrary and ignored.
    pub schema: Arc<Schema>,
    /// The indices of the columns to load.
    ///
    /// The content of this vector depends on the file version.
    ///
    /// In Lance File Version 2.0 we need ids for structural fields as
    /// well as leaf fields:
    ///
    ///   - Primitive: the index of the column in the schema
    ///   - List: the index of the list column in the schema
    ///     followed by the column indices of the children
    ///   - FixedSizeList (of primitive): the index of the column in the schema
    ///     (this case is not nested)
    ///   - FixedSizeList (of non-primitive): not yet implemented
    ///   - Dictionary: same as primitive
    ///   - Struct: the index of the struct column in the schema
    ///     followed by the column indices of the children
    ///
    ///   In other words, this should be a DFS listing of the desired schema.
    ///
    /// In Lance File Version 2.1 we only need ids for leaf fields.  Any structural
    /// fields are completely transparent.
    ///
    /// For example, if the goal is to load:
    ///
    ///   x: int32
    ///   y: struct<z: int32, w: string>
    ///   z: list<int32>
    ///
    /// and the schema originally used to store the data was:
    ///
    ///   a: struct<x: int32>
    ///   b: int64
    ///   y: struct<z: int32, c: int64, w: string>
    ///   z: list<int32>
    ///
    /// Then the column_indices should be:
    ///
    /// - 2.0: [1, 3, 4, 6, 7, 8]
    /// - 2.1: [0, 2, 4, 5]
    pub column_indices: Vec<u32>,
}

impl ReaderProjection {
    fn from_field_ids_helper<'a>(
        file_version: LanceFileVersion,
        fields: impl Iterator<Item = &'a Field>,
        field_id_to_column_index: &BTreeMap<u32, u32>,
        column_indices: &mut Vec<u32>,
    ) -> Result<()> {
        for field in fields {
            let is_structural = file_version >= LanceFileVersion::V2_1;
            // In the 2.0 system we needed ids for intermediate fields.  In 2.1+
            // we only need ids for leaf fields.
            if !is_structural || field.children.is_empty() {
                if let Some(column_idx) = field_id_to_column_index.get(&(field.id as u32)).copied()
                {
                    column_indices.push(column_idx);
                }
            }
            Self::from_field_ids_helper(
                file_version,
                field.children.iter(),
                field_id_to_column_index,
                column_indices,
            )?;
        }
        Ok(())
    }

    /// Creates a projection using a mapping from field IDs to column indices
    ///
    /// You can obtain such a mapping when the file is written using the
    /// [`crate::v2::writer::FileWriter::field_id_to_column_indices`] method.
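    ///
    /// A minimal sketch, assuming the writer's field id -> column index mapping was
    /// captured at write time (the `mapping` variable here is hypothetical):
    ///
    /// ```ignore
    /// let mapping: BTreeMap<u32, u32> = mapping_captured_from_the_writer;
    /// let projection =
    ///     ReaderProjection::from_field_ids(LanceFileVersion::V2_1, &schema, &mapping)?;
    /// ```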
    pub fn from_field_ids(
        file_version: LanceFileVersion,
        schema: &Schema,
        field_id_to_column_index: &BTreeMap<u32, u32>,
    ) -> Result<Self> {
        let mut column_indices = Vec::new();
        Self::from_field_ids_helper(
            file_version,
            schema.fields.iter(),
            field_id_to_column_index,
            &mut column_indices,
        )?;
        Ok(Self {
            schema: Arc::new(schema.clone()),
            column_indices,
        })
    }

    /// Creates a projection that reads the entire file
    ///
    /// If the schema provided is not the schema of the entire file then
    /// the projection will be invalid and the read will fail.
    ///
    /// If a field is a struct with `packed` set to true in the field metadata,
    /// the whole struct occupies a single column index.
    /// To support nested packed-struct encoding, this method needs to be further adjusted.
    pub fn from_whole_schema(schema: &Schema, version: LanceFileVersion) -> Self {
        let schema = Arc::new(schema.clone());
        let is_structural = version >= LanceFileVersion::V2_1;
        let mut column_indices = vec![];
        let mut curr_column_idx = 0;
        let mut packed_struct_fields_num = 0;
        for field in schema.fields_pre_order() {
            if packed_struct_fields_num > 0 {
                packed_struct_fields_num -= 1;
                continue;
            }
            if field.is_packed_struct() {
                column_indices.push(curr_column_idx);
                curr_column_idx += 1;
                packed_struct_fields_num = field.children.len();
            } else if field.children.is_empty() || !is_structural {
                column_indices.push(curr_column_idx);
                curr_column_idx += 1;
            }
        }
        Self {
            schema,
            column_indices,
        }
    }

    /// Creates a projection that reads the specified columns provided by name
    ///
    /// The syntax for column names is the same as [`lance_core::datatypes::Schema::project`]
    ///
    /// If the schema provided is not the schema of the entire file then
    /// the projection will be invalid and the read will fail.
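    ///
    /// A minimal sketch (the column names here are hypothetical):
    ///
    /// ```ignore
    /// let projection = ReaderProjection::from_column_names(
    ///     LanceFileVersion::V2_1,
    ///     &file_schema,
    ///     &["x", "y.z"],
    /// )?;
    /// ```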
    pub fn from_column_names(
        file_version: LanceFileVersion,
        schema: &Schema,
        column_names: &[&str],
    ) -> Result<Self> {
        let field_id_to_column_index = schema
            .fields_pre_order()
            // In the 2.0 system we needed ids for intermediate fields.  In 2.1+
            // we only need ids for leaf fields.
            .filter(|field| file_version < LanceFileVersion::V2_1 || field.is_leaf())
            .enumerate()
            .map(|(idx, field)| (field.id as u32, idx as u32))
            .collect::<BTreeMap<_, _>>();
        let projected = schema.project(column_names)?;
        let mut column_indices = Vec::new();
        Self::from_field_ids_helper(
            file_version,
            projected.fields.iter(),
            &field_id_to_column_index,
            &mut column_indices,
        )?;
        Ok(Self {
            schema: Arc::new(projected),
            column_indices,
        })
    }
}

#[derive(Clone, Debug, Default)]
pub struct FileReaderOptions {
    validate_on_decode: bool,
    /// Whether to cache repetition indices for better performance
    /// Default is false for backward compatibility
    pub cache_repetition_index: bool,
}

#[derive(Debug)]
pub struct FileReader {
    scheduler: Arc<dyn EncodingsIo>,
    // The default projection to be applied to all reads
    base_projection: ReaderProjection,
    num_rows: u64,
    metadata: Arc<CachedFileMetadata>,
    decoder_plugins: Arc<DecoderPlugins>,
    cache: Arc<LanceCache>,
    options: FileReaderOptions,
}

#[derive(Debug)]
struct Footer {
    #[allow(dead_code)]
    column_meta_start: u64,
    // We don't use this today because we always load metadata for every column
    // and don't yet support "metadata projection"
    #[allow(dead_code)]
    column_meta_offsets_start: u64,
    global_buff_offsets_start: u64,
    num_global_buffers: u32,
    num_columns: u32,
    major_version: u16,
    minor_version: u16,
}

const FOOTER_LEN: usize = 40;
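
// The 40-byte footer, as decoded by `FileReader::decode_footer` below, is laid out
// as (all integers little-endian):
//
//   column_meta_start         : u64
//   column_meta_offsets_start : u64
//   global_buff_offsets_start : u64
//   num_global_buffers        : u32
//   num_columns               : u32
//   major_version             : u16
//   minor_version             : u16
//   magic                     : 4 bytes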

impl FileReader {
    pub fn with_scheduler(&self, scheduler: Arc<dyn EncodingsIo>) -> Self {
        Self {
            scheduler,
            base_projection: self.base_projection.clone(),
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            metadata: self.metadata.clone(),
            options: self.options.clone(),
            num_rows: self.num_rows,
        }
    }

    pub fn num_rows(&self) -> u64 {
        self.num_rows
    }

    pub fn metadata(&self) -> &Arc<CachedFileMetadata> {
        &self.metadata
    }

    pub fn file_statistics(&self) -> FileStatistics {
        let column_metadatas = &self.metadata().column_metadatas;

        let column_stats = column_metadatas
            .iter()
            .map(|col_metadata| {
                let num_pages = col_metadata.pages.len();
                let size_bytes = col_metadata
                    .pages
                    .iter()
                    .map(|page| page.buffer_sizes.iter().sum::<u64>())
                    .sum::<u64>();
                ColumnStatistics {
                    num_pages,
                    size_bytes,
                }
            })
            .collect();

        FileStatistics {
            columns: column_stats,
        }
    }

    pub async fn read_global_buffer(&self, index: u32) -> Result<Bytes> {
        let buffer_desc = self
            .metadata
            .file_buffers
            .get(index as usize)
            .ok_or_else(|| {
                Error::invalid_input(
                    format!(
                        "request for global buffer at index {} but there were only {} global buffers in the file",
                        index,
                        self.metadata.file_buffers.len()
                    ),
                    location!(),
                )
            })?;
        self.scheduler
            .submit_single(
                buffer_desc.position..buffer_desc.position + buffer_desc.size,
                0,
            )
            .await
    }

    async fn read_tail(scheduler: &FileScheduler) -> Result<(Bytes, u64)> {
        let file_size = scheduler.reader().size().await? as u64;
        let begin = if file_size < scheduler.reader().block_size() as u64 {
            0
        } else {
            file_size - scheduler.reader().block_size() as u64
        };
        let tail_bytes = scheduler.submit_single(begin..file_size, 0).await?;
        Ok((tail_bytes, file_size))
    }

    // Checks to make sure the footer is written correctly and decodes it.  The
    // decoded footer contains the positions of the file's metadata sections.
    fn decode_footer(footer_bytes: &Bytes) -> Result<Footer> {
        let len = footer_bytes.len();
        if len < FOOTER_LEN {
            return Err(Error::io(
                format!(
                    "does not have sufficient data, len: {}, bytes: {:?}",
                    len, footer_bytes
                ),
                location!(),
            ));
        }
        let mut cursor = Cursor::new(footer_bytes.slice(len - FOOTER_LEN..));

        let column_meta_start = cursor.read_u64::<LittleEndian>()?;
        let column_meta_offsets_start = cursor.read_u64::<LittleEndian>()?;
        let global_buff_offsets_start = cursor.read_u64::<LittleEndian>()?;
        let num_global_buffers = cursor.read_u32::<LittleEndian>()?;
        let num_columns = cursor.read_u32::<LittleEndian>()?;
        let major_version = cursor.read_u16::<LittleEndian>()?;
        let minor_version = cursor.read_u16::<LittleEndian>()?;

        if major_version == MAJOR_VERSION as u16 && minor_version == MINOR_VERSION as u16 {
            return Err(Error::version_conflict(
                "Attempt to use the lance v2 reader to read a legacy file".to_string(),
                major_version,
                minor_version,
                location!(),
            ));
        }

        let magic_bytes = footer_bytes.slice(len - 4..);
        if magic_bytes.as_ref() != MAGIC {
            return Err(Error::io(
                format!(
                    "file does not appear to be a Lance file (invalid magic: {:?})",
                    MAGIC
                ),
                location!(),
            ));
        }
        Ok(Footer {
            column_meta_start,
            column_meta_offsets_start,
            global_buff_offsets_start,
            num_global_buffers,
            num_columns,
            major_version,
            minor_version,
        })
    }

    // TODO: Once we have coalesced I/O we should only read the column metadatas that we need
    fn read_all_column_metadata(
        column_metadata_bytes: Bytes,
        footer: &Footer,
    ) -> Result<Vec<pbfile::ColumnMetadata>> {
        let column_metadata_start = footer.column_meta_start;
        // cmo == column_metadata_offsets
        let cmo_table_size = 16 * footer.num_columns as usize;
        let cmo_table = column_metadata_bytes.slice(column_metadata_bytes.len() - cmo_table_size..);

        (0..footer.num_columns)
            .map(|col_idx| {
                let offset = (col_idx * 16) as usize;
                let position = LittleEndian::read_u64(&cmo_table[offset..offset + 8]);
                let length = LittleEndian::read_u64(&cmo_table[offset + 8..offset + 16]);
                let normalized_position = (position - column_metadata_start) as usize;
                let normalized_end = normalized_position + (length as usize);
                Ok(pbfile::ColumnMetadata::decode(
                    &column_metadata_bytes[normalized_position..normalized_end],
                )?)
            })
            .collect::<Result<Vec<_>>>()
    }

    async fn optimistic_tail_read(
        data: &Bytes,
        start_pos: u64,
        scheduler: &FileScheduler,
        file_len: u64,
    ) -> Result<Bytes> {
        let num_bytes_needed = (file_len - start_pos) as usize;
        if data.len() >= num_bytes_needed {
            Ok(data.slice((data.len() - num_bytes_needed)..))
        } else {
            let num_bytes_missing = (num_bytes_needed - data.len()) as u64;
            let start = file_len - num_bytes_needed as u64;
            let missing_bytes = scheduler
                .submit_single(start..start + num_bytes_missing, 0)
                .await?;
            let mut combined = BytesMut::with_capacity(data.len() + num_bytes_missing as usize);
            combined.extend(missing_bytes);
            combined.extend(data);
            Ok(combined.freeze())
        }
    }

    fn do_decode_gbo_table(
        gbo_bytes: &Bytes,
        footer: &Footer,
        version: LanceFileVersion,
    ) -> Result<Vec<BufferDescriptor>> {
        let mut global_bufs_cursor = Cursor::new(gbo_bytes);

        let mut global_buffers = Vec::with_capacity(footer.num_global_buffers as usize);
        for _ in 0..footer.num_global_buffers {
            let buf_pos = global_bufs_cursor.read_u64::<LittleEndian>()?;
            assert!(
                version < LanceFileVersion::V2_1 || buf_pos % PAGE_BUFFER_ALIGNMENT as u64 == 0
            );
            let buf_size = global_bufs_cursor.read_u64::<LittleEndian>()?;
            global_buffers.push(BufferDescriptor {
                position: buf_pos,
                size: buf_size,
            });
        }

        Ok(global_buffers)
    }

    async fn decode_gbo_table(
        tail_bytes: &Bytes,
        file_len: u64,
        scheduler: &FileScheduler,
        footer: &Footer,
        version: LanceFileVersion,
    ) -> Result<Vec<BufferDescriptor>> {
        // This could, in theory, trigger another IOP but the GBO table should never be large
        // enough for that to happen
        let gbo_bytes = Self::optimistic_tail_read(
            tail_bytes,
            footer.global_buff_offsets_start,
            scheduler,
            file_len,
        )
        .await?;
        Self::do_decode_gbo_table(&gbo_bytes, footer, version)
    }

    fn decode_schema(schema_bytes: Bytes) -> Result<(u64, lance_core::datatypes::Schema)> {
        let file_descriptor = pb::FileDescriptor::decode(schema_bytes)?;
        let pb_schema = file_descriptor.schema.unwrap();
        let num_rows = file_descriptor.length;
        let fields_with_meta = FieldsWithMeta {
            fields: Fields(pb_schema.fields),
            metadata: pb_schema.metadata,
        };
        let schema = lance_core::datatypes::Schema::from(fields_with_meta);
        Ok((num_rows, schema))
    }

    // TODO: Support late projection.  Currently, if we want to perform a
    // projected read of a file, we load all of the column metadata, and then
    // only read the column data that is requested.  This is fine for most cases.
    //
    // However, if there are many columns then loading all of the column metadata
    // may be expensive.  We should support a mode where we only load the column
    // metadata for the columns that are requested (the file format supports this).
    //
    // The main challenge is that we either need to ignore the column metadata cache
    // or have a more sophisticated cache that can cache per-column metadata.
    //
    // Also, if the number of columns is fairly small, it's faster to read them as a
    // single IOP, but we can fix this through coalescing.
    pub async fn read_all_metadata(scheduler: &FileScheduler) -> Result<CachedFileMetadata> {
        // 1. read the footer
        let (tail_bytes, file_len) = Self::read_tail(scheduler).await?;
        let footer = Self::decode_footer(&tail_bytes)?;

        let file_version = LanceFileVersion::try_from_major_minor(
            footer.major_version as u32,
            footer.minor_version as u32,
        )?;

        let gbo_table =
            Self::decode_gbo_table(&tail_bytes, file_len, scheduler, &footer, file_version).await?;
        if gbo_table.is_empty() {
            return Err(Error::Internal {
                message: "File did not contain any global buffers, schema expected".to_string(),
                location: location!(),
            });
        }
        let schema_start = gbo_table[0].position;
        let schema_size = gbo_table[0].size;

        let num_footer_bytes = file_len - schema_start;

        // By default we read all column metadatas.  We do NOT read the column metadata buffers
        // at this point.  We only want to read the column metadata for columns we are actually loading.
        let all_metadata_bytes =
            Self::optimistic_tail_read(&tail_bytes, schema_start, scheduler, file_len).await?;

        let schema_bytes = all_metadata_bytes.slice(0..schema_size as usize);
        let (num_rows, schema) = Self::decode_schema(schema_bytes)?;

        // Next, read the metadata for the columns
        // This is both the column metadata and the CMO table
        let column_metadata_start = (footer.column_meta_start - schema_start) as usize;
        let column_metadata_end = (footer.global_buff_offsets_start - schema_start) as usize;
        let column_metadata_bytes =
            all_metadata_bytes.slice(column_metadata_start..column_metadata_end);
        let column_metadatas = Self::read_all_column_metadata(column_metadata_bytes, &footer)?;

        let num_global_buffer_bytes = gbo_table.iter().map(|buf| buf.size).sum::<u64>();
        let num_data_bytes = footer.column_meta_start - num_global_buffer_bytes;
        let num_column_metadata_bytes = footer.global_buff_offsets_start - footer.column_meta_start;

        let column_infos = Self::meta_to_col_infos(column_metadatas.as_slice(), file_version);

        Ok(CachedFileMetadata {
            file_schema: Arc::new(schema),
            column_metadatas,
            column_infos,
            num_rows,
            num_data_bytes,
            num_column_metadata_bytes,
            num_global_buffer_bytes,
            num_footer_bytes,
            file_buffers: gbo_table,
            major_version: footer.major_version,
            minor_version: footer.minor_version,
        })
    }

    fn fetch_encoding<M: Default + Name + Sized>(encoding: &pbfile::Encoding) -> M {
        match &encoding.location {
            Some(pbfile::encoding::Location::Indirect(_)) => todo!(),
            Some(pbfile::encoding::Location::Direct(encoding)) => {
                let encoding_buf = Bytes::from(encoding.encoding.clone());
                let encoding_any = prost_types::Any::decode(encoding_buf).unwrap();
                encoding_any.to_msg::<M>().unwrap()
            }
            Some(pbfile::encoding::Location::None(_)) => panic!(),
            None => panic!(),
        }
    }

    fn meta_to_col_infos(
        column_metadatas: &[pbfile::ColumnMetadata],
        file_version: LanceFileVersion,
    ) -> Vec<Arc<ColumnInfo>> {
        column_metadatas
            .iter()
            .enumerate()
            .map(|(col_idx, col_meta)| {
                let page_infos = col_meta
                    .pages
                    .iter()
                    .map(|page| {
                        let num_rows = page.length;
                        let encoding = match file_version {
                            LanceFileVersion::V2_0 => {
                                PageEncoding::Legacy(Self::fetch_encoding::<pbenc::ArrayEncoding>(
                                    page.encoding.as_ref().unwrap(),
                                ))
                            }
                            _ => {
                                PageEncoding::Structural(Self::fetch_encoding::<pbenc::PageLayout>(
                                    page.encoding.as_ref().unwrap(),
                                ))
                            }
                        };
                        let buffer_offsets_and_sizes = Arc::from(
                            page.buffer_offsets
                                .iter()
                                .zip(page.buffer_sizes.iter())
                                .map(|(offset, size)| {
                                    // Starting with version 2.1 we can assert that page buffers are aligned
                                    assert!(
                                        file_version < LanceFileVersion::V2_1
                                            || offset % PAGE_BUFFER_ALIGNMENT as u64 == 0
                                    );
                                    (*offset, *size)
                                })
                                .collect::<Vec<_>>(),
                        );
                        PageInfo {
                            buffer_offsets_and_sizes,
                            encoding,
                            num_rows,
                            priority: page.priority,
                        }
                    })
                    .collect::<Vec<_>>();
                let buffer_offsets_and_sizes = Arc::from(
                    col_meta
                        .buffer_offsets
                        .iter()
                        .zip(col_meta.buffer_sizes.iter())
                        .map(|(offset, size)| (*offset, *size))
                        .collect::<Vec<_>>(),
                );
                Arc::new(ColumnInfo {
                    index: col_idx as u32,
                    page_infos: Arc::from(page_infos),
                    buffer_offsets_and_sizes,
                    encoding: Self::fetch_encoding(col_meta.encoding.as_ref().unwrap()),
                })
            })
            .collect::<Vec<_>>()
    }

    fn validate_projection(
        projection: &ReaderProjection,
        metadata: &CachedFileMetadata,
    ) -> Result<()> {
        if projection.schema.fields.is_empty() {
            return Err(Error::invalid_input(
                "Attempt to read zero columns from the file, at least one column must be specified"
                    .to_string(),
                location!(),
            ));
        }
        let mut column_indices_seen = BTreeSet::new();
        for column_index in &projection.column_indices {
            if !column_indices_seen.insert(*column_index) {
                return Err(Error::invalid_input(
                    format!(
                        "The projection specified the column index {} more than once",
                        column_index
                    ),
                    location!(),
                ));
            }
            if *column_index >= metadata.column_infos.len() as u32 {
                return Err(Error::invalid_input(format!("The projection specified the column index {} but there are only {} columns in the file", column_index, metadata.column_infos.len()), location!()));
            }
        }
        Ok(())
    }

    /// Opens a new file reader without any pre-existing knowledge
    ///
    /// This will read the file schema from the file itself and thus requires a bit more I/O
    ///
    /// A `base_projection` can also be provided.  If provided, then the projection will apply
    /// to all reads from the file that do not specify their own projection.
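    ///
    /// A minimal usage sketch (`scheduler` is an already-constructed [`FileScheduler`]
    /// for the file and `cache` is a [`LanceCache`]; their construction is not shown):
    ///
    /// ```ignore
    /// use futures::StreamExt;
    ///
    /// let reader = FileReader::try_open(
    ///     scheduler,
    ///     /*base_projection=*/ None,
    ///     Arc::new(DecoderPlugins::default()),
    ///     &cache,
    ///     FileReaderOptions::default(),
    /// )
    /// .await?;
    /// let mut stream = reader.read_stream(
    ///     ReadBatchParams::RangeFull,
    ///     /*batch_size=*/ 1024,
    ///     /*batch_readahead=*/ 4,
    ///     FilterExpression::no_filter(),
    /// )?;
    /// while let Some(batch) = stream.next().await {
    ///     let batch = batch?;
    ///     // ... use the arrow RecordBatch ...
    /// }
    /// ```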
    pub async fn try_open(
        scheduler: FileScheduler,
        base_projection: Option<ReaderProjection>,
        decoder_plugins: Arc<DecoderPlugins>,
        cache: &LanceCache,
        options: FileReaderOptions,
    ) -> Result<Self> {
        let file_metadata = Arc::new(Self::read_all_metadata(&scheduler).await?);
        let path = scheduler.reader().path().clone();
        Self::try_open_with_file_metadata(
            Arc::new(LanceEncodingsIo(scheduler)),
            path,
            base_projection,
            decoder_plugins,
            file_metadata,
            cache,
            options,
        )
        .await
    }

    /// Same as `try_open` but with the file metadata already loaded.
    ///
    /// This method can also accept any kind of `EncodingsIo` implementation, allowing
    /// custom strategies to be used for I/O scheduling (e.g. for takes on fast
    /// disks it may be better to avoid asynchronous overhead).
    pub async fn try_open_with_file_metadata(
        scheduler: Arc<dyn EncodingsIo>,
        path: Path,
        base_projection: Option<ReaderProjection>,
        decoder_plugins: Arc<DecoderPlugins>,
        file_metadata: Arc<CachedFileMetadata>,
        cache: &LanceCache,
        options: FileReaderOptions,
    ) -> Result<Self> {
        let cache = Arc::new(cache.with_key_prefix(path.as_ref()));

        if let Some(base_projection) = base_projection.as_ref() {
            Self::validate_projection(base_projection, &file_metadata)?;
        }
        let num_rows = file_metadata.num_rows;
        Ok(Self {
            scheduler,
            base_projection: base_projection.unwrap_or(ReaderProjection::from_whole_schema(
                file_metadata.file_schema.as_ref(),
                file_metadata.version(),
            )),
            num_rows,
            metadata: file_metadata,
            decoder_plugins,
            cache,
            options,
        })
    }

    // The actual decoder needs all the column infos that make up a type.  In other words, if
    // the first type in the schema is Struct<i32, i32> then the decoder will need 3 column infos.
    //
    // This is a file reader concern because the file reader needs to support late projection of columns
    // and so it will need to figure this out anyways.
    //
    // It's a bit of a tricky process though because the number of column infos may depend on the
    // encoding.  Considering the above example, if we wrote it with a packed encoding, then there would
    // only be a single column in the file (and not 3).
    //
    // At the moment this method works because our rules are simple and we just repeat them here.  See
    // Self::default_projection for a similar problem.  In the future this is something the encodings
    // registry will need to figure out.
    fn collect_columns_from_projection(
        &self,
        _projection: &ReaderProjection,
    ) -> Result<Vec<Arc<ColumnInfo>>> {
        Ok(self.metadata.column_infos.to_vec())
    }

    #[allow(clippy::too_many_arguments)]
    fn do_read_range(
        column_infos: Vec<Arc<ColumnInfo>>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<LanceCache>,
        num_rows: u64,
        decoder_plugins: Arc<DecoderPlugins>,
        range: Range<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
        should_validate: bool,
        cache_repetition_index: bool,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        debug!(
            "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
            range,
            batch_size,
            num_rows,
            column_infos.len(),
            projection.schema.fields.len(),
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache,
            decoder_plugins,
            io,
            should_validate,
            cache_repetition_index,
        };

        let requested_rows = RequestedRows::Ranges(vec![range]);

        Ok(schedule_and_decode(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        ))
    }

    fn read_range(
        &self,
        range: Range<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        // Create and initialize the stream
        Self::do_read_range(
            self.collect_columns_from_projection(&projection)?,
            self.scheduler.clone(),
            self.cache.clone(),
            self.num_rows,
            self.decoder_plugins.clone(),
            range,
            batch_size,
            projection,
            filter,
            self.options.validate_on_decode,
            self.options.cache_repetition_index,
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn do_take_rows(
        column_infos: Vec<Arc<ColumnInfo>>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<LanceCache>,
        decoder_plugins: Arc<DecoderPlugins>,
        indices: Vec<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
        should_validate: bool,
        cache_repetition_index: bool,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        debug!(
            "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
            indices.len(),
            indices[0],
            indices[indices.len() - 1],
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache,
            decoder_plugins,
            io,
            should_validate,
            cache_repetition_index,
        };

        let requested_rows = RequestedRows::Indices(indices);

        Ok(schedule_and_decode(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        ))
    }

    fn take_rows(
        &self,
        indices: Vec<u64>,
        batch_size: u32,
        projection: ReaderProjection,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        // Create and initialize the stream
        Self::do_take_rows(
            self.collect_columns_from_projection(&projection)?,
            self.scheduler.clone(),
            self.cache.clone(),
            self.decoder_plugins.clone(),
            indices,
            batch_size,
            projection,
            FilterExpression::no_filter(),
            self.options.validate_on_decode,
            self.options.cache_repetition_index,
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn do_read_ranges(
        column_infos: Vec<Arc<ColumnInfo>>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<LanceCache>,
        decoder_plugins: Arc<DecoderPlugins>,
        ranges: Vec<Range<u64>>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
        should_validate: bool,
        cache_repetition_index: bool,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        debug!(
            "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
            ranges.len(),
            num_rows,
            ranges[0].start,
            ranges[ranges.len() - 1].end,
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache,
            decoder_plugins,
            io,
            should_validate,
            cache_repetition_index,
        };

        let requested_rows = RequestedRows::Ranges(ranges);

        Ok(schedule_and_decode(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        ))
    }

    fn read_ranges(
        &self,
        ranges: Vec<Range<u64>>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        Self::do_read_ranges(
            self.collect_columns_from_projection(&projection)?,
            self.scheduler.clone(),
            self.cache.clone(),
            self.decoder_plugins.clone(),
            ranges,
            batch_size,
            projection,
            filter,
            self.options.validate_on_decode,
            self.options.cache_repetition_index,
        )
    }

    /// Creates a stream of "read tasks" to read the data from the file
    ///
    /// The arguments are similar to [`Self::read_stream_projected`] but instead of returning a stream
    /// of record batches it returns a stream of "read tasks".
    ///
    /// The tasks should be consumed with some kind of `buffered` argument if CPU parallelism is desired.
    ///
    /// Note that "read task" is probably a bit imprecise.  The tasks are actually "decode tasks".  The
    /// reading happens asynchronously in the background.  In other words, a single read task may map to
    /// multiple I/O operations or a single I/O operation may map to multiple read tasks.
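    ///
    /// A minimal sketch of consuming the tasks with CPU parallelism (this mirrors what
    /// [`Self::read_stream_projected`] does internally):
    ///
    /// ```ignore
    /// use futures::StreamExt;
    ///
    /// let batch_stream = reader
    ///     .read_tasks(
    ///         ReadBatchParams::RangeFull,
    ///         /*batch_size=*/ 1024,
    ///         /*projection=*/ None,
    ///         FilterExpression::no_filter(),
    ///     )?
    ///     .map(|task| task.task)
    ///     .buffered(/*batch_readahead=*/ 4);
    /// ```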
    pub fn read_tasks(
        &self,
        params: ReadBatchParams,
        batch_size: u32,
        projection: Option<ReaderProjection>,
        filter: FilterExpression,
    ) -> Result<Pin<Box<dyn Stream<Item = ReadBatchTask> + Send>>> {
        let projection = projection.unwrap_or_else(|| self.base_projection.clone());
        Self::validate_projection(&projection, &self.metadata)?;
        let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
            if bound > self.num_rows || bound == self.num_rows && inclusive {
                Err(Error::invalid_input(
                    format!(
                        "cannot read {:?} from file with {} rows",
                        params, self.num_rows
                    ),
                    location!(),
                ))
            } else {
                Ok(())
            }
        };
        match &params {
            ReadBatchParams::Indices(indices) => {
                for idx in indices {
                    match idx {
                        None => {
                            return Err(Error::invalid_input(
                                "Null value in indices array",
                                location!(),
                            ));
                        }
                        Some(idx) => {
                            verify_bound(&params, idx as u64, true)?;
                        }
                    }
                }
                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
                self.take_rows(indices, batch_size, projection)
            }
            ReadBatchParams::Range(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range(
                    range.start as u64..range.end as u64,
                    batch_size,
                    projection,
                    filter,
                )
            }
            ReadBatchParams::Ranges(ranges) => {
                let mut ranges_u64 = Vec::with_capacity(ranges.len());
                for range in ranges.as_ref() {
                    verify_bound(&params, range.end, false)?;
                    ranges_u64.push(range.start..range.end);
                }
                self.read_ranges(ranges_u64, batch_size, projection, filter)
            }
            ReadBatchParams::RangeFrom(range) => {
                verify_bound(&params, range.start as u64, true)?;
                self.read_range(
                    range.start as u64..self.num_rows,
                    batch_size,
                    projection,
                    filter,
                )
            }
            ReadBatchParams::RangeTo(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range(0..range.end as u64, batch_size, projection, filter)
            }
            ReadBatchParams::RangeFull => {
                self.read_range(0..self.num_rows, batch_size, projection, filter)
            }
        }
    }

    /// Reads data from the file as a stream of record batches
    ///
    /// * `params` - Specifies the range (or indices) of data to read
    /// * `batch_size` - The maximum size of a single batch.  A batch may be smaller
    ///   if it is the last batch or if it is not possible to create a batch of the
    ///   requested size.
    ///
    ///   For example, if the batch size is 1024 and one of the columns is a string
    ///   column then there may be some ranges of 1024 rows that contain more than
    ///   2^31 bytes of string data (which is the maximum size of a string column
    ///   in Arrow).  In this case smaller batches may be emitted.
    /// * `batch_readahead` - The number of batches to read ahead.  This controls the
    ///   amount of CPU parallelism of the read.  In other words it controls how many
    ///   batches will be decoded in parallel.  It has no effect on the I/O parallelism
    ///   of the read (how many I/O requests are in flight at once).
    ///
    ///   This parameter is also related to backpressure.  If the consumer of the
    ///   stream is slow then the reader will buffer decoded batches in RAM.
    /// * `projection` - A projection to apply to the read.  This controls which columns
    ///   are read from the file.  The projection is NOT applied on top of the base
    ///   projection.  The projection is applied directly to the file schema.
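    ///
    /// A minimal sketch reading two (hypothetical) columns as a stream:
    ///
    /// ```ignore
    /// let projection = ReaderProjection::from_column_names(
    ///     reader.metadata().version(),
    ///     reader.schema().as_ref(),
    ///     &["x", "y.z"],
    /// )?;
    /// let stream = reader.read_stream_projected(
    ///     ReadBatchParams::RangeFull,
    ///     /*batch_size=*/ 1024,
    ///     /*batch_readahead=*/ 4,
    ///     projection,
    ///     FilterExpression::no_filter(),
    /// )?;
    /// ```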
    pub fn read_stream_projected(
        &self,
        params: ReadBatchParams,
        batch_size: u32,
        batch_readahead: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
        let arrow_schema = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
        let tasks_stream = self.read_tasks(params, batch_size, Some(projection), filter)?;
        let batch_stream = tasks_stream
            .map(|task| task.task)
            .buffered(batch_readahead as usize)
            .boxed();
        Ok(Box::pin(RecordBatchStreamAdapter::new(
            arrow_schema,
            batch_stream,
        )))
    }

    fn take_rows_blocking(
        &self,
        indices: Vec<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
        let column_infos = self.collect_columns_from_projection(&projection)?;
        debug!(
            "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
            indices.len(),
            indices[0],
            indices[indices.len() - 1],
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            io: self.scheduler.clone(),
            should_validate: self.options.validate_on_decode,
            cache_repetition_index: self.options.cache_repetition_index,
        };

        let requested_rows = RequestedRows::Indices(indices);

        schedule_and_decode_blocking(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        )
    }

    fn read_ranges_blocking(
        &self,
        ranges: Vec<Range<u64>>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
        let column_infos = self.collect_columns_from_projection(&projection)?;
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        debug!(
            "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
            ranges.len(),
            num_rows,
            ranges[0].start,
            ranges[ranges.len() - 1].end,
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            io: self.scheduler.clone(),
            should_validate: self.options.validate_on_decode,
            cache_repetition_index: self.options.cache_repetition_index,
        };

        let requested_rows = RequestedRows::Ranges(ranges);

        schedule_and_decode_blocking(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        )
    }

    fn read_range_blocking(
        &self,
        range: Range<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
        let column_infos = self.collect_columns_from_projection(&projection)?;
        let num_rows = self.num_rows;

        debug!(
            "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
            range,
            batch_size,
            num_rows,
            column_infos.len(),
            projection.schema.fields.len(),
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            io: self.scheduler.clone(),
            should_validate: self.options.validate_on_decode,
            cache_repetition_index: self.options.cache_repetition_index,
        };

        let requested_rows = RequestedRows::Ranges(vec![range]);

        schedule_and_decode_blocking(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        )
    }

    /// Read data from the file as an iterator of record batches
    ///
    /// This is a blocking variant of [`Self::read_stream_projected`] that runs entirely in the
    /// calling thread.  It will block on I/O if the decode is faster than the I/O.  It is useful
    /// for benchmarking and potentially for "take"ing small batches from fast disks.
    ///
    /// Large scans of in-memory data will still benefit from threading (and should therefore not
    /// use this method) because we can parallelize the decode.
    ///
    /// Note: calling this from within a tokio runtime will panic.  It is acceptable to call this
    /// from a spawn_blocking context.
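    ///
    /// A minimal sketch of calling this from inside a tokio runtime by moving the work
    /// onto a blocking thread (error handling is elided with `unwrap`):
    ///
    /// ```ignore
    /// let reader = Arc::new(reader);
    /// let batches: Vec<_> = tokio::task::spawn_blocking(move || {
    ///     reader
    ///         .read_stream_projected_blocking(
    ///             ReadBatchParams::RangeFull,
    ///             /*batch_size=*/ 1024,
    ///             /*projection=*/ None,
    ///             FilterExpression::no_filter(),
    ///         )
    ///         .unwrap()
    ///         .collect::<std::result::Result<Vec<_>, _>>()
    ///         .unwrap()
    /// })
    /// .await
    /// .unwrap();
    /// ```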
1283    pub fn read_stream_projected_blocking(
1284        &self,
1285        params: ReadBatchParams,
1286        batch_size: u32,
1287        projection: Option<ReaderProjection>,
1288        filter: FilterExpression,
1289    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1290        let projection = projection.unwrap_or_else(|| self.base_projection.clone());
1291        Self::validate_projection(&projection, &self.metadata)?;
1292        let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
1293            if bound > self.num_rows || bound == self.num_rows && inclusive {
1294                Err(Error::invalid_input(
1295                    format!(
1296                        "cannot read {:?} from file with {} rows",
1297                        params, self.num_rows
1298                    ),
1299                    location!(),
1300                ))
1301            } else {
1302                Ok(())
1303            }
1304        };
1305        match &params {
1306            ReadBatchParams::Indices(indices) => {
1307                for idx in indices {
1308                    match idx {
1309                        None => {
1310                            return Err(Error::invalid_input(
1311                                "Null value in indices array",
1312                                location!(),
1313                            ));
1314                        }
1315                        Some(idx) => {
1316                            verify_bound(&params, idx as u64, true)?;
1317                        }
1318                    }
1319                }
1320                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
1321                self.take_rows_blocking(indices, batch_size, projection, filter)
1322            }
1323            ReadBatchParams::Range(range) => {
1324                verify_bound(&params, range.end as u64, false)?;
1325                self.read_range_blocking(
1326                    range.start as u64..range.end as u64,
1327                    batch_size,
1328                    projection,
1329                    filter,
1330                )
1331            }
1332            ReadBatchParams::Ranges(ranges) => {
1333                let mut ranges_u64 = Vec::with_capacity(ranges.len());
1334                for range in ranges.as_ref() {
1335                    verify_bound(&params, range.end, false)?;
1336                    ranges_u64.push(range.start..range.end);
1337                }
1338                self.read_ranges_blocking(ranges_u64, batch_size, projection, filter)
1339            }
1340            ReadBatchParams::RangeFrom(range) => {
1341                verify_bound(&params, range.start as u64, true)?;
1342                self.read_range_blocking(
1343                    range.start as u64..self.num_rows,
1344                    batch_size,
1345                    projection,
1346                    filter,
1347                )
1348            }
1349            ReadBatchParams::RangeTo(range) => {
1350                verify_bound(&params, range.end as u64, false)?;
1351                self.read_range_blocking(0..range.end as u64, batch_size, projection, filter)
1352            }
1353            ReadBatchParams::RangeFull => {
1354                self.read_range_blocking(0..self.num_rows, batch_size, projection, filter)
1355            }
1356        }
1357    }
1358
1359    /// Reads data from the file as a stream of record batches
1360    ///
1361    /// This is similar to [`Self::read_stream_projected`] but uses the base projection
1362    /// provided when the file was opened (or reads all columns if the file was
1363    /// opened without a base projection)
1364    pub fn read_stream(
1365        &self,
1366        params: ReadBatchParams,
1367        batch_size: u32,
1368        batch_readahead: u32,
1369        filter: FilterExpression,
1370    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1371        self.read_stream_projected(
1372            params,
1373            batch_size,
1374            batch_readahead,
1375            self.base_projection.clone(),
1376            filter,
1377        )
1378    }
1379
1380    pub fn schema(&self) -> &Arc<Schema> {
1381        &self.metadata.file_schema
1382    }
1383}
1384
1385/// Inspects a page and returns a String describing the page's encoding
1386pub fn describe_encoding(page: &pbfile::column_metadata::Page) -> String {
1387    if let Some(encoding) = &page.encoding {
1388        if let Some(style) = &encoding.location {
1389            match style {
1390                pbfile::encoding::Location::Indirect(indirect) => {
1391                    format!(
1392                        "IndirectEncoding(pos={},size={})",
1393                        indirect.buffer_location, indirect.buffer_length
1394                    )
1395                }
1396                pbfile::encoding::Location::Direct(direct) => {
1397                    let encoding_any =
1398                        prost_types::Any::decode(Bytes::from(direct.encoding.clone()))
1399                            .expect("failed to deserialize encoding as protobuf");
1400                    if encoding_any.type_url == "/lance.encodings.ArrayEncoding" {
1401                        let encoding = encoding_any.to_msg::<pbenc::ArrayEncoding>();
1402                        match encoding {
1403                            Ok(encoding) => {
1404                                format!("{:#?}", encoding)
1405                            }
1406                            Err(err) => {
1407                                format!("Unsupported(decode_err={})", err)
1408                            }
1409                        }
1410                    } else if encoding_any.type_url == "/lance.encodings.PageLayout" {
1411                        let encoding = encoding_any.to_msg::<pbenc::PageLayout>();
1412                        match encoding {
1413                            Ok(encoding) => {
1414                                format!("{:#?}", encoding)
1415                            }
1416                            Err(err) => {
1417                                format!("Unsupported(decode_err={})", err)
1418                            }
1419                        }
1420                    } else {
1421                        format!("Unrecognized(type_url={})", encoding_any.type_url)
1422                    }
1423                }
1424                pbfile::encoding::Location::None(_) => "NoEncodingDescription".to_string(),
1425            }
1426        } else {
1427            "MISSING STYLE".to_string()
1428        }
1429    } else {
1430        "MISSING".to_string()
1431    }
1432}
1433
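/// Utilities for decoding an [`EncodedBatch`] from a serialized Lance buffer
///
/// [`Self::try_from_self_described_lance`] expects the buffer to carry its own
/// schema (stored as the first global buffer) while [`Self::try_from_mini_lance`]
/// requires the caller to supply the schema and file version out-of-band.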
1434pub trait EncodedBatchReaderExt {
1435    fn try_from_mini_lance(
1436        bytes: Bytes,
1437        schema: &Schema,
1438        version: LanceFileVersion,
1439    ) -> Result<Self>
1440    where
1441        Self: Sized;
1442    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1443    where
1444        Self: Sized;
1445}
1446
1447impl EncodedBatchReaderExt for EncodedBatch {
1448    fn try_from_mini_lance(
1449        bytes: Bytes,
1450        schema: &Schema,
1451        file_version: LanceFileVersion,
1452    ) -> Result<Self>
1453    where
1454        Self: Sized,
1455    {
1456        let projection = ReaderProjection::from_whole_schema(schema, file_version);
1457        let footer = FileReader::decode_footer(&bytes)?;
1458
1459        // Next, read the metadata for the columns
1460        // This is both the column metadata and the CMO table
1461        let column_metadata_start = footer.column_meta_start as usize;
1462        let column_metadata_end = footer.global_buff_offsets_start as usize;
1463        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1464        let column_metadatas =
1465            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1466
1467        let file_version = LanceFileVersion::try_from_major_minor(
1468            footer.major_version as u32,
1469            footer.minor_version as u32,
1470        )?;
1471
1472        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1473
1474        Ok(Self {
1475            data: bytes,
1476            num_rows: page_table
1477                .first()
1478                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1479                .unwrap_or(0),
1480            page_table,
1481            top_level_columns: projection.column_indices,
1482            schema: Arc::new(schema.clone()),
1483        })
1484    }
1485
1486    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1487    where
1488        Self: Sized,
1489    {
1490        let footer = FileReader::decode_footer(&bytes)?;
1491        let file_version = LanceFileVersion::try_from_major_minor(
1492            footer.major_version as u32,
1493            footer.minor_version as u32,
1494        )?;
1495
1496        let gbo_table = FileReader::do_decode_gbo_table(
1497            &bytes.slice(footer.global_buff_offsets_start as usize..),
1498            &footer,
1499            file_version,
1500        )?;
1501        if gbo_table.is_empty() {
1502            return Err(Error::Internal {
1503                message: "File did not contain any global buffers; expected the schema to be the first global buffer".to_string(),
1504                location: location!(),
1505            });
1506        }
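        // By convention, the first global buffer holds the file schema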
1507        let schema_start = gbo_table[0].position as usize;
1508        let schema_size = gbo_table[0].size as usize;
1509
1510        let schema_bytes = bytes.slice(schema_start..(schema_start + schema_size));
1511        let (_, schema) = FileReader::decode_schema(schema_bytes)?;
1512        let projection = ReaderProjection::from_whole_schema(&schema, file_version);
1513
1514        // Next, read the metadata for the columns
1515        // This is both the column metadata and the CMO table
1516        let column_metadata_start = footer.column_meta_start as usize;
1517        let column_metadata_end = footer.global_buff_offsets_start as usize;
1518        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1519        let column_metadatas =
1520            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1521
1522        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1523
1524        Ok(Self {
1525            data: bytes,
1526            num_rows: page_table
1527                .first()
1528                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1529                .unwrap_or(0),
1530            page_table,
1531            top_level_columns: projection.column_indices,
1532            schema: Arc::new(schema),
1533        })
1534    }
1535}
1536
1537#[cfg(test)]
1538pub mod tests {
1539    use std::{collections::BTreeMap, pin::Pin, sync::Arc};
1540
1541    use arrow_array::{
1542        types::{Float64Type, Int32Type},
1543        RecordBatch, UInt32Array,
1544    };
1545    use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema};
1546    use bytes::Bytes;
1547    use futures::{prelude::stream::TryStreamExt, StreamExt};
1548    use lance_arrow::RecordBatchExt;
1549    use lance_core::{datatypes::Schema, ArrowResult};
1550    use lance_datagen::{array, gen, BatchCount, ByteCount, RowCount};
1551    use lance_encoding::{
1552        decoder::{decode_batch, DecodeBatchScheduler, DecoderPlugins, FilterExpression},
1553        encoder::{default_encoding_strategy, encode_batch, EncodedBatch, EncodingOptions},
1554        version::LanceFileVersion,
1555    };
1556    use lance_io::{stream::RecordBatchStream, utils::CachedFileSize};
1557    use log::debug;
1558    use rstest::rstest;
1559    use tokio::sync::mpsc;
1560
1561    use crate::v2::{
1562        reader::{EncodedBatchReaderExt, FileReader, FileReaderOptions, ReaderProjection},
1563        testing::{test_cache, write_lance_file, FsFixture, WrittenFile},
1564        writer::{EncodedBatchWriteExt, FileWriter, FileWriterOptions},
1565    };
1566
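    /// Writes a small test file containing a mix of column types (float, struct,
    /// list, and binary; a large binary column is only added for versions <= 2.0)
    /// and returns the written data along with its schema.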
1567    async fn create_some_file(fs: &FsFixture, version: LanceFileVersion) -> WrittenFile {
1568        let location_type = DataType::Struct(Fields::from(vec![
1569            Field::new("x", DataType::Float64, true),
1570            Field::new("y", DataType::Float64, true),
1571        ]));
1572        let categories_type = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)));
1573
1574        let mut reader = gen()
1575            .col("score", array::rand::<Float64Type>())
1576            .col("location", array::rand_type(&location_type))
1577            .col("categories", array::rand_type(&categories_type))
1578            .col("binary", array::rand_type(&DataType::Binary));
1579        if version <= LanceFileVersion::V2_0 {
1580            reader = reader.col("large_bin", array::rand_type(&DataType::LargeBinary));
1581        }
1582        let reader = reader.into_reader_rows(RowCount::from(1000), BatchCount::from(100));
1583
1584        write_lance_file(
1585            reader,
1586            fs,
1587            FileWriterOptions {
1588                format_version: Some(version),
1589                ..Default::default()
1590            },
1591        )
1592        .await
1593    }
1594
1595    type Transformer = Box<dyn Fn(&RecordBatch) -> RecordBatch>;
1596
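    /// Asserts that `actual` (a stream of batches of at most `read_size` rows)
    /// contains exactly the rows of `expected`, re-slicing as needed since the
    /// stream's batch boundaries may not line up with the expected batches.  An
    /// optional `transform` (e.g. a projection) is applied to each expected batch
    /// before comparison.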
1597    async fn verify_expected(
1598        expected: &[RecordBatch],
1599        mut actual: Pin<Box<dyn RecordBatchStream>>,
1600        read_size: u32,
1601        transform: Option<Transformer>,
1602    ) {
1603        let mut remaining = expected.iter().map(|batch| batch.num_rows()).sum::<usize>() as u32;
1604        let mut expected_iter = expected.iter().map(|batch| {
1605            if let Some(transform) = &transform {
1606                transform(batch)
1607            } else {
1608                batch.clone()
1609            }
1610        });
1611        let mut next_expected = expected_iter.next().unwrap().clone();
1612        while let Some(actual) = actual.next().await {
1613            let mut actual = actual.unwrap();
1614            let mut rows_to_verify = actual.num_rows() as u32;
1615            let expected_length = remaining.min(read_size);
1616            assert_eq!(expected_length, rows_to_verify);
1617
1618            while rows_to_verify > 0 {
1619                let next_slice_len = (next_expected.num_rows() as u32).min(rows_to_verify);
1620                assert_eq!(
1621                    next_expected.slice(0, next_slice_len as usize),
1622                    actual.slice(0, next_slice_len as usize)
1623                );
1624                remaining -= next_slice_len;
1625                rows_to_verify -= next_slice_len;
1626                if remaining > 0 {
1627                    if next_slice_len == next_expected.num_rows() as u32 {
1628                        next_expected = expected_iter.next().unwrap().clone();
1629                    } else {
1630                        next_expected = next_expected.slice(
1631                            next_slice_len as usize,
1632                            next_expected.num_rows() - next_slice_len as usize,
1633                        );
1634                    }
1635                }
1636                if rows_to_verify > 0 {
1637                    actual = actual.slice(
1638                        next_slice_len as usize,
1639                        actual.num_rows() - next_slice_len as usize,
1640                    );
1641                }
1642            }
1643        }
1644        assert_eq!(remaining, 0);
1645    }
1646
1647    #[tokio::test]
1648    async fn test_round_trip() {
1649        let fs = FsFixture::default();
1650
1651        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1652
1653        for read_size in [32, 1024, 1024 * 1024] {
1654            let file_scheduler = fs
1655                .scheduler
1656                .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1657                .await
1658                .unwrap();
1659            let file_reader = FileReader::try_open(
1660                file_scheduler,
1661                None,
1662                Arc::<DecoderPlugins>::default(),
1663                &test_cache(),
1664                FileReaderOptions::default(),
1665            )
1666            .await
1667            .unwrap();
1668
1669            let schema = file_reader.schema();
1670            assert_eq!(schema.metadata.get("foo").unwrap(), "bar");
1671
1672            let batch_stream = file_reader
1673                .read_stream(
1674                    lance_io::ReadBatchParams::RangeFull,
1675                    read_size,
1676                    16,
1677                    FilterExpression::no_filter(),
1678                )
1679                .unwrap();
1680
1681            verify_expected(&data, batch_stream, read_size, None).await;
1682        }
1683    }
1684
1685    #[rstest]
1686    #[test_log::test(tokio::test)]
1687    async fn test_encoded_batch_round_trip(
1688        // TODO: Add V2_1 (currently fails)
1689        #[values(LanceFileVersion::V2_0)] version: LanceFileVersion,
1690    ) {
1691        let data = gen()
1692            .col("x", array::rand::<Int32Type>())
1693            .col("y", array::rand_utf8(ByteCount::from(16), false))
1694            .into_batch_rows(RowCount::from(10000))
1695            .unwrap();
1696
1697        let lance_schema = Arc::new(Schema::try_from(data.schema().as_ref()).unwrap());
1698
1699        let encoding_options = EncodingOptions {
1700            cache_bytes_per_column: 4096,
1701            max_page_bytes: 32 * 1024 * 1024,
1702            keep_original_array: true,
1703            buffer_alignment: 64,
1704        };
1705
1706        let encoding_strategy = default_encoding_strategy(version);
1707
1708        let encoded_batch = encode_batch(
1709            &data,
1710            lance_schema.clone(),
1711            encoding_strategy.as_ref(),
1712            &encoding_options,
1713        )
1714        .await
1715        .unwrap();
1716
1717        // Test self described
1718        let bytes = encoded_batch.try_to_self_described_lance(version).unwrap();
1719
1720        let decoded_batch = EncodedBatch::try_from_self_described_lance(bytes).unwrap();
1721
1722        let decoded = decode_batch(
1723            &decoded_batch,
1724            &FilterExpression::no_filter(),
1725            Arc::<DecoderPlugins>::default(),
1726            false,
1727            LanceFileVersion::default(),
1728            None,
1729        )
1730        .await
1731        .unwrap();
1732
1733        assert_eq!(data, decoded);
1734
1735        // Test mini
1736        let bytes = encoded_batch.try_to_mini_lance(version).unwrap();
1737        let decoded_batch =
1738            EncodedBatch::try_from_mini_lance(bytes, lance_schema.as_ref(), version)
1739                .unwrap();
1740        let decoded = decode_batch(
1741            &decoded_batch,
1742            &FilterExpression::no_filter(),
1743            Arc::<DecoderPlugins>::default(),
1744            false,
1745            LanceFileVersion::default(),
1746            None,
1747        )
1748        .await
1749        .unwrap();
1750
1751        assert_eq!(data, decoded);
1752    }
1753
1754    #[rstest]
1755    #[test_log::test(tokio::test)]
1756    async fn test_projection(
1757        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
1758    ) {
1759        let fs = FsFixture::default();
1760
1761        let written_file = create_some_file(&fs, version).await;
1762        let file_scheduler = fs
1763            .scheduler
1764            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1765            .await
1766            .unwrap();
1767
1768        let field_id_mapping = written_file
1769            .field_id_mapping
1770            .iter()
1771            .copied()
1772            .collect::<BTreeMap<_, _>>();
1773
1774        let empty_projection = ReaderProjection {
1775            column_indices: Vec::default(),
1776            schema: Arc::new(Schema::default()),
1777        };
1778
1779        for columns in [
1780            vec!["score"],
1781            vec!["location"],
1782            vec!["categories"],
1783            vec!["location.x"],
1784            vec!["score", "categories"],
1785            vec!["score", "location"],
1786            vec!["location", "categories"],
1787            vec!["score", "location.y", "categories"],
1788        ] {
1789            debug!("Testing round trip with projection {:?}", columns);
1790            for use_field_ids in [true, false] {
1791                // We can specify the projection as part of the read operation via read_stream_projected
1792                let file_reader = FileReader::try_open(
1793                    file_scheduler.clone(),
1794                    None,
1795                    Arc::<DecoderPlugins>::default(),
1796                    &test_cache(),
1797                    FileReaderOptions::default(),
1798                )
1799                .await
1800                .unwrap();
1801
1802                let projected_schema = written_file.schema.project(&columns).unwrap();
1803                let projection = if use_field_ids {
1804                    ReaderProjection::from_field_ids(
1805                        file_reader.metadata.version(),
1806                        &projected_schema,
1807                        &field_id_mapping,
1808                    )
1809                    .unwrap()
1810                } else {
1811                    ReaderProjection::from_column_names(
1812                        file_reader.metadata.version(),
1813                        &written_file.schema,
1814                        &columns,
1815                    )
1816                    .unwrap()
1817                };
1818
1819                let batch_stream = file_reader
1820                    .read_stream_projected(
1821                        lance_io::ReadBatchParams::RangeFull,
1822                        1024,
1823                        16,
1824                        projection.clone(),
1825                        FilterExpression::no_filter(),
1826                    )
1827                    .unwrap();
1828
1829                let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1830                verify_expected(
1831                    &written_file.data,
1832                    batch_stream,
1833                    1024,
1834                    Some(Box::new(move |batch: &RecordBatch| {
1835                        batch.project_by_schema(&projection_arrow).unwrap()
1836                    })),
1837                )
1838                .await;
1839
1840                // We can also specify the projection as a base projection when we open the file
1841                let file_reader = FileReader::try_open(
1842                    file_scheduler.clone(),
1843                    Some(projection.clone()),
1844                    Arc::<DecoderPlugins>::default(),
1845                    &test_cache(),
1846                    FileReaderOptions::default(),
1847                )
1848                .await
1849                .unwrap();
1850
1851                let batch_stream = file_reader
1852                    .read_stream(
1853                        lance_io::ReadBatchParams::RangeFull,
1854                        1024,
1855                        16,
1856                        FilterExpression::no_filter(),
1857                    )
1858                    .unwrap();
1859
1860                let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1861                verify_expected(
1862                    &written_file.data,
1863                    batch_stream,
1864                    1024,
1865                    Some(Box::new(move |batch: &RecordBatch| {
1866                        batch.project_by_schema(&projection_arrow).unwrap()
1867                    })),
1868                )
1869                .await;
1870
1871                assert!(file_reader
1872                    .read_stream_projected(
1873                        lance_io::ReadBatchParams::RangeFull,
1874                        1024,
1875                        16,
1876                        empty_projection.clone(),
1877                        FilterExpression::no_filter(),
1878                    )
1879                    .is_err());
1880            }
1881        }
1882
1883        assert!(FileReader::try_open(
1884            file_scheduler.clone(),
1885            Some(empty_projection),
1886            Arc::<DecoderPlugins>::default(),
1887            &test_cache(),
1888            FileReaderOptions::default(),
1889        )
1890        .await
1891        .is_err());
1892
1893        let arrow_schema = ArrowSchema::new(vec![
1894            Field::new("x", DataType::Int32, true),
1895            Field::new("y", DataType::Int32, true),
1896        ]);
1897        let schema = Schema::try_from(&arrow_schema).unwrap();
1898
1899        let projection_with_dupes = ReaderProjection {
1900            column_indices: vec![0, 0],
1901            schema: Arc::new(schema),
1902        };
1903
1904        assert!(FileReader::try_open(
1905            file_scheduler.clone(),
1906            Some(projection_with_dupes),
1907            Arc::<DecoderPlugins>::default(),
1908            &test_cache(),
1909            FileReaderOptions::default(),
1910        )
1911        .await
1912        .is_err());
1913    }
1914
1915    #[test_log::test(tokio::test)]
1916    async fn test_compressing_buffer() {
1917        let fs = FsFixture::default();
1918
1919        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
1920        let file_scheduler = fs
1921            .scheduler
1922            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1923            .await
1924            .unwrap();
1925
1926        // We can specify the projection as part of the read operation via read_stream_projected
1927        let file_reader = FileReader::try_open(
1928            file_scheduler.clone(),
1929            None,
1930            Arc::<DecoderPlugins>::default(),
1931            &test_cache(),
1932            FileReaderOptions::default(),
1933        )
1934        .await
1935        .unwrap();
1936
1937        let mut projection = written_file.schema.project(&["score"]).unwrap();
1938        for field in projection.fields.iter_mut() {
1939            field
1940                .metadata
1941                .insert("lance:compression".to_string(), "zstd".to_string());
1942        }
1943        let projection = ReaderProjection {
1944            column_indices: projection.fields.iter().map(|f| f.id as u32).collect(),
1945            schema: Arc::new(projection),
1946        };
1947
1948        let batch_stream = file_reader
1949            .read_stream_projected(
1950                lance_io::ReadBatchParams::RangeFull,
1951                1024,
1952                16,
1953                projection.clone(),
1954                FilterExpression::no_filter(),
1955            )
1956            .unwrap();
1957
1958        let projection_arrow = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
1959        verify_expected(
1960            &written_file.data,
1961            batch_stream,
1962            1024,
1963            Some(Box::new(move |batch: &RecordBatch| {
1964                batch.project_by_schema(&projection_arrow).unwrap()
1965            })),
1966        )
1967        .await;
1968    }
1969
1970    #[tokio::test]
1971    async fn test_read_all() {
1972        let fs = FsFixture::default();
1973        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1974        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
1975
1976        let file_scheduler = fs
1977            .scheduler
1978            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1979            .await
1980            .unwrap();
1981        let file_reader = FileReader::try_open(
1982            file_scheduler.clone(),
1983            None,
1984            Arc::<DecoderPlugins>::default(),
1985            &test_cache(),
1986            FileReaderOptions::default(),
1987        )
1988        .await
1989        .unwrap();
1990
1991        let batches = file_reader
1992            .read_stream(
1993                lance_io::ReadBatchParams::RangeFull,
1994                total_rows as u32,
1995                16,
1996                FilterExpression::no_filter(),
1997            )
1998            .unwrap()
1999            .try_collect::<Vec<_>>()
2000            .await
2001            .unwrap();
2002        assert_eq!(batches.len(), 1);
2003        assert_eq!(batches[0].num_rows(), total_rows);
2004    }
2005
2006    #[tokio::test]
2007    async fn test_blocking_take() {
2008        let fs = FsFixture::default();
2009        let WrittenFile { data, schema, .. } = create_some_file(&fs, LanceFileVersion::V2_1).await;
2010        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2011
2012        let file_scheduler = fs
2013            .scheduler
2014            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2015            .await
2016            .unwrap();
2017        let file_reader = FileReader::try_open(
2018            file_scheduler.clone(),
2019            Some(
2020                ReaderProjection::from_column_names(LanceFileVersion::V2_1, &schema, &["score"])
2021                    .unwrap(),
2022            ),
2023            Arc::<DecoderPlugins>::default(),
2024            &test_cache(),
2025            FileReaderOptions::default(),
2026        )
2027        .await
2028        .unwrap();
2029
2030        let batches = tokio::task::spawn_blocking(move || {
2031            file_reader
2032                .read_stream_projected_blocking(
2033                    lance_io::ReadBatchParams::Indices(UInt32Array::from(vec![0, 1, 2, 3, 4])),
2034                    total_rows as u32,
2035                    None,
2036                    FilterExpression::no_filter(),
2037                )
2038                .unwrap()
2039                .collect::<ArrowResult<Vec<_>>>()
2040                .unwrap()
2041        })
2042        .await
2043        .unwrap();
2044
2045        assert_eq!(batches.len(), 1);
2046        assert_eq!(batches[0].num_rows(), 5);
2047        assert_eq!(batches[0].num_columns(), 1);
2048    }
2049
2050    #[tokio::test(flavor = "multi_thread")]
2051    async fn test_drop_in_progress() {
2052        let fs = FsFixture::default();
2053        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
2054        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2055
2056        let file_scheduler = fs
2057            .scheduler
2058            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2059            .await
2060            .unwrap();
2061        let file_reader = FileReader::try_open(
2062            file_scheduler.clone(),
2063            None,
2064            Arc::<DecoderPlugins>::default(),
2065            &test_cache(),
2066            FileReaderOptions::default(),
2067        )
2068        .await
2069        .unwrap();
2070
2071        let mut batches = file_reader
2072            .read_stream(
2073                lance_io::ReadBatchParams::RangeFull,
2074                (total_rows / 10) as u32,
2075                16,
2076                FilterExpression::no_filter(),
2077            )
2078            .unwrap();
2079
2080        drop(file_reader);
2081
2082        let batch = batches.next().await.unwrap().unwrap();
2083        assert!(batch.num_rows() > 0);
2084
2085        // Drop in-progress scan
2086        drop(batches);
2087    }
2088
2089    #[tokio::test]
2090    async fn drop_while_scheduling() {
2091        // This is a bit of a white-box test that pokes at the internals.  We want to
2092        // test the case where the read stream is dropped before the scheduling
2093        // thread finishes.  We can't do that in a black-box fashion because the
2094        // scheduling thread runs in the background and there is no easy way to
2095        // pause / gate it.
2096
2097        // It's a regression test for a bug where the scheduling thread would panic
2098        // if the stream was dropped before it finished.
2099
2100        let fs = FsFixture::default();
2101        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
2102        let total_rows = written_file
2103            .data
2104            .iter()
2105            .map(|batch| batch.num_rows())
2106            .sum::<usize>();
2107
2108        let file_scheduler = fs
2109            .scheduler
2110            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2111            .await
2112            .unwrap();
2113        let file_reader = FileReader::try_open(
2114            file_scheduler.clone(),
2115            None,
2116            Arc::<DecoderPlugins>::default(),
2117            &test_cache(),
2118            FileReaderOptions::default(),
2119        )
2120        .await
2121        .unwrap();
2122
2123        let projection =
2124            ReaderProjection::from_whole_schema(&written_file.schema, LanceFileVersion::V2_0);
2125        let column_infos = file_reader
2126            .collect_columns_from_projection(&projection)
2127            .unwrap();
2128        let mut decode_scheduler = DecodeBatchScheduler::try_new(
2129            &projection.schema,
2130            &projection.column_indices,
2131            &column_infos,
2132            &vec![],
2133            total_rows as u64,
2134            Arc::<DecoderPlugins>::default(),
2135            file_reader.scheduler.clone(),
2136            test_cache(),
2137            &FilterExpression::no_filter(),
2138            false,
2139        )
2140        .await
2141        .unwrap();
2142
2143        let range = 0..total_rows as u64;
2144
2145        let (tx, rx) = mpsc::unbounded_channel();
2146
2147        // Simulate the stream / decoder being dropped
2148        drop(rx);
2149
2150        // Scheduling should not panic
2151        decode_scheduler.schedule_range(
2152            range,
2153            &FilterExpression::no_filter(),
2154            tx,
2155            file_reader.scheduler.clone(),
2156        )
2157    }
2158
2159    #[tokio::test]
2160    async fn test_global_buffers() {
2161        let fs = FsFixture::default();
2162
2163        let lance_schema =
2164            lance_core::datatypes::Schema::try_from(&ArrowSchema::new(vec![Field::new(
2165                "foo",
2166                DataType::Int32,
2167                true,
2168            )]))
2169            .unwrap();
2170
2171        let mut file_writer = FileWriter::try_new(
2172            fs.object_store.create(&fs.tmp_path).await.unwrap(),
2173            lance_schema.clone(),
2174            FileWriterOptions::default(),
2175        )
2176        .unwrap();
2177
2178        let test_bytes = Bytes::from_static(b"hello");
2179
2180        let buf_index = file_writer
2181            .add_global_buffer(test_bytes.clone())
2182            .await
2183            .unwrap();
2184
2185        assert_eq!(buf_index, 1);
2186
2187        file_writer.finish().await.unwrap();
2188
2189        let file_scheduler = fs
2190            .scheduler
2191            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2192            .await
2193            .unwrap();
2194        let file_reader = FileReader::try_open(
2195            file_scheduler.clone(),
2196            None,
2197            Arc::<DecoderPlugins>::default(),
2198            &test_cache(),
2199            FileReaderOptions::default(),
2200        )
2201        .await
2202        .unwrap();
2203
2204        let buf = file_reader.read_global_buffer(1).await.unwrap();
2205        assert_eq!(buf, test_bytes);
2206    }
2207}