1use std::{
5 collections::{BTreeMap, BTreeSet},
6 io::Cursor,
7 ops::Range,
8 pin::Pin,
9 sync::Arc,
10};
11
12use arrow_array::RecordBatchReader;
13use arrow_schema::Schema as ArrowSchema;
14use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
15use bytes::{Bytes, BytesMut};
16use futures::{Stream, StreamExt, stream::BoxStream};
17use lance_core::deepsize::{Context, DeepSizeOf};
18use lance_encoding::{
19 EncodingsIo,
20 decoder::{
21 ColumnInfo, DecoderConfig, DecoderPlugins, FilterExpression, PageEncoding, PageInfo,
22 ReadBatchTask, RequestedRows, SchedulerDecoderConfig, schedule_and_decode,
23 schedule_and_decode_blocking,
24 },
25 encoder::EncodedBatch,
26 version::LanceFileVersion,
27};
28use log::debug;
29use object_store::path::Path;
30use prost::{Message, Name};
31
32use lance_core::{
33 Error, Result,
34 cache::LanceCache,
35 datatypes::{Field, Schema},
36};
37use lance_encoding::format::pb as pbenc;
38use lance_encoding::format::pb21 as pbenc21;
39use lance_io::{
40 ReadBatchParams,
41 scheduler::FileScheduler,
42 stream::{RecordBatchStream, RecordBatchStreamAdapter},
43};
44
45use crate::{
46 datatypes::{Fields, FieldsWithMeta},
47 format::{MAGIC, MAJOR_VERSION, MINOR_VERSION, pb, pbfile},
48 io::LanceEncodingsIo,
49 writer::PAGE_BUFFER_ALIGNMENT,
50};
51
/// Default value of [`FileReaderOptions::read_chunk_size`], in bytes (8 MiB).
pub const DEFAULT_READ_CHUNK_SIZE: u64 = 8 * 1024 * 1024;
55
/// Location of a buffer inside the file, expressed as a start position and a size.
///
/// The reader fetches a buffer by requesting the byte range
/// `position..position + size`.
#[derive(Debug, DeepSizeOf)]
pub struct BufferDescriptor {
    /// Position of the first byte of the buffer.
    pub position: u64,
    /// Size of the buffer, in bytes.
    pub size: u64,
}
65
/// Summary statistics about a file, as returned by [`FileReader::file_statistics`].
#[derive(Debug)]
pub struct FileStatistics {
    /// One entry per column metadata record stored in the file.
    pub columns: Vec<ColumnStatistics>,
}
72
/// Summary statistics about a single column of a file.
#[derive(Debug)]
pub struct ColumnStatistics {
    /// Number of pages recorded in the column's metadata.
    pub num_pages: usize,
    /// Sum of the sizes of every buffer of every page of the column, in bytes.
    pub size_bytes: u64,
}
83
/// Everything the reader learns about a file from its trailing metadata section.
///
/// Produced by [`FileReader::read_all_metadata`].
#[derive(Debug)]
pub struct CachedFileMetadata {
    /// Schema decoded from the first global buffer of the file.
    pub file_schema: Arc<Schema>,
    /// Decoded protobuf metadata, one entry per column.
    pub column_metadatas: Vec<pbfile::ColumnMetadata>,
    /// Decoder-facing column descriptions derived from `column_metadatas`.
    pub column_infos: Vec<Arc<ColumnInfo>>,
    /// Number of rows, taken from the `length` of the file descriptor.
    pub num_rows: u64,
    /// Entries of the global buffer offset table (position and size of each buffer).
    pub file_buffers: Vec<BufferDescriptor>,
    /// Computed as the column metadata start offset minus `num_global_buffer_bytes`.
    pub num_data_bytes: u64,
    /// Number of bytes between the start of the column metadata and the start of
    /// the global buffer offset table.
    pub num_column_metadata_bytes: u64,
    /// Sum of the sizes of all global buffers.
    pub num_global_buffer_bytes: u64,
    /// Number of bytes between the start of the first global buffer (the schema)
    /// and the end of the file.
    pub num_footer_bytes: u64,
    /// Major version read from the footer.
    pub major_version: u16,
    /// Minor version read from the footer.
    pub minor_version: u16,
    /// Total size of the file, in bytes.
    pub file_size_bytes: u64,
    /// Global buffers (other than buffer 0) that were entirely contained in the
    /// initial tail read, keyed by global buffer index. Reads of these buffers are
    /// served from memory instead of going back to storage.
    pub retained_global_buffers: BTreeMap<u32, Bytes>,
}
122
impl CachedFileMetadata {
    /// Returns the total size of the file in bytes (the `file_size_bytes` field).
    pub fn file_size(&self) -> u64 {
        self.file_size_bytes
    }
}
129
130impl DeepSizeOf for CachedFileMetadata {
131 fn deep_size_of_children(&self, context: &mut Context) -> usize {
132 let schema_size = self.file_schema.deep_size_of_children(context);
133
134 let buffers_size: usize = self
135 .file_buffers
136 .iter()
137 .map(|fb| fb.deep_size_of_children(context))
138 .sum();
139
140 let column_metadatas_size: usize = self
146 .column_metadatas
147 .iter()
148 .map(|cm| cm.encoded_len() * 4)
149 .sum::<usize>()
150 + std::mem::size_of_val(self.column_metadatas.as_slice());
151
152 let column_infos_size: usize = self
156 .column_infos
157 .iter()
158 .map(|ci| {
159 let pages_size: usize = ci
160 .page_infos
161 .iter()
162 .map(|pi| {
163 let enc_size = match &pi.encoding {
164 lance_encoding::decoder::PageEncoding::Legacy(e) => e.encoded_len() * 4,
165 lance_encoding::decoder::PageEncoding::Structural(e) => {
166 e.encoded_len() * 4
167 }
168 };
169 enc_size
170 + std::mem::size_of_val(pi.buffer_offsets_and_sizes.as_ref())
171 + std::mem::size_of::<u64>() * 2 })
173 .sum();
174 pages_size
175 + std::mem::size_of_val(ci.buffer_offsets_and_sizes.as_ref())
176 + ci.encoding.encoded_len() * 4
177 + std::mem::size_of::<u32>() + std::mem::size_of::<usize>() * 2 })
180 .sum();
181
182 let retained_buffers_size: usize = self
184 .retained_global_buffers
185 .values()
186 .map(|buf| buf.len())
187 .sum();
188
189 schema_size
190 + buffers_size
191 + column_metadatas_size
192 + column_infos_size
193 + retained_buffers_size
194 }
195}
196
197impl CachedFileMetadata {
198 pub fn version(&self) -> LanceFileVersion {
199 match (self.major_version, self.minor_version) {
200 (0, 3) => LanceFileVersion::V2_0,
201 (2, 0) => LanceFileVersion::V2_0,
202 (2, 1) => LanceFileVersion::V2_1,
203 (2, 2) => LanceFileVersion::V2_2,
204 (2, 3) => LanceFileVersion::V2_3,
205 _ => panic!(
206 "Unsupported version: {}.{}",
207 self.major_version, self.minor_version
208 ),
209 }
210 }
211}
212
/// Selects which columns of a file are read and the schema they are decoded into.
#[derive(Debug, Clone)]
pub struct ReaderProjection {
    /// Schema of the data produced by the read.
    pub schema: Arc<Schema>,
    /// Indices of the file columns to read. When a projection is validated by the
    /// reader, each index must be unique and smaller than the number of columns
    /// in the file.
    pub column_indices: Vec<u32>,
}
280
281impl ReaderProjection {
282 fn from_field_ids_helper<'a>(
283 file_version: LanceFileVersion,
284 fields: impl Iterator<Item = &'a Field>,
285 field_id_to_column_index: &BTreeMap<u32, u32>,
286 column_indices: &mut Vec<u32>,
287 ) -> Result<()> {
288 for field in fields {
289 let is_structural = file_version >= LanceFileVersion::V2_1;
290 if (!is_structural
293 || field.children.is_empty()
294 || field.is_blob()
295 || field.is_packed_struct())
296 && let Some(column_idx) = field_id_to_column_index.get(&(field.id as u32)).copied()
297 {
298 column_indices.push(column_idx);
299 }
300 if !is_structural || (!field.is_blob() && !field.is_packed_struct()) {
302 Self::from_field_ids_helper(
303 file_version,
304 field.children.iter(),
305 field_id_to_column_index,
306 column_indices,
307 )?;
308 }
309 }
310 Ok(())
311 }
312
313 pub fn from_field_ids(
318 file_version: LanceFileVersion,
319 schema: &Schema,
320 field_id_to_column_index: &BTreeMap<u32, u32>,
321 ) -> Result<Self> {
322 let mut column_indices = Vec::new();
323 Self::from_field_ids_helper(
324 file_version,
325 schema.fields.iter(),
326 field_id_to_column_index,
327 &mut column_indices,
328 )?;
329 let projection = Self {
330 schema: Arc::new(schema.clone()),
331 column_indices,
332 };
333 Ok(projection)
334 }
335
336 pub fn from_whole_schema(schema: &Schema, version: LanceFileVersion) -> Self {
344 let schema = Arc::new(schema.clone());
345 let is_structural = version >= LanceFileVersion::V2_1;
346 let mut column_indices = vec![];
347 let mut curr_column_idx = 0;
348 let mut packed_struct_fields_num = 0;
349 for field in schema.fields_pre_order() {
350 if packed_struct_fields_num > 0 {
351 packed_struct_fields_num -= 1;
352 continue;
353 }
354 if field.is_packed_struct() {
355 column_indices.push(curr_column_idx);
356 curr_column_idx += 1;
357 packed_struct_fields_num = field.children.len();
358 } else if field.children.is_empty() || !is_structural {
359 column_indices.push(curr_column_idx);
360 curr_column_idx += 1;
361 }
362 }
363 Self {
364 schema,
365 column_indices,
366 }
367 }
368
369 pub fn from_column_names(
376 file_version: LanceFileVersion,
377 schema: &Schema,
378 column_names: &[&str],
379 ) -> Result<Self> {
380 let field_id_to_column_index = schema
381 .fields_pre_order()
382 .filter(|field| {
385 file_version < LanceFileVersion::V2_1 || field.is_leaf() || field.is_packed_struct()
386 })
387 .enumerate()
388 .map(|(idx, field)| (field.id as u32, idx as u32))
389 .collect::<BTreeMap<_, _>>();
390 let projected = schema.project(column_names)?;
391 let mut column_indices = Vec::new();
392 Self::from_field_ids_helper(
393 file_version,
394 projected.fields.iter(),
395 &field_id_to_column_index,
396 &mut column_indices,
397 )?;
398 Ok(Self {
399 schema: Arc::new(projected),
400 column_indices,
401 })
402 }
403}
404
/// Options that control how a [`FileReader`] reads and decodes data.
#[derive(Clone, Debug)]
pub struct FileReaderOptions {
    /// Configuration forwarded to the decoder for every read.
    pub decoder_config: DecoderConfig,
    /// Read chunk size handed to the I/O layer when the reader is opened with
    /// `try_open`. Defaults to [`DEFAULT_READ_CHUNK_SIZE`].
    pub read_chunk_size: u64,
    /// Optional byte-based batch size forwarded to the scheduler/decoder
    /// configuration. `None` by default.
    pub batch_size_bytes: Option<u64>,
}
421
impl Default for FileReaderOptions {
    /// Default decoder configuration, [`DEFAULT_READ_CHUNK_SIZE`] as the read
    /// chunk size, and no byte-based batch size.
    fn default() -> Self {
        Self {
            decoder_config: DecoderConfig::default(),
            read_chunk_size: DEFAULT_READ_CHUNK_SIZE,
            batch_size_bytes: None,
        }
    }
}
431
/// Reader for a single Lance file.
#[derive(Debug, Clone)]
pub struct FileReader {
    /// I/O backend used for all reads issued by this reader.
    scheduler: Arc<dyn EncodingsIo>,
    /// Projection used when a read does not supply its own.
    base_projection: ReaderProjection,
    /// Number of rows in the file (copied from the metadata).
    num_rows: u64,
    /// Metadata decoded from the tail of the file.
    metadata: Arc<CachedFileMetadata>,
    /// Decoder plugins forwarded to the scheduler/decoder.
    decoder_plugins: Arc<DecoderPlugins>,
    /// Cache forwarded to the scheduler/decoder, prefixed with the file path.
    cache: Arc<LanceCache>,
    /// Reader options.
    options: FileReaderOptions,
}
/// Fixed-size footer stored in the last [`FOOTER_LEN`] bytes of a file.
///
/// Fields are listed in the order in which they are read from the footer; all
/// values are little-endian. The four magic bytes that follow the version are
/// checked during decoding but not stored.
#[derive(Debug)]
struct Footer {
    /// Position of the start of the column metadata section.
    #[allow(dead_code)]
    column_meta_start: u64,
    /// Position of the start of the column metadata offset table.
    #[allow(dead_code)]
    column_meta_offsets_start: u64,
    /// Position of the start of the global buffer offset table.
    global_buff_offsets_start: u64,
    /// Number of entries in the global buffer offset table.
    num_global_buffers: u32,
    /// Number of columns (entries in the column metadata offset table).
    num_columns: u32,
    /// Major file format version.
    major_version: u16,
    /// Minor file format version.
    minor_version: u16,
}
457
/// Size of the footer in bytes: three u64 offsets, two u32 counts, two u16
/// version numbers and the four magic bytes.
const FOOTER_LEN: usize = 40;
459
460impl FileReader {
461 pub fn with_scheduler(&self, scheduler: Arc<dyn EncodingsIo>) -> Self {
462 Self {
463 scheduler,
464 base_projection: self.base_projection.clone(),
465 cache: self.cache.clone(),
466 decoder_plugins: self.decoder_plugins.clone(),
467 metadata: self.metadata.clone(),
468 options: self.options.clone(),
469 num_rows: self.num_rows,
470 }
471 }
472
473 pub fn with_io_stats(
481 &self,
482 stats: Arc<dyn lance_core::utils::io_stats::IoStatsRecorder>,
483 ) -> Self {
484 match self.scheduler.with_io_stats(stats) {
485 Some(scheduler) => self.with_scheduler(scheduler),
486 None => self.clone(),
487 }
488 }
489
    /// Returns the number of rows in the file.
    pub fn num_rows(&self) -> u64 {
        self.num_rows
    }
493
    /// Returns the metadata that was decoded when the file was opened.
    pub fn metadata(&self) -> &Arc<CachedFileMetadata> {
        &self.metadata
    }
497
498 pub fn file_statistics(&self) -> FileStatistics {
499 let column_metadatas = &self.metadata().column_metadatas;
500
501 let column_stats = column_metadatas
502 .iter()
503 .map(|col_metadata| {
504 let num_pages = col_metadata.pages.len();
505 let size_bytes = col_metadata
506 .pages
507 .iter()
508 .map(|page| page.buffer_sizes.iter().sum::<u64>())
509 .sum::<u64>();
510 ColumnStatistics {
511 num_pages,
512 size_bytes,
513 }
514 })
515 .collect();
516
517 FileStatistics {
518 columns: column_stats,
519 }
520 }
521
522 pub async fn read_global_buffer(&self, index: u32) -> Result<Bytes> {
523 let buffer_desc = self.metadata.file_buffers.get(index as usize).ok_or_else(||Error::invalid_input(format!("request for global buffer at index {} but there were only {} global buffers in the file", index, self.metadata.file_buffers.len())))?;
524
525 if let Some(bytes) = self.metadata.retained_global_buffers.get(&index) {
529 return Ok(bytes.clone());
530 }
531
532 self.scheduler
533 .submit_single(
534 buffer_desc.position..buffer_desc.position + buffer_desc.size,
535 0,
536 )
537 .await
538 }
539
540 async fn read_tail(scheduler: &FileScheduler) -> Result<(Bytes, u64)> {
541 let file_size = scheduler.reader().size().await? as u64;
542 let begin = if file_size < scheduler.reader().block_size() as u64 {
543 0
544 } else {
545 file_size - scheduler.reader().block_size() as u64
546 };
547 let tail_bytes = scheduler.submit_single(begin..file_size, 0).await?;
548 Ok((tail_bytes, file_size))
549 }
550
551 fn decode_footer(footer_bytes: &Bytes) -> Result<Footer> {
554 let len = footer_bytes.len();
555 if len < FOOTER_LEN {
556 return Err(Error::invalid_input(format!(
557 "does not have sufficient data, len: {}, bytes: {:?}",
558 len, footer_bytes
559 )));
560 }
561 let mut cursor = Cursor::new(footer_bytes.slice(len - FOOTER_LEN..));
562
563 let column_meta_start = cursor.read_u64::<LittleEndian>()?;
564 let column_meta_offsets_start = cursor.read_u64::<LittleEndian>()?;
565 let global_buff_offsets_start = cursor.read_u64::<LittleEndian>()?;
566 let num_global_buffers = cursor.read_u32::<LittleEndian>()?;
567 let num_columns = cursor.read_u32::<LittleEndian>()?;
568 let major_version = cursor.read_u16::<LittleEndian>()?;
569 let minor_version = cursor.read_u16::<LittleEndian>()?;
570
571 if major_version == MAJOR_VERSION as u16 && minor_version == MINOR_VERSION as u16 {
572 return Err(Error::version_conflict(
573 "Attempt to use the lance v2 reader to read a legacy file".to_string(),
574 major_version,
575 minor_version,
576 ));
577 }
578
579 let magic_bytes = footer_bytes.slice(len - 4..);
580 if magic_bytes.as_ref() != MAGIC {
581 return Err(Error::invalid_input(format!(
582 "file does not appear to be a Lance file (invalid magic: {:?})",
583 MAGIC
584 )));
585 }
586 Ok(Footer {
587 column_meta_start,
588 column_meta_offsets_start,
589 global_buff_offsets_start,
590 num_global_buffers,
591 num_columns,
592 major_version,
593 minor_version,
594 })
595 }
596
597 fn read_all_column_metadata(
599 column_metadata_bytes: Bytes,
600 footer: &Footer,
601 ) -> Result<Vec<pbfile::ColumnMetadata>> {
602 let column_metadata_start = footer.column_meta_start;
603 let cmo_table_size = 16 * footer.num_columns as usize;
605 let cmo_table = column_metadata_bytes.slice(column_metadata_bytes.len() - cmo_table_size..);
606
607 (0..footer.num_columns)
608 .map(|col_idx| {
609 let offset = (col_idx * 16) as usize;
610 let position = LittleEndian::read_u64(&cmo_table[offset..offset + 8]);
611 let length = LittleEndian::read_u64(&cmo_table[offset + 8..offset + 16]);
612 let normalized_position = (position - column_metadata_start) as usize;
613 let normalized_end = normalized_position + (length as usize);
614 Ok(pbfile::ColumnMetadata::decode(
615 &column_metadata_bytes[normalized_position..normalized_end],
616 )?)
617 })
618 .collect::<Result<Vec<_>>>()
619 }
620
621 async fn optimistic_tail_read(
622 data: &Bytes,
623 start_pos: u64,
624 scheduler: &FileScheduler,
625 file_len: u64,
626 ) -> Result<Bytes> {
627 let num_bytes_needed = (file_len - start_pos) as usize;
628 if data.len() >= num_bytes_needed {
629 Ok(data.slice((data.len() - num_bytes_needed)..))
630 } else {
631 let num_bytes_missing = (num_bytes_needed - data.len()) as u64;
632 let start = file_len - num_bytes_needed as u64;
633 let missing_bytes = scheduler
634 .submit_single(start..start + num_bytes_missing, 0)
635 .await?;
636 let mut combined = BytesMut::with_capacity(data.len() + num_bytes_missing as usize);
637 combined.extend(missing_bytes);
638 combined.extend(data);
639 Ok(combined.freeze())
640 }
641 }
642
643 fn do_decode_gbo_table(
644 gbo_bytes: &Bytes,
645 footer: &Footer,
646 version: LanceFileVersion,
647 ) -> Result<Vec<BufferDescriptor>> {
648 let mut global_bufs_cursor = Cursor::new(gbo_bytes);
649
650 let mut global_buffers = Vec::with_capacity(footer.num_global_buffers as usize);
651 for _ in 0..footer.num_global_buffers {
652 let buf_pos = global_bufs_cursor.read_u64::<LittleEndian>()?;
653 assert!(
654 version < LanceFileVersion::V2_1 || buf_pos % PAGE_BUFFER_ALIGNMENT as u64 == 0
655 );
656 let buf_size = global_bufs_cursor.read_u64::<LittleEndian>()?;
657 global_buffers.push(BufferDescriptor {
658 position: buf_pos,
659 size: buf_size,
660 });
661 }
662
663 Ok(global_buffers)
664 }
665
666 async fn decode_gbo_table(
667 tail_bytes: &Bytes,
668 file_len: u64,
669 scheduler: &FileScheduler,
670 footer: &Footer,
671 version: LanceFileVersion,
672 ) -> Result<Vec<BufferDescriptor>> {
673 let gbo_bytes = Self::optimistic_tail_read(
676 tail_bytes,
677 footer.global_buff_offsets_start,
678 scheduler,
679 file_len,
680 )
681 .await?;
682 Self::do_decode_gbo_table(&gbo_bytes, footer, version)
683 }
684
685 fn decode_schema(schema_bytes: Bytes) -> Result<(u64, lance_core::datatypes::Schema)> {
686 let file_descriptor = pb::FileDescriptor::decode(schema_bytes)?;
687 let pb_schema = file_descriptor.schema.unwrap();
688 let num_rows = file_descriptor.length;
689 let fields_with_meta = FieldsWithMeta {
690 fields: Fields(pb_schema.fields),
691 metadata: pb_schema.metadata,
692 };
693 let schema = lance_core::datatypes::Schema::from(fields_with_meta);
694 Ok((num_rows, schema))
695 }
696
    /// Reads and decodes all of the metadata stored at the end of the file.
    ///
    /// The tail of the file is read first. The footer, the global buffer offset
    /// table, the schema (global buffer 0) and the column metadata are then
    /// decoded from it, with additional reads issued only when the initial tail
    /// read did not reach far enough back.
    ///
    /// # Errors
    ///
    /// Returns an error if any read fails, if the footer or version is not
    /// recognized, if the file has no global buffers, or if any of the protobuf
    /// payloads cannot be decoded.
    pub async fn read_all_metadata(scheduler: &FileScheduler) -> Result<CachedFileMetadata> {
        // 1. Read the last block of the file and decode the fixed-size footer.
        let (tail_bytes, file_len) = Self::read_tail(scheduler).await?;
        // File position of the first byte of `tail_bytes`.
        let tail_offset = file_len - tail_bytes.len() as u64;
        let footer = Self::decode_footer(&tail_bytes)?;

        let file_version = LanceFileVersion::try_from_major_minor(
            footer.major_version as u32,
            footer.minor_version as u32,
        )?;

        // 2. Decode the global buffer offset table. Buffer 0 holds the schema.
        let gbo_table =
            Self::decode_gbo_table(&tail_bytes, file_len, scheduler, &footer, file_version).await?;
        if gbo_table.is_empty() {
            return Err(Error::internal(
                "File did not contain any global buffers, schema expected".to_string(),
            ));
        }
        let schema_start = gbo_table[0].position;
        let schema_size = gbo_table[0].size;

        let num_footer_bytes = file_len - schema_start;

        // 3. Everything from the schema to the end of the file; reuses the tail
        //    read when it already covers the schema.
        let all_metadata_bytes =
            Self::optimistic_tail_read(&tail_bytes, schema_start, scheduler, file_len).await?;

        let schema_bytes = all_metadata_bytes.slice(0..schema_size as usize);
        let (num_rows, schema) = Self::decode_schema(schema_bytes)?;

        // 4. Column metadata spans from `column_meta_start` up to the start of the
        //    global buffer offset table; offsets are rebased onto `schema_start`.
        let column_metadata_start = (footer.column_meta_start - schema_start) as usize;
        let column_metadata_end = (footer.global_buff_offsets_start - schema_start) as usize;
        let column_metadata_bytes =
            all_metadata_bytes.slice(column_metadata_start..column_metadata_end);
        let column_metadatas = Self::read_all_column_metadata(column_metadata_bytes, &footer)?;

        // 5. Size accounting for the different sections of the file.
        let num_global_buffer_bytes = gbo_table.iter().map(|buf| buf.size).sum::<u64>();
        let num_data_bytes = footer.column_meta_start - num_global_buffer_bytes;
        let num_column_metadata_bytes = footer.global_buff_offsets_start - footer.column_meta_start;

        let column_infos = Self::meta_to_col_infos(column_metadatas.as_slice(), file_version);

        // 6. Keep a copy of every global buffer (other than the schema) that lies
        //    entirely inside the bytes already read, so that later requests for
        //    those buffers need no I/O.
        let tail_end = tail_offset + tail_bytes.len() as u64;
        let retained_global_buffers = gbo_table
            .iter()
            .enumerate()
            .skip(1)
            .filter_map(|(index, buffer)| {
                let start = buffer.position;
                let end = buffer.position + buffer.size;
                if start >= tail_offset && end <= tail_end {
                    let rel_start = (start - tail_offset) as usize;
                    let rel_end = (end - tail_offset) as usize;
                    // Copied (not sliced) so the retained buffer owns its own
                    // allocation instead of sharing the tail read's.
                    let bytes = Bytes::copy_from_slice(&tail_bytes[rel_start..rel_end]);
                    Some((index as u32, bytes))
                } else {
                    None
                }
            })
            .collect();

        Ok(CachedFileMetadata {
            file_schema: Arc::new(schema),
            column_metadatas,
            column_infos,
            num_rows,
            num_data_bytes,
            num_column_metadata_bytes,
            num_global_buffer_bytes,
            num_footer_bytes,
            file_buffers: gbo_table,
            major_version: footer.major_version,
            minor_version: footer.minor_version,
            file_size_bytes: file_len,
            retained_global_buffers,
        })
    }
795
796 fn fetch_encoding<M: Default + Name + Sized>(encoding: &pbfile::Encoding) -> M {
797 match &encoding.location {
798 Some(pbfile::encoding::Location::Indirect(_)) => todo!(),
799 Some(pbfile::encoding::Location::Direct(encoding)) => {
800 let encoding_buf = Bytes::from(encoding.encoding.clone());
801 let encoding_any = prost_types::Any::decode(encoding_buf).unwrap();
802 encoding_any.to_msg::<M>().unwrap()
803 }
804 Some(pbfile::encoding::Location::None(_)) => panic!(),
805 None => panic!(),
806 }
807 }
808
    /// Converts the protobuf column metadata into the [`ColumnInfo`] values used
    /// by the decoder. The column index of each result is its position in
    /// `column_metadatas`.
    ///
    /// # Panics
    ///
    /// Panics if a page or a column has no encoding, if an encoding cannot be
    /// decoded (see `fetch_encoding`), or, for file versions 2.1 and later, if a
    /// page buffer offset is not a multiple of `PAGE_BUFFER_ALIGNMENT`.
    fn meta_to_col_infos(
        column_metadatas: &[pbfile::ColumnMetadata],
        file_version: LanceFileVersion,
    ) -> Vec<Arc<ColumnInfo>> {
        column_metadatas
            .iter()
            .enumerate()
            .map(|(col_idx, col_meta)| {
                let page_infos = col_meta
                    .pages
                    .iter()
                    .map(|page| {
                        let num_rows = page.length;
                        // 2.0 files describe pages with the legacy array encoding;
                        // every other version uses the structural page layout.
                        let encoding = match file_version {
                            LanceFileVersion::V2_0 => {
                                PageEncoding::Legacy(Self::fetch_encoding::<pbenc::ArrayEncoding>(
                                    page.encoding.as_ref().unwrap(),
                                ))
                            }
                            _ => PageEncoding::Structural(Self::fetch_encoding::<
                                pbenc21::PageLayout,
                            >(
                                page.encoding.as_ref().unwrap()
                            )),
                        };
                        // Pair up each buffer offset with its size.
                        let buffer_offsets_and_sizes = Arc::from(
                            page.buffer_offsets
                                .iter()
                                .zip(page.buffer_sizes.iter())
                                .map(|(offset, size)| {
                                    // Page buffers must be aligned from 2.1 on.
                                    assert!(
                                        file_version < LanceFileVersion::V2_1
                                            || offset % PAGE_BUFFER_ALIGNMENT as u64 == 0
                                    );
                                    (*offset, *size)
                                })
                                .collect::<Vec<_>>(),
                        );
                        PageInfo {
                            buffer_offsets_and_sizes,
                            encoding,
                            num_rows,
                            priority: page.priority,
                        }
                    })
                    .collect::<Vec<_>>();
                // Column-level buffers (no alignment check is applied to these).
                let buffer_offsets_and_sizes = Arc::from(
                    col_meta
                        .buffer_offsets
                        .iter()
                        .zip(col_meta.buffer_sizes.iter())
                        .map(|(offset, size)| (*offset, *size))
                        .collect::<Vec<_>>(),
                );
                Arc::new(ColumnInfo {
                    index: col_idx as u32,
                    page_infos: Arc::from(page_infos),
                    buffer_offsets_and_sizes,
                    encoding: Self::fetch_encoding(col_meta.encoding.as_ref().unwrap()),
                })
            })
            .collect::<Vec<_>>()
    }
873
874 fn validate_projection(
875 projection: &ReaderProjection,
876 metadata: &CachedFileMetadata,
877 ) -> Result<()> {
878 if projection.schema.fields.is_empty() {
879 return Err(Error::invalid_input(
880 "Attempt to read zero columns from the file, at least one column must be specified"
881 .to_string(),
882 ));
883 }
884 let mut column_indices_seen = BTreeSet::new();
885 for column_index in &projection.column_indices {
886 if !column_indices_seen.insert(*column_index) {
887 return Err(Error::invalid_input(format!(
888 "The projection specified the column index {} more than once",
889 column_index
890 )));
891 }
892 if *column_index >= metadata.column_infos.len() as u32 {
893 return Err(Error::invalid_input(format!(
894 "The projection specified the column index {} but there are only {} columns in the file",
895 column_index,
896 metadata.column_infos.len()
897 )));
898 }
899 }
900 Ok(())
901 }
902
903 pub async fn try_open(
910 scheduler: FileScheduler,
911 base_projection: Option<ReaderProjection>,
912 decoder_plugins: Arc<DecoderPlugins>,
913 cache: &LanceCache,
914 options: FileReaderOptions,
915 ) -> Result<Self> {
916 let file_metadata = Arc::new(Self::read_all_metadata(&scheduler).await?);
917 let path = scheduler.reader().path().clone();
918
919 let encodings_io =
921 LanceEncodingsIo::new(scheduler).with_read_chunk_size(options.read_chunk_size);
922
923 Self::try_open_with_file_metadata(
924 Arc::new(encodings_io),
925 path,
926 base_projection,
927 decoder_plugins,
928 file_metadata,
929 cache,
930 options,
931 )
932 .await
933 }
934
935 pub async fn try_open_with_file_metadata(
941 scheduler: Arc<dyn EncodingsIo>,
942 path: Path,
943 base_projection: Option<ReaderProjection>,
944 decoder_plugins: Arc<DecoderPlugins>,
945 file_metadata: Arc<CachedFileMetadata>,
946 cache: &LanceCache,
947 options: FileReaderOptions,
948 ) -> Result<Self> {
949 let cache = Arc::new(cache.with_key_prefix(path.as_ref()));
950
951 if let Some(base_projection) = base_projection.as_ref() {
952 Self::validate_projection(base_projection, &file_metadata)?;
953 }
954 let num_rows = file_metadata.num_rows;
955 Ok(Self {
956 scheduler,
957 base_projection: base_projection.unwrap_or(ReaderProjection::from_whole_schema(
958 file_metadata.file_schema.as_ref(),
959 file_metadata.version(),
960 )),
961 num_rows,
962 metadata: file_metadata,
963 decoder_plugins,
964 cache,
965 options,
966 })
967 }
968
    /// Returns the column infos to hand to the scheduler for a read.
    ///
    /// The projection is currently ignored: the infos of every column in the
    /// file are returned, and the projection's column indices are passed to the
    /// scheduler separately by the callers.
    fn collect_columns_from_projection(
        &self,
        _projection: &ReaderProjection,
    ) -> Result<Vec<Arc<ColumnInfo>>> {
        Ok(self.metadata.column_infos.clone())
    }
988
989 #[allow(clippy::too_many_arguments)]
990 async fn do_read_range(
991 column_infos: Vec<Arc<ColumnInfo>>,
992 io: Arc<dyn EncodingsIo>,
993 cache: Arc<LanceCache>,
994 num_rows: u64,
995 decoder_plugins: Arc<DecoderPlugins>,
996 range: Range<u64>,
997 batch_size: u32,
998 projection: ReaderProjection,
999 filter: FilterExpression,
1000 decoder_config: DecoderConfig,
1001 batch_size_bytes: Option<u64>,
1002 ) -> Result<BoxStream<'static, ReadBatchTask>> {
1003 debug!(
1004 "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
1005 range,
1006 batch_size,
1007 num_rows,
1008 column_infos.len(),
1009 projection.schema.fields.len(),
1010 );
1011
1012 let config = SchedulerDecoderConfig {
1013 batch_size,
1014 cache,
1015 decoder_plugins,
1016 io,
1017 decoder_config,
1018 batch_size_bytes,
1019 };
1020
1021 let requested_rows = RequestedRows::Ranges(vec![range]);
1022
1023 schedule_and_decode(
1024 column_infos,
1025 requested_rows,
1026 filter,
1027 projection.column_indices,
1028 projection.schema,
1029 config,
1030 )
1031 .await
1032 }
1033
1034 async fn read_range(
1035 &self,
1036 range: Range<u64>,
1037 batch_size: u32,
1038 projection: ReaderProjection,
1039 filter: FilterExpression,
1040 ) -> Result<BoxStream<'static, ReadBatchTask>> {
1041 Self::do_read_range(
1043 self.collect_columns_from_projection(&projection)?,
1044 self.scheduler.clone(),
1045 self.cache.clone(),
1046 self.num_rows,
1047 self.decoder_plugins.clone(),
1048 range,
1049 batch_size,
1050 projection,
1051 filter,
1052 self.options.decoder_config.clone(),
1053 self.options.batch_size_bytes,
1054 )
1055 .await
1056 }
1057
1058 #[allow(clippy::too_many_arguments)]
1059 async fn do_take_rows(
1060 column_infos: Vec<Arc<ColumnInfo>>,
1061 io: Arc<dyn EncodingsIo>,
1062 cache: Arc<LanceCache>,
1063 decoder_plugins: Arc<DecoderPlugins>,
1064 indices: Vec<u64>,
1065 batch_size: u32,
1066 projection: ReaderProjection,
1067 filter: FilterExpression,
1068 decoder_config: DecoderConfig,
1069 batch_size_bytes: Option<u64>,
1070 ) -> Result<BoxStream<'static, ReadBatchTask>> {
1071 debug!(
1072 "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
1073 indices.len(),
1074 indices[0],
1075 indices[indices.len() - 1],
1076 batch_size,
1077 column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1078 );
1079
1080 let config = SchedulerDecoderConfig {
1081 batch_size,
1082 cache,
1083 decoder_plugins,
1084 io,
1085 decoder_config,
1086 batch_size_bytes,
1087 };
1088
1089 let requested_rows = RequestedRows::Indices(indices);
1090
1091 schedule_and_decode(
1092 column_infos,
1093 requested_rows,
1094 filter,
1095 projection.column_indices,
1096 projection.schema,
1097 config,
1098 )
1099 .await
1100 }
1101
1102 async fn take_rows(
1103 &self,
1104 indices: Vec<u64>,
1105 batch_size: u32,
1106 projection: ReaderProjection,
1107 ) -> Result<BoxStream<'static, ReadBatchTask>> {
1108 Self::do_take_rows(
1110 self.collect_columns_from_projection(&projection)?,
1111 self.scheduler.clone(),
1112 self.cache.clone(),
1113 self.decoder_plugins.clone(),
1114 indices,
1115 batch_size,
1116 projection,
1117 FilterExpression::no_filter(),
1118 self.options.decoder_config.clone(),
1119 self.options.batch_size_bytes,
1120 )
1121 .await
1122 }
1123
1124 #[allow(clippy::too_many_arguments)]
1125 async fn do_read_ranges(
1126 column_infos: Vec<Arc<ColumnInfo>>,
1127 io: Arc<dyn EncodingsIo>,
1128 cache: Arc<LanceCache>,
1129 decoder_plugins: Arc<DecoderPlugins>,
1130 ranges: Vec<Range<u64>>,
1131 batch_size: u32,
1132 projection: ReaderProjection,
1133 filter: FilterExpression,
1134 decoder_config: DecoderConfig,
1135 batch_size_bytes: Option<u64>,
1136 ) -> Result<BoxStream<'static, ReadBatchTask>> {
1137 let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1138 debug!(
1139 "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
1140 ranges.len(),
1141 num_rows,
1142 ranges[0].start,
1143 ranges[ranges.len() - 1].end,
1144 batch_size,
1145 column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1146 );
1147
1148 let config = SchedulerDecoderConfig {
1149 batch_size,
1150 cache,
1151 decoder_plugins,
1152 io,
1153 decoder_config,
1154 batch_size_bytes,
1155 };
1156
1157 let requested_rows = RequestedRows::Ranges(ranges);
1158
1159 schedule_and_decode(
1160 column_infos,
1161 requested_rows,
1162 filter,
1163 projection.column_indices,
1164 projection.schema,
1165 config,
1166 )
1167 .await
1168 }
1169
1170 async fn read_ranges(
1171 &self,
1172 ranges: Vec<Range<u64>>,
1173 batch_size: u32,
1174 projection: ReaderProjection,
1175 filter: FilterExpression,
1176 ) -> Result<BoxStream<'static, ReadBatchTask>> {
1177 Self::do_read_ranges(
1178 self.collect_columns_from_projection(&projection)?,
1179 self.scheduler.clone(),
1180 self.cache.clone(),
1181 self.decoder_plugins.clone(),
1182 ranges,
1183 batch_size,
1184 projection,
1185 filter,
1186 self.options.decoder_config.clone(),
1187 self.options.batch_size_bytes,
1188 )
1189 .await
1190 }
1191
1192 pub async fn read_tasks(
1216 &self,
1217 params: ReadBatchParams,
1218 batch_size: u32,
1219 projection: Option<ReaderProjection>,
1220 filter: FilterExpression,
1221 ) -> Result<Pin<Box<dyn Stream<Item = ReadBatchTask> + Send>>> {
1222 let projection = projection.unwrap_or_else(|| self.base_projection.clone());
1223 Self::validate_projection(&projection, &self.metadata)?;
1224 let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
1225 if bound > self.num_rows || bound == self.num_rows && inclusive {
1226 Err(Error::invalid_input(format!(
1227 "cannot read {:?} from file with {} rows",
1228 params, self.num_rows
1229 )))
1230 } else {
1231 Ok(())
1232 }
1233 };
1234 match ¶ms {
1235 ReadBatchParams::Indices(indices) => {
1236 for idx in indices {
1237 match idx {
1238 None => {
1239 return Err(Error::invalid_input("Null value in indices array"));
1240 }
1241 Some(idx) => {
1242 verify_bound(¶ms, idx as u64, true)?;
1243 }
1244 }
1245 }
1246 let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
1247 self.take_rows(indices, batch_size, projection).await
1248 }
1249 ReadBatchParams::Range(range) => {
1250 verify_bound(¶ms, range.end as u64, false)?;
1251 self.read_range(
1252 range.start as u64..range.end as u64,
1253 batch_size,
1254 projection,
1255 filter,
1256 )
1257 .await
1258 }
1259 ReadBatchParams::Ranges(ranges) => {
1260 let mut ranges_u64 = Vec::with_capacity(ranges.len());
1261 for range in ranges.as_ref() {
1262 verify_bound(¶ms, range.end, false)?;
1263 ranges_u64.push(range.start..range.end);
1264 }
1265 self.read_ranges(ranges_u64, batch_size, projection, filter)
1266 .await
1267 }
1268 ReadBatchParams::RangeFrom(range) => {
1269 verify_bound(¶ms, range.start as u64, true)?;
1270 self.read_range(
1271 range.start as u64..self.num_rows,
1272 batch_size,
1273 projection,
1274 filter,
1275 )
1276 .await
1277 }
1278 ReadBatchParams::RangeTo(range) => {
1279 verify_bound(¶ms, range.end as u64, false)?;
1280 self.read_range(0..range.end as u64, batch_size, projection, filter)
1281 .await
1282 }
1283 ReadBatchParams::RangeFull => {
1284 self.read_range(0..self.num_rows, batch_size, projection, filter)
1285 .await
1286 }
1287 }
1288 }
1289
1290 pub async fn read_stream_projected(
1320 &self,
1321 params: ReadBatchParams,
1322 batch_size: u32,
1323 batch_readahead: u32,
1324 projection: ReaderProjection,
1325 filter: FilterExpression,
1326 ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1327 let arrow_schema = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
1328 let tasks_stream = self
1329 .read_tasks(params, batch_size, Some(projection), filter)
1330 .await?;
1331 let batch_stream = tasks_stream
1332 .map(|task| task.task)
1333 .buffered(batch_readahead as usize)
1334 .boxed();
1335 Ok(Box::pin(RecordBatchStreamAdapter::new(
1336 arrow_schema,
1337 batch_stream,
1338 )))
1339 }
1340
1341 fn take_rows_blocking(
1342 &self,
1343 indices: Vec<u64>,
1344 batch_size: u32,
1345 projection: ReaderProjection,
1346 filter: FilterExpression,
1347 ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1348 let column_infos = self.collect_columns_from_projection(&projection)?;
1349 debug!(
1350 "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
1351 indices.len(),
1352 indices[0],
1353 indices[indices.len() - 1],
1354 batch_size,
1355 column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1356 );
1357
1358 let config = SchedulerDecoderConfig {
1359 batch_size,
1360 cache: self.cache.clone(),
1361 decoder_plugins: self.decoder_plugins.clone(),
1362 io: self.scheduler.clone(),
1363 decoder_config: self.options.decoder_config.clone(),
1364 batch_size_bytes: self.options.batch_size_bytes,
1365 };
1366
1367 let requested_rows = RequestedRows::Indices(indices);
1368
1369 schedule_and_decode_blocking(
1370 column_infos,
1371 requested_rows,
1372 filter,
1373 projection.column_indices,
1374 projection.schema,
1375 config,
1376 )
1377 }
1378
1379 fn read_ranges_blocking(
1380 &self,
1381 ranges: Vec<Range<u64>>,
1382 batch_size: u32,
1383 projection: ReaderProjection,
1384 filter: FilterExpression,
1385 ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1386 let column_infos = self.collect_columns_from_projection(&projection)?;
1387 let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1388 debug!(
1389 "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
1390 ranges.len(),
1391 num_rows,
1392 ranges[0].start,
1393 ranges[ranges.len() - 1].end,
1394 batch_size,
1395 column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1396 );
1397
1398 let config = SchedulerDecoderConfig {
1399 batch_size,
1400 cache: self.cache.clone(),
1401 decoder_plugins: self.decoder_plugins.clone(),
1402 io: self.scheduler.clone(),
1403 decoder_config: self.options.decoder_config.clone(),
1404 batch_size_bytes: self.options.batch_size_bytes,
1405 };
1406
1407 let requested_rows = RequestedRows::Ranges(ranges);
1408
1409 schedule_and_decode_blocking(
1410 column_infos,
1411 requested_rows,
1412 filter,
1413 projection.column_indices,
1414 projection.schema,
1415 config,
1416 )
1417 }
1418
1419 fn read_range_blocking(
1420 &self,
1421 range: Range<u64>,
1422 batch_size: u32,
1423 projection: ReaderProjection,
1424 filter: FilterExpression,
1425 ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1426 let column_infos = self.collect_columns_from_projection(&projection)?;
1427 let num_rows = self.num_rows;
1428
1429 debug!(
1430 "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
1431 range,
1432 batch_size,
1433 num_rows,
1434 column_infos.len(),
1435 projection.schema.fields.len(),
1436 );
1437
1438 let config = SchedulerDecoderConfig {
1439 batch_size,
1440 cache: self.cache.clone(),
1441 decoder_plugins: self.decoder_plugins.clone(),
1442 io: self.scheduler.clone(),
1443 decoder_config: self.options.decoder_config.clone(),
1444 batch_size_bytes: self.options.batch_size_bytes,
1445 };
1446
1447 let requested_rows = RequestedRows::Ranges(vec![range]);
1448
1449 schedule_and_decode_blocking(
1450 column_infos,
1451 requested_rows,
1452 filter,
1453 projection.column_indices,
1454 projection.schema,
1455 config,
1456 )
1457 }
1458
1459 pub fn read_stream_projected_blocking(
1471 &self,
1472 params: ReadBatchParams,
1473 batch_size: u32,
1474 projection: Option<ReaderProjection>,
1475 filter: FilterExpression,
1476 ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1477 let projection = projection.unwrap_or_else(|| self.base_projection.clone());
1478 Self::validate_projection(&projection, &self.metadata)?;
1479 let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
1480 if bound > self.num_rows || bound == self.num_rows && inclusive {
1481 Err(Error::invalid_input(format!(
1482 "cannot read {:?} from file with {} rows",
1483 params, self.num_rows
1484 )))
1485 } else {
1486 Ok(())
1487 }
1488 };
1489 match ¶ms {
1490 ReadBatchParams::Indices(indices) => {
1491 for idx in indices {
1492 match idx {
1493 None => {
1494 return Err(Error::invalid_input("Null value in indices array"));
1495 }
1496 Some(idx) => {
1497 verify_bound(¶ms, idx as u64, true)?;
1498 }
1499 }
1500 }
1501 let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
1502 self.take_rows_blocking(indices, batch_size, projection, filter)
1503 }
1504 ReadBatchParams::Range(range) => {
1505 verify_bound(¶ms, range.end as u64, false)?;
1506 self.read_range_blocking(
1507 range.start as u64..range.end as u64,
1508 batch_size,
1509 projection,
1510 filter,
1511 )
1512 }
1513 ReadBatchParams::Ranges(ranges) => {
1514 let mut ranges_u64 = Vec::with_capacity(ranges.len());
1515 for range in ranges.as_ref() {
1516 verify_bound(¶ms, range.end, false)?;
1517 ranges_u64.push(range.start..range.end);
1518 }
1519 self.read_ranges_blocking(ranges_u64, batch_size, projection, filter)
1520 }
1521 ReadBatchParams::RangeFrom(range) => {
1522 verify_bound(¶ms, range.start as u64, true)?;
1523 self.read_range_blocking(
1524 range.start as u64..self.num_rows,
1525 batch_size,
1526 projection,
1527 filter,
1528 )
1529 }
1530 ReadBatchParams::RangeTo(range) => {
1531 verify_bound(¶ms, range.end as u64, false)?;
1532 self.read_range_blocking(0..range.end as u64, batch_size, projection, filter)
1533 }
1534 ReadBatchParams::RangeFull => {
1535 self.read_range_blocking(0..self.num_rows, batch_size, projection, filter)
1536 }
1537 }
1538 }
1539
1540 pub async fn read_stream(
1552 &self,
1553 params: ReadBatchParams,
1554 batch_size: u32,
1555 batch_readahead: u32,
1556 filter: FilterExpression,
1557 ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1558 self.read_stream_projected(
1559 params,
1560 batch_size,
1561 batch_readahead,
1562 self.base_projection.clone(),
1563 filter,
1564 )
1565 .await
1566 }
1567
1568 pub fn schema(&self) -> &Arc<Schema> {
1569 &self.metadata.file_schema
1570 }
1571}
1572
1573pub fn describe_encoding(page: &pbfile::column_metadata::Page) -> String {
1575 if let Some(encoding) = &page.encoding {
1576 if let Some(style) = &encoding.location {
1577 match style {
1578 pbfile::encoding::Location::Indirect(indirect) => {
1579 format!(
1580 "IndirectEncoding(pos={},size={})",
1581 indirect.buffer_location, indirect.buffer_length
1582 )
1583 }
1584 pbfile::encoding::Location::Direct(direct) => {
1585 let encoding_any =
1586 prost_types::Any::decode(Bytes::from(direct.encoding.clone()))
1587 .expect("failed to deserialize encoding as protobuf");
1588 if encoding_any.type_url == "/lance.encodings.ArrayEncoding" {
1589 let encoding = encoding_any.to_msg::<pbenc::ArrayEncoding>();
1590 match encoding {
1591 Ok(encoding) => {
1592 format!("{:#?}", encoding)
1593 }
1594 Err(err) => {
1595 format!("Unsupported(decode_err={})", err)
1596 }
1597 }
1598 } else if encoding_any.type_url == "/lance.encodings21.PageLayout" {
1599 let encoding = encoding_any.to_msg::<pbenc21::PageLayout>();
1600 match encoding {
1601 Ok(encoding) => {
1602 format!("{:#?}", encoding)
1603 }
1604 Err(err) => {
1605 format!("Unsupported(decode_err={})", err)
1606 }
1607 }
1608 } else {
1609 format!("Unrecognized(type_url={})", encoding_any.type_url)
1610 }
1611 }
1612 pbfile::encoding::Location::None(_) => "NoEncodingDescription".to_string(),
1613 }
1614 } else {
1615 "MISSING STYLE".to_string()
1616 }
1617 } else {
1618 "MISSING".to_string()
1619 }
1620}
1621
/// Constructors that build a value from serialized Lance bytes.
///
/// The only implementation in this file is for `EncodedBatch`.
pub trait EncodedBatchReaderExt {
    /// Builds `Self` from `bytes` using a schema supplied by the caller.
    ///
    /// In the `EncodedBatch` implementation, `version` is only used to derive
    /// the whole-schema projection. The version used to interpret the column
    /// metadata is read from the footer of `bytes`.
    fn try_from_mini_lance(
        bytes: Bytes,
        schema: &Schema,
        version: LanceFileVersion,
    ) -> Result<Self>
    where
        Self: Sized;
    /// Builds `Self` from `bytes` alone.
    ///
    /// In the `EncodedBatch` implementation the schema is decoded from the
    /// first global buffer of `bytes`, and it is an error for the bytes to
    /// contain no global buffers.
    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
    where
        Self: Sized;
}
1634
1635impl EncodedBatchReaderExt for EncodedBatch {
1636 fn try_from_mini_lance(
1637 bytes: Bytes,
1638 schema: &Schema,
1639 file_version: LanceFileVersion,
1640 ) -> Result<Self>
1641 where
1642 Self: Sized,
1643 {
1644 let projection = ReaderProjection::from_whole_schema(schema, file_version);
1645 let footer = FileReader::decode_footer(&bytes)?;
1646
1647 let column_metadata_start = footer.column_meta_start as usize;
1650 let column_metadata_end = footer.global_buff_offsets_start as usize;
1651 let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1652 let column_metadatas =
1653 FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1654
1655 let file_version = LanceFileVersion::try_from_major_minor(
1656 footer.major_version as u32,
1657 footer.minor_version as u32,
1658 )?;
1659
1660 let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1661
1662 Ok(Self {
1663 data: bytes,
1664 num_rows: page_table
1665 .first()
1666 .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1667 .unwrap_or(0),
1668 page_table,
1669 top_level_columns: projection.column_indices,
1670 schema: Arc::new(schema.clone()),
1671 })
1672 }
1673
1674 fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1675 where
1676 Self: Sized,
1677 {
1678 let footer = FileReader::decode_footer(&bytes)?;
1679 let file_version = LanceFileVersion::try_from_major_minor(
1680 footer.major_version as u32,
1681 footer.minor_version as u32,
1682 )?;
1683
1684 let gbo_table = FileReader::do_decode_gbo_table(
1685 &bytes.slice(footer.global_buff_offsets_start as usize..),
1686 &footer,
1687 file_version,
1688 )?;
1689 if gbo_table.is_empty() {
1690 return Err(Error::internal(
1691 "File did not contain any global buffers, schema expected".to_string(),
1692 ));
1693 }
1694 let schema_start = gbo_table[0].position as usize;
1695 let schema_size = gbo_table[0].size as usize;
1696
1697 let schema_bytes = bytes.slice(schema_start..(schema_start + schema_size));
1698 let (_, schema) = FileReader::decode_schema(schema_bytes)?;
1699 let projection = ReaderProjection::from_whole_schema(&schema, file_version);
1700
1701 let column_metadata_start = footer.column_meta_start as usize;
1704 let column_metadata_end = footer.global_buff_offsets_start as usize;
1705 let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1706 let column_metadatas =
1707 FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1708
1709 let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1710
1711 Ok(Self {
1712 data: bytes,
1713 num_rows: page_table
1714 .first()
1715 .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1716 .unwrap_or(0),
1717 page_table,
1718 top_level_columns: projection.column_indices,
1719 schema: Arc::new(schema),
1720 })
1721 }
1722}
1723
1724#[cfg(test)]
1725mod tests {
1726 use std::{collections::BTreeMap, pin::Pin, sync::Arc};
1727
1728 use arrow_array::{
1729 RecordBatch, UInt32Array,
1730 types::{Float64Type, Int32Type},
1731 };
1732 use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema};
1733 use bytes::Bytes;
1734 use futures::{StreamExt, prelude::stream::TryStreamExt};
1735 use lance_arrow::RecordBatchExt;
1736 use lance_core::{ArrowResult, datatypes::Schema};
1737 use lance_datagen::{BatchCount, ByteCount, RowCount, array, gen_batch};
1738 use lance_encoding::{
1739 decoder::{DecodeBatchScheduler, DecoderPlugins, FilterExpression, decode_batch},
1740 encoder::{EncodedBatch, EncodingOptions, default_encoding_strategy, encode_batch},
1741 version::LanceFileVersion,
1742 };
1743 use lance_io::{stream::RecordBatchStream, utils::CachedFileSize};
1744 use log::debug;
1745 use rstest::rstest;
1746 use tokio::sync::mpsc;
1747
1748 use crate::reader::{EncodedBatchReaderExt, FileReader, FileReaderOptions, ReaderProjection};
1749 use crate::testing::{FsFixture, WrittenFile, test_cache, write_lance_file};
1750 use crate::writer::{EncodedBatchWriteExt, FileWriter, FileWriterOptions};
1751 use lance_encoding::decoder::DecoderConfig;
1752
1753 async fn create_some_file(fs: &FsFixture, version: LanceFileVersion) -> WrittenFile {
1754 let location_type = DataType::Struct(Fields::from(vec![
1755 Field::new("x", DataType::Float64, true),
1756 Field::new("y", DataType::Float64, true),
1757 ]));
1758 let categories_type = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)));
1759
1760 let mut reader = gen_batch()
1761 .col("score", array::rand::<Float64Type>())
1762 .col("location", array::rand_type(&location_type))
1763 .col("categories", array::rand_type(&categories_type))
1764 .col("binary", array::rand_type(&DataType::Binary));
1765 if version <= LanceFileVersion::V2_0 {
1766 reader = reader.col("large_bin", array::rand_type(&DataType::LargeBinary));
1767 }
1768 let reader = reader.into_reader_rows(RowCount::from(1000), BatchCount::from(100));
1769
1770 write_lance_file(
1771 reader,
1772 fs,
1773 FileWriterOptions {
1774 format_version: Some(version),
1775 ..Default::default()
1776 },
1777 )
1778 .await
1779 }
1780
    /// Boxed function that maps a written batch to another batch; `verify_expected`
    /// accepts one to transform the expected data before comparing it.
    type Transformer = Box<dyn Fn(&RecordBatch) -> RecordBatch>;
1782
    /// Asserts that the stream `actual` yields exactly the rows of `expected`,
    /// re-chunked into batches of `read_size` rows (the last one may be shorter).
    ///
    /// When `transform` is given it is applied to every expected batch before
    /// comparison. Panics if `expected` is empty, if a stream item is an error,
    /// or if any comparison fails.
    async fn verify_expected(
        expected: &[RecordBatch],
        mut actual: Pin<Box<dyn RecordBatchStream>>,
        read_size: u32,
        transform: Option<Transformer>,
    ) {
        // Total number of expected rows that have not been matched yet.
        let mut remaining = expected.iter().map(|batch| batch.num_rows()).sum::<usize>() as u32;
        let mut expected_iter = expected.iter().map(|batch| {
            if let Some(transform) = &transform {
                transform(batch)
            } else {
                batch.clone()
            }
        });
        // Unconsumed tail of the expected batch currently being compared.
        let mut next_expected = expected_iter.next().unwrap().clone();
        while let Some(actual) = actual.next().await {
            let mut actual = actual.unwrap();
            let mut rows_to_verify = actual.num_rows() as u32;
            // Every batch is full-sized except possibly the final one.
            let expected_length = remaining.min(read_size);
            assert_eq!(expected_length, rows_to_verify);

            // Expected and actual batch boundaries need not line up, so compare
            // the largest common prefix and then advance whichever side ran out.
            while rows_to_verify > 0 {
                let next_slice_len = (next_expected.num_rows() as u32).min(rows_to_verify);
                assert_eq!(
                    next_expected.slice(0, next_slice_len as usize),
                    actual.slice(0, next_slice_len as usize)
                );
                remaining -= next_slice_len;
                rows_to_verify -= next_slice_len;
                if remaining > 0 {
                    if next_slice_len == next_expected.num_rows() as u32 {
                        // Current expected batch fully consumed: move to the next one.
                        next_expected = expected_iter.next().unwrap().clone();
                    } else {
                        // Keep only the part of the expected batch not yet compared.
                        next_expected = next_expected.slice(
                            next_slice_len as usize,
                            next_expected.num_rows() - next_slice_len as usize,
                        );
                    }
                }
                if rows_to_verify > 0 {
                    // Drop the prefix of the actual batch that was just compared.
                    actual = actual.slice(
                        next_slice_len as usize,
                        actual.num_rows() - next_slice_len as usize,
                    );
                }
            }
        }
        // The stream must have produced every expected row.
        assert_eq!(remaining, 0);
    }
1832
1833 #[tokio::test]
1834 async fn test_round_trip() {
1835 let fs = FsFixture::default();
1836
1837 let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1838
1839 for read_size in [32, 1024, 1024 * 1024] {
1840 let file_scheduler = fs
1841 .scheduler
1842 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1843 .await
1844 .unwrap();
1845 let file_reader = FileReader::try_open(
1846 file_scheduler,
1847 None,
1848 Arc::<DecoderPlugins>::default(),
1849 &test_cache(),
1850 FileReaderOptions::default(),
1851 )
1852 .await
1853 .unwrap();
1854
1855 let schema = file_reader.schema();
1856 assert_eq!(schema.metadata.get("foo").unwrap(), "bar");
1857
1858 let batch_stream = file_reader
1859 .read_stream(
1860 lance_io::ReadBatchParams::RangeFull,
1861 read_size,
1862 16,
1863 FilterExpression::no_filter(),
1864 )
1865 .await
1866 .unwrap();
1867
1868 verify_expected(&data, batch_stream, read_size, None).await;
1869 }
1870 }
1871
1872 #[rstest]
1873 #[test_log::test(tokio::test)]
1874 async fn test_encoded_batch_round_trip(
1875 #[values(LanceFileVersion::V2_0)] version: LanceFileVersion,
1877 ) {
1878 let data = gen_batch()
1879 .col("x", array::rand::<Int32Type>())
1880 .col("y", array::rand_utf8(ByteCount::from(16), false))
1881 .into_batch_rows(RowCount::from(10000))
1882 .unwrap();
1883
1884 let lance_schema = Arc::new(Schema::try_from(data.schema().as_ref()).unwrap());
1885
1886 let encoding_options = EncodingOptions {
1887 cache_bytes_per_column: 4096,
1888 max_page_bytes: 32 * 1024 * 1024,
1889 keep_original_array: true,
1890 buffer_alignment: 64,
1891 version,
1892 };
1893
1894 let encoding_strategy = default_encoding_strategy(version);
1895
1896 let encoded_batch = encode_batch(
1897 &data,
1898 lance_schema.clone(),
1899 encoding_strategy.as_ref(),
1900 &encoding_options,
1901 )
1902 .await
1903 .unwrap();
1904
1905 let bytes = encoded_batch.try_to_self_described_lance(version).unwrap();
1907
1908 let decoded_batch = EncodedBatch::try_from_self_described_lance(bytes).unwrap();
1909
1910 let decoded = decode_batch(
1911 &decoded_batch,
1912 &FilterExpression::no_filter(),
1913 Arc::<DecoderPlugins>::default(),
1914 false,
1915 version,
1916 None,
1917 )
1918 .await
1919 .unwrap();
1920
1921 assert_eq!(data, decoded);
1922
1923 let bytes = encoded_batch.try_to_mini_lance(version).unwrap();
1925 let decoded_batch =
1926 EncodedBatch::try_from_mini_lance(bytes, lance_schema.as_ref(), LanceFileVersion::V2_0)
1927 .unwrap();
1928 let decoded = decode_batch(
1929 &decoded_batch,
1930 &FilterExpression::no_filter(),
1931 Arc::<DecoderPlugins>::default(),
1932 false,
1933 version,
1934 None,
1935 )
1936 .await
1937 .unwrap();
1938
1939 assert_eq!(data, decoded);
1940 }
1941
    /// Reads a written file through a variety of projections and checks that
    /// the projected data matches, for projections built both from field ids
    /// and from column names, and supplied both per read and at open time.
    ///
    /// Also checks that an empty projection and a projection with duplicate
    /// column indices are rejected.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_projection(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1, LanceFileVersion::V2_2)]
        version: LanceFileVersion,
    ) {
        let fs = FsFixture::default();

        let written_file = create_some_file(&fs, version).await;
        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();

        let field_id_mapping = written_file
            .field_id_mapping
            .iter()
            .copied()
            .collect::<BTreeMap<_, _>>();

        // A projection that selects nothing; every use of it below must fail.
        let empty_projection = ReaderProjection {
            column_indices: Vec::default(),
            schema: Arc::new(Schema::default()),
        };

        // NOTE(review): `create_some_file` generates `score` as a plain Float64
        // column and puts `x`/`y` under `location`, so `score.x` / `score.y`
        // name children that do not exist. Confirm this is intentional.
        for columns in [
            vec!["score"],
            vec!["location"],
            vec!["categories"],
            vec!["score.x"],
            vec!["score", "categories"],
            vec!["score", "location"],
            vec!["location", "categories"],
            vec!["score.y", "location", "categories"],
        ] {
            debug!("Testing round trip with projection {:?}", columns);
            for use_field_ids in [true, false] {
                // Case 1: reader opened without a projection, projection passed per read.
                let file_reader = FileReader::try_open(
                    file_scheduler.clone(),
                    None,
                    Arc::<DecoderPlugins>::default(),
                    &test_cache(),
                    FileReaderOptions::default(),
                )
                .await
                .unwrap();

                let projected_schema = written_file.schema.project(&columns).unwrap();
                let projection = if use_field_ids {
                    ReaderProjection::from_field_ids(
                        file_reader.metadata.version(),
                        &projected_schema,
                        &field_id_mapping,
                    )
                    .unwrap()
                } else {
                    ReaderProjection::from_column_names(
                        file_reader.metadata.version(),
                        &written_file.schema,
                        &columns,
                    )
                    .unwrap()
                };

                let batch_stream = file_reader
                    .read_stream_projected(
                        lance_io::ReadBatchParams::RangeFull,
                        1024,
                        16,
                        projection.clone(),
                        FilterExpression::no_filter(),
                    )
                    .await
                    .unwrap();

                // The expected data is the written data projected onto the same schema.
                let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
                verify_expected(
                    &written_file.data,
                    batch_stream,
                    1024,
                    Some(Box::new(move |batch: &RecordBatch| {
                        batch.project_by_schema(&projection_arrow).unwrap()
                    })),
                )
                .await;

                // Case 2: the same projection supplied when the reader is opened,
                // then read through the un-projected `read_stream` entry point.
                let file_reader = FileReader::try_open(
                    file_scheduler.clone(),
                    Some(projection.clone()),
                    Arc::<DecoderPlugins>::default(),
                    &test_cache(),
                    FileReaderOptions::default(),
                )
                .await
                .unwrap();

                let batch_stream = file_reader
                    .read_stream(
                        lance_io::ReadBatchParams::RangeFull,
                        1024,
                        16,
                        FilterExpression::no_filter(),
                    )
                    .await
                    .unwrap();

                let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
                verify_expected(
                    &written_file.data,
                    batch_stream,
                    1024,
                    Some(Box::new(move |batch: &RecordBatch| {
                        batch.project_by_schema(&projection_arrow).unwrap()
                    })),
                )
                .await;

                // An empty projection is rejected on a per-read basis.
                assert!(
                    file_reader
                        .read_stream_projected(
                            lance_io::ReadBatchParams::RangeFull,
                            1024,
                            16,
                            empty_projection.clone(),
                            FilterExpression::no_filter(),
                        )
                        .await
                        .is_err()
                );
            }
        }

        // An empty projection is also rejected when opening the reader.
        assert!(
            FileReader::try_open(
                file_scheduler.clone(),
                Some(empty_projection),
                Arc::<DecoderPlugins>::default(),
                &test_cache(),
                FileReaderOptions::default(),
            )
            .await
            .is_err()
        );

        let arrow_schema = ArrowSchema::new(vec![
            Field::new("x", DataType::Int32, true),
            Field::new("y", DataType::Int32, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        // Two schema fields that both point at column index 0.
        let projection_with_dupes = ReaderProjection {
            column_indices: vec![0, 0],
            schema: Arc::new(schema),
        };

        assert!(
            FileReader::try_open(
                file_scheduler.clone(),
                Some(projection_with_dupes),
                Arc::<DecoderPlugins>::default(),
                &test_cache(),
                FileReaderOptions::default(),
            )
            .await
            .is_err()
        );
    }
2112
2113 #[test_log::test(tokio::test)]
2114 async fn test_compressing_buffer() {
2115 let fs = FsFixture::default();
2116
2117 let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
2118 let file_scheduler = fs
2119 .scheduler
2120 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2121 .await
2122 .unwrap();
2123
2124 let file_reader = FileReader::try_open(
2126 file_scheduler.clone(),
2127 None,
2128 Arc::<DecoderPlugins>::default(),
2129 &test_cache(),
2130 FileReaderOptions::default(),
2131 )
2132 .await
2133 .unwrap();
2134
2135 let mut projection = written_file.schema.project(&["score"]).unwrap();
2136 for field in projection.fields.iter_mut() {
2137 field
2138 .metadata
2139 .insert("lance:compression".to_string(), "zstd".to_string());
2140 }
2141 let projection = ReaderProjection {
2142 column_indices: projection.fields.iter().map(|f| f.id as u32).collect(),
2143 schema: Arc::new(projection),
2144 };
2145
2146 let batch_stream = file_reader
2147 .read_stream_projected(
2148 lance_io::ReadBatchParams::RangeFull,
2149 1024,
2150 16,
2151 projection.clone(),
2152 FilterExpression::no_filter(),
2153 )
2154 .await
2155 .unwrap();
2156
2157 let projection_arrow = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
2158 verify_expected(
2159 &written_file.data,
2160 batch_stream,
2161 1024,
2162 Some(Box::new(move |batch: &RecordBatch| {
2163 batch.project_by_schema(&projection_arrow).unwrap()
2164 })),
2165 )
2166 .await;
2167 }
2168
2169 #[tokio::test]
2170 async fn test_read_all() {
2171 let fs = FsFixture::default();
2172 let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
2173 let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2174
2175 let file_scheduler = fs
2176 .scheduler
2177 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2178 .await
2179 .unwrap();
2180 let file_reader = FileReader::try_open(
2181 file_scheduler.clone(),
2182 None,
2183 Arc::<DecoderPlugins>::default(),
2184 &test_cache(),
2185 FileReaderOptions::default(),
2186 )
2187 .await
2188 .unwrap();
2189
2190 let batches = file_reader
2191 .read_stream(
2192 lance_io::ReadBatchParams::RangeFull,
2193 total_rows as u32,
2194 16,
2195 FilterExpression::no_filter(),
2196 )
2197 .await
2198 .unwrap()
2199 .try_collect::<Vec<_>>()
2200 .await
2201 .unwrap();
2202 assert_eq!(batches.len(), 1);
2203 assert_eq!(batches[0].num_rows(), total_rows);
2204 }
2205
2206 #[rstest]
2207 #[tokio::test]
2208 async fn test_blocking_take(
2209 #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1, LanceFileVersion::V2_2)]
2210 version: LanceFileVersion,
2211 ) {
2212 let fs = FsFixture::default();
2213 let WrittenFile { data, schema, .. } = create_some_file(&fs, version).await;
2214 let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2215
2216 let file_scheduler = fs
2217 .scheduler
2218 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2219 .await
2220 .unwrap();
2221 let file_reader = FileReader::try_open(
2222 file_scheduler.clone(),
2223 Some(ReaderProjection::from_column_names(version, &schema, &["score"]).unwrap()),
2224 Arc::<DecoderPlugins>::default(),
2225 &test_cache(),
2226 FileReaderOptions::default(),
2227 )
2228 .await
2229 .unwrap();
2230
2231 let batches = tokio::task::spawn_blocking(move || {
2232 file_reader
2233 .read_stream_projected_blocking(
2234 lance_io::ReadBatchParams::Indices(UInt32Array::from(vec![0, 1, 2, 3, 4])),
2235 total_rows as u32,
2236 None,
2237 FilterExpression::no_filter(),
2238 )
2239 .unwrap()
2240 .collect::<ArrowResult<Vec<_>>>()
2241 .unwrap()
2242 })
2243 .await
2244 .unwrap();
2245
2246 assert_eq!(batches.len(), 1);
2247 assert_eq!(batches[0].num_rows(), 5);
2248 assert_eq!(batches[0].num_columns(), 1);
2249 }
2250
    /// Checks that a stream keeps working after its `FileReader` is dropped,
    /// and that the stream itself can be dropped after only one batch.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_drop_in_progress() {
        let fs = FsFixture::default();
        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();

        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        // A batch size of one tenth of the rows means most of the file is still
        // unread when the stream is dropped below.
        let mut batches = file_reader
            .read_stream(
                lance_io::ReadBatchParams::RangeFull,
                (total_rows / 10) as u32,
                16,
                FilterExpression::no_filter(),
            )
            .await
            .unwrap();

        // Drop the reader before the stream has been polled at all.
        drop(file_reader);

        let batch = batches.next().await.unwrap().unwrap();
        assert!(batch.num_rows() > 0);

        // Drop the stream after consuming only its first batch.
        drop(batches);
    }
2290
    /// Schedules a full-file range into a channel whose receiver has already
    /// been dropped; the test passes as long as `schedule_range` does not panic.
    #[tokio::test]
    async fn drop_while_scheduling() {
        let fs = FsFixture::default();
        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
        let total_rows = written_file
            .data
            .iter()
            .map(|batch| batch.num_rows())
            .sum::<usize>();

        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        // Build the scheduler by hand so the output channel can be controlled directly.
        let projection =
            ReaderProjection::from_whole_schema(&written_file.schema, LanceFileVersion::V2_0);
        let column_infos = file_reader
            .collect_columns_from_projection(&projection)
            .unwrap();
        let mut decode_scheduler = DecodeBatchScheduler::try_new(
            &projection.schema,
            &projection.column_indices,
            &column_infos,
            &vec![],
            total_rows as u64,
            Arc::<DecoderPlugins>::default(),
            file_reader.scheduler.clone(),
            test_cache(),
            &FilterExpression::no_filter(),
            &DecoderConfig::default(),
        )
        .await
        .unwrap();

        let range = 0..total_rows as u64;

        let (tx, rx) = mpsc::unbounded_channel();

        // Drop the receiving half before anything is scheduled.
        drop(rx);

        decode_scheduler.schedule_range(
            range,
            &FilterExpression::no_filter(),
            tx,
            file_reader.scheduler.clone(),
        )
    }
2360
2361 #[tokio::test]
2362 async fn test_read_empty_range() {
2363 let fs = FsFixture::default();
2364 create_some_file(&fs, LanceFileVersion::V2_0).await;
2365
2366 let file_scheduler = fs
2367 .scheduler
2368 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2369 .await
2370 .unwrap();
2371 let file_reader = FileReader::try_open(
2372 file_scheduler.clone(),
2373 None,
2374 Arc::<DecoderPlugins>::default(),
2375 &test_cache(),
2376 FileReaderOptions::default(),
2377 )
2378 .await
2379 .unwrap();
2380
2381 let batches = file_reader
2383 .read_stream(
2384 lance_io::ReadBatchParams::Range(0..0),
2385 1024,
2386 16,
2387 FilterExpression::no_filter(),
2388 )
2389 .await
2390 .unwrap()
2391 .try_collect::<Vec<_>>()
2392 .await
2393 .unwrap();
2394
2395 assert_eq!(batches.len(), 0);
2396
2397 let batches = file_reader
2399 .read_stream(
2400 lance_io::ReadBatchParams::Ranges(Arc::new([0..1, 2..2])),
2401 1024,
2402 16,
2403 FilterExpression::no_filter(),
2404 )
2405 .await
2406 .unwrap()
2407 .try_collect::<Vec<_>>()
2408 .await
2409 .unwrap();
2410 assert_eq!(batches.len(), 1);
2411 }
2412
2413 async fn write_file_with_global_buffer(fs: &FsFixture, buffer: Bytes) {
2414 let lance_schema =
2415 lance_core::datatypes::Schema::try_from(&ArrowSchema::new(vec![Field::new(
2416 "foo",
2417 DataType::Int32,
2418 true,
2419 )]))
2420 .unwrap();
2421
2422 let mut file_writer = FileWriter::try_new(
2423 fs.object_store.create(&fs.tmp_path).await.unwrap(),
2424 lance_schema,
2425 FileWriterOptions::default(),
2426 )
2427 .unwrap();
2428
2429 let buf_index = file_writer.add_global_buffer(buffer).await.unwrap();
2430 assert_eq!(buf_index, 1);
2431
2432 file_writer.finish().await.unwrap();
2433 }
2434
    /// Reads back a user global buffer and checks both its contents and the
    /// number of read I/O operations needed to fetch it.
    ///
    /// With `within_window` the buffer is the five bytes `hello`; the test
    /// expects it to be listed in `retained_global_buffers` and to be served
    /// with zero read IOPS. Otherwise the buffer is two object-store blocks
    /// long; the test expects it not to be retained and to cost one read.
    #[rstest]
    #[case::within_tail_window(true)]
    #[case::outside_tail_window(false)]
    #[tokio::test]
    async fn test_read_global_buffer(#[case] within_window: bool) {
        let fs = FsFixture::default();

        let block_size = fs.object_store.block_size();
        let buffer = if within_window {
            Bytes::from_static(b"hello")
        } else {
            Bytes::from(vec![7u8; 2 * block_size])
        };
        let expected_read_iops = if within_window { 0 } else { 1 };

        write_file_with_global_buffer(&fs, buffer.clone()).await;

        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        // Buffer 0 (the schema) is never retained; buffer 1 only in the small case.
        let retained = &file_reader.metadata().retained_global_buffers;
        assert!(!retained.contains_key(&0), "schema must not be retained");
        assert_eq!(retained.contains_key(&1), within_window);

        // Result discarded on purpose. Presumably this resets the incremental
        // counters so that the stats read below only cover `read_global_buffer`
        // (TODO confirm against `io_stats_incremental`).
        fs.object_store.io_stats_incremental();

        let buf = file_reader.read_global_buffer(1).await.unwrap();
        assert_eq!(buf, buffer);

        let stats = fs.object_store.io_stats_incremental();
        assert_eq!(stats.read_iops, expected_read_iops);
    }
2485
2486 #[tokio::test]
2489 async fn test_read_global_buffer_no_user_buffers() {
2490 let fs = FsFixture::default();
2491 create_some_file(&fs, LanceFileVersion::V2_1).await;
2492
2493 let file_scheduler = fs
2494 .scheduler
2495 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2496 .await
2497 .unwrap();
2498 let file_reader = FileReader::try_open(
2499 file_scheduler,
2500 None,
2501 Arc::<DecoderPlugins>::default(),
2502 &test_cache(),
2503 FileReaderOptions::default(),
2504 )
2505 .await
2506 .unwrap();
2507
2508 let metadata = file_reader.metadata();
2509 assert_eq!(metadata.file_buffers.len(), 1, "expected only the schema");
2510 assert!(
2511 metadata.retained_global_buffers.is_empty(),
2512 "a file with no user global buffers must retain nothing"
2513 );
2514 }
2515
2516 #[rstest]
2517 #[tokio::test]
2518 async fn test_deep_size_of_includes_column_metadata(
2519 #[values(
2520 LanceFileVersion::V2_0,
2521 LanceFileVersion::V2_1,
2522 LanceFileVersion::V2_2,
2523 LanceFileVersion::V2_3
2524 )]
2525 version: LanceFileVersion,
2526 ) {
2527 use lance_core::deepsize::DeepSizeOf;
2532
2533 let fs = FsFixture::default();
2534 let _written = create_some_file(&fs, version).await;
2535 let cache = test_cache();
2536 let file_scheduler = fs
2537 .scheduler
2538 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2539 .await
2540 .unwrap();
2541 let file_reader = FileReader::try_open(
2542 file_scheduler,
2543 None,
2544 Arc::<DecoderPlugins>::default(),
2545 &cache,
2546 FileReaderOptions::default(),
2547 )
2548 .await
2549 .unwrap();
2550
2551 let metadata = file_reader.metadata();
2552 let deep_size = metadata.deep_size_of();
2553
2554 assert!(
2560 deep_size > 1024,
2561 "deep_size_of ({deep_size}) is suspiciously small — \
2562 column_metadatas and column_infos may not be accounted for"
2563 );
2564
2565 assert!(
2567 !metadata.column_metadatas.is_empty(),
2568 "Expected non-empty column_metadatas"
2569 );
2570
2571 let num_columns = metadata.column_metadatas.len();
2574 assert!(
2575 deep_size > num_columns * 50,
2576 "deep_size_of ({deep_size}) should scale with column count ({num_columns})"
2577 );
2578 }
2579
2580 #[tokio::test]
2581 async fn test_read_global_buffer_out_of_range() {
2582 let fs = FsFixture::default();
2583
2584 write_file_with_global_buffer(&fs, Bytes::from_static(b"hello")).await;
2585
2586 let file_scheduler = fs
2587 .scheduler
2588 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2589 .await
2590 .unwrap();
2591 let file_reader = FileReader::try_open(
2592 file_scheduler,
2593 None,
2594 Arc::<DecoderPlugins>::default(),
2595 &test_cache(),
2596 FileReaderOptions::default(),
2597 )
2598 .await
2599 .unwrap();
2600
2601 let err = file_reader.read_global_buffer(2).await.unwrap_err();
2604 assert!(
2605 matches!(err, lance_core::Error::InvalidInput { .. }),
2606 "expected InvalidInput, got: {err:?}"
2607 );
2608 let msg = err.to_string();
2609 assert!(msg.contains('2'), "error should mention the index: {msg}");
2610 }
2611}