use std::{
    collections::{BTreeMap, BTreeSet},
    io::Cursor,
    ops::Range,
    pin::Pin,
    sync::Arc,
};

use arrow_array::RecordBatchReader;
use arrow_schema::Schema as ArrowSchema;
use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
use bytes::{Bytes, BytesMut};
use deepsize::{Context, DeepSizeOf};
use futures::{stream::BoxStream, Stream, StreamExt};
use lance_encoding::{
    decoder::{
        schedule_and_decode, schedule_and_decode_blocking, ColumnInfo, DecoderPlugins,
        FilterExpression, PageEncoding, PageInfo, ReadBatchTask, RequestedRows,
        SchedulerDecoderConfig,
    },
    encoder::EncodedBatch,
    version::LanceFileVersion,
    EncodingsIo,
};
use log::debug;
use object_store::path::Path;
use prost::{Message, Name};
use snafu::location;

use lance_core::{
    cache::LanceCache,
    datatypes::{Field, Schema},
    Error, Result,
};
use lance_encoding::format::pb as pbenc;
use lance_io::{
    scheduler::FileScheduler,
    stream::{RecordBatchStream, RecordBatchStreamAdapter},
    ReadBatchParams,
};

use crate::{
    datatypes::{Fields, FieldsWithMeta},
    format::{pb, pbfile, MAGIC, MAJOR_VERSION, MINOR_VERSION},
    v2::writer::PAGE_BUFFER_ALIGNMENT,
};

use super::io::LanceEncodingsIo;

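/// The position and size of a buffer stored in the file.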
#[derive(Debug, DeepSizeOf)]
pub struct BufferDescriptor {
    pub position: u64,
    pub size: u64,
}

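/// Per-column statistics for a file (see [`ColumnStatistics`]).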
#[derive(Debug)]
pub struct FileStatistics {
    pub columns: Vec<ColumnStatistics>,
}

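/// Page count and total buffer size (in bytes) for a single column.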
#[derive(Debug)]
pub struct ColumnStatistics {
    pub num_pages: usize,
    pub size_bytes: u64,
}

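/// File metadata that is read once when the file is opened and then cached:
/// the decoded schema, per-column metadata and page layout, global buffer
/// locations, and size/version information taken from the footer.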
#[derive(Debug)]
pub struct CachedFileMetadata {
    pub file_schema: Arc<Schema>,
    pub column_metadatas: Vec<pbfile::ColumnMetadata>,
    pub column_infos: Vec<Arc<ColumnInfo>>,
    pub num_rows: u64,
    pub file_buffers: Vec<BufferDescriptor>,
    pub num_data_bytes: u64,
    pub num_column_metadata_bytes: u64,
    pub num_global_buffer_bytes: u64,
    pub num_footer_bytes: u64,
    pub major_version: u16,
    pub minor_version: u16,
}

impl DeepSizeOf for CachedFileMetadata {
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.file_schema.deep_size_of_children(context)
            + self
                .file_buffers
                .iter()
                .map(|file_buffer| file_buffer.deep_size_of_children(context))
                .sum::<usize>()
    }
}

impl CachedFileMetadata {
    pub fn version(&self) -> LanceFileVersion {
        match (self.major_version, self.minor_version) {
            (0, 3) => LanceFileVersion::V2_0,
            (2, 1) => LanceFileVersion::V2_1,
            _ => panic!(
                "Unsupported version: {}.{}",
                self.major_version, self.minor_version
            ),
        }
    }
}

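/// A projection to apply when reading a Lance file.
///
/// Pairs the (possibly partial) schema being read with the indices of the
/// top-level columns in the file that contain the data for that schema.
/// The constructors below (`from_whole_schema`, `from_field_ids`, and
/// `from_column_names`) compute the column indices for the common cases.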
#[derive(Debug, Clone)]
pub struct ReaderProjection {
    pub schema: Arc<Schema>,
    pub column_indices: Vec<u32>,
}

impl ReaderProjection {
    fn from_field_ids_helper<'a>(
        file_version: LanceFileVersion,
        fields: impl Iterator<Item = &'a Field>,
        field_id_to_column_index: &BTreeMap<u32, u32>,
        column_indices: &mut Vec<u32>,
    ) -> Result<()> {
        for field in fields {
            let is_structural = file_version >= LanceFileVersion::V2_1;
            if !is_structural || field.children.is_empty() {
                if let Some(column_idx) = field_id_to_column_index.get(&(field.id as u32)).copied()
                {
                    column_indices.push(column_idx);
                }
            }
            Self::from_field_ids_helper(
                file_version,
                field.children.iter(),
                field_id_to_column_index,
                column_indices,
            )?;
        }
        Ok(())
    }

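    /// Creates a projection from a schema and a mapping from field id to
    /// column index in the file, for callers that already know the file's
    /// field-id layout.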
    pub fn from_field_ids(
        file_version: LanceFileVersion,
        schema: &Schema,
        field_id_to_column_index: &BTreeMap<u32, u32>,
    ) -> Result<Self> {
        let mut column_indices = Vec::new();
        Self::from_field_ids_helper(
            file_version,
            schema.fields.iter(),
            field_id_to_column_index,
            &mut column_indices,
        )?;
        Ok(Self {
            schema: Arc::new(schema.clone()),
            column_indices,
        })
    }

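    /// Creates a projection that reads the entire schema in order.  For 2.1+
    /// (structural) files only leaf fields and packed structs consume a
    /// column index; for 2.0 files every field does.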
    pub fn from_whole_schema(schema: &Schema, version: LanceFileVersion) -> Self {
        let schema = Arc::new(schema.clone());
        let is_structural = version >= LanceFileVersion::V2_1;
        let mut column_indices = vec![];
        let mut curr_column_idx = 0;
        let mut packed_struct_fields_num = 0;
        for field in schema.fields_pre_order() {
            if packed_struct_fields_num > 0 {
                packed_struct_fields_num -= 1;
                continue;
            }
            if field.is_packed_struct() {
                column_indices.push(curr_column_idx);
                curr_column_idx += 1;
                packed_struct_fields_num = field.children.len();
            } else if field.children.is_empty() || !is_structural {
                column_indices.push(curr_column_idx);
                curr_column_idx += 1;
            }
        }
        Self {
            schema,
            column_indices,
        }
    }

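    /// Creates a projection from column names by projecting the file schema
    /// down to those columns and mapping the surviving fields to their
    /// column indices in the file.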
    pub fn from_column_names(
        file_version: LanceFileVersion,
        schema: &Schema,
        column_names: &[&str],
    ) -> Result<Self> {
        let field_id_to_column_index = schema
            .fields_pre_order()
            .filter(|field| file_version < LanceFileVersion::V2_1 || field.is_leaf())
            .enumerate()
            .map(|(idx, field)| (field.id as u32, idx as u32))
            .collect::<BTreeMap<_, _>>();
        let projected = schema.project(column_names)?;
        let mut column_indices = Vec::new();
        Self::from_field_ids_helper(
            file_version,
            projected.fields.iter(),
            &field_id_to_column_index,
            &mut column_indices,
        )?;
        Ok(Self {
            schema: Arc::new(projected),
            column_indices,
        })
    }
}

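/// Options that control how the [`FileReader`] decodes data (e.g. whether
/// decoded values are validated and whether the repetition index is cached).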
#[derive(Clone, Debug, Default)]
pub struct FileReaderOptions {
    validate_on_decode: bool,
    pub cache_repetition_index: bool,
}

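/// A reader for a single Lance (v2) file.
///
/// A minimal usage sketch (assuming a `FileScheduler` has already been opened
/// for the file and a `LanceCache` is available; `file_scheduler` and `cache`
/// below are placeholders):
///
/// ```ignore
/// use std::sync::Arc;
/// use futures::StreamExt;
/// use lance_encoding::decoder::{DecoderPlugins, FilterExpression};
/// use lance_io::ReadBatchParams;
///
/// let reader = FileReader::try_open(
///     file_scheduler,
///     None, // no base projection: read every column
///     Arc::<DecoderPlugins>::default(),
///     &cache,
///     FileReaderOptions::default(),
/// )
/// .await?;
/// let mut stream = reader.read_stream(
///     ReadBatchParams::RangeFull,
///     1024, // batch size (rows)
///     16,   // batch readahead
///     FilterExpression::no_filter(),
/// )?;
/// while let Some(batch) = stream.next().await {
///     let batch = batch?; // each item is a Result<RecordBatch>
///     // ... process the batch ...
/// }
/// ```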
#[derive(Debug)]
pub struct FileReader {
    scheduler: Arc<dyn EncodingsIo>,
    base_projection: ReaderProjection,
    num_rows: u64,
    metadata: Arc<CachedFileMetadata>,
    decoder_plugins: Arc<DecoderPlugins>,
    cache: Arc<LanceCache>,
    options: FileReaderOptions,
}

#[derive(Debug)]
struct Footer {
    #[allow(dead_code)]
    column_meta_start: u64,
    #[allow(dead_code)]
    column_meta_offsets_start: u64,
    global_buff_offsets_start: u64,
    num_global_buffers: u32,
    num_columns: u32,
    major_version: u16,
    minor_version: u16,
}

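// Footer layout (the final FOOTER_LEN bytes of the file), as decoded below:
//   8 bytes: column metadata start position
//   8 bytes: column metadata offsets table start position
//   8 bytes: global buffer offsets table start position
//   4 bytes: number of global buffers
//   4 bytes: number of columns
//   2 bytes: major version
//   2 bytes: minor version
//   4 bytes: magic bytes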
const FOOTER_LEN: usize = 40;

impl FileReader {
    pub fn with_scheduler(&self, scheduler: Arc<dyn EncodingsIo>) -> Self {
        Self {
            scheduler,
            base_projection: self.base_projection.clone(),
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            metadata: self.metadata.clone(),
            options: self.options.clone(),
            num_rows: self.num_rows,
        }
    }

    pub fn num_rows(&self) -> u64 {
        self.num_rows
    }

    pub fn metadata(&self) -> &Arc<CachedFileMetadata> {
        &self.metadata
    }

    pub fn file_statistics(&self) -> FileStatistics {
        let column_metadatas = &self.metadata().column_metadatas;

        let column_stats = column_metadatas
            .iter()
            .map(|col_metadata| {
                let num_pages = col_metadata.pages.len();
                let size_bytes = col_metadata
                    .pages
                    .iter()
                    .map(|page| page.buffer_sizes.iter().sum::<u64>())
                    .sum::<u64>();
                ColumnStatistics {
                    num_pages,
                    size_bytes,
                }
            })
            .collect();

        FileStatistics {
            columns: column_stats,
        }
    }

    pub async fn read_global_buffer(&self, index: u32) -> Result<Bytes> {
        let buffer_desc = self.metadata.file_buffers.get(index as usize).ok_or_else(|| {
            Error::invalid_input(
                format!(
                    "request for global buffer at index {} but there were only {} global buffers in the file",
                    index,
                    self.metadata.file_buffers.len()
                ),
                location!(),
            )
        })?;
        self.scheduler
            .submit_single(
                buffer_desc.position..buffer_desc.position + buffer_desc.size,
                0,
            )
            .await
    }

    async fn read_tail(scheduler: &FileScheduler) -> Result<(Bytes, u64)> {
        let file_size = scheduler.reader().size().await? as u64;
        let begin = if file_size < scheduler.reader().block_size() as u64 {
            0
        } else {
            file_size - scheduler.reader().block_size() as u64
        };
        let tail_bytes = scheduler.submit_single(begin..file_size, 0).await?;
        Ok((tail_bytes, file_size))
    }

    fn decode_footer(footer_bytes: &Bytes) -> Result<Footer> {
        let len = footer_bytes.len();
        if len < FOOTER_LEN {
            return Err(Error::io(
                format!(
                    "does not have sufficient data, len: {}, bytes: {:?}",
                    len, footer_bytes
                ),
                location!(),
            ));
        }
        let mut cursor = Cursor::new(footer_bytes.slice(len - FOOTER_LEN..));

        let column_meta_start = cursor.read_u64::<LittleEndian>()?;
        let column_meta_offsets_start = cursor.read_u64::<LittleEndian>()?;
        let global_buff_offsets_start = cursor.read_u64::<LittleEndian>()?;
        let num_global_buffers = cursor.read_u32::<LittleEndian>()?;
        let num_columns = cursor.read_u32::<LittleEndian>()?;
        let major_version = cursor.read_u16::<LittleEndian>()?;
        let minor_version = cursor.read_u16::<LittleEndian>()?;

        if major_version == MAJOR_VERSION as u16 && minor_version == MINOR_VERSION as u16 {
            return Err(Error::version_conflict(
                "Attempt to use the lance v2 reader to read a legacy file".to_string(),
                major_version,
                minor_version,
                location!(),
            ));
        }

        let magic_bytes = footer_bytes.slice(len - 4..);
        if magic_bytes.as_ref() != MAGIC {
            return Err(Error::io(
                format!(
                    "file does not appear to be a Lance file (invalid magic: {:?})",
                    MAGIC
                ),
                location!(),
            ));
        }
        Ok(Footer {
            column_meta_start,
            column_meta_offsets_start,
            global_buff_offsets_start,
            num_global_buffers,
            num_columns,
            major_version,
            minor_version,
        })
    }

    fn read_all_column_metadata(
        column_metadata_bytes: Bytes,
        footer: &Footer,
    ) -> Result<Vec<pbfile::ColumnMetadata>> {
        let column_metadata_start = footer.column_meta_start;
        let cmo_table_size = 16 * footer.num_columns as usize;
        let cmo_table = column_metadata_bytes.slice(column_metadata_bytes.len() - cmo_table_size..);

        (0..footer.num_columns)
            .map(|col_idx| {
                let offset = (col_idx * 16) as usize;
                let position = LittleEndian::read_u64(&cmo_table[offset..offset + 8]);
                let length = LittleEndian::read_u64(&cmo_table[offset + 8..offset + 16]);
                let normalized_position = (position - column_metadata_start) as usize;
                let normalized_end = normalized_position + (length as usize);
                Ok(pbfile::ColumnMetadata::decode(
                    &column_metadata_bytes[normalized_position..normalized_end],
                )?)
            })
            .collect::<Result<Vec<_>>>()
    }

    async fn optimistic_tail_read(
        data: &Bytes,
        start_pos: u64,
        scheduler: &FileScheduler,
        file_len: u64,
    ) -> Result<Bytes> {
        let num_bytes_needed = (file_len - start_pos) as usize;
        if data.len() >= num_bytes_needed {
            Ok(data.slice((data.len() - num_bytes_needed)..))
        } else {
            let num_bytes_missing = (num_bytes_needed - data.len()) as u64;
            let start = file_len - num_bytes_needed as u64;
            let missing_bytes = scheduler
                .submit_single(start..start + num_bytes_missing, 0)
                .await?;
            let mut combined = BytesMut::with_capacity(data.len() + num_bytes_missing as usize);
            combined.extend(missing_bytes);
            combined.extend(data);
            Ok(combined.freeze())
        }
    }

    fn do_decode_gbo_table(
        gbo_bytes: &Bytes,
        footer: &Footer,
        version: LanceFileVersion,
    ) -> Result<Vec<BufferDescriptor>> {
        let mut global_bufs_cursor = Cursor::new(gbo_bytes);

        let mut global_buffers = Vec::with_capacity(footer.num_global_buffers as usize);
        for _ in 0..footer.num_global_buffers {
            let buf_pos = global_bufs_cursor.read_u64::<LittleEndian>()?;
            assert!(
                version < LanceFileVersion::V2_1 || buf_pos % PAGE_BUFFER_ALIGNMENT as u64 == 0
            );
            let buf_size = global_bufs_cursor.read_u64::<LittleEndian>()?;
            global_buffers.push(BufferDescriptor {
                position: buf_pos,
                size: buf_size,
            });
        }

        Ok(global_buffers)
    }

    async fn decode_gbo_table(
        tail_bytes: &Bytes,
        file_len: u64,
        scheduler: &FileScheduler,
        footer: &Footer,
        version: LanceFileVersion,
    ) -> Result<Vec<BufferDescriptor>> {
        let gbo_bytes = Self::optimistic_tail_read(
            tail_bytes,
            footer.global_buff_offsets_start,
            scheduler,
            file_len,
        )
        .await?;
        Self::do_decode_gbo_table(&gbo_bytes, footer, version)
    }

    fn decode_schema(schema_bytes: Bytes) -> Result<(u64, lance_core::datatypes::Schema)> {
        let file_descriptor = pb::FileDescriptor::decode(schema_bytes)?;
        let pb_schema = file_descriptor.schema.unwrap();
        let num_rows = file_descriptor.length;
        let fields_with_meta = FieldsWithMeta {
            fields: Fields(pb_schema.fields),
            metadata: pb_schema.metadata,
        };
        let schema = lance_core::datatypes::Schema::from(fields_with_meta);
        Ok((num_rows, schema))
    }

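    /// Reads and decodes all of the file's metadata: the footer, the global
    /// buffer table, the schema (stored in the first global buffer), and the
    /// per-column metadata.  The read starts from a speculative fetch of the
    /// file tail and issues follow-up reads only for whatever that tail did
    /// not already cover.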
    pub async fn read_all_metadata(scheduler: &FileScheduler) -> Result<CachedFileMetadata> {
        let (tail_bytes, file_len) = Self::read_tail(scheduler).await?;
        let footer = Self::decode_footer(&tail_bytes)?;

        let file_version = LanceFileVersion::try_from_major_minor(
            footer.major_version as u32,
            footer.minor_version as u32,
        )?;

        let gbo_table =
            Self::decode_gbo_table(&tail_bytes, file_len, scheduler, &footer, file_version).await?;
        if gbo_table.is_empty() {
            return Err(Error::Internal {
                message: "File did not contain any global buffers, schema expected".to_string(),
                location: location!(),
            });
        }
        let schema_start = gbo_table[0].position;
        let schema_size = gbo_table[0].size;

        let num_footer_bytes = file_len - schema_start;

        let all_metadata_bytes =
            Self::optimistic_tail_read(&tail_bytes, schema_start, scheduler, file_len).await?;

        let schema_bytes = all_metadata_bytes.slice(0..schema_size as usize);
        let (num_rows, schema) = Self::decode_schema(schema_bytes)?;

        let column_metadata_start = (footer.column_meta_start - schema_start) as usize;
        let column_metadata_end = (footer.global_buff_offsets_start - schema_start) as usize;
        let column_metadata_bytes =
            all_metadata_bytes.slice(column_metadata_start..column_metadata_end);
        let column_metadatas = Self::read_all_column_metadata(column_metadata_bytes, &footer)?;

        let num_global_buffer_bytes = gbo_table.iter().map(|buf| buf.size).sum::<u64>();
        let num_data_bytes = footer.column_meta_start - num_global_buffer_bytes;
        let num_column_metadata_bytes = footer.global_buff_offsets_start - footer.column_meta_start;

        let column_infos = Self::meta_to_col_infos(column_metadatas.as_slice(), file_version);

        Ok(CachedFileMetadata {
            file_schema: Arc::new(schema),
            column_metadatas,
            column_infos,
            num_rows,
            num_data_bytes,
            num_column_metadata_bytes,
            num_global_buffer_bytes,
            num_footer_bytes,
            file_buffers: gbo_table,
            major_version: footer.major_version,
            minor_version: footer.minor_version,
        })
    }
641
642 fn fetch_encoding<M: Default + Name + Sized>(encoding: &pbfile::Encoding) -> M {
643 match &encoding.location {
644 Some(pbfile::encoding::Location::Indirect(_)) => todo!(),
645 Some(pbfile::encoding::Location::Direct(encoding)) => {
646 let encoding_buf = Bytes::from(encoding.encoding.clone());
647 let encoding_any = prost_types::Any::decode(encoding_buf).unwrap();
648 encoding_any.to_msg::<M>().unwrap()
649 }
650 Some(pbfile::encoding::Location::None(_)) => panic!(),
651 None => panic!(),
652 }
653 }
654
655 fn meta_to_col_infos(
656 column_metadatas: &[pbfile::ColumnMetadata],
657 file_version: LanceFileVersion,
658 ) -> Vec<Arc<ColumnInfo>> {
659 column_metadatas
660 .iter()
661 .enumerate()
662 .map(|(col_idx, col_meta)| {
663 let page_infos = col_meta
664 .pages
665 .iter()
666 .map(|page| {
667 let num_rows = page.length;
668 let encoding = match file_version {
669 LanceFileVersion::V2_0 => {
670 PageEncoding::Legacy(Self::fetch_encoding::<pbenc::ArrayEncoding>(
671 page.encoding.as_ref().unwrap(),
672 ))
673 }
674 _ => {
675 PageEncoding::Structural(Self::fetch_encoding::<pbenc::PageLayout>(
676 page.encoding.as_ref().unwrap(),
677 ))
678 }
679 };
680 let buffer_offsets_and_sizes = Arc::from(
681 page.buffer_offsets
682 .iter()
683 .zip(page.buffer_sizes.iter())
684 .map(|(offset, size)| {
685 assert!(
687 file_version < LanceFileVersion::V2_1
688 || offset % PAGE_BUFFER_ALIGNMENT as u64 == 0
689 );
690 (*offset, *size)
691 })
692 .collect::<Vec<_>>(),
693 );
694 PageInfo {
695 buffer_offsets_and_sizes,
696 encoding,
697 num_rows,
698 priority: page.priority,
699 }
700 })
701 .collect::<Vec<_>>();
702 let buffer_offsets_and_sizes = Arc::from(
703 col_meta
704 .buffer_offsets
705 .iter()
706 .zip(col_meta.buffer_sizes.iter())
707 .map(|(offset, size)| (*offset, *size))
708 .collect::<Vec<_>>(),
709 );
710 Arc::new(ColumnInfo {
711 index: col_idx as u32,
712 page_infos: Arc::from(page_infos),
713 buffer_offsets_and_sizes,
714 encoding: Self::fetch_encoding(col_meta.encoding.as_ref().unwrap()),
715 })
716 })
717 .collect::<Vec<_>>()
718 }
719
720 fn validate_projection(
721 projection: &ReaderProjection,
722 metadata: &CachedFileMetadata,
723 ) -> Result<()> {
724 if projection.schema.fields.is_empty() {
725 return Err(Error::invalid_input(
726 "Attempt to read zero columns from the file, at least one column must be specified"
727 .to_string(),
728 location!(),
729 ));
730 }
731 let mut column_indices_seen = BTreeSet::new();
732 for column_index in &projection.column_indices {
733 if !column_indices_seen.insert(*column_index) {
734 return Err(Error::invalid_input(
735 format!(
736 "The projection specified the column index {} more than once",
737 column_index
738 ),
739 location!(),
740 ));
741 }
742 if *column_index >= metadata.column_infos.len() as u32 {
743 return Err(Error::invalid_input(format!("The projection specified the column index {} but there are only {} columns in the file", column_index, metadata.column_infos.len()), location!()));
744 }
745 }
746 Ok(())
747 }
748
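    /// Opens a file reader.  This reads and caches the file metadata,
    /// validates the optional base projection, and wraps the scheduler for
    /// use by the decoders.  If `base_projection` is `None`, the entire file
    /// schema is used.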
749 pub async fn try_open(
756 scheduler: FileScheduler,
757 base_projection: Option<ReaderProjection>,
758 decoder_plugins: Arc<DecoderPlugins>,
759 cache: &LanceCache,
760 options: FileReaderOptions,
761 ) -> Result<Self> {
762 let file_metadata = Arc::new(Self::read_all_metadata(&scheduler).await?);
763 let path = scheduler.reader().path().clone();
764 Self::try_open_with_file_metadata(
765 Arc::new(LanceEncodingsIo(scheduler)),
766 path,
767 base_projection,
768 decoder_plugins,
769 file_metadata,
770 cache,
771 options,
772 )
773 .await
774 }
775
776 pub async fn try_open_with_file_metadata(
782 scheduler: Arc<dyn EncodingsIo>,
783 path: Path,
784 base_projection: Option<ReaderProjection>,
785 decoder_plugins: Arc<DecoderPlugins>,
786 file_metadata: Arc<CachedFileMetadata>,
787 cache: &LanceCache,
788 options: FileReaderOptions,
789 ) -> Result<Self> {
790 let cache = Arc::new(cache.with_key_prefix(path.as_ref()));
791
792 if let Some(base_projection) = base_projection.as_ref() {
793 Self::validate_projection(base_projection, &file_metadata)?;
794 }
795 let num_rows = file_metadata.num_rows;
796 Ok(Self {
797 scheduler,
798 base_projection: base_projection.unwrap_or(ReaderProjection::from_whole_schema(
799 file_metadata.file_schema.as_ref(),
800 file_metadata.version(),
801 )),
802 num_rows,
803 metadata: file_metadata,
804 decoder_plugins,
805 cache,
806 options,
807 })
808 }
809
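    /// Note: this currently returns the column infos for every column in the
    /// file regardless of the projection; the projection's column indices are
    /// handed to the decoder, which selects the columns to read.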
810 fn collect_columns_from_projection(
824 &self,
825 _projection: &ReaderProjection,
826 ) -> Result<Vec<Arc<ColumnInfo>>> {
827 Ok(self.metadata.column_infos.to_vec())
828 }
829
830 #[allow(clippy::too_many_arguments)]
831 fn do_read_range(
832 column_infos: Vec<Arc<ColumnInfo>>,
833 io: Arc<dyn EncodingsIo>,
834 cache: Arc<LanceCache>,
835 num_rows: u64,
836 decoder_plugins: Arc<DecoderPlugins>,
837 range: Range<u64>,
838 batch_size: u32,
839 projection: ReaderProjection,
840 filter: FilterExpression,
841 should_validate: bool,
842 cache_repetition_index: bool,
843 ) -> Result<BoxStream<'static, ReadBatchTask>> {
844 debug!(
845 "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
846 range,
847 batch_size,
848 num_rows,
849 column_infos.len(),
850 projection.schema.fields.len(),
851 );
852
853 let config = SchedulerDecoderConfig {
854 batch_size,
855 cache,
856 decoder_plugins,
857 io,
858 should_validate,
859 cache_repetition_index,
860 };
861
862 let requested_rows = RequestedRows::Ranges(vec![range]);
863
864 Ok(schedule_and_decode(
865 column_infos,
866 requested_rows,
867 filter,
868 projection.column_indices,
869 projection.schema,
870 config,
871 ))
872 }
873
874 fn read_range(
875 &self,
876 range: Range<u64>,
877 batch_size: u32,
878 projection: ReaderProjection,
879 filter: FilterExpression,
880 ) -> Result<BoxStream<'static, ReadBatchTask>> {
881 Self::do_read_range(
883 self.collect_columns_from_projection(&projection)?,
884 self.scheduler.clone(),
885 self.cache.clone(),
886 self.num_rows,
887 self.decoder_plugins.clone(),
888 range,
889 batch_size,
890 projection,
891 filter,
892 self.options.validate_on_decode,
893 self.options.cache_repetition_index,
894 )
895 }
896
897 #[allow(clippy::too_many_arguments)]
898 fn do_take_rows(
899 column_infos: Vec<Arc<ColumnInfo>>,
900 io: Arc<dyn EncodingsIo>,
901 cache: Arc<LanceCache>,
902 decoder_plugins: Arc<DecoderPlugins>,
903 indices: Vec<u64>,
904 batch_size: u32,
905 projection: ReaderProjection,
906 filter: FilterExpression,
907 should_validate: bool,
908 cache_repetition_index: bool,
909 ) -> Result<BoxStream<'static, ReadBatchTask>> {
910 debug!(
911 "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
912 indices.len(),
913 indices[0],
914 indices[indices.len() - 1],
915 batch_size,
916 column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
917 );
918
919 let config = SchedulerDecoderConfig {
920 batch_size,
921 cache,
922 decoder_plugins,
923 io,
924 should_validate,
925 cache_repetition_index,
926 };
927
928 let requested_rows = RequestedRows::Indices(indices);
929
930 Ok(schedule_and_decode(
931 column_infos,
932 requested_rows,
933 filter,
934 projection.column_indices,
935 projection.schema,
936 config,
937 ))
938 }
939
940 fn take_rows(
941 &self,
942 indices: Vec<u64>,
943 batch_size: u32,
944 projection: ReaderProjection,
945 ) -> Result<BoxStream<'static, ReadBatchTask>> {
946 Self::do_take_rows(
948 self.collect_columns_from_projection(&projection)?,
949 self.scheduler.clone(),
950 self.cache.clone(),
951 self.decoder_plugins.clone(),
952 indices,
953 batch_size,
954 projection,
955 FilterExpression::no_filter(),
956 self.options.validate_on_decode,
957 self.options.cache_repetition_index,
958 )
959 }
960
961 #[allow(clippy::too_many_arguments)]
962 fn do_read_ranges(
963 column_infos: Vec<Arc<ColumnInfo>>,
964 io: Arc<dyn EncodingsIo>,
965 cache: Arc<LanceCache>,
966 decoder_plugins: Arc<DecoderPlugins>,
967 ranges: Vec<Range<u64>>,
968 batch_size: u32,
969 projection: ReaderProjection,
970 filter: FilterExpression,
971 should_validate: bool,
972 cache_repetition_index: bool,
973 ) -> Result<BoxStream<'static, ReadBatchTask>> {
974 let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
975 debug!(
976 "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
977 ranges.len(),
978 num_rows,
979 ranges[0].start,
980 ranges[ranges.len() - 1].end,
981 batch_size,
982 column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
983 );
984
985 let config = SchedulerDecoderConfig {
986 batch_size,
987 cache,
988 decoder_plugins,
989 io,
990 should_validate,
991 cache_repetition_index,
992 };
993
994 let requested_rows = RequestedRows::Ranges(ranges);
995
996 Ok(schedule_and_decode(
997 column_infos,
998 requested_rows,
999 filter,
1000 projection.column_indices,
1001 projection.schema,
1002 config,
1003 ))
1004 }
1005
1006 fn read_ranges(
1007 &self,
1008 ranges: Vec<Range<u64>>,
1009 batch_size: u32,
1010 projection: ReaderProjection,
1011 filter: FilterExpression,
1012 ) -> Result<BoxStream<'static, ReadBatchTask>> {
1013 Self::do_read_ranges(
1014 self.collect_columns_from_projection(&projection)?,
1015 self.scheduler.clone(),
1016 self.cache.clone(),
1017 self.decoder_plugins.clone(),
1018 ranges,
1019 batch_size,
1020 projection,
1021 filter,
1022 self.options.validate_on_decode,
1023 self.options.cache_repetition_index,
1024 )
1025 }
1026
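    /// Creates a stream of read tasks for the requested rows, one task per
    /// output batch; awaiting a task decodes it into a `RecordBatch`.  The
    /// requested indices or ranges are validated against the number of rows
    /// in the file before any scheduling happens.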
1027 pub fn read_tasks(
1038 &self,
1039 params: ReadBatchParams,
1040 batch_size: u32,
1041 projection: Option<ReaderProjection>,
1042 filter: FilterExpression,
1043 ) -> Result<Pin<Box<dyn Stream<Item = ReadBatchTask> + Send>>> {
1044 let projection = projection.unwrap_or_else(|| self.base_projection.clone());
1045 Self::validate_projection(&projection, &self.metadata)?;
1046 let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
1047 if bound > self.num_rows || bound == self.num_rows && inclusive {
1048 Err(Error::invalid_input(
1049 format!(
1050 "cannot read {:?} from file with {} rows",
1051 params, self.num_rows
1052 ),
1053 location!(),
1054 ))
1055 } else {
1056 Ok(())
1057 }
1058 };
        match &params {
1060 ReadBatchParams::Indices(indices) => {
1061 for idx in indices {
1062 match idx {
1063 None => {
1064 return Err(Error::invalid_input(
1065 "Null value in indices array",
1066 location!(),
1067 ));
1068 }
1069 Some(idx) => {
                            verify_bound(&params, idx as u64, true)?;
1071 }
1072 }
1073 }
1074 let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
1075 self.take_rows(indices, batch_size, projection)
1076 }
1077 ReadBatchParams::Range(range) => {
                verify_bound(&params, range.end as u64, false)?;
1079 self.read_range(
1080 range.start as u64..range.end as u64,
1081 batch_size,
1082 projection,
1083 filter,
1084 )
1085 }
1086 ReadBatchParams::Ranges(ranges) => {
1087 let mut ranges_u64 = Vec::with_capacity(ranges.len());
1088 for range in ranges.as_ref() {
                    verify_bound(&params, range.end, false)?;
1090 ranges_u64.push(range.start..range.end);
1091 }
1092 self.read_ranges(ranges_u64, batch_size, projection, filter)
1093 }
1094 ReadBatchParams::RangeFrom(range) => {
                verify_bound(&params, range.start as u64, true)?;
1096 self.read_range(
1097 range.start as u64..self.num_rows,
1098 batch_size,
1099 projection,
1100 filter,
1101 )
1102 }
1103 ReadBatchParams::RangeTo(range) => {
                verify_bound(&params, range.end as u64, false)?;
1105 self.read_range(0..range.end as u64, batch_size, projection, filter)
1106 }
1107 ReadBatchParams::RangeFull => {
1108 self.read_range(0..self.num_rows, batch_size, projection, filter)
1109 }
1110 }
1111 }
1112
1113 pub fn read_stream_projected(
1135 &self,
1136 params: ReadBatchParams,
1137 batch_size: u32,
1138 batch_readahead: u32,
1139 projection: ReaderProjection,
1140 filter: FilterExpression,
1141 ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1142 let arrow_schema = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
1143 let tasks_stream = self.read_tasks(params, batch_size, Some(projection), filter)?;
1144 let batch_stream = tasks_stream
1145 .map(|task| task.task)
1146 .buffered(batch_readahead as usize)
1147 .boxed();
1148 Ok(Box::pin(RecordBatchStreamAdapter::new(
1149 arrow_schema,
1150 batch_stream,
1151 )))
1152 }
1153
1154 fn take_rows_blocking(
1155 &self,
1156 indices: Vec<u64>,
1157 batch_size: u32,
1158 projection: ReaderProjection,
1159 filter: FilterExpression,
1160 ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1161 let column_infos = self.collect_columns_from_projection(&projection)?;
1162 debug!(
1163 "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
1164 indices.len(),
1165 indices[0],
1166 indices[indices.len() - 1],
1167 batch_size,
1168 column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1169 );
1170
1171 let config = SchedulerDecoderConfig {
1172 batch_size,
1173 cache: self.cache.clone(),
1174 decoder_plugins: self.decoder_plugins.clone(),
1175 io: self.scheduler.clone(),
1176 should_validate: self.options.validate_on_decode,
1177 cache_repetition_index: self.options.cache_repetition_index,
1178 };
1179
1180 let requested_rows = RequestedRows::Indices(indices);
1181
1182 schedule_and_decode_blocking(
1183 column_infos,
1184 requested_rows,
1185 filter,
1186 projection.column_indices,
1187 projection.schema,
1188 config,
1189 )
1190 }
1191
1192 fn read_ranges_blocking(
1193 &self,
1194 ranges: Vec<Range<u64>>,
1195 batch_size: u32,
1196 projection: ReaderProjection,
1197 filter: FilterExpression,
1198 ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1199 let column_infos = self.collect_columns_from_projection(&projection)?;
1200 let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1201 debug!(
1202 "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
1203 ranges.len(),
1204 num_rows,
1205 ranges[0].start,
1206 ranges[ranges.len() - 1].end,
1207 batch_size,
1208 column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1209 );
1210
1211 let config = SchedulerDecoderConfig {
1212 batch_size,
1213 cache: self.cache.clone(),
1214 decoder_plugins: self.decoder_plugins.clone(),
1215 io: self.scheduler.clone(),
1216 should_validate: self.options.validate_on_decode,
1217 cache_repetition_index: self.options.cache_repetition_index,
1218 };
1219
1220 let requested_rows = RequestedRows::Ranges(ranges);
1221
1222 schedule_and_decode_blocking(
1223 column_infos,
1224 requested_rows,
1225 filter,
1226 projection.column_indices,
1227 projection.schema,
1228 config,
1229 )
1230 }
1231
1232 fn read_range_blocking(
1233 &self,
1234 range: Range<u64>,
1235 batch_size: u32,
1236 projection: ReaderProjection,
1237 filter: FilterExpression,
1238 ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1239 let column_infos = self.collect_columns_from_projection(&projection)?;
1240 let num_rows = self.num_rows;
1241
1242 debug!(
1243 "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
1244 range,
1245 batch_size,
1246 num_rows,
1247 column_infos.len(),
1248 projection.schema.fields.len(),
1249 );
1250
1251 let config = SchedulerDecoderConfig {
1252 batch_size,
1253 cache: self.cache.clone(),
1254 decoder_plugins: self.decoder_plugins.clone(),
1255 io: self.scheduler.clone(),
1256 should_validate: self.options.validate_on_decode,
1257 cache_repetition_index: self.options.cache_repetition_index,
1258 };
1259
1260 let requested_rows = RequestedRows::Ranges(vec![range]);
1261
1262 schedule_and_decode_blocking(
1263 column_infos,
1264 requested_rows,
1265 filter,
1266 projection.column_indices,
1267 projection.schema,
1268 config,
1269 )
1270 }
1271
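    /// Blocking counterpart to the async read path: schedules and decodes on
    /// the calling thread and returns a `RecordBatchReader`.  Intended for
    /// use outside of an async runtime (e.g. from
    /// `tokio::task::spawn_blocking`, as the tests below do).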
1272 pub fn read_stream_projected_blocking(
1284 &self,
1285 params: ReadBatchParams,
1286 batch_size: u32,
1287 projection: Option<ReaderProjection>,
1288 filter: FilterExpression,
1289 ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1290 let projection = projection.unwrap_or_else(|| self.base_projection.clone());
1291 Self::validate_projection(&projection, &self.metadata)?;
1292 let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
1293 if bound > self.num_rows || bound == self.num_rows && inclusive {
1294 Err(Error::invalid_input(
1295 format!(
1296 "cannot read {:?} from file with {} rows",
1297 params, self.num_rows
1298 ),
1299 location!(),
1300 ))
1301 } else {
1302 Ok(())
1303 }
1304 };
        match &params {
1306 ReadBatchParams::Indices(indices) => {
1307 for idx in indices {
1308 match idx {
1309 None => {
1310 return Err(Error::invalid_input(
1311 "Null value in indices array",
1312 location!(),
1313 ));
1314 }
1315 Some(idx) => {
                            verify_bound(&params, idx as u64, true)?;
1317 }
1318 }
1319 }
1320 let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
1321 self.take_rows_blocking(indices, batch_size, projection, filter)
1322 }
1323 ReadBatchParams::Range(range) => {
                verify_bound(&params, range.end as u64, false)?;
1325 self.read_range_blocking(
1326 range.start as u64..range.end as u64,
1327 batch_size,
1328 projection,
1329 filter,
1330 )
1331 }
1332 ReadBatchParams::Ranges(ranges) => {
1333 let mut ranges_u64 = Vec::with_capacity(ranges.len());
1334 for range in ranges.as_ref() {
                    verify_bound(&params, range.end, false)?;
1336 ranges_u64.push(range.start..range.end);
1337 }
1338 self.read_ranges_blocking(ranges_u64, batch_size, projection, filter)
1339 }
1340 ReadBatchParams::RangeFrom(range) => {
                verify_bound(&params, range.start as u64, true)?;
1342 self.read_range_blocking(
1343 range.start as u64..self.num_rows,
1344 batch_size,
1345 projection,
1346 filter,
1347 )
1348 }
1349 ReadBatchParams::RangeTo(range) => {
                verify_bound(&params, range.end as u64, false)?;
1351 self.read_range_blocking(0..range.end as u64, batch_size, projection, filter)
1352 }
1353 ReadBatchParams::RangeFull => {
1354 self.read_range_blocking(0..self.num_rows, batch_size, projection, filter)
1355 }
1356 }
1357 }
1358
1359 pub fn read_stream(
1365 &self,
1366 params: ReadBatchParams,
1367 batch_size: u32,
1368 batch_readahead: u32,
1369 filter: FilterExpression,
1370 ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1371 self.read_stream_projected(
1372 params,
1373 batch_size,
1374 batch_readahead,
1375 self.base_projection.clone(),
1376 filter,
1377 )
1378 }
1379
1380 pub fn schema(&self) -> &Arc<Schema> {
1381 &self.metadata.file_schema
1382 }
1383}
1384
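/// Renders a human-readable description of a page's encoding, primarily for
/// debugging and file-inspection tooling.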
1385pub fn describe_encoding(page: &pbfile::column_metadata::Page) -> String {
1387 if let Some(encoding) = &page.encoding {
1388 if let Some(style) = &encoding.location {
1389 match style {
1390 pbfile::encoding::Location::Indirect(indirect) => {
1391 format!(
1392 "IndirectEncoding(pos={},size={})",
1393 indirect.buffer_location, indirect.buffer_length
1394 )
1395 }
1396 pbfile::encoding::Location::Direct(direct) => {
1397 let encoding_any =
1398 prost_types::Any::decode(Bytes::from(direct.encoding.clone()))
1399 .expect("failed to deserialize encoding as protobuf");
1400 if encoding_any.type_url == "/lance.encodings.ArrayEncoding" {
1401 let encoding = encoding_any.to_msg::<pbenc::ArrayEncoding>();
1402 match encoding {
1403 Ok(encoding) => {
1404 format!("{:#?}", encoding)
1405 }
1406 Err(err) => {
1407 format!("Unsupported(decode_err={})", err)
1408 }
1409 }
1410 } else if encoding_any.type_url == "/lance.encodings.PageLayout" {
1411 let encoding = encoding_any.to_msg::<pbenc::PageLayout>();
1412 match encoding {
1413 Ok(encoding) => {
1414 format!("{:#?}", encoding)
1415 }
1416 Err(err) => {
1417 format!("Unsupported(decode_err={})", err)
1418 }
1419 }
1420 } else {
1421 format!("Unrecognized(type_url={})", encoding_any.type_url)
1422 }
1423 }
1424 pbfile::encoding::Location::None(_) => "NoEncodingDescription".to_string(),
1425 }
1426 } else {
1427 "MISSING STYLE".to_string()
1428 }
1429 } else {
1430 "MISSING".to_string()
1431 }
1432}
1433
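/// Extension trait for decoding an [`EncodedBatch`] from serialized Lance
/// bytes, either with the schema embedded in the buffer ("self described") or
/// with the schema supplied by the caller ("mini lance").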
1434pub trait EncodedBatchReaderExt {
1435 fn try_from_mini_lance(
1436 bytes: Bytes,
1437 schema: &Schema,
1438 version: LanceFileVersion,
1439 ) -> Result<Self>
1440 where
1441 Self: Sized;
1442 fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1443 where
1444 Self: Sized;
1445}
1446
1447impl EncodedBatchReaderExt for EncodedBatch {
1448 fn try_from_mini_lance(
1449 bytes: Bytes,
1450 schema: &Schema,
1451 file_version: LanceFileVersion,
1452 ) -> Result<Self>
1453 where
1454 Self: Sized,
1455 {
1456 let projection = ReaderProjection::from_whole_schema(schema, file_version);
1457 let footer = FileReader::decode_footer(&bytes)?;
1458
1459 let column_metadata_start = footer.column_meta_start as usize;
1462 let column_metadata_end = footer.global_buff_offsets_start as usize;
1463 let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1464 let column_metadatas =
1465 FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1466
1467 let file_version = LanceFileVersion::try_from_major_minor(
1468 footer.major_version as u32,
1469 footer.minor_version as u32,
1470 )?;
1471
1472 let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1473
1474 Ok(Self {
1475 data: bytes,
1476 num_rows: page_table
1477 .first()
1478 .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1479 .unwrap_or(0),
1480 page_table,
1481 top_level_columns: projection.column_indices,
1482 schema: Arc::new(schema.clone()),
1483 })
1484 }
1485
1486 fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1487 where
1488 Self: Sized,
1489 {
1490 let footer = FileReader::decode_footer(&bytes)?;
1491 let file_version = LanceFileVersion::try_from_major_minor(
1492 footer.major_version as u32,
1493 footer.minor_version as u32,
1494 )?;
1495
1496 let gbo_table = FileReader::do_decode_gbo_table(
1497 &bytes.slice(footer.global_buff_offsets_start as usize..),
1498 &footer,
1499 file_version,
1500 )?;
1501 if gbo_table.is_empty() {
1502 return Err(Error::Internal {
1503 message: "File did not contain any global buffers, schema expected".to_string(),
1504 location: location!(),
1505 });
1506 }
1507 let schema_start = gbo_table[0].position as usize;
1508 let schema_size = gbo_table[0].size as usize;
1509
1510 let schema_bytes = bytes.slice(schema_start..(schema_start + schema_size));
1511 let (_, schema) = FileReader::decode_schema(schema_bytes)?;
1512 let projection = ReaderProjection::from_whole_schema(&schema, file_version);
1513
1514 let column_metadata_start = footer.column_meta_start as usize;
1517 let column_metadata_end = footer.global_buff_offsets_start as usize;
1518 let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1519 let column_metadatas =
1520 FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1521
1522 let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1523
1524 Ok(Self {
1525 data: bytes,
1526 num_rows: page_table
1527 .first()
1528 .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1529 .unwrap_or(0),
1530 page_table,
1531 top_level_columns: projection.column_indices,
1532 schema: Arc::new(schema),
1533 })
1534 }
1535}
1536
1537#[cfg(test)]
1538pub mod tests {
1539 use std::{collections::BTreeMap, pin::Pin, sync::Arc};
1540
1541 use arrow_array::{
1542 types::{Float64Type, Int32Type},
1543 RecordBatch, UInt32Array,
1544 };
1545 use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema};
1546 use bytes::Bytes;
1547 use futures::{prelude::stream::TryStreamExt, StreamExt};
1548 use lance_arrow::RecordBatchExt;
1549 use lance_core::{datatypes::Schema, ArrowResult};
1550 use lance_datagen::{array, gen, BatchCount, ByteCount, RowCount};
1551 use lance_encoding::{
1552 decoder::{decode_batch, DecodeBatchScheduler, DecoderPlugins, FilterExpression},
1553 encoder::{default_encoding_strategy, encode_batch, EncodedBatch, EncodingOptions},
1554 version::LanceFileVersion,
1555 };
1556 use lance_io::{stream::RecordBatchStream, utils::CachedFileSize};
1557 use log::debug;
1558 use rstest::rstest;
1559 use tokio::sync::mpsc;
1560
1561 use crate::v2::{
1562 reader::{EncodedBatchReaderExt, FileReader, FileReaderOptions, ReaderProjection},
1563 testing::{test_cache, write_lance_file, FsFixture, WrittenFile},
1564 writer::{EncodedBatchWriteExt, FileWriter, FileWriterOptions},
1565 };
1566
1567 async fn create_some_file(fs: &FsFixture, version: LanceFileVersion) -> WrittenFile {
1568 let location_type = DataType::Struct(Fields::from(vec![
1569 Field::new("x", DataType::Float64, true),
1570 Field::new("y", DataType::Float64, true),
1571 ]));
1572 let categories_type = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)));
1573
1574 let mut reader = gen()
1575 .col("score", array::rand::<Float64Type>())
1576 .col("location", array::rand_type(&location_type))
1577 .col("categories", array::rand_type(&categories_type))
1578 .col("binary", array::rand_type(&DataType::Binary));
1579 if version <= LanceFileVersion::V2_0 {
1580 reader = reader.col("large_bin", array::rand_type(&DataType::LargeBinary));
1581 }
1582 let reader = reader.into_reader_rows(RowCount::from(1000), BatchCount::from(100));
1583
1584 write_lance_file(
1585 reader,
1586 fs,
1587 FileWriterOptions {
1588 format_version: Some(version),
1589 ..Default::default()
1590 },
1591 )
1592 .await
1593 }
1594
1595 type Transformer = Box<dyn Fn(&RecordBatch) -> RecordBatch>;
1596
1597 async fn verify_expected(
1598 expected: &[RecordBatch],
1599 mut actual: Pin<Box<dyn RecordBatchStream>>,
1600 read_size: u32,
1601 transform: Option<Transformer>,
1602 ) {
1603 let mut remaining = expected.iter().map(|batch| batch.num_rows()).sum::<usize>() as u32;
1604 let mut expected_iter = expected.iter().map(|batch| {
1605 if let Some(transform) = &transform {
1606 transform(batch)
1607 } else {
1608 batch.clone()
1609 }
1610 });
1611 let mut next_expected = expected_iter.next().unwrap().clone();
1612 while let Some(actual) = actual.next().await {
1613 let mut actual = actual.unwrap();
1614 let mut rows_to_verify = actual.num_rows() as u32;
1615 let expected_length = remaining.min(read_size);
1616 assert_eq!(expected_length, rows_to_verify);
1617
1618 while rows_to_verify > 0 {
1619 let next_slice_len = (next_expected.num_rows() as u32).min(rows_to_verify);
1620 assert_eq!(
1621 next_expected.slice(0, next_slice_len as usize),
1622 actual.slice(0, next_slice_len as usize)
1623 );
1624 remaining -= next_slice_len;
1625 rows_to_verify -= next_slice_len;
1626 if remaining > 0 {
1627 if next_slice_len == next_expected.num_rows() as u32 {
1628 next_expected = expected_iter.next().unwrap().clone();
1629 } else {
1630 next_expected = next_expected.slice(
1631 next_slice_len as usize,
1632 next_expected.num_rows() - next_slice_len as usize,
1633 );
1634 }
1635 }
1636 if rows_to_verify > 0 {
1637 actual = actual.slice(
1638 next_slice_len as usize,
1639 actual.num_rows() - next_slice_len as usize,
1640 );
1641 }
1642 }
1643 }
1644 assert_eq!(remaining, 0);
1645 }
1646
1647 #[tokio::test]
1648 async fn test_round_trip() {
1649 let fs = FsFixture::default();
1650
1651 let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1652
1653 for read_size in [32, 1024, 1024 * 1024] {
1654 let file_scheduler = fs
1655 .scheduler
1656 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1657 .await
1658 .unwrap();
1659 let file_reader = FileReader::try_open(
1660 file_scheduler,
1661 None,
1662 Arc::<DecoderPlugins>::default(),
1663 &test_cache(),
1664 FileReaderOptions::default(),
1665 )
1666 .await
1667 .unwrap();
1668
1669 let schema = file_reader.schema();
1670 assert_eq!(schema.metadata.get("foo").unwrap(), "bar");
1671
1672 let batch_stream = file_reader
1673 .read_stream(
1674 lance_io::ReadBatchParams::RangeFull,
1675 read_size,
1676 16,
1677 FilterExpression::no_filter(),
1678 )
1679 .unwrap();
1680
1681 verify_expected(&data, batch_stream, read_size, None).await;
1682 }
1683 }
1684
1685 #[rstest]
1686 #[test_log::test(tokio::test)]
1687 async fn test_encoded_batch_round_trip(
1688 #[values(LanceFileVersion::V2_0)] version: LanceFileVersion,
1690 ) {
1691 let data = gen()
1692 .col("x", array::rand::<Int32Type>())
1693 .col("y", array::rand_utf8(ByteCount::from(16), false))
1694 .into_batch_rows(RowCount::from(10000))
1695 .unwrap();
1696
1697 let lance_schema = Arc::new(Schema::try_from(data.schema().as_ref()).unwrap());
1698
1699 let encoding_options = EncodingOptions {
1700 cache_bytes_per_column: 4096,
1701 max_page_bytes: 32 * 1024 * 1024,
1702 keep_original_array: true,
1703 buffer_alignment: 64,
1704 };
1705
1706 let encoding_strategy = default_encoding_strategy(version);
1707
1708 let encoded_batch = encode_batch(
1709 &data,
1710 lance_schema.clone(),
1711 encoding_strategy.as_ref(),
1712 &encoding_options,
1713 )
1714 .await
1715 .unwrap();
1716
1717 let bytes = encoded_batch.try_to_self_described_lance(version).unwrap();
1719
1720 let decoded_batch = EncodedBatch::try_from_self_described_lance(bytes).unwrap();
1721
1722 let decoded = decode_batch(
1723 &decoded_batch,
1724 &FilterExpression::no_filter(),
1725 Arc::<DecoderPlugins>::default(),
1726 false,
1727 LanceFileVersion::default(),
1728 None,
1729 )
1730 .await
1731 .unwrap();
1732
1733 assert_eq!(data, decoded);
1734
1735 let bytes = encoded_batch.try_to_mini_lance(version).unwrap();
1737 let decoded_batch =
1738 EncodedBatch::try_from_mini_lance(bytes, lance_schema.as_ref(), LanceFileVersion::V2_0)
1739 .unwrap();
1740 let decoded = decode_batch(
1741 &decoded_batch,
1742 &FilterExpression::no_filter(),
1743 Arc::<DecoderPlugins>::default(),
1744 false,
1745 LanceFileVersion::default(),
1746 None,
1747 )
1748 .await
1749 .unwrap();
1750
1751 assert_eq!(data, decoded);
1752 }
1753
1754 #[rstest]
1755 #[test_log::test(tokio::test)]
1756 async fn test_projection(
1757 #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
1758 ) {
1759 let fs = FsFixture::default();
1760
1761 let written_file = create_some_file(&fs, version).await;
1762 let file_scheduler = fs
1763 .scheduler
1764 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1765 .await
1766 .unwrap();
1767
1768 let field_id_mapping = written_file
1769 .field_id_mapping
1770 .iter()
1771 .copied()
1772 .collect::<BTreeMap<_, _>>();
1773
1774 let empty_projection = ReaderProjection {
1775 column_indices: Vec::default(),
1776 schema: Arc::new(Schema::default()),
1777 };
1778
1779 for columns in [
1780 vec!["score"],
1781 vec!["location"],
1782 vec!["categories"],
            vec!["location.x"],
1784 vec!["score", "categories"],
1785 vec!["score", "location"],
1786 vec!["location", "categories"],
            vec!["location.y", "location", "categories"],
1788 ] {
1789 debug!("Testing round trip with projection {:?}", columns);
1790 for use_field_ids in [true, false] {
1791 let file_reader = FileReader::try_open(
1793 file_scheduler.clone(),
1794 None,
1795 Arc::<DecoderPlugins>::default(),
1796 &test_cache(),
1797 FileReaderOptions::default(),
1798 )
1799 .await
1800 .unwrap();
1801
1802 let projected_schema = written_file.schema.project(&columns).unwrap();
1803 let projection = if use_field_ids {
1804 ReaderProjection::from_field_ids(
1805 file_reader.metadata.version(),
1806 &projected_schema,
1807 &field_id_mapping,
1808 )
1809 .unwrap()
1810 } else {
1811 ReaderProjection::from_column_names(
1812 file_reader.metadata.version(),
1813 &written_file.schema,
1814 &columns,
1815 )
1816 .unwrap()
1817 };
1818
1819 let batch_stream = file_reader
1820 .read_stream_projected(
1821 lance_io::ReadBatchParams::RangeFull,
1822 1024,
1823 16,
1824 projection.clone(),
1825 FilterExpression::no_filter(),
1826 )
1827 .unwrap();
1828
1829 let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1830 verify_expected(
1831 &written_file.data,
1832 batch_stream,
1833 1024,
1834 Some(Box::new(move |batch: &RecordBatch| {
1835 batch.project_by_schema(&projection_arrow).unwrap()
1836 })),
1837 )
1838 .await;
1839
1840 let file_reader = FileReader::try_open(
1842 file_scheduler.clone(),
1843 Some(projection.clone()),
1844 Arc::<DecoderPlugins>::default(),
1845 &test_cache(),
1846 FileReaderOptions::default(),
1847 )
1848 .await
1849 .unwrap();
1850
1851 let batch_stream = file_reader
1852 .read_stream(
1853 lance_io::ReadBatchParams::RangeFull,
1854 1024,
1855 16,
1856 FilterExpression::no_filter(),
1857 )
1858 .unwrap();
1859
1860 let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1861 verify_expected(
1862 &written_file.data,
1863 batch_stream,
1864 1024,
1865 Some(Box::new(move |batch: &RecordBatch| {
1866 batch.project_by_schema(&projection_arrow).unwrap()
1867 })),
1868 )
1869 .await;
1870
1871 assert!(file_reader
1872 .read_stream_projected(
1873 lance_io::ReadBatchParams::RangeFull,
1874 1024,
1875 16,
1876 empty_projection.clone(),
1877 FilterExpression::no_filter(),
1878 )
1879 .is_err());
1880 }
1881 }
1882
1883 assert!(FileReader::try_open(
1884 file_scheduler.clone(),
1885 Some(empty_projection),
1886 Arc::<DecoderPlugins>::default(),
1887 &test_cache(),
1888 FileReaderOptions::default(),
1889 )
1890 .await
1891 .is_err());
1892
1893 let arrow_schema = ArrowSchema::new(vec![
1894 Field::new("x", DataType::Int32, true),
1895 Field::new("y", DataType::Int32, true),
1896 ]);
1897 let schema = Schema::try_from(&arrow_schema).unwrap();
1898
1899 let projection_with_dupes = ReaderProjection {
1900 column_indices: vec![0, 0],
1901 schema: Arc::new(schema),
1902 };
1903
1904 assert!(FileReader::try_open(
1905 file_scheduler.clone(),
1906 Some(projection_with_dupes),
1907 Arc::<DecoderPlugins>::default(),
1908 &test_cache(),
1909 FileReaderOptions::default(),
1910 )
1911 .await
1912 .is_err());
1913 }
1914
1915 #[test_log::test(tokio::test)]
1916 async fn test_compressing_buffer() {
1917 let fs = FsFixture::default();
1918
1919 let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
1920 let file_scheduler = fs
1921 .scheduler
1922 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1923 .await
1924 .unwrap();
1925
1926 let file_reader = FileReader::try_open(
1928 file_scheduler.clone(),
1929 None,
1930 Arc::<DecoderPlugins>::default(),
1931 &test_cache(),
1932 FileReaderOptions::default(),
1933 )
1934 .await
1935 .unwrap();
1936
1937 let mut projection = written_file.schema.project(&["score"]).unwrap();
1938 for field in projection.fields.iter_mut() {
1939 field
1940 .metadata
1941 .insert("lance:compression".to_string(), "zstd".to_string());
1942 }
1943 let projection = ReaderProjection {
1944 column_indices: projection.fields.iter().map(|f| f.id as u32).collect(),
1945 schema: Arc::new(projection),
1946 };
1947
1948 let batch_stream = file_reader
1949 .read_stream_projected(
1950 lance_io::ReadBatchParams::RangeFull,
1951 1024,
1952 16,
1953 projection.clone(),
1954 FilterExpression::no_filter(),
1955 )
1956 .unwrap();
1957
1958 let projection_arrow = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
1959 verify_expected(
1960 &written_file.data,
1961 batch_stream,
1962 1024,
1963 Some(Box::new(move |batch: &RecordBatch| {
1964 batch.project_by_schema(&projection_arrow).unwrap()
1965 })),
1966 )
1967 .await;
1968 }
1969
1970 #[tokio::test]
1971 async fn test_read_all() {
1972 let fs = FsFixture::default();
1973 let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1974 let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
1975
1976 let file_scheduler = fs
1977 .scheduler
1978 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1979 .await
1980 .unwrap();
1981 let file_reader = FileReader::try_open(
1982 file_scheduler.clone(),
1983 None,
1984 Arc::<DecoderPlugins>::default(),
1985 &test_cache(),
1986 FileReaderOptions::default(),
1987 )
1988 .await
1989 .unwrap();
1990
1991 let batches = file_reader
1992 .read_stream(
1993 lance_io::ReadBatchParams::RangeFull,
1994 total_rows as u32,
1995 16,
1996 FilterExpression::no_filter(),
1997 )
1998 .unwrap()
1999 .try_collect::<Vec<_>>()
2000 .await
2001 .unwrap();
2002 assert_eq!(batches.len(), 1);
2003 assert_eq!(batches[0].num_rows(), total_rows);
2004 }
2005
2006 #[tokio::test]
2007 async fn test_blocking_take() {
2008 let fs = FsFixture::default();
2009 let WrittenFile { data, schema, .. } = create_some_file(&fs, LanceFileVersion::V2_1).await;
2010 let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2011
2012 let file_scheduler = fs
2013 .scheduler
2014 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2015 .await
2016 .unwrap();
2017 let file_reader = FileReader::try_open(
2018 file_scheduler.clone(),
2019 Some(
2020 ReaderProjection::from_column_names(LanceFileVersion::V2_1, &schema, &["score"])
2021 .unwrap(),
2022 ),
2023 Arc::<DecoderPlugins>::default(),
2024 &test_cache(),
2025 FileReaderOptions::default(),
2026 )
2027 .await
2028 .unwrap();
2029
2030 let batches = tokio::task::spawn_blocking(move || {
2031 file_reader
2032 .read_stream_projected_blocking(
2033 lance_io::ReadBatchParams::Indices(UInt32Array::from(vec![0, 1, 2, 3, 4])),
2034 total_rows as u32,
2035 None,
2036 FilterExpression::no_filter(),
2037 )
2038 .unwrap()
2039 .collect::<ArrowResult<Vec<_>>>()
2040 .unwrap()
2041 })
2042 .await
2043 .unwrap();
2044
2045 assert_eq!(batches.len(), 1);
2046 assert_eq!(batches[0].num_rows(), 5);
2047 assert_eq!(batches[0].num_columns(), 1);
2048 }
2049
2050 #[tokio::test(flavor = "multi_thread")]
2051 async fn test_drop_in_progress() {
2052 let fs = FsFixture::default();
2053 let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
2054 let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2055
2056 let file_scheduler = fs
2057 .scheduler
2058 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2059 .await
2060 .unwrap();
2061 let file_reader = FileReader::try_open(
2062 file_scheduler.clone(),
2063 None,
2064 Arc::<DecoderPlugins>::default(),
2065 &test_cache(),
2066 FileReaderOptions::default(),
2067 )
2068 .await
2069 .unwrap();
2070
2071 let mut batches = file_reader
2072 .read_stream(
2073 lance_io::ReadBatchParams::RangeFull,
2074 (total_rows / 10) as u32,
2075 16,
2076 FilterExpression::no_filter(),
2077 )
2078 .unwrap();
2079
2080 drop(file_reader);
2081
2082 let batch = batches.next().await.unwrap().unwrap();
2083 assert!(batch.num_rows() > 0);
2084
2085 drop(batches);
2087 }
2088
2089 #[tokio::test]
2090 async fn drop_while_scheduling() {
2091 let fs = FsFixture::default();
2101 let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
2102 let total_rows = written_file
2103 .data
2104 .iter()
2105 .map(|batch| batch.num_rows())
2106 .sum::<usize>();
2107
2108 let file_scheduler = fs
2109 .scheduler
2110 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2111 .await
2112 .unwrap();
2113 let file_reader = FileReader::try_open(
2114 file_scheduler.clone(),
2115 None,
2116 Arc::<DecoderPlugins>::default(),
2117 &test_cache(),
2118 FileReaderOptions::default(),
2119 )
2120 .await
2121 .unwrap();
2122
2123 let projection =
2124 ReaderProjection::from_whole_schema(&written_file.schema, LanceFileVersion::V2_0);
2125 let column_infos = file_reader
2126 .collect_columns_from_projection(&projection)
2127 .unwrap();
2128 let mut decode_scheduler = DecodeBatchScheduler::try_new(
2129 &projection.schema,
2130 &projection.column_indices,
2131 &column_infos,
2132 &vec![],
2133 total_rows as u64,
2134 Arc::<DecoderPlugins>::default(),
2135 file_reader.scheduler.clone(),
2136 test_cache(),
2137 &FilterExpression::no_filter(),
2138 false,
2139 )
2140 .await
2141 .unwrap();
2142
2143 let range = 0..total_rows as u64;
2144
2145 let (tx, rx) = mpsc::unbounded_channel();
2146
2147 drop(rx);
2149
2150 decode_scheduler.schedule_range(
2152 range,
2153 &FilterExpression::no_filter(),
2154 tx,
2155 file_reader.scheduler.clone(),
2156 )
2157 }
2158
2159 #[tokio::test]
2160 async fn test_global_buffers() {
2161 let fs = FsFixture::default();
2162
2163 let lance_schema =
2164 lance_core::datatypes::Schema::try_from(&ArrowSchema::new(vec![Field::new(
2165 "foo",
2166 DataType::Int32,
2167 true,
2168 )]))
2169 .unwrap();
2170
2171 let mut file_writer = FileWriter::try_new(
2172 fs.object_store.create(&fs.tmp_path).await.unwrap(),
2173 lance_schema.clone(),
2174 FileWriterOptions::default(),
2175 )
2176 .unwrap();
2177
2178 let test_bytes = Bytes::from_static(b"hello");
2179
2180 let buf_index = file_writer
2181 .add_global_buffer(test_bytes.clone())
2182 .await
2183 .unwrap();
2184
2185 assert_eq!(buf_index, 1);
2186
2187 file_writer.finish().await.unwrap();
2188
2189 let file_scheduler = fs
2190 .scheduler
2191 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2192 .await
2193 .unwrap();
2194 let file_reader = FileReader::try_open(
2195 file_scheduler.clone(),
2196 None,
2197 Arc::<DecoderPlugins>::default(),
2198 &test_cache(),
2199 FileReaderOptions::default(),
2200 )
2201 .await
2202 .unwrap();
2203
2204 let buf = file_reader.read_global_buffer(1).await.unwrap();
2205 assert_eq!(buf, test_bytes);
2206 }
2207}