lance_file/v2/reader.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::{
    collections::{BTreeMap, BTreeSet},
    io::Cursor,
    ops::Range,
    pin::Pin,
    sync::Arc,
};

use arrow_array::RecordBatchReader;
use arrow_schema::Schema as ArrowSchema;
use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
use bytes::{Bytes, BytesMut};
use deepsize::{Context, DeepSizeOf};
use futures::{stream::BoxStream, Stream, StreamExt};
use lance_encoding::{
    decoder::{
        schedule_and_decode, schedule_and_decode_blocking, ColumnInfo, DecoderConfig,
        DecoderPlugins, FilterExpression, PageEncoding, PageInfo, ReadBatchTask, RequestedRows,
        SchedulerDecoderConfig,
    },
    encoder::EncodedBatch,
    version::LanceFileVersion,
    EncodingsIo,
};
use log::debug;
use object_store::path::Path;
use prost::{Message, Name};
use snafu::location;

use lance_core::{
    cache::LanceCache,
    datatypes::{Field, Schema},
    Error, Result,
};
use lance_encoding::format::pb as pbenc;
use lance_encoding::format::pb21 as pbenc21;
use lance_io::{
    scheduler::FileScheduler,
    stream::{RecordBatchStream, RecordBatchStreamAdapter},
    ReadBatchParams,
};

use crate::{
    datatypes::{Fields, FieldsWithMeta},
    format::{pb, pbfile, MAGIC, MAJOR_VERSION, MINOR_VERSION},
    v2::writer::PAGE_BUFFER_ALIGNMENT,
};

use super::io::LanceEncodingsIo;

// For now, we don't use global buffers for anything other than schema.  If we
// use these later we should make them lazily loaded and then cached once loaded.
//
// We store their position / length for debugging purposes
#[derive(Debug, DeepSizeOf)]
pub struct BufferDescriptor {
    pub position: u64,
    pub size: u64,
}

/// Statistics that summarize parts of the file metadata for quick inspection
#[derive(Debug)]
pub struct FileStatistics {
    /// Statistics about each of the columns in the file
    pub columns: Vec<ColumnStatistics>,
}

/// Summary information describing a column
#[derive(Debug)]
pub struct ColumnStatistics {
    /// The number of pages in the column
    pub num_pages: usize,
    /// The total number of data & metadata bytes in the column
    ///
    /// This is the compressed on-disk size
    pub size_bytes: u64,
}

// TODO: Caching
#[derive(Debug)]
pub struct CachedFileMetadata {
    /// The schema of the file
    pub file_schema: Arc<Schema>,
    /// The column metadatas
    pub column_metadatas: Vec<pbfile::ColumnMetadata>,
    pub column_infos: Vec<Arc<ColumnInfo>>,
    /// The number of rows in the file
    pub num_rows: u64,
    pub file_buffers: Vec<BufferDescriptor>,
    /// The number of bytes contained in the data page section of the file
    pub num_data_bytes: u64,
    /// The number of bytes contained in the column metadata (not including buffers
    /// referenced by the metadata)
    pub num_column_metadata_bytes: u64,
    /// The number of bytes contained in global buffers
    pub num_global_buffer_bytes: u64,
    /// The number of bytes contained in the column metadata offset (CMO) and
    /// global buffer offset (GBO) tables
    pub num_footer_bytes: u64,
    pub major_version: u16,
    pub minor_version: u16,
}

impl DeepSizeOf for CachedFileMetadata {
    // TODO: include size for `column_metadatas` and `column_infos`.
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.file_schema.deep_size_of_children(context)
            + self
                .file_buffers
                .iter()
                .map(|file_buffer| file_buffer.deep_size_of_children(context))
                .sum::<usize>()
    }
}

impl CachedFileMetadata {
    pub fn version(&self) -> LanceFileVersion {
        match (self.major_version, self.minor_version) {
            (0, 3) => LanceFileVersion::V2_0,
            (2, 1) => LanceFileVersion::V2_1,
            _ => panic!(
                "Unsupported version: {}.{}",
                self.major_version, self.minor_version
            ),
        }
    }
}

/// Selecting columns from a lance file requires specifying both the
/// index of the column and the data type of the column
///
/// Partly, this is because it is not strictly required that columns
/// be read into the same type.  For example, a string column may be
/// read as a string, large_string or string_view type.
///
/// A read will only succeed if the decoder for a column is capable
/// of decoding into the requested type.
///
/// Note that this should generally be limited to different in-memory
/// representations of the same semantic type.  An encoding could
/// theoretically support "casting" (e.g. int to string, etc.) but
/// there is little advantage in doing so here.
///
/// Note: in order to specify a projection the user will need some way
/// to figure out the column indices.  In the table format we do this
/// using field IDs and keeping track of the field id->column index mapping.
///
/// If users are not using the table format then they will need to figure
/// out some way to do this themselves.
#[derive(Debug, Clone)]
pub struct ReaderProjection {
    /// The data types (schema) of the selected columns.  The names
    /// of the schema are arbitrary and ignored.
    pub schema: Arc<Schema>,
    /// The indices of the columns to load.
    ///
    /// The content of this vector depends on the file version.
    ///
    /// In Lance File Version 2.0 we need ids for structural fields as
    /// well as leaf fields:
    ///
    ///   - Primitive: the index of the column in the schema
    ///   - List: the index of the list column in the schema
    ///     followed by the column indices of the children
    ///   - FixedSizeList (of primitive): the index of the column in the schema
    ///     (this case is not nested)
    ///   - FixedSizeList (of non-primitive): not yet implemented
    ///   - Dictionary: same as primitive
    ///   - Struct: the index of the struct column in the schema
    ///     followed by the column indices of the children
    ///
    ///   In other words, this should be a DFS listing of the desired schema.
    ///
    /// In Lance File Version 2.1 we only need ids for leaf fields.  Any structural
    /// fields are completely transparent.
    ///
    /// For example, if the goal is to load:
    ///
    ///   x: int32
    ///   y: struct<z: int32, w: string>
    ///   z: list<int32>
    ///
    /// and the schema originally used to store the data was:
    ///
    ///   a: struct<x: int32>
    ///   b: int64
    ///   y: struct<z: int32, c: int64, w: string>
    ///   z: list<int32>
    ///
    /// Then the column_indices should be:
    ///
    /// - 2.0: [1, 3, 4, 6, 7, 8]
    /// - 2.1: [0, 2, 4, 5]
    pub column_indices: Vec<u32>,
}

impl ReaderProjection {
    fn from_field_ids_helper<'a>(
        file_version: LanceFileVersion,
        fields: impl Iterator<Item = &'a Field>,
        field_id_to_column_index: &BTreeMap<u32, u32>,
        column_indices: &mut Vec<u32>,
    ) -> Result<()> {
        for field in fields {
            let is_structural = file_version >= LanceFileVersion::V2_1;
            // In the 2.0 system we needed ids for intermediate fields.  In 2.1+
            // we only need ids for leaf fields.
            if !is_structural || field.children.is_empty() {
                if let Some(column_idx) = field_id_to_column_index.get(&(field.id as u32)).copied()
                {
                    column_indices.push(column_idx);
                }
            }
            Self::from_field_ids_helper(
                file_version,
                field.children.iter(),
                field_id_to_column_index,
                column_indices,
            )?;
        }
        Ok(())
    }

    /// Creates a projection using a mapping from field IDs to column indices
    ///
    /// You can obtain such a mapping when the file is written using the
    /// [`crate::v2::writer::FileWriter::field_id_to_column_indices`] method.
    pub fn from_field_ids(
        file_version: LanceFileVersion,
        schema: &Schema,
        field_id_to_column_index: &BTreeMap<u32, u32>,
    ) -> Result<Self> {
        let mut column_indices = Vec::new();
        Self::from_field_ids_helper(
            file_version,
            schema.fields.iter(),
            field_id_to_column_index,
            &mut column_indices,
        )?;
        Ok(Self {
            schema: Arc::new(schema.clone()),
            column_indices,
        })
    }

    /// Creates a projection that reads the entire file
    ///
    /// If the schema provided is not the schema of the entire file then
    /// the projection will be invalid and the read will fail.
    /// If a field is a `struct` datatype with `packed` set to true in the field metadata,
    /// the whole struct occupies a single column index.
    /// To support nested `packed-struct` encoding, this method needs further adjustment.
    pub fn from_whole_schema(schema: &Schema, version: LanceFileVersion) -> Self {
        let schema = Arc::new(schema.clone());
        let is_structural = version >= LanceFileVersion::V2_1;
        let mut column_indices = vec![];
        let mut curr_column_idx = 0;
        let mut packed_struct_fields_num = 0;
        for field in schema.fields_pre_order() {
            if packed_struct_fields_num > 0 {
                packed_struct_fields_num -= 1;
                continue;
            }
            if field.is_packed_struct() {
                column_indices.push(curr_column_idx);
                curr_column_idx += 1;
                packed_struct_fields_num = field.children.len();
            } else if field.children.is_empty() || !is_structural {
                column_indices.push(curr_column_idx);
                curr_column_idx += 1;
            }
        }
        Self {
            schema,
            column_indices,
        }
    }

    /// Creates a projection that reads the specified columns provided by name
    ///
    /// The syntax for column names is the same as [`lance_core::datatypes::Schema::project`]
    ///
    /// If the schema provided is not the schema of the entire file then
    /// the projection will be invalid and the read will fail.
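    ///
    /// # Example
    ///
    /// A minimal sketch; the schema and column names here are assumptions for
    /// illustration only.
    ///
    /// ```ignore
    /// // `file_schema` is the schema the file was written with.
    /// let projection = ReaderProjection::from_column_names(
    ///     LanceFileVersion::V2_1,
    ///     &file_schema,
    ///     &["x", "y"],
    /// )?;
    /// ```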
    pub fn from_column_names(
        file_version: LanceFileVersion,
        schema: &Schema,
        column_names: &[&str],
    ) -> Result<Self> {
        let field_id_to_column_index = schema
            .fields_pre_order()
            // In the 2.0 system we needed ids for intermediate fields.  In 2.1+
            // we only need ids for leaf fields.
            .filter(|field| file_version < LanceFileVersion::V2_1 || field.is_leaf())
            .enumerate()
            .map(|(idx, field)| (field.id as u32, idx as u32))
            .collect::<BTreeMap<_, _>>();
        let projected = schema.project(column_names)?;
        let mut column_indices = Vec::new();
        Self::from_field_ids_helper(
            file_version,
            projected.fields.iter(),
            &field_id_to_column_index,
            &mut column_indices,
        )?;
        Ok(Self {
            schema: Arc::new(projected),
            column_indices,
        })
    }
}

/// Options that control the file reader's behavior, such as whether to enable caching of repetition indices
#[derive(Clone, Debug, Default)]
pub struct FileReaderOptions {
    pub decoder_config: DecoderConfig,
}

#[derive(Debug)]
pub struct FileReader {
    scheduler: Arc<dyn EncodingsIo>,
    // The default projection to be applied to all reads
    base_projection: ReaderProjection,
    num_rows: u64,
    metadata: Arc<CachedFileMetadata>,
    decoder_plugins: Arc<DecoderPlugins>,
    cache: Arc<LanceCache>,
    options: FileReaderOptions,
}
#[derive(Debug)]
struct Footer {
    #[allow(dead_code)]
    column_meta_start: u64,
    // We don't use this today because we always load metadata for every column
    // and don't yet support "metadata projection"
    #[allow(dead_code)]
    column_meta_offsets_start: u64,
    global_buff_offsets_start: u64,
    num_global_buffers: u32,
    num_columns: u32,
    major_version: u16,
    minor_version: u16,
}

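// The footer is the last FOOTER_LEN (40) bytes of the file.  As decoded below, it
// is laid out as:
//
//   column_meta_start: u64 | column_meta_offsets_start: u64 |
//   global_buff_offsets_start: u64 | num_global_buffers: u32 | num_columns: u32 |
//   major_version: u16 | minor_version: u16 | magic: [u8; 4]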
const FOOTER_LEN: usize = 40;

impl FileReader {
    pub fn with_scheduler(&self, scheduler: Arc<dyn EncodingsIo>) -> Self {
        Self {
            scheduler,
            base_projection: self.base_projection.clone(),
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            metadata: self.metadata.clone(),
            options: self.options.clone(),
            num_rows: self.num_rows,
        }
    }

    pub fn num_rows(&self) -> u64 {
        self.num_rows
    }

    pub fn metadata(&self) -> &Arc<CachedFileMetadata> {
        &self.metadata
    }

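    /// Returns per-column summary statistics (page count and compressed size in
    /// bytes) computed from the cached column metadata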
    pub fn file_statistics(&self) -> FileStatistics {
        let column_metadatas = &self.metadata().column_metadatas;

        let column_stats = column_metadatas
            .iter()
            .map(|col_metadata| {
                let num_pages = col_metadata.pages.len();
                let size_bytes = col_metadata
                    .pages
                    .iter()
                    .map(|page| page.buffer_sizes.iter().sum::<u64>())
                    .sum::<u64>();
                ColumnStatistics {
                    num_pages,
                    size_bytes,
                }
            })
            .collect();

        FileStatistics {
            columns: column_stats,
        }
    }

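    /// Reads one of the file's global buffers into memory
    ///
    /// `index` is the position of the buffer in the global buffer table (index 0 is
    /// the schema buffer).  Returns an error if the index is out of bounds.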
    pub async fn read_global_buffer(&self, index: u32) -> Result<Bytes> {
        let buffer_desc = self.metadata.file_buffers.get(index as usize).ok_or_else(||Error::invalid_input(format!("request for global buffer at index {} but there were only {} global buffers in the file", index, self.metadata.file_buffers.len()), location!()))?;
        self.scheduler
            .submit_single(
                buffer_desc.position..buffer_desc.position + buffer_desc.size,
                0,
            )
            .await
    }

    async fn read_tail(scheduler: &FileScheduler) -> Result<(Bytes, u64)> {
        let file_size = scheduler.reader().size().await? as u64;
        let begin = if file_size < scheduler.reader().block_size() as u64 {
            0
        } else {
            file_size - scheduler.reader().block_size() as u64
        };
        let tail_bytes = scheduler.submit_single(begin..file_size, 0).await?;
        Ok((tail_bytes, file_size))
    }

    // Checks to make sure the footer is written correctly (version and magic) and
    // decodes it, returning the parsed `Footer`
    fn decode_footer(footer_bytes: &Bytes) -> Result<Footer> {
        let len = footer_bytes.len();
        if len < FOOTER_LEN {
            return Err(Error::io(
                format!(
                    "does not have sufficient data, len: {}, bytes: {:?}",
                    len, footer_bytes
                ),
                location!(),
            ));
        }
        let mut cursor = Cursor::new(footer_bytes.slice(len - FOOTER_LEN..));

        let column_meta_start = cursor.read_u64::<LittleEndian>()?;
        let column_meta_offsets_start = cursor.read_u64::<LittleEndian>()?;
        let global_buff_offsets_start = cursor.read_u64::<LittleEndian>()?;
        let num_global_buffers = cursor.read_u32::<LittleEndian>()?;
        let num_columns = cursor.read_u32::<LittleEndian>()?;
        let major_version = cursor.read_u16::<LittleEndian>()?;
        let minor_version = cursor.read_u16::<LittleEndian>()?;

        if major_version == MAJOR_VERSION as u16 && minor_version == MINOR_VERSION as u16 {
            return Err(Error::version_conflict(
                "Attempt to use the lance v2 reader to read a legacy file".to_string(),
                major_version,
                minor_version,
                location!(),
            ));
        }

        let magic_bytes = footer_bytes.slice(len - 4..);
        if magic_bytes.as_ref() != MAGIC {
            return Err(Error::io(
                format!(
                    "file does not appear to be a Lance file (invalid magic: {:?})",
                    MAGIC
                ),
                location!(),
            ));
        }
        Ok(Footer {
            column_meta_start,
            column_meta_offsets_start,
            global_buff_offsets_start,
            num_global_buffers,
            num_columns,
            major_version,
            minor_version,
        })
    }

    // TODO: Once we have coalesced I/O we should only read the column metadatas that we need
    fn read_all_column_metadata(
        column_metadata_bytes: Bytes,
        footer: &Footer,
    ) -> Result<Vec<pbfile::ColumnMetadata>> {
        let column_metadata_start = footer.column_meta_start;
        // cmo == column_metadata_offsets
        let cmo_table_size = 16 * footer.num_columns as usize;
        let cmo_table = column_metadata_bytes.slice(column_metadata_bytes.len() - cmo_table_size..);

        (0..footer.num_columns)
            .map(|col_idx| {
                let offset = (col_idx * 16) as usize;
                let position = LittleEndian::read_u64(&cmo_table[offset..offset + 8]);
                let length = LittleEndian::read_u64(&cmo_table[offset + 8..offset + 16]);
                let normalized_position = (position - column_metadata_start) as usize;
                let normalized_end = normalized_position + (length as usize);
                Ok(pbfile::ColumnMetadata::decode(
                    &column_metadata_bytes[normalized_position..normalized_end],
                )?)
            })
            .collect::<Result<Vec<_>>>()
    }

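    // Serves a read of [start_pos, file_len) from the already-fetched tail bytes when
    // possible; otherwise fetches only the missing prefix and prepends it to `data`.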
    async fn optimistic_tail_read(
        data: &Bytes,
        start_pos: u64,
        scheduler: &FileScheduler,
        file_len: u64,
    ) -> Result<Bytes> {
        let num_bytes_needed = (file_len - start_pos) as usize;
        if data.len() >= num_bytes_needed {
            Ok(data.slice((data.len() - num_bytes_needed)..))
        } else {
            let num_bytes_missing = (num_bytes_needed - data.len()) as u64;
            let start = file_len - num_bytes_needed as u64;
            let missing_bytes = scheduler
                .submit_single(start..start + num_bytes_missing, 0)
                .await?;
            let mut combined = BytesMut::with_capacity(data.len() + num_bytes_missing as usize);
            combined.extend(missing_bytes);
            combined.extend(data);
            Ok(combined.freeze())
        }
    }

    fn do_decode_gbo_table(
        gbo_bytes: &Bytes,
        footer: &Footer,
        version: LanceFileVersion,
    ) -> Result<Vec<BufferDescriptor>> {
        let mut global_bufs_cursor = Cursor::new(gbo_bytes);

        let mut global_buffers = Vec::with_capacity(footer.num_global_buffers as usize);
        for _ in 0..footer.num_global_buffers {
            let buf_pos = global_bufs_cursor.read_u64::<LittleEndian>()?;
            assert!(
                version < LanceFileVersion::V2_1 || buf_pos % PAGE_BUFFER_ALIGNMENT as u64 == 0
            );
            let buf_size = global_bufs_cursor.read_u64::<LittleEndian>()?;
            global_buffers.push(BufferDescriptor {
                position: buf_pos,
                size: buf_size,
            });
        }

        Ok(global_buffers)
    }

    async fn decode_gbo_table(
        tail_bytes: &Bytes,
        file_len: u64,
        scheduler: &FileScheduler,
        footer: &Footer,
        version: LanceFileVersion,
    ) -> Result<Vec<BufferDescriptor>> {
        // This could, in theory, trigger another IOP but the GBO table should never be large
        // enough for that to happen
        let gbo_bytes = Self::optimistic_tail_read(
            tail_bytes,
            footer.global_buff_offsets_start,
            scheduler,
            file_len,
        )
        .await?;
        Self::do_decode_gbo_table(&gbo_bytes, footer, version)
    }

    fn decode_schema(schema_bytes: Bytes) -> Result<(u64, lance_core::datatypes::Schema)> {
        let file_descriptor = pb::FileDescriptor::decode(schema_bytes)?;
        let pb_schema = file_descriptor.schema.unwrap();
        let num_rows = file_descriptor.length;
        let fields_with_meta = FieldsWithMeta {
            fields: Fields(pb_schema.fields),
            metadata: pb_schema.metadata,
        };
        let schema = lance_core::datatypes::Schema::from(fields_with_meta);
        Ok((num_rows, schema))
    }

    // TODO: Support late projection.  Currently, if we want to perform a
    // projected read of a file, we load all of the column metadata, and then
    // only read the column data that is requested.  This is fine for most cases.
    //
    // However, if there are many columns then loading all of the column metadata
    // may be expensive.  We should support a mode where we only load the column
    // metadata for the columns that are requested (the file format supports this).
    //
    // The main challenge is that we either need to ignore the column metadata cache
    // or have a more sophisticated cache that can cache per-column metadata.
    //
    // Also, if the number of columns is fairly small, it's faster to read them as a
    // single IOP, but we can fix this through coalescing.
    pub async fn read_all_metadata(scheduler: &FileScheduler) -> Result<CachedFileMetadata> {
        // 1. read the footer
        let (tail_bytes, file_len) = Self::read_tail(scheduler).await?;
        let footer = Self::decode_footer(&tail_bytes)?;

        let file_version = LanceFileVersion::try_from_major_minor(
            footer.major_version as u32,
            footer.minor_version as u32,
        )?;

        let gbo_table =
            Self::decode_gbo_table(&tail_bytes, file_len, scheduler, &footer, file_version).await?;
        if gbo_table.is_empty() {
            return Err(Error::Internal {
                message: "File did not contain any global buffers, schema expected".to_string(),
                location: location!(),
            });
        }
        let schema_start = gbo_table[0].position;
        let schema_size = gbo_table[0].size;

        let num_footer_bytes = file_len - schema_start;

        // By default we read all column metadatas.  We do NOT read the column metadata buffers
        // at this point.  Those buffers are only read for the columns we actually load.
        let all_metadata_bytes =
            Self::optimistic_tail_read(&tail_bytes, schema_start, scheduler, file_len).await?;

        let schema_bytes = all_metadata_bytes.slice(0..schema_size as usize);
        let (num_rows, schema) = Self::decode_schema(schema_bytes)?;

        // Next, read the metadata for the columns
        // This is both the column metadata and the CMO table
        let column_metadata_start = (footer.column_meta_start - schema_start) as usize;
        let column_metadata_end = (footer.global_buff_offsets_start - schema_start) as usize;
        let column_metadata_bytes =
            all_metadata_bytes.slice(column_metadata_start..column_metadata_end);
        let column_metadatas = Self::read_all_column_metadata(column_metadata_bytes, &footer)?;

        let num_global_buffer_bytes = gbo_table.iter().map(|buf| buf.size).sum::<u64>();
        let num_data_bytes = footer.column_meta_start - num_global_buffer_bytes;
        let num_column_metadata_bytes = footer.global_buff_offsets_start - footer.column_meta_start;

        let column_infos = Self::meta_to_col_infos(column_metadatas.as_slice(), file_version);

        Ok(CachedFileMetadata {
            file_schema: Arc::new(schema),
            column_metadatas,
            column_infos,
            num_rows,
            num_data_bytes,
            num_column_metadata_bytes,
            num_global_buffer_bytes,
            num_footer_bytes,
            file_buffers: gbo_table,
            major_version: footer.major_version,
            minor_version: footer.minor_version,
        })
    }

    fn fetch_encoding<M: Default + Name + Sized>(encoding: &pbfile::Encoding) -> M {
        match &encoding.location {
            Some(pbfile::encoding::Location::Indirect(_)) => todo!(),
            Some(pbfile::encoding::Location::Direct(encoding)) => {
                let encoding_buf = Bytes::from(encoding.encoding.clone());
                let encoding_any = prost_types::Any::decode(encoding_buf).unwrap();
                encoding_any.to_msg::<M>().unwrap()
            }
            Some(pbfile::encoding::Location::None(_)) => panic!(),
            None => panic!(),
        }
    }

    fn meta_to_col_infos(
        column_metadatas: &[pbfile::ColumnMetadata],
        file_version: LanceFileVersion,
    ) -> Vec<Arc<ColumnInfo>> {
        column_metadatas
            .iter()
            .enumerate()
            .map(|(col_idx, col_meta)| {
                let page_infos = col_meta
                    .pages
                    .iter()
                    .map(|page| {
                        let num_rows = page.length;
                        let encoding = match file_version {
                            LanceFileVersion::V2_0 => {
                                PageEncoding::Legacy(Self::fetch_encoding::<pbenc::ArrayEncoding>(
                                    page.encoding.as_ref().unwrap(),
                                ))
                            }
                            _ => PageEncoding::Structural(Self::fetch_encoding::<
                                pbenc21::PageLayout,
                            >(
                                page.encoding.as_ref().unwrap()
                            )),
                        };
                        let buffer_offsets_and_sizes = Arc::from(
                            page.buffer_offsets
                                .iter()
                                .zip(page.buffer_sizes.iter())
                                .map(|(offset, size)| {
                                    // Starting with version 2.1 we can assert that page buffers are aligned
                                    assert!(
                                        file_version < LanceFileVersion::V2_1
                                            || offset % PAGE_BUFFER_ALIGNMENT as u64 == 0
                                    );
                                    (*offset, *size)
                                })
                                .collect::<Vec<_>>(),
                        );
                        PageInfo {
                            buffer_offsets_and_sizes,
                            encoding,
                            num_rows,
                            priority: page.priority,
                        }
                    })
                    .collect::<Vec<_>>();
                let buffer_offsets_and_sizes = Arc::from(
                    col_meta
                        .buffer_offsets
                        .iter()
                        .zip(col_meta.buffer_sizes.iter())
                        .map(|(offset, size)| (*offset, *size))
                        .collect::<Vec<_>>(),
                );
                Arc::new(ColumnInfo {
                    index: col_idx as u32,
                    page_infos: Arc::from(page_infos),
                    buffer_offsets_and_sizes,
                    encoding: Self::fetch_encoding(col_meta.encoding.as_ref().unwrap()),
                })
            })
            .collect::<Vec<_>>()
    }

    fn validate_projection(
        projection: &ReaderProjection,
        metadata: &CachedFileMetadata,
    ) -> Result<()> {
        if projection.schema.fields.is_empty() {
            return Err(Error::invalid_input(
                "Attempt to read zero columns from the file, at least one column must be specified"
                    .to_string(),
                location!(),
            ));
        }
        let mut column_indices_seen = BTreeSet::new();
        for column_index in &projection.column_indices {
            if !column_indices_seen.insert(*column_index) {
                return Err(Error::invalid_input(
                    format!(
                        "The projection specified the column index {} more than once",
                        column_index
                    ),
                    location!(),
                ));
            }
            if *column_index >= metadata.column_infos.len() as u32 {
                return Err(Error::invalid_input(format!("The projection specified the column index {} but there are only {} columns in the file", column_index, metadata.column_infos.len()), location!()));
            }
        }
        Ok(())
    }

    /// Opens a new file reader without any pre-existing knowledge
    ///
    /// This will read the file schema from the file itself and thus requires a bit more I/O
    ///
    /// A `base_projection` can also be provided.  If provided, then the projection will apply
    /// to all reads from the file that do not specify their own projection.
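    ///
    /// # Example
    ///
    /// A minimal sketch; it assumes a `FileScheduler`, a `LanceCache`, and default
    /// `DecoderPlugins` are available (setup not shown).
    ///
    /// ```ignore
    /// let reader = FileReader::try_open(
    ///     scheduler,
    ///     None, // default to a whole-schema projection
    ///     Arc::new(DecoderPlugins::default()),
    ///     &cache,
    ///     FileReaderOptions::default(),
    /// )
    /// .await?;
    /// println!("file has {} rows", reader.num_rows());
    /// ```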
    pub async fn try_open(
        scheduler: FileScheduler,
        base_projection: Option<ReaderProjection>,
        decoder_plugins: Arc<DecoderPlugins>,
        cache: &LanceCache,
        options: FileReaderOptions,
    ) -> Result<Self> {
        let file_metadata = Arc::new(Self::read_all_metadata(&scheduler).await?);
        let path = scheduler.reader().path().clone();
        Self::try_open_with_file_metadata(
            Arc::new(LanceEncodingsIo(scheduler)),
            path,
            base_projection,
            decoder_plugins,
            file_metadata,
            cache,
            options,
        )
        .await
    }

    /// Same as `try_open` but with the file metadata already loaded.
    ///
    /// This method also can accept any kind of `EncodingsIo` implementation allowing
    /// for custom strategies to be used for I/O scheduling (e.g. for takes on fast
    /// disks it may be better to avoid asynchronous overhead).
    pub async fn try_open_with_file_metadata(
        scheduler: Arc<dyn EncodingsIo>,
        path: Path,
        base_projection: Option<ReaderProjection>,
        decoder_plugins: Arc<DecoderPlugins>,
        file_metadata: Arc<CachedFileMetadata>,
        cache: &LanceCache,
        options: FileReaderOptions,
    ) -> Result<Self> {
        let cache = Arc::new(cache.with_key_prefix(path.as_ref()));

        if let Some(base_projection) = base_projection.as_ref() {
            Self::validate_projection(base_projection, &file_metadata)?;
        }
        let num_rows = file_metadata.num_rows;
        Ok(Self {
            scheduler,
            base_projection: base_projection.unwrap_or(ReaderProjection::from_whole_schema(
                file_metadata.file_schema.as_ref(),
                file_metadata.version(),
            )),
            num_rows,
            metadata: file_metadata,
            decoder_plugins,
            cache,
            options,
        })
    }

    // The actual decoder needs all the column infos that make up a type.  In other words, if
    // the first type in the schema is Struct<i32, i32> then the decoder will need 3 column infos.
    //
    // This is a file reader concern because the file reader needs to support late projection of columns
    // and so it will need to figure this out anyways.
    //
    // It's a bit of a tricky process though because the number of column infos may depend on the
    // encoding.  Considering the above example, if we wrote it with a packed encoding, then there would
    // only be a single column in the file (and not 3).
    //
    // At the moment this method works because our rules are simple and we just repeat them here.  See
    // Self::default_projection for a similar problem.  In the future this is something the encodings
    // registry will need to figure out.
    fn collect_columns_from_projection(
        &self,
        _projection: &ReaderProjection,
    ) -> Result<Vec<Arc<ColumnInfo>>> {
        Ok(self.metadata.column_infos.to_vec())
    }

    #[allow(clippy::too_many_arguments)]
    fn do_read_range(
        column_infos: Vec<Arc<ColumnInfo>>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<LanceCache>,
        num_rows: u64,
        decoder_plugins: Arc<DecoderPlugins>,
        range: Range<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
        decoder_config: DecoderConfig,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        debug!(
            "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
            range,
            batch_size,
            num_rows,
            column_infos.len(),
            projection.schema.fields.len(),
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache,
            decoder_plugins,
            io,
            decoder_config,
        };

        let requested_rows = RequestedRows::Ranges(vec![range]);

        Ok(schedule_and_decode(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        ))
    }

    fn read_range(
        &self,
        range: Range<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        // Create and initialize the stream
        Self::do_read_range(
            self.collect_columns_from_projection(&projection)?,
            self.scheduler.clone(),
            self.cache.clone(),
            self.num_rows,
            self.decoder_plugins.clone(),
            range,
            batch_size,
            projection,
            filter,
            self.options.decoder_config.clone(),
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn do_take_rows(
        column_infos: Vec<Arc<ColumnInfo>>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<LanceCache>,
        decoder_plugins: Arc<DecoderPlugins>,
        indices: Vec<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
        decoder_config: DecoderConfig,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        debug!(
            "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
            indices.len(),
            indices[0],
            indices[indices.len() - 1],
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache,
            decoder_plugins,
            io,
            decoder_config,
        };

        let requested_rows = RequestedRows::Indices(indices);

        Ok(schedule_and_decode(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        ))
    }

    fn take_rows(
        &self,
        indices: Vec<u64>,
        batch_size: u32,
        projection: ReaderProjection,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        // Create and initialize the stream
        Self::do_take_rows(
            self.collect_columns_from_projection(&projection)?,
            self.scheduler.clone(),
            self.cache.clone(),
            self.decoder_plugins.clone(),
            indices,
            batch_size,
            projection,
            FilterExpression::no_filter(),
            self.options.decoder_config.clone(),
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn do_read_ranges(
        column_infos: Vec<Arc<ColumnInfo>>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<LanceCache>,
        decoder_plugins: Arc<DecoderPlugins>,
        ranges: Vec<Range<u64>>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
        decoder_config: DecoderConfig,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        debug!(
            "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
            ranges.len(),
            num_rows,
            ranges[0].start,
            ranges[ranges.len() - 1].end,
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache,
            decoder_plugins,
            io,
            decoder_config,
        };

        let requested_rows = RequestedRows::Ranges(ranges);

        Ok(schedule_and_decode(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        ))
    }

    fn read_ranges(
        &self,
        ranges: Vec<Range<u64>>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        Self::do_read_ranges(
            self.collect_columns_from_projection(&projection)?,
            self.scheduler.clone(),
            self.cache.clone(),
            self.decoder_plugins.clone(),
            ranges,
            batch_size,
            projection,
            filter,
            self.options.decoder_config.clone(),
        )
    }

    /// Creates a stream of "read tasks" to read the data from the file
    ///
    /// The arguments are similar to [`Self::read_stream_projected`] but instead of returning a stream
    /// of record batches it returns a stream of "read tasks".
    ///
    /// The tasks should be consumed with some kind of `buffered` argument if CPU parallelism is desired.
    ///
    /// Note that "read task" is probably a bit imprecise.  The tasks are actually "decode tasks".  The
    /// reading happens asynchronously in the background.  In other words, a single read task may map to
    /// multiple I/O operations or a single I/O operation may map to multiple read tasks.
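    ///
    /// # Example
    ///
    /// A minimal sketch of consuming the task stream with CPU parallelism; `reader`
    /// and the readahead value are assumptions.
    ///
    /// ```ignore
    /// let tasks = reader.read_tasks(
    ///     ReadBatchParams::RangeFull,
    ///     1024,
    ///     None,
    ///     FilterExpression::no_filter(),
    /// )?;
    /// let mut batches = tasks.map(|task| task.task).buffered(4);
    /// while let Some(batch) = batches.next().await {
    ///     let batch = batch?;
    ///     // process the RecordBatch
    /// }
    /// ```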
    pub fn read_tasks(
        &self,
        params: ReadBatchParams,
        batch_size: u32,
        projection: Option<ReaderProjection>,
        filter: FilterExpression,
    ) -> Result<Pin<Box<dyn Stream<Item = ReadBatchTask> + Send>>> {
        let projection = projection.unwrap_or_else(|| self.base_projection.clone());
        Self::validate_projection(&projection, &self.metadata)?;
        let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
            if bound > self.num_rows || bound == self.num_rows && inclusive {
                Err(Error::invalid_input(
                    format!(
                        "cannot read {:?} from file with {} rows",
                        params, self.num_rows
                    ),
                    location!(),
                ))
            } else {
                Ok(())
            }
        };
        match &params {
            ReadBatchParams::Indices(indices) => {
                for idx in indices {
                    match idx {
                        None => {
                            return Err(Error::invalid_input(
                                "Null value in indices array",
                                location!(),
                            ));
                        }
                        Some(idx) => {
                            verify_bound(&params, idx as u64, true)?;
                        }
                    }
                }
                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
                self.take_rows(indices, batch_size, projection)
            }
            ReadBatchParams::Range(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range(
                    range.start as u64..range.end as u64,
                    batch_size,
                    projection,
                    filter,
                )
            }
            ReadBatchParams::Ranges(ranges) => {
                let mut ranges_u64 = Vec::with_capacity(ranges.len());
                for range in ranges.as_ref() {
                    verify_bound(&params, range.end, false)?;
                    ranges_u64.push(range.start..range.end);
                }
                self.read_ranges(ranges_u64, batch_size, projection, filter)
            }
            ReadBatchParams::RangeFrom(range) => {
                verify_bound(&params, range.start as u64, true)?;
                self.read_range(
                    range.start as u64..self.num_rows,
                    batch_size,
                    projection,
                    filter,
                )
            }
            ReadBatchParams::RangeTo(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range(0..range.end as u64, batch_size, projection, filter)
            }
            ReadBatchParams::RangeFull => {
                self.read_range(0..self.num_rows, batch_size, projection, filter)
            }
        }
    }

    /// Reads data from the file as a stream of record batches
    ///
    /// * `params` - Specifies the range (or indices) of data to read
    /// * `batch_size` - The maximum size of a single batch.  A batch may be smaller
    ///   if it is the last batch or if it is not possible to create a batch of the
    ///   requested size.
    ///
    ///   For example, if the batch size is 1024 and one of the columns is a string
    ///   column then there may be some ranges of 1024 rows that contain more than
    ///   2^31 bytes of string data (which is the maximum size of a string column
    ///   in Arrow).  In this case smaller batches may be emitted.
    /// * `batch_readahead` - The number of batches to read ahead.  This controls the
    ///   amount of CPU parallelism of the read.  In other words it controls how many
    ///   batches will be decoded in parallel.  It has no effect on the I/O parallelism
    ///   of the read (how many I/O requests are in flight at once).
    ///
    ///   This parameter is also related to backpressure.  If the consumer of the
    ///   stream is slow then the reader will accumulate decoded batches in RAM.
    /// * `projection` - A projection to apply to the read.  This controls which columns
    ///   are read from the file.  The projection is NOT applied on top of the base
    ///   projection.  The projection is applied directly to the file schema.
    pub fn read_stream_projected(
        &self,
        params: ReadBatchParams,
        batch_size: u32,
        batch_readahead: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
        let arrow_schema = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
        let tasks_stream = self.read_tasks(params, batch_size, Some(projection), filter)?;
        let batch_stream = tasks_stream
            .map(|task| task.task)
            .buffered(batch_readahead as usize)
            .boxed();
        Ok(Box::pin(RecordBatchStreamAdapter::new(
            arrow_schema,
            batch_stream,
        )))
    }

    fn take_rows_blocking(
        &self,
        indices: Vec<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
        let column_infos = self.collect_columns_from_projection(&projection)?;
        debug!(
            "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
            indices.len(),
            indices[0],
            indices[indices.len() - 1],
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            io: self.scheduler.clone(),
            decoder_config: self.options.decoder_config.clone(),
        };

        let requested_rows = RequestedRows::Indices(indices);

        schedule_and_decode_blocking(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        )
    }

    fn read_ranges_blocking(
        &self,
        ranges: Vec<Range<u64>>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
        let column_infos = self.collect_columns_from_projection(&projection)?;
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        debug!(
            "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
            ranges.len(),
            num_rows,
            ranges[0].start,
            ranges[ranges.len() - 1].end,
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            io: self.scheduler.clone(),
            decoder_config: self.options.decoder_config.clone(),
        };

        let requested_rows = RequestedRows::Ranges(ranges);

        schedule_and_decode_blocking(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        )
    }

    fn read_range_blocking(
        &self,
        range: Range<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
        let column_infos = self.collect_columns_from_projection(&projection)?;
        let num_rows = self.num_rows;

        debug!(
            "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
            range,
            batch_size,
            num_rows,
            column_infos.len(),
            projection.schema.fields.len(),
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            io: self.scheduler.clone(),
            decoder_config: self.options.decoder_config.clone(),
        };

        let requested_rows = RequestedRows::Ranges(vec![range]);

        schedule_and_decode_blocking(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        )
    }

    /// Read data from the file as an iterator of record batches
    ///
    /// This is a blocking variant of [`Self::read_stream_projected`] that runs entirely in the
    /// calling thread.  It will block on I/O if the decode is faster than the I/O.  It is useful
    /// for benchmarking and potentially for "take"ing small batches from fast disks.
    ///
    /// Large scans of in-memory data will still benefit from threading (and should therefore not
    /// use this method) because we can parallelize the decode.
    ///
    /// Note: calling this from within a tokio runtime will panic.  It is acceptable to call this
    /// from a spawn_blocking context.
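    ///
    /// # Example
    ///
    /// A minimal sketch; it assumes `reader` was opened elsewhere and that this code
    /// runs outside of a tokio runtime (or inside `spawn_blocking`).
    ///
    /// ```ignore
    /// let batches = reader.read_stream_projected_blocking(
    ///     ReadBatchParams::RangeFull,
    ///     1024,
    ///     None,
    ///     FilterExpression::no_filter(),
    /// )?;
    /// for batch in batches {
    ///     let batch = batch?;
    ///     // process the RecordBatch
    /// }
    /// ```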
    pub fn read_stream_projected_blocking(
        &self,
        params: ReadBatchParams,
        batch_size: u32,
        projection: Option<ReaderProjection>,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
        let projection = projection.unwrap_or_else(|| self.base_projection.clone());
        Self::validate_projection(&projection, &self.metadata)?;
        let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
            if bound > self.num_rows || bound == self.num_rows && inclusive {
                Err(Error::invalid_input(
                    format!(
                        "cannot read {:?} from file with {} rows",
                        params, self.num_rows
                    ),
                    location!(),
                ))
            } else {
                Ok(())
            }
        };
        match &params {
            ReadBatchParams::Indices(indices) => {
                for idx in indices {
                    match idx {
                        None => {
                            return Err(Error::invalid_input(
                                "Null value in indices array",
                                location!(),
                            ));
                        }
                        Some(idx) => {
                            verify_bound(&params, idx as u64, true)?;
                        }
                    }
                }
                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
                self.take_rows_blocking(indices, batch_size, projection, filter)
            }
            ReadBatchParams::Range(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range_blocking(
                    range.start as u64..range.end as u64,
                    batch_size,
                    projection,
                    filter,
                )
            }
            ReadBatchParams::Ranges(ranges) => {
                let mut ranges_u64 = Vec::with_capacity(ranges.len());
                for range in ranges.as_ref() {
                    verify_bound(&params, range.end, false)?;
                    ranges_u64.push(range.start..range.end);
                }
                self.read_ranges_blocking(ranges_u64, batch_size, projection, filter)
            }
            ReadBatchParams::RangeFrom(range) => {
                verify_bound(&params, range.start as u64, true)?;
                self.read_range_blocking(
                    range.start as u64..self.num_rows,
                    batch_size,
                    projection,
                    filter,
                )
            }
            ReadBatchParams::RangeTo(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range_blocking(0..range.end as u64, batch_size, projection, filter)
            }
            ReadBatchParams::RangeFull => {
                self.read_range_blocking(0..self.num_rows, batch_size, projection, filter)
            }
        }
    }

    /// Reads data from the file as a stream of record batches
    ///
    /// This is similar to [`Self::read_stream_projected`] but uses the base projection
    /// provided when the file was opened (or reads all columns if the file was
    /// opened without a base projection)
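    ///
    /// # Example
    ///
    /// A minimal sketch; `reader` and the parameter values are assumptions.
    ///
    /// ```ignore
    /// let mut stream = reader.read_stream(
    ///     ReadBatchParams::RangeFull,
    ///     1024, // batch_size
    ///     4,    // batch_readahead
    ///     FilterExpression::no_filter(),
    /// )?;
    /// while let Some(batch) = stream.next().await {
    ///     let batch = batch?;
    ///     // process the RecordBatch
    /// }
    /// ```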
1351    pub fn read_stream(
1352        &self,
1353        params: ReadBatchParams,
1354        batch_size: u32,
1355        batch_readahead: u32,
1356        filter: FilterExpression,
1357    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1358        self.read_stream_projected(
1359            params,
1360            batch_size,
1361            batch_readahead,
1362            self.base_projection.clone(),
1363            filter,
1364        )
1365    }
1366
1367    pub fn schema(&self) -> &Arc<Schema> {
1368        &self.metadata.file_schema
1369    }
1370}
1371
1372/// Inspects a page and returns a String describing the page's encoding
1373pub fn describe_encoding(page: &pbfile::column_metadata::Page) -> String {
1374    if let Some(encoding) = &page.encoding {
1375        if let Some(style) = &encoding.location {
1376            match style {
1377                pbfile::encoding::Location::Indirect(indirect) => {
1378                    format!(
1379                        "IndirectEncoding(pos={},size={})",
1380                        indirect.buffer_location, indirect.buffer_length
1381                    )
1382                }
1383                pbfile::encoding::Location::Direct(direct) => {
1384                    let encoding_any =
1385                        prost_types::Any::decode(Bytes::from(direct.encoding.clone()))
1386                            .expect("failed to deserialize encoding as protobuf");
1387                    if encoding_any.type_url == "/lance.encodings.ArrayEncoding" {
1388                        let encoding = encoding_any.to_msg::<pbenc::ArrayEncoding>();
1389                        match encoding {
1390                            Ok(encoding) => {
1391                                format!("{:#?}", encoding)
1392                            }
1393                            Err(err) => {
1394                                format!("Unsupported(decode_err={})", err)
1395                            }
1396                        }
1397                    } else if encoding_any.type_url == "/lance.encodings21.PageLayout" {
1398                        let encoding = encoding_any.to_msg::<pbenc21::PageLayout>();
1399                        match encoding {
1400                            Ok(encoding) => {
1401                                format!("{:#?}", encoding)
1402                            }
1403                            Err(err) => {
1404                                format!("Unsupported(decode_err={})", err)
1405                            }
1406                        }
1407                    } else {
1408                        format!("Unrecognized(type_url={})", encoding_any.type_url)
1409                    }
1410                }
1411                pbfile::encoding::Location::None(_) => "NoEncodingDescription".to_string(),
1412            }
1413        } else {
1414            "MISSING STYLE".to_string()
1415        }
1416    } else {
1417        "MISSING".to_string()
1418    }
1419}
1420
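/// Extension trait for reconstructing an [`EncodedBatch`] from serialized Lance bytes
///
/// `try_from_self_described_lance` expects the schema to be embedded in the file's first
/// global buffer, while `try_from_mini_lance` handles buffers written without an embedded
/// schema and therefore requires the caller to supply one.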
1421pub trait EncodedBatchReaderExt {
1422    fn try_from_mini_lance(
1423        bytes: Bytes,
1424        schema: &Schema,
1425        version: LanceFileVersion,
1426    ) -> Result<Self>
1427    where
1428        Self: Sized;
1429    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1430    where
1431        Self: Sized;
1432}
1433
1434impl EncodedBatchReaderExt for EncodedBatch {
1435    fn try_from_mini_lance(
1436        bytes: Bytes,
1437        schema: &Schema,
1438        file_version: LanceFileVersion,
1439    ) -> Result<Self>
1440    where
1441        Self: Sized,
1442    {
1443        let projection = ReaderProjection::from_whole_schema(schema, file_version);
1444        let footer = FileReader::decode_footer(&bytes)?;
1445
1446        // Next, read the metadata for the columns
1447        // This is both the column metadata and the CMO table
1448        let column_metadata_start = footer.column_meta_start as usize;
1449        let column_metadata_end = footer.global_buff_offsets_start as usize;
1450        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1451        let column_metadatas =
1452            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1453
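        // The version recorded in the footer (not the caller-supplied version) is used to
        // interpret the column metadata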
1454        let file_version = LanceFileVersion::try_from_major_minor(
1455            footer.major_version as u32,
1456            footer.minor_version as u32,
1457        )?;
1458
1459        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1460
1461        Ok(Self {
1462            data: bytes,
1463            num_rows: page_table
1464                .first()
1465                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1466                .unwrap_or(0),
1467            page_table,
1468            top_level_columns: projection.column_indices,
1469            schema: Arc::new(schema.clone()),
1470        })
1471    }
1472
1473    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1474    where
1475        Self: Sized,
1476    {
1477        let footer = FileReader::decode_footer(&bytes)?;
1478        let file_version = LanceFileVersion::try_from_major_minor(
1479            footer.major_version as u32,
1480            footer.minor_version as u32,
1481        )?;
1482
1483        let gbo_table = FileReader::do_decode_gbo_table(
1484            &bytes.slice(footer.global_buff_offsets_start as usize..),
1485            &footer,
1486            file_version,
1487        )?;
1488        if gbo_table.is_empty() {
1489            return Err(Error::Internal {
1490                message: "File did not contain any global buffers (expected the schema buffer)".to_string(),
1491                location: location!(),
1492            });
1493        }
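        // The schema lives in the first global buffer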
1494        let schema_start = gbo_table[0].position as usize;
1495        let schema_size = gbo_table[0].size as usize;
1496
1497        let schema_bytes = bytes.slice(schema_start..(schema_start + schema_size));
1498        let (_, schema) = FileReader::decode_schema(schema_bytes)?;
1499        let projection = ReaderProjection::from_whole_schema(&schema, file_version);
1500
1501        // Next, read the metadata for the columns
1502        // This is both the column metadata and the CMO table
1503        let column_metadata_start = footer.column_meta_start as usize;
1504        let column_metadata_end = footer.global_buff_offsets_start as usize;
1505        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1506        let column_metadatas =
1507            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1508
1509        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1510
1511        Ok(Self {
1512            data: bytes,
1513            num_rows: page_table
1514                .first()
1515                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1516                .unwrap_or(0),
1517            page_table,
1518            top_level_columns: projection.column_indices,
1519            schema: Arc::new(schema),
1520        })
1521    }
1522}
1523
1524#[cfg(test)]
1525pub mod tests {
1526    use std::{collections::BTreeMap, pin::Pin, sync::Arc};
1527
1528    use arrow_array::{
1529        types::{Float64Type, Int32Type},
1530        RecordBatch, UInt32Array,
1531    };
1532    use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema};
1533    use bytes::Bytes;
1534    use futures::{prelude::stream::TryStreamExt, StreamExt};
1535    use lance_arrow::RecordBatchExt;
1536    use lance_core::{datatypes::Schema, ArrowResult};
1537    use lance_datagen::{array, gen_batch, BatchCount, ByteCount, RowCount};
1538    use lance_encoding::{
1539        decoder::{decode_batch, DecodeBatchScheduler, DecoderPlugins, FilterExpression},
1540        encoder::{default_encoding_strategy, encode_batch, EncodedBatch, EncodingOptions},
1541        version::LanceFileVersion,
1542    };
1543    use lance_io::{stream::RecordBatchStream, utils::CachedFileSize};
1544    use log::debug;
1545    use rstest::rstest;
1546    use tokio::sync::mpsc;
1547
1548    use crate::v2::{
1549        reader::{EncodedBatchReaderExt, FileReader, FileReaderOptions, ReaderProjection},
1550        testing::{test_cache, write_lance_file, FsFixture, WrittenFile},
1551        writer::{EncodedBatchWriteExt, FileWriter, FileWriterOptions},
1552    };
1553    use lance_encoding::decoder::DecoderConfig;
1554
1555    async fn create_some_file(fs: &FsFixture, version: LanceFileVersion) -> WrittenFile {
1556        let location_type = DataType::Struct(Fields::from(vec![
1557            Field::new("x", DataType::Float64, true),
1558            Field::new("y", DataType::Float64, true),
1559        ]));
1560        let categories_type = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)));
1561
1562        let mut reader = gen_batch()
1563            .col("score", array::rand::<Float64Type>())
1564            .col("location", array::rand_type(&location_type))
1565            .col("categories", array::rand_type(&categories_type))
1566            .col("binary", array::rand_type(&DataType::Binary));
1567        if version <= LanceFileVersion::V2_0 {
1568            reader = reader.col("large_bin", array::rand_type(&DataType::LargeBinary));
1569        }
1570        let reader = reader.into_reader_rows(RowCount::from(1000), BatchCount::from(100));
1571
1572        write_lance_file(
1573            reader,
1574            fs,
1575            FileWriterOptions {
1576                format_version: Some(version),
1577                ..Default::default()
1578            },
1579        )
1580        .await
1581    }
1582
1583    type Transformer = Box<dyn Fn(&RecordBatch) -> RecordBatch>;
1584
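    // Asserts that `actual` yields the same rows as `expected`, regardless of how the rows
    // are split across batches.  Each emitted batch must contain exactly `read_size` rows
    // (except possibly the last).  If a `transform` is given it is applied to every
    // expected batch (e.g. to project columns) before comparison.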
1585    async fn verify_expected(
1586        expected: &[RecordBatch],
1587        mut actual: Pin<Box<dyn RecordBatchStream>>,
1588        read_size: u32,
1589        transform: Option<Transformer>,
1590    ) {
1591        let mut remaining = expected.iter().map(|batch| batch.num_rows()).sum::<usize>() as u32;
1592        let mut expected_iter = expected.iter().map(|batch| {
1593            if let Some(transform) = &transform {
1594                transform(batch)
1595            } else {
1596                batch.clone()
1597            }
1598        });
1599        let mut next_expected = expected_iter.next().unwrap().clone();
1600        while let Some(actual) = actual.next().await {
1601            let mut actual = actual.unwrap();
1602            let mut rows_to_verify = actual.num_rows() as u32;
1603            let expected_length = remaining.min(read_size);
1604            assert_eq!(expected_length, rows_to_verify);
1605
1606            while rows_to_verify > 0 {
1607                let next_slice_len = (next_expected.num_rows() as u32).min(rows_to_verify);
1608                assert_eq!(
1609                    next_expected.slice(0, next_slice_len as usize),
1610                    actual.slice(0, next_slice_len as usize)
1611                );
1612                remaining -= next_slice_len;
1613                rows_to_verify -= next_slice_len;
1614                if remaining > 0 {
1615                    if next_slice_len == next_expected.num_rows() as u32 {
1616                        next_expected = expected_iter.next().unwrap().clone();
1617                    } else {
1618                        next_expected = next_expected.slice(
1619                            next_slice_len as usize,
1620                            next_expected.num_rows() - next_slice_len as usize,
1621                        );
1622                    }
1623                }
1624                if rows_to_verify > 0 {
1625                    actual = actual.slice(
1626                        next_slice_len as usize,
1627                        actual.num_rows() - next_slice_len as usize,
1628                    );
1629                }
1630            }
1631        }
1632        assert_eq!(remaining, 0);
1633    }
1634
1635    #[tokio::test]
1636    async fn test_round_trip() {
1637        let fs = FsFixture::default();
1638
1639        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1640
1641        for read_size in [32, 1024, 1024 * 1024] {
1642            let file_scheduler = fs
1643                .scheduler
1644                .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1645                .await
1646                .unwrap();
1647            let file_reader = FileReader::try_open(
1648                file_scheduler,
1649                None,
1650                Arc::<DecoderPlugins>::default(),
1651                &test_cache(),
1652                FileReaderOptions::default(),
1653            )
1654            .await
1655            .unwrap();
1656
1657            let schema = file_reader.schema();
1658            assert_eq!(schema.metadata.get("foo").unwrap(), "bar");
1659
1660            let batch_stream = file_reader
1661                .read_stream(
1662                    lance_io::ReadBatchParams::RangeFull,
1663                    read_size,
1664                    16,
1665                    FilterExpression::no_filter(),
1666                )
1667                .unwrap();
1668
1669            verify_expected(&data, batch_stream, read_size, None).await;
1670        }
1671    }
1672
1673    #[rstest]
1674    #[test_log::test(tokio::test)]
1675    async fn test_encoded_batch_round_trip(
1676        // TODO: Add V2_1 (currently fails)
1677        #[values(LanceFileVersion::V2_0)] version: LanceFileVersion,
1678    ) {
1679        let data = gen_batch()
1680            .col("x", array::rand::<Int32Type>())
1681            .col("y", array::rand_utf8(ByteCount::from(16), false))
1682            .into_batch_rows(RowCount::from(10000))
1683            .unwrap();
1684
1685        let lance_schema = Arc::new(Schema::try_from(data.schema().as_ref()).unwrap());
1686
1687        let encoding_options = EncodingOptions {
1688            cache_bytes_per_column: 4096,
1689            max_page_bytes: 32 * 1024 * 1024,
1690            keep_original_array: true,
1691            buffer_alignment: 64,
1692        };
1693
1694        let encoding_strategy = default_encoding_strategy(version);
1695
1696        let encoded_batch = encode_batch(
1697            &data,
1698            lance_schema.clone(),
1699            encoding_strategy.as_ref(),
1700            &encoding_options,
1701        )
1702        .await
1703        .unwrap();
1704
1705        // Test self-described
1706        let bytes = encoded_batch.try_to_self_described_lance(version).unwrap();
1707
1708        let decoded_batch = EncodedBatch::try_from_self_described_lance(bytes).unwrap();
1709
1710        let decoded = decode_batch(
1711            &decoded_batch,
1712            &FilterExpression::no_filter(),
1713            Arc::<DecoderPlugins>::default(),
1714            false,
1715            LanceFileVersion::default(),
1716            None,
1717        )
1718        .await
1719        .unwrap();
1720
1721        assert_eq!(data, decoded);
1722
1723        // Test mini
1724        let bytes = encoded_batch.try_to_mini_lance(version).unwrap();
1725        let decoded_batch =
1726            EncodedBatch::try_from_mini_lance(bytes, lance_schema.as_ref(), LanceFileVersion::V2_0)
1727                .unwrap();
1728        let decoded = decode_batch(
1729            &decoded_batch,
1730            &FilterExpression::no_filter(),
1731            Arc::<DecoderPlugins>::default(),
1732            false,
1733            LanceFileVersion::default(),
1734            None,
1735        )
1736        .await
1737        .unwrap();
1738
1739        assert_eq!(data, decoded);
1740    }
1741
1742    #[rstest]
1743    #[test_log::test(tokio::test)]
1744    async fn test_projection(
1745        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
1746    ) {
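        // Round trips several projections, specified both by field id and by column name,
        // and applied either per-read (read_stream_projected) or as the reader's base
        // projection.  Also checks that empty and duplicate projections are rejected.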
1747        let fs = FsFixture::default();
1748
1749        let written_file = create_some_file(&fs, version).await;
1750        let file_scheduler = fs
1751            .scheduler
1752            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1753            .await
1754            .unwrap();
1755
1756        let field_id_mapping = written_file
1757            .field_id_mapping
1758            .iter()
1759            .copied()
1760            .collect::<BTreeMap<_, _>>();
1761
1762        let empty_projection = ReaderProjection {
1763            column_indices: Vec::default(),
1764            schema: Arc::new(Schema::default()),
1765        };
1766
1767        for columns in [
1768            vec!["score"],
1769            vec!["location"],
1770            vec!["categories"],
1771            vec!["score.x"],
1772            vec!["score", "categories"],
1773            vec!["score", "location"],
1774            vec!["location", "categories"],
1775            vec!["score.y", "location", "categories"],
1776        ] {
1777            debug!("Testing round trip with projection {:?}", columns);
1778            for use_field_ids in [true, false] {
1779                // We can specify the projection as part of the read operation via read_stream_projected
1780                let file_reader = FileReader::try_open(
1781                    file_scheduler.clone(),
1782                    None,
1783                    Arc::<DecoderPlugins>::default(),
1784                    &test_cache(),
1785                    FileReaderOptions::default(),
1786                )
1787                .await
1788                .unwrap();
1789
1790                let projected_schema = written_file.schema.project(&columns).unwrap();
1791                let projection = if use_field_ids {
1792                    ReaderProjection::from_field_ids(
1793                        file_reader.metadata.version(),
1794                        &projected_schema,
1795                        &field_id_mapping,
1796                    )
1797                    .unwrap()
1798                } else {
1799                    ReaderProjection::from_column_names(
1800                        file_reader.metadata.version(),
1801                        &written_file.schema,
1802                        &columns,
1803                    )
1804                    .unwrap()
1805                };
1806
1807                let batch_stream = file_reader
1808                    .read_stream_projected(
1809                        lance_io::ReadBatchParams::RangeFull,
1810                        1024,
1811                        16,
1812                        projection.clone(),
1813                        FilterExpression::no_filter(),
1814                    )
1815                    .unwrap();
1816
1817                let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1818                verify_expected(
1819                    &written_file.data,
1820                    batch_stream,
1821                    1024,
1822                    Some(Box::new(move |batch: &RecordBatch| {
1823                        batch.project_by_schema(&projection_arrow).unwrap()
1824                    })),
1825                )
1826                .await;
1827
1828                // We can also specify the projection as a base projection when we open the file
1829                let file_reader = FileReader::try_open(
1830                    file_scheduler.clone(),
1831                    Some(projection.clone()),
1832                    Arc::<DecoderPlugins>::default(),
1833                    &test_cache(),
1834                    FileReaderOptions::default(),
1835                )
1836                .await
1837                .unwrap();
1838
1839                let batch_stream = file_reader
1840                    .read_stream(
1841                        lance_io::ReadBatchParams::RangeFull,
1842                        1024,
1843                        16,
1844                        FilterExpression::no_filter(),
1845                    )
1846                    .unwrap();
1847
1848                let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1849                verify_expected(
1850                    &written_file.data,
1851                    batch_stream,
1852                    1024,
1853                    Some(Box::new(move |batch: &RecordBatch| {
1854                        batch.project_by_schema(&projection_arrow).unwrap()
1855                    })),
1856                )
1857                .await;
1858
1859                assert!(file_reader
1860                    .read_stream_projected(
1861                        lance_io::ReadBatchParams::RangeFull,
1862                        1024,
1863                        16,
1864                        empty_projection.clone(),
1865                        FilterExpression::no_filter(),
1866                    )
1867                    .is_err());
1868            }
1869        }
1870
1871        assert!(FileReader::try_open(
1872            file_scheduler.clone(),
1873            Some(empty_projection),
1874            Arc::<DecoderPlugins>::default(),
1875            &test_cache(),
1876            FileReaderOptions::default(),
1877        )
1878        .await
1879        .is_err());
1880
1881        let arrow_schema = ArrowSchema::new(vec![
1882            Field::new("x", DataType::Int32, true),
1883            Field::new("y", DataType::Int32, true),
1884        ]);
1885        let schema = Schema::try_from(&arrow_schema).unwrap();
1886
1887        let projection_with_dupes = ReaderProjection {
1888            column_indices: vec![0, 0],
1889            schema: Arc::new(schema),
1890        };
1891
1892        assert!(FileReader::try_open(
1893            file_scheduler.clone(),
1894            Some(projection_with_dupes),
1895            Arc::<DecoderPlugins>::default(),
1896            &test_cache(),
1897            FileReaderOptions::default(),
1898        )
1899        .await
1900        .is_err());
1901    }
1902
1903    #[test_log::test(tokio::test)]
1904    async fn test_compressing_buffer() {
1905        let fs = FsFixture::default();
1906
1907        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
1908        let file_scheduler = fs
1909            .scheduler
1910            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1911            .await
1912            .unwrap();
1913
1914        // We can specify the projection as part of the read operation via read_stream_projected
1915        let file_reader = FileReader::try_open(
1916            file_scheduler.clone(),
1917            None,
1918            Arc::<DecoderPlugins>::default(),
1919            &test_cache(),
1920            FileReaderOptions::default(),
1921        )
1922        .await
1923        .unwrap();
1924
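        // Project only the "score" column and request zstd compression via the
        // "lance:compression" field metadata on the projected schema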
1925        let mut projection = written_file.schema.project(&["score"]).unwrap();
1926        for field in projection.fields.iter_mut() {
1927            field
1928                .metadata
1929                .insert("lance:compression".to_string(), "zstd".to_string());
1930        }
1931        let projection = ReaderProjection {
1932            column_indices: projection.fields.iter().map(|f| f.id as u32).collect(),
1933            schema: Arc::new(projection),
1934        };
1935
1936        let batch_stream = file_reader
1937            .read_stream_projected(
1938                lance_io::ReadBatchParams::RangeFull,
1939                1024,
1940                16,
1941                projection.clone(),
1942                FilterExpression::no_filter(),
1943            )
1944            .unwrap();
1945
1946        let projection_arrow = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
1947        verify_expected(
1948            &written_file.data,
1949            batch_stream,
1950            1024,
1951            Some(Box::new(move |batch: &RecordBatch| {
1952                batch.project_by_schema(&projection_arrow).unwrap()
1953            })),
1954        )
1955        .await;
1956    }
1957
1958    #[tokio::test]
1959    async fn test_read_all() {
1960        let fs = FsFixture::default();
1961        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1962        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
1963
1964        let file_scheduler = fs
1965            .scheduler
1966            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1967            .await
1968            .unwrap();
1969        let file_reader = FileReader::try_open(
1970            file_scheduler.clone(),
1971            None,
1972            Arc::<DecoderPlugins>::default(),
1973            &test_cache(),
1974            FileReaderOptions::default(),
1975        )
1976        .await
1977        .unwrap();
1978
1979        let batches = file_reader
1980            .read_stream(
1981                lance_io::ReadBatchParams::RangeFull,
1982                total_rows as u32,
1983                16,
1984                FilterExpression::no_filter(),
1985            )
1986            .unwrap()
1987            .try_collect::<Vec<_>>()
1988            .await
1989            .unwrap();
1990        assert_eq!(batches.len(), 1);
1991        assert_eq!(batches[0].num_rows(), total_rows);
1992    }
1993
1994    #[tokio::test]
1995    async fn test_blocking_take() {
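        // Take a handful of rows by index through the blocking (non-async) read path,
        // using a base projection that keeps only the "score" column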
1996        let fs = FsFixture::default();
1997        let WrittenFile { data, schema, .. } = create_some_file(&fs, LanceFileVersion::V2_1).await;
1998        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
1999
2000        let file_scheduler = fs
2001            .scheduler
2002            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2003            .await
2004            .unwrap();
2005        let file_reader = FileReader::try_open(
2006            file_scheduler.clone(),
2007            Some(
2008                ReaderProjection::from_column_names(LanceFileVersion::V2_1, &schema, &["score"])
2009                    .unwrap(),
2010            ),
2011            Arc::<DecoderPlugins>::default(),
2012            &test_cache(),
2013            FileReaderOptions::default(),
2014        )
2015        .await
2016        .unwrap();
2017
2018        let batches = tokio::task::spawn_blocking(move || {
2019            file_reader
2020                .read_stream_projected_blocking(
2021                    lance_io::ReadBatchParams::Indices(UInt32Array::from(vec![0, 1, 2, 3, 4])),
2022                    total_rows as u32,
2023                    None,
2024                    FilterExpression::no_filter(),
2025                )
2026                .unwrap()
2027                .collect::<ArrowResult<Vec<_>>>()
2028                .unwrap()
2029        })
2030        .await
2031        .unwrap();
2032
2033        assert_eq!(batches.len(), 1);
2034        assert_eq!(batches[0].num_rows(), 5);
2035        assert_eq!(batches[0].num_columns(), 1);
2036    }
2037
2038    #[tokio::test(flavor = "multi_thread")]
2039    async fn test_drop_in_progress() {
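        // Dropping the reader must not invalidate a stream that was already created, and
        // dropping that stream mid-scan must not panic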
2040        let fs = FsFixture::default();
2041        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
2042        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2043
2044        let file_scheduler = fs
2045            .scheduler
2046            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2047            .await
2048            .unwrap();
2049        let file_reader = FileReader::try_open(
2050            file_scheduler.clone(),
2051            None,
2052            Arc::<DecoderPlugins>::default(),
2053            &test_cache(),
2054            FileReaderOptions::default(),
2055        )
2056        .await
2057        .unwrap();
2058
2059        let mut batches = file_reader
2060            .read_stream(
2061                lance_io::ReadBatchParams::RangeFull,
2062                (total_rows / 10) as u32,
2063                16,
2064                FilterExpression::no_filter(),
2065            )
2066            .unwrap();
2067
2068        drop(file_reader);
2069
2070        let batch = batches.next().await.unwrap().unwrap();
2071        assert!(batch.num_rows() > 0);
2072
2073        // Drop in-progress scan
2074        drop(batches);
2075    }
2076
2077    #[tokio::test]
2078    async fn drop_while_scheduling() {
2079        // This is a bit of a white-box test that pokes at the internals.  We want to
2080        // test the case where the read stream is dropped before the scheduling
2081        // thread finishes.  We can't do that in a black-box fashion because the
2082        // scheduling thread runs in the background and there is no easy way to
2083        // pause / gate it.
2084
2085        // It's a regression test for a bug where the scheduling thread would panic
2086        // if the stream was dropped before it finished.
2087
2088        let fs = FsFixture::default();
2089        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
2090        let total_rows = written_file
2091            .data
2092            .iter()
2093            .map(|batch| batch.num_rows())
2094            .sum::<usize>();
2095
2096        let file_scheduler = fs
2097            .scheduler
2098            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2099            .await
2100            .unwrap();
2101        let file_reader = FileReader::try_open(
2102            file_scheduler.clone(),
2103            None,
2104            Arc::<DecoderPlugins>::default(),
2105            &test_cache(),
2106            FileReaderOptions::default(),
2107        )
2108        .await
2109        .unwrap();
2110
2111        let projection =
2112            ReaderProjection::from_whole_schema(&written_file.schema, LanceFileVersion::V2_0);
2113        let column_infos = file_reader
2114            .collect_columns_from_projection(&projection)
2115            .unwrap();
2116        let mut decode_scheduler = DecodeBatchScheduler::try_new(
2117            &projection.schema,
2118            &projection.column_indices,
2119            &column_infos,
2120            &vec![],
2121            total_rows as u64,
2122            Arc::<DecoderPlugins>::default(),
2123            file_reader.scheduler.clone(),
2124            test_cache(),
2125            &FilterExpression::no_filter(),
2126            &DecoderConfig::default(),
2127        )
2128        .await
2129        .unwrap();
2130
2131        let range = 0..total_rows as u64;
2132
2133        let (tx, rx) = mpsc::unbounded_channel();
2134
2135        // Simulate the stream / decoder being dropped
2136        drop(rx);
2137
2138        // Scheduling should not panic
2139        decode_scheduler.schedule_range(
2140            range,
2141            &FilterExpression::no_filter(),
2142            tx,
2143            file_reader.scheduler.clone(),
2144        )
2145    }
2146
2147    #[tokio::test]
2148    async fn test_read_empty_range() {
2149        let fs = FsFixture::default();
2150        create_some_file(&fs, LanceFileVersion::V2_0).await;
2151
2152        let file_scheduler = fs
2153            .scheduler
2154            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2155            .await
2156            .unwrap();
2157        let file_reader = FileReader::try_open(
2158            file_scheduler.clone(),
2159            None,
2160            Arc::<DecoderPlugins>::default(),
2161            &test_cache(),
2162            FileReaderOptions::default(),
2163        )
2164        .await
2165        .unwrap();
2166
2167        // Empty range, no data
2168        let batches = file_reader
2169            .read_stream(
2170                lance_io::ReadBatchParams::Range(0..0),
2171                1024,
2172                16,
2173                FilterExpression::no_filter(),
2174            )
2175            .unwrap()
2176            .try_collect::<Vec<_>>()
2177            .await
2178            .unwrap();
2179
2180        assert_eq!(batches.len(), 0);
2181
2182        // Some ranges empty
2183        let batches = file_reader
2184            .read_stream(
2185                lance_io::ReadBatchParams::Ranges(Arc::new([0..1, 2..2])),
2186                1024,
2187                16,
2188                FilterExpression::no_filter(),
2189            )
2190            .unwrap()
2191            .try_collect::<Vec<_>>()
2192            .await
2193            .unwrap();
2194        assert_eq!(batches.len(), 1);
2195    }
2196
2197    #[tokio::test]
2198    async fn test_global_buffers() {
2199        let fs = FsFixture::default();
2200
2201        let lance_schema =
2202            lance_core::datatypes::Schema::try_from(&ArrowSchema::new(vec![Field::new(
2203                "foo",
2204                DataType::Int32,
2205                true,
2206            )]))
2207            .unwrap();
2208
2209        let mut file_writer = FileWriter::try_new(
2210            fs.object_store.create(&fs.tmp_path).await.unwrap(),
2211            lance_schema.clone(),
2212            FileWriterOptions::default(),
2213        )
2214        .unwrap();
2215
2216        let test_bytes = Bytes::from_static(b"hello");
2217
2218        let buf_index = file_writer
2219            .add_global_buffer(test_bytes.clone())
2220            .await
2221            .unwrap();
2222
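        // Index 0 is reserved for the schema global buffer, so the first user-added buffer
        // should land at index 1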
2223        assert_eq!(buf_index, 1);
2224
2225        file_writer.finish().await.unwrap();
2226
2227        let file_scheduler = fs
2228            .scheduler
2229            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2230            .await
2231            .unwrap();
2232        let file_reader = FileReader::try_open(
2233            file_scheduler.clone(),
2234            None,
2235            Arc::<DecoderPlugins>::default(),
2236            &test_cache(),
2237            FileReaderOptions::default(),
2238        )
2239        .await
2240        .unwrap();
2241
2242        let buf = file_reader.read_global_buffer(1).await.unwrap();
2243        assert_eq!(buf, test_bytes);
2244    }
2245}