1use std::{
5 collections::{BTreeMap, BTreeSet},
6 io::Cursor,
7 ops::Range,
8 pin::Pin,
9 sync::Arc,
10};
11
12use arrow_array::RecordBatchReader;
13use arrow_schema::Schema as ArrowSchema;
14use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
15use bytes::{Bytes, BytesMut};
16use deepsize::{Context, DeepSizeOf};
17use futures::{stream::BoxStream, Stream, StreamExt};
18use lance_encoding::{
19 decoder::{
20 schedule_and_decode, schedule_and_decode_blocking, ColumnInfo, DecoderConfig,
21 DecoderPlugins, FilterExpression, PageEncoding, PageInfo, ReadBatchTask, RequestedRows,
22 SchedulerDecoderConfig,
23 },
24 encoder::EncodedBatch,
25 version::LanceFileVersion,
26 EncodingsIo,
27};
28use log::debug;
29use object_store::path::Path;
30use prost::{Message, Name};
31use snafu::location;
32
33use lance_core::{
34 cache::LanceCache,
35 datatypes::{Field, Schema},
36 Error, Result,
37};
38use lance_encoding::format::pb as pbenc;
39use lance_encoding::format::pb21 as pbenc21;
40use lance_io::{
41 scheduler::FileScheduler,
42 stream::{RecordBatchStream, RecordBatchStreamAdapter},
43 ReadBatchParams,
44};
45
46use crate::{
47 datatypes::{Fields, FieldsWithMeta},
48 format::{pb, pbfile, MAGIC, MAJOR_VERSION, MINOR_VERSION},
49 v2::writer::PAGE_BUFFER_ALIGNMENT,
50};
51
52use super::io::LanceEncodingsIo;
53
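/// Default value for `FileReaderOptions::read_chunk_size` (8 MiB).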
54pub const DEFAULT_READ_CHUNK_SIZE: u64 = 8 * 1024 * 1024;
57
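/// Position and size (in bytes) of a buffer stored in the file.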
58#[derive(Debug, DeepSizeOf)]
63pub struct BufferDescriptor {
64 pub position: u64,
65 pub size: u64,
66}
67
68#[derive(Debug)]
70pub struct FileStatistics {
71 pub columns: Vec<ColumnStatistics>,
73}
74
75#[derive(Debug)]
77pub struct ColumnStatistics {
78 pub num_pages: usize,
80 pub size_bytes: u64,
84}
85
86#[derive(Debug)]
88pub struct CachedFileMetadata {
89 pub file_schema: Arc<Schema>,
91 pub column_metadatas: Vec<pbfile::ColumnMetadata>,
93 pub column_infos: Vec<Arc<ColumnInfo>>,
94 pub num_rows: u64,
96 pub file_buffers: Vec<BufferDescriptor>,
97 pub num_data_bytes: u64,
99 pub num_column_metadata_bytes: u64,
102 pub num_global_buffer_bytes: u64,
104 pub num_footer_bytes: u64,
106 pub major_version: u16,
107 pub minor_version: u16,
108}
109
110impl DeepSizeOf for CachedFileMetadata {
111 fn deep_size_of_children(&self, context: &mut Context) -> usize {
113 self.file_schema.deep_size_of_children(context)
114 + self
115 .file_buffers
116 .iter()
117 .map(|file_buffer| file_buffer.deep_size_of_children(context))
118 .sum::<usize>()
119 }
120}
121
122impl CachedFileMetadata {
123 pub fn version(&self) -> LanceFileVersion {
124 match (self.major_version, self.minor_version) {
125 (0, 3) => LanceFileVersion::V2_0,
126 (2, 1) => LanceFileVersion::V2_1,
127 _ => panic!(
128 "Unsupported version: {}.{}",
129 self.major_version, self.minor_version
130 ),
131 }
132 }
133}
134
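/// Describes a projection to apply when reading from the file.
///
/// `schema` is the projected schema and `column_indices` lists the file column
/// indices backing that schema.  For 2.0 files every field maps to a column;
/// for 2.1+ (structural encoding) only leaf fields and packed structs have
/// their own columns, which is why the constructors below take the file
/// version into account.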
135#[derive(Debug, Clone)]
157pub struct ReaderProjection {
158 pub schema: Arc<Schema>,
161 pub column_indices: Vec<u32>,
201}
202
203impl ReaderProjection {
204 fn from_field_ids_helper<'a>(
205 file_version: LanceFileVersion,
206 fields: impl Iterator<Item = &'a Field>,
207 field_id_to_column_index: &BTreeMap<u32, u32>,
208 column_indices: &mut Vec<u32>,
209 ) -> Result<()> {
210 for field in fields {
211 let is_structural = file_version >= LanceFileVersion::V2_1;
212 if !is_structural || field.children.is_empty() {
215 if let Some(column_idx) = field_id_to_column_index.get(&(field.id as u32)).copied()
216 {
217 column_indices.push(column_idx);
218 }
219 }
220 Self::from_field_ids_helper(
221 file_version,
222 field.children.iter(),
223 field_id_to_column_index,
224 column_indices,
225 )?;
226 }
227 Ok(())
228 }
229
230 pub fn from_field_ids(
235 file_version: LanceFileVersion,
236 schema: &Schema,
237 field_id_to_column_index: &BTreeMap<u32, u32>,
238 ) -> Result<Self> {
239 let mut column_indices = Vec::new();
240 Self::from_field_ids_helper(
241 file_version,
242 schema.fields.iter(),
243 field_id_to_column_index,
244 &mut column_indices,
245 )?;
246 Ok(Self {
247 schema: Arc::new(schema.clone()),
248 column_indices,
249 })
250 }
251
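    /// Creates a projection that selects every column in the file.
    ///
    /// Column indices are assigned by walking the schema in pre-order, counting
    /// one column per field for 2.0 files and one column per leaf field (or
    /// packed struct) for 2.1+ files.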
252 pub fn from_whole_schema(schema: &Schema, version: LanceFileVersion) -> Self {
260 let schema = Arc::new(schema.clone());
261 let is_structural = version >= LanceFileVersion::V2_1;
262 let mut column_indices = vec![];
263 let mut curr_column_idx = 0;
264 let mut packed_struct_fields_num = 0;
265 for field in schema.fields_pre_order() {
266 if packed_struct_fields_num > 0 {
267 packed_struct_fields_num -= 1;
268 continue;
269 }
270 if field.is_packed_struct() {
271 column_indices.push(curr_column_idx);
272 curr_column_idx += 1;
273 packed_struct_fields_num = field.children.len();
274 } else if field.children.is_empty() || !is_structural {
275 column_indices.push(curr_column_idx);
276 curr_column_idx += 1;
277 }
278 }
279 Self {
280 schema,
281 column_indices,
282 }
283 }
284
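    /// Creates a projection from a list of (possibly nested) column names.
    ///
    /// A minimal sketch of how this might be called; the schema, column names,
    /// and file version here are illustrative, not taken from a real file:
    ///
    /// ```ignore
    /// let projection = ReaderProjection::from_column_names(
    ///     LanceFileVersion::V2_1,
    ///     &schema,
    ///     &["location.x", "categories"],
    /// )?;
    /// ```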
285 pub fn from_column_names(
292 file_version: LanceFileVersion,
293 schema: &Schema,
294 column_names: &[&str],
295 ) -> Result<Self> {
296 let field_id_to_column_index = schema
297 .fields_pre_order()
298 .filter(|field| file_version < LanceFileVersion::V2_1 || field.is_leaf())
301 .enumerate()
302 .map(|(idx, field)| (field.id as u32, idx as u32))
303 .collect::<BTreeMap<_, _>>();
304 let projected = schema.project(column_names)?;
305 let mut column_indices = Vec::new();
306 Self::from_field_ids_helper(
307 file_version,
308 projected.fields.iter(),
309 &field_id_to_column_index,
310 &mut column_indices,
311 )?;
312 Ok(Self {
313 schema: Arc::new(projected),
314 column_indices,
315 })
316 }
317}
318
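/// Options that control how a [`FileReader`] schedules and decodes data.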
319#[derive(Clone, Debug)]
321pub struct FileReaderOptions {
322 pub decoder_config: DecoderConfig,
323 pub read_chunk_size: u64,
327}
328
329impl Default for FileReaderOptions {
330 fn default() -> Self {
331 Self {
332 decoder_config: DecoderConfig::default(),
333 read_chunk_size: DEFAULT_READ_CHUNK_SIZE,
334 }
335 }
336}
337
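/// A reader for Lance v2 (2.0+) files.
///
/// The reader holds the cached file metadata, an I/O scheduler, and the decoder
/// plugins, and hands out streams of decoded record batches.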
338#[derive(Debug)]
339pub struct FileReader {
340 scheduler: Arc<dyn EncodingsIo>,
341 base_projection: ReaderProjection,
343 num_rows: u64,
344 metadata: Arc<CachedFileMetadata>,
345 decoder_plugins: Arc<DecoderPlugins>,
346 cache: Arc<LanceCache>,
347 options: FileReaderOptions,
348}
349#[derive(Debug)]
350struct Footer {
351 #[allow(dead_code)]
352 column_meta_start: u64,
353 #[allow(dead_code)]
356 column_meta_offsets_start: u64,
357 global_buff_offsets_start: u64,
358 num_global_buffers: u32,
359 num_columns: u32,
360 major_version: u16,
361 minor_version: u16,
362}
363
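// The footer is 40 bytes: three u64 offsets (column metadata start, column
// metadata offset table start, global buffer offset table start), two u32
// counts (global buffers, columns), two u16 version numbers, and the 4-byte
// magic.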
364const FOOTER_LEN: usize = 40;
365
366impl FileReader {
367 pub fn with_scheduler(&self, scheduler: Arc<dyn EncodingsIo>) -> Self {
368 Self {
369 scheduler,
370 base_projection: self.base_projection.clone(),
371 cache: self.cache.clone(),
372 decoder_plugins: self.decoder_plugins.clone(),
373 metadata: self.metadata.clone(),
374 options: self.options.clone(),
375 num_rows: self.num_rows,
376 }
377 }
378
379 pub fn num_rows(&self) -> u64 {
380 self.num_rows
381 }
382
383 pub fn metadata(&self) -> &Arc<CachedFileMetadata> {
384 &self.metadata
385 }
386
387 pub fn file_statistics(&self) -> FileStatistics {
388 let column_metadatas = &self.metadata().column_metadatas;
389
390 let column_stats = column_metadatas
391 .iter()
392 .map(|col_metadata| {
393 let num_pages = col_metadata.pages.len();
394 let size_bytes = col_metadata
395 .pages
396 .iter()
397 .map(|page| page.buffer_sizes.iter().sum::<u64>())
398 .sum::<u64>();
399 ColumnStatistics {
400 num_pages,
401 size_bytes,
402 }
403 })
404 .collect();
405
406 FileStatistics {
407 columns: column_stats,
408 }
409 }
410
411 pub async fn read_global_buffer(&self, index: u32) -> Result<Bytes> {
        let buffer_desc = self
            .metadata
            .file_buffers
            .get(index as usize)
            .ok_or_else(|| {
                Error::invalid_input(
                    format!(
                        "request for global buffer at index {} but there were only {} global buffers in the file",
                        index,
                        self.metadata.file_buffers.len()
                    ),
                    location!(),
                )
            })?;
413 self.scheduler
414 .submit_single(
415 buffer_desc.position..buffer_desc.position + buffer_desc.size,
416 0,
417 )
418 .await
419 }
420
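    /// Reads the last block of the file (or the whole file, if it is smaller
    /// than one block) so that, in the common case, the footer and most of the
    /// metadata are fetched with a single IOP.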
421 async fn read_tail(scheduler: &FileScheduler) -> Result<(Bytes, u64)> {
422 let file_size = scheduler.reader().size().await? as u64;
423 let begin = if file_size < scheduler.reader().block_size() as u64 {
424 0
425 } else {
426 file_size - scheduler.reader().block_size() as u64
427 };
428 let tail_bytes = scheduler.submit_single(begin..file_size, 0).await?;
429 Ok((tail_bytes, file_size))
430 }
431
432 fn decode_footer(footer_bytes: &Bytes) -> Result<Footer> {
435 let len = footer_bytes.len();
436 if len < FOOTER_LEN {
437 return Err(Error::io(
438 format!(
439 "does not have sufficient data, len: {}, bytes: {:?}",
440 len, footer_bytes
441 ),
442 location!(),
443 ));
444 }
445 let mut cursor = Cursor::new(footer_bytes.slice(len - FOOTER_LEN..));
446
447 let column_meta_start = cursor.read_u64::<LittleEndian>()?;
448 let column_meta_offsets_start = cursor.read_u64::<LittleEndian>()?;
449 let global_buff_offsets_start = cursor.read_u64::<LittleEndian>()?;
450 let num_global_buffers = cursor.read_u32::<LittleEndian>()?;
451 let num_columns = cursor.read_u32::<LittleEndian>()?;
452 let major_version = cursor.read_u16::<LittleEndian>()?;
453 let minor_version = cursor.read_u16::<LittleEndian>()?;
454
455 if major_version == MAJOR_VERSION as u16 && minor_version == MINOR_VERSION as u16 {
456 return Err(Error::version_conflict(
457 "Attempt to use the lance v2 reader to read a legacy file".to_string(),
458 major_version,
459 minor_version,
460 location!(),
461 ));
462 }
463
464 let magic_bytes = footer_bytes.slice(len - 4..);
465 if magic_bytes.as_ref() != MAGIC {
466 return Err(Error::io(
467 format!(
468 "file does not appear to be a Lance file (invalid magic: {:?})",
                    magic_bytes
470 ),
471 location!(),
472 ));
473 }
474 Ok(Footer {
475 column_meta_start,
476 column_meta_offsets_start,
477 global_buff_offsets_start,
478 num_global_buffers,
479 num_columns,
480 major_version,
481 minor_version,
482 })
483 }
484
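    /// Decodes every column's metadata from the column metadata section.
    ///
    /// The end of the section contains an offset table with one 16-byte entry
    /// (u64 position + u64 length) per column, which is used to locate each
    /// column's protobuf-encoded metadata.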
485 fn read_all_column_metadata(
487 column_metadata_bytes: Bytes,
488 footer: &Footer,
489 ) -> Result<Vec<pbfile::ColumnMetadata>> {
490 let column_metadata_start = footer.column_meta_start;
491 let cmo_table_size = 16 * footer.num_columns as usize;
493 let cmo_table = column_metadata_bytes.slice(column_metadata_bytes.len() - cmo_table_size..);
494
495 (0..footer.num_columns)
496 .map(|col_idx| {
497 let offset = (col_idx * 16) as usize;
498 let position = LittleEndian::read_u64(&cmo_table[offset..offset + 8]);
499 let length = LittleEndian::read_u64(&cmo_table[offset + 8..offset + 16]);
500 let normalized_position = (position - column_metadata_start) as usize;
501 let normalized_end = normalized_position + (length as usize);
502 Ok(pbfile::ColumnMetadata::decode(
503 &column_metadata_bytes[normalized_position..normalized_end],
504 )?)
505 })
506 .collect::<Result<Vec<_>>>()
507 }
508
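    /// Returns the bytes from `start_pos` to the end of the file, reusing the
    /// already-fetched tail when possible and issuing one extra read only for
    /// the portion that is missing.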
509 async fn optimistic_tail_read(
510 data: &Bytes,
511 start_pos: u64,
512 scheduler: &FileScheduler,
513 file_len: u64,
514 ) -> Result<Bytes> {
515 let num_bytes_needed = (file_len - start_pos) as usize;
516 if data.len() >= num_bytes_needed {
517 Ok(data.slice((data.len() - num_bytes_needed)..))
518 } else {
519 let num_bytes_missing = (num_bytes_needed - data.len()) as u64;
520 let start = file_len - num_bytes_needed as u64;
521 let missing_bytes = scheduler
522 .submit_single(start..start + num_bytes_missing, 0)
523 .await?;
524 let mut combined = BytesMut::with_capacity(data.len() + num_bytes_missing as usize);
525 combined.extend(missing_bytes);
526 combined.extend(data);
527 Ok(combined.freeze())
528 }
529 }
530
531 fn do_decode_gbo_table(
532 gbo_bytes: &Bytes,
533 footer: &Footer,
534 version: LanceFileVersion,
535 ) -> Result<Vec<BufferDescriptor>> {
536 let mut global_bufs_cursor = Cursor::new(gbo_bytes);
537
538 let mut global_buffers = Vec::with_capacity(footer.num_global_buffers as usize);
539 for _ in 0..footer.num_global_buffers {
540 let buf_pos = global_bufs_cursor.read_u64::<LittleEndian>()?;
541 assert!(
542 version < LanceFileVersion::V2_1 || buf_pos % PAGE_BUFFER_ALIGNMENT as u64 == 0
543 );
544 let buf_size = global_bufs_cursor.read_u64::<LittleEndian>()?;
545 global_buffers.push(BufferDescriptor {
546 position: buf_pos,
547 size: buf_size,
548 });
549 }
550
551 Ok(global_buffers)
552 }
553
554 async fn decode_gbo_table(
555 tail_bytes: &Bytes,
556 file_len: u64,
557 scheduler: &FileScheduler,
558 footer: &Footer,
559 version: LanceFileVersion,
560 ) -> Result<Vec<BufferDescriptor>> {
561 let gbo_bytes = Self::optimistic_tail_read(
564 tail_bytes,
565 footer.global_buff_offsets_start,
566 scheduler,
567 file_len,
568 )
569 .await?;
570 Self::do_decode_gbo_table(&gbo_bytes, footer, version)
571 }
572
573 fn decode_schema(schema_bytes: Bytes) -> Result<(u64, lance_core::datatypes::Schema)> {
574 let file_descriptor = pb::FileDescriptor::decode(schema_bytes)?;
575 let pb_schema = file_descriptor.schema.unwrap();
576 let num_rows = file_descriptor.length;
577 let fields_with_meta = FieldsWithMeta {
578 fields: Fields(pb_schema.fields),
579 metadata: pb_schema.metadata,
580 };
581 let schema = lance_core::datatypes::Schema::from(fields_with_meta);
582 Ok((num_rows, schema))
583 }
584
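    /// Reads and decodes all of the file's metadata: the footer, the global
    /// buffer offset table, the schema (stored in the first global buffer), and
    /// the per-column metadata.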
585 pub async fn read_all_metadata(scheduler: &FileScheduler) -> Result<CachedFileMetadata> {
599 let (tail_bytes, file_len) = Self::read_tail(scheduler).await?;
601 let footer = Self::decode_footer(&tail_bytes)?;
602
603 let file_version = LanceFileVersion::try_from_major_minor(
604 footer.major_version as u32,
605 footer.minor_version as u32,
606 )?;
607
608 let gbo_table =
609 Self::decode_gbo_table(&tail_bytes, file_len, scheduler, &footer, file_version).await?;
610 if gbo_table.is_empty() {
611 return Err(Error::Internal {
612 message: "File did not contain any global buffers, schema expected".to_string(),
613 location: location!(),
614 });
615 }
616 let schema_start = gbo_table[0].position;
617 let schema_size = gbo_table[0].size;
618
619 let num_footer_bytes = file_len - schema_start;
620
621 let all_metadata_bytes =
624 Self::optimistic_tail_read(&tail_bytes, schema_start, scheduler, file_len).await?;
625
626 let schema_bytes = all_metadata_bytes.slice(0..schema_size as usize);
627 let (num_rows, schema) = Self::decode_schema(schema_bytes)?;
628
629 let column_metadata_start = (footer.column_meta_start - schema_start) as usize;
632 let column_metadata_end = (footer.global_buff_offsets_start - schema_start) as usize;
633 let column_metadata_bytes =
634 all_metadata_bytes.slice(column_metadata_start..column_metadata_end);
635 let column_metadatas = Self::read_all_column_metadata(column_metadata_bytes, &footer)?;
636
637 let num_global_buffer_bytes = gbo_table.iter().map(|buf| buf.size).sum::<u64>();
638 let num_data_bytes = footer.column_meta_start - num_global_buffer_bytes;
639 let num_column_metadata_bytes = footer.global_buff_offsets_start - footer.column_meta_start;
640
641 let column_infos = Self::meta_to_col_infos(column_metadatas.as_slice(), file_version);
642
643 Ok(CachedFileMetadata {
644 file_schema: Arc::new(schema),
645 column_metadatas,
646 column_infos,
647 num_rows,
648 num_data_bytes,
649 num_column_metadata_bytes,
650 num_global_buffer_bytes,
651 num_footer_bytes,
652 file_buffers: gbo_table,
653 major_version: footer.major_version,
654 minor_version: footer.minor_version,
655 })
656 }
657
658 fn fetch_encoding<M: Default + Name + Sized>(encoding: &pbfile::Encoding) -> M {
659 match &encoding.location {
660 Some(pbfile::encoding::Location::Indirect(_)) => todo!(),
661 Some(pbfile::encoding::Location::Direct(encoding)) => {
662 let encoding_buf = Bytes::from(encoding.encoding.clone());
663 let encoding_any = prost_types::Any::decode(encoding_buf).unwrap();
664 encoding_any.to_msg::<M>().unwrap()
665 }
666 Some(pbfile::encoding::Location::None(_)) => panic!(),
667 None => panic!(),
668 }
669 }
670
671 fn meta_to_col_infos(
672 column_metadatas: &[pbfile::ColumnMetadata],
673 file_version: LanceFileVersion,
674 ) -> Vec<Arc<ColumnInfo>> {
675 column_metadatas
676 .iter()
677 .enumerate()
678 .map(|(col_idx, col_meta)| {
679 let page_infos = col_meta
680 .pages
681 .iter()
682 .map(|page| {
683 let num_rows = page.length;
684 let encoding = match file_version {
685 LanceFileVersion::V2_0 => {
686 PageEncoding::Legacy(Self::fetch_encoding::<pbenc::ArrayEncoding>(
687 page.encoding.as_ref().unwrap(),
688 ))
689 }
690 _ => PageEncoding::Structural(Self::fetch_encoding::<
691 pbenc21::PageLayout,
692 >(
693 page.encoding.as_ref().unwrap()
694 )),
695 };
696 let buffer_offsets_and_sizes = Arc::from(
697 page.buffer_offsets
698 .iter()
699 .zip(page.buffer_sizes.iter())
700 .map(|(offset, size)| {
701 assert!(
703 file_version < LanceFileVersion::V2_1
704 || offset % PAGE_BUFFER_ALIGNMENT as u64 == 0
705 );
706 (*offset, *size)
707 })
708 .collect::<Vec<_>>(),
709 );
710 PageInfo {
711 buffer_offsets_and_sizes,
712 encoding,
713 num_rows,
714 priority: page.priority,
715 }
716 })
717 .collect::<Vec<_>>();
718 let buffer_offsets_and_sizes = Arc::from(
719 col_meta
720 .buffer_offsets
721 .iter()
722 .zip(col_meta.buffer_sizes.iter())
723 .map(|(offset, size)| (*offset, *size))
724 .collect::<Vec<_>>(),
725 );
726 Arc::new(ColumnInfo {
727 index: col_idx as u32,
728 page_infos: Arc::from(page_infos),
729 buffer_offsets_and_sizes,
730 encoding: Self::fetch_encoding(col_meta.encoding.as_ref().unwrap()),
731 })
732 })
733 .collect::<Vec<_>>()
734 }
735
736 fn validate_projection(
737 projection: &ReaderProjection,
738 metadata: &CachedFileMetadata,
739 ) -> Result<()> {
740 if projection.schema.fields.is_empty() {
741 return Err(Error::invalid_input(
742 "Attempt to read zero columns from the file, at least one column must be specified"
743 .to_string(),
744 location!(),
745 ));
746 }
747 let mut column_indices_seen = BTreeSet::new();
748 for column_index in &projection.column_indices {
749 if !column_indices_seen.insert(*column_index) {
750 return Err(Error::invalid_input(
751 format!(
752 "The projection specified the column index {} more than once",
753 column_index
754 ),
755 location!(),
756 ));
757 }
758 if *column_index >= metadata.column_infos.len() as u32 {
                return Err(Error::invalid_input(
                    format!(
                        "The projection specified the column index {} but there are only {} columns in the file",
                        column_index,
                        metadata.column_infos.len()
                    ),
                    location!(),
                ));
760 }
761 }
762 Ok(())
763 }
764
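    /// Opens a file reader from a [`FileScheduler`], reading all metadata up front.
    ///
    /// A minimal sketch of opening a file and reading it back as a stream; the
    /// scheduler and cache setup are assumed to already exist (see the tests
    /// below for a concrete fixture):
    ///
    /// ```ignore
    /// let reader = FileReader::try_open(
    ///     file_scheduler,
    ///     None, // project all columns
    ///     Arc::<DecoderPlugins>::default(),
    ///     &cache,
    ///     FileReaderOptions::default(),
    /// )
    /// .await?;
    /// let mut stream = reader.read_stream(
    ///     ReadBatchParams::RangeFull,
    ///     1024,
    ///     16,
    ///     FilterExpression::no_filter(),
    /// )?;
    /// ```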
765 pub async fn try_open(
772 scheduler: FileScheduler,
773 base_projection: Option<ReaderProjection>,
774 decoder_plugins: Arc<DecoderPlugins>,
775 cache: &LanceCache,
776 options: FileReaderOptions,
777 ) -> Result<Self> {
778 let file_metadata = Arc::new(Self::read_all_metadata(&scheduler).await?);
779 let path = scheduler.reader().path().clone();
780
781 let encodings_io =
783 LanceEncodingsIo::new(scheduler).with_read_chunk_size(options.read_chunk_size);
784
785 Self::try_open_with_file_metadata(
786 Arc::new(encodings_io),
787 path,
788 base_projection,
789 decoder_plugins,
790 file_metadata,
791 cache,
792 options,
793 )
794 .await
795 }
796
797 pub async fn try_open_with_file_metadata(
803 scheduler: Arc<dyn EncodingsIo>,
804 path: Path,
805 base_projection: Option<ReaderProjection>,
806 decoder_plugins: Arc<DecoderPlugins>,
807 file_metadata: Arc<CachedFileMetadata>,
808 cache: &LanceCache,
809 options: FileReaderOptions,
810 ) -> Result<Self> {
811 let cache = Arc::new(cache.with_key_prefix(path.as_ref()));
812
813 if let Some(base_projection) = base_projection.as_ref() {
814 Self::validate_projection(base_projection, &file_metadata)?;
815 }
816 let num_rows = file_metadata.num_rows;
817 Ok(Self {
818 scheduler,
819 base_projection: base_projection.unwrap_or(ReaderProjection::from_whole_schema(
820 file_metadata.file_schema.as_ref(),
821 file_metadata.version(),
822 )),
823 num_rows,
824 metadata: file_metadata,
825 decoder_plugins,
826 cache,
827 options,
828 })
829 }
830
831 fn collect_columns_from_projection(
845 &self,
846 _projection: &ReaderProjection,
847 ) -> Result<Vec<Arc<ColumnInfo>>> {
848 Ok(self.metadata.column_infos.to_vec())
849 }
850
851 #[allow(clippy::too_many_arguments)]
852 fn do_read_range(
853 column_infos: Vec<Arc<ColumnInfo>>,
854 io: Arc<dyn EncodingsIo>,
855 cache: Arc<LanceCache>,
856 num_rows: u64,
857 decoder_plugins: Arc<DecoderPlugins>,
858 range: Range<u64>,
859 batch_size: u32,
860 projection: ReaderProjection,
861 filter: FilterExpression,
862 decoder_config: DecoderConfig,
863 ) -> Result<BoxStream<'static, ReadBatchTask>> {
864 debug!(
865 "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
866 range,
867 batch_size,
868 num_rows,
869 column_infos.len(),
870 projection.schema.fields.len(),
871 );
872
873 let config = SchedulerDecoderConfig {
874 batch_size,
875 cache,
876 decoder_plugins,
877 io,
878 decoder_config,
879 };
880
881 let requested_rows = RequestedRows::Ranges(vec![range]);
882
883 Ok(schedule_and_decode(
884 column_infos,
885 requested_rows,
886 filter,
887 projection.column_indices,
888 projection.schema,
889 config,
890 ))
891 }
892
893 fn read_range(
894 &self,
895 range: Range<u64>,
896 batch_size: u32,
897 projection: ReaderProjection,
898 filter: FilterExpression,
899 ) -> Result<BoxStream<'static, ReadBatchTask>> {
900 Self::do_read_range(
902 self.collect_columns_from_projection(&projection)?,
903 self.scheduler.clone(),
904 self.cache.clone(),
905 self.num_rows,
906 self.decoder_plugins.clone(),
907 range,
908 batch_size,
909 projection,
910 filter,
911 self.options.decoder_config.clone(),
912 )
913 }
914
915 #[allow(clippy::too_many_arguments)]
916 fn do_take_rows(
917 column_infos: Vec<Arc<ColumnInfo>>,
918 io: Arc<dyn EncodingsIo>,
919 cache: Arc<LanceCache>,
920 decoder_plugins: Arc<DecoderPlugins>,
921 indices: Vec<u64>,
922 batch_size: u32,
923 projection: ReaderProjection,
924 filter: FilterExpression,
925 decoder_config: DecoderConfig,
926 ) -> Result<BoxStream<'static, ReadBatchTask>> {
927 debug!(
928 "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
929 indices.len(),
930 indices[0],
931 indices[indices.len() - 1],
932 batch_size,
933 column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
934 );
935
936 let config = SchedulerDecoderConfig {
937 batch_size,
938 cache,
939 decoder_plugins,
940 io,
941 decoder_config,
942 };
943
944 let requested_rows = RequestedRows::Indices(indices);
945
946 Ok(schedule_and_decode(
947 column_infos,
948 requested_rows,
949 filter,
950 projection.column_indices,
951 projection.schema,
952 config,
953 ))
954 }
955
956 fn take_rows(
957 &self,
958 indices: Vec<u64>,
959 batch_size: u32,
960 projection: ReaderProjection,
961 ) -> Result<BoxStream<'static, ReadBatchTask>> {
962 Self::do_take_rows(
964 self.collect_columns_from_projection(&projection)?,
965 self.scheduler.clone(),
966 self.cache.clone(),
967 self.decoder_plugins.clone(),
968 indices,
969 batch_size,
970 projection,
971 FilterExpression::no_filter(),
972 self.options.decoder_config.clone(),
973 )
974 }
975
976 #[allow(clippy::too_many_arguments)]
977 fn do_read_ranges(
978 column_infos: Vec<Arc<ColumnInfo>>,
979 io: Arc<dyn EncodingsIo>,
980 cache: Arc<LanceCache>,
981 decoder_plugins: Arc<DecoderPlugins>,
982 ranges: Vec<Range<u64>>,
983 batch_size: u32,
984 projection: ReaderProjection,
985 filter: FilterExpression,
986 decoder_config: DecoderConfig,
987 ) -> Result<BoxStream<'static, ReadBatchTask>> {
988 let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
989 debug!(
990 "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
991 ranges.len(),
992 num_rows,
993 ranges[0].start,
994 ranges[ranges.len() - 1].end,
995 batch_size,
996 column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
997 );
998
999 let config = SchedulerDecoderConfig {
1000 batch_size,
1001 cache,
1002 decoder_plugins,
1003 io,
1004 decoder_config,
1005 };
1006
1007 let requested_rows = RequestedRows::Ranges(ranges);
1008
1009 Ok(schedule_and_decode(
1010 column_infos,
1011 requested_rows,
1012 filter,
1013 projection.column_indices,
1014 projection.schema,
1015 config,
1016 ))
1017 }
1018
1019 fn read_ranges(
1020 &self,
1021 ranges: Vec<Range<u64>>,
1022 batch_size: u32,
1023 projection: ReaderProjection,
1024 filter: FilterExpression,
1025 ) -> Result<BoxStream<'static, ReadBatchTask>> {
1026 Self::do_read_ranges(
1027 self.collect_columns_from_projection(&projection)?,
1028 self.scheduler.clone(),
1029 self.cache.clone(),
1030 self.decoder_plugins.clone(),
1031 ranges,
1032 batch_size,
1033 projection,
1034 filter,
1035 self.options.decoder_config.clone(),
1036 )
1037 }
1038
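    /// Creates a stream of read tasks, one per batch, for the requested rows.
    ///
    /// The `params` are validated against the number of rows in the file and
    /// then dispatched to the range, multi-range, or take-by-index path.  Each
    /// task resolves to one record batch; callers typically buffer several
    /// tasks at once (see `read_stream_projected`).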
1039 pub fn read_tasks(
1050 &self,
1051 params: ReadBatchParams,
1052 batch_size: u32,
1053 projection: Option<ReaderProjection>,
1054 filter: FilterExpression,
1055 ) -> Result<Pin<Box<dyn Stream<Item = ReadBatchTask> + Send>>> {
1056 let projection = projection.unwrap_or_else(|| self.base_projection.clone());
1057 Self::validate_projection(&projection, &self.metadata)?;
1058 let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
1059 if bound > self.num_rows || bound == self.num_rows && inclusive {
1060 Err(Error::invalid_input(
1061 format!(
1062 "cannot read {:?} from file with {} rows",
1063 params, self.num_rows
1064 ),
1065 location!(),
1066 ))
1067 } else {
1068 Ok(())
1069 }
1070 };
1071 match ¶ms {
1072 ReadBatchParams::Indices(indices) => {
1073 for idx in indices {
1074 match idx {
1075 None => {
1076 return Err(Error::invalid_input(
1077 "Null value in indices array",
1078 location!(),
1079 ));
1080 }
1081 Some(idx) => {
1082 verify_bound(¶ms, idx as u64, true)?;
1083 }
1084 }
1085 }
1086 let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
1087 self.take_rows(indices, batch_size, projection)
1088 }
1089 ReadBatchParams::Range(range) => {
1090 verify_bound(¶ms, range.end as u64, false)?;
1091 self.read_range(
1092 range.start as u64..range.end as u64,
1093 batch_size,
1094 projection,
1095 filter,
1096 )
1097 }
1098 ReadBatchParams::Ranges(ranges) => {
1099 let mut ranges_u64 = Vec::with_capacity(ranges.len());
1100 for range in ranges.as_ref() {
1101 verify_bound(¶ms, range.end, false)?;
1102 ranges_u64.push(range.start..range.end);
1103 }
1104 self.read_ranges(ranges_u64, batch_size, projection, filter)
1105 }
1106 ReadBatchParams::RangeFrom(range) => {
1107 verify_bound(¶ms, range.start as u64, true)?;
1108 self.read_range(
1109 range.start as u64..self.num_rows,
1110 batch_size,
1111 projection,
1112 filter,
1113 )
1114 }
1115 ReadBatchParams::RangeTo(range) => {
1116 verify_bound(¶ms, range.end as u64, false)?;
1117 self.read_range(0..range.end as u64, batch_size, projection, filter)
1118 }
1119 ReadBatchParams::RangeFull => {
1120 self.read_range(0..self.num_rows, batch_size, projection, filter)
1121 }
1122 }
1123 }
1124
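    /// Reads the requested rows as a [`RecordBatchStream`], decoding up to
    /// `batch_readahead` batches concurrently and projecting to the given
    /// projection's schema.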
1125 pub fn read_stream_projected(
1147 &self,
1148 params: ReadBatchParams,
1149 batch_size: u32,
1150 batch_readahead: u32,
1151 projection: ReaderProjection,
1152 filter: FilterExpression,
1153 ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1154 let arrow_schema = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
1155 let tasks_stream = self.read_tasks(params, batch_size, Some(projection), filter)?;
1156 let batch_stream = tasks_stream
1157 .map(|task| task.task)
1158 .buffered(batch_readahead as usize)
1159 .boxed();
1160 Ok(Box::pin(RecordBatchStreamAdapter::new(
1161 arrow_schema,
1162 batch_stream,
1163 )))
1164 }
1165
1166 fn take_rows_blocking(
1167 &self,
1168 indices: Vec<u64>,
1169 batch_size: u32,
1170 projection: ReaderProjection,
1171 filter: FilterExpression,
1172 ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1173 let column_infos = self.collect_columns_from_projection(&projection)?;
1174 debug!(
1175 "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
1176 indices.len(),
1177 indices[0],
1178 indices[indices.len() - 1],
1179 batch_size,
1180 column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1181 );
1182
1183 let config = SchedulerDecoderConfig {
1184 batch_size,
1185 cache: self.cache.clone(),
1186 decoder_plugins: self.decoder_plugins.clone(),
1187 io: self.scheduler.clone(),
1188 decoder_config: self.options.decoder_config.clone(),
1189 };
1190
1191 let requested_rows = RequestedRows::Indices(indices);
1192
1193 schedule_and_decode_blocking(
1194 column_infos,
1195 requested_rows,
1196 filter,
1197 projection.column_indices,
1198 projection.schema,
1199 config,
1200 )
1201 }
1202
1203 fn read_ranges_blocking(
1204 &self,
1205 ranges: Vec<Range<u64>>,
1206 batch_size: u32,
1207 projection: ReaderProjection,
1208 filter: FilterExpression,
1209 ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1210 let column_infos = self.collect_columns_from_projection(&projection)?;
1211 let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1212 debug!(
1213 "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
1214 ranges.len(),
1215 num_rows,
1216 ranges[0].start,
1217 ranges[ranges.len() - 1].end,
1218 batch_size,
1219 column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1220 );
1221
1222 let config = SchedulerDecoderConfig {
1223 batch_size,
1224 cache: self.cache.clone(),
1225 decoder_plugins: self.decoder_plugins.clone(),
1226 io: self.scheduler.clone(),
1227 decoder_config: self.options.decoder_config.clone(),
1228 };
1229
1230 let requested_rows = RequestedRows::Ranges(ranges);
1231
1232 schedule_and_decode_blocking(
1233 column_infos,
1234 requested_rows,
1235 filter,
1236 projection.column_indices,
1237 projection.schema,
1238 config,
1239 )
1240 }
1241
1242 fn read_range_blocking(
1243 &self,
1244 range: Range<u64>,
1245 batch_size: u32,
1246 projection: ReaderProjection,
1247 filter: FilterExpression,
1248 ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1249 let column_infos = self.collect_columns_from_projection(&projection)?;
1250 let num_rows = self.num_rows;
1251
1252 debug!(
1253 "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
1254 range,
1255 batch_size,
1256 num_rows,
1257 column_infos.len(),
1258 projection.schema.fields.len(),
1259 );
1260
1261 let config = SchedulerDecoderConfig {
1262 batch_size,
1263 cache: self.cache.clone(),
1264 decoder_plugins: self.decoder_plugins.clone(),
1265 io: self.scheduler.clone(),
1266 decoder_config: self.options.decoder_config.clone(),
1267 };
1268
1269 let requested_rows = RequestedRows::Ranges(vec![range]);
1270
1271 schedule_and_decode_blocking(
1272 column_infos,
1273 requested_rows,
1274 filter,
1275 projection.column_indices,
1276 projection.schema,
1277 config,
1278 )
1279 }
1280
1281 pub fn read_stream_projected_blocking(
1293 &self,
1294 params: ReadBatchParams,
1295 batch_size: u32,
1296 projection: Option<ReaderProjection>,
1297 filter: FilterExpression,
1298 ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1299 let projection = projection.unwrap_or_else(|| self.base_projection.clone());
1300 Self::validate_projection(&projection, &self.metadata)?;
1301 let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
1302 if bound > self.num_rows || bound == self.num_rows && inclusive {
1303 Err(Error::invalid_input(
1304 format!(
1305 "cannot read {:?} from file with {} rows",
1306 params, self.num_rows
1307 ),
1308 location!(),
1309 ))
1310 } else {
1311 Ok(())
1312 }
1313 };
1314 match ¶ms {
1315 ReadBatchParams::Indices(indices) => {
1316 for idx in indices {
1317 match idx {
1318 None => {
1319 return Err(Error::invalid_input(
1320 "Null value in indices array",
1321 location!(),
1322 ));
1323 }
1324 Some(idx) => {
1325 verify_bound(¶ms, idx as u64, true)?;
1326 }
1327 }
1328 }
1329 let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
1330 self.take_rows_blocking(indices, batch_size, projection, filter)
1331 }
1332 ReadBatchParams::Range(range) => {
1333 verify_bound(¶ms, range.end as u64, false)?;
1334 self.read_range_blocking(
1335 range.start as u64..range.end as u64,
1336 batch_size,
1337 projection,
1338 filter,
1339 )
1340 }
1341 ReadBatchParams::Ranges(ranges) => {
1342 let mut ranges_u64 = Vec::with_capacity(ranges.len());
1343 for range in ranges.as_ref() {
1344 verify_bound(¶ms, range.end, false)?;
1345 ranges_u64.push(range.start..range.end);
1346 }
1347 self.read_ranges_blocking(ranges_u64, batch_size, projection, filter)
1348 }
1349 ReadBatchParams::RangeFrom(range) => {
1350 verify_bound(¶ms, range.start as u64, true)?;
1351 self.read_range_blocking(
1352 range.start as u64..self.num_rows,
1353 batch_size,
1354 projection,
1355 filter,
1356 )
1357 }
1358 ReadBatchParams::RangeTo(range) => {
1359 verify_bound(¶ms, range.end as u64, false)?;
1360 self.read_range_blocking(0..range.end as u64, batch_size, projection, filter)
1361 }
1362 ReadBatchParams::RangeFull => {
1363 self.read_range_blocking(0..self.num_rows, batch_size, projection, filter)
1364 }
1365 }
1366 }
1367
1368 pub fn read_stream(
1374 &self,
1375 params: ReadBatchParams,
1376 batch_size: u32,
1377 batch_readahead: u32,
1378 filter: FilterExpression,
1379 ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1380 self.read_stream_projected(
1381 params,
1382 batch_size,
1383 batch_readahead,
1384 self.base_projection.clone(),
1385 filter,
1386 )
1387 }
1388
1389 pub fn schema(&self) -> &Arc<Schema> {
1390 &self.metadata.file_schema
1391 }
1392}
1393
1394pub fn describe_encoding(page: &pbfile::column_metadata::Page) -> String {
1396 if let Some(encoding) = &page.encoding {
1397 if let Some(style) = &encoding.location {
1398 match style {
1399 pbfile::encoding::Location::Indirect(indirect) => {
1400 format!(
1401 "IndirectEncoding(pos={},size={})",
1402 indirect.buffer_location, indirect.buffer_length
1403 )
1404 }
1405 pbfile::encoding::Location::Direct(direct) => {
1406 let encoding_any =
1407 prost_types::Any::decode(Bytes::from(direct.encoding.clone()))
1408 .expect("failed to deserialize encoding as protobuf");
1409 if encoding_any.type_url == "/lance.encodings.ArrayEncoding" {
1410 let encoding = encoding_any.to_msg::<pbenc::ArrayEncoding>();
1411 match encoding {
1412 Ok(encoding) => {
1413 format!("{:#?}", encoding)
1414 }
1415 Err(err) => {
1416 format!("Unsupported(decode_err={})", err)
1417 }
1418 }
1419 } else if encoding_any.type_url == "/lance.encodings21.PageLayout" {
1420 let encoding = encoding_any.to_msg::<pbenc21::PageLayout>();
1421 match encoding {
1422 Ok(encoding) => {
1423 format!("{:#?}", encoding)
1424 }
1425 Err(err) => {
1426 format!("Unsupported(decode_err={})", err)
1427 }
1428 }
1429 } else {
1430 format!("Unrecognized(type_url={})", encoding_any.type_url)
1431 }
1432 }
1433 pbfile::encoding::Location::None(_) => "NoEncodingDescription".to_string(),
1434 }
1435 } else {
1436 "MISSING STYLE".to_string()
1437 }
1438 } else {
1439 "MISSING".to_string()
1440 }
1441}
1442
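/// Extensions for decoding an [`EncodedBatch`] from bytes produced by the
/// corresponding writer extensions.
///
/// The "mini lance" form contains only data pages and column metadata (the
/// schema must be supplied by the caller), while the "self described" form
/// also stores the schema in the file's first global buffer.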
1443pub trait EncodedBatchReaderExt {
1444 fn try_from_mini_lance(
1445 bytes: Bytes,
1446 schema: &Schema,
1447 version: LanceFileVersion,
1448 ) -> Result<Self>
1449 where
1450 Self: Sized;
1451 fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1452 where
1453 Self: Sized;
1454}
1455
1456impl EncodedBatchReaderExt for EncodedBatch {
1457 fn try_from_mini_lance(
1458 bytes: Bytes,
1459 schema: &Schema,
1460 file_version: LanceFileVersion,
1461 ) -> Result<Self>
1462 where
1463 Self: Sized,
1464 {
1465 let projection = ReaderProjection::from_whole_schema(schema, file_version);
1466 let footer = FileReader::decode_footer(&bytes)?;
1467
1468 let column_metadata_start = footer.column_meta_start as usize;
1471 let column_metadata_end = footer.global_buff_offsets_start as usize;
1472 let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1473 let column_metadatas =
1474 FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1475
1476 let file_version = LanceFileVersion::try_from_major_minor(
1477 footer.major_version as u32,
1478 footer.minor_version as u32,
1479 )?;
1480
1481 let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1482
1483 Ok(Self {
1484 data: bytes,
1485 num_rows: page_table
1486 .first()
1487 .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1488 .unwrap_or(0),
1489 page_table,
1490 top_level_columns: projection.column_indices,
1491 schema: Arc::new(schema.clone()),
1492 })
1493 }
1494
1495 fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1496 where
1497 Self: Sized,
1498 {
1499 let footer = FileReader::decode_footer(&bytes)?;
1500 let file_version = LanceFileVersion::try_from_major_minor(
1501 footer.major_version as u32,
1502 footer.minor_version as u32,
1503 )?;
1504
1505 let gbo_table = FileReader::do_decode_gbo_table(
1506 &bytes.slice(footer.global_buff_offsets_start as usize..),
1507 &footer,
1508 file_version,
1509 )?;
1510 if gbo_table.is_empty() {
1511 return Err(Error::Internal {
1512 message: "File did not contain any global buffers, schema expected".to_string(),
1513 location: location!(),
1514 });
1515 }
1516 let schema_start = gbo_table[0].position as usize;
1517 let schema_size = gbo_table[0].size as usize;
1518
1519 let schema_bytes = bytes.slice(schema_start..(schema_start + schema_size));
1520 let (_, schema) = FileReader::decode_schema(schema_bytes)?;
1521 let projection = ReaderProjection::from_whole_schema(&schema, file_version);
1522
1523 let column_metadata_start = footer.column_meta_start as usize;
1526 let column_metadata_end = footer.global_buff_offsets_start as usize;
1527 let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1528 let column_metadatas =
1529 FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1530
1531 let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1532
1533 Ok(Self {
1534 data: bytes,
1535 num_rows: page_table
1536 .first()
1537 .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1538 .unwrap_or(0),
1539 page_table,
1540 top_level_columns: projection.column_indices,
1541 schema: Arc::new(schema),
1542 })
1543 }
1544}
1545
1546#[cfg(test)]
1547pub mod tests {
1548 use std::{collections::BTreeMap, pin::Pin, sync::Arc};
1549
1550 use arrow_array::{
1551 types::{Float64Type, Int32Type},
1552 RecordBatch, UInt32Array,
1553 };
1554 use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema};
1555 use bytes::Bytes;
1556 use futures::{prelude::stream::TryStreamExt, StreamExt};
1557 use lance_arrow::RecordBatchExt;
1558 use lance_core::{datatypes::Schema, ArrowResult};
1559 use lance_datagen::{array, gen_batch, BatchCount, ByteCount, RowCount};
1560 use lance_encoding::{
1561 decoder::{decode_batch, DecodeBatchScheduler, DecoderPlugins, FilterExpression},
1562 encoder::{default_encoding_strategy, encode_batch, EncodedBatch, EncodingOptions},
1563 version::LanceFileVersion,
1564 };
1565 use lance_io::{stream::RecordBatchStream, utils::CachedFileSize};
1566 use log::debug;
1567 use rstest::rstest;
1568 use tokio::sync::mpsc;
1569
1570 use crate::v2::{
1571 reader::{EncodedBatchReaderExt, FileReader, FileReaderOptions, ReaderProjection},
1572 testing::{test_cache, write_lance_file, FsFixture, WrittenFile},
1573 writer::{EncodedBatchWriteExt, FileWriter, FileWriterOptions},
1574 };
1575 use lance_encoding::decoder::DecoderConfig;
1576
1577 async fn create_some_file(fs: &FsFixture, version: LanceFileVersion) -> WrittenFile {
1578 let location_type = DataType::Struct(Fields::from(vec![
1579 Field::new("x", DataType::Float64, true),
1580 Field::new("y", DataType::Float64, true),
1581 ]));
1582 let categories_type = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)));
1583
1584 let mut reader = gen_batch()
1585 .col("score", array::rand::<Float64Type>())
1586 .col("location", array::rand_type(&location_type))
1587 .col("categories", array::rand_type(&categories_type))
1588 .col("binary", array::rand_type(&DataType::Binary));
1589 if version <= LanceFileVersion::V2_0 {
1590 reader = reader.col("large_bin", array::rand_type(&DataType::LargeBinary));
1591 }
1592 let reader = reader.into_reader_rows(RowCount::from(1000), BatchCount::from(100));
1593
1594 write_lance_file(
1595 reader,
1596 fs,
1597 FileWriterOptions {
1598 format_version: Some(version),
1599 ..Default::default()
1600 },
1601 )
1602 .await
1603 }
1604
1605 type Transformer = Box<dyn Fn(&RecordBatch) -> RecordBatch>;
1606
1607 async fn verify_expected(
1608 expected: &[RecordBatch],
1609 mut actual: Pin<Box<dyn RecordBatchStream>>,
1610 read_size: u32,
1611 transform: Option<Transformer>,
1612 ) {
1613 let mut remaining = expected.iter().map(|batch| batch.num_rows()).sum::<usize>() as u32;
1614 let mut expected_iter = expected.iter().map(|batch| {
1615 if let Some(transform) = &transform {
1616 transform(batch)
1617 } else {
1618 batch.clone()
1619 }
1620 });
1621 let mut next_expected = expected_iter.next().unwrap().clone();
1622 while let Some(actual) = actual.next().await {
1623 let mut actual = actual.unwrap();
1624 let mut rows_to_verify = actual.num_rows() as u32;
1625 let expected_length = remaining.min(read_size);
1626 assert_eq!(expected_length, rows_to_verify);
1627
1628 while rows_to_verify > 0 {
1629 let next_slice_len = (next_expected.num_rows() as u32).min(rows_to_verify);
1630 assert_eq!(
1631 next_expected.slice(0, next_slice_len as usize),
1632 actual.slice(0, next_slice_len as usize)
1633 );
1634 remaining -= next_slice_len;
1635 rows_to_verify -= next_slice_len;
1636 if remaining > 0 {
1637 if next_slice_len == next_expected.num_rows() as u32 {
1638 next_expected = expected_iter.next().unwrap().clone();
1639 } else {
1640 next_expected = next_expected.slice(
1641 next_slice_len as usize,
1642 next_expected.num_rows() - next_slice_len as usize,
1643 );
1644 }
1645 }
1646 if rows_to_verify > 0 {
1647 actual = actual.slice(
1648 next_slice_len as usize,
1649 actual.num_rows() - next_slice_len as usize,
1650 );
1651 }
1652 }
1653 }
1654 assert_eq!(remaining, 0);
1655 }
1656
1657 #[tokio::test]
1658 async fn test_round_trip() {
1659 let fs = FsFixture::default();
1660
1661 let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1662
1663 for read_size in [32, 1024, 1024 * 1024] {
1664 let file_scheduler = fs
1665 .scheduler
1666 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1667 .await
1668 .unwrap();
1669 let file_reader = FileReader::try_open(
1670 file_scheduler,
1671 None,
1672 Arc::<DecoderPlugins>::default(),
1673 &test_cache(),
1674 FileReaderOptions::default(),
1675 )
1676 .await
1677 .unwrap();
1678
1679 let schema = file_reader.schema();
1680 assert_eq!(schema.metadata.get("foo").unwrap(), "bar");
1681
1682 let batch_stream = file_reader
1683 .read_stream(
1684 lance_io::ReadBatchParams::RangeFull,
1685 read_size,
1686 16,
1687 FilterExpression::no_filter(),
1688 )
1689 .unwrap();
1690
1691 verify_expected(&data, batch_stream, read_size, None).await;
1692 }
1693 }
1694
1695 #[rstest]
1696 #[test_log::test(tokio::test)]
1697 async fn test_encoded_batch_round_trip(
1698 #[values(LanceFileVersion::V2_0)] version: LanceFileVersion,
1700 ) {
1701 let data = gen_batch()
1702 .col("x", array::rand::<Int32Type>())
1703 .col("y", array::rand_utf8(ByteCount::from(16), false))
1704 .into_batch_rows(RowCount::from(10000))
1705 .unwrap();
1706
1707 let lance_schema = Arc::new(Schema::try_from(data.schema().as_ref()).unwrap());
1708
1709 let encoding_options = EncodingOptions {
1710 cache_bytes_per_column: 4096,
1711 max_page_bytes: 32 * 1024 * 1024,
1712 keep_original_array: true,
1713 buffer_alignment: 64,
1714 };
1715
1716 let encoding_strategy = default_encoding_strategy(version);
1717
1718 let encoded_batch = encode_batch(
1719 &data,
1720 lance_schema.clone(),
1721 encoding_strategy.as_ref(),
1722 &encoding_options,
1723 )
1724 .await
1725 .unwrap();
1726
1727 let bytes = encoded_batch.try_to_self_described_lance(version).unwrap();
1729
1730 let decoded_batch = EncodedBatch::try_from_self_described_lance(bytes).unwrap();
1731
1732 let decoded = decode_batch(
1733 &decoded_batch,
1734 &FilterExpression::no_filter(),
1735 Arc::<DecoderPlugins>::default(),
1736 false,
1737 LanceFileVersion::default(),
1738 None,
1739 )
1740 .await
1741 .unwrap();
1742
1743 assert_eq!(data, decoded);
1744
1745 let bytes = encoded_batch.try_to_mini_lance(version).unwrap();
1747 let decoded_batch =
1748 EncodedBatch::try_from_mini_lance(bytes, lance_schema.as_ref(), LanceFileVersion::V2_0)
1749 .unwrap();
1750 let decoded = decode_batch(
1751 &decoded_batch,
1752 &FilterExpression::no_filter(),
1753 Arc::<DecoderPlugins>::default(),
1754 false,
1755 LanceFileVersion::default(),
1756 None,
1757 )
1758 .await
1759 .unwrap();
1760
1761 assert_eq!(data, decoded);
1762 }
1763
1764 #[rstest]
1765 #[test_log::test(tokio::test)]
1766 async fn test_projection(
1767 #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
1768 ) {
1769 let fs = FsFixture::default();
1770
1771 let written_file = create_some_file(&fs, version).await;
1772 let file_scheduler = fs
1773 .scheduler
1774 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1775 .await
1776 .unwrap();
1777
1778 let field_id_mapping = written_file
1779 .field_id_mapping
1780 .iter()
1781 .copied()
1782 .collect::<BTreeMap<_, _>>();
1783
1784 let empty_projection = ReaderProjection {
1785 column_indices: Vec::default(),
1786 schema: Arc::new(Schema::default()),
1787 };
1788
1789 for columns in [
1790 vec!["score"],
1791 vec!["location"],
1792 vec!["categories"],
1793 vec!["score.x"],
1794 vec!["score", "categories"],
1795 vec!["score", "location"],
1796 vec!["location", "categories"],
1797 vec!["score.y", "location", "categories"],
1798 ] {
1799 debug!("Testing round trip with projection {:?}", columns);
1800 for use_field_ids in [true, false] {
1801 let file_reader = FileReader::try_open(
1803 file_scheduler.clone(),
1804 None,
1805 Arc::<DecoderPlugins>::default(),
1806 &test_cache(),
1807 FileReaderOptions::default(),
1808 )
1809 .await
1810 .unwrap();
1811
1812 let projected_schema = written_file.schema.project(&columns).unwrap();
1813 let projection = if use_field_ids {
1814 ReaderProjection::from_field_ids(
1815 file_reader.metadata.version(),
1816 &projected_schema,
1817 &field_id_mapping,
1818 )
1819 .unwrap()
1820 } else {
1821 ReaderProjection::from_column_names(
1822 file_reader.metadata.version(),
1823 &written_file.schema,
1824 &columns,
1825 )
1826 .unwrap()
1827 };
1828
1829 let batch_stream = file_reader
1830 .read_stream_projected(
1831 lance_io::ReadBatchParams::RangeFull,
1832 1024,
1833 16,
1834 projection.clone(),
1835 FilterExpression::no_filter(),
1836 )
1837 .unwrap();
1838
1839 let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1840 verify_expected(
1841 &written_file.data,
1842 batch_stream,
1843 1024,
1844 Some(Box::new(move |batch: &RecordBatch| {
1845 batch.project_by_schema(&projection_arrow).unwrap()
1846 })),
1847 )
1848 .await;
1849
1850 let file_reader = FileReader::try_open(
1852 file_scheduler.clone(),
1853 Some(projection.clone()),
1854 Arc::<DecoderPlugins>::default(),
1855 &test_cache(),
1856 FileReaderOptions::default(),
1857 )
1858 .await
1859 .unwrap();
1860
1861 let batch_stream = file_reader
1862 .read_stream(
1863 lance_io::ReadBatchParams::RangeFull,
1864 1024,
1865 16,
1866 FilterExpression::no_filter(),
1867 )
1868 .unwrap();
1869
1870 let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1871 verify_expected(
1872 &written_file.data,
1873 batch_stream,
1874 1024,
1875 Some(Box::new(move |batch: &RecordBatch| {
1876 batch.project_by_schema(&projection_arrow).unwrap()
1877 })),
1878 )
1879 .await;
1880
1881 assert!(file_reader
1882 .read_stream_projected(
1883 lance_io::ReadBatchParams::RangeFull,
1884 1024,
1885 16,
1886 empty_projection.clone(),
1887 FilterExpression::no_filter(),
1888 )
1889 .is_err());
1890 }
1891 }
1892
1893 assert!(FileReader::try_open(
1894 file_scheduler.clone(),
1895 Some(empty_projection),
1896 Arc::<DecoderPlugins>::default(),
1897 &test_cache(),
1898 FileReaderOptions::default(),
1899 )
1900 .await
1901 .is_err());
1902
1903 let arrow_schema = ArrowSchema::new(vec![
1904 Field::new("x", DataType::Int32, true),
1905 Field::new("y", DataType::Int32, true),
1906 ]);
1907 let schema = Schema::try_from(&arrow_schema).unwrap();
1908
1909 let projection_with_dupes = ReaderProjection {
1910 column_indices: vec![0, 0],
1911 schema: Arc::new(schema),
1912 };
1913
1914 assert!(FileReader::try_open(
1915 file_scheduler.clone(),
1916 Some(projection_with_dupes),
1917 Arc::<DecoderPlugins>::default(),
1918 &test_cache(),
1919 FileReaderOptions::default(),
1920 )
1921 .await
1922 .is_err());
1923 }
1924
1925 #[test_log::test(tokio::test)]
1926 async fn test_compressing_buffer() {
1927 let fs = FsFixture::default();
1928
1929 let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
1930 let file_scheduler = fs
1931 .scheduler
1932 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1933 .await
1934 .unwrap();
1935
1936 let file_reader = FileReader::try_open(
1938 file_scheduler.clone(),
1939 None,
1940 Arc::<DecoderPlugins>::default(),
1941 &test_cache(),
1942 FileReaderOptions::default(),
1943 )
1944 .await
1945 .unwrap();
1946
1947 let mut projection = written_file.schema.project(&["score"]).unwrap();
1948 for field in projection.fields.iter_mut() {
1949 field
1950 .metadata
1951 .insert("lance:compression".to_string(), "zstd".to_string());
1952 }
1953 let projection = ReaderProjection {
1954 column_indices: projection.fields.iter().map(|f| f.id as u32).collect(),
1955 schema: Arc::new(projection),
1956 };
1957
1958 let batch_stream = file_reader
1959 .read_stream_projected(
1960 lance_io::ReadBatchParams::RangeFull,
1961 1024,
1962 16,
1963 projection.clone(),
1964 FilterExpression::no_filter(),
1965 )
1966 .unwrap();
1967
1968 let projection_arrow = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
1969 verify_expected(
1970 &written_file.data,
1971 batch_stream,
1972 1024,
1973 Some(Box::new(move |batch: &RecordBatch| {
1974 batch.project_by_schema(&projection_arrow).unwrap()
1975 })),
1976 )
1977 .await;
1978 }
1979
1980 #[tokio::test]
1981 async fn test_read_all() {
1982 let fs = FsFixture::default();
1983 let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1984 let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
1985
1986 let file_scheduler = fs
1987 .scheduler
1988 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1989 .await
1990 .unwrap();
1991 let file_reader = FileReader::try_open(
1992 file_scheduler.clone(),
1993 None,
1994 Arc::<DecoderPlugins>::default(),
1995 &test_cache(),
1996 FileReaderOptions::default(),
1997 )
1998 .await
1999 .unwrap();
2000
2001 let batches = file_reader
2002 .read_stream(
2003 lance_io::ReadBatchParams::RangeFull,
2004 total_rows as u32,
2005 16,
2006 FilterExpression::no_filter(),
2007 )
2008 .unwrap()
2009 .try_collect::<Vec<_>>()
2010 .await
2011 .unwrap();
2012 assert_eq!(batches.len(), 1);
2013 assert_eq!(batches[0].num_rows(), total_rows);
2014 }
2015
2016 #[tokio::test]
2017 async fn test_blocking_take() {
2018 let fs = FsFixture::default();
2019 let WrittenFile { data, schema, .. } = create_some_file(&fs, LanceFileVersion::V2_1).await;
2020 let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2021
2022 let file_scheduler = fs
2023 .scheduler
2024 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2025 .await
2026 .unwrap();
2027 let file_reader = FileReader::try_open(
2028 file_scheduler.clone(),
2029 Some(
2030 ReaderProjection::from_column_names(LanceFileVersion::V2_1, &schema, &["score"])
2031 .unwrap(),
2032 ),
2033 Arc::<DecoderPlugins>::default(),
2034 &test_cache(),
2035 FileReaderOptions::default(),
2036 )
2037 .await
2038 .unwrap();
2039
2040 let batches = tokio::task::spawn_blocking(move || {
2041 file_reader
2042 .read_stream_projected_blocking(
2043 lance_io::ReadBatchParams::Indices(UInt32Array::from(vec![0, 1, 2, 3, 4])),
2044 total_rows as u32,
2045 None,
2046 FilterExpression::no_filter(),
2047 )
2048 .unwrap()
2049 .collect::<ArrowResult<Vec<_>>>()
2050 .unwrap()
2051 })
2052 .await
2053 .unwrap();
2054
2055 assert_eq!(batches.len(), 1);
2056 assert_eq!(batches[0].num_rows(), 5);
2057 assert_eq!(batches[0].num_columns(), 1);
2058 }
2059
2060 #[tokio::test(flavor = "multi_thread")]
2061 async fn test_drop_in_progress() {
2062 let fs = FsFixture::default();
2063 let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
2064 let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2065
2066 let file_scheduler = fs
2067 .scheduler
2068 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2069 .await
2070 .unwrap();
2071 let file_reader = FileReader::try_open(
2072 file_scheduler.clone(),
2073 None,
2074 Arc::<DecoderPlugins>::default(),
2075 &test_cache(),
2076 FileReaderOptions::default(),
2077 )
2078 .await
2079 .unwrap();
2080
2081 let mut batches = file_reader
2082 .read_stream(
2083 lance_io::ReadBatchParams::RangeFull,
2084 (total_rows / 10) as u32,
2085 16,
2086 FilterExpression::no_filter(),
2087 )
2088 .unwrap();
2089
2090 drop(file_reader);
2091
2092 let batch = batches.next().await.unwrap().unwrap();
2093 assert!(batch.num_rows() > 0);
2094
2095 drop(batches);
2097 }
2098
2099 #[tokio::test]
2100 async fn drop_while_scheduling() {
2101 let fs = FsFixture::default();
2111 let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
2112 let total_rows = written_file
2113 .data
2114 .iter()
2115 .map(|batch| batch.num_rows())
2116 .sum::<usize>();
2117
2118 let file_scheduler = fs
2119 .scheduler
2120 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2121 .await
2122 .unwrap();
2123 let file_reader = FileReader::try_open(
2124 file_scheduler.clone(),
2125 None,
2126 Arc::<DecoderPlugins>::default(),
2127 &test_cache(),
2128 FileReaderOptions::default(),
2129 )
2130 .await
2131 .unwrap();
2132
2133 let projection =
2134 ReaderProjection::from_whole_schema(&written_file.schema, LanceFileVersion::V2_0);
2135 let column_infos = file_reader
2136 .collect_columns_from_projection(&projection)
2137 .unwrap();
2138 let mut decode_scheduler = DecodeBatchScheduler::try_new(
2139 &projection.schema,
2140 &projection.column_indices,
2141 &column_infos,
2142 &vec![],
2143 total_rows as u64,
2144 Arc::<DecoderPlugins>::default(),
2145 file_reader.scheduler.clone(),
2146 test_cache(),
2147 &FilterExpression::no_filter(),
2148 &DecoderConfig::default(),
2149 )
2150 .await
2151 .unwrap();
2152
2153 let range = 0..total_rows as u64;
2154
2155 let (tx, rx) = mpsc::unbounded_channel();
2156
2157 drop(rx);
2159
2160 decode_scheduler.schedule_range(
2162 range,
2163 &FilterExpression::no_filter(),
2164 tx,
2165 file_reader.scheduler.clone(),
2166 )
2167 }
2168
2169 #[tokio::test]
2170 async fn test_read_empty_range() {
2171 let fs = FsFixture::default();
2172 create_some_file(&fs, LanceFileVersion::V2_0).await;
2173
2174 let file_scheduler = fs
2175 .scheduler
2176 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2177 .await
2178 .unwrap();
2179 let file_reader = FileReader::try_open(
2180 file_scheduler.clone(),
2181 None,
2182 Arc::<DecoderPlugins>::default(),
2183 &test_cache(),
2184 FileReaderOptions::default(),
2185 )
2186 .await
2187 .unwrap();
2188
2189 let batches = file_reader
2191 .read_stream(
2192 lance_io::ReadBatchParams::Range(0..0),
2193 1024,
2194 16,
2195 FilterExpression::no_filter(),
2196 )
2197 .unwrap()
2198 .try_collect::<Vec<_>>()
2199 .await
2200 .unwrap();
2201
2202 assert_eq!(batches.len(), 0);
2203
2204 let batches = file_reader
2206 .read_stream(
2207 lance_io::ReadBatchParams::Ranges(Arc::new([0..1, 2..2])),
2208 1024,
2209 16,
2210 FilterExpression::no_filter(),
2211 )
2212 .unwrap()
2213 .try_collect::<Vec<_>>()
2214 .await
2215 .unwrap();
2216 assert_eq!(batches.len(), 1);
2217 }
2218
2219 #[tokio::test]
2220 async fn test_global_buffers() {
2221 let fs = FsFixture::default();
2222
2223 let lance_schema =
2224 lance_core::datatypes::Schema::try_from(&ArrowSchema::new(vec![Field::new(
2225 "foo",
2226 DataType::Int32,
2227 true,
2228 )]))
2229 .unwrap();
2230
2231 let mut file_writer = FileWriter::try_new(
2232 fs.object_store.create(&fs.tmp_path).await.unwrap(),
2233 lance_schema.clone(),
2234 FileWriterOptions::default(),
2235 )
2236 .unwrap();
2237
2238 let test_bytes = Bytes::from_static(b"hello");
2239
2240 let buf_index = file_writer
2241 .add_global_buffer(test_bytes.clone())
2242 .await
2243 .unwrap();
2244
2245 assert_eq!(buf_index, 1);
2246
2247 file_writer.finish().await.unwrap();
2248
2249 let file_scheduler = fs
2250 .scheduler
2251 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2252 .await
2253 .unwrap();
2254 let file_reader = FileReader::try_open(
2255 file_scheduler.clone(),
2256 None,
2257 Arc::<DecoderPlugins>::default(),
2258 &test_cache(),
2259 FileReaderOptions::default(),
2260 )
2261 .await
2262 .unwrap();
2263
2264 let buf = file_reader.read_global_buffer(1).await.unwrap();
2265 assert_eq!(buf, test_bytes);
2266 }
2267}