lance_file/v2/reader.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::{
    collections::{BTreeMap, BTreeSet},
    io::Cursor,
    ops::Range,
    pin::Pin,
    sync::Arc,
};

use arrow_array::RecordBatchReader;
use arrow_schema::Schema as ArrowSchema;
use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
use bytes::{Bytes, BytesMut};
use deepsize::{Context, DeepSizeOf};
use futures::{stream::BoxStream, Stream, StreamExt};
use lance_encoding::{
    decoder::{
        schedule_and_decode, schedule_and_decode_blocking, ColumnInfo, DecoderPlugins,
        FilterExpression, PageEncoding, PageInfo, ReadBatchTask, RequestedRows,
        SchedulerDecoderConfig,
    },
    encoder::EncodedBatch,
    version::LanceFileVersion,
    EncodingsIo,
};
use log::debug;
use object_store::path::Path;
use prost::{Message, Name};
use snafu::location;

use lance_core::{
    cache::FileMetadataCache,
    datatypes::{Field, Schema},
    Error, Result,
};
use lance_encoding::format::pb as pbenc;
use lance_io::{
    scheduler::FileScheduler,
    stream::{RecordBatchStream, RecordBatchStreamAdapter},
    ReadBatchParams,
};

use crate::{
    datatypes::{Fields, FieldsWithMeta},
    format::{pb, pbfile, MAGIC, MAJOR_VERSION, MINOR_VERSION},
    v2::writer::PAGE_BUFFER_ALIGNMENT,
};

use super::io::LanceEncodingsIo;

// For now, we don't use global buffers for anything other than schema.  If we
// use these later we should make them lazily loaded and then cached once loaded.
//
// We store their position / length for debugging purposes
#[derive(Debug, DeepSizeOf)]
pub struct BufferDescriptor {
    pub position: u64,
    pub size: u64,
}

/// Statistics that summarize some of the file metadata for quick inspection
#[derive(Debug)]
pub struct FileStatistics {
    /// Statistics about each of the columns in the file
    pub columns: Vec<ColumnStatistics>,
}

/// Summary information describing a column
#[derive(Debug)]
pub struct ColumnStatistics {
    /// The number of pages in the column
    pub num_pages: usize,
    /// The total number of data & metadata bytes in the column
    ///
    /// This is the compressed on-disk size
    pub size_bytes: u64,
}

// TODO: Caching
#[derive(Debug)]
pub struct CachedFileMetadata {
    /// The schema of the file
    pub file_schema: Arc<Schema>,
    /// The column metadatas
    pub column_metadatas: Vec<pbfile::ColumnMetadata>,
    pub column_infos: Vec<Arc<ColumnInfo>>,
    /// The number of rows in the file
    pub num_rows: u64,
    pub file_buffers: Vec<BufferDescriptor>,
    /// The number of bytes contained in the data page section of the file
    pub num_data_bytes: u64,
    /// The number of bytes contained in the column metadata (not including buffers
    /// referenced by the metadata)
    pub num_column_metadata_bytes: u64,
    /// The number of bytes contained in global buffers
    pub num_global_buffer_bytes: u64,
    /// The number of bytes contained in the CMO and GBO tables
    pub num_footer_bytes: u64,
    pub major_version: u16,
    pub minor_version: u16,
}

impl DeepSizeOf for CachedFileMetadata {
    // TODO: include size for `column_metadatas` and `column_infos`.
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.file_schema.deep_size_of_children(context)
            + self
                .file_buffers
                .iter()
                .map(|file_buffer| file_buffer.deep_size_of_children(context))
                .sum::<usize>()
    }
}

impl CachedFileMetadata {
    pub fn version(&self) -> LanceFileVersion {
        match (self.major_version, self.minor_version) {
            (0, 3) => LanceFileVersion::V2_0,
            (2, 1) => LanceFileVersion::V2_1,
            _ => panic!(
                "Unsupported version: {}.{}",
                self.major_version, self.minor_version
            ),
        }
    }
}

/// Selecting columns from a lance file requires specifying both the
/// index of the column and the data type of the column
///
/// Partly, this is because it is not strictly required that columns
/// be read into the same type.  For example, a string column may be
/// read as a string, large_string or string_view type.
///
/// A read will only succeed if the decoder for a column is capable
/// of decoding into the requested type.
///
/// Note that this should generally be limited to different in-memory
/// representations of the same semantic type.  An encoding could
/// theoretically support "casting" (e.g. int to string, etc.) but
/// there is little advantage in doing so here.
///
/// Note: in order to specify a projection the user will need some way
/// to figure out the column indices.  In the table format we do this
/// using field IDs and keeping track of the field id->column index mapping.
///
/// If users are not using the table format then they will need to figure
/// out some way to do this themselves.
#[derive(Debug, Clone)]
pub struct ReaderProjection {
    /// The data types (schema) of the selected columns.  The names
    /// of the schema are arbitrary and ignored.
    pub schema: Arc<Schema>,
    /// The indices of the columns to load.
    ///
    /// The mapping should be as follows:
    ///
    /// - Primitive: the index of the column in the schema
    /// - List: the index of the list column in the schema
    ///   followed by the column indices of the children
    /// - FixedSizeList (of primitive): the index of the column in the schema
    ///   (this case is not nested)
    /// - FixedSizeList (of non-primitive): not yet implemented
    /// - Dictionary: same as primitive
    /// - Struct: the index of the struct column in the schema
    ///   followed by the column indices of the children
    ///
    /// In other words, this should be a DFS listing of the desired schema.
    ///
    /// For example, if the goal is to load:
    ///
    ///   x: int32
    ///   y: struct<z: int32, w: string>
    ///   z: list<int32>
    ///
    /// and the schema originally used to store the data was:
    ///
    ///   a: struct<x: int32>
    ///   b: int64
    ///   y: struct<z: int32, c: int64, w: string>
    ///   z: list<int32>
    ///
    /// Then the column_indices should be [1, 3, 4, 6, 7, 8]
    pub column_indices: Vec<u32>,
}

impl ReaderProjection {
    fn from_field_ids_helper<'a>(
        reader: &FileReader,
        fields: impl Iterator<Item = &'a Field>,
        field_id_to_column_index: &BTreeMap<u32, u32>,
        column_indices: &mut Vec<u32>,
    ) -> Result<()> {
        for field in fields {
            let is_structural = reader.metadata.version() >= LanceFileVersion::V2_1;
            // In the 2.0 system we needed ids for intermediate fields.  In 2.1+
            // we only need ids for leaf fields.
            if !is_structural || field.children.is_empty() {
                if let Some(column_idx) = field_id_to_column_index.get(&(field.id as u32)).copied()
                {
                    column_indices.push(column_idx);
                }
            }
            Self::from_field_ids_helper(
                reader,
                field.children.iter(),
                field_id_to_column_index,
                column_indices,
            )?;
        }
        Ok(())
    }

    /// Creates a projection using a mapping from field IDs to column indices
    ///
    /// You can obtain such a mapping when the file is written using the
    /// [`crate::v2::writer::FileWriter::field_id_to_column_indices`] method.
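    ///
    /// A minimal sketch (not compiled here; assumes `reader` is an open [`FileReader`]
    /// and that the field id -> column index mapping was captured at write time,
    /// e.g. from the writer method referenced above):
    ///
    /// ```ignore
    /// use std::collections::BTreeMap;
    ///
    /// // Hypothetical mapping; in practice this comes from the writer.
    /// let field_id_map: BTreeMap<u32, u32> = [(0, 0), (1, 1)].into_iter().collect();
    /// let projection =
    ///     ReaderProjection::from_field_ids(&reader, reader.schema(), &field_id_map)?;
    /// ```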
    pub fn from_field_ids(
        reader: &FileReader,
        schema: &Schema,
        field_id_to_column_index: &BTreeMap<u32, u32>,
    ) -> Result<Self> {
        let mut column_indices = Vec::new();
        Self::from_field_ids_helper(
            reader,
            schema.fields.iter(),
            field_id_to_column_index,
            &mut column_indices,
        )?;
        Ok(Self {
            schema: Arc::new(schema.clone()),
            column_indices,
        })
    }

    /// Creates a projection that reads the entire file
    ///
    /// If the schema provided is not the schema of the entire file then
    /// the projection will be invalid and the read will fail.
    /// If a field is a struct datatype with `packed` set to true in the field metadata,
    /// the whole struct has one column index.
    /// To support nested packed-struct encoding, this method needs to be further adjusted.
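    ///
    /// A minimal sketch (not compiled here; assumes `schema` is the full file schema):
    ///
    /// ```ignore
    /// let projection = ReaderProjection::from_whole_schema(&schema, LanceFileVersion::V2_1);
    /// // The projection can then be passed as the base projection when opening the file.
    /// ```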
    pub fn from_whole_schema(schema: &Schema, version: LanceFileVersion) -> Self {
        let schema = Arc::new(schema.clone());
        let is_structural = version >= LanceFileVersion::V2_1;
        let mut column_indices = vec![];
        let mut curr_column_idx = 0;
        let mut packed_struct_fields_num = 0;
        for field in schema.fields_pre_order() {
            if packed_struct_fields_num > 0 {
                packed_struct_fields_num -= 1;
                continue;
            }
            if field.is_packed_struct() {
                column_indices.push(curr_column_idx);
                curr_column_idx += 1;
                packed_struct_fields_num = field.children.len();
            } else if field.children.is_empty() || !is_structural {
                column_indices.push(curr_column_idx);
                curr_column_idx += 1;
            }
        }
        Self {
            schema,
            column_indices,
        }
    }

    /// Creates a projection that reads the specified columns provided by name
    ///
    /// The syntax for column names is the same as [`lance_core::datatypes::Schema::project`]
    ///
    /// If the schema provided is not the schema of the entire file then
    /// the projection will be invalid and the read will fail.
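    ///
    /// A minimal sketch (not compiled here; assumes `schema` is the full file schema
    /// and contains the hypothetical columns `x` and `y`):
    ///
    /// ```ignore
    /// let projection = ReaderProjection::from_column_names(&schema, &["x", "y"])?;
    /// ```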
    pub fn from_column_names(schema: &Schema, column_names: &[&str]) -> Result<Self> {
        let field_id_to_column_index = schema
            .fields_pre_order()
            .enumerate()
            .map(|(idx, field)| (field.id as u32, idx as u32))
            .collect::<BTreeMap<_, _>>();
        let projected = schema.project(column_names)?;
        let column_indices = projected
            .fields_pre_order()
            .map(|f| field_id_to_column_index[&(f.id as u32)])
            .collect::<Vec<_>>();
        Ok(Self {
            schema: Arc::new(projected),
            column_indices,
        })
    }
}

#[derive(Clone, Debug, Default)]
pub struct FileReaderOptions {
    validate_on_decode: bool,
}

#[derive(Debug)]
pub struct FileReader {
    scheduler: Arc<dyn EncodingsIo>,
    // The default projection to be applied to all reads
    base_projection: ReaderProjection,
    num_rows: u64,
    metadata: Arc<CachedFileMetadata>,
    decoder_plugins: Arc<DecoderPlugins>,
    cache: Arc<FileMetadataCache>,
    options: FileReaderOptions,
}

#[derive(Debug)]
struct Footer {
    #[allow(dead_code)]
    column_meta_start: u64,
    // We don't use this today because we always load metadata for every column
    // and don't yet support "metadata projection"
    #[allow(dead_code)]
    column_meta_offsets_start: u64,
    global_buff_offsets_start: u64,
    num_global_buffers: u32,
    num_columns: u32,
    major_version: u16,
    minor_version: u16,
}

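// The fixed-size footer occupies the last FOOTER_LEN bytes of the file.  Its layout
// (all little-endian) mirrors what `decode_footer` reads below:
//
//   u64 column_meta_start
//   u64 column_meta_offsets_start
//   u64 global_buff_offsets_start
//   u32 num_global_buffers
//   u32 num_columns
//   u16 major_version
//   u16 minor_version
//   4-byte magic
//
// 8 + 8 + 8 + 4 + 4 + 2 + 2 + 4 = 40 bytes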
const FOOTER_LEN: usize = 40;

impl FileReader {
    pub fn with_scheduler(&self, scheduler: Arc<dyn EncodingsIo>) -> Self {
        Self {
            scheduler,
            base_projection: self.base_projection.clone(),
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            metadata: self.metadata.clone(),
            options: self.options.clone(),
            num_rows: self.num_rows,
        }
    }

    pub fn num_rows(&self) -> u64 {
        self.num_rows
    }

    pub fn metadata(&self) -> &Arc<CachedFileMetadata> {
        &self.metadata
    }

    pub fn file_statistics(&self) -> FileStatistics {
        let column_metadatas = &self.metadata().column_metadatas;

        let column_stats = column_metadatas
            .iter()
            .map(|col_metadata| {
                let num_pages = col_metadata.pages.len();
                let size_bytes = col_metadata
                    .pages
                    .iter()
                    .map(|page| page.buffer_sizes.iter().sum::<u64>())
                    .sum::<u64>();
                ColumnStatistics {
                    num_pages,
                    size_bytes,
                }
            })
            .collect();

        FileStatistics {
            columns: column_stats,
        }
    }

    pub async fn read_global_buffer(&self, index: u32) -> Result<Bytes> {
        let buffer_desc = self
            .metadata
            .file_buffers
            .get(index as usize)
            .ok_or_else(|| {
                Error::invalid_input(
                    format!(
                        "request for global buffer at index {} but there were only {} global buffers in the file",
                        index,
                        self.metadata.file_buffers.len()
                    ),
                    location!(),
                )
            })?;
        self.scheduler
            .submit_single(
                buffer_desc.position..buffer_desc.position + buffer_desc.size,
                0,
            )
            .await
    }

    async fn read_tail(scheduler: &FileScheduler) -> Result<(Bytes, u64)> {
        let file_size = scheduler.reader().size().await? as u64;
        let begin = if file_size < scheduler.reader().block_size() as u64 {
            0
        } else {
            file_size - scheduler.reader().block_size() as u64
        };
        let tail_bytes = scheduler.submit_single(begin..file_size, 0).await?;
        Ok((tail_bytes, file_size))
    }

    // Checks to make sure the footer is written correctly and decodes it (the footer
    // contains the positions needed to locate the rest of the metadata)
    fn decode_footer(footer_bytes: &Bytes) -> Result<Footer> {
        let len = footer_bytes.len();
        if len < FOOTER_LEN {
            return Err(Error::io(
                format!(
                    "does not have sufficient data, len: {}, bytes: {:?}",
                    len, footer_bytes
                ),
                location!(),
            ));
        }
        let mut cursor = Cursor::new(footer_bytes.slice(len - FOOTER_LEN..));

        let column_meta_start = cursor.read_u64::<LittleEndian>()?;
        let column_meta_offsets_start = cursor.read_u64::<LittleEndian>()?;
        let global_buff_offsets_start = cursor.read_u64::<LittleEndian>()?;
        let num_global_buffers = cursor.read_u32::<LittleEndian>()?;
        let num_columns = cursor.read_u32::<LittleEndian>()?;
        let major_version = cursor.read_u16::<LittleEndian>()?;
        let minor_version = cursor.read_u16::<LittleEndian>()?;

        if major_version == MAJOR_VERSION as u16 && minor_version == MINOR_VERSION as u16 {
            return Err(Error::version_conflict(
                "Attempt to use the lance v2 reader to read a legacy file".to_string(),
                major_version,
                minor_version,
                location!(),
            ));
        }

        let magic_bytes = footer_bytes.slice(len - 4..);
        if magic_bytes.as_ref() != MAGIC {
            return Err(Error::io(
                format!(
                    "file does not appear to be a Lance file (expected magic {:?} but found {:?})",
                    MAGIC, magic_bytes
                ),
                location!(),
            ));
        }
        Ok(Footer {
            column_meta_start,
            column_meta_offsets_start,
            global_buff_offsets_start,
            num_global_buffers,
            num_columns,
            major_version,
            minor_version,
        })
    }

    // TODO: Once we have coalesced I/O we should only read the column metadatas that we need
    fn read_all_column_metadata(
        column_metadata_bytes: Bytes,
        footer: &Footer,
    ) -> Result<Vec<pbfile::ColumnMetadata>> {
        let column_metadata_start = footer.column_meta_start;
        // cmo == column_metadata_offsets
        let cmo_table_size = 16 * footer.num_columns as usize;
        let cmo_table = column_metadata_bytes.slice(column_metadata_bytes.len() - cmo_table_size..);

        (0..footer.num_columns)
            .map(|col_idx| {
                let offset = (col_idx * 16) as usize;
                let position = LittleEndian::read_u64(&cmo_table[offset..offset + 8]);
                let length = LittleEndian::read_u64(&cmo_table[offset + 8..offset + 16]);
                let normalized_position = (position - column_metadata_start) as usize;
                let normalized_end = normalized_position + (length as usize);
                Ok(pbfile::ColumnMetadata::decode(
                    &column_metadata_bytes[normalized_position..normalized_end],
                )?)
            })
            .collect::<Result<Vec<_>>>()
    }

    async fn optimistic_tail_read(
        data: &Bytes,
        start_pos: u64,
        scheduler: &FileScheduler,
        file_len: u64,
    ) -> Result<Bytes> {
        let num_bytes_needed = (file_len - start_pos) as usize;
        if data.len() >= num_bytes_needed {
            Ok(data.slice((data.len() - num_bytes_needed)..))
        } else {
            let num_bytes_missing = (num_bytes_needed - data.len()) as u64;
            let start = file_len - num_bytes_needed as u64;
            let missing_bytes = scheduler
                .submit_single(start..start + num_bytes_missing, 0)
                .await?;
            let mut combined = BytesMut::with_capacity(data.len() + num_bytes_missing as usize);
            combined.extend(missing_bytes);
            combined.extend(data);
            Ok(combined.freeze())
        }
    }

    fn do_decode_gbo_table(
        gbo_bytes: &Bytes,
        footer: &Footer,
        version: LanceFileVersion,
    ) -> Result<Vec<BufferDescriptor>> {
        let mut global_bufs_cursor = Cursor::new(gbo_bytes);

        let mut global_buffers = Vec::with_capacity(footer.num_global_buffers as usize);
        for _ in 0..footer.num_global_buffers {
            let buf_pos = global_bufs_cursor.read_u64::<LittleEndian>()?;
            assert!(
                version < LanceFileVersion::V2_1 || buf_pos % PAGE_BUFFER_ALIGNMENT as u64 == 0
            );
            let buf_size = global_bufs_cursor.read_u64::<LittleEndian>()?;
            global_buffers.push(BufferDescriptor {
                position: buf_pos,
                size: buf_size,
            });
        }

        Ok(global_buffers)
    }

    async fn decode_gbo_table(
        tail_bytes: &Bytes,
        file_len: u64,
        scheduler: &FileScheduler,
        footer: &Footer,
        version: LanceFileVersion,
    ) -> Result<Vec<BufferDescriptor>> {
        // This could, in theory, trigger another IOP but the GBO table should never be large
        // enough for that to happen
        let gbo_bytes = Self::optimistic_tail_read(
            tail_bytes,
            footer.global_buff_offsets_start,
            scheduler,
            file_len,
        )
        .await?;
        Self::do_decode_gbo_table(&gbo_bytes, footer, version)
    }

    fn decode_schema(schema_bytes: Bytes) -> Result<(u64, lance_core::datatypes::Schema)> {
        let file_descriptor = pb::FileDescriptor::decode(schema_bytes)?;
        let pb_schema = file_descriptor.schema.unwrap();
        let num_rows = file_descriptor.length;
        let fields_with_meta = FieldsWithMeta {
            fields: Fields(pb_schema.fields),
            metadata: pb_schema.metadata,
        };
        let schema = lance_core::datatypes::Schema::from(fields_with_meta);
        Ok((num_rows, schema))
    }

    // TODO: Support late projection.  Currently, if we want to perform a
    // projected read of a file, we load all of the column metadata, and then
    // only read the column data that is requested.  This is fine for most cases.
    //
    // However, if there are many columns then loading all of the column metadata
    // may be expensive.  We should support a mode where we only load the column
    // metadata for the columns that are requested (the file format supports this).
    //
    // The main challenge is that we either need to ignore the column metadata cache
    // or have a more sophisticated cache that can cache per-column metadata.
    //
    // Also, if the number of columns is fairly small, it's faster to read them as a
    // single IOP, but we can fix this through coalescing.
    pub async fn read_all_metadata(scheduler: &FileScheduler) -> Result<CachedFileMetadata> {
        // 1. read the footer
        let (tail_bytes, file_len) = Self::read_tail(scheduler).await?;
        let footer = Self::decode_footer(&tail_bytes)?;

        let file_version = LanceFileVersion::try_from_major_minor(
            footer.major_version as u32,
            footer.minor_version as u32,
        )?;

        let gbo_table =
            Self::decode_gbo_table(&tail_bytes, file_len, scheduler, &footer, file_version).await?;
        if gbo_table.is_empty() {
            return Err(Error::Internal {
                message: "File did not contain any global buffers, schema expected".to_string(),
                location: location!(),
            });
        }
        let schema_start = gbo_table[0].position;
        let schema_size = gbo_table[0].size;

        let num_footer_bytes = file_len - schema_start;

        // By default we read all column metadatas.  We do NOT read the column metadata buffers
        // at this point.  We only want to read the column metadata for columns we are actually loading.
        let all_metadata_bytes =
            Self::optimistic_tail_read(&tail_bytes, schema_start, scheduler, file_len).await?;

        let schema_bytes = all_metadata_bytes.slice(0..schema_size as usize);
        let (num_rows, schema) = Self::decode_schema(schema_bytes)?;

        // Next, read the metadata for the columns
        // This is both the column metadata and the CMO table
        let column_metadata_start = (footer.column_meta_start - schema_start) as usize;
        let column_metadata_end = (footer.global_buff_offsets_start - schema_start) as usize;
        let column_metadata_bytes =
            all_metadata_bytes.slice(column_metadata_start..column_metadata_end);
        let column_metadatas = Self::read_all_column_metadata(column_metadata_bytes, &footer)?;

        let num_global_buffer_bytes = gbo_table.iter().map(|buf| buf.size).sum::<u64>();
        let num_data_bytes = footer.column_meta_start - num_global_buffer_bytes;
        let num_column_metadata_bytes = footer.global_buff_offsets_start - footer.column_meta_start;

        let column_infos = Self::meta_to_col_infos(column_metadatas.as_slice(), file_version);

        Ok(CachedFileMetadata {
            file_schema: Arc::new(schema),
            column_metadatas,
            column_infos,
            num_rows,
            num_data_bytes,
            num_column_metadata_bytes,
            num_global_buffer_bytes,
            num_footer_bytes,
            file_buffers: gbo_table,
            major_version: footer.major_version,
            minor_version: footer.minor_version,
        })
    }

    fn fetch_encoding<M: Default + Name + Sized>(encoding: &pbfile::Encoding) -> M {
        match &encoding.location {
            Some(pbfile::encoding::Location::Indirect(_)) => todo!(),
            Some(pbfile::encoding::Location::Direct(encoding)) => {
                let encoding_buf = Bytes::from(encoding.encoding.clone());
                let encoding_any = prost_types::Any::decode(encoding_buf).unwrap();
                encoding_any.to_msg::<M>().unwrap()
            }
            Some(pbfile::encoding::Location::None(_)) => panic!(),
            None => panic!(),
        }
    }

    fn meta_to_col_infos(
        column_metadatas: &[pbfile::ColumnMetadata],
        file_version: LanceFileVersion,
    ) -> Vec<Arc<ColumnInfo>> {
        column_metadatas
            .iter()
            .enumerate()
            .map(|(col_idx, col_meta)| {
                let page_infos = col_meta
                    .pages
                    .iter()
                    .map(|page| {
                        let num_rows = page.length;
                        let encoding = match file_version {
                            LanceFileVersion::V2_0 => {
                                PageEncoding::Legacy(Self::fetch_encoding::<pbenc::ArrayEncoding>(
                                    page.encoding.as_ref().unwrap(),
                                ))
                            }
                            _ => {
                                PageEncoding::Structural(Self::fetch_encoding::<pbenc::PageLayout>(
                                    page.encoding.as_ref().unwrap(),
                                ))
                            }
                        };
                        let buffer_offsets_and_sizes = Arc::from(
                            page.buffer_offsets
                                .iter()
                                .zip(page.buffer_sizes.iter())
                                .map(|(offset, size)| {
                                    // Starting with version 2.1 we can assert that page buffers are aligned
                                    assert!(
                                        file_version < LanceFileVersion::V2_1
                                            || offset % PAGE_BUFFER_ALIGNMENT as u64 == 0
                                    );
                                    (*offset, *size)
                                })
                                .collect::<Vec<_>>(),
                        );
                        PageInfo {
                            buffer_offsets_and_sizes,
                            encoding,
                            num_rows,
                            priority: page.priority,
                        }
                    })
                    .collect::<Vec<_>>();
                let buffer_offsets_and_sizes = Arc::from(
                    col_meta
                        .buffer_offsets
                        .iter()
                        .zip(col_meta.buffer_sizes.iter())
                        .map(|(offset, size)| (*offset, *size))
                        .collect::<Vec<_>>(),
                );
                Arc::new(ColumnInfo {
                    index: col_idx as u32,
                    page_infos: Arc::from(page_infos),
                    buffer_offsets_and_sizes,
                    encoding: Self::fetch_encoding(col_meta.encoding.as_ref().unwrap()),
                })
            })
            .collect::<Vec<_>>()
    }

    fn validate_projection(
        projection: &ReaderProjection,
        metadata: &CachedFileMetadata,
    ) -> Result<()> {
        if projection.schema.fields.is_empty() {
            return Err(Error::invalid_input(
                "Attempt to read zero columns from the file, at least one column must be specified"
                    .to_string(),
                location!(),
            ));
        }
        let mut column_indices_seen = BTreeSet::new();
        for column_index in &projection.column_indices {
            if !column_indices_seen.insert(*column_index) {
                return Err(Error::invalid_input(
                    format!(
                        "The projection specified the column index {} more than once",
                        column_index
                    ),
                    location!(),
                ));
            }
            if *column_index >= metadata.column_infos.len() as u32 {
                return Err(Error::invalid_input(
                    format!(
                        "The projection specified the column index {} but there are only {} columns in the file",
                        column_index,
                        metadata.column_infos.len()
                    ),
                    location!(),
                ));
            }
        }
        Ok(())
    }

    /// Opens a new file reader without any pre-existing knowledge
    ///
    /// This will read the file schema from the file itself and thus requires a bit more I/O
    ///
    /// A `base_projection` can also be provided.  If provided, then the projection will apply
    /// to all reads from the file that do not specify their own projection.
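    ///
    /// A minimal sketch (not compiled here; assumes a `FileScheduler` has already been
    /// created for the file and that `DecoderPlugins::default()` and a `FileMetadataCache`
    /// are available in the caller's context):
    ///
    /// ```ignore
    /// let reader = FileReader::try_open(
    ///     scheduler,
    ///     /*base_projection=*/ None,
    ///     Arc::new(DecoderPlugins::default()),
    ///     &cache,
    ///     FileReaderOptions::default(),
    /// )
    /// .await?;
    /// println!("file has {} rows", reader.num_rows());
    /// ```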
    pub async fn try_open(
        scheduler: FileScheduler,
        base_projection: Option<ReaderProjection>,
        decoder_plugins: Arc<DecoderPlugins>,
        cache: &FileMetadataCache,
        options: FileReaderOptions,
    ) -> Result<Self> {
        let file_metadata = Arc::new(Self::read_all_metadata(&scheduler).await?);
        let path = scheduler.reader().path().clone();
        Self::try_open_with_file_metadata(
            Arc::new(LanceEncodingsIo(scheduler)),
            path,
            base_projection,
            decoder_plugins,
            file_metadata,
            cache,
            options,
        )
        .await
    }

    /// Same as `try_open` but with the file metadata already loaded.
    ///
    /// This method can also accept any kind of `EncodingsIo` implementation allowing
    /// for custom strategies to be used for I/O scheduling (e.g. for takes on fast
    /// disks it may be better to avoid asynchronous overhead).
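    ///
    /// A minimal sketch (not compiled here; assumes the metadata was previously loaded via
    /// [`Self::read_all_metadata`] and that `custom_io` is some `Arc<dyn EncodingsIo>`
    /// implementation supplied by the caller):
    ///
    /// ```ignore
    /// let metadata = Arc::new(FileReader::read_all_metadata(&scheduler).await?);
    /// let reader = FileReader::try_open_with_file_metadata(
    ///     custom_io,
    ///     path,
    ///     None,
    ///     Arc::new(DecoderPlugins::default()),
    ///     metadata,
    ///     &cache,
    ///     FileReaderOptions::default(),
    /// )
    /// .await?;
    /// ```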
    pub async fn try_open_with_file_metadata(
        scheduler: Arc<dyn EncodingsIo>,
        path: Path,
        base_projection: Option<ReaderProjection>,
        decoder_plugins: Arc<DecoderPlugins>,
        file_metadata: Arc<CachedFileMetadata>,
        cache: &FileMetadataCache,
        options: FileReaderOptions,
    ) -> Result<Self> {
        let cache = Arc::new(cache.with_base_path(path));

        if let Some(base_projection) = base_projection.as_ref() {
            Self::validate_projection(base_projection, &file_metadata)?;
        }
        let num_rows = file_metadata.num_rows;
        Ok(Self {
            scheduler,
            base_projection: base_projection.unwrap_or(ReaderProjection::from_whole_schema(
                file_metadata.file_schema.as_ref(),
                file_metadata.version(),
            )),
            num_rows,
            metadata: file_metadata,
            decoder_plugins,
            cache,
            options,
        })
    }

    // The actual decoder needs all the column infos that make up a type.  In other words, if
    // the first type in the schema is Struct<i32, i32> then the decoder will need 3 column infos.
    //
    // This is a file reader concern because the file reader needs to support late projection of columns
    // and so it will need to figure this out anyways.
    //
    // It's a bit of a tricky process though because the number of column infos may depend on the
    // encoding.  Considering the above example, if we wrote it with a packed encoding, then there would
    // only be a single column in the file (and not 3).
    //
    // At the moment this method works because our rules are simple and we just repeat them here.  See
    // Self::default_projection for a similar problem.  In the future this is something the encodings
    // registry will need to figure out.
    fn collect_columns_from_projection(
        &self,
        _projection: &ReaderProjection,
    ) -> Result<Vec<Arc<ColumnInfo>>> {
        Ok(self.metadata.column_infos.to_vec())
    }

    #[allow(clippy::too_many_arguments)]
    fn do_read_range(
        column_infos: Vec<Arc<ColumnInfo>>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<FileMetadataCache>,
        num_rows: u64,
        decoder_plugins: Arc<DecoderPlugins>,
        range: Range<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
        should_validate: bool,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        debug!(
            "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
            range,
            batch_size,
            num_rows,
            column_infos.len(),
            projection.schema.fields.len(),
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache,
            decoder_plugins,
            io,
            should_validate,
        };

        let requested_rows = RequestedRows::Ranges(vec![range]);

        Ok(schedule_and_decode(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        ))
    }

    fn read_range(
        &self,
        range: Range<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        // Create and initialize the stream
        Self::do_read_range(
            self.collect_columns_from_projection(&projection)?,
            self.scheduler.clone(),
            self.cache.clone(),
            self.num_rows,
            self.decoder_plugins.clone(),
            range,
            batch_size,
            projection,
            filter,
            self.options.validate_on_decode,
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn do_take_rows(
        column_infos: Vec<Arc<ColumnInfo>>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<FileMetadataCache>,
        decoder_plugins: Arc<DecoderPlugins>,
        indices: Vec<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
        should_validate: bool,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        debug!(
            "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
            indices.len(),
            indices[0],
            indices[indices.len() - 1],
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache,
            decoder_plugins,
            io,
            should_validate,
        };

        let requested_rows = RequestedRows::Indices(indices);

        Ok(schedule_and_decode(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        ))
    }

    fn take_rows(
        &self,
        indices: Vec<u64>,
        batch_size: u32,
        projection: ReaderProjection,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        // Create and initialize the stream
        Self::do_take_rows(
            self.collect_columns_from_projection(&projection)?,
            self.scheduler.clone(),
            self.cache.clone(),
            self.decoder_plugins.clone(),
            indices,
            batch_size,
            projection,
            FilterExpression::no_filter(),
            self.options.validate_on_decode,
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn do_read_ranges(
        column_infos: Vec<Arc<ColumnInfo>>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<FileMetadataCache>,
        decoder_plugins: Arc<DecoderPlugins>,
        ranges: Vec<Range<u64>>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
        should_validate: bool,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        debug!(
            "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
            ranges.len(),
            num_rows,
            ranges[0].start,
            ranges[ranges.len() - 1].end,
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache,
            decoder_plugins,
            io,
            should_validate,
        };

        let requested_rows = RequestedRows::Ranges(ranges);

        Ok(schedule_and_decode(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        ))
    }

    fn read_ranges(
        &self,
        ranges: Vec<Range<u64>>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        Self::do_read_ranges(
            self.collect_columns_from_projection(&projection)?,
            self.scheduler.clone(),
            self.cache.clone(),
            self.decoder_plugins.clone(),
            ranges,
            batch_size,
            projection,
            filter,
            self.options.validate_on_decode,
        )
    }

    /// Creates a stream of "read tasks" to read the data from the file
    ///
    /// The arguments are similar to [`Self::read_stream_projected`] but instead of returning a stream
    /// of record batches it returns a stream of "read tasks".
    ///
    /// The tasks should be consumed with some kind of `buffered` adapter if CPU parallelism is desired.
    ///
    /// Note that "read task" is probably a bit imprecise.  The tasks are actually "decode tasks".  The
    /// reading happens asynchronously in the background.  In other words, a single read task may map to
    /// multiple I/O operations or a single I/O operation may map to multiple read tasks.
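    ///
    /// A minimal sketch (not compiled here) of consuming the tasks with CPU parallelism:
    ///
    /// ```ignore
    /// use futures::StreamExt;
    ///
    /// let tasks = reader.read_tasks(
    ///     ReadBatchParams::RangeFull,
    ///     /*batch_size=*/ 1024,
    ///     /*projection=*/ None,
    ///     FilterExpression::no_filter(),
    /// )?;
    /// // Decode up to 8 batches in parallel; each resolved item is a record batch result.
    /// let mut batches = tasks.map(|task| task.task).buffered(8);
    /// while let Some(batch) = batches.next().await {
    ///     let batch = batch?;
    ///     // ... use batch ...
    /// }
    /// ```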
    pub fn read_tasks(
        &self,
        params: ReadBatchParams,
        batch_size: u32,
        projection: Option<ReaderProjection>,
        filter: FilterExpression,
    ) -> Result<Pin<Box<dyn Stream<Item = ReadBatchTask> + Send>>> {
        let projection = projection.unwrap_or_else(|| self.base_projection.clone());
        Self::validate_projection(&projection, &self.metadata)?;
        let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
            if bound > self.num_rows || bound == self.num_rows && inclusive {
                Err(Error::invalid_input(
                    format!(
                        "cannot read {:?} from file with {} rows",
                        params, self.num_rows
                    ),
                    location!(),
                ))
            } else {
                Ok(())
            }
        };
        match &params {
            ReadBatchParams::Indices(indices) => {
                for idx in indices {
                    match idx {
                        None => {
                            return Err(Error::invalid_input(
                                "Null value in indices array",
                                location!(),
                            ));
                        }
                        Some(idx) => {
                            verify_bound(&params, idx as u64, true)?;
                        }
                    }
                }
                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
                self.take_rows(indices, batch_size, projection)
            }
            ReadBatchParams::Range(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range(
                    range.start as u64..range.end as u64,
                    batch_size,
                    projection,
                    filter,
                )
            }
            ReadBatchParams::Ranges(ranges) => {
                let mut ranges_u64 = Vec::with_capacity(ranges.len());
                for range in ranges.as_ref() {
                    verify_bound(&params, range.end, false)?;
                    ranges_u64.push(range.start..range.end);
                }
                self.read_ranges(ranges_u64, batch_size, projection, filter)
            }
            ReadBatchParams::RangeFrom(range) => {
                verify_bound(&params, range.start as u64, true)?;
                self.read_range(
                    range.start as u64..self.num_rows,
                    batch_size,
                    projection,
                    filter,
                )
            }
            ReadBatchParams::RangeTo(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range(0..range.end as u64, batch_size, projection, filter)
            }
            ReadBatchParams::RangeFull => {
                self.read_range(0..self.num_rows, batch_size, projection, filter)
            }
        }
    }

    /// Reads data from the file as a stream of record batches
    ///
    /// * `params` - Specifies the range (or indices) of data to read
    /// * `batch_size` - The maximum size of a single batch.  A batch may be smaller
    ///   if it is the last batch or if it is not possible to create a batch of the
    ///   requested size.
    ///
    ///   For example, if the batch size is 1024 and one of the columns is a string
    ///   column then there may be some ranges of 1024 rows that contain more than
    ///   2^31 bytes of string data (which is the maximum size of a string column
    ///   in Arrow).  In this case smaller batches may be emitted.
    /// * `batch_readahead` - The number of batches to read ahead.  This controls the
    ///   amount of CPU parallelism of the read.  In other words it controls how many
    ///   batches will be decoded in parallel.  It has no effect on the I/O parallelism
    ///   of the read (how many I/O requests are in flight at once).
    ///
    ///   This parameter is also related to backpressure.  If the consumer of the
    ///   stream is slow then the reader will accumulate data in RAM.
    /// * `projection` - A projection to apply to the read.  This controls which columns
    ///   are read from the file.  The projection is NOT applied on top of the base
    ///   projection.  The projection is applied directly to the file schema.
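    ///
    /// A minimal sketch (not compiled here; assumes `reader` is open and the hypothetical
    /// column `x` exists in the file schema):
    ///
    /// ```ignore
    /// use futures::TryStreamExt;
    ///
    /// let projection = ReaderProjection::from_column_names(reader.schema(), &["x"])?;
    /// let mut stream = reader.read_stream_projected(
    ///     ReadBatchParams::RangeFull,
    ///     /*batch_size=*/ 1024,
    ///     /*batch_readahead=*/ 4,
    ///     projection,
    ///     FilterExpression::no_filter(),
    /// )?;
    /// while let Some(batch) = stream.try_next().await? {
    ///     // ... use batch ...
    /// }
    /// ```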
    pub fn read_stream_projected(
        &self,
        params: ReadBatchParams,
        batch_size: u32,
        batch_readahead: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
        let arrow_schema = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
        let tasks_stream = self.read_tasks(params, batch_size, Some(projection), filter)?;
        let batch_stream = tasks_stream
            .map(|task| task.task)
            .buffered(batch_readahead as usize)
            .boxed();
        Ok(Box::pin(RecordBatchStreamAdapter::new(
            arrow_schema,
            batch_stream,
        )))
    }

    fn take_rows_blocking(
        &self,
        indices: Vec<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
        let column_infos = self.collect_columns_from_projection(&projection)?;
        debug!(
            "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
            indices.len(),
            indices[0],
            indices[indices.len() - 1],
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            io: self.scheduler.clone(),
            should_validate: self.options.validate_on_decode,
        };

        let requested_rows = RequestedRows::Indices(indices);

        schedule_and_decode_blocking(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        )
    }

    fn read_ranges_blocking(
        &self,
        ranges: Vec<Range<u64>>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
        let column_infos = self.collect_columns_from_projection(&projection)?;
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        debug!(
            "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
            ranges.len(),
            num_rows,
            ranges[0].start,
            ranges[ranges.len() - 1].end,
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            io: self.scheduler.clone(),
            should_validate: self.options.validate_on_decode,
        };

        let requested_rows = RequestedRows::Ranges(ranges);

        schedule_and_decode_blocking(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        )
    }

    fn read_range_blocking(
        &self,
        range: Range<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
        let column_infos = self.collect_columns_from_projection(&projection)?;
        let num_rows = self.num_rows;

        debug!(
            "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
            range,
            batch_size,
            num_rows,
            column_infos.len(),
            projection.schema.fields.len(),
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            io: self.scheduler.clone(),
            should_validate: self.options.validate_on_decode,
        };

        let requested_rows = RequestedRows::Ranges(vec![range]);

        schedule_and_decode_blocking(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        )
    }

    /// Read data from the file as an iterator of record batches
    ///
    /// This is a blocking variant of [`Self::read_stream_projected`] that runs entirely in the
    /// calling thread.  It will block on I/O if the decode is faster than the I/O.  It is useful
    /// for benchmarking and potentially for "take"ing small batches from fast disks.
    ///
    /// Large scans of in-memory data will still benefit from threading (and should therefore not
    /// use this method) because we can parallelize the decode.
    ///
    /// Note: calling this from within a tokio runtime will panic.  It is acceptable to call this
    /// from a `spawn_blocking` context.
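    ///
    /// A minimal sketch (not compiled here; per the note above, this is assumed to run
    /// outside of a tokio runtime, e.g. inside `tokio::task::spawn_blocking`):
    ///
    /// ```ignore
    /// let iter = reader.read_stream_projected_blocking(
    ///     ReadBatchParams::RangeFull,
    ///     /*batch_size=*/ 1024,
    ///     /*projection=*/ None,
    ///     FilterExpression::no_filter(),
    /// )?;
    /// for batch in iter {
    ///     let batch = batch?;
    ///     // ... use batch ...
    /// }
    /// ```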
    pub fn read_stream_projected_blocking(
        &self,
        params: ReadBatchParams,
        batch_size: u32,
        projection: Option<ReaderProjection>,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
        let projection = projection.unwrap_or_else(|| self.base_projection.clone());
        Self::validate_projection(&projection, &self.metadata)?;
        let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
            if bound > self.num_rows || bound == self.num_rows && inclusive {
                Err(Error::invalid_input(
                    format!(
                        "cannot read {:?} from file with {} rows",
                        params, self.num_rows
                    ),
                    location!(),
                ))
            } else {
                Ok(())
            }
        };
        match &params {
            ReadBatchParams::Indices(indices) => {
                for idx in indices {
                    match idx {
                        None => {
                            return Err(Error::invalid_input(
                                "Null value in indices array",
                                location!(),
                            ));
                        }
                        Some(idx) => {
                            verify_bound(&params, idx as u64, true)?;
                        }
                    }
                }
                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
                self.take_rows_blocking(indices, batch_size, projection, filter)
            }
            ReadBatchParams::Range(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range_blocking(
                    range.start as u64..range.end as u64,
                    batch_size,
                    projection,
                    filter,
                )
            }
            ReadBatchParams::Ranges(ranges) => {
                let mut ranges_u64 = Vec::with_capacity(ranges.len());
                for range in ranges.as_ref() {
                    verify_bound(&params, range.end, false)?;
                    ranges_u64.push(range.start..range.end);
                }
                self.read_ranges_blocking(ranges_u64, batch_size, projection, filter)
            }
            ReadBatchParams::RangeFrom(range) => {
                verify_bound(&params, range.start as u64, true)?;
                self.read_range_blocking(
                    range.start as u64..self.num_rows,
                    batch_size,
                    projection,
                    filter,
                )
            }
            ReadBatchParams::RangeTo(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range_blocking(0..range.end as u64, batch_size, projection, filter)
            }
            ReadBatchParams::RangeFull => {
                self.read_range_blocking(0..self.num_rows, batch_size, projection, filter)
            }
        }
    }

    /// Reads data from the file as a stream of record batches
    ///
    /// This is similar to [`Self::read_stream_projected`] but uses the base projection
    /// provided when the file was opened (or reads all columns if the file was
    /// opened without a base projection)
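    ///
    /// A minimal sketch (not compiled here; assumes `reader` is an open [`FileReader`]):
    ///
    /// ```ignore
    /// use futures::TryStreamExt;
    ///
    /// let mut stream = reader.read_stream(
    ///     ReadBatchParams::RangeFull,
    ///     /*batch_size=*/ 1024,
    ///     /*batch_readahead=*/ 4,
    ///     FilterExpression::no_filter(),
    /// )?;
    /// while let Some(batch) = stream.try_next().await? {
    ///     // ... use batch ...
    /// }
    /// ```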
1330    pub fn read_stream(
1331        &self,
1332        params: ReadBatchParams,
1333        batch_size: u32,
1334        batch_readahead: u32,
1335        filter: FilterExpression,
1336    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1337        self.read_stream_projected(
1338            params,
1339            batch_size,
1340            batch_readahead,
1341            self.base_projection.clone(),
1342            filter,
1343        )
1344    }
1345
1346    pub fn schema(&self) -> &Arc<Schema> {
1347        &self.metadata.file_schema
1348    }
1349}
1350
1351/// Inspects a page and returns a String describing the page's encoding
1352pub fn describe_encoding(page: &pbfile::column_metadata::Page) -> String {
1353    if let Some(encoding) = &page.encoding {
1354        if let Some(style) = &encoding.location {
1355            match style {
1356                pbfile::encoding::Location::Indirect(indirect) => {
1357                    format!(
1358                        "IndirectEncoding(pos={},size={})",
1359                        indirect.buffer_location, indirect.buffer_length
1360                    )
1361                }
1362                pbfile::encoding::Location::Direct(direct) => {
1363                    let encoding_any =
1364                        prost_types::Any::decode(Bytes::from(direct.encoding.clone()))
1365                            .expect("failed to deserialize encoding as protobuf");
1366                    if encoding_any.type_url == "/lance.encodings.ArrayEncoding" {
1367                        let encoding = encoding_any.to_msg::<pbenc::ArrayEncoding>();
1368                        match encoding {
1369                            Ok(encoding) => {
1370                                format!("{:#?}", encoding)
1371                            }
1372                            Err(err) => {
1373                                format!("Unsupported(decode_err={})", err)
1374                            }
1375                        }
1376                    } else if encoding_any.type_url == "/lance.encodings.PageLayout" {
1377                        let encoding = encoding_any.to_msg::<pbenc::PageLayout>();
1378                        match encoding {
1379                            Ok(encoding) => {
1380                                format!("{:#?}", encoding)
1381                            }
1382                            Err(err) => {
1383                                format!("Unsupported(decode_err={})", err)
1384                            }
1385                        }
1386                    } else {
1387                        format!("Unrecognized(type_url={})", encoding_any.type_url)
1388                    }
1389                }
1390                pbfile::encoding::Location::None(_) => "NoEncodingDescription".to_string(),
1391            }
1392        } else {
1393            "MISSING STYLE".to_string()
1394        }
1395    } else {
1396        "MISSING".to_string()
1397    }
1398}
1399
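/// Extension trait for decoding an [`EncodedBatch`] from Lance-format bytes
///
/// The "mini" format does not embed the schema and so the caller must provide it,
/// while the "self described" format reads the schema from the file's global buffers.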
1400pub trait EncodedBatchReaderExt {
1401    fn try_from_mini_lance(
1402        bytes: Bytes,
1403        schema: &Schema,
1404        version: LanceFileVersion,
1405    ) -> Result<Self>
1406    where
1407        Self: Sized;
1408    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1409    where
1410        Self: Sized;
1411}
1412
1413impl EncodedBatchReaderExt for EncodedBatch {
1414    fn try_from_mini_lance(
1415        bytes: Bytes,
1416        schema: &Schema,
1417        file_version: LanceFileVersion,
1418    ) -> Result<Self>
1419    where
1420        Self: Sized,
1421    {
1422        let projection = ReaderProjection::from_whole_schema(schema, file_version);
1423        let footer = FileReader::decode_footer(&bytes)?;
1424
1425        // Next, read the metadata for the columns
1426        // This is both the column metadata and the CMO table
1427        let column_metadata_start = footer.column_meta_start as usize;
1428        let column_metadata_end = footer.global_buff_offsets_start as usize;
1429        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1430        let column_metadatas =
1431            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1432
1433        let file_version = LanceFileVersion::try_from_major_minor(
1434            footer.major_version as u32,
1435            footer.minor_version as u32,
1436        )?;
1437
1438        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1439
1440        Ok(Self {
1441            data: bytes,
1442            num_rows: page_table
1443                .first()
1444                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1445                .unwrap_or(0),
1446            page_table,
1447            top_level_columns: projection.column_indices,
1448            schema: Arc::new(schema.clone()),
1449        })
1450    }
1451
1452    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1453    where
1454        Self: Sized,
1455    {
1456        let footer = FileReader::decode_footer(&bytes)?;
1457        let file_version = LanceFileVersion::try_from_major_minor(
1458            footer.major_version as u32,
1459            footer.minor_version as u32,
1460        )?;
1461
1462        let gbo_table = FileReader::do_decode_gbo_table(
1463            &bytes.slice(footer.global_buff_offsets_start as usize..),
1464            &footer,
1465            file_version,
1466        )?;
1467        if gbo_table.is_empty() {
1468            return Err(Error::Internal {
1469                message: "File did not contain any global buffers, schema expected".to_string(),
1470                location: location!(),
1471            });
1472        }
1473        let schema_start = gbo_table[0].position as usize;
1474        let schema_size = gbo_table[0].size as usize;
1475
1476        let schema_bytes = bytes.slice(schema_start..(schema_start + schema_size));
1477        let (_, schema) = FileReader::decode_schema(schema_bytes)?;
1478        let projection = ReaderProjection::from_whole_schema(&schema, file_version);
1479
1480        // Next, read the metadata for the columns
1481        // This is both the column metadata and the CMO table
1482        let column_metadata_start = footer.column_meta_start as usize;
1483        let column_metadata_end = footer.global_buff_offsets_start as usize;
1484        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1485        let column_metadatas =
1486            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1487
1488        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1489
1490        Ok(Self {
1491            data: bytes,
1492            num_rows: page_table
1493                .first()
1494                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1495                .unwrap_or(0),
1496            page_table,
1497            top_level_columns: projection.column_indices,
1498            schema: Arc::new(schema),
1499        })
1500    }
1501}
1502
1503#[cfg(test)]
1504pub mod tests {
1505    use std::{collections::BTreeMap, pin::Pin, sync::Arc};
1506
1507    use arrow_array::{
1508        types::{Float64Type, Int32Type},
1509        RecordBatch, UInt32Array,
1510    };
1511    use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema};
1512    use bytes::Bytes;
1513    use futures::{prelude::stream::TryStreamExt, StreamExt};
1514    use lance_arrow::RecordBatchExt;
1515    use lance_core::{datatypes::Schema, ArrowResult};
1516    use lance_datagen::{array, gen, BatchCount, ByteCount, RowCount};
1517    use lance_encoding::{
1518        decoder::{decode_batch, DecodeBatchScheduler, DecoderPlugins, FilterExpression},
1519        encoder::{encode_batch, CoreFieldEncodingStrategy, EncodedBatch, EncodingOptions},
1520        version::LanceFileVersion,
1521    };
1522    use lance_io::{stream::RecordBatchStream, utils::CachedFileSize};
1523    use log::debug;
1524    use tokio::sync::mpsc;
1525
1526    use crate::v2::{
1527        reader::{EncodedBatchReaderExt, FileReader, FileReaderOptions, ReaderProjection},
1528        testing::{test_cache, write_lance_file, FsFixture, WrittenFile},
1529        writer::{EncodedBatchWriteExt, FileWriter, FileWriterOptions},
1530    };
1531
1532    async fn create_some_file(fs: &FsFixture, version: LanceFileVersion) -> WrittenFile {
1533        let location_type = DataType::Struct(Fields::from(vec![
1534            Field::new("x", DataType::Float64, true),
1535            Field::new("y", DataType::Float64, true),
1536        ]));
1537        let categories_type = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)));
1538
1539        let mut reader = gen()
1540            .col("score", array::rand::<Float64Type>())
1541            .col("location", array::rand_type(&location_type))
1542            .col("categories", array::rand_type(&categories_type))
1543            .col("binary", array::rand_type(&DataType::Binary));
1544        if version <= LanceFileVersion::V2_0 {
1545            reader = reader.col("large_bin", array::rand_type(&DataType::LargeBinary));
1546        }
1547        let reader = reader.into_reader_rows(RowCount::from(1000), BatchCount::from(100));
1548
1549        write_lance_file(
1550            reader,
1551            fs,
1552            FileWriterOptions {
1553                format_version: Some(version),
1554                ..Default::default()
1555            },
1556        )
1557        .await
1558    }
1559
1560    type Transformer = Box<dyn Fn(&RecordBatch) -> RecordBatch>;
1561
1562    async fn verify_expected(
1563        expected: &[RecordBatch],
1564        mut actual: Pin<Box<dyn RecordBatchStream>>,
1565        read_size: u32,
1566        transform: Option<Transformer>,
1567    ) {
1568        let mut remaining = expected.iter().map(|batch| batch.num_rows()).sum::<usize>() as u32;
1569        let mut expected_iter = expected.iter().map(|batch| {
1570            if let Some(transform) = &transform {
1571                transform(batch)
1572            } else {
1573                batch.clone()
1574            }
1575        });
1576        let mut next_expected = expected_iter.next().unwrap().clone();
1577        while let Some(actual) = actual.next().await {
1578            let mut actual = actual.unwrap();
1579            let mut rows_to_verify = actual.num_rows() as u32;
1580            let expected_length = remaining.min(read_size);
1581            assert_eq!(expected_length, rows_to_verify);
1582
1583            while rows_to_verify > 0 {
1584                let next_slice_len = (next_expected.num_rows() as u32).min(rows_to_verify);
1585                assert_eq!(
1586                    next_expected.slice(0, next_slice_len as usize),
1587                    actual.slice(0, next_slice_len as usize)
1588                );
1589                remaining -= next_slice_len;
1590                rows_to_verify -= next_slice_len;
1591                if remaining > 0 {
1592                    if next_slice_len == next_expected.num_rows() as u32 {
1593                        next_expected = expected_iter.next().unwrap().clone();
1594                    } else {
1595                        next_expected = next_expected.slice(
1596                            next_slice_len as usize,
1597                            next_expected.num_rows() - next_slice_len as usize,
1598                        );
1599                    }
1600                }
1601                if rows_to_verify > 0 {
1602                    actual = actual.slice(
1603                        next_slice_len as usize,
1604                        actual.num_rows() - next_slice_len as usize,
1605                    );
1606                }
1607            }
1608        }
1609        assert_eq!(remaining, 0);
1610    }
1611
1612    #[tokio::test]
1613    async fn test_round_trip() {
1614        let fs = FsFixture::default();
1615
1616        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1617
1618        for read_size in [32, 1024, 1024 * 1024] {
1619            let file_scheduler = fs
1620                .scheduler
1621                .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1622                .await
1623                .unwrap();
1624            let file_reader = FileReader::try_open(
1625                file_scheduler,
1626                None,
1627                Arc::<DecoderPlugins>::default(),
1628                &test_cache(),
1629                FileReaderOptions::default(),
1630            )
1631            .await
1632            .unwrap();
1633
1634            let schema = file_reader.schema();
1635            assert_eq!(schema.metadata.get("foo").unwrap(), "bar");
1636
1637            let batch_stream = file_reader
1638                .read_stream(
1639                    lance_io::ReadBatchParams::RangeFull,
1640                    read_size,
1641                    16,
1642                    FilterExpression::no_filter(),
1643                )
1644                .unwrap();
1645
1646            verify_expected(&data, batch_stream, read_size, None).await;
1647        }
1648    }
1649
1650    #[test_log::test(tokio::test)]
1651    async fn test_encoded_batch_round_trip() {
1652        let data = gen()
1653            .col("x", array::rand::<Int32Type>())
1654            .col("y", array::rand_utf8(ByteCount::from(16), false))
1655            .into_batch_rows(RowCount::from(10000))
1656            .unwrap();
1657
1658        let lance_schema = Arc::new(Schema::try_from(data.schema().as_ref()).unwrap());
1659
1660        let encoding_options = EncodingOptions {
1661            cache_bytes_per_column: 4096,
1662            max_page_bytes: 32 * 1024 * 1024,
1663            keep_original_array: true,
1664            buffer_alignment: 64,
1665        };
1666        let encoded_batch = encode_batch(
1667            &data,
1668            lance_schema.clone(),
1669            &CoreFieldEncodingStrategy::default(),
1670            &encoding_options,
1671        )
1672        .await
1673        .unwrap();
1674
1675        // Test self described
1676        let bytes = encoded_batch.try_to_self_described_lance().unwrap();
1677
1678        let decoded_batch = EncodedBatch::try_from_self_described_lance(bytes).unwrap();
1679
1680        let decoded = decode_batch(
1681            &decoded_batch,
1682            &FilterExpression::no_filter(),
1683            Arc::<DecoderPlugins>::default(),
1684            false,
1685            LanceFileVersion::default(),
1686            None,
1687        )
1688        .await
1689        .unwrap();
1690
1691        assert_eq!(data, decoded);
1692
1693        // Test mini
1694        let bytes = encoded_batch.try_to_mini_lance().unwrap();
1695        let decoded_batch =
1696            EncodedBatch::try_from_mini_lance(bytes, lance_schema.as_ref(), LanceFileVersion::V2_0)
1697                .unwrap();
1698        let decoded = decode_batch(
1699            &decoded_batch,
1700            &FilterExpression::no_filter(),
1701            Arc::<DecoderPlugins>::default(),
1702            false,
1703            LanceFileVersion::default(),
1704            None,
1705        )
1706        .await
1707        .unwrap();
1708
1709        assert_eq!(data, decoded);
1710    }
1711
1712    #[test_log::test(tokio::test)]
1713    async fn test_projection() {
1714        let fs = FsFixture::default();
1715
1716        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
1717        let file_scheduler = fs
1718            .scheduler
1719            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1720            .await
1721            .unwrap();
1722
1723        let field_id_mapping = written_file
1724            .field_id_mapping
1725            .iter()
1726            .copied()
1727            .collect::<BTreeMap<_, _>>();
1728
1729        let empty_projection = ReaderProjection {
1730            column_indices: Vec::default(),
1731            schema: Arc::new(Schema::default()),
1732        };
1733
1734        for columns in [
1735            vec!["score"],
1736            vec!["location"],
1737            vec!["categories"],
1738            vec!["score.x"],
1739            vec!["score", "categories"],
1740            vec!["score", "location"],
1741            vec!["location", "categories"],
1742            vec!["score.y", "location", "categories"],
1743        ] {
1744            debug!("Testing round trip with projection {:?}", columns);
1745            // We can specify the projection as part of the read operation via read_stream_projected
1746            let file_reader = FileReader::try_open(
1747                file_scheduler.clone(),
1748                None,
1749                Arc::<DecoderPlugins>::default(),
1750                &test_cache(),
1751                FileReaderOptions::default(),
1752            )
1753            .await
1754            .unwrap();
1755
1756            let projected_schema = written_file.schema.project(&columns).unwrap();
1757            let projection = ReaderProjection::from_field_ids(
1758                &file_reader,
1759                &projected_schema,
1760                &field_id_mapping,
1761            )
1762            .unwrap();
1763
1764            let batch_stream = file_reader
1765                .read_stream_projected(
1766                    lance_io::ReadBatchParams::RangeFull,
1767                    1024,
1768                    16,
1769                    projection.clone(),
1770                    FilterExpression::no_filter(),
1771                )
1772                .unwrap();
1773
1774            let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1775            verify_expected(
1776                &written_file.data,
1777                batch_stream,
1778                1024,
1779                Some(Box::new(move |batch: &RecordBatch| {
1780                    batch.project_by_schema(&projection_arrow).unwrap()
1781                })),
1782            )
1783            .await;
1784
1785            // We can also specify the projection as a base projection when we open the file
1786            let file_reader = FileReader::try_open(
1787                file_scheduler.clone(),
1788                Some(projection.clone()),
1789                Arc::<DecoderPlugins>::default(),
1790                &test_cache(),
1791                FileReaderOptions::default(),
1792            )
1793            .await
1794            .unwrap();
1795
1796            let batch_stream = file_reader
1797                .read_stream(
1798                    lance_io::ReadBatchParams::RangeFull,
1799                    1024,
1800                    16,
1801                    FilterExpression::no_filter(),
1802                )
1803                .unwrap();
1804
1805            let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1806            verify_expected(
1807                &written_file.data,
1808                batch_stream,
1809                1024,
1810                Some(Box::new(move |batch: &RecordBatch| {
1811                    batch.project_by_schema(&projection_arrow).unwrap()
1812                })),
1813            )
1814            .await;
1815
1816            assert!(file_reader
1817                .read_stream_projected(
1818                    lance_io::ReadBatchParams::RangeFull,
1819                    1024,
1820                    16,
1821                    empty_projection.clone(),
1822                    FilterExpression::no_filter(),
1823                )
1824                .is_err());
1825        }
1826
1827        assert!(FileReader::try_open(
1828            file_scheduler.clone(),
1829            Some(empty_projection),
1830            Arc::<DecoderPlugins>::default(),
1831            &test_cache(),
1832            FileReaderOptions::default(),
1833        )
1834        .await
1835        .is_err());
1836
1837        let arrow_schema = ArrowSchema::new(vec![
1838            Field::new("x", DataType::Int32, true),
1839            Field::new("y", DataType::Int32, true),
1840        ]);
1841        let schema = Schema::try_from(&arrow_schema).unwrap();
1842
1843        let projection_with_dupes = ReaderProjection {
1844            column_indices: vec![0, 0],
1845            schema: Arc::new(schema),
1846        };
1847
1848        assert!(FileReader::try_open(
1849            file_scheduler.clone(),
1850            Some(projection_with_dupes),
1851            Arc::<DecoderPlugins>::default(),
1852            &test_cache(),
1853            FileReaderOptions::default(),
1854        )
1855        .await
1856        .is_err());
1857    }
1858
1859    #[test_log::test(tokio::test)]
1860    async fn test_compressing_buffer() {
1861        let fs = FsFixture::default();
1862
1863        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
1864        let file_scheduler = fs
1865            .scheduler
1866            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1867            .await
1868            .unwrap();
1869
1870        // We can specify the projection as part of the read operation via read_stream_projected
1871        let file_reader = FileReader::try_open(
1872            file_scheduler.clone(),
1873            None,
1874            Arc::<DecoderPlugins>::default(),
1875            &test_cache(),
1876            FileReaderOptions::default(),
1877        )
1878        .await
1879        .unwrap();
1880
1881        let mut projection = written_file.schema.project(&["score"]).unwrap();
1882        for field in projection.fields.iter_mut() {
1883            field
1884                .metadata
1885                .insert("lance:compression".to_string(), "zstd".to_string());
1886        }
1887        let projection = ReaderProjection {
1888            column_indices: projection.fields.iter().map(|f| f.id as u32).collect(),
1889            schema: Arc::new(projection),
1890        };
1891
1892        let batch_stream = file_reader
1893            .read_stream_projected(
1894                lance_io::ReadBatchParams::RangeFull,
1895                1024,
1896                16,
1897                projection.clone(),
1898                FilterExpression::no_filter(),
1899            )
1900            .unwrap();
1901
1902        let projection_arrow = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
1903        verify_expected(
1904            &written_file.data,
1905            batch_stream,
1906            1024,
1907            Some(Box::new(move |batch: &RecordBatch| {
1908                batch.project_by_schema(&projection_arrow).unwrap()
1909            })),
1910        )
1911        .await;
1912    }
1913
1914    #[tokio::test]
1915    async fn test_read_all() {
1916        let fs = FsFixture::default();
1917        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1918        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
1919
1920        let file_scheduler = fs
1921            .scheduler
1922            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1923            .await
1924            .unwrap();
1925        let file_reader = FileReader::try_open(
1926            file_scheduler.clone(),
1927            None,
1928            Arc::<DecoderPlugins>::default(),
1929            &test_cache(),
1930            FileReaderOptions::default(),
1931        )
1932        .await
1933        .unwrap();
1934
1935        let batches = file_reader
1936            .read_stream(
1937                lance_io::ReadBatchParams::RangeFull,
1938                total_rows as u32,
1939                16,
1940                FilterExpression::no_filter(),
1941            )
1942            .unwrap()
1943            .try_collect::<Vec<_>>()
1944            .await
1945            .unwrap();
1946        assert_eq!(batches.len(), 1);
1947        assert_eq!(batches[0].num_rows(), total_rows);
1948    }
1949
1950    #[tokio::test]
1951    async fn test_blocking_take() {
1952        let fs = FsFixture::default();
1953        let WrittenFile { data, schema, .. } = create_some_file(&fs, LanceFileVersion::V2_1).await;
1954        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
1955
1956        let file_scheduler = fs
1957            .scheduler
1958            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1959            .await
1960            .unwrap();
1961        let file_reader = FileReader::try_open(
1962            file_scheduler.clone(),
1963            Some(ReaderProjection::from_column_names(&schema, &["score"]).unwrap()),
1964            Arc::<DecoderPlugins>::default(),
1965            &test_cache(),
1966            FileReaderOptions::default(),
1967        )
1968        .await
1969        .unwrap();
1970
1971        let batches = tokio::task::spawn_blocking(move || {
1972            file_reader
1973                .read_stream_projected_blocking(
1974                    lance_io::ReadBatchParams::Indices(UInt32Array::from(vec![0, 1, 2, 3, 4])),
1975                    total_rows as u32,
1976                    None,
1977                    FilterExpression::no_filter(),
1978                )
1979                .unwrap()
1980                .collect::<ArrowResult<Vec<_>>>()
1981                .unwrap()
1982        })
1983        .await
1984        .unwrap();
1985
1986        assert_eq!(batches.len(), 1);
1987        assert_eq!(batches[0].num_rows(), 5);
1988        assert_eq!(batches[0].num_columns(), 1);
1989    }
1990
1991    #[tokio::test(flavor = "multi_thread")]
1992    async fn test_drop_in_progress() {
1993        let fs = FsFixture::default();
1994        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1995        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
1996
1997        let file_scheduler = fs
1998            .scheduler
1999            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2000            .await
2001            .unwrap();
2002        let file_reader = FileReader::try_open(
2003            file_scheduler.clone(),
2004            None,
2005            Arc::<DecoderPlugins>::default(),
2006            &test_cache(),
2007            FileReaderOptions::default(),
2008        )
2009        .await
2010        .unwrap();
2011
2012        let mut batches = file_reader
2013            .read_stream(
2014                lance_io::ReadBatchParams::RangeFull,
2015                (total_rows / 10) as u32,
2016                16,
2017                FilterExpression::no_filter(),
2018            )
2019            .unwrap();
2020
2021        drop(file_reader);
2022
2023        let batch = batches.next().await.unwrap().unwrap();
2024        assert!(batch.num_rows() > 0);
2025
2026        // Drop in-progress scan
2027        drop(batches);
2028    }
2029
2030    #[tokio::test]
2031    async fn drop_while_scheduling() {
2032        // This is a bit of a white-box test that pokes at the internals.  We want to
2033        // test the case where the read stream is dropped before the scheduling
2034        // thread finishes.  We can't do that in a black-box fashion because the
2035        // scheduling thread runs in the background and there is no easy way to
2036        // pause / gate it.
2037
2038        // It's a regression test for a bug where the scheduling thread would panic
2039        // if the stream was dropped before it finished.
2040
2041        let fs = FsFixture::default();
2042        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
2043        let total_rows = written_file
2044            .data
2045            .iter()
2046            .map(|batch| batch.num_rows())
2047            .sum::<usize>();
2048
2049        let file_scheduler = fs
2050            .scheduler
2051            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2052            .await
2053            .unwrap();
2054        let file_reader = FileReader::try_open(
2055            file_scheduler.clone(),
2056            None,
2057            Arc::<DecoderPlugins>::default(),
2058            &test_cache(),
2059            FileReaderOptions::default(),
2060        )
2061        .await
2062        .unwrap();
2063
2064        let projection =
2065            ReaderProjection::from_whole_schema(&written_file.schema, LanceFileVersion::V2_0);
2066        let column_infos = file_reader
2067            .collect_columns_from_projection(&projection)
2068            .unwrap();
2069        let mut decode_scheduler = DecodeBatchScheduler::try_new(
2070            &projection.schema,
2071            &projection.column_indices,
2072            &column_infos,
2073            &vec![],
2074            total_rows as u64,
2075            Arc::<DecoderPlugins>::default(),
2076            file_reader.scheduler.clone(),
2077            test_cache(),
2078            &FilterExpression::no_filter(),
2079        )
2080        .await
2081        .unwrap();
2082
2083        let range = 0..total_rows as u64;
2084
2085        let (tx, rx) = mpsc::unbounded_channel();
2086
2087        // Simulate the stream / decoder being dropped
2088        drop(rx);
2089
2090        // Scheduling should not panic
2091        decode_scheduler.schedule_range(
2092            range,
2093            &FilterExpression::no_filter(),
2094            tx,
2095            file_reader.scheduler.clone(),
2096        )
2097    }
2098
2099    #[tokio::test]
2100    async fn test_global_buffers() {
2101        let fs = FsFixture::default();
2102
2103        let lance_schema =
2104            lance_core::datatypes::Schema::try_from(&ArrowSchema::new(vec![Field::new(
2105                "foo",
2106                DataType::Int32,
2107                true,
2108            )]))
2109            .unwrap();
2110
2111        let mut file_writer = FileWriter::try_new(
2112            fs.object_store.create(&fs.tmp_path).await.unwrap(),
2113            lance_schema.clone(),
2114            FileWriterOptions::default(),
2115        )
2116        .unwrap();
2117
2118        let test_bytes = Bytes::from_static(b"hello");
2119
2120        let buf_index = file_writer
2121            .add_global_buffer(test_bytes.clone())
2122            .await
2123            .unwrap();
2124
2125        assert_eq!(buf_index, 1);
2126
2127        file_writer.finish().await.unwrap();
2128
2129        let file_scheduler = fs
2130            .scheduler
2131            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2132            .await
2133            .unwrap();
2134        let file_reader = FileReader::try_open(
2135            file_scheduler.clone(),
2136            None,
2137            Arc::<DecoderPlugins>::default(),
2138            &test_cache(),
2139            FileReaderOptions::default(),
2140        )
2141        .await
2142        .unwrap();
2143
2144        let buf = file_reader.read_global_buffer(1).await.unwrap();
2145        assert_eq!(buf, test_bytes);
2146    }
2147}