use std::{
    collections::{BTreeMap, BTreeSet},
    io::Cursor,
    ops::Range,
    pin::Pin,
    sync::Arc,
};

use arrow_array::RecordBatchReader;
use arrow_schema::Schema as ArrowSchema;
use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
use bytes::{Bytes, BytesMut};
use deepsize::{Context, DeepSizeOf};
use futures::{stream::BoxStream, Stream, StreamExt};
use lance_encoding::{
    decoder::{
        schedule_and_decode, schedule_and_decode_blocking, ColumnInfo, DecoderPlugins,
        FilterExpression, PageEncoding, PageInfo, ReadBatchTask, RequestedRows,
        SchedulerDecoderConfig,
    },
    encoder::EncodedBatch,
    version::LanceFileVersion,
    EncodingsIo,
};
use log::debug;
use object_store::path::Path;
use prost::{Message, Name};
use snafu::location;

use lance_core::{
    cache::LanceCache,
    datatypes::{Field, Schema},
    Error, Result,
};
use lance_encoding::format::pb as pbenc;
use lance_io::{
    scheduler::FileScheduler,
    stream::{RecordBatchStream, RecordBatchStreamAdapter},
    ReadBatchParams,
};

use crate::{
    datatypes::{Fields, FieldsWithMeta},
    format::{pb, pbfile, MAGIC, MAJOR_VERSION, MINOR_VERSION},
    v2::writer::PAGE_BUFFER_ALIGNMENT,
};

use super::io::LanceEncodingsIo;

/// Describes the location of a buffer within the file
#[derive(Debug, DeepSizeOf)]
pub struct BufferDescriptor {
    pub position: u64,
    pub size: u64,
}

/// Statistics summarizing the contents of the file
#[derive(Debug)]
pub struct FileStatistics {
    /// Statistics for each column in the file
    pub columns: Vec<ColumnStatistics>,
}

/// Statistics for a single column
#[derive(Debug)]
pub struct ColumnStatistics {
    /// The number of pages in the column
    pub num_pages: usize,
    /// The total size, in bytes, of the column's page buffers (the on-disk size of the data)
    pub size_bytes: u64,
}

/// File metadata that is read once when the file is opened and reused afterwards
#[derive(Debug)]
pub struct CachedFileMetadata {
    /// The schema of the file
    pub file_schema: Arc<Schema>,
    /// The raw protobuf metadata for each column
    pub column_metadatas: Vec<pbfile::ColumnMetadata>,
    /// Page and buffer information for each column, decoded from the column metadata
    pub column_infos: Vec<Arc<ColumnInfo>>,
    /// The number of rows in the file
    pub num_rows: u64,
    /// The file's global buffers (the first global buffer is always the schema)
    pub file_buffers: Vec<BufferDescriptor>,
    /// The number of bytes in the data section of the file
    pub num_data_bytes: u64,
    /// The number of bytes occupied by the column metadata
    pub num_column_metadata_bytes: u64,
    /// The number of bytes occupied by the global buffers
    pub num_global_buffer_bytes: u64,
    /// The number of bytes from the start of the first global buffer to the end of the file
    pub num_footer_bytes: u64,
    pub major_version: u16,
    pub minor_version: u16,
}

impl DeepSizeOf for CachedFileMetadata {
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.file_schema.deep_size_of_children(context)
            + self
                .file_buffers
                .iter()
                .map(|file_buffer| file_buffer.deep_size_of_children(context))
                .sum::<usize>()
    }
}

impl CachedFileMetadata {
    pub fn version(&self) -> LanceFileVersion {
        match (self.major_version, self.minor_version) {
            (0, 3) => LanceFileVersion::V2_0,
            (2, 1) => LanceFileVersion::V2_1,
            _ => panic!(
                "Unsupported version: {}.{}",
                self.major_version, self.minor_version
            ),
        }
    }
}

/// Describes which columns to read from a file and the schema of the projected data
#[derive(Debug, Clone)]
pub struct ReaderProjection {
    /// The schema of the projected data
    pub schema: Arc<Schema>,
    /// The indices of the columns in the file to read, one entry per field that maps
    /// to an on-disk column (in schema pre-order).  In 2.0 files every field has a
    /// column; in 2.1 files only leaf fields (and packed structs) do.
    pub column_indices: Vec<u32>,
}

impl ReaderProjection {
    fn from_field_ids_helper<'a>(
        file_version: LanceFileVersion,
        fields: impl Iterator<Item = &'a Field>,
        field_id_to_column_index: &BTreeMap<u32, u32>,
        column_indices: &mut Vec<u32>,
    ) -> Result<()> {
        for field in fields {
            let is_structural = file_version >= LanceFileVersion::V2_1;
            if !is_structural || field.children.is_empty() {
                if let Some(column_idx) = field_id_to_column_index.get(&(field.id as u32)).copied()
                {
                    column_indices.push(column_idx);
                }
            }
            Self::from_field_ids_helper(
                file_version,
                field.children.iter(),
                field_id_to_column_index,
                column_indices,
            )?;
        }
        Ok(())
    }

    /// Creates a projection from a mapping of field id to column index
    pub fn from_field_ids(
        file_version: LanceFileVersion,
        schema: &Schema,
        field_id_to_column_index: &BTreeMap<u32, u32>,
    ) -> Result<Self> {
        let mut column_indices = Vec::new();
        Self::from_field_ids_helper(
            file_version,
            schema.fields.iter(),
            field_id_to_column_index,
            &mut column_indices,
        )?;
        Ok(Self {
            schema: Arc::new(schema.clone()),
            column_indices,
        })
    }

    /// Creates a projection that reads the entire file
    pub fn from_whole_schema(schema: &Schema, version: LanceFileVersion) -> Self {
        let schema = Arc::new(schema.clone());
        let is_structural = version >= LanceFileVersion::V2_1;
        let mut column_indices = vec![];
        let mut curr_column_idx = 0;
        let mut packed_struct_fields_num = 0;
        for field in schema.fields_pre_order() {
            if packed_struct_fields_num > 0 {
                packed_struct_fields_num -= 1;
                continue;
            }
            if field.is_packed_struct() {
                column_indices.push(curr_column_idx);
                curr_column_idx += 1;
                packed_struct_fields_num = field.children.len();
            } else if field.children.is_empty() || !is_structural {
                column_indices.push(curr_column_idx);
                curr_column_idx += 1;
            }
        }
        Self {
            schema,
            column_indices,
        }
    }

    /// Creates a projection that reads the given columns (by name) from the file
    pub fn from_column_names(
        file_version: LanceFileVersion,
        schema: &Schema,
        column_names: &[&str],
    ) -> Result<Self> {
        let field_id_to_column_index = schema
            .fields_pre_order()
            .filter(|field| file_version < LanceFileVersion::V2_1 || field.is_leaf())
            .enumerate()
            .map(|(idx, field)| (field.id as u32, idx as u32))
            .collect::<BTreeMap<_, _>>();
        let projected = schema.project(column_names)?;
        let mut column_indices = Vec::new();
        Self::from_field_ids_helper(
            file_version,
            projected.fields.iter(),
            &field_id_to_column_index,
            &mut column_indices,
        )?;
        Ok(Self {
            schema: Arc::new(projected),
            column_indices,
        })
    }
}
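// Illustrative sketch (not part of the original API surface): building a projection
// by column name for a 2.0 file.  `schema` is assumed to be the file's
// `lance_core::datatypes::Schema`, e.g. obtained from `FileReader::schema()`.
//
//     let projection = ReaderProjection::from_column_names(
//         LanceFileVersion::V2_0,
//         &schema,
//         &["score", "location"],
//     )?;
//
// The resulting projection can be supplied as the base projection in
// `FileReader::try_open` or passed to `read_stream_projected`.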

/// Options affecting how a file is read
#[derive(Clone, Debug, Default)]
pub struct FileReaderOptions {
    /// If true, decoded data is validated as it is read
    validate_on_decode: bool,
}

#[derive(Debug)]
pub struct FileReader {
    scheduler: Arc<dyn EncodingsIo>,
    /// The projection used when the caller does not supply one
    base_projection: ReaderProjection,
    num_rows: u64,
    metadata: Arc<CachedFileMetadata>,
    decoder_plugins: Arc<DecoderPlugins>,
    cache: Arc<LanceCache>,
    options: FileReaderOptions,
}

#[derive(Debug)]
struct Footer {
    #[allow(dead_code)]
    column_meta_start: u64,
    #[allow(dead_code)]
    column_meta_offsets_start: u64,
    global_buff_offsets_start: u64,
    num_global_buffers: u32,
    num_columns: u32,
    major_version: u16,
    minor_version: u16,
}

/// The size, in bytes, of the fixed-length footer at the end of every Lance file
const FOOTER_LEN: usize = 40;

347impl FileReader {
348 pub fn with_scheduler(&self, scheduler: Arc<dyn EncodingsIo>) -> Self {
349 Self {
350 scheduler,
351 base_projection: self.base_projection.clone(),
352 cache: self.cache.clone(),
353 decoder_plugins: self.decoder_plugins.clone(),
354 metadata: self.metadata.clone(),
355 options: self.options.clone(),
356 num_rows: self.num_rows,
357 }
358 }
359
360 pub fn num_rows(&self) -> u64 {
361 self.num_rows
362 }
363
364 pub fn metadata(&self) -> &Arc<CachedFileMetadata> {
365 &self.metadata
366 }
367
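    /// Computes per-column statistics (page count and total page-buffer size in bytes)
    /// from the cached column metadata.  No additional IO is performed.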
368 pub fn file_statistics(&self) -> FileStatistics {
369 let column_metadatas = &self.metadata().column_metadatas;
370
371 let column_stats = column_metadatas
372 .iter()
373 .map(|col_metadata| {
374 let num_pages = col_metadata.pages.len();
375 let size_bytes = col_metadata
376 .pages
377 .iter()
378 .map(|page| page.buffer_sizes.iter().sum::<u64>())
379 .sum::<u64>();
380 ColumnStatistics {
381 num_pages,
382 size_bytes,
383 }
384 })
385 .collect();
386
387 FileStatistics {
388 columns: column_stats,
389 }
390 }
391
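    /// Reads one of the file's global buffers by index.
    ///
    /// Returns an error if `index` is out of range; otherwise a single IO request is
    /// issued for the buffer's byte range.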
    pub async fn read_global_buffer(&self, index: u32) -> Result<Bytes> {
        let buffer_desc = self.metadata.file_buffers.get(index as usize).ok_or_else(|| {
            Error::invalid_input(
                format!(
                    "request for global buffer at index {} but there were only {} global buffers in the file",
                    index, self.metadata.file_buffers.len()
                ),
                location!(),
            )
        })?;
        self.scheduler
            .submit_single(
                buffer_desc.position..buffer_desc.position + buffer_desc.size,
                0,
            )
            .await
    }
401
402 async fn read_tail(scheduler: &FileScheduler) -> Result<(Bytes, u64)> {
403 let file_size = scheduler.reader().size().await? as u64;
404 let begin = if file_size < scheduler.reader().block_size() as u64 {
405 0
406 } else {
407 file_size - scheduler.reader().block_size() as u64
408 };
409 let tail_bytes = scheduler.submit_single(begin..file_size, 0).await?;
410 Ok((tail_bytes, file_size))
411 }
412
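    /// Decodes the fixed-size footer from the tail of the file.
    ///
    /// The layout read below accounts for all FOOTER_LEN (40) bytes: three
    /// little-endian u64 offsets (column metadata start, column metadata offsets
    /// start, global buffer offsets start), two u32 counts (global buffers and
    /// columns), two u16 version numbers, and the 4-byte magic.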
    fn decode_footer(footer_bytes: &Bytes) -> Result<Footer> {
        let len = footer_bytes.len();
        if len < FOOTER_LEN {
            return Err(Error::io(
                format!(
                    "file is too small to contain a Lance footer, len: {}, bytes: {:?}",
                    len, footer_bytes
                ),
                location!(),
            ));
        }
        let mut cursor = Cursor::new(footer_bytes.slice(len - FOOTER_LEN..));

        let column_meta_start = cursor.read_u64::<LittleEndian>()?;
        let column_meta_offsets_start = cursor.read_u64::<LittleEndian>()?;
        let global_buff_offsets_start = cursor.read_u64::<LittleEndian>()?;
        let num_global_buffers = cursor.read_u32::<LittleEndian>()?;
        let num_columns = cursor.read_u32::<LittleEndian>()?;
        let major_version = cursor.read_u16::<LittleEndian>()?;
        let minor_version = cursor.read_u16::<LittleEndian>()?;

        // MAJOR_VERSION / MINOR_VERSION identify the legacy (pre-2.0) format, so a file
        // reporting that pair cannot be read by this reader.
        if major_version == MAJOR_VERSION as u16 && minor_version == MINOR_VERSION as u16 {
            return Err(Error::version_conflict(
                "Attempt to use the lance v2 reader to read a legacy file".to_string(),
                major_version,
                minor_version,
                location!(),
            ));
        }

        let magic_bytes = footer_bytes.slice(len - 4..);
        if magic_bytes.as_ref() != MAGIC {
            return Err(Error::io(
                format!(
                    "file does not appear to be a Lance file (magic mismatch: found {:?}, expected {:?})",
                    magic_bytes.as_ref(),
                    MAGIC
                ),
                location!(),
            ));
        }
        Ok(Footer {
            column_meta_start,
            column_meta_offsets_start,
            global_buff_offsets_start,
            num_global_buffers,
            num_columns,
            major_version,
            minor_version,
        })
    }
465
466 fn read_all_column_metadata(
468 column_metadata_bytes: Bytes,
469 footer: &Footer,
470 ) -> Result<Vec<pbfile::ColumnMetadata>> {
471 let column_metadata_start = footer.column_meta_start;
472 let cmo_table_size = 16 * footer.num_columns as usize;
474 let cmo_table = column_metadata_bytes.slice(column_metadata_bytes.len() - cmo_table_size..);
475
476 (0..footer.num_columns)
477 .map(|col_idx| {
478 let offset = (col_idx * 16) as usize;
479 let position = LittleEndian::read_u64(&cmo_table[offset..offset + 8]);
480 let length = LittleEndian::read_u64(&cmo_table[offset + 8..offset + 16]);
481 let normalized_position = (position - column_metadata_start) as usize;
482 let normalized_end = normalized_position + (length as usize);
483 Ok(pbfile::ColumnMetadata::decode(
484 &column_metadata_bytes[normalized_position..normalized_end],
485 )?)
486 })
487 .collect::<Result<Vec<_>>>()
488 }
489
490 async fn optimistic_tail_read(
491 data: &Bytes,
492 start_pos: u64,
493 scheduler: &FileScheduler,
494 file_len: u64,
495 ) -> Result<Bytes> {
496 let num_bytes_needed = (file_len - start_pos) as usize;
497 if data.len() >= num_bytes_needed {
498 Ok(data.slice((data.len() - num_bytes_needed)..))
499 } else {
500 let num_bytes_missing = (num_bytes_needed - data.len()) as u64;
501 let start = file_len - num_bytes_needed as u64;
502 let missing_bytes = scheduler
503 .submit_single(start..start + num_bytes_missing, 0)
504 .await?;
505 let mut combined = BytesMut::with_capacity(data.len() + num_bytes_missing as usize);
506 combined.extend(missing_bytes);
507 combined.extend(data);
508 Ok(combined.freeze())
509 }
510 }
511
512 fn do_decode_gbo_table(
513 gbo_bytes: &Bytes,
514 footer: &Footer,
515 version: LanceFileVersion,
516 ) -> Result<Vec<BufferDescriptor>> {
517 let mut global_bufs_cursor = Cursor::new(gbo_bytes);
518
519 let mut global_buffers = Vec::with_capacity(footer.num_global_buffers as usize);
520 for _ in 0..footer.num_global_buffers {
521 let buf_pos = global_bufs_cursor.read_u64::<LittleEndian>()?;
522 assert!(
523 version < LanceFileVersion::V2_1 || buf_pos % PAGE_BUFFER_ALIGNMENT as u64 == 0
524 );
525 let buf_size = global_bufs_cursor.read_u64::<LittleEndian>()?;
526 global_buffers.push(BufferDescriptor {
527 position: buf_pos,
528 size: buf_size,
529 });
530 }
531
532 Ok(global_buffers)
533 }
534
535 async fn decode_gbo_table(
536 tail_bytes: &Bytes,
537 file_len: u64,
538 scheduler: &FileScheduler,
539 footer: &Footer,
540 version: LanceFileVersion,
541 ) -> Result<Vec<BufferDescriptor>> {
542 let gbo_bytes = Self::optimistic_tail_read(
545 tail_bytes,
546 footer.global_buff_offsets_start,
547 scheduler,
548 file_len,
549 )
550 .await?;
551 Self::do_decode_gbo_table(&gbo_bytes, footer, version)
552 }
553
554 fn decode_schema(schema_bytes: Bytes) -> Result<(u64, lance_core::datatypes::Schema)> {
555 let file_descriptor = pb::FileDescriptor::decode(schema_bytes)?;
556 let pb_schema = file_descriptor.schema.unwrap();
557 let num_rows = file_descriptor.length;
558 let fields_with_meta = FieldsWithMeta {
559 fields: Fields(pb_schema.fields),
560 metadata: pb_schema.metadata,
561 };
562 let schema = lance_core::datatypes::Schema::from(fields_with_meta);
563 Ok((num_rows, schema))
564 }
565
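    /// Reads all of the file's metadata: the footer, the global buffer table, the
    /// schema (stored in the first global buffer), and every column's metadata.
    /// The initial tail read is reused where possible, so a small file's metadata
    /// can often be loaded with a single IO request.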
566 pub async fn read_all_metadata(scheduler: &FileScheduler) -> Result<CachedFileMetadata> {
580 let (tail_bytes, file_len) = Self::read_tail(scheduler).await?;
582 let footer = Self::decode_footer(&tail_bytes)?;
583
584 let file_version = LanceFileVersion::try_from_major_minor(
585 footer.major_version as u32,
586 footer.minor_version as u32,
587 )?;
588
589 let gbo_table =
590 Self::decode_gbo_table(&tail_bytes, file_len, scheduler, &footer, file_version).await?;
591 if gbo_table.is_empty() {
592 return Err(Error::Internal {
593 message: "File did not contain any global buffers, schema expected".to_string(),
594 location: location!(),
595 });
596 }
597 let schema_start = gbo_table[0].position;
598 let schema_size = gbo_table[0].size;
599
600 let num_footer_bytes = file_len - schema_start;
601
602 let all_metadata_bytes =
605 Self::optimistic_tail_read(&tail_bytes, schema_start, scheduler, file_len).await?;
606
607 let schema_bytes = all_metadata_bytes.slice(0..schema_size as usize);
608 let (num_rows, schema) = Self::decode_schema(schema_bytes)?;
609
610 let column_metadata_start = (footer.column_meta_start - schema_start) as usize;
613 let column_metadata_end = (footer.global_buff_offsets_start - schema_start) as usize;
614 let column_metadata_bytes =
615 all_metadata_bytes.slice(column_metadata_start..column_metadata_end);
616 let column_metadatas = Self::read_all_column_metadata(column_metadata_bytes, &footer)?;
617
618 let num_global_buffer_bytes = gbo_table.iter().map(|buf| buf.size).sum::<u64>();
619 let num_data_bytes = footer.column_meta_start - num_global_buffer_bytes;
620 let num_column_metadata_bytes = footer.global_buff_offsets_start - footer.column_meta_start;
621
622 let column_infos = Self::meta_to_col_infos(column_metadatas.as_slice(), file_version);
623
624 Ok(CachedFileMetadata {
625 file_schema: Arc::new(schema),
626 column_metadatas,
627 column_infos,
628 num_rows,
629 num_data_bytes,
630 num_column_metadata_bytes,
631 num_global_buffer_bytes,
632 num_footer_bytes,
633 file_buffers: gbo_table,
634 major_version: footer.major_version,
635 minor_version: footer.minor_version,
636 })
637 }
638
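    // Note: only directly-stored encodings are handled here; an indirect encoding
    // hits the `todo!()` below and a missing encoding panics.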
639 fn fetch_encoding<M: Default + Name + Sized>(encoding: &pbfile::Encoding) -> M {
640 match &encoding.location {
641 Some(pbfile::encoding::Location::Indirect(_)) => todo!(),
642 Some(pbfile::encoding::Location::Direct(encoding)) => {
643 let encoding_buf = Bytes::from(encoding.encoding.clone());
644 let encoding_any = prost_types::Any::decode(encoding_buf).unwrap();
645 encoding_any.to_msg::<M>().unwrap()
646 }
647 Some(pbfile::encoding::Location::None(_)) => panic!(),
648 None => panic!(),
649 }
650 }
651
652 fn meta_to_col_infos(
653 column_metadatas: &[pbfile::ColumnMetadata],
654 file_version: LanceFileVersion,
655 ) -> Vec<Arc<ColumnInfo>> {
656 column_metadatas
657 .iter()
658 .enumerate()
659 .map(|(col_idx, col_meta)| {
660 let page_infos = col_meta
661 .pages
662 .iter()
663 .map(|page| {
664 let num_rows = page.length;
665 let encoding = match file_version {
666 LanceFileVersion::V2_0 => {
667 PageEncoding::Legacy(Self::fetch_encoding::<pbenc::ArrayEncoding>(
668 page.encoding.as_ref().unwrap(),
669 ))
670 }
671 _ => {
672 PageEncoding::Structural(Self::fetch_encoding::<pbenc::PageLayout>(
673 page.encoding.as_ref().unwrap(),
674 ))
675 }
676 };
677 let buffer_offsets_and_sizes = Arc::from(
678 page.buffer_offsets
679 .iter()
680 .zip(page.buffer_sizes.iter())
681 .map(|(offset, size)| {
682 assert!(
684 file_version < LanceFileVersion::V2_1
685 || offset % PAGE_BUFFER_ALIGNMENT as u64 == 0
686 );
687 (*offset, *size)
688 })
689 .collect::<Vec<_>>(),
690 );
691 PageInfo {
692 buffer_offsets_and_sizes,
693 encoding,
694 num_rows,
695 priority: page.priority,
696 }
697 })
698 .collect::<Vec<_>>();
699 let buffer_offsets_and_sizes = Arc::from(
700 col_meta
701 .buffer_offsets
702 .iter()
703 .zip(col_meta.buffer_sizes.iter())
704 .map(|(offset, size)| (*offset, *size))
705 .collect::<Vec<_>>(),
706 );
707 Arc::new(ColumnInfo {
708 index: col_idx as u32,
709 page_infos: Arc::from(page_infos),
710 buffer_offsets_and_sizes,
711 encoding: Self::fetch_encoding(col_meta.encoding.as_ref().unwrap()),
712 })
713 })
714 .collect::<Vec<_>>()
715 }
716
717 fn validate_projection(
718 projection: &ReaderProjection,
719 metadata: &CachedFileMetadata,
720 ) -> Result<()> {
721 if projection.schema.fields.is_empty() {
722 return Err(Error::invalid_input(
723 "Attempt to read zero columns from the file, at least one column must be specified"
724 .to_string(),
725 location!(),
726 ));
727 }
728 let mut column_indices_seen = BTreeSet::new();
729 for column_index in &projection.column_indices {
730 if !column_indices_seen.insert(*column_index) {
731 return Err(Error::invalid_input(
732 format!(
733 "The projection specified the column index {} more than once",
734 column_index
735 ),
736 location!(),
737 ));
738 }
739 if *column_index >= metadata.column_infos.len() as u32 {
740 return Err(Error::invalid_input(format!("The projection specified the column index {} but there are only {} columns in the file", column_index, metadata.column_infos.len()), location!()));
741 }
742 }
743 Ok(())
744 }
745
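    // Opens a file reader, reading all file metadata up front.  If `base_projection`
    // is `None`, a projection over the whole schema is used.
    //
    // Illustrative usage sketch (mirrors the tests at the bottom of this file;
    // `file_scheduler` and `cache` are assumed to come from the caller):
    //
    //     let reader = FileReader::try_open(
    //         file_scheduler,
    //         None,
    //         Arc::<DecoderPlugins>::default(),
    //         &cache,
    //         FileReaderOptions::default(),
    //     )
    //     .await?;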
746 pub async fn try_open(
753 scheduler: FileScheduler,
754 base_projection: Option<ReaderProjection>,
755 decoder_plugins: Arc<DecoderPlugins>,
756 cache: &LanceCache,
757 options: FileReaderOptions,
758 ) -> Result<Self> {
759 let file_metadata = Arc::new(Self::read_all_metadata(&scheduler).await?);
760 let path = scheduler.reader().path().clone();
761 Self::try_open_with_file_metadata(
762 Arc::new(LanceEncodingsIo(scheduler)),
763 path,
764 base_projection,
765 decoder_plugins,
766 file_metadata,
767 cache,
768 options,
769 )
770 .await
771 }
772
773 pub async fn try_open_with_file_metadata(
779 scheduler: Arc<dyn EncodingsIo>,
780 path: Path,
781 base_projection: Option<ReaderProjection>,
782 decoder_plugins: Arc<DecoderPlugins>,
783 file_metadata: Arc<CachedFileMetadata>,
784 cache: &LanceCache,
785 options: FileReaderOptions,
786 ) -> Result<Self> {
787 let cache = Arc::new(cache.with_key_prefix(path.as_ref()));
788
789 if let Some(base_projection) = base_projection.as_ref() {
790 Self::validate_projection(base_projection, &file_metadata)?;
791 }
792 let num_rows = file_metadata.num_rows;
793 Ok(Self {
794 scheduler,
795 base_projection: base_projection.unwrap_or(ReaderProjection::from_whole_schema(
796 file_metadata.file_schema.as_ref(),
797 file_metadata.version(),
798 )),
799 num_rows,
800 metadata: file_metadata,
801 decoder_plugins,
802 cache,
803 options,
804 })
805 }
806
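    // Note: the projection argument is currently unused; every column's info is
    // returned here and column selection happens later via the projection's
    // `column_indices`, which are handed to the scheduler.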
807 fn collect_columns_from_projection(
821 &self,
822 _projection: &ReaderProjection,
823 ) -> Result<Vec<Arc<ColumnInfo>>> {
824 Ok(self.metadata.column_infos.to_vec())
825 }
826
827 #[allow(clippy::too_many_arguments)]
828 fn do_read_range(
829 column_infos: Vec<Arc<ColumnInfo>>,
830 io: Arc<dyn EncodingsIo>,
831 cache: Arc<LanceCache>,
832 num_rows: u64,
833 decoder_plugins: Arc<DecoderPlugins>,
834 range: Range<u64>,
835 batch_size: u32,
836 projection: ReaderProjection,
837 filter: FilterExpression,
838 should_validate: bool,
839 ) -> Result<BoxStream<'static, ReadBatchTask>> {
840 debug!(
841 "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
842 range,
843 batch_size,
844 num_rows,
845 column_infos.len(),
846 projection.schema.fields.len(),
847 );
848
849 let config = SchedulerDecoderConfig {
850 batch_size,
851 cache,
852 decoder_plugins,
853 io,
854 should_validate,
855 };
856
857 let requested_rows = RequestedRows::Ranges(vec![range]);
858
859 Ok(schedule_and_decode(
860 column_infos,
861 requested_rows,
862 filter,
863 projection.column_indices,
864 projection.schema,
865 config,
866 ))
867 }
868
869 fn read_range(
870 &self,
871 range: Range<u64>,
872 batch_size: u32,
873 projection: ReaderProjection,
874 filter: FilterExpression,
875 ) -> Result<BoxStream<'static, ReadBatchTask>> {
876 Self::do_read_range(
878 self.collect_columns_from_projection(&projection)?,
879 self.scheduler.clone(),
880 self.cache.clone(),
881 self.num_rows,
882 self.decoder_plugins.clone(),
883 range,
884 batch_size,
885 projection,
886 filter,
887 self.options.validate_on_decode,
888 )
889 }
890
891 #[allow(clippy::too_many_arguments)]
892 fn do_take_rows(
893 column_infos: Vec<Arc<ColumnInfo>>,
894 io: Arc<dyn EncodingsIo>,
895 cache: Arc<LanceCache>,
896 decoder_plugins: Arc<DecoderPlugins>,
897 indices: Vec<u64>,
898 batch_size: u32,
899 projection: ReaderProjection,
900 filter: FilterExpression,
901 should_validate: bool,
902 ) -> Result<BoxStream<'static, ReadBatchTask>> {
903 debug!(
904 "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
905 indices.len(),
906 indices[0],
907 indices[indices.len() - 1],
908 batch_size,
909 column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
910 );
911
912 let config = SchedulerDecoderConfig {
913 batch_size,
914 cache,
915 decoder_plugins,
916 io,
917 should_validate,
918 };
919
920 let requested_rows = RequestedRows::Indices(indices);
921
922 Ok(schedule_and_decode(
923 column_infos,
924 requested_rows,
925 filter,
926 projection.column_indices,
927 projection.schema,
928 config,
929 ))
930 }
931
932 fn take_rows(
933 &self,
934 indices: Vec<u64>,
935 batch_size: u32,
936 projection: ReaderProjection,
937 ) -> Result<BoxStream<'static, ReadBatchTask>> {
938 Self::do_take_rows(
940 self.collect_columns_from_projection(&projection)?,
941 self.scheduler.clone(),
942 self.cache.clone(),
943 self.decoder_plugins.clone(),
944 indices,
945 batch_size,
946 projection,
947 FilterExpression::no_filter(),
948 self.options.validate_on_decode,
949 )
950 }
951
952 #[allow(clippy::too_many_arguments)]
953 fn do_read_ranges(
954 column_infos: Vec<Arc<ColumnInfo>>,
955 io: Arc<dyn EncodingsIo>,
956 cache: Arc<LanceCache>,
957 decoder_plugins: Arc<DecoderPlugins>,
958 ranges: Vec<Range<u64>>,
959 batch_size: u32,
960 projection: ReaderProjection,
961 filter: FilterExpression,
962 should_validate: bool,
963 ) -> Result<BoxStream<'static, ReadBatchTask>> {
964 let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
965 debug!(
966 "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
967 ranges.len(),
968 num_rows,
969 ranges[0].start,
970 ranges[ranges.len() - 1].end,
971 batch_size,
972 column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
973 );
974
975 let config = SchedulerDecoderConfig {
976 batch_size,
977 cache,
978 decoder_plugins,
979 io,
980 should_validate,
981 };
982
983 let requested_rows = RequestedRows::Ranges(ranges);
984
985 Ok(schedule_and_decode(
986 column_infos,
987 requested_rows,
988 filter,
989 projection.column_indices,
990 projection.schema,
991 config,
992 ))
993 }
994
995 fn read_ranges(
996 &self,
997 ranges: Vec<Range<u64>>,
998 batch_size: u32,
999 projection: ReaderProjection,
1000 filter: FilterExpression,
1001 ) -> Result<BoxStream<'static, ReadBatchTask>> {
1002 Self::do_read_ranges(
1003 self.collect_columns_from_projection(&projection)?,
1004 self.scheduler.clone(),
1005 self.cache.clone(),
1006 self.decoder_plugins.clone(),
1007 ranges,
1008 batch_size,
1009 projection,
1010 filter,
1011 self.options.validate_on_decode,
1012 )
1013 }
1014
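    /// Creates a stream of batch-decode tasks for the requested rows.
    ///
    /// The projection (defaulting to the reader's base projection) is validated and
    /// the requested indices or ranges are checked against the file's row count
    /// before any work is scheduled.  Each emitted `ReadBatchTask` holds a future
    /// that must be awaited to produce the actual `RecordBatch`.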
1015 pub fn read_tasks(
1026 &self,
1027 params: ReadBatchParams,
1028 batch_size: u32,
1029 projection: Option<ReaderProjection>,
1030 filter: FilterExpression,
1031 ) -> Result<Pin<Box<dyn Stream<Item = ReadBatchTask> + Send>>> {
1032 let projection = projection.unwrap_or_else(|| self.base_projection.clone());
1033 Self::validate_projection(&projection, &self.metadata)?;
1034 let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
1035 if bound > self.num_rows || bound == self.num_rows && inclusive {
1036 Err(Error::invalid_input(
1037 format!(
1038 "cannot read {:?} from file with {} rows",
1039 params, self.num_rows
1040 ),
1041 location!(),
1042 ))
1043 } else {
1044 Ok(())
1045 }
1046 };
1047 match ¶ms {
1048 ReadBatchParams::Indices(indices) => {
1049 for idx in indices {
1050 match idx {
1051 None => {
1052 return Err(Error::invalid_input(
1053 "Null value in indices array",
1054 location!(),
1055 ));
1056 }
1057 Some(idx) => {
1058 verify_bound(¶ms, idx as u64, true)?;
1059 }
1060 }
1061 }
1062 let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
1063 self.take_rows(indices, batch_size, projection)
1064 }
1065 ReadBatchParams::Range(range) => {
1066 verify_bound(¶ms, range.end as u64, false)?;
1067 self.read_range(
1068 range.start as u64..range.end as u64,
1069 batch_size,
1070 projection,
1071 filter,
1072 )
1073 }
1074 ReadBatchParams::Ranges(ranges) => {
1075 let mut ranges_u64 = Vec::with_capacity(ranges.len());
1076 for range in ranges.as_ref() {
1077 verify_bound(¶ms, range.end, false)?;
1078 ranges_u64.push(range.start..range.end);
1079 }
1080 self.read_ranges(ranges_u64, batch_size, projection, filter)
1081 }
1082 ReadBatchParams::RangeFrom(range) => {
1083 verify_bound(¶ms, range.start as u64, true)?;
1084 self.read_range(
1085 range.start as u64..self.num_rows,
1086 batch_size,
1087 projection,
1088 filter,
1089 )
1090 }
1091 ReadBatchParams::RangeTo(range) => {
1092 verify_bound(¶ms, range.end as u64, false)?;
1093 self.read_range(0..range.end as u64, batch_size, projection, filter)
1094 }
1095 ReadBatchParams::RangeFull => {
1096 self.read_range(0..self.num_rows, batch_size, projection, filter)
1097 }
1098 }
1099 }
1100
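    /// Reads the file as a `RecordBatchStream`, decoding up to `batch_readahead`
    /// batches concurrently on top of the task stream produced by `read_tasks`.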
1101 pub fn read_stream_projected(
1123 &self,
1124 params: ReadBatchParams,
1125 batch_size: u32,
1126 batch_readahead: u32,
1127 projection: ReaderProjection,
1128 filter: FilterExpression,
1129 ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1130 let arrow_schema = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
1131 let tasks_stream = self.read_tasks(params, batch_size, Some(projection), filter)?;
1132 let batch_stream = tasks_stream
1133 .map(|task| task.task)
1134 .buffered(batch_readahead as usize)
1135 .boxed();
1136 Ok(Box::pin(RecordBatchStreamAdapter::new(
1137 arrow_schema,
1138 batch_stream,
1139 )))
1140 }
1141
1142 fn take_rows_blocking(
1143 &self,
1144 indices: Vec<u64>,
1145 batch_size: u32,
1146 projection: ReaderProjection,
1147 filter: FilterExpression,
1148 ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1149 let column_infos = self.collect_columns_from_projection(&projection)?;
1150 debug!(
1151 "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
1152 indices.len(),
1153 indices[0],
1154 indices[indices.len() - 1],
1155 batch_size,
1156 column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1157 );
1158
1159 let config = SchedulerDecoderConfig {
1160 batch_size,
1161 cache: self.cache.clone(),
1162 decoder_plugins: self.decoder_plugins.clone(),
1163 io: self.scheduler.clone(),
1164 should_validate: self.options.validate_on_decode,
1165 };
1166
1167 let requested_rows = RequestedRows::Indices(indices);
1168
1169 schedule_and_decode_blocking(
1170 column_infos,
1171 requested_rows,
1172 filter,
1173 projection.column_indices,
1174 projection.schema,
1175 config,
1176 )
1177 }
1178
1179 fn read_ranges_blocking(
1180 &self,
1181 ranges: Vec<Range<u64>>,
1182 batch_size: u32,
1183 projection: ReaderProjection,
1184 filter: FilterExpression,
1185 ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1186 let column_infos = self.collect_columns_from_projection(&projection)?;
1187 let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1188 debug!(
1189 "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
1190 ranges.len(),
1191 num_rows,
1192 ranges[0].start,
1193 ranges[ranges.len() - 1].end,
1194 batch_size,
1195 column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1196 );
1197
1198 let config = SchedulerDecoderConfig {
1199 batch_size,
1200 cache: self.cache.clone(),
1201 decoder_plugins: self.decoder_plugins.clone(),
1202 io: self.scheduler.clone(),
1203 should_validate: self.options.validate_on_decode,
1204 };
1205
1206 let requested_rows = RequestedRows::Ranges(ranges);
1207
1208 schedule_and_decode_blocking(
1209 column_infos,
1210 requested_rows,
1211 filter,
1212 projection.column_indices,
1213 projection.schema,
1214 config,
1215 )
1216 }
1217
1218 fn read_range_blocking(
1219 &self,
1220 range: Range<u64>,
1221 batch_size: u32,
1222 projection: ReaderProjection,
1223 filter: FilterExpression,
1224 ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1225 let column_infos = self.collect_columns_from_projection(&projection)?;
1226 let num_rows = self.num_rows;
1227
1228 debug!(
1229 "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
1230 range,
1231 batch_size,
1232 num_rows,
1233 column_infos.len(),
1234 projection.schema.fields.len(),
1235 );
1236
1237 let config = SchedulerDecoderConfig {
1238 batch_size,
1239 cache: self.cache.clone(),
1240 decoder_plugins: self.decoder_plugins.clone(),
1241 io: self.scheduler.clone(),
1242 should_validate: self.options.validate_on_decode,
1243 };
1244
1245 let requested_rows = RequestedRows::Ranges(vec![range]);
1246
1247 schedule_and_decode_blocking(
1248 column_infos,
1249 requested_rows,
1250 filter,
1251 projection.column_indices,
1252 projection.schema,
1253 config,
1254 )
1255 }
1256
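    /// Blocking variant of the streaming read path, built on
    /// `schedule_and_decode_blocking`.  Intended for use off the async runtime;
    /// the tests below call it inside `tokio::task::spawn_blocking`.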
1257 pub fn read_stream_projected_blocking(
1269 &self,
1270 params: ReadBatchParams,
1271 batch_size: u32,
1272 projection: Option<ReaderProjection>,
1273 filter: FilterExpression,
1274 ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1275 let projection = projection.unwrap_or_else(|| self.base_projection.clone());
1276 Self::validate_projection(&projection, &self.metadata)?;
1277 let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
1278 if bound > self.num_rows || bound == self.num_rows && inclusive {
1279 Err(Error::invalid_input(
1280 format!(
1281 "cannot read {:?} from file with {} rows",
1282 params, self.num_rows
1283 ),
1284 location!(),
1285 ))
1286 } else {
1287 Ok(())
1288 }
1289 };
1290 match ¶ms {
1291 ReadBatchParams::Indices(indices) => {
1292 for idx in indices {
1293 match idx {
1294 None => {
1295 return Err(Error::invalid_input(
1296 "Null value in indices array",
1297 location!(),
1298 ));
1299 }
1300 Some(idx) => {
1301 verify_bound(¶ms, idx as u64, true)?;
1302 }
1303 }
1304 }
1305 let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
1306 self.take_rows_blocking(indices, batch_size, projection, filter)
1307 }
1308 ReadBatchParams::Range(range) => {
1309 verify_bound(¶ms, range.end as u64, false)?;
1310 self.read_range_blocking(
1311 range.start as u64..range.end as u64,
1312 batch_size,
1313 projection,
1314 filter,
1315 )
1316 }
1317 ReadBatchParams::Ranges(ranges) => {
1318 let mut ranges_u64 = Vec::with_capacity(ranges.len());
1319 for range in ranges.as_ref() {
1320 verify_bound(¶ms, range.end, false)?;
1321 ranges_u64.push(range.start..range.end);
1322 }
1323 self.read_ranges_blocking(ranges_u64, batch_size, projection, filter)
1324 }
1325 ReadBatchParams::RangeFrom(range) => {
1326 verify_bound(¶ms, range.start as u64, true)?;
1327 self.read_range_blocking(
1328 range.start as u64..self.num_rows,
1329 batch_size,
1330 projection,
1331 filter,
1332 )
1333 }
1334 ReadBatchParams::RangeTo(range) => {
1335 verify_bound(¶ms, range.end as u64, false)?;
1336 self.read_range_blocking(0..range.end as u64, batch_size, projection, filter)
1337 }
1338 ReadBatchParams::RangeFull => {
1339 self.read_range_blocking(0..self.num_rows, batch_size, projection, filter)
1340 }
1341 }
1342 }
1343
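    // Reads data from the file as a stream of record batches using the reader's
    // base projection.
    //
    // Illustrative sketch (matches the round-trip test below; `reader` is assumed
    // to be an open `FileReader`):
    //
    //     let mut stream = reader.read_stream(
    //         lance_io::ReadBatchParams::RangeFull,
    //         1024, // batch_size
    //         16,   // batch_readahead
    //         FilterExpression::no_filter(),
    //     )?;
    //     while let Some(batch) = stream.next().await {
    //         let batch = batch?;
    //         // process batch
    //     }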
1344 pub fn read_stream(
1350 &self,
1351 params: ReadBatchParams,
1352 batch_size: u32,
1353 batch_readahead: u32,
1354 filter: FilterExpression,
1355 ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1356 self.read_stream_projected(
1357 params,
1358 batch_size,
1359 batch_readahead,
1360 self.base_projection.clone(),
1361 filter,
1362 )
1363 }
1364
1365 pub fn schema(&self) -> &Arc<Schema> {
1366 &self.metadata.file_schema
1367 }
1368}
1369
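// Renders a human-readable description of a page's encoding, primarily for debugging
// and inspection tools.  Unknown or missing encodings are reported in the output
// rather than treated as errors.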
1370pub fn describe_encoding(page: &pbfile::column_metadata::Page) -> String {
1372 if let Some(encoding) = &page.encoding {
1373 if let Some(style) = &encoding.location {
1374 match style {
1375 pbfile::encoding::Location::Indirect(indirect) => {
1376 format!(
1377 "IndirectEncoding(pos={},size={})",
1378 indirect.buffer_location, indirect.buffer_length
1379 )
1380 }
1381 pbfile::encoding::Location::Direct(direct) => {
1382 let encoding_any =
1383 prost_types::Any::decode(Bytes::from(direct.encoding.clone()))
1384 .expect("failed to deserialize encoding as protobuf");
1385 if encoding_any.type_url == "/lance.encodings.ArrayEncoding" {
1386 let encoding = encoding_any.to_msg::<pbenc::ArrayEncoding>();
1387 match encoding {
1388 Ok(encoding) => {
1389 format!("{:#?}", encoding)
1390 }
1391 Err(err) => {
1392 format!("Unsupported(decode_err={})", err)
1393 }
1394 }
1395 } else if encoding_any.type_url == "/lance.encodings.PageLayout" {
1396 let encoding = encoding_any.to_msg::<pbenc::PageLayout>();
1397 match encoding {
1398 Ok(encoding) => {
1399 format!("{:#?}", encoding)
1400 }
1401 Err(err) => {
1402 format!("Unsupported(decode_err={})", err)
1403 }
1404 }
1405 } else {
1406 format!("Unrecognized(type_url={})", encoding_any.type_url)
1407 }
1408 }
1409 pbfile::encoding::Location::None(_) => "NoEncodingDescription".to_string(),
1410 }
1411 } else {
1412 "MISSING STYLE".to_string()
1413 }
1414 } else {
1415 "MISSING".to_string()
1416 }
1417}
1418
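/// Helpers for deserializing an [`EncodedBatch`] from bytes produced by the
/// corresponding writer extension.
///
/// `try_from_mini_lance` expects data without an embedded schema (the schema must be
/// supplied by the caller) while `try_from_self_described_lance` reads the schema
/// from the file's first global buffer.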
1419pub trait EncodedBatchReaderExt {
1420 fn try_from_mini_lance(
1421 bytes: Bytes,
1422 schema: &Schema,
1423 version: LanceFileVersion,
1424 ) -> Result<Self>
1425 where
1426 Self: Sized;
1427 fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1428 where
1429 Self: Sized;
1430}
1431
1432impl EncodedBatchReaderExt for EncodedBatch {
1433 fn try_from_mini_lance(
1434 bytes: Bytes,
1435 schema: &Schema,
1436 file_version: LanceFileVersion,
1437 ) -> Result<Self>
1438 where
1439 Self: Sized,
1440 {
1441 let projection = ReaderProjection::from_whole_schema(schema, file_version);
1442 let footer = FileReader::decode_footer(&bytes)?;
1443
1444 let column_metadata_start = footer.column_meta_start as usize;
1447 let column_metadata_end = footer.global_buff_offsets_start as usize;
1448 let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1449 let column_metadatas =
1450 FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1451
1452 let file_version = LanceFileVersion::try_from_major_minor(
1453 footer.major_version as u32,
1454 footer.minor_version as u32,
1455 )?;
1456
1457 let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1458
1459 Ok(Self {
1460 data: bytes,
1461 num_rows: page_table
1462 .first()
1463 .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1464 .unwrap_or(0),
1465 page_table,
1466 top_level_columns: projection.column_indices,
1467 schema: Arc::new(schema.clone()),
1468 })
1469 }
1470
1471 fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1472 where
1473 Self: Sized,
1474 {
1475 let footer = FileReader::decode_footer(&bytes)?;
1476 let file_version = LanceFileVersion::try_from_major_minor(
1477 footer.major_version as u32,
1478 footer.minor_version as u32,
1479 )?;
1480
1481 let gbo_table = FileReader::do_decode_gbo_table(
1482 &bytes.slice(footer.global_buff_offsets_start as usize..),
1483 &footer,
1484 file_version,
1485 )?;
1486 if gbo_table.is_empty() {
1487 return Err(Error::Internal {
1488 message: "File did not contain any global buffers, schema expected".to_string(),
1489 location: location!(),
1490 });
1491 }
1492 let schema_start = gbo_table[0].position as usize;
1493 let schema_size = gbo_table[0].size as usize;
1494
1495 let schema_bytes = bytes.slice(schema_start..(schema_start + schema_size));
1496 let (_, schema) = FileReader::decode_schema(schema_bytes)?;
1497 let projection = ReaderProjection::from_whole_schema(&schema, file_version);
1498
1499 let column_metadata_start = footer.column_meta_start as usize;
1502 let column_metadata_end = footer.global_buff_offsets_start as usize;
1503 let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1504 let column_metadatas =
1505 FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1506
1507 let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1508
1509 Ok(Self {
1510 data: bytes,
1511 num_rows: page_table
1512 .first()
1513 .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1514 .unwrap_or(0),
1515 page_table,
1516 top_level_columns: projection.column_indices,
1517 schema: Arc::new(schema),
1518 })
1519 }
1520}
1521
1522#[cfg(test)]
1523pub mod tests {
1524 use std::{collections::BTreeMap, pin::Pin, sync::Arc};
1525
1526 use arrow_array::{
1527 types::{Float64Type, Int32Type},
1528 RecordBatch, UInt32Array,
1529 };
1530 use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema};
1531 use bytes::Bytes;
1532 use futures::{prelude::stream::TryStreamExt, StreamExt};
1533 use lance_arrow::RecordBatchExt;
1534 use lance_core::{datatypes::Schema, ArrowResult};
1535 use lance_datagen::{array, gen, BatchCount, ByteCount, RowCount};
1536 use lance_encoding::{
1537 decoder::{decode_batch, DecodeBatchScheduler, DecoderPlugins, FilterExpression},
1538 encoder::{encode_batch, CoreFieldEncodingStrategy, EncodedBatch, EncodingOptions},
1539 version::LanceFileVersion,
1540 };
1541 use lance_io::{stream::RecordBatchStream, utils::CachedFileSize};
1542 use log::debug;
1543 use rstest::rstest;
1544 use tokio::sync::mpsc;
1545
1546 use crate::v2::{
1547 reader::{EncodedBatchReaderExt, FileReader, FileReaderOptions, ReaderProjection},
1548 testing::{test_cache, write_lance_file, FsFixture, WrittenFile},
1549 writer::{EncodedBatchWriteExt, FileWriter, FileWriterOptions},
1550 };
1551
1552 async fn create_some_file(fs: &FsFixture, version: LanceFileVersion) -> WrittenFile {
1553 let location_type = DataType::Struct(Fields::from(vec![
1554 Field::new("x", DataType::Float64, true),
1555 Field::new("y", DataType::Float64, true),
1556 ]));
1557 let categories_type = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)));
1558
1559 let mut reader = gen()
1560 .col("score", array::rand::<Float64Type>())
1561 .col("location", array::rand_type(&location_type))
1562 .col("categories", array::rand_type(&categories_type))
1563 .col("binary", array::rand_type(&DataType::Binary));
1564 if version <= LanceFileVersion::V2_0 {
1565 reader = reader.col("large_bin", array::rand_type(&DataType::LargeBinary));
1566 }
1567 let reader = reader.into_reader_rows(RowCount::from(1000), BatchCount::from(100));
1568
1569 write_lance_file(
1570 reader,
1571 fs,
1572 FileWriterOptions {
1573 format_version: Some(version),
1574 ..Default::default()
1575 },
1576 )
1577 .await
1578 }
1579
1580 type Transformer = Box<dyn Fn(&RecordBatch) -> RecordBatch>;
1581
1582 async fn verify_expected(
1583 expected: &[RecordBatch],
1584 mut actual: Pin<Box<dyn RecordBatchStream>>,
1585 read_size: u32,
1586 transform: Option<Transformer>,
1587 ) {
1588 let mut remaining = expected.iter().map(|batch| batch.num_rows()).sum::<usize>() as u32;
1589 let mut expected_iter = expected.iter().map(|batch| {
1590 if let Some(transform) = &transform {
1591 transform(batch)
1592 } else {
1593 batch.clone()
1594 }
1595 });
1596 let mut next_expected = expected_iter.next().unwrap().clone();
1597 while let Some(actual) = actual.next().await {
1598 let mut actual = actual.unwrap();
1599 let mut rows_to_verify = actual.num_rows() as u32;
1600 let expected_length = remaining.min(read_size);
1601 assert_eq!(expected_length, rows_to_verify);
1602
1603 while rows_to_verify > 0 {
1604 let next_slice_len = (next_expected.num_rows() as u32).min(rows_to_verify);
1605 assert_eq!(
1606 next_expected.slice(0, next_slice_len as usize),
1607 actual.slice(0, next_slice_len as usize)
1608 );
1609 remaining -= next_slice_len;
1610 rows_to_verify -= next_slice_len;
1611 if remaining > 0 {
1612 if next_slice_len == next_expected.num_rows() as u32 {
1613 next_expected = expected_iter.next().unwrap().clone();
1614 } else {
1615 next_expected = next_expected.slice(
1616 next_slice_len as usize,
1617 next_expected.num_rows() - next_slice_len as usize,
1618 );
1619 }
1620 }
1621 if rows_to_verify > 0 {
1622 actual = actual.slice(
1623 next_slice_len as usize,
1624 actual.num_rows() - next_slice_len as usize,
1625 );
1626 }
1627 }
1628 }
1629 assert_eq!(remaining, 0);
1630 }
1631
1632 #[tokio::test]
1633 async fn test_round_trip() {
1634 let fs = FsFixture::default();
1635
1636 let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1637
1638 for read_size in [32, 1024, 1024 * 1024] {
1639 let file_scheduler = fs
1640 .scheduler
1641 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1642 .await
1643 .unwrap();
1644 let file_reader = FileReader::try_open(
1645 file_scheduler,
1646 None,
1647 Arc::<DecoderPlugins>::default(),
1648 &test_cache(),
1649 FileReaderOptions::default(),
1650 )
1651 .await
1652 .unwrap();
1653
1654 let schema = file_reader.schema();
1655 assert_eq!(schema.metadata.get("foo").unwrap(), "bar");
1656
1657 let batch_stream = file_reader
1658 .read_stream(
1659 lance_io::ReadBatchParams::RangeFull,
1660 read_size,
1661 16,
1662 FilterExpression::no_filter(),
1663 )
1664 .unwrap();
1665
1666 verify_expected(&data, batch_stream, read_size, None).await;
1667 }
1668 }
1669
1670 #[test_log::test(tokio::test)]
1671 async fn test_encoded_batch_round_trip() {
1672 let data = gen()
1673 .col("x", array::rand::<Int32Type>())
1674 .col("y", array::rand_utf8(ByteCount::from(16), false))
1675 .into_batch_rows(RowCount::from(10000))
1676 .unwrap();
1677
1678 let lance_schema = Arc::new(Schema::try_from(data.schema().as_ref()).unwrap());
1679
1680 let encoding_options = EncodingOptions {
1681 cache_bytes_per_column: 4096,
1682 max_page_bytes: 32 * 1024 * 1024,
1683 keep_original_array: true,
1684 buffer_alignment: 64,
1685 };
1686 let encoded_batch = encode_batch(
1687 &data,
1688 lance_schema.clone(),
1689 &CoreFieldEncodingStrategy::default(),
1690 &encoding_options,
1691 )
1692 .await
1693 .unwrap();
1694
1695 let bytes = encoded_batch.try_to_self_described_lance().unwrap();
1697
1698 let decoded_batch = EncodedBatch::try_from_self_described_lance(bytes).unwrap();
1699
1700 let decoded = decode_batch(
1701 &decoded_batch,
1702 &FilterExpression::no_filter(),
1703 Arc::<DecoderPlugins>::default(),
1704 false,
1705 LanceFileVersion::default(),
1706 None,
1707 )
1708 .await
1709 .unwrap();
1710
1711 assert_eq!(data, decoded);
1712
1713 let bytes = encoded_batch.try_to_mini_lance().unwrap();
1715 let decoded_batch =
1716 EncodedBatch::try_from_mini_lance(bytes, lance_schema.as_ref(), LanceFileVersion::V2_0)
1717 .unwrap();
1718 let decoded = decode_batch(
1719 &decoded_batch,
1720 &FilterExpression::no_filter(),
1721 Arc::<DecoderPlugins>::default(),
1722 false,
1723 LanceFileVersion::default(),
1724 None,
1725 )
1726 .await
1727 .unwrap();
1728
1729 assert_eq!(data, decoded);
1730 }
1731
1732 #[rstest]
1733 #[test_log::test(tokio::test)]
1734 async fn test_projection(
1735 #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
1736 ) {
1737 let fs = FsFixture::default();
1738
1739 let written_file = create_some_file(&fs, version).await;
1740 let file_scheduler = fs
1741 .scheduler
1742 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1743 .await
1744 .unwrap();
1745
1746 let field_id_mapping = written_file
1747 .field_id_mapping
1748 .iter()
1749 .copied()
1750 .collect::<BTreeMap<_, _>>();
1751
1752 let empty_projection = ReaderProjection {
1753 column_indices: Vec::default(),
1754 schema: Arc::new(Schema::default()),
1755 };
1756
1757 for columns in [
1758 vec!["score"],
1759 vec!["location"],
1760 vec!["categories"],
1761 vec!["score.x"],
1762 vec!["score", "categories"],
1763 vec!["score", "location"],
1764 vec!["location", "categories"],
1765 vec!["score.y", "location", "categories"],
1766 ] {
1767 debug!("Testing round trip with projection {:?}", columns);
1768 for use_field_ids in [true, false] {
1769 let file_reader = FileReader::try_open(
1771 file_scheduler.clone(),
1772 None,
1773 Arc::<DecoderPlugins>::default(),
1774 &test_cache(),
1775 FileReaderOptions::default(),
1776 )
1777 .await
1778 .unwrap();
1779
1780 let projected_schema = written_file.schema.project(&columns).unwrap();
1781 let projection = if use_field_ids {
1782 ReaderProjection::from_field_ids(
1783 file_reader.metadata.version(),
1784 &projected_schema,
1785 &field_id_mapping,
1786 )
1787 .unwrap()
1788 } else {
1789 ReaderProjection::from_column_names(
1790 file_reader.metadata.version(),
1791 &written_file.schema,
1792 &columns,
1793 )
1794 .unwrap()
1795 };
1796
1797 let batch_stream = file_reader
1798 .read_stream_projected(
1799 lance_io::ReadBatchParams::RangeFull,
1800 1024,
1801 16,
1802 projection.clone(),
1803 FilterExpression::no_filter(),
1804 )
1805 .unwrap();
1806
1807 let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1808 verify_expected(
1809 &written_file.data,
1810 batch_stream,
1811 1024,
1812 Some(Box::new(move |batch: &RecordBatch| {
1813 batch.project_by_schema(&projection_arrow).unwrap()
1814 })),
1815 )
1816 .await;
1817
1818 let file_reader = FileReader::try_open(
1820 file_scheduler.clone(),
1821 Some(projection.clone()),
1822 Arc::<DecoderPlugins>::default(),
1823 &test_cache(),
1824 FileReaderOptions::default(),
1825 )
1826 .await
1827 .unwrap();
1828
1829 let batch_stream = file_reader
1830 .read_stream(
1831 lance_io::ReadBatchParams::RangeFull,
1832 1024,
1833 16,
1834 FilterExpression::no_filter(),
1835 )
1836 .unwrap();
1837
1838 let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1839 verify_expected(
1840 &written_file.data,
1841 batch_stream,
1842 1024,
1843 Some(Box::new(move |batch: &RecordBatch| {
1844 batch.project_by_schema(&projection_arrow).unwrap()
1845 })),
1846 )
1847 .await;
1848
1849 assert!(file_reader
1850 .read_stream_projected(
1851 lance_io::ReadBatchParams::RangeFull,
1852 1024,
1853 16,
1854 empty_projection.clone(),
1855 FilterExpression::no_filter(),
1856 )
1857 .is_err());
1858 }
1859 }
1860
1861 assert!(FileReader::try_open(
1862 file_scheduler.clone(),
1863 Some(empty_projection),
1864 Arc::<DecoderPlugins>::default(),
1865 &test_cache(),
1866 FileReaderOptions::default(),
1867 )
1868 .await
1869 .is_err());
1870
1871 let arrow_schema = ArrowSchema::new(vec![
1872 Field::new("x", DataType::Int32, true),
1873 Field::new("y", DataType::Int32, true),
1874 ]);
1875 let schema = Schema::try_from(&arrow_schema).unwrap();
1876
1877 let projection_with_dupes = ReaderProjection {
1878 column_indices: vec![0, 0],
1879 schema: Arc::new(schema),
1880 };
1881
1882 assert!(FileReader::try_open(
1883 file_scheduler.clone(),
1884 Some(projection_with_dupes),
1885 Arc::<DecoderPlugins>::default(),
1886 &test_cache(),
1887 FileReaderOptions::default(),
1888 )
1889 .await
1890 .is_err());
1891 }
1892
1893 #[test_log::test(tokio::test)]
1894 async fn test_compressing_buffer() {
1895 let fs = FsFixture::default();
1896
1897 let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
1898 let file_scheduler = fs
1899 .scheduler
1900 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1901 .await
1902 .unwrap();
1903
1904 let file_reader = FileReader::try_open(
1906 file_scheduler.clone(),
1907 None,
1908 Arc::<DecoderPlugins>::default(),
1909 &test_cache(),
1910 FileReaderOptions::default(),
1911 )
1912 .await
1913 .unwrap();
1914
1915 let mut projection = written_file.schema.project(&["score"]).unwrap();
1916 for field in projection.fields.iter_mut() {
1917 field
1918 .metadata
1919 .insert("lance:compression".to_string(), "zstd".to_string());
1920 }
1921 let projection = ReaderProjection {
1922 column_indices: projection.fields.iter().map(|f| f.id as u32).collect(),
1923 schema: Arc::new(projection),
1924 };
1925
1926 let batch_stream = file_reader
1927 .read_stream_projected(
1928 lance_io::ReadBatchParams::RangeFull,
1929 1024,
1930 16,
1931 projection.clone(),
1932 FilterExpression::no_filter(),
1933 )
1934 .unwrap();
1935
1936 let projection_arrow = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
1937 verify_expected(
1938 &written_file.data,
1939 batch_stream,
1940 1024,
1941 Some(Box::new(move |batch: &RecordBatch| {
1942 batch.project_by_schema(&projection_arrow).unwrap()
1943 })),
1944 )
1945 .await;
1946 }
1947
1948 #[tokio::test]
1949 async fn test_read_all() {
1950 let fs = FsFixture::default();
1951 let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1952 let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
1953
1954 let file_scheduler = fs
1955 .scheduler
1956 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1957 .await
1958 .unwrap();
1959 let file_reader = FileReader::try_open(
1960 file_scheduler.clone(),
1961 None,
1962 Arc::<DecoderPlugins>::default(),
1963 &test_cache(),
1964 FileReaderOptions::default(),
1965 )
1966 .await
1967 .unwrap();
1968
1969 let batches = file_reader
1970 .read_stream(
1971 lance_io::ReadBatchParams::RangeFull,
1972 total_rows as u32,
1973 16,
1974 FilterExpression::no_filter(),
1975 )
1976 .unwrap()
1977 .try_collect::<Vec<_>>()
1978 .await
1979 .unwrap();
1980 assert_eq!(batches.len(), 1);
1981 assert_eq!(batches[0].num_rows(), total_rows);
1982 }
1983
1984 #[tokio::test]
1985 async fn test_blocking_take() {
1986 let fs = FsFixture::default();
1987 let WrittenFile { data, schema, .. } = create_some_file(&fs, LanceFileVersion::V2_1).await;
1988 let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
1989
1990 let file_scheduler = fs
1991 .scheduler
1992 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1993 .await
1994 .unwrap();
1995 let file_reader = FileReader::try_open(
1996 file_scheduler.clone(),
1997 Some(
1998 ReaderProjection::from_column_names(LanceFileVersion::V2_1, &schema, &["score"])
1999 .unwrap(),
2000 ),
2001 Arc::<DecoderPlugins>::default(),
2002 &test_cache(),
2003 FileReaderOptions::default(),
2004 )
2005 .await
2006 .unwrap();
2007
2008 let batches = tokio::task::spawn_blocking(move || {
2009 file_reader
2010 .read_stream_projected_blocking(
2011 lance_io::ReadBatchParams::Indices(UInt32Array::from(vec![0, 1, 2, 3, 4])),
2012 total_rows as u32,
2013 None,
2014 FilterExpression::no_filter(),
2015 )
2016 .unwrap()
2017 .collect::<ArrowResult<Vec<_>>>()
2018 .unwrap()
2019 })
2020 .await
2021 .unwrap();
2022
2023 assert_eq!(batches.len(), 1);
2024 assert_eq!(batches[0].num_rows(), 5);
2025 assert_eq!(batches[0].num_columns(), 1);
2026 }
2027
2028 #[tokio::test(flavor = "multi_thread")]
2029 async fn test_drop_in_progress() {
2030 let fs = FsFixture::default();
2031 let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
2032 let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2033
2034 let file_scheduler = fs
2035 .scheduler
2036 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2037 .await
2038 .unwrap();
2039 let file_reader = FileReader::try_open(
2040 file_scheduler.clone(),
2041 None,
2042 Arc::<DecoderPlugins>::default(),
2043 &test_cache(),
2044 FileReaderOptions::default(),
2045 )
2046 .await
2047 .unwrap();
2048
2049 let mut batches = file_reader
2050 .read_stream(
2051 lance_io::ReadBatchParams::RangeFull,
2052 (total_rows / 10) as u32,
2053 16,
2054 FilterExpression::no_filter(),
2055 )
2056 .unwrap();
2057
2058 drop(file_reader);
2059
2060 let batch = batches.next().await.unwrap().unwrap();
2061 assert!(batch.num_rows() > 0);
2062
2063 drop(batches);
2065 }
2066
2067 #[tokio::test]
2068 async fn drop_while_scheduling() {
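        // The decode scheduler is driven manually here and the receiving end of the
        // channel is dropped before scheduling begins.  The final schedule_range call
        // is expected to complete cleanly even though nothing will consume the
        // scheduled work.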
2069 let fs = FsFixture::default();
2079 let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
2080 let total_rows = written_file
2081 .data
2082 .iter()
2083 .map(|batch| batch.num_rows())
2084 .sum::<usize>();
2085
2086 let file_scheduler = fs
2087 .scheduler
2088 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2089 .await
2090 .unwrap();
2091 let file_reader = FileReader::try_open(
2092 file_scheduler.clone(),
2093 None,
2094 Arc::<DecoderPlugins>::default(),
2095 &test_cache(),
2096 FileReaderOptions::default(),
2097 )
2098 .await
2099 .unwrap();
2100
2101 let projection =
2102 ReaderProjection::from_whole_schema(&written_file.schema, LanceFileVersion::V2_0);
2103 let column_infos = file_reader
2104 .collect_columns_from_projection(&projection)
2105 .unwrap();
2106 let mut decode_scheduler = DecodeBatchScheduler::try_new(
2107 &projection.schema,
2108 &projection.column_indices,
2109 &column_infos,
2110 &vec![],
2111 total_rows as u64,
2112 Arc::<DecoderPlugins>::default(),
2113 file_reader.scheduler.clone(),
2114 test_cache(),
2115 &FilterExpression::no_filter(),
2116 )
2117 .await
2118 .unwrap();
2119
2120 let range = 0..total_rows as u64;
2121
2122 let (tx, rx) = mpsc::unbounded_channel();
2123
2124 drop(rx);
2126
2127 decode_scheduler.schedule_range(
2129 range,
2130 &FilterExpression::no_filter(),
2131 tx,
2132 file_reader.scheduler.clone(),
2133 )
2134 }
2135
2136 #[tokio::test]
2137 async fn test_global_buffers() {
2138 let fs = FsFixture::default();
2139
2140 let lance_schema =
2141 lance_core::datatypes::Schema::try_from(&ArrowSchema::new(vec![Field::new(
2142 "foo",
2143 DataType::Int32,
2144 true,
2145 )]))
2146 .unwrap();
2147
2148 let mut file_writer = FileWriter::try_new(
2149 fs.object_store.create(&fs.tmp_path).await.unwrap(),
2150 lance_schema.clone(),
2151 FileWriterOptions::default(),
2152 )
2153 .unwrap();
2154
2155 let test_bytes = Bytes::from_static(b"hello");
2156
2157 let buf_index = file_writer
2158 .add_global_buffer(test_bytes.clone())
2159 .await
2160 .unwrap();
2161
2162 assert_eq!(buf_index, 1);
2163
2164 file_writer.finish().await.unwrap();
2165
2166 let file_scheduler = fs
2167 .scheduler
2168 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2169 .await
2170 .unwrap();
2171 let file_reader = FileReader::try_open(
2172 file_scheduler.clone(),
2173 None,
2174 Arc::<DecoderPlugins>::default(),
2175 &test_cache(),
2176 FileReaderOptions::default(),
2177 )
2178 .await
2179 .unwrap();
2180
2181 let buf = file_reader.read_global_buffer(1).await.unwrap();
2182 assert_eq!(buf, test_bytes);
2183 }
2184}