lance_file/v2/
reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    collections::{BTreeMap, BTreeSet},
6    io::Cursor,
7    ops::Range,
8    pin::Pin,
9    sync::Arc,
10};
11
12use arrow_array::RecordBatchReader;
13use arrow_schema::Schema as ArrowSchema;
14use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
15use bytes::{Bytes, BytesMut};
16use deepsize::{Context, DeepSizeOf};
17use futures::{stream::BoxStream, Stream, StreamExt};
18use lance_encoding::{
19    decoder::{
20        schedule_and_decode, schedule_and_decode_blocking, ColumnInfo, DecoderPlugins,
21        FilterExpression, PageEncoding, PageInfo, ReadBatchTask, RequestedRows,
22        SchedulerDecoderConfig,
23    },
24    encoder::EncodedBatch,
25    version::LanceFileVersion,
26    EncodingsIo,
27};
28use log::debug;
29use object_store::path::Path;
30use prost::{Message, Name};
31use snafu::location;
32
33use lance_core::{
34    cache::LanceCache,
35    datatypes::{Field, Schema},
36    Error, Result,
37};
38use lance_encoding::format::pb as pbenc;
39use lance_io::{
40    scheduler::FileScheduler,
41    stream::{RecordBatchStream, RecordBatchStreamAdapter},
42    ReadBatchParams,
43};
44
45use crate::{
46    datatypes::{Fields, FieldsWithMeta},
47    format::{pb, pbfile, MAGIC, MAJOR_VERSION, MINOR_VERSION},
48    v2::writer::PAGE_BUFFER_ALIGNMENT,
49};
50
51use super::io::LanceEncodingsIo;
52
53// For now, we don't use global buffers for anything other than schema.  If we
54// use these later we should make them lazily loaded and then cached once loaded.
55//
56// We store their position / length for debugging purposes
57#[derive(Debug, DeepSizeOf)]
58pub struct BufferDescriptor {
59    pub position: u64,
60    pub size: u64,
61}
62
63/// Statistics summarize some of the file metadata for quick summary info
64#[derive(Debug)]
65pub struct FileStatistics {
66    /// Statistics about each of the columns in the file
67    pub columns: Vec<ColumnStatistics>,
68}
69
70/// Summary information describing a column
71#[derive(Debug)]
72pub struct ColumnStatistics {
73    /// The number of pages in the column
74    pub num_pages: usize,
75    /// The total number of data & metadata bytes in the column
76    ///
77    /// This is the compressed on-disk size
78    pub size_bytes: u64,
79}
80
81// TODO: Caching
82#[derive(Debug)]
83pub struct CachedFileMetadata {
84    /// The schema of the file
85    pub file_schema: Arc<Schema>,
86    /// The column metadatas
87    pub column_metadatas: Vec<pbfile::ColumnMetadata>,
88    pub column_infos: Vec<Arc<ColumnInfo>>,
89    /// The number of rows in the file
90    pub num_rows: u64,
91    pub file_buffers: Vec<BufferDescriptor>,
92    /// The number of bytes contained in the data page section of the file
93    pub num_data_bytes: u64,
94    /// The number of bytes contained in the column metadata (not including buffers
95    /// referenced by the metadata)
96    pub num_column_metadata_bytes: u64,
97    /// The number of bytes contained in global buffers
98    pub num_global_buffer_bytes: u64,
99    /// The number of bytes contained in the CMO and GBO tables
100    pub num_footer_bytes: u64,
101    pub major_version: u16,
102    pub minor_version: u16,
103}
104
105impl DeepSizeOf for CachedFileMetadata {
106    // TODO: include size for `column_metadatas` and `column_infos`.
107    fn deep_size_of_children(&self, context: &mut Context) -> usize {
108        self.file_schema.deep_size_of_children(context)
109            + self
110                .file_buffers
111                .iter()
112                .map(|file_buffer| file_buffer.deep_size_of_children(context))
113                .sum::<usize>()
114    }
115}
116
117impl CachedFileMetadata {
118    pub fn version(&self) -> LanceFileVersion {
119        match (self.major_version, self.minor_version) {
120            (0, 3) => LanceFileVersion::V2_0,
121            (2, 1) => LanceFileVersion::V2_1,
122            _ => panic!(
123                "Unsupported version: {}.{}",
124                self.major_version, self.minor_version
125            ),
126        }
127    }
128}
129
130/// Selecting columns from a lance file requires specifying both the
131/// index of the column and the data type of the column
132///
133/// Partly, this is because it is not strictly required that columns
134/// be read into the same type.  For example, a string column may be
135/// read as a string, large_string or string_view type.
136///
137/// A read will only succeed if the decoder for a column is capable
138/// of decoding into the requested type.
139///
140/// Note that this should generally be limited to different in-memory
141/// representations of the same semantic type.  An encoding could
142/// theoretically support "casting" (e.g. int to string, etc.) but
143/// there is little advantage in doing so here.
144///
145/// Note: in order to specify a projection the user will need some way
146/// to figure out the column indices.  In the table format we do this
147/// using field IDs and keeping track of the field id->column index mapping.
148///
149/// If users are not using the table format then they will need to figure
150/// out some way to do this themselves.
151#[derive(Debug, Clone)]
152pub struct ReaderProjection {
153    /// The data types (schema) of the selected columns.  The names
154    /// of the schema are arbitrary and ignored.
155    pub schema: Arc<Schema>,
156    /// The indices of the columns to load.
157    ///
158    /// The content of this vector depends on the file version.
159    ///
160    /// In Lance File Version 2.0 we need ids for structural fields as
161    /// well as leaf fields:
162    ///
163    ///   - Primitive: the index of the column in the schema
164    ///   - List: the index of the list column in the schema
165    ///     followed by the column indices of the children
166    ///   - FixedSizeList (of primitive): the index of the column in the schema
167    ///     (this case is not nested)
168    ///   - FixedSizeList (of non-primitive): not yet implemented
169    ///   - Dictionary: same as primitive
170    ///   - Struct: the index of the struct column in the schema
171    ///     followed by the column indices of the children
172    ///
173    ///   In other words, this should be a DFS listing of the desired schema.
174    ///
175    /// In Lance File Version 2.1 we only need ids for leaf fields.  Any structural
176    /// fields are completely transparent.
177    ///
178    /// For example, if the goal is to load:
179    ///
180    ///   x: int32
181    ///   y: struct<z: int32, w: string>
182    ///   z: list<int32>
183    ///
184    /// and the schema originally used to store the data was:
185    ///
186    ///   a: struct<x: int32>
187    ///   b: int64
188    ///   y: struct<z: int32, c: int64, w: string>
189    ///   z: list<int32>
190    ///
191    /// Then the column_indices should be:
192    ///
193    /// - 2.0: [1, 3, 4, 6, 7, 8]
194    /// - 2.1: [0, 2, 4, 5]
195    pub column_indices: Vec<u32>,
196}
197
198impl ReaderProjection {
199    fn from_field_ids_helper<'a>(
200        file_version: LanceFileVersion,
201        fields: impl Iterator<Item = &'a Field>,
202        field_id_to_column_index: &BTreeMap<u32, u32>,
203        column_indices: &mut Vec<u32>,
204    ) -> Result<()> {
205        for field in fields {
206            let is_structural = file_version >= LanceFileVersion::V2_1;
207            // In the 2.0 system we needed ids for intermediate fields.  In 2.1+
208            // we only need ids for leaf fields.
209            if !is_structural || field.children.is_empty() {
210                if let Some(column_idx) = field_id_to_column_index.get(&(field.id as u32)).copied()
211                {
212                    column_indices.push(column_idx);
213                }
214            }
215            Self::from_field_ids_helper(
216                file_version,
217                field.children.iter(),
218                field_id_to_column_index,
219                column_indices,
220            )?;
221        }
222        Ok(())
223    }
224
225    /// Creates a projection using a mapping from field IDs to column indices
226    ///
227    /// You can obtain such a mapping when the file is written using the
228    /// [`crate::v2::writer::FileWriter::field_id_to_column_indices`] method.
229    pub fn from_field_ids(
230        file_version: LanceFileVersion,
231        schema: &Schema,
232        field_id_to_column_index: &BTreeMap<u32, u32>,
233    ) -> Result<Self> {
234        let mut column_indices = Vec::new();
235        Self::from_field_ids_helper(
236            file_version,
237            schema.fields.iter(),
238            field_id_to_column_index,
239            &mut column_indices,
240        )?;
241        Ok(Self {
242            schema: Arc::new(schema.clone()),
243            column_indices,
244        })
245    }
246
247    /// Creates a projection that reads the entire file
248    ///
249    /// If the schema provided is not the schema of the entire file then
250    /// the projection will be invalid and the read will fail.
251    /// If the field is a `struct datatype` with `packed` set to true in the field metadata,
252    /// the whole struct has one column index.
253    /// To support nested `packed-struct encoding`, this method need to be further adjusted.
254    pub fn from_whole_schema(schema: &Schema, version: LanceFileVersion) -> Self {
255        let schema = Arc::new(schema.clone());
256        let is_structural = version >= LanceFileVersion::V2_1;
257        let mut column_indices = vec![];
258        let mut curr_column_idx = 0;
259        let mut packed_struct_fields_num = 0;
260        for field in schema.fields_pre_order() {
261            if packed_struct_fields_num > 0 {
262                packed_struct_fields_num -= 1;
263                continue;
264            }
265            if field.is_packed_struct() {
266                column_indices.push(curr_column_idx);
267                curr_column_idx += 1;
268                packed_struct_fields_num = field.children.len();
269            } else if field.children.is_empty() || !is_structural {
270                column_indices.push(curr_column_idx);
271                curr_column_idx += 1;
272            }
273        }
274        Self {
275            schema,
276            column_indices,
277        }
278    }
279
280    /// Creates a projection that reads the specified columns provided by name
281    ///
282    /// The syntax for column names is the same as [`lance_core::datatypes::Schema::project`]
283    ///
284    /// If the schema provided is not the schema of the entire file then
285    /// the projection will be invalid and the read will fail.
286    pub fn from_column_names(
287        file_version: LanceFileVersion,
288        schema: &Schema,
289        column_names: &[&str],
290    ) -> Result<Self> {
291        let field_id_to_column_index = schema
292            .fields_pre_order()
293            // In the 2.0 system we needed ids for intermediate fields.  In 2.1+
294            // we only need ids for leaf fields.
295            .filter(|field| file_version < LanceFileVersion::V2_1 || field.is_leaf())
296            .enumerate()
297            .map(|(idx, field)| (field.id as u32, idx as u32))
298            .collect::<BTreeMap<_, _>>();
299        let projected = schema.project(column_names)?;
300        let mut column_indices = Vec::new();
301        Self::from_field_ids_helper(
302            file_version,
303            projected.fields.iter(),
304            &field_id_to_column_index,
305            &mut column_indices,
306        )?;
307        Ok(Self {
308            schema: Arc::new(projected),
309            column_indices,
310        })
311    }
312}
313
314#[derive(Clone, Debug, Default)]
315pub struct FileReaderOptions {
316    validate_on_decode: bool,
317}
318
319#[derive(Debug)]
320pub struct FileReader {
321    scheduler: Arc<dyn EncodingsIo>,
322    // The default projection to be applied to all reads
323    base_projection: ReaderProjection,
324    num_rows: u64,
325    metadata: Arc<CachedFileMetadata>,
326    decoder_plugins: Arc<DecoderPlugins>,
327    cache: Arc<LanceCache>,
328    options: FileReaderOptions,
329}
330#[derive(Debug)]
331struct Footer {
332    #[allow(dead_code)]
333    column_meta_start: u64,
334    // We don't use this today because we always load metadata for every column
335    // and don't yet support "metadata projection"
336    #[allow(dead_code)]
337    column_meta_offsets_start: u64,
338    global_buff_offsets_start: u64,
339    num_global_buffers: u32,
340    num_columns: u32,
341    major_version: u16,
342    minor_version: u16,
343}
344
345const FOOTER_LEN: usize = 40;
346
347impl FileReader {
348    pub fn with_scheduler(&self, scheduler: Arc<dyn EncodingsIo>) -> Self {
349        Self {
350            scheduler,
351            base_projection: self.base_projection.clone(),
352            cache: self.cache.clone(),
353            decoder_plugins: self.decoder_plugins.clone(),
354            metadata: self.metadata.clone(),
355            options: self.options.clone(),
356            num_rows: self.num_rows,
357        }
358    }
359
360    pub fn num_rows(&self) -> u64 {
361        self.num_rows
362    }
363
364    pub fn metadata(&self) -> &Arc<CachedFileMetadata> {
365        &self.metadata
366    }
367
368    pub fn file_statistics(&self) -> FileStatistics {
369        let column_metadatas = &self.metadata().column_metadatas;
370
371        let column_stats = column_metadatas
372            .iter()
373            .map(|col_metadata| {
374                let num_pages = col_metadata.pages.len();
375                let size_bytes = col_metadata
376                    .pages
377                    .iter()
378                    .map(|page| page.buffer_sizes.iter().sum::<u64>())
379                    .sum::<u64>();
380                ColumnStatistics {
381                    num_pages,
382                    size_bytes,
383                }
384            })
385            .collect();
386
387        FileStatistics {
388            columns: column_stats,
389        }
390    }
391
392    pub async fn read_global_buffer(&self, index: u32) -> Result<Bytes> {
393        let buffer_desc = self.metadata.file_buffers.get(index as usize).ok_or_else(||Error::invalid_input(format!("request for global buffer at index {} but there were only {} global buffers in the file", index, self.metadata.file_buffers.len()), location!()))?;
394        self.scheduler
395            .submit_single(
396                buffer_desc.position..buffer_desc.position + buffer_desc.size,
397                0,
398            )
399            .await
400    }
401
402    async fn read_tail(scheduler: &FileScheduler) -> Result<(Bytes, u64)> {
403        let file_size = scheduler.reader().size().await? as u64;
404        let begin = if file_size < scheduler.reader().block_size() as u64 {
405            0
406        } else {
407            file_size - scheduler.reader().block_size() as u64
408        };
409        let tail_bytes = scheduler.submit_single(begin..file_size, 0).await?;
410        Ok((tail_bytes, file_size))
411    }
412
413    // Checks to make sure the footer is written correctly and returns the
414    // position of the file descriptor (which comes from the footer)
415    fn decode_footer(footer_bytes: &Bytes) -> Result<Footer> {
416        let len = footer_bytes.len();
417        if len < FOOTER_LEN {
418            return Err(Error::io(
419                format!(
420                    "does not have sufficient data, len: {}, bytes: {:?}",
421                    len, footer_bytes
422                ),
423                location!(),
424            ));
425        }
426        let mut cursor = Cursor::new(footer_bytes.slice(len - FOOTER_LEN..));
427
428        let column_meta_start = cursor.read_u64::<LittleEndian>()?;
429        let column_meta_offsets_start = cursor.read_u64::<LittleEndian>()?;
430        let global_buff_offsets_start = cursor.read_u64::<LittleEndian>()?;
431        let num_global_buffers = cursor.read_u32::<LittleEndian>()?;
432        let num_columns = cursor.read_u32::<LittleEndian>()?;
433        let major_version = cursor.read_u16::<LittleEndian>()?;
434        let minor_version = cursor.read_u16::<LittleEndian>()?;
435
436        if major_version == MAJOR_VERSION as u16 && minor_version == MINOR_VERSION as u16 {
437            return Err(Error::version_conflict(
438                "Attempt to use the lance v2 reader to read a legacy file".to_string(),
439                major_version,
440                minor_version,
441                location!(),
442            ));
443        }
444
445        let magic_bytes = footer_bytes.slice(len - 4..);
446        if magic_bytes.as_ref() != MAGIC {
447            return Err(Error::io(
448                format!(
449                    "file does not appear to be a Lance file (invalid magic: {:?})",
450                    MAGIC
451                ),
452                location!(),
453            ));
454        }
455        Ok(Footer {
456            column_meta_start,
457            column_meta_offsets_start,
458            global_buff_offsets_start,
459            num_global_buffers,
460            num_columns,
461            major_version,
462            minor_version,
463        })
464    }
465
466    // TODO: Once we have coalesced I/O we should only read the column metadatas that we need
467    fn read_all_column_metadata(
468        column_metadata_bytes: Bytes,
469        footer: &Footer,
470    ) -> Result<Vec<pbfile::ColumnMetadata>> {
471        let column_metadata_start = footer.column_meta_start;
472        // cmo == column_metadata_offsets
473        let cmo_table_size = 16 * footer.num_columns as usize;
474        let cmo_table = column_metadata_bytes.slice(column_metadata_bytes.len() - cmo_table_size..);
475
476        (0..footer.num_columns)
477            .map(|col_idx| {
478                let offset = (col_idx * 16) as usize;
479                let position = LittleEndian::read_u64(&cmo_table[offset..offset + 8]);
480                let length = LittleEndian::read_u64(&cmo_table[offset + 8..offset + 16]);
481                let normalized_position = (position - column_metadata_start) as usize;
482                let normalized_end = normalized_position + (length as usize);
483                Ok(pbfile::ColumnMetadata::decode(
484                    &column_metadata_bytes[normalized_position..normalized_end],
485                )?)
486            })
487            .collect::<Result<Vec<_>>>()
488    }
489
490    async fn optimistic_tail_read(
491        data: &Bytes,
492        start_pos: u64,
493        scheduler: &FileScheduler,
494        file_len: u64,
495    ) -> Result<Bytes> {
496        let num_bytes_needed = (file_len - start_pos) as usize;
497        if data.len() >= num_bytes_needed {
498            Ok(data.slice((data.len() - num_bytes_needed)..))
499        } else {
500            let num_bytes_missing = (num_bytes_needed - data.len()) as u64;
501            let start = file_len - num_bytes_needed as u64;
502            let missing_bytes = scheduler
503                .submit_single(start..start + num_bytes_missing, 0)
504                .await?;
505            let mut combined = BytesMut::with_capacity(data.len() + num_bytes_missing as usize);
506            combined.extend(missing_bytes);
507            combined.extend(data);
508            Ok(combined.freeze())
509        }
510    }
511
512    fn do_decode_gbo_table(
513        gbo_bytes: &Bytes,
514        footer: &Footer,
515        version: LanceFileVersion,
516    ) -> Result<Vec<BufferDescriptor>> {
517        let mut global_bufs_cursor = Cursor::new(gbo_bytes);
518
519        let mut global_buffers = Vec::with_capacity(footer.num_global_buffers as usize);
520        for _ in 0..footer.num_global_buffers {
521            let buf_pos = global_bufs_cursor.read_u64::<LittleEndian>()?;
522            assert!(
523                version < LanceFileVersion::V2_1 || buf_pos % PAGE_BUFFER_ALIGNMENT as u64 == 0
524            );
525            let buf_size = global_bufs_cursor.read_u64::<LittleEndian>()?;
526            global_buffers.push(BufferDescriptor {
527                position: buf_pos,
528                size: buf_size,
529            });
530        }
531
532        Ok(global_buffers)
533    }
534
535    async fn decode_gbo_table(
536        tail_bytes: &Bytes,
537        file_len: u64,
538        scheduler: &FileScheduler,
539        footer: &Footer,
540        version: LanceFileVersion,
541    ) -> Result<Vec<BufferDescriptor>> {
542        // This could, in theory, trigger another IOP but the GBO table should never be large
543        // enough for that to happen
544        let gbo_bytes = Self::optimistic_tail_read(
545            tail_bytes,
546            footer.global_buff_offsets_start,
547            scheduler,
548            file_len,
549        )
550        .await?;
551        Self::do_decode_gbo_table(&gbo_bytes, footer, version)
552    }
553
554    fn decode_schema(schema_bytes: Bytes) -> Result<(u64, lance_core::datatypes::Schema)> {
555        let file_descriptor = pb::FileDescriptor::decode(schema_bytes)?;
556        let pb_schema = file_descriptor.schema.unwrap();
557        let num_rows = file_descriptor.length;
558        let fields_with_meta = FieldsWithMeta {
559            fields: Fields(pb_schema.fields),
560            metadata: pb_schema.metadata,
561        };
562        let schema = lance_core::datatypes::Schema::from(fields_with_meta);
563        Ok((num_rows, schema))
564    }
565
566    // TODO: Support late projection.  Currently, if we want to perform a
567    // projected read of a file, we load all of the column metadata, and then
568    // only read the column data that is requested.  This is fine for most cases.
569    //
570    // However, if there are many columns then loading all of the column metadata
571    // may be expensive.  We should support a mode where we only load the column
572    // metadata for the columns that are requested (the file format supports this).
573    //
574    // The main challenge is that we either need to ignore the column metadata cache
575    // or have a more sophisticated cache that can cache per-column metadata.
576    //
577    // Also, if the number of columns is fairly small, it's faster to read them as a
578    // single IOP, but we can fix this through coalescing.
579    pub async fn read_all_metadata(scheduler: &FileScheduler) -> Result<CachedFileMetadata> {
580        // 1. read the footer
581        let (tail_bytes, file_len) = Self::read_tail(scheduler).await?;
582        let footer = Self::decode_footer(&tail_bytes)?;
583
584        let file_version = LanceFileVersion::try_from_major_minor(
585            footer.major_version as u32,
586            footer.minor_version as u32,
587        )?;
588
589        let gbo_table =
590            Self::decode_gbo_table(&tail_bytes, file_len, scheduler, &footer, file_version).await?;
591        if gbo_table.is_empty() {
592            return Err(Error::Internal {
593                message: "File did not contain any global buffers, schema expected".to_string(),
594                location: location!(),
595            });
596        }
597        let schema_start = gbo_table[0].position;
598        let schema_size = gbo_table[0].size;
599
600        let num_footer_bytes = file_len - schema_start;
601
602        // By default we read all column metadatas.  We do NOT read the column metadata buffers
603        // at this point.  We only want to read the column metadata for columns we are actually loading.
604        let all_metadata_bytes =
605            Self::optimistic_tail_read(&tail_bytes, schema_start, scheduler, file_len).await?;
606
607        let schema_bytes = all_metadata_bytes.slice(0..schema_size as usize);
608        let (num_rows, schema) = Self::decode_schema(schema_bytes)?;
609
610        // Next, read the metadata for the columns
611        // This is both the column metadata and the CMO table
612        let column_metadata_start = (footer.column_meta_start - schema_start) as usize;
613        let column_metadata_end = (footer.global_buff_offsets_start - schema_start) as usize;
614        let column_metadata_bytes =
615            all_metadata_bytes.slice(column_metadata_start..column_metadata_end);
616        let column_metadatas = Self::read_all_column_metadata(column_metadata_bytes, &footer)?;
617
618        let num_global_buffer_bytes = gbo_table.iter().map(|buf| buf.size).sum::<u64>();
619        let num_data_bytes = footer.column_meta_start - num_global_buffer_bytes;
620        let num_column_metadata_bytes = footer.global_buff_offsets_start - footer.column_meta_start;
621
622        let column_infos = Self::meta_to_col_infos(column_metadatas.as_slice(), file_version);
623
624        Ok(CachedFileMetadata {
625            file_schema: Arc::new(schema),
626            column_metadatas,
627            column_infos,
628            num_rows,
629            num_data_bytes,
630            num_column_metadata_bytes,
631            num_global_buffer_bytes,
632            num_footer_bytes,
633            file_buffers: gbo_table,
634            major_version: footer.major_version,
635            minor_version: footer.minor_version,
636        })
637    }
638
639    fn fetch_encoding<M: Default + Name + Sized>(encoding: &pbfile::Encoding) -> M {
640        match &encoding.location {
641            Some(pbfile::encoding::Location::Indirect(_)) => todo!(),
642            Some(pbfile::encoding::Location::Direct(encoding)) => {
643                let encoding_buf = Bytes::from(encoding.encoding.clone());
644                let encoding_any = prost_types::Any::decode(encoding_buf).unwrap();
645                encoding_any.to_msg::<M>().unwrap()
646            }
647            Some(pbfile::encoding::Location::None(_)) => panic!(),
648            None => panic!(),
649        }
650    }
651
652    fn meta_to_col_infos(
653        column_metadatas: &[pbfile::ColumnMetadata],
654        file_version: LanceFileVersion,
655    ) -> Vec<Arc<ColumnInfo>> {
656        column_metadatas
657            .iter()
658            .enumerate()
659            .map(|(col_idx, col_meta)| {
660                let page_infos = col_meta
661                    .pages
662                    .iter()
663                    .map(|page| {
664                        let num_rows = page.length;
665                        let encoding = match file_version {
666                            LanceFileVersion::V2_0 => {
667                                PageEncoding::Legacy(Self::fetch_encoding::<pbenc::ArrayEncoding>(
668                                    page.encoding.as_ref().unwrap(),
669                                ))
670                            }
671                            _ => {
672                                PageEncoding::Structural(Self::fetch_encoding::<pbenc::PageLayout>(
673                                    page.encoding.as_ref().unwrap(),
674                                ))
675                            }
676                        };
677                        let buffer_offsets_and_sizes = Arc::from(
678                            page.buffer_offsets
679                                .iter()
680                                .zip(page.buffer_sizes.iter())
681                                .map(|(offset, size)| {
682                                    // Starting with version 2.1 we can assert that page buffers are aligned
683                                    assert!(
684                                        file_version < LanceFileVersion::V2_1
685                                            || offset % PAGE_BUFFER_ALIGNMENT as u64 == 0
686                                    );
687                                    (*offset, *size)
688                                })
689                                .collect::<Vec<_>>(),
690                        );
691                        PageInfo {
692                            buffer_offsets_and_sizes,
693                            encoding,
694                            num_rows,
695                            priority: page.priority,
696                        }
697                    })
698                    .collect::<Vec<_>>();
699                let buffer_offsets_and_sizes = Arc::from(
700                    col_meta
701                        .buffer_offsets
702                        .iter()
703                        .zip(col_meta.buffer_sizes.iter())
704                        .map(|(offset, size)| (*offset, *size))
705                        .collect::<Vec<_>>(),
706                );
707                Arc::new(ColumnInfo {
708                    index: col_idx as u32,
709                    page_infos: Arc::from(page_infos),
710                    buffer_offsets_and_sizes,
711                    encoding: Self::fetch_encoding(col_meta.encoding.as_ref().unwrap()),
712                })
713            })
714            .collect::<Vec<_>>()
715    }
716
717    fn validate_projection(
718        projection: &ReaderProjection,
719        metadata: &CachedFileMetadata,
720    ) -> Result<()> {
721        if projection.schema.fields.is_empty() {
722            return Err(Error::invalid_input(
723                "Attempt to read zero columns from the file, at least one column must be specified"
724                    .to_string(),
725                location!(),
726            ));
727        }
728        let mut column_indices_seen = BTreeSet::new();
729        for column_index in &projection.column_indices {
730            if !column_indices_seen.insert(*column_index) {
731                return Err(Error::invalid_input(
732                    format!(
733                        "The projection specified the column index {} more than once",
734                        column_index
735                    ),
736                    location!(),
737                ));
738            }
739            if *column_index >= metadata.column_infos.len() as u32 {
740                return Err(Error::invalid_input(format!("The projection specified the column index {} but there are only {} columns in the file", column_index, metadata.column_infos.len()), location!()));
741            }
742        }
743        Ok(())
744    }
745
746    /// Opens a new file reader without any pre-existing knowledge
747    ///
748    /// This will read the file schema from the file itself and thus requires a bit more I/O
749    ///
750    /// A `base_projection` can also be provided.  If provided, then the projection will apply
751    /// to all reads from the file that do not specify their own projection.
752    pub async fn try_open(
753        scheduler: FileScheduler,
754        base_projection: Option<ReaderProjection>,
755        decoder_plugins: Arc<DecoderPlugins>,
756        cache: &LanceCache,
757        options: FileReaderOptions,
758    ) -> Result<Self> {
759        let file_metadata = Arc::new(Self::read_all_metadata(&scheduler).await?);
760        let path = scheduler.reader().path().clone();
761        Self::try_open_with_file_metadata(
762            Arc::new(LanceEncodingsIo(scheduler)),
763            path,
764            base_projection,
765            decoder_plugins,
766            file_metadata,
767            cache,
768            options,
769        )
770        .await
771    }
772
773    /// Same as `try_open` but with the file metadata already loaded.
774    ///
775    /// This method also can accept any kind of `EncodingsIo` implementation allowing
776    /// for custom strategies to be used for I/O scheduling (e.g. for takes on fast
777    /// disks it may be better to avoid asynchronous overhead).
778    pub async fn try_open_with_file_metadata(
779        scheduler: Arc<dyn EncodingsIo>,
780        path: Path,
781        base_projection: Option<ReaderProjection>,
782        decoder_plugins: Arc<DecoderPlugins>,
783        file_metadata: Arc<CachedFileMetadata>,
784        cache: &LanceCache,
785        options: FileReaderOptions,
786    ) -> Result<Self> {
787        let cache = Arc::new(cache.with_key_prefix(path.as_ref()));
788
789        if let Some(base_projection) = base_projection.as_ref() {
790            Self::validate_projection(base_projection, &file_metadata)?;
791        }
792        let num_rows = file_metadata.num_rows;
793        Ok(Self {
794            scheduler,
795            base_projection: base_projection.unwrap_or(ReaderProjection::from_whole_schema(
796                file_metadata.file_schema.as_ref(),
797                file_metadata.version(),
798            )),
799            num_rows,
800            metadata: file_metadata,
801            decoder_plugins,
802            cache,
803            options,
804        })
805    }
806
807    // The actual decoder needs all the column infos that make up a type.  In other words, if
808    // the first type in the schema is Struct<i32, i32> then the decoder will need 3 column infos.
809    //
810    // This is a file reader concern because the file reader needs to support late projection of columns
811    // and so it will need to figure this out anyways.
812    //
813    // It's a bit of a tricky process though because the number of column infos may depend on the
814    // encoding.  Considering the above example, if we wrote it with a packed encoding, then there would
815    // only be a single column in the file (and not 3).
816    //
817    // At the moment this method words because our rules are simple and we just repeat them here.  See
818    // Self::default_projection for a similar problem.  In the future this is something the encodings
819    // registry will need to figure out.
820    fn collect_columns_from_projection(
821        &self,
822        _projection: &ReaderProjection,
823    ) -> Result<Vec<Arc<ColumnInfo>>> {
824        Ok(self.metadata.column_infos.to_vec())
825    }
826
827    #[allow(clippy::too_many_arguments)]
828    fn do_read_range(
829        column_infos: Vec<Arc<ColumnInfo>>,
830        io: Arc<dyn EncodingsIo>,
831        cache: Arc<LanceCache>,
832        num_rows: u64,
833        decoder_plugins: Arc<DecoderPlugins>,
834        range: Range<u64>,
835        batch_size: u32,
836        projection: ReaderProjection,
837        filter: FilterExpression,
838        should_validate: bool,
839    ) -> Result<BoxStream<'static, ReadBatchTask>> {
840        debug!(
841            "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
842            range,
843            batch_size,
844            num_rows,
845            column_infos.len(),
846            projection.schema.fields.len(),
847        );
848
849        let config = SchedulerDecoderConfig {
850            batch_size,
851            cache,
852            decoder_plugins,
853            io,
854            should_validate,
855        };
856
857        let requested_rows = RequestedRows::Ranges(vec![range]);
858
859        Ok(schedule_and_decode(
860            column_infos,
861            requested_rows,
862            filter,
863            projection.column_indices,
864            projection.schema,
865            config,
866        ))
867    }
868
869    fn read_range(
870        &self,
871        range: Range<u64>,
872        batch_size: u32,
873        projection: ReaderProjection,
874        filter: FilterExpression,
875    ) -> Result<BoxStream<'static, ReadBatchTask>> {
876        // Create and initialize the stream
877        Self::do_read_range(
878            self.collect_columns_from_projection(&projection)?,
879            self.scheduler.clone(),
880            self.cache.clone(),
881            self.num_rows,
882            self.decoder_plugins.clone(),
883            range,
884            batch_size,
885            projection,
886            filter,
887            self.options.validate_on_decode,
888        )
889    }
890
891    #[allow(clippy::too_many_arguments)]
892    fn do_take_rows(
893        column_infos: Vec<Arc<ColumnInfo>>,
894        io: Arc<dyn EncodingsIo>,
895        cache: Arc<LanceCache>,
896        decoder_plugins: Arc<DecoderPlugins>,
897        indices: Vec<u64>,
898        batch_size: u32,
899        projection: ReaderProjection,
900        filter: FilterExpression,
901        should_validate: bool,
902    ) -> Result<BoxStream<'static, ReadBatchTask>> {
903        debug!(
904            "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
905            indices.len(),
906            indices[0],
907            indices[indices.len() - 1],
908            batch_size,
909            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
910        );
911
912        let config = SchedulerDecoderConfig {
913            batch_size,
914            cache,
915            decoder_plugins,
916            io,
917            should_validate,
918        };
919
920        let requested_rows = RequestedRows::Indices(indices);
921
922        Ok(schedule_and_decode(
923            column_infos,
924            requested_rows,
925            filter,
926            projection.column_indices,
927            projection.schema,
928            config,
929        ))
930    }
931
932    fn take_rows(
933        &self,
934        indices: Vec<u64>,
935        batch_size: u32,
936        projection: ReaderProjection,
937    ) -> Result<BoxStream<'static, ReadBatchTask>> {
938        // Create and initialize the stream
939        Self::do_take_rows(
940            self.collect_columns_from_projection(&projection)?,
941            self.scheduler.clone(),
942            self.cache.clone(),
943            self.decoder_plugins.clone(),
944            indices,
945            batch_size,
946            projection,
947            FilterExpression::no_filter(),
948            self.options.validate_on_decode,
949        )
950    }
951
952    #[allow(clippy::too_many_arguments)]
953    fn do_read_ranges(
954        column_infos: Vec<Arc<ColumnInfo>>,
955        io: Arc<dyn EncodingsIo>,
956        cache: Arc<LanceCache>,
957        decoder_plugins: Arc<DecoderPlugins>,
958        ranges: Vec<Range<u64>>,
959        batch_size: u32,
960        projection: ReaderProjection,
961        filter: FilterExpression,
962        should_validate: bool,
963    ) -> Result<BoxStream<'static, ReadBatchTask>> {
964        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
965        debug!(
966            "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
967            ranges.len(),
968            num_rows,
969            ranges[0].start,
970            ranges[ranges.len() - 1].end,
971            batch_size,
972            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
973        );
974
975        let config = SchedulerDecoderConfig {
976            batch_size,
977            cache,
978            decoder_plugins,
979            io,
980            should_validate,
981        };
982
983        let requested_rows = RequestedRows::Ranges(ranges);
984
985        Ok(schedule_and_decode(
986            column_infos,
987            requested_rows,
988            filter,
989            projection.column_indices,
990            projection.schema,
991            config,
992        ))
993    }
994
995    fn read_ranges(
996        &self,
997        ranges: Vec<Range<u64>>,
998        batch_size: u32,
999        projection: ReaderProjection,
1000        filter: FilterExpression,
1001    ) -> Result<BoxStream<'static, ReadBatchTask>> {
1002        Self::do_read_ranges(
1003            self.collect_columns_from_projection(&projection)?,
1004            self.scheduler.clone(),
1005            self.cache.clone(),
1006            self.decoder_plugins.clone(),
1007            ranges,
1008            batch_size,
1009            projection,
1010            filter,
1011            self.options.validate_on_decode,
1012        )
1013    }
1014
1015    /// Creates a stream of "read tasks" to read the data from the file
1016    ///
1017    /// The arguments are similar to [`Self::read_stream_projected`] but instead of returning a stream
1018    /// of record batches it returns a stream of "read tasks".
1019    ///
1020    /// The tasks should be consumed with some kind of `buffered` argument if CPU parallelism is desired.
1021    ///
1022    /// Note that "read task" is probably a bit imprecise.  The tasks are actually "decode tasks".  The
1023    /// reading happens asynchronously in the background.  In other words, a single read task may map to
1024    /// multiple I/O operations or a single I/O operation may map to multiple read tasks.
1025    pub fn read_tasks(
1026        &self,
1027        params: ReadBatchParams,
1028        batch_size: u32,
1029        projection: Option<ReaderProjection>,
1030        filter: FilterExpression,
1031    ) -> Result<Pin<Box<dyn Stream<Item = ReadBatchTask> + Send>>> {
1032        let projection = projection.unwrap_or_else(|| self.base_projection.clone());
1033        Self::validate_projection(&projection, &self.metadata)?;
1034        let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
1035            if bound > self.num_rows || bound == self.num_rows && inclusive {
1036                Err(Error::invalid_input(
1037                    format!(
1038                        "cannot read {:?} from file with {} rows",
1039                        params, self.num_rows
1040                    ),
1041                    location!(),
1042                ))
1043            } else {
1044                Ok(())
1045            }
1046        };
1047        match &params {
1048            ReadBatchParams::Indices(indices) => {
1049                for idx in indices {
1050                    match idx {
1051                        None => {
1052                            return Err(Error::invalid_input(
1053                                "Null value in indices array",
1054                                location!(),
1055                            ));
1056                        }
1057                        Some(idx) => {
1058                            verify_bound(&params, idx as u64, true)?;
1059                        }
1060                    }
1061                }
1062                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
1063                self.take_rows(indices, batch_size, projection)
1064            }
1065            ReadBatchParams::Range(range) => {
1066                verify_bound(&params, range.end as u64, false)?;
1067                self.read_range(
1068                    range.start as u64..range.end as u64,
1069                    batch_size,
1070                    projection,
1071                    filter,
1072                )
1073            }
1074            ReadBatchParams::Ranges(ranges) => {
1075                let mut ranges_u64 = Vec::with_capacity(ranges.len());
1076                for range in ranges.as_ref() {
1077                    verify_bound(&params, range.end, false)?;
1078                    ranges_u64.push(range.start..range.end);
1079                }
1080                self.read_ranges(ranges_u64, batch_size, projection, filter)
1081            }
1082            ReadBatchParams::RangeFrom(range) => {
1083                verify_bound(&params, range.start as u64, true)?;
1084                self.read_range(
1085                    range.start as u64..self.num_rows,
1086                    batch_size,
1087                    projection,
1088                    filter,
1089                )
1090            }
1091            ReadBatchParams::RangeTo(range) => {
1092                verify_bound(&params, range.end as u64, false)?;
1093                self.read_range(0..range.end as u64, batch_size, projection, filter)
1094            }
1095            ReadBatchParams::RangeFull => {
1096                self.read_range(0..self.num_rows, batch_size, projection, filter)
1097            }
1098        }
1099    }
1100
1101    /// Reads data from the file as a stream of record batches
1102    ///
1103    /// * `params` - Specifies the range (or indices) of data to read
1104    /// * `batch_size` - The maximum size of a single batch.  A batch may be smaller
1105    ///   if it is the last batch or if it is not possible to create a batch of the
1106    ///   requested size.
1107    ///
1108    ///   For example, if the batch size is 1024 and one of the columns is a string
1109    ///   column then there may be some ranges of 1024 rows that contain more than
1110    ///   2^31 bytes of string data (which is the maximum size of a string column
1111    ///   in Arrow).  In this case smaller batches may be emitted.
1112    /// * `batch_readahead` - The number of batches to read ahead.  This controls the
1113    ///   amount of CPU parallelism of the read.  In other words it controls how many
1114    ///   batches will be decoded in parallel.  It has no effect on the I/O parallelism
1115    ///   of the read (how many I/O requests are in flight at once).
1116    ///
1117    ///   This parameter also is also related to backpressure.  If the consumer of the
1118    ///   stream is slow then the reader will build up RAM.
1119    /// * `projection` - A projection to apply to the read.  This controls which columns
1120    ///   are read from the file.  The projection is NOT applied on top of the base
1121    ///   projection.  The projection is applied directly to the file schema.
1122    pub fn read_stream_projected(
1123        &self,
1124        params: ReadBatchParams,
1125        batch_size: u32,
1126        batch_readahead: u32,
1127        projection: ReaderProjection,
1128        filter: FilterExpression,
1129    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1130        let arrow_schema = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
1131        let tasks_stream = self.read_tasks(params, batch_size, Some(projection), filter)?;
1132        let batch_stream = tasks_stream
1133            .map(|task| task.task)
1134            .buffered(batch_readahead as usize)
1135            .boxed();
1136        Ok(Box::pin(RecordBatchStreamAdapter::new(
1137            arrow_schema,
1138            batch_stream,
1139        )))
1140    }
1141
1142    fn take_rows_blocking(
1143        &self,
1144        indices: Vec<u64>,
1145        batch_size: u32,
1146        projection: ReaderProjection,
1147        filter: FilterExpression,
1148    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1149        let column_infos = self.collect_columns_from_projection(&projection)?;
1150        debug!(
1151            "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
1152            indices.len(),
1153            indices[0],
1154            indices[indices.len() - 1],
1155            batch_size,
1156            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1157        );
1158
1159        let config = SchedulerDecoderConfig {
1160            batch_size,
1161            cache: self.cache.clone(),
1162            decoder_plugins: self.decoder_plugins.clone(),
1163            io: self.scheduler.clone(),
1164            should_validate: self.options.validate_on_decode,
1165        };
1166
1167        let requested_rows = RequestedRows::Indices(indices);
1168
1169        schedule_and_decode_blocking(
1170            column_infos,
1171            requested_rows,
1172            filter,
1173            projection.column_indices,
1174            projection.schema,
1175            config,
1176        )
1177    }
1178
1179    fn read_ranges_blocking(
1180        &self,
1181        ranges: Vec<Range<u64>>,
1182        batch_size: u32,
1183        projection: ReaderProjection,
1184        filter: FilterExpression,
1185    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1186        let column_infos = self.collect_columns_from_projection(&projection)?;
1187        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1188        debug!(
1189            "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
1190            ranges.len(),
1191            num_rows,
1192            ranges[0].start,
1193            ranges[ranges.len() - 1].end,
1194            batch_size,
1195            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1196        );
1197
1198        let config = SchedulerDecoderConfig {
1199            batch_size,
1200            cache: self.cache.clone(),
1201            decoder_plugins: self.decoder_plugins.clone(),
1202            io: self.scheduler.clone(),
1203            should_validate: self.options.validate_on_decode,
1204        };
1205
1206        let requested_rows = RequestedRows::Ranges(ranges);
1207
1208        schedule_and_decode_blocking(
1209            column_infos,
1210            requested_rows,
1211            filter,
1212            projection.column_indices,
1213            projection.schema,
1214            config,
1215        )
1216    }
1217
1218    fn read_range_blocking(
1219        &self,
1220        range: Range<u64>,
1221        batch_size: u32,
1222        projection: ReaderProjection,
1223        filter: FilterExpression,
1224    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1225        let column_infos = self.collect_columns_from_projection(&projection)?;
1226        let num_rows = self.num_rows;
1227
1228        debug!(
1229            "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
1230            range,
1231            batch_size,
1232            num_rows,
1233            column_infos.len(),
1234            projection.schema.fields.len(),
1235        );
1236
1237        let config = SchedulerDecoderConfig {
1238            batch_size,
1239            cache: self.cache.clone(),
1240            decoder_plugins: self.decoder_plugins.clone(),
1241            io: self.scheduler.clone(),
1242            should_validate: self.options.validate_on_decode,
1243        };
1244
1245        let requested_rows = RequestedRows::Ranges(vec![range]);
1246
1247        schedule_and_decode_blocking(
1248            column_infos,
1249            requested_rows,
1250            filter,
1251            projection.column_indices,
1252            projection.schema,
1253            config,
1254        )
1255    }
1256
1257    /// Read data from the file as an iterator of record batches
1258    ///
1259    /// This is a blocking variant of [`Self::read_stream_projected`] that runs entirely in the
1260    /// calling thread.  It will block on I/O if the decode is faster than the I/O.  It is useful
1261    /// for benchmarking and potentially from "take"ing small batches from fast disks.
1262    ///
1263    /// Large scans of in-memory data will still benefit from threading (and should therefore not
1264    /// use this method) because we can parallelize the decode.
1265    ///
1266    /// Note: calling this from within a tokio runtime will panic.  It is acceptable to call this
1267    /// from a spawn_blocking context.
1268    pub fn read_stream_projected_blocking(
1269        &self,
1270        params: ReadBatchParams,
1271        batch_size: u32,
1272        projection: Option<ReaderProjection>,
1273        filter: FilterExpression,
1274    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1275        let projection = projection.unwrap_or_else(|| self.base_projection.clone());
1276        Self::validate_projection(&projection, &self.metadata)?;
1277        let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
1278            if bound > self.num_rows || bound == self.num_rows && inclusive {
1279                Err(Error::invalid_input(
1280                    format!(
1281                        "cannot read {:?} from file with {} rows",
1282                        params, self.num_rows
1283                    ),
1284                    location!(),
1285                ))
1286            } else {
1287                Ok(())
1288            }
1289        };
1290        match &params {
1291            ReadBatchParams::Indices(indices) => {
1292                for idx in indices {
1293                    match idx {
1294                        None => {
1295                            return Err(Error::invalid_input(
1296                                "Null value in indices array",
1297                                location!(),
1298                            ));
1299                        }
1300                        Some(idx) => {
1301                            verify_bound(&params, idx as u64, true)?;
1302                        }
1303                    }
1304                }
1305                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
1306                self.take_rows_blocking(indices, batch_size, projection, filter)
1307            }
1308            ReadBatchParams::Range(range) => {
1309                verify_bound(&params, range.end as u64, false)?;
1310                self.read_range_blocking(
1311                    range.start as u64..range.end as u64,
1312                    batch_size,
1313                    projection,
1314                    filter,
1315                )
1316            }
1317            ReadBatchParams::Ranges(ranges) => {
1318                let mut ranges_u64 = Vec::with_capacity(ranges.len());
1319                for range in ranges.as_ref() {
1320                    verify_bound(&params, range.end, false)?;
1321                    ranges_u64.push(range.start..range.end);
1322                }
1323                self.read_ranges_blocking(ranges_u64, batch_size, projection, filter)
1324            }
1325            ReadBatchParams::RangeFrom(range) => {
1326                verify_bound(&params, range.start as u64, true)?;
1327                self.read_range_blocking(
1328                    range.start as u64..self.num_rows,
1329                    batch_size,
1330                    projection,
1331                    filter,
1332                )
1333            }
1334            ReadBatchParams::RangeTo(range) => {
1335                verify_bound(&params, range.end as u64, false)?;
1336                self.read_range_blocking(0..range.end as u64, batch_size, projection, filter)
1337            }
1338            ReadBatchParams::RangeFull => {
1339                self.read_range_blocking(0..self.num_rows, batch_size, projection, filter)
1340            }
1341        }
1342    }
1343
1344    /// Reads data from the file as a stream of record batches
1345    ///
1346    /// This is similar to [`Self::read_stream_projected`] but uses the base projection
1347    /// provided when the file was opened (or reads all columns if the file was
1348    /// opened without a base projection)
1349    pub fn read_stream(
1350        &self,
1351        params: ReadBatchParams,
1352        batch_size: u32,
1353        batch_readahead: u32,
1354        filter: FilterExpression,
1355    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1356        self.read_stream_projected(
1357            params,
1358            batch_size,
1359            batch_readahead,
1360            self.base_projection.clone(),
1361            filter,
1362        )
1363    }
1364
1365    pub fn schema(&self) -> &Arc<Schema> {
1366        &self.metadata.file_schema
1367    }
1368}
1369
1370/// Inspects a page and returns a String describing the page's encoding
1371pub fn describe_encoding(page: &pbfile::column_metadata::Page) -> String {
1372    if let Some(encoding) = &page.encoding {
1373        if let Some(style) = &encoding.location {
1374            match style {
1375                pbfile::encoding::Location::Indirect(indirect) => {
1376                    format!(
1377                        "IndirectEncoding(pos={},size={})",
1378                        indirect.buffer_location, indirect.buffer_length
1379                    )
1380                }
1381                pbfile::encoding::Location::Direct(direct) => {
1382                    let encoding_any =
1383                        prost_types::Any::decode(Bytes::from(direct.encoding.clone()))
1384                            .expect("failed to deserialize encoding as protobuf");
1385                    if encoding_any.type_url == "/lance.encodings.ArrayEncoding" {
1386                        let encoding = encoding_any.to_msg::<pbenc::ArrayEncoding>();
1387                        match encoding {
1388                            Ok(encoding) => {
1389                                format!("{:#?}", encoding)
1390                            }
1391                            Err(err) => {
1392                                format!("Unsupported(decode_err={})", err)
1393                            }
1394                        }
1395                    } else if encoding_any.type_url == "/lance.encodings.PageLayout" {
1396                        let encoding = encoding_any.to_msg::<pbenc::PageLayout>();
1397                        match encoding {
1398                            Ok(encoding) => {
1399                                format!("{:#?}", encoding)
1400                            }
1401                            Err(err) => {
1402                                format!("Unsupported(decode_err={})", err)
1403                            }
1404                        }
1405                    } else {
1406                        format!("Unrecognized(type_url={})", encoding_any.type_url)
1407                    }
1408                }
1409                pbfile::encoding::Location::None(_) => "NoEncodingDescription".to_string(),
1410            }
1411        } else {
1412            "MISSING STYLE".to_string()
1413        }
1414    } else {
1415        "MISSING".to_string()
1416    }
1417}
1418
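/// Extension trait for decoding an [`EncodedBatch`] from a contiguous buffer of
/// Lance-encoded bytes.
///
/// `try_from_mini_lance` expects the caller to supply the schema (the buffer
/// contains only data pages, column metadata, and a footer), while
/// `try_from_self_described_lance` reads the schema from the file's first
/// global buffer.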
1419pub trait EncodedBatchReaderExt {
1420    fn try_from_mini_lance(
1421        bytes: Bytes,
1422        schema: &Schema,
1423        version: LanceFileVersion,
1424    ) -> Result<Self>
1425    where
1426        Self: Sized;
1427    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1428    where
1429        Self: Sized;
1430}
1431
1432impl EncodedBatchReaderExt for EncodedBatch {
1433    fn try_from_mini_lance(
1434        bytes: Bytes,
1435        schema: &Schema,
1436        file_version: LanceFileVersion,
1437    ) -> Result<Self>
1438    where
1439        Self: Sized,
1440    {
1441        let projection = ReaderProjection::from_whole_schema(schema, file_version);
1442        let footer = FileReader::decode_footer(&bytes)?;
1443
1444        // Next, read the metadata for the columns
1445        // This is both the column metadata and the CMO table
1446        let column_metadata_start = footer.column_meta_start as usize;
1447        let column_metadata_end = footer.global_buff_offsets_start as usize;
1448        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1449        let column_metadatas =
1450            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1451
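        // The footer records the version the data was actually written with; use
        // that, rather than the caller-supplied version, when interpreting the
        // column metadata below.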
1452        let file_version = LanceFileVersion::try_from_major_minor(
1453            footer.major_version as u32,
1454            footer.minor_version as u32,
1455        )?;
1456
1457        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1458
1459        Ok(Self {
1460            data: bytes,
1461            num_rows: page_table
1462                .first()
1463                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1464                .unwrap_or(0),
1465            page_table,
1466            top_level_columns: projection.column_indices,
1467            schema: Arc::new(schema.clone()),
1468        })
1469    }
1470
1471    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1472    where
1473        Self: Sized,
1474    {
1475        let footer = FileReader::decode_footer(&bytes)?;
1476        let file_version = LanceFileVersion::try_from_major_minor(
1477            footer.major_version as u32,
1478            footer.minor_version as u32,
1479        )?;
1480
1481        let gbo_table = FileReader::do_decode_gbo_table(
1482            &bytes.slice(footer.global_buff_offsets_start as usize..),
1483            &footer,
1484            file_version,
1485        )?;
1486        if gbo_table.is_empty() {
1487            return Err(Error::Internal {
1488                message: "File did not contain any global buffers, schema expected".to_string(),
1489                location: location!(),
1490            });
1491        }
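        // By convention the schema is written as the first global buffer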
1492        let schema_start = gbo_table[0].position as usize;
1493        let schema_size = gbo_table[0].size as usize;
1494
1495        let schema_bytes = bytes.slice(schema_start..(schema_start + schema_size));
1496        let (_, schema) = FileReader::decode_schema(schema_bytes)?;
1497        let projection = ReaderProjection::from_whole_schema(&schema, file_version);
1498
1499        // Next, read the metadata for the columns
1500        // This is both the column metadata and the CMO table
1501        let column_metadata_start = footer.column_meta_start as usize;
1502        let column_metadata_end = footer.global_buff_offsets_start as usize;
1503        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1504        let column_metadatas =
1505            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1506
1507        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1508
1509        Ok(Self {
1510            data: bytes,
1511            num_rows: page_table
1512                .first()
1513                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1514                .unwrap_or(0),
1515            page_table,
1516            top_level_columns: projection.column_indices,
1517            schema: Arc::new(schema),
1518        })
1519    }
1520}
1521
1522#[cfg(test)]
1523pub mod tests {
1524    use std::{collections::BTreeMap, pin::Pin, sync::Arc};
1525
1526    use arrow_array::{
1527        types::{Float64Type, Int32Type},
1528        RecordBatch, UInt32Array,
1529    };
1530    use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema};
1531    use bytes::Bytes;
1532    use futures::{prelude::stream::TryStreamExt, StreamExt};
1533    use lance_arrow::RecordBatchExt;
1534    use lance_core::{datatypes::Schema, ArrowResult};
1535    use lance_datagen::{array, gen, BatchCount, ByteCount, RowCount};
1536    use lance_encoding::{
1537        decoder::{decode_batch, DecodeBatchScheduler, DecoderPlugins, FilterExpression},
1538        encoder::{encode_batch, CoreFieldEncodingStrategy, EncodedBatch, EncodingOptions},
1539        version::LanceFileVersion,
1540    };
1541    use lance_io::{stream::RecordBatchStream, utils::CachedFileSize};
1542    use log::debug;
1543    use rstest::rstest;
1544    use tokio::sync::mpsc;
1545
1546    use crate::v2::{
1547        reader::{EncodedBatchReaderExt, FileReader, FileReaderOptions, ReaderProjection},
1548        testing::{test_cache, write_lance_file, FsFixture, WrittenFile},
1549        writer::{EncodedBatchWriteExt, FileWriter, FileWriterOptions},
1550    };
1551
1552    async fn create_some_file(fs: &FsFixture, version: LanceFileVersion) -> WrittenFile {
1553        let location_type = DataType::Struct(Fields::from(vec![
1554            Field::new("x", DataType::Float64, true),
1555            Field::new("y", DataType::Float64, true),
1556        ]));
1557        let categories_type = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)));
1558
1559        let mut reader = gen()
1560            .col("score", array::rand::<Float64Type>())
1561            .col("location", array::rand_type(&location_type))
1562            .col("categories", array::rand_type(&categories_type))
1563            .col("binary", array::rand_type(&DataType::Binary));
1564        if version <= LanceFileVersion::V2_0 {
1565            reader = reader.col("large_bin", array::rand_type(&DataType::LargeBinary));
1566        }
1567        let reader = reader.into_reader_rows(RowCount::from(1000), BatchCount::from(100));
1568
1569        write_lance_file(
1570            reader,
1571            fs,
1572            FileWriterOptions {
1573                format_version: Some(version),
1574                ..Default::default()
1575            },
1576        )
1577        .await
1578    }
1579
1580    type Transformer = Box<dyn Fn(&RecordBatch) -> RecordBatch>;
1581
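    /// Asserts that the stream `actual` yields exactly the same rows as `expected`.
    ///
    /// Batch boundaries on the two sides generally differ (the stream emits
    /// `read_size` rows at a time), so both sides are re-sliced as needed while
    /// comparing.  An optional `transform` (e.g. a projection) is applied to each
    /// expected batch before the comparison.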
1582    async fn verify_expected(
1583        expected: &[RecordBatch],
1584        mut actual: Pin<Box<dyn RecordBatchStream>>,
1585        read_size: u32,
1586        transform: Option<Transformer>,
1587    ) {
1588        let mut remaining = expected.iter().map(|batch| batch.num_rows()).sum::<usize>() as u32;
1589        let mut expected_iter = expected.iter().map(|batch| {
1590            if let Some(transform) = &transform {
1591                transform(batch)
1592            } else {
1593                batch.clone()
1594            }
1595        });
1596        let mut next_expected = expected_iter.next().unwrap().clone();
1597        while let Some(actual) = actual.next().await {
1598            let mut actual = actual.unwrap();
1599            let mut rows_to_verify = actual.num_rows() as u32;
1600            let expected_length = remaining.min(read_size);
1601            assert_eq!(expected_length, rows_to_verify);
1602
1603            while rows_to_verify > 0 {
1604                let next_slice_len = (next_expected.num_rows() as u32).min(rows_to_verify);
1605                assert_eq!(
1606                    next_expected.slice(0, next_slice_len as usize),
1607                    actual.slice(0, next_slice_len as usize)
1608                );
1609                remaining -= next_slice_len;
1610                rows_to_verify -= next_slice_len;
1611                if remaining > 0 {
1612                    if next_slice_len == next_expected.num_rows() as u32 {
1613                        next_expected = expected_iter.next().unwrap().clone();
1614                    } else {
1615                        next_expected = next_expected.slice(
1616                            next_slice_len as usize,
1617                            next_expected.num_rows() - next_slice_len as usize,
1618                        );
1619                    }
1620                }
1621                if rows_to_verify > 0 {
1622                    actual = actual.slice(
1623                        next_slice_len as usize,
1624                        actual.num_rows() - next_slice_len as usize,
1625                    );
1626                }
1627            }
1628        }
1629        assert_eq!(remaining, 0);
1630    }
1631
1632    #[tokio::test]
1633    async fn test_round_trip() {
1634        let fs = FsFixture::default();
1635
1636        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1637
1638        for read_size in [32, 1024, 1024 * 1024] {
1639            let file_scheduler = fs
1640                .scheduler
1641                .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1642                .await
1643                .unwrap();
1644            let file_reader = FileReader::try_open(
1645                file_scheduler,
1646                None,
1647                Arc::<DecoderPlugins>::default(),
1648                &test_cache(),
1649                FileReaderOptions::default(),
1650            )
1651            .await
1652            .unwrap();
1653
1654            let schema = file_reader.schema();
1655            assert_eq!(schema.metadata.get("foo").unwrap(), "bar");
1656
1657            let batch_stream = file_reader
1658                .read_stream(
1659                    lance_io::ReadBatchParams::RangeFull,
1660                    read_size,
1661                    16,
1662                    FilterExpression::no_filter(),
1663                )
1664                .unwrap();
1665
1666            verify_expected(&data, batch_stream, read_size, None).await;
1667        }
1668    }
1669
1670    #[test_log::test(tokio::test)]
1671    async fn test_encoded_batch_round_trip() {
1672        let data = gen()
1673            .col("x", array::rand::<Int32Type>())
1674            .col("y", array::rand_utf8(ByteCount::from(16), false))
1675            .into_batch_rows(RowCount::from(10000))
1676            .unwrap();
1677
1678        let lance_schema = Arc::new(Schema::try_from(data.schema().as_ref()).unwrap());
1679
1680        let encoding_options = EncodingOptions {
1681            cache_bytes_per_column: 4096,
1682            max_page_bytes: 32 * 1024 * 1024,
1683            keep_original_array: true,
1684            buffer_alignment: 64,
1685        };
1686        let encoded_batch = encode_batch(
1687            &data,
1688            lance_schema.clone(),
1689            &CoreFieldEncodingStrategy::default(),
1690            &encoding_options,
1691        )
1692        .await
1693        .unwrap();
1694
1695        // Test the self-described format
1696        let bytes = encoded_batch.try_to_self_described_lance().unwrap();
1697
1698        let decoded_batch = EncodedBatch::try_from_self_described_lance(bytes).unwrap();
1699
1700        let decoded = decode_batch(
1701            &decoded_batch,
1702            &FilterExpression::no_filter(),
1703            Arc::<DecoderPlugins>::default(),
1704            false,
1705            LanceFileVersion::default(),
1706            None,
1707        )
1708        .await
1709        .unwrap();
1710
1711        assert_eq!(data, decoded);
1712
1713        // Test the mini format (schema passed separately)
1714        let bytes = encoded_batch.try_to_mini_lance().unwrap();
1715        let decoded_batch =
1716            EncodedBatch::try_from_mini_lance(bytes, lance_schema.as_ref(), LanceFileVersion::V2_0)
1717                .unwrap();
1718        let decoded = decode_batch(
1719            &decoded_batch,
1720            &FilterExpression::no_filter(),
1721            Arc::<DecoderPlugins>::default(),
1722            false,
1723            LanceFileVersion::default(),
1724            None,
1725        )
1726        .await
1727        .unwrap();
1728
1729        assert_eq!(data, decoded);
1730    }
1731
1732    #[rstest]
1733    #[test_log::test(tokio::test)]
1734    async fn test_projection(
1735        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
1736    ) {
1737        let fs = FsFixture::default();
1738
1739        let written_file = create_some_file(&fs, version).await;
1740        let file_scheduler = fs
1741            .scheduler
1742            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1743            .await
1744            .unwrap();
1745
1746        let field_id_mapping = written_file
1747            .field_id_mapping
1748            .iter()
1749            .copied()
1750            .collect::<BTreeMap<_, _>>();
1751
1752        let empty_projection = ReaderProjection {
1753            column_indices: Vec::default(),
1754            schema: Arc::new(Schema::default()),
1755        };
1756
1757        for columns in [
1758            vec!["score"],
1759            vec!["location"],
1760            vec!["categories"],
1761            vec!["location.x"],
1762            vec!["score", "categories"],
1763            vec!["score", "location"],
1764            vec!["location", "categories"],
1765            vec!["score", "location.y", "categories"],
1766        ] {
1767            debug!("Testing round trip with projection {:?}", columns);
1768            for use_field_ids in [true, false] {
1769                // We can specify the projection as part of the read operation via read_stream_projected
1770                let file_reader = FileReader::try_open(
1771                    file_scheduler.clone(),
1772                    None,
1773                    Arc::<DecoderPlugins>::default(),
1774                    &test_cache(),
1775                    FileReaderOptions::default(),
1776                )
1777                .await
1778                .unwrap();
1779
1780                let projected_schema = written_file.schema.project(&columns).unwrap();
1781                let projection = if use_field_ids {
1782                    ReaderProjection::from_field_ids(
1783                        file_reader.metadata.version(),
1784                        &projected_schema,
1785                        &field_id_mapping,
1786                    )
1787                    .unwrap()
1788                } else {
1789                    ReaderProjection::from_column_names(
1790                        file_reader.metadata.version(),
1791                        &written_file.schema,
1792                        &columns,
1793                    )
1794                    .unwrap()
1795                };
1796
1797                let batch_stream = file_reader
1798                    .read_stream_projected(
1799                        lance_io::ReadBatchParams::RangeFull,
1800                        1024,
1801                        16,
1802                        projection.clone(),
1803                        FilterExpression::no_filter(),
1804                    )
1805                    .unwrap();
1806
1807                let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1808                verify_expected(
1809                    &written_file.data,
1810                    batch_stream,
1811                    1024,
1812                    Some(Box::new(move |batch: &RecordBatch| {
1813                        batch.project_by_schema(&projection_arrow).unwrap()
1814                    })),
1815                )
1816                .await;
1817
1818                // We can also specify the projection as a base projection when we open the file
1819                let file_reader = FileReader::try_open(
1820                    file_scheduler.clone(),
1821                    Some(projection.clone()),
1822                    Arc::<DecoderPlugins>::default(),
1823                    &test_cache(),
1824                    FileReaderOptions::default(),
1825                )
1826                .await
1827                .unwrap();
1828
1829                let batch_stream = file_reader
1830                    .read_stream(
1831                        lance_io::ReadBatchParams::RangeFull,
1832                        1024,
1833                        16,
1834                        FilterExpression::no_filter(),
1835                    )
1836                    .unwrap();
1837
1838                let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1839                verify_expected(
1840                    &written_file.data,
1841                    batch_stream,
1842                    1024,
1843                    Some(Box::new(move |batch: &RecordBatch| {
1844                        batch.project_by_schema(&projection_arrow).unwrap()
1845                    })),
1846                )
1847                .await;
1848
1849                assert!(file_reader
1850                    .read_stream_projected(
1851                        lance_io::ReadBatchParams::RangeFull,
1852                        1024,
1853                        16,
1854                        empty_projection.clone(),
1855                        FilterExpression::no_filter(),
1856                    )
1857                    .is_err());
1858            }
1859        }
1860
1861        assert!(FileReader::try_open(
1862            file_scheduler.clone(),
1863            Some(empty_projection),
1864            Arc::<DecoderPlugins>::default(),
1865            &test_cache(),
1866            FileReaderOptions::default(),
1867        )
1868        .await
1869        .is_err());
1870
1871        let arrow_schema = ArrowSchema::new(vec![
1872            Field::new("x", DataType::Int32, true),
1873            Field::new("y", DataType::Int32, true),
1874        ]);
1875        let schema = Schema::try_from(&arrow_schema).unwrap();
1876
1877        let projection_with_dupes = ReaderProjection {
1878            column_indices: vec![0, 0],
1879            schema: Arc::new(schema),
1880        };
1881
1882        assert!(FileReader::try_open(
1883            file_scheduler.clone(),
1884            Some(projection_with_dupes),
1885            Arc::<DecoderPlugins>::default(),
1886            &test_cache(),
1887            FileReaderOptions::default(),
1888        )
1889        .await
1890        .is_err());
1891    }
1892
1893    #[test_log::test(tokio::test)]
1894    async fn test_compressing_buffer() {
1895        let fs = FsFixture::default();
1896
1897        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
1898        let file_scheduler = fs
1899            .scheduler
1900            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1901            .await
1902            .unwrap();
1903
1904        // We can specify the projection as part of the read operation via read_stream_projected
1905        let file_reader = FileReader::try_open(
1906            file_scheduler.clone(),
1907            None,
1908            Arc::<DecoderPlugins>::default(),
1909            &test_cache(),
1910            FileReaderOptions::default(),
1911        )
1912        .await
1913        .unwrap();
1914
1915        let mut projection = written_file.schema.project(&["score"]).unwrap();
1916        for field in projection.fields.iter_mut() {
1917            field
1918                .metadata
1919                .insert("lance-encoding:compression".to_string(), "zstd".to_string());
1920        }
1921        let projection = ReaderProjection {
1922            column_indices: projection.fields.iter().map(|f| f.id as u32).collect(),
1923            schema: Arc::new(projection),
1924        };
1925
1926        let batch_stream = file_reader
1927            .read_stream_projected(
1928                lance_io::ReadBatchParams::RangeFull,
1929                1024,
1930                16,
1931                projection.clone(),
1932                FilterExpression::no_filter(),
1933            )
1934            .unwrap();
1935
1936        let projection_arrow = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
1937        verify_expected(
1938            &written_file.data,
1939            batch_stream,
1940            1024,
1941            Some(Box::new(move |batch: &RecordBatch| {
1942                batch.project_by_schema(&projection_arrow).unwrap()
1943            })),
1944        )
1945        .await;
1946    }
1947
1948    #[tokio::test]
1949    async fn test_read_all() {
1950        let fs = FsFixture::default();
1951        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1952        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
1953
1954        let file_scheduler = fs
1955            .scheduler
1956            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1957            .await
1958            .unwrap();
1959        let file_reader = FileReader::try_open(
1960            file_scheduler.clone(),
1961            None,
1962            Arc::<DecoderPlugins>::default(),
1963            &test_cache(),
1964            FileReaderOptions::default(),
1965        )
1966        .await
1967        .unwrap();
1968
1969        let batches = file_reader
1970            .read_stream(
1971                lance_io::ReadBatchParams::RangeFull,
1972                total_rows as u32,
1973                16,
1974                FilterExpression::no_filter(),
1975            )
1976            .unwrap()
1977            .try_collect::<Vec<_>>()
1978            .await
1979            .unwrap();
1980        assert_eq!(batches.len(), 1);
1981        assert_eq!(batches[0].num_rows(), total_rows);
1982    }
1983
1984    #[tokio::test]
1985    async fn test_blocking_take() {
1986        let fs = FsFixture::default();
1987        let WrittenFile { data, schema, .. } = create_some_file(&fs, LanceFileVersion::V2_1).await;
1988        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
1989
1990        let file_scheduler = fs
1991            .scheduler
1992            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1993            .await
1994            .unwrap();
1995        let file_reader = FileReader::try_open(
1996            file_scheduler.clone(),
1997            Some(
1998                ReaderProjection::from_column_names(LanceFileVersion::V2_1, &schema, &["score"])
1999                    .unwrap(),
2000            ),
2001            Arc::<DecoderPlugins>::default(),
2002            &test_cache(),
2003            FileReaderOptions::default(),
2004        )
2005        .await
2006        .unwrap();
2007
2008        let batches = tokio::task::spawn_blocking(move || {
2009            file_reader
2010                .read_stream_projected_blocking(
2011                    lance_io::ReadBatchParams::Indices(UInt32Array::from(vec![0, 1, 2, 3, 4])),
2012                    total_rows as u32,
2013                    None,
2014                    FilterExpression::no_filter(),
2015                )
2016                .unwrap()
2017                .collect::<ArrowResult<Vec<_>>>()
2018                .unwrap()
2019        })
2020        .await
2021        .unwrap();
2022
2023        assert_eq!(batches.len(), 1);
2024        assert_eq!(batches[0].num_rows(), 5);
2025        assert_eq!(batches[0].num_columns(), 1);
2026    }
2027
2028    #[tokio::test(flavor = "multi_thread")]
2029    async fn test_drop_in_progress() {
2030        let fs = FsFixture::default();
2031        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
2032        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2033
2034        let file_scheduler = fs
2035            .scheduler
2036            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2037            .await
2038            .unwrap();
2039        let file_reader = FileReader::try_open(
2040            file_scheduler.clone(),
2041            None,
2042            Arc::<DecoderPlugins>::default(),
2043            &test_cache(),
2044            FileReaderOptions::default(),
2045        )
2046        .await
2047        .unwrap();
2048
2049        let mut batches = file_reader
2050            .read_stream(
2051                lance_io::ReadBatchParams::RangeFull,
2052                (total_rows / 10) as u32,
2053                16,
2054                FilterExpression::no_filter(),
2055            )
2056            .unwrap();
2057
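        // Dropping the reader should not invalidate a stream it has already produced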
2058        drop(file_reader);
2059
2060        let batch = batches.next().await.unwrap().unwrap();
2061        assert!(batch.num_rows() > 0);
2062
2063        // Drop in-progress scan
2064        drop(batches);
2065    }
2066
2067    #[tokio::test]
2068    async fn drop_while_scheduling() {
2069        // This is a bit of a white-box test that pokes at the internals.  We want to
2070        // test the case where the read stream is dropped before the scheduling
2071        // thread finishes.  We can't do that in a black-box fashion because the
2072        // scheduling thread runs in the background and there is no easy way to
2073        // pause / gate it.
2074
2075        // It's a regression test for a bug where the scheduling thread would panic
2076        // if the stream was dropped before it finished.
2077
2078        let fs = FsFixture::default();
2079        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
2080        let total_rows = written_file
2081            .data
2082            .iter()
2083            .map(|batch| batch.num_rows())
2084            .sum::<usize>();
2085
2086        let file_scheduler = fs
2087            .scheduler
2088            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2089            .await
2090            .unwrap();
2091        let file_reader = FileReader::try_open(
2092            file_scheduler.clone(),
2093            None,
2094            Arc::<DecoderPlugins>::default(),
2095            &test_cache(),
2096            FileReaderOptions::default(),
2097        )
2098        .await
2099        .unwrap();
2100
2101        let projection =
2102            ReaderProjection::from_whole_schema(&written_file.schema, LanceFileVersion::V2_0);
2103        let column_infos = file_reader
2104            .collect_columns_from_projection(&projection)
2105            .unwrap();
2106        let mut decode_scheduler = DecodeBatchScheduler::try_new(
2107            &projection.schema,
2108            &projection.column_indices,
2109            &column_infos,
2110            &vec![],
2111            total_rows as u64,
2112            Arc::<DecoderPlugins>::default(),
2113            file_reader.scheduler.clone(),
2114            test_cache(),
2115            &FilterExpression::no_filter(),
2116        )
2117        .await
2118        .unwrap();
2119
2120        let range = 0..total_rows as u64;
2121
2122        let (tx, rx) = mpsc::unbounded_channel();
2123
2124        // Simulate the stream / decoder being dropped
2125        drop(rx);
2126
2127        // Scheduling should not panic
2128        decode_scheduler.schedule_range(
2129            range,
2130            &FilterExpression::no_filter(),
2131            tx,
2132            file_reader.scheduler.clone(),
2133        )
2134    }
2135
2136    #[tokio::test]
2137    async fn test_global_buffers() {
2138        let fs = FsFixture::default();
2139
2140        let lance_schema =
2141            lance_core::datatypes::Schema::try_from(&ArrowSchema::new(vec![Field::new(
2142                "foo",
2143                DataType::Int32,
2144                true,
2145            )]))
2146            .unwrap();
2147
2148        let mut file_writer = FileWriter::try_new(
2149            fs.object_store.create(&fs.tmp_path).await.unwrap(),
2150            lance_schema.clone(),
2151            FileWriterOptions::default(),
2152        )
2153        .unwrap();
2154
2155        let test_bytes = Bytes::from_static(b"hello");
2156
2157        let buf_index = file_writer
2158            .add_global_buffer(test_bytes.clone())
2159            .await
2160            .unwrap();
2161
2162        assert_eq!(buf_index, 1);
2163
2164        file_writer.finish().await.unwrap();
2165
2166        let file_scheduler = fs
2167            .scheduler
2168            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2169            .await
2170            .unwrap();
2171        let file_reader = FileReader::try_open(
2172            file_scheduler.clone(),
2173            None,
2174            Arc::<DecoderPlugins>::default(),
2175            &test_cache(),
2176            FileReaderOptions::default(),
2177        )
2178        .await
2179        .unwrap();
2180
2181        let buf = file_reader.read_global_buffer(1).await.unwrap();
2182        assert_eq!(buf, test_bytes);
2183    }
2184}