use std::{
    collections::{BTreeMap, BTreeSet},
    io::Cursor,
    ops::Range,
    pin::Pin,
    sync::Arc,
};

use arrow_array::RecordBatchReader;
use arrow_schema::Schema as ArrowSchema;
use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
use bytes::{Bytes, BytesMut};
use deepsize::{Context, DeepSizeOf};
use futures::{Stream, StreamExt, stream::BoxStream};
use lance_encoding::{
    EncodingsIo,
    decoder::{
        ColumnInfo, DecoderConfig, DecoderPlugins, FilterExpression, PageEncoding, PageInfo,
        ReadBatchTask, RequestedRows, SchedulerDecoderConfig, schedule_and_decode,
        schedule_and_decode_blocking,
    },
    encoder::EncodedBatch,
    version::LanceFileVersion,
};
use log::debug;
use object_store::path::Path;
use prost::{Message, Name};

use lance_core::{
    Error, Result,
    cache::LanceCache,
    datatypes::{Field, Schema},
};
use lance_encoding::format::pb as pbenc;
use lance_encoding::format::pb21 as pbenc21;
use lance_io::{
    ReadBatchParams,
    scheduler::FileScheduler,
    stream::{RecordBatchStream, RecordBatchStreamAdapter},
};

use crate::{
    datatypes::{Fields, FieldsWithMeta},
    format::{MAGIC, MAJOR_VERSION, MINOR_VERSION, pb, pbfile},
    io::LanceEncodingsIo,
    writer::PAGE_BUFFER_ALIGNMENT,
};

/// The default size (in bytes) of a read chunk: 8 MiB
pub const DEFAULT_READ_CHUNK_SIZE: u64 = 8 * 1024 * 1024;

/// Describes the position and size of a buffer within the file
#[derive(Debug, DeepSizeOf)]
pub struct BufferDescriptor {
    pub position: u64,
    pub size: u64,
}

/// Statistics summarizing the contents of a file
#[derive(Debug)]
pub struct FileStatistics {
    /// Statistics for each column in the file
    pub columns: Vec<ColumnStatistics>,
}

/// Statistics summarizing a single column in a file
#[derive(Debug)]
pub struct ColumnStatistics {
    /// The number of pages in the column
    pub num_pages: usize,
    /// The total size (in bytes) of the column's page buffers
    pub size_bytes: u64,
}

/// File metadata that is kept in memory once a file has been opened
#[derive(Debug)]
pub struct CachedFileMetadata {
    /// The schema of the file
    pub file_schema: Arc<Schema>,
    /// The on-disk metadata for each column
    pub column_metadatas: Vec<pbfile::ColumnMetadata>,
    /// Decoded per-column info (pages, buffers, encodings)
    pub column_infos: Vec<Arc<ColumnInfo>>,
    /// The number of rows in the file
    pub num_rows: u64,
    /// The global buffers in the file (the first is always the schema)
    pub file_buffers: Vec<BufferDescriptor>,
    /// The number of bytes in the data pages section of the file
    pub num_data_bytes: u64,
    /// The number of bytes in the column metadata section of the file
    pub num_column_metadata_bytes: u64,
    /// The number of bytes in the global buffers section of the file
    pub num_global_buffer_bytes: u64,
    /// The number of bytes from the start of the schema to the end of the file
    pub num_footer_bytes: u64,
    pub major_version: u16,
    pub minor_version: u16,
    pub file_size_bytes: u64,
}

impl CachedFileMetadata {
    pub fn file_size(&self) -> u64 {
        self.file_size_bytes
    }
}

impl DeepSizeOf for CachedFileMetadata {
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        let schema_size = self.file_schema.deep_size_of_children(context);

        let buffers_size: usize = self
            .file_buffers
            .iter()
            .map(|fb| fb.deep_size_of_children(context))
            .sum();

        // The protobuf messages do not implement DeepSizeOf.  As a rough
        // approximation we assume the in-memory representation is ~4x the
        // encoded size.
        let column_metadatas_size: usize = self
            .column_metadatas
            .iter()
            .map(|cm| cm.encoded_len() * 4)
            .sum::<usize>()
            + std::mem::size_of_val(self.column_metadatas.as_slice());

        let column_infos_size: usize = self
            .column_infos
            .iter()
            .map(|ci| {
                let pages_size: usize = ci
                    .page_infos
                    .iter()
                    .map(|pi| {
                        let enc_size = match &pi.encoding {
                            lance_encoding::decoder::PageEncoding::Legacy(e) => e.encoded_len() * 4,
                            lance_encoding::decoder::PageEncoding::Structural(e) => {
                                e.encoded_len() * 4
                            }
                        };
                        enc_size
                            + std::mem::size_of_val(pi.buffer_offsets_and_sizes.as_ref())
                            + std::mem::size_of::<u64>() * 2
                    })
                    .sum();
                pages_size
                    + std::mem::size_of_val(ci.buffer_offsets_and_sizes.as_ref())
                    + ci.encoding.encoded_len() * 4
                    + std::mem::size_of::<u32>()
                    + std::mem::size_of::<usize>() * 2
            })
            .sum();

        schema_size + buffers_size + column_metadatas_size + column_infos_size
    }
}

impl CachedFileMetadata {
    pub fn version(&self) -> LanceFileVersion {
        match (self.major_version, self.minor_version) {
            (0, 3) => LanceFileVersion::V2_0,
            (2, 0) => LanceFileVersion::V2_0,
            (2, 1) => LanceFileVersion::V2_1,
            (2, 2) => LanceFileVersion::V2_2,
            (2, 3) => LanceFileVersion::V2_3,
            _ => panic!(
                "Unsupported version: {}.{}",
                self.major_version, self.minor_version
            ),
        }
    }
}

/// Describes which columns to read from a file
///
/// `column_indices` holds the on-disk column indices that must be scheduled to
/// satisfy `schema`.  In version 2.0 every field (including struct parents) maps
/// to a column; starting with version 2.1 only leaf fields (plus packed structs
/// and blobs) map to columns.
#[derive(Debug, Clone)]
pub struct ReaderProjection {
    /// The schema of the projected data
    pub schema: Arc<Schema>,
    /// The indices of the columns to read from the file
    pub column_indices: Vec<u32>,
}

impl ReaderProjection {
    fn from_field_ids_helper<'a>(
        file_version: LanceFileVersion,
        fields: impl Iterator<Item = &'a Field>,
        field_id_to_column_index: &BTreeMap<u32, u32>,
        column_indices: &mut Vec<u32>,
    ) -> Result<()> {
        for field in fields {
            let is_structural = file_version >= LanceFileVersion::V2_1;
            if (!is_structural
                || field.children.is_empty()
                || field.is_blob()
                || field.is_packed_struct())
                && let Some(column_idx) = field_id_to_column_index.get(&(field.id as u32)).copied()
            {
                column_indices.push(column_idx);
            }
            if !is_structural || (!field.is_blob() && !field.is_packed_struct()) {
                Self::from_field_ids_helper(
                    file_version,
                    field.children.iter(),
                    field_id_to_column_index,
                    column_indices,
                )?;
            }
        }
        Ok(())
    }

    /// Creates a projection from a mapping of field IDs to on-disk column indices
    pub fn from_field_ids(
        file_version: LanceFileVersion,
        schema: &Schema,
        field_id_to_column_index: &BTreeMap<u32, u32>,
    ) -> Result<Self> {
        let mut column_indices = Vec::new();
        Self::from_field_ids_helper(
            file_version,
            schema.fields.iter(),
            field_id_to_column_index,
            &mut column_indices,
        )?;
        let projection = Self {
            schema: Arc::new(schema.clone()),
            column_indices,
        };
        Ok(projection)
    }

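    /// Creates a projection that reads the entire file
    ///
    /// A minimal usage sketch (`file_schema` is a hypothetical
    /// [`lance_core::datatypes::Schema`] describing the file):
    ///
    /// ```ignore
    /// // In 2.0 every field gets a column; in 2.1+ only leaf fields
    /// // (and packed structs) do
    /// let projection =
    ///     ReaderProjection::from_whole_schema(&file_schema, LanceFileVersion::V2_0);
    /// ```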
    pub fn from_whole_schema(schema: &Schema, version: LanceFileVersion) -> Self {
        let schema = Arc::new(schema.clone());
        let is_structural = version >= LanceFileVersion::V2_1;
        let mut column_indices = vec![];
        let mut curr_column_idx = 0;
        let mut packed_struct_fields_num = 0;
        for field in schema.fields_pre_order() {
            if packed_struct_fields_num > 0 {
                packed_struct_fields_num -= 1;
                continue;
            }
            if field.is_packed_struct() {
                column_indices.push(curr_column_idx);
                curr_column_idx += 1;
                packed_struct_fields_num = field.children.len();
            } else if field.children.is_empty() || !is_structural {
                column_indices.push(curr_column_idx);
                curr_column_idx += 1;
            }
        }
        Self {
            schema,
            column_indices,
        }
    }

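    /// Creates a projection that reads the specified columns by name
    ///
    /// Column names use the same syntax as [`lance_core::datatypes::Schema::project`].
    /// A minimal sketch, assuming the file has a top-level column `"score"` and a
    /// struct column `"location"` with a child `"x"`:
    ///
    /// ```ignore
    /// let projection = ReaderProjection::from_column_names(
    ///     LanceFileVersion::V2_0,
    ///     &file_schema,
    ///     &["score", "location.x"],
    /// )?;
    /// ```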
    pub fn from_column_names(
        file_version: LanceFileVersion,
        schema: &Schema,
        column_names: &[&str],
    ) -> Result<Self> {
        let field_id_to_column_index = schema
            .fields_pre_order()
            .filter(|field| {
                file_version < LanceFileVersion::V2_1 || field.is_leaf() || field.is_packed_struct()
            })
            .enumerate()
            .map(|(idx, field)| (field.id as u32, idx as u32))
            .collect::<BTreeMap<_, _>>();
        let projected = schema.project(column_names)?;
        let mut column_indices = Vec::new();
        Self::from_field_ids_helper(
            file_version,
            projected.fields.iter(),
            &field_id_to_column_index,
            &mut column_indices,
        )?;
        Ok(Self {
            schema: Arc::new(projected),
            column_indices,
        })
    }
}

/// Options that control the behavior of the [`FileReader`]
#[derive(Clone, Debug)]
pub struct FileReaderOptions {
    pub decoder_config: DecoderConfig,
    /// The size (in bytes) of the chunks used when reading data from the file
    pub read_chunk_size: u64,
    /// If set, an additional (approximate) limit on the number of bytes per batch
    pub batch_size_bytes: Option<u64>,
}

impl Default for FileReaderOptions {
    fn default() -> Self {
        Self {
            decoder_config: DecoderConfig::default(),
            read_chunk_size: DEFAULT_READ_CHUNK_SIZE,
            batch_size_bytes: None,
        }
    }
}

#[derive(Debug, Clone)]
pub struct FileReader {
    scheduler: Arc<dyn EncodingsIo>,
    base_projection: ReaderProjection,
    num_rows: u64,
    metadata: Arc<CachedFileMetadata>,
    decoder_plugins: Arc<DecoderPlugins>,
    cache: Arc<LanceCache>,
    options: FileReaderOptions,
}

#[derive(Debug)]
struct Footer {
    #[allow(dead_code)]
    column_meta_start: u64,
    #[allow(dead_code)]
    column_meta_offsets_start: u64,
    global_buff_offsets_start: u64,
    num_global_buffers: u32,
    num_columns: u32,
    major_version: u16,
    minor_version: u16,
}

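// The Lance footer is fixed-size.  As decoded below, it consists of, in order
// (all integers little-endian):
//
//   column_meta_start         : u64 (8 bytes)
//   column_meta_offsets_start : u64 (8 bytes)
//   global_buff_offsets_start : u64 (8 bytes)
//   num_global_buffers        : u32 (4 bytes)
//   num_columns               : u32 (4 bytes)
//   major_version             : u16 (2 bytes)
//   minor_version             : u16 (2 bytes)
//   magic                     : 4 bytes
//
// for a total of 40 bytes.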
const FOOTER_LEN: usize = 40;

impl FileReader {
    pub fn with_scheduler(&self, scheduler: Arc<dyn EncodingsIo>) -> Self {
        Self {
            scheduler,
            base_projection: self.base_projection.clone(),
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            metadata: self.metadata.clone(),
            options: self.options.clone(),
            num_rows: self.num_rows,
        }
    }

    pub fn num_rows(&self) -> u64 {
        self.num_rows
    }

    pub fn metadata(&self) -> &Arc<CachedFileMetadata> {
        &self.metadata
    }

    pub fn file_statistics(&self) -> FileStatistics {
        let column_metadatas = &self.metadata().column_metadatas;

        let column_stats = column_metadatas
            .iter()
            .map(|col_metadata| {
                let num_pages = col_metadata.pages.len();
                let size_bytes = col_metadata
                    .pages
                    .iter()
                    .map(|page| page.buffer_sizes.iter().sum::<u64>())
                    .sum::<u64>();
                ColumnStatistics {
                    num_pages,
                    size_bytes,
                }
            })
            .collect();

        FileStatistics {
            columns: column_stats,
        }
    }

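    /// Reads one of the file's global buffers
    ///
    /// A minimal usage sketch: global buffer 0 holds the file schema, so buffers
    /// added by the writer start at index 1 (see `test_global_buffers` below):
    ///
    /// ```ignore
    /// let bytes = file_reader.read_global_buffer(1).await?;
    /// ```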
    pub async fn read_global_buffer(&self, index: u32) -> Result<Bytes> {
        let buffer_desc = self
            .metadata
            .file_buffers
            .get(index as usize)
            .ok_or_else(|| {
                Error::invalid_input(format!(
                    "request for global buffer at index {} but there were only {} global buffers in the file",
                    index,
                    self.metadata.file_buffers.len()
                ))
            })?;
        self.scheduler
            .submit_single(
                buffer_desc.position..buffer_desc.position + buffer_desc.size,
                0,
            )
            .await
    }

    async fn read_tail(scheduler: &FileScheduler) -> Result<(Bytes, u64)> {
        let file_size = scheduler.reader().size().await? as u64;
        let begin = if file_size < scheduler.reader().block_size() as u64 {
            0
        } else {
            file_size - scheduler.reader().block_size() as u64
        };
        let tail_bytes = scheduler.submit_single(begin..file_size, 0).await?;
        Ok((tail_bytes, file_size))
    }

    fn decode_footer(footer_bytes: &Bytes) -> Result<Footer> {
        let len = footer_bytes.len();
        if len < FOOTER_LEN {
            return Err(Error::invalid_input(format!(
                "file is too small to contain a valid Lance footer (len: {}, expected at least {})",
                len, FOOTER_LEN
            )));
        }
        let mut cursor = Cursor::new(footer_bytes.slice(len - FOOTER_LEN..));

        let column_meta_start = cursor.read_u64::<LittleEndian>()?;
        let column_meta_offsets_start = cursor.read_u64::<LittleEndian>()?;
        let global_buff_offsets_start = cursor.read_u64::<LittleEndian>()?;
        let num_global_buffers = cursor.read_u32::<LittleEndian>()?;
        let num_columns = cursor.read_u32::<LittleEndian>()?;
        let major_version = cursor.read_u16::<LittleEndian>()?;
        let minor_version = cursor.read_u16::<LittleEndian>()?;

        if major_version == MAJOR_VERSION as u16 && minor_version == MINOR_VERSION as u16 {
            return Err(Error::version_conflict(
                "Attempt to use the lance v2 reader to read a legacy file".to_string(),
                major_version,
                minor_version,
            ));
        }

        let magic_bytes = footer_bytes.slice(len - 4..);
        if magic_bytes.as_ref() != MAGIC {
            return Err(Error::invalid_input(format!(
                "file does not appear to be a Lance file (invalid magic: {:?})",
                magic_bytes.as_ref()
            )));
        }
        Ok(Footer {
            column_meta_start,
            column_meta_offsets_start,
            global_buff_offsets_start,
            num_global_buffers,
            num_columns,
            major_version,
            minor_version,
        })
    }

    fn read_all_column_metadata(
        column_metadata_bytes: Bytes,
        footer: &Footer,
    ) -> Result<Vec<pbfile::ColumnMetadata>> {
        let column_metadata_start = footer.column_meta_start;
        // The column metadata offsets table records, for each column, the position
        // (u64) and length (u64) of its metadata block: 16 bytes per column.
        let cmo_table_size = 16 * footer.num_columns as usize;
        let cmo_table = column_metadata_bytes.slice(column_metadata_bytes.len() - cmo_table_size..);

        (0..footer.num_columns)
            .map(|col_idx| {
                let offset = (col_idx * 16) as usize;
                let position = LittleEndian::read_u64(&cmo_table[offset..offset + 8]);
                let length = LittleEndian::read_u64(&cmo_table[offset + 8..offset + 16]);
                let normalized_position = (position - column_metadata_start) as usize;
                let normalized_end = normalized_position + (length as usize);
                Ok(pbfile::ColumnMetadata::decode(
                    &column_metadata_bytes[normalized_position..normalized_end],
                )?)
            })
            .collect::<Result<Vec<_>>>()
    }

    /// Reads the bytes from `start_pos` to the end of the file, re-using the
    /// already-fetched tail when it covers the requested range (hence
    /// "optimistic") and issuing one extra read for any missing prefix
    async fn optimistic_tail_read(
        data: &Bytes,
        start_pos: u64,
        scheduler: &FileScheduler,
        file_len: u64,
    ) -> Result<Bytes> {
        let num_bytes_needed = (file_len - start_pos) as usize;
        if data.len() >= num_bytes_needed {
            Ok(data.slice((data.len() - num_bytes_needed)..))
        } else {
            let num_bytes_missing = (num_bytes_needed - data.len()) as u64;
            let start = file_len - num_bytes_needed as u64;
            let missing_bytes = scheduler
                .submit_single(start..start + num_bytes_missing, 0)
                .await?;
            let mut combined = BytesMut::with_capacity(data.len() + num_bytes_missing as usize);
            combined.extend(missing_bytes);
            combined.extend(data);
            Ok(combined.freeze())
        }
    }

    fn do_decode_gbo_table(
        gbo_bytes: &Bytes,
        footer: &Footer,
        version: LanceFileVersion,
    ) -> Result<Vec<BufferDescriptor>> {
        let mut global_bufs_cursor = Cursor::new(gbo_bytes);

        let mut global_buffers = Vec::with_capacity(footer.num_global_buffers as usize);
        for _ in 0..footer.num_global_buffers {
            let buf_pos = global_bufs_cursor.read_u64::<LittleEndian>()?;
            assert!(
                version < LanceFileVersion::V2_1 || buf_pos % PAGE_BUFFER_ALIGNMENT as u64 == 0
            );
            let buf_size = global_bufs_cursor.read_u64::<LittleEndian>()?;
            global_buffers.push(BufferDescriptor {
                position: buf_pos,
                size: buf_size,
            });
        }

        Ok(global_buffers)
    }

    async fn decode_gbo_table(
        tail_bytes: &Bytes,
        file_len: u64,
        scheduler: &FileScheduler,
        footer: &Footer,
        version: LanceFileVersion,
    ) -> Result<Vec<BufferDescriptor>> {
        let gbo_bytes = Self::optimistic_tail_read(
            tail_bytes,
            footer.global_buff_offsets_start,
            scheduler,
            file_len,
        )
        .await?;
        Self::do_decode_gbo_table(&gbo_bytes, footer, version)
    }

    fn decode_schema(schema_bytes: Bytes) -> Result<(u64, lance_core::datatypes::Schema)> {
        let file_descriptor = pb::FileDescriptor::decode(schema_bytes)?;
        let pb_schema = file_descriptor.schema.unwrap();
        let num_rows = file_descriptor.length;
        let fields_with_meta = FieldsWithMeta {
            fields: Fields(pb_schema.fields),
            metadata: pb_schema.metadata,
        };
        let schema = lance_core::datatypes::Schema::from(fields_with_meta);
        Ok((num_rows, schema))
    }

    /// Reads all of the metadata in the file: the footer, the schema, the
    /// column metadata, and the global buffer table
    pub async fn read_all_metadata(scheduler: &FileScheduler) -> Result<CachedFileMetadata> {
        let (tail_bytes, file_len) = Self::read_tail(scheduler).await?;
        let footer = Self::decode_footer(&tail_bytes)?;

        let file_version = LanceFileVersion::try_from_major_minor(
            footer.major_version as u32,
            footer.minor_version as u32,
        )?;

        let gbo_table =
            Self::decode_gbo_table(&tail_bytes, file_len, scheduler, &footer, file_version).await?;
        if gbo_table.is_empty() {
            return Err(Error::internal(
                "File did not contain any global buffers, schema expected".to_string(),
            ));
        }
        // The schema is always the first global buffer
        let schema_start = gbo_table[0].position;
        let schema_size = gbo_table[0].size;

        let num_footer_bytes = file_len - schema_start;

        // Everything from the schema to the end of the file is metadata; try to
        // serve it from the tail we already read
        let all_metadata_bytes =
            Self::optimistic_tail_read(&tail_bytes, schema_start, scheduler, file_len).await?;

        let schema_bytes = all_metadata_bytes.slice(0..schema_size as usize);
        let (num_rows, schema) = Self::decode_schema(schema_bytes)?;

        // The column metadata sits between the column metadata start and the
        // global buffer offsets table; positions are normalized to the slice
        let column_metadata_start = (footer.column_meta_start - schema_start) as usize;
        let column_metadata_end = (footer.global_buff_offsets_start - schema_start) as usize;
        let column_metadata_bytes =
            all_metadata_bytes.slice(column_metadata_start..column_metadata_end);
        let column_metadatas = Self::read_all_column_metadata(column_metadata_bytes, &footer)?;

        let num_global_buffer_bytes = gbo_table.iter().map(|buf| buf.size).sum::<u64>();
        let num_data_bytes = footer.column_meta_start - num_global_buffer_bytes;
        let num_column_metadata_bytes = footer.global_buff_offsets_start - footer.column_meta_start;

        let column_infos = Self::meta_to_col_infos(column_metadatas.as_slice(), file_version);

        Ok(CachedFileMetadata {
            file_schema: Arc::new(schema),
            column_metadatas,
            column_infos,
            num_rows,
            num_data_bytes,
            num_column_metadata_bytes,
            num_global_buffer_bytes,
            num_footer_bytes,
            file_buffers: gbo_table,
            major_version: footer.major_version,
            minor_version: footer.minor_version,
            file_size_bytes: file_len,
        })
    }

    fn fetch_encoding<M: Default + Name + Sized>(encoding: &pbfile::Encoding) -> M {
        match &encoding.location {
            Some(pbfile::encoding::Location::Indirect(_)) => todo!(),
            Some(pbfile::encoding::Location::Direct(encoding)) => {
                let encoding_buf = Bytes::from(encoding.encoding.clone());
                let encoding_any = prost_types::Any::decode(encoding_buf).unwrap();
                encoding_any.to_msg::<M>().unwrap()
            }
            Some(pbfile::encoding::Location::None(_)) => panic!(),
            None => panic!(),
        }
    }

    fn meta_to_col_infos(
        column_metadatas: &[pbfile::ColumnMetadata],
        file_version: LanceFileVersion,
    ) -> Vec<Arc<ColumnInfo>> {
        column_metadatas
            .iter()
            .enumerate()
            .map(|(col_idx, col_meta)| {
                let page_infos = col_meta
                    .pages
                    .iter()
                    .map(|page| {
                        let num_rows = page.length;
                        let encoding = match file_version {
                            LanceFileVersion::V2_0 => {
                                PageEncoding::Legacy(Self::fetch_encoding::<pbenc::ArrayEncoding>(
                                    page.encoding.as_ref().unwrap(),
                                ))
                            }
                            _ => PageEncoding::Structural(Self::fetch_encoding::<
                                pbenc21::PageLayout,
                            >(
                                page.encoding.as_ref().unwrap()
                            )),
                        };
                        let buffer_offsets_and_sizes = Arc::from(
                            page.buffer_offsets
                                .iter()
                                .zip(page.buffer_sizes.iter())
                                .map(|(offset, size)| {
                                    assert!(
                                        file_version < LanceFileVersion::V2_1
                                            || offset % PAGE_BUFFER_ALIGNMENT as u64 == 0
                                    );
                                    (*offset, *size)
                                })
                                .collect::<Vec<_>>(),
                        );
                        PageInfo {
                            buffer_offsets_and_sizes,
                            encoding,
                            num_rows,
                            priority: page.priority,
                        }
                    })
                    .collect::<Vec<_>>();
                let buffer_offsets_and_sizes = Arc::from(
                    col_meta
                        .buffer_offsets
                        .iter()
                        .zip(col_meta.buffer_sizes.iter())
                        .map(|(offset, size)| (*offset, *size))
                        .collect::<Vec<_>>(),
                );
                Arc::new(ColumnInfo {
                    index: col_idx as u32,
                    page_infos: Arc::from(page_infos),
                    buffer_offsets_and_sizes,
                    encoding: Self::fetch_encoding(col_meta.encoding.as_ref().unwrap()),
                })
            })
            .collect::<Vec<_>>()
    }

    fn validate_projection(
        projection: &ReaderProjection,
        metadata: &CachedFileMetadata,
    ) -> Result<()> {
        if projection.schema.fields.is_empty() {
            return Err(Error::invalid_input(
                "Attempt to read zero columns from the file, at least one column must be specified"
                    .to_string(),
            ));
        }
        let mut column_indices_seen = BTreeSet::new();
        for column_index in &projection.column_indices {
            if !column_indices_seen.insert(*column_index) {
                return Err(Error::invalid_input(format!(
                    "The projection specified the column index {} more than once",
                    column_index
                )));
            }
            if *column_index >= metadata.column_infos.len() as u32 {
                return Err(Error::invalid_input(format!(
                    "The projection specified the column index {} but there are only {} columns in the file",
                    column_index,
                    metadata.column_infos.len()
                )));
            }
        }
        Ok(())
    }

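    /// Opens a new file reader
    ///
    /// A minimal sketch, assuming a `FileScheduler` named `file_scheduler` and a
    /// `LanceCache` named `cache` have already been constructed (the tests in
    /// this module show a full setup):
    ///
    /// ```ignore
    /// let reader = FileReader::try_open(
    ///     file_scheduler,
    ///     None, // project the whole schema
    ///     Arc::<DecoderPlugins>::default(),
    ///     &cache,
    ///     FileReaderOptions::default(),
    /// )
    /// .await?;
    /// println!("file has {} rows", reader.num_rows());
    /// ```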
    pub async fn try_open(
        scheduler: FileScheduler,
        base_projection: Option<ReaderProjection>,
        decoder_plugins: Arc<DecoderPlugins>,
        cache: &LanceCache,
        options: FileReaderOptions,
    ) -> Result<Self> {
        let file_metadata = Arc::new(Self::read_all_metadata(&scheduler).await?);
        let path = scheduler.reader().path().clone();

        let encodings_io =
            LanceEncodingsIo::new(scheduler).with_read_chunk_size(options.read_chunk_size);

        Self::try_open_with_file_metadata(
            Arc::new(encodings_io),
            path,
            base_projection,
            decoder_plugins,
            file_metadata,
            cache,
            options,
        )
        .await
    }

    /// Same as [`Self::try_open`] but for cases where the file metadata has
    /// already been loaded (e.g. from a cache)
    pub async fn try_open_with_file_metadata(
        scheduler: Arc<dyn EncodingsIo>,
        path: Path,
        base_projection: Option<ReaderProjection>,
        decoder_plugins: Arc<DecoderPlugins>,
        file_metadata: Arc<CachedFileMetadata>,
        cache: &LanceCache,
        options: FileReaderOptions,
    ) -> Result<Self> {
        let cache = Arc::new(cache.with_key_prefix(path.as_ref()));

        if let Some(base_projection) = base_projection.as_ref() {
            Self::validate_projection(base_projection, &file_metadata)?;
        }
        let num_rows = file_metadata.num_rows;
        Ok(Self {
            scheduler,
            base_projection: base_projection.unwrap_or(ReaderProjection::from_whole_schema(
                file_metadata.file_schema.as_ref(),
                file_metadata.version(),
            )),
            num_rows,
            metadata: file_metadata,
            decoder_plugins,
            cache,
            options,
        })
    }

    /// Returns the column infos to hand to the scheduler.  Currently this is
    /// every column in the file; the scheduler relies on the projection's
    /// column indices to decide what to actually read.
    fn collect_columns_from_projection(
        &self,
        _projection: &ReaderProjection,
    ) -> Result<Vec<Arc<ColumnInfo>>> {
        Ok(self.metadata.column_infos.clone())
    }

    #[allow(clippy::too_many_arguments)]
    async fn do_read_range(
        column_infos: Vec<Arc<ColumnInfo>>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<LanceCache>,
        num_rows: u64,
        decoder_plugins: Arc<DecoderPlugins>,
        range: Range<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
        decoder_config: DecoderConfig,
        batch_size_bytes: Option<u64>,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        debug!(
            "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
            range,
            batch_size,
            num_rows,
            column_infos.len(),
            projection.schema.fields.len(),
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache,
            decoder_plugins,
            io,
            decoder_config,
            batch_size_bytes,
        };

        let requested_rows = RequestedRows::Ranges(vec![range]);

        schedule_and_decode(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        )
        .await
    }

    async fn read_range(
        &self,
        range: Range<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        Self::do_read_range(
            self.collect_columns_from_projection(&projection)?,
            self.scheduler.clone(),
            self.cache.clone(),
            self.num_rows,
            self.decoder_plugins.clone(),
            range,
            batch_size,
            projection,
            filter,
            self.options.decoder_config.clone(),
            self.options.batch_size_bytes,
        )
        .await
    }

    #[allow(clippy::too_many_arguments)]
    async fn do_take_rows(
        column_infos: Vec<Arc<ColumnInfo>>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<LanceCache>,
        decoder_plugins: Arc<DecoderPlugins>,
        indices: Vec<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
        decoder_config: DecoderConfig,
        batch_size_bytes: Option<u64>,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        debug!(
            "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
            indices.len(),
            indices[0],
            indices[indices.len() - 1],
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache,
            decoder_plugins,
            io,
            decoder_config,
            batch_size_bytes,
        };

        let requested_rows = RequestedRows::Indices(indices);

        schedule_and_decode(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        )
        .await
    }

    async fn take_rows(
        &self,
        indices: Vec<u64>,
        batch_size: u32,
        projection: ReaderProjection,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        Self::do_take_rows(
            self.collect_columns_from_projection(&projection)?,
            self.scheduler.clone(),
            self.cache.clone(),
            self.decoder_plugins.clone(),
            indices,
            batch_size,
            projection,
            FilterExpression::no_filter(),
            self.options.decoder_config.clone(),
            self.options.batch_size_bytes,
        )
        .await
    }

    #[allow(clippy::too_many_arguments)]
    async fn do_read_ranges(
        column_infos: Vec<Arc<ColumnInfo>>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<LanceCache>,
        decoder_plugins: Arc<DecoderPlugins>,
        ranges: Vec<Range<u64>>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
        decoder_config: DecoderConfig,
        batch_size_bytes: Option<u64>,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        debug!(
            "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
            ranges.len(),
            num_rows,
            ranges[0].start,
            ranges[ranges.len() - 1].end,
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache,
            decoder_plugins,
            io,
            decoder_config,
            batch_size_bytes,
        };

        let requested_rows = RequestedRows::Ranges(ranges);

        schedule_and_decode(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        )
        .await
    }

    async fn read_ranges(
        &self,
        ranges: Vec<Range<u64>>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        Self::do_read_ranges(
            self.collect_columns_from_projection(&projection)?,
            self.scheduler.clone(),
            self.cache.clone(),
            self.decoder_plugins.clone(),
            ranges,
            batch_size,
            projection,
            filter,
            self.options.decoder_config.clone(),
            self.options.batch_size_bytes,
        )
        .await
    }

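    /// Creates a stream of "read tasks" to read the data from the file
    ///
    /// Each task decodes one batch.  Consuming the stream with a `buffered`
    /// combinator yields CPU parallelism across batches; this is exactly what
    /// [`Self::read_stream_projected`] does.  A minimal sketch, assuming an
    /// already-opened `reader`:
    ///
    /// ```ignore
    /// let tasks = reader
    ///     .read_tasks(
    ///         ReadBatchParams::RangeFull,
    ///         1024, // batch size (rows)
    ///         None, // use the reader's base projection
    ///         FilterExpression::no_filter(),
    ///     )
    ///     .await?;
    /// let batches = tasks.map(|task| task.task).buffered(16);
    /// ```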
    pub async fn read_tasks(
        &self,
        params: ReadBatchParams,
        batch_size: u32,
        projection: Option<ReaderProjection>,
        filter: FilterExpression,
    ) -> Result<Pin<Box<dyn Stream<Item = ReadBatchTask> + Send>>> {
        let projection = projection.unwrap_or_else(|| self.base_projection.clone());
        Self::validate_projection(&projection, &self.metadata)?;
        let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
            if bound > self.num_rows || bound == self.num_rows && inclusive {
                Err(Error::invalid_input(format!(
                    "cannot read {:?} from file with {} rows",
                    params, self.num_rows
                )))
            } else {
                Ok(())
            }
        };
        match &params {
            ReadBatchParams::Indices(indices) => {
                for idx in indices {
                    match idx {
                        None => {
                            return Err(Error::invalid_input("Null value in indices array"));
                        }
                        Some(idx) => {
                            verify_bound(&params, idx as u64, true)?;
                        }
                    }
                }
                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
                self.take_rows(indices, batch_size, projection).await
            }
            ReadBatchParams::Range(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range(
                    range.start as u64..range.end as u64,
                    batch_size,
                    projection,
                    filter,
                )
                .await
            }
            ReadBatchParams::Ranges(ranges) => {
                let mut ranges_u64 = Vec::with_capacity(ranges.len());
                for range in ranges.as_ref() {
                    verify_bound(&params, range.end, false)?;
                    ranges_u64.push(range.start..range.end);
                }
                self.read_ranges(ranges_u64, batch_size, projection, filter)
                    .await
            }
            ReadBatchParams::RangeFrom(range) => {
                verify_bound(&params, range.start as u64, true)?;
                self.read_range(
                    range.start as u64..self.num_rows,
                    batch_size,
                    projection,
                    filter,
                )
                .await
            }
            ReadBatchParams::RangeTo(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range(0..range.end as u64, batch_size, projection, filter)
                    .await
            }
            ReadBatchParams::RangeFull => {
                self.read_range(0..self.num_rows, batch_size, projection, filter)
                    .await
            }
        }
    }

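    /// Reads data from the file as a stream of record batches
    ///
    /// Each batch (except possibly the last) contains `batch_size` rows, and up
    /// to `batch_readahead` batches are decoded concurrently.  A minimal sketch,
    /// assuming an open `reader` and a `projection` built with
    /// [`ReaderProjection::from_column_names`]:
    ///
    /// ```ignore
    /// let mut stream = reader
    ///     .read_stream_projected(
    ///         ReadBatchParams::RangeFull,
    ///         1024,
    ///         16,
    ///         projection,
    ///         FilterExpression::no_filter(),
    ///     )
    ///     .await?;
    /// while let Some(batch) = stream.next().await {
    ///     println!("read {} rows", batch?.num_rows());
    /// }
    /// ```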
    pub async fn read_stream_projected(
        &self,
        params: ReadBatchParams,
        batch_size: u32,
        batch_readahead: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
        let arrow_schema = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
        let tasks_stream = self
            .read_tasks(params, batch_size, Some(projection), filter)
            .await?;
        let batch_stream = tasks_stream
            .map(|task| task.task)
            .buffered(batch_readahead as usize)
            .boxed();
        Ok(Box::pin(RecordBatchStreamAdapter::new(
            arrow_schema,
            batch_stream,
        )))
    }

    fn take_rows_blocking(
        &self,
        indices: Vec<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
        let column_infos = self.collect_columns_from_projection(&projection)?;
        debug!(
            "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
            indices.len(),
            indices[0],
            indices[indices.len() - 1],
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            io: self.scheduler.clone(),
            decoder_config: self.options.decoder_config.clone(),
            batch_size_bytes: self.options.batch_size_bytes,
        };

        let requested_rows = RequestedRows::Indices(indices);

        schedule_and_decode_blocking(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        )
    }

    fn read_ranges_blocking(
        &self,
        ranges: Vec<Range<u64>>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
        let column_infos = self.collect_columns_from_projection(&projection)?;
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        debug!(
            "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
            ranges.len(),
            num_rows,
            ranges[0].start,
            ranges[ranges.len() - 1].end,
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            io: self.scheduler.clone(),
            decoder_config: self.options.decoder_config.clone(),
            batch_size_bytes: self.options.batch_size_bytes,
        };

        let requested_rows = RequestedRows::Ranges(ranges);

        schedule_and_decode_blocking(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        )
    }

    fn read_range_blocking(
        &self,
        range: Range<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
        let column_infos = self.collect_columns_from_projection(&projection)?;
        let num_rows = self.num_rows;

        debug!(
            "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
            range,
            batch_size,
            num_rows,
            column_infos.len(),
            projection.schema.fields.len(),
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            io: self.scheduler.clone(),
            decoder_config: self.options.decoder_config.clone(),
            batch_size_bytes: self.options.batch_size_bytes,
        };

        let requested_rows = RequestedRows::Ranges(vec![range]);

        schedule_and_decode_blocking(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        )
    }

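    /// Reads data from the file as an iterator of record batches, without
    /// requiring a tokio runtime
    ///
    /// This is the blocking counterpart of [`Self::read_stream_projected`] and is
    /// intended for synchronous (or `spawn_blocking`) contexts.  A minimal
    /// sketch, assuming an open `reader`:
    ///
    /// ```ignore
    /// let batches = reader
    ///     .read_stream_projected_blocking(
    ///         ReadBatchParams::RangeFull,
    ///         1024, // batch size (rows)
    ///         None, // use the reader's base projection
    ///         FilterExpression::no_filter(),
    ///     )?
    ///     .collect::<Result<Vec<_>, _>>()?;
    /// ```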
    pub fn read_stream_projected_blocking(
        &self,
        params: ReadBatchParams,
        batch_size: u32,
        projection: Option<ReaderProjection>,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
        let projection = projection.unwrap_or_else(|| self.base_projection.clone());
        Self::validate_projection(&projection, &self.metadata)?;
        let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
            if bound > self.num_rows || bound == self.num_rows && inclusive {
                Err(Error::invalid_input(format!(
                    "cannot read {:?} from file with {} rows",
                    params, self.num_rows
                )))
            } else {
                Ok(())
            }
        };
        match &params {
            ReadBatchParams::Indices(indices) => {
                for idx in indices {
                    match idx {
                        None => {
                            return Err(Error::invalid_input("Null value in indices array"));
                        }
                        Some(idx) => {
                            verify_bound(&params, idx as u64, true)?;
                        }
                    }
                }
                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
                self.take_rows_blocking(indices, batch_size, projection, filter)
            }
            ReadBatchParams::Range(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range_blocking(
                    range.start as u64..range.end as u64,
                    batch_size,
                    projection,
                    filter,
                )
            }
            ReadBatchParams::Ranges(ranges) => {
                let mut ranges_u64 = Vec::with_capacity(ranges.len());
                for range in ranges.as_ref() {
                    verify_bound(&params, range.end, false)?;
                    ranges_u64.push(range.start..range.end);
                }
                self.read_ranges_blocking(ranges_u64, batch_size, projection, filter)
            }
            ReadBatchParams::RangeFrom(range) => {
                verify_bound(&params, range.start as u64, true)?;
                self.read_range_blocking(
                    range.start as u64..self.num_rows,
                    batch_size,
                    projection,
                    filter,
                )
            }
            ReadBatchParams::RangeTo(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range_blocking(0..range.end as u64, batch_size, projection, filter)
            }
            ReadBatchParams::RangeFull => {
                self.read_range_blocking(0..self.num_rows, batch_size, projection, filter)
            }
        }
    }

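    /// Reads data from the file as a stream of record batches, using the
    /// projection the reader was opened with
    ///
    /// Equivalent to [`Self::read_stream_projected`] with the reader's base
    /// projection.  A minimal sketch, assuming an open `reader`:
    ///
    /// ```ignore
    /// let mut stream = reader
    ///     .read_stream(
    ///         ReadBatchParams::RangeFull,
    ///         1024, // batch size (rows)
    ///         16,   // batch readahead
    ///         FilterExpression::no_filter(),
    ///     )
    ///     .await?;
    /// ```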
    pub async fn read_stream(
        &self,
        params: ReadBatchParams,
        batch_size: u32,
        batch_readahead: u32,
        filter: FilterExpression,
    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
        self.read_stream_projected(
            params,
            batch_size,
            batch_readahead,
            self.base_projection.clone(),
            filter,
        )
        .await
    }

    pub fn schema(&self) -> &Arc<Schema> {
        &self.metadata.file_schema
    }
}

/// Renders a human-readable description of a page's encoding
pub fn describe_encoding(page: &pbfile::column_metadata::Page) -> String {
    if let Some(encoding) = &page.encoding {
        if let Some(style) = &encoding.location {
            match style {
                pbfile::encoding::Location::Indirect(indirect) => {
                    format!(
                        "IndirectEncoding(pos={},size={})",
                        indirect.buffer_location, indirect.buffer_length
                    )
                }
                pbfile::encoding::Location::Direct(direct) => {
                    let encoding_any =
                        prost_types::Any::decode(Bytes::from(direct.encoding.clone()))
                            .expect("failed to deserialize encoding as protobuf");
                    if encoding_any.type_url == "/lance.encodings.ArrayEncoding" {
                        let encoding = encoding_any.to_msg::<pbenc::ArrayEncoding>();
                        match encoding {
                            Ok(encoding) => {
                                format!("{:#?}", encoding)
                            }
                            Err(err) => {
                                format!("Unsupported(decode_err={})", err)
                            }
                        }
                    } else if encoding_any.type_url == "/lance.encodings21.PageLayout" {
                        let encoding = encoding_any.to_msg::<pbenc21::PageLayout>();
                        match encoding {
                            Ok(encoding) => {
                                format!("{:#?}", encoding)
                            }
                            Err(err) => {
                                format!("Unsupported(decode_err={})", err)
                            }
                        }
                    } else {
                        format!("Unrecognized(type_url={})", encoding_any.type_url)
                    }
                }
                pbfile::encoding::Location::None(_) => "NoEncodingDescription".to_string(),
            }
        } else {
            "MISSING STYLE".to_string()
        }
    } else {
        "MISSING".to_string()
    }
}

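/// Utility extension for decoding an [`EncodedBatch`] back out of serialized
/// Lance bytes
///
/// A minimal round-trip sketch, assuming `encoded_batch` came from
/// `lance_encoding::encoder::encode_batch` and was serialized with
/// `EncodedBatchWriteExt::try_to_self_described_lance` (see
/// `test_encoded_batch_round_trip` below):
///
/// ```ignore
/// let bytes = encoded_batch.try_to_self_described_lance(version)?;
/// let decoded = EncodedBatch::try_from_self_described_lance(bytes)?;
/// assert_eq!(decoded.num_rows, encoded_batch.num_rows);
/// ```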
pub trait EncodedBatchReaderExt {
    fn try_from_mini_lance(
        bytes: Bytes,
        schema: &Schema,
        version: LanceFileVersion,
    ) -> Result<Self>
    where
        Self: Sized;
    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
    where
        Self: Sized;
}

impl EncodedBatchReaderExt for EncodedBatch {
    fn try_from_mini_lance(
        bytes: Bytes,
        schema: &Schema,
        file_version: LanceFileVersion,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        let projection = ReaderProjection::from_whole_schema(schema, file_version);
        let footer = FileReader::decode_footer(&bytes)?;

        // The column metadata sits between the column metadata start and the
        // global buffer offsets table
        let column_metadata_start = footer.column_meta_start as usize;
        let column_metadata_end = footer.global_buff_offsets_start as usize;
        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
        let column_metadatas =
            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;

        let file_version = LanceFileVersion::try_from_major_minor(
            footer.major_version as u32,
            footer.minor_version as u32,
        )?;

        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);

        Ok(Self {
            data: bytes,
            num_rows: page_table
                .first()
                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
                .unwrap_or(0),
            page_table,
            top_level_columns: projection.column_indices,
            schema: Arc::new(schema.clone()),
        })
    }

    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
    where
        Self: Sized,
    {
        let footer = FileReader::decode_footer(&bytes)?;
        let file_version = LanceFileVersion::try_from_major_minor(
            footer.major_version as u32,
            footer.minor_version as u32,
        )?;

        let gbo_table = FileReader::do_decode_gbo_table(
            &bytes.slice(footer.global_buff_offsets_start as usize..),
            &footer,
            file_version,
        )?;
        if gbo_table.is_empty() {
            return Err(Error::internal(
                "File did not contain any global buffers, schema expected".to_string(),
            ));
        }
        let schema_start = gbo_table[0].position as usize;
        let schema_size = gbo_table[0].size as usize;

        let schema_bytes = bytes.slice(schema_start..(schema_start + schema_size));
        let (_, schema) = FileReader::decode_schema(schema_bytes)?;
        let projection = ReaderProjection::from_whole_schema(&schema, file_version);

        // The column metadata sits between the column metadata start and the
        // global buffer offsets table
        let column_metadata_start = footer.column_meta_start as usize;
        let column_metadata_end = footer.global_buff_offsets_start as usize;
        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
        let column_metadatas =
            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;

        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);

        Ok(Self {
            data: bytes,
            num_rows: page_table
                .first()
                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
                .unwrap_or(0),
            page_table,
            top_level_columns: projection.column_indices,
            schema: Arc::new(schema),
        })
    }
}

#[cfg(test)]
mod tests {
    use std::{collections::BTreeMap, pin::Pin, sync::Arc};

    use arrow_array::{
        RecordBatch, UInt32Array,
        types::{Float64Type, Int32Type},
    };
    use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema};
    use bytes::Bytes;
    use futures::{StreamExt, prelude::stream::TryStreamExt};
    use lance_arrow::RecordBatchExt;
    use lance_core::{ArrowResult, datatypes::Schema};
    use lance_datagen::{BatchCount, ByteCount, RowCount, array, gen_batch};
    use lance_encoding::{
        decoder::{DecodeBatchScheduler, DecoderPlugins, FilterExpression, decode_batch},
        encoder::{EncodedBatch, EncodingOptions, default_encoding_strategy, encode_batch},
        version::LanceFileVersion,
    };
    use lance_io::{stream::RecordBatchStream, utils::CachedFileSize};
    use log::debug;
    use rstest::rstest;
    use tokio::sync::mpsc;

    use crate::reader::{EncodedBatchReaderExt, FileReader, FileReaderOptions, ReaderProjection};
    use crate::testing::{FsFixture, WrittenFile, test_cache, write_lance_file};
    use crate::writer::{EncodedBatchWriteExt, FileWriter, FileWriterOptions};
    use lance_encoding::decoder::DecoderConfig;

    async fn create_some_file(fs: &FsFixture, version: LanceFileVersion) -> WrittenFile {
        let location_type = DataType::Struct(Fields::from(vec![
            Field::new("x", DataType::Float64, true),
            Field::new("y", DataType::Float64, true),
        ]));
        let categories_type = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)));

        let mut reader = gen_batch()
            .col("score", array::rand::<Float64Type>())
            .col("location", array::rand_type(&location_type))
            .col("categories", array::rand_type(&categories_type))
            .col("binary", array::rand_type(&DataType::Binary));
        if version <= LanceFileVersion::V2_0 {
            reader = reader.col("large_bin", array::rand_type(&DataType::LargeBinary));
        }
        let reader = reader.into_reader_rows(RowCount::from(1000), BatchCount::from(100));

        write_lance_file(
            reader,
            fs,
            FileWriterOptions {
                format_version: Some(version),
                ..Default::default()
            },
        )
        .await
    }

    type Transformer = Box<dyn Fn(&RecordBatch) -> RecordBatch>;

    async fn verify_expected(
        expected: &[RecordBatch],
        mut actual: Pin<Box<dyn RecordBatchStream>>,
        read_size: u32,
        transform: Option<Transformer>,
    ) {
        let mut remaining = expected.iter().map(|batch| batch.num_rows()).sum::<usize>() as u32;
        let mut expected_iter = expected.iter().map(|batch| {
            if let Some(transform) = &transform {
                transform(batch)
            } else {
                batch.clone()
            }
        });
        let mut next_expected = expected_iter.next().unwrap().clone();
        while let Some(actual) = actual.next().await {
            let mut actual = actual.unwrap();
            let mut rows_to_verify = actual.num_rows() as u32;
            let expected_length = remaining.min(read_size);
            assert_eq!(expected_length, rows_to_verify);

            while rows_to_verify > 0 {
                let next_slice_len = (next_expected.num_rows() as u32).min(rows_to_verify);
                assert_eq!(
                    next_expected.slice(0, next_slice_len as usize),
                    actual.slice(0, next_slice_len as usize)
                );
                remaining -= next_slice_len;
                rows_to_verify -= next_slice_len;
                if remaining > 0 {
                    if next_slice_len == next_expected.num_rows() as u32 {
                        next_expected = expected_iter.next().unwrap().clone();
                    } else {
                        next_expected = next_expected.slice(
                            next_slice_len as usize,
                            next_expected.num_rows() - next_slice_len as usize,
                        );
                    }
                }
                if rows_to_verify > 0 {
                    actual = actual.slice(
                        next_slice_len as usize,
                        actual.num_rows() - next_slice_len as usize,
                    );
                }
            }
        }
        assert_eq!(remaining, 0);
    }

    #[tokio::test]
    async fn test_round_trip() {
        let fs = FsFixture::default();

        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;

        for read_size in [32, 1024, 1024 * 1024] {
            let file_scheduler = fs
                .scheduler
                .open_file(&fs.tmp_path, &CachedFileSize::unknown())
                .await
                .unwrap();
            let file_reader = FileReader::try_open(
                file_scheduler,
                None,
                Arc::<DecoderPlugins>::default(),
                &test_cache(),
                FileReaderOptions::default(),
            )
            .await
            .unwrap();

            let schema = file_reader.schema();
            assert_eq!(schema.metadata.get("foo").unwrap(), "bar");

            let batch_stream = file_reader
                .read_stream(
                    lance_io::ReadBatchParams::RangeFull,
                    read_size,
                    16,
                    FilterExpression::no_filter(),
                )
                .await
                .unwrap();

            verify_expected(&data, batch_stream, read_size, None).await;
        }
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_encoded_batch_round_trip(
        #[values(LanceFileVersion::V2_0)] version: LanceFileVersion,
    ) {
        let data = gen_batch()
            .col("x", array::rand::<Int32Type>())
            .col("y", array::rand_utf8(ByteCount::from(16), false))
            .into_batch_rows(RowCount::from(10000))
            .unwrap();

        let lance_schema = Arc::new(Schema::try_from(data.schema().as_ref()).unwrap());

        let encoding_options = EncodingOptions {
            cache_bytes_per_column: 4096,
            max_page_bytes: 32 * 1024 * 1024,
            keep_original_array: true,
            buffer_alignment: 64,
            version,
        };

        let encoding_strategy = default_encoding_strategy(version);

        let encoded_batch = encode_batch(
            &data,
            lance_schema.clone(),
            encoding_strategy.as_ref(),
            &encoding_options,
        )
        .await
        .unwrap();

        let bytes = encoded_batch.try_to_self_described_lance(version).unwrap();

        let decoded_batch = EncodedBatch::try_from_self_described_lance(bytes).unwrap();

        let decoded = decode_batch(
            &decoded_batch,
            &FilterExpression::no_filter(),
            Arc::<DecoderPlugins>::default(),
            false,
            version,
            None,
        )
        .await
        .unwrap();

        assert_eq!(data, decoded);

        let bytes = encoded_batch.try_to_mini_lance(version).unwrap();
        let decoded_batch =
            EncodedBatch::try_from_mini_lance(bytes, lance_schema.as_ref(), LanceFileVersion::V2_0)
                .unwrap();
        let decoded = decode_batch(
            &decoded_batch,
            &FilterExpression::no_filter(),
            Arc::<DecoderPlugins>::default(),
            false,
            version,
            None,
        )
        .await
        .unwrap();

        assert_eq!(data, decoded);
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_projection(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1, LanceFileVersion::V2_2)]
        version: LanceFileVersion,
    ) {
        let fs = FsFixture::default();

        let written_file = create_some_file(&fs, version).await;
        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();

        let field_id_mapping = written_file
            .field_id_mapping
            .iter()
            .copied()
            .collect::<BTreeMap<_, _>>();

        let empty_projection = ReaderProjection {
            column_indices: Vec::default(),
            schema: Arc::new(Schema::default()),
        };

        for columns in [
            vec!["score"],
            vec!["location"],
            vec!["categories"],
            vec!["location.x"],
            vec!["score", "categories"],
            vec!["score", "location"],
            vec!["location", "categories"],
            vec!["location.y", "location", "categories"],
        ] {
            debug!("Testing round trip with projection {:?}", columns);
            for use_field_ids in [true, false] {
                let file_reader = FileReader::try_open(
                    file_scheduler.clone(),
                    None,
                    Arc::<DecoderPlugins>::default(),
                    &test_cache(),
                    FileReaderOptions::default(),
                )
                .await
                .unwrap();

                let projected_schema = written_file.schema.project(&columns).unwrap();
                let projection = if use_field_ids {
                    ReaderProjection::from_field_ids(
                        file_reader.metadata.version(),
                        &projected_schema,
                        &field_id_mapping,
                    )
                    .unwrap()
                } else {
                    ReaderProjection::from_column_names(
                        file_reader.metadata.version(),
                        &written_file.schema,
                        &columns,
                    )
                    .unwrap()
                };

                let batch_stream = file_reader
                    .read_stream_projected(
                        lance_io::ReadBatchParams::RangeFull,
                        1024,
                        16,
                        projection.clone(),
                        FilterExpression::no_filter(),
                    )
                    .await
                    .unwrap();

                let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
                verify_expected(
                    &written_file.data,
                    batch_stream,
                    1024,
                    Some(Box::new(move |batch: &RecordBatch| {
                        batch.project_by_schema(&projection_arrow).unwrap()
                    })),
                )
                .await;

                let file_reader = FileReader::try_open(
                    file_scheduler.clone(),
                    Some(projection.clone()),
                    Arc::<DecoderPlugins>::default(),
                    &test_cache(),
                    FileReaderOptions::default(),
                )
                .await
                .unwrap();

                let batch_stream = file_reader
                    .read_stream(
                        lance_io::ReadBatchParams::RangeFull,
                        1024,
                        16,
                        FilterExpression::no_filter(),
                    )
                    .await
                    .unwrap();

                let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
                verify_expected(
                    &written_file.data,
                    batch_stream,
                    1024,
                    Some(Box::new(move |batch: &RecordBatch| {
                        batch.project_by_schema(&projection_arrow).unwrap()
                    })),
                )
                .await;

                assert!(
                    file_reader
                        .read_stream_projected(
                            lance_io::ReadBatchParams::RangeFull,
                            1024,
                            16,
                            empty_projection.clone(),
                            FilterExpression::no_filter(),
                        )
                        .await
                        .is_err()
                );
            }
        }

        assert!(
            FileReader::try_open(
                file_scheduler.clone(),
                Some(empty_projection),
                Arc::<DecoderPlugins>::default(),
                &test_cache(),
                FileReaderOptions::default(),
            )
            .await
            .is_err()
        );

        let arrow_schema = ArrowSchema::new(vec![
            Field::new("x", DataType::Int32, true),
            Field::new("y", DataType::Int32, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        let projection_with_dupes = ReaderProjection {
            column_indices: vec![0, 0],
            schema: Arc::new(schema),
        };

        assert!(
            FileReader::try_open(
                file_scheduler.clone(),
                Some(projection_with_dupes),
                Arc::<DecoderPlugins>::default(),
                &test_cache(),
                FileReaderOptions::default(),
            )
            .await
            .is_err()
        );
    }

    #[test_log::test(tokio::test)]
    async fn test_compressing_buffer() {
        let fs = FsFixture::default();

        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();

        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        // Mark the "score" column for zstd compression via field metadata
        let mut projection = written_file.schema.project(&["score"]).unwrap();
        for field in projection.fields.iter_mut() {
            field
                .metadata
                .insert("lance:compression".to_string(), "zstd".to_string());
        }
        let projection = ReaderProjection {
            column_indices: projection.fields.iter().map(|f| f.id as u32).collect(),
            schema: Arc::new(projection),
        };

        let batch_stream = file_reader
            .read_stream_projected(
                lance_io::ReadBatchParams::RangeFull,
                1024,
                16,
                projection.clone(),
                FilterExpression::no_filter(),
            )
            .await
            .unwrap();

        let projection_arrow = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
        verify_expected(
            &written_file.data,
            batch_stream,
            1024,
            Some(Box::new(move |batch: &RecordBatch| {
                batch.project_by_schema(&projection_arrow).unwrap()
            })),
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_all() {
        let fs = FsFixture::default();
        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();

        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let batches = file_reader
            .read_stream(
                lance_io::ReadBatchParams::RangeFull,
                total_rows as u32,
                16,
                FilterExpression::no_filter(),
            )
            .await
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), total_rows);
    }

    #[rstest]
    #[tokio::test]
    async fn test_blocking_take(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1, LanceFileVersion::V2_2)]
        version: LanceFileVersion,
    ) {
        let fs = FsFixture::default();
        let WrittenFile { data, schema, .. } = create_some_file(&fs, version).await;
        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();

        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            Some(ReaderProjection::from_column_names(version, &schema, &["score"]).unwrap()),
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let batches = tokio::task::spawn_blocking(move || {
            file_reader
                .read_stream_projected_blocking(
                    lance_io::ReadBatchParams::Indices(UInt32Array::from(vec![0, 1, 2, 3, 4])),
                    total_rows as u32,
                    None,
                    FilterExpression::no_filter(),
                )
                .unwrap()
                .collect::<ArrowResult<Vec<_>>>()
                .unwrap()
        })
        .await
        .unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 5);
        assert_eq!(batches[0].num_columns(), 1);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_drop_in_progress() {
        let fs = FsFixture::default();
        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();

        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let mut batches = file_reader
            .read_stream(
                lance_io::ReadBatchParams::RangeFull,
                (total_rows / 10) as u32,
                16,
                FilterExpression::no_filter(),
            )
            .await
            .unwrap();

        // The stream should stay alive even after the reader is dropped
        drop(file_reader);

        let batch = batches.next().await.unwrap().unwrap();
        assert!(batch.num_rows() > 0);

        // Dropping the stream mid-read should not panic or hang
        drop(batches);
    }

    #[tokio::test]
    async fn drop_while_scheduling() {
        // Regression test: it should be safe to drop the decode stream while
        // scheduling is still in progress.
        let fs = FsFixture::default();
        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
        let total_rows = written_file
            .data
            .iter()
            .map(|batch| batch.num_rows())
            .sum::<usize>();

        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let projection =
            ReaderProjection::from_whole_schema(&written_file.schema, LanceFileVersion::V2_0);
        let column_infos = file_reader
            .collect_columns_from_projection(&projection)
            .unwrap();
        let mut decode_scheduler = DecodeBatchScheduler::try_new(
            &projection.schema,
            &projection.column_indices,
            &column_infos,
            &vec![],
            total_rows as u64,
            Arc::<DecoderPlugins>::default(),
            file_reader.scheduler.clone(),
            test_cache(),
            &FilterExpression::no_filter(),
            &DecoderConfig::default(),
        )
        .await
        .unwrap();

        let range = 0..total_rows as u64;

        let (tx, rx) = mpsc::unbounded_channel();

        // Simulate the decoder being dropped before scheduling begins
        drop(rx);

        // Scheduling into a closed channel should not panic
        decode_scheduler.schedule_range(
            range,
            &FilterExpression::no_filter(),
            tx,
            file_reader.scheduler.clone(),
        )
    }

    #[tokio::test]
    async fn test_read_empty_range() {
        let fs = FsFixture::default();
        create_some_file(&fs, LanceFileVersion::V2_0).await;

        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        // An empty range should yield no batches
        let batches = file_reader
            .read_stream(
                lance_io::ReadBatchParams::Range(0..0),
                1024,
                16,
                FilterExpression::no_filter(),
            )
            .await
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        assert_eq!(batches.len(), 0);

        // Empty ranges mixed with non-empty ranges are allowed
        let batches = file_reader
            .read_stream(
                lance_io::ReadBatchParams::Ranges(Arc::new([0..1, 2..2])),
                1024,
                16,
                FilterExpression::no_filter(),
            )
            .await
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(batches.len(), 1);
    }

    #[tokio::test]
    async fn test_global_buffers() {
        let fs = FsFixture::default();

        let lance_schema =
            lance_core::datatypes::Schema::try_from(&ArrowSchema::new(vec![Field::new(
                "foo",
                DataType::Int32,
                true,
            )]))
            .unwrap();

        let mut file_writer = FileWriter::try_new(
            fs.object_store.create(&fs.tmp_path).await.unwrap(),
            lance_schema.clone(),
            FileWriterOptions::default(),
        )
        .unwrap();

        let test_bytes = Bytes::from_static(b"hello");

        let buf_index = file_writer
            .add_global_buffer(test_bytes.clone())
            .await
            .unwrap();

        assert_eq!(buf_index, 1);

        file_writer.finish().await.unwrap();

        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let buf = file_reader.read_global_buffer(1).await.unwrap();
        assert_eq!(buf, test_bytes);
    }

    #[rstest]
    #[tokio::test]
    async fn test_deep_size_of_includes_column_metadata(
        #[values(
            LanceFileVersion::V2_0,
            LanceFileVersion::V2_1,
            LanceFileVersion::V2_2,
            LanceFileVersion::V2_3
        )]
        version: LanceFileVersion,
    ) {
        use deepsize::DeepSizeOf;

        let fs = FsFixture::default();
        let _written = create_some_file(&fs, version).await;
        let cache = test_cache();
        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &cache,
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let metadata = file_reader.metadata();
        let deep_size = metadata.deep_size_of();

        assert!(
            deep_size > 1024,
            "deep_size_of ({deep_size}) is suspiciously small — \
             column_metadatas and column_infos may not be accounted for"
        );

        assert!(
            !metadata.column_metadatas.is_empty(),
            "Expected non-empty column_metadatas"
        );

        // The estimate should grow with the number of columns
        let num_columns = metadata.column_metadatas.len();
        assert!(
            deep_size > num_columns * 50,
            "deep_size_of ({deep_size}) should scale with column count ({num_columns})"
        );
    }
}