use std::{
    collections::{BTreeMap, BTreeSet},
    io::Cursor,
    ops::Range,
    pin::Pin,
    sync::Arc,
};

use arrow_array::RecordBatchReader;
use arrow_schema::Schema as ArrowSchema;
use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
use bytes::{Bytes, BytesMut};
use deepsize::{Context, DeepSizeOf};
use futures::{Stream, StreamExt, stream::BoxStream};
use lance_encoding::{
    EncodingsIo,
    decoder::{
        ColumnInfo, DecoderConfig, DecoderPlugins, FilterExpression, PageEncoding, PageInfo,
        ReadBatchTask, RequestedRows, SchedulerDecoderConfig, schedule_and_decode,
        schedule_and_decode_blocking,
    },
    encoder::EncodedBatch,
    version::LanceFileVersion,
};
use log::debug;
use object_store::path::Path;
use prost::{Message, Name};

use lance_core::{
    Error, Result,
    cache::LanceCache,
    datatypes::{Field, Schema},
};
use lance_encoding::format::pb as pbenc;
use lance_encoding::format::pb21 as pbenc21;
use lance_io::{
    ReadBatchParams,
    scheduler::FileScheduler,
    stream::{RecordBatchStream, RecordBatchStreamAdapter},
};

use crate::{
    datatypes::{Fields, FieldsWithMeta},
    format::{MAGIC, MAJOR_VERSION, MINOR_VERSION, pb, pbfile},
    io::LanceEncodingsIo,
    writer::PAGE_BUFFER_ALIGNMENT,
};

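/// Default read chunk size (8 MiB), used for [`FileReaderOptions::read_chunk_size`].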
pub const DEFAULT_READ_CHUNK_SIZE: u64 = 8 * 1024 * 1024;

#[derive(Debug, DeepSizeOf)]
pub struct BufferDescriptor {
    pub position: u64,
    pub size: u64,
}

#[derive(Debug)]
pub struct FileStatistics {
    pub columns: Vec<ColumnStatistics>,
}

#[derive(Debug)]
pub struct ColumnStatistics {
    pub num_pages: usize,
    pub size_bytes: u64,
}

#[derive(Debug)]
pub struct CachedFileMetadata {
    pub file_schema: Arc<Schema>,
    pub column_metadatas: Vec<pbfile::ColumnMetadata>,
    pub column_infos: Vec<Arc<ColumnInfo>>,
    pub num_rows: u64,
    pub file_buffers: Vec<BufferDescriptor>,
    pub num_data_bytes: u64,
    pub num_column_metadata_bytes: u64,
    pub num_global_buffer_bytes: u64,
    pub num_footer_bytes: u64,
    pub major_version: u16,
    pub minor_version: u16,
}

impl DeepSizeOf for CachedFileMetadata {
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.file_schema.deep_size_of_children(context)
            + self
                .file_buffers
                .iter()
                .map(|file_buffer| file_buffer.deep_size_of_children(context))
                .sum::<usize>()
    }
}

impl CachedFileMetadata {
    pub fn version(&self) -> LanceFileVersion {
        match (self.major_version, self.minor_version) {
            (0, 3) => LanceFileVersion::V2_0,
            (2, 0) => LanceFileVersion::V2_0,
            (2, 1) => LanceFileVersion::V2_1,
            (2, 2) => LanceFileVersion::V2_2,
            (2, 3) => LanceFileVersion::V2_3,
            _ => panic!(
                "Unsupported version: {}.{}",
                self.major_version, self.minor_version
            ),
        }
    }
}

#[derive(Debug, Clone)]
pub struct ReaderProjection {
    pub schema: Arc<Schema>,
    pub column_indices: Vec<u32>,
}

impl ReaderProjection {
    fn from_field_ids_helper<'a>(
        file_version: LanceFileVersion,
        fields: impl Iterator<Item = &'a Field>,
        field_id_to_column_index: &BTreeMap<u32, u32>,
        column_indices: &mut Vec<u32>,
    ) -> Result<()> {
        for field in fields {
            let is_structural = file_version >= LanceFileVersion::V2_1;
            if (!is_structural
                || field.children.is_empty()
                || field.is_blob()
                || field.is_packed_struct())
                && let Some(column_idx) = field_id_to_column_index.get(&(field.id as u32)).copied()
            {
                column_indices.push(column_idx);
            }
            if !is_structural || (!field.is_blob() && !field.is_packed_struct()) {
                Self::from_field_ids_helper(
                    file_version,
                    field.children.iter(),
                    field_id_to_column_index,
                    column_indices,
                )?;
            }
        }
        Ok(())
    }

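    /// Creates a projection that reads the given schema, mapping each field to a
    /// column via the provided field-id-to-column-index map.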
    pub fn from_field_ids(
        file_version: LanceFileVersion,
        schema: &Schema,
        field_id_to_column_index: &BTreeMap<u32, u32>,
    ) -> Result<Self> {
        let mut column_indices = Vec::new();
        Self::from_field_ids_helper(
            file_version,
            schema.fields.iter(),
            field_id_to_column_index,
            &mut column_indices,
        )?;
        let projection = Self {
            schema: Arc::new(schema.clone()),
            column_indices,
        };
        Ok(projection)
    }

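    /// Creates a projection that reads the entire file.
    ///
    /// Column indices are assigned in schema pre-order. For structural versions
    /// (2.1+) only leaf fields consume a column; a packed struct consumes a
    /// single column and its children are skipped.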
    pub fn from_whole_schema(schema: &Schema, version: LanceFileVersion) -> Self {
        let schema = Arc::new(schema.clone());
        let is_structural = version >= LanceFileVersion::V2_1;
        let mut column_indices = vec![];
        let mut curr_column_idx = 0;
        let mut packed_struct_fields_num = 0;
        for field in schema.fields_pre_order() {
            if packed_struct_fields_num > 0 {
                packed_struct_fields_num -= 1;
                continue;
            }
            if field.is_packed_struct() {
                column_indices.push(curr_column_idx);
                curr_column_idx += 1;
                packed_struct_fields_num = field.children.len();
            } else if field.children.is_empty() || !is_structural {
                column_indices.push(curr_column_idx);
                curr_column_idx += 1;
            }
        }
        Self {
            schema,
            column_indices,
        }
    }

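    /// Creates a projection that reads the named columns, resolving dotted paths
    /// (e.g. `"location.x"`) to nested fields.
    ///
    /// A usage sketch (assumes `schema` is the `lance_core::datatypes::Schema` of
    /// a file with a `score` column and a `location` struct column):
    ///
    /// ```ignore
    /// use lance_encoding::version::LanceFileVersion;
    ///
    /// let projection = ReaderProjection::from_column_names(
    ///     LanceFileVersion::V2_0,
    ///     &schema,
    ///     &["score", "location.x"],
    /// )?;
    /// ```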
    pub fn from_column_names(
        file_version: LanceFileVersion,
        schema: &Schema,
        column_names: &[&str],
    ) -> Result<Self> {
        let field_id_to_column_index = schema
            .fields_pre_order()
            .filter(|field| {
                file_version < LanceFileVersion::V2_1 || field.is_leaf() || field.is_packed_struct()
            })
            .enumerate()
            .map(|(idx, field)| (field.id as u32, idx as u32))
            .collect::<BTreeMap<_, _>>();
        let projected = schema.project(column_names)?;
        let mut column_indices = Vec::new();
        Self::from_field_ids_helper(
            file_version,
            projected.fields.iter(),
            &field_id_to_column_index,
            &mut column_indices,
        )?;
        Ok(Self {
            schema: Arc::new(projected),
            column_indices,
        })
    }
}

#[derive(Clone, Debug)]
pub struct FileReaderOptions {
    pub decoder_config: DecoderConfig,
    pub read_chunk_size: u64,
}

impl Default for FileReaderOptions {
    fn default() -> Self {
        Self {
            decoder_config: DecoderConfig::default(),
            read_chunk_size: DEFAULT_READ_CHUNK_SIZE,
        }
    }
}

#[derive(Debug)]
pub struct FileReader {
    scheduler: Arc<dyn EncodingsIo>,
    base_projection: ReaderProjection,
    num_rows: u64,
    metadata: Arc<CachedFileMetadata>,
    decoder_plugins: Arc<DecoderPlugins>,
    cache: Arc<LanceCache>,
    options: FileReaderOptions,
}

#[derive(Debug)]
struct Footer {
    #[allow(dead_code)]
    column_meta_start: u64,
    #[allow(dead_code)]
    column_meta_offsets_start: u64,
    global_buff_offsets_start: u64,
    num_global_buffers: u32,
    num_columns: u32,
    major_version: u16,
    minor_version: u16,
}

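// The footer occupies the last FOOTER_LEN (40) bytes of the file, laid out as
// (all little-endian, matching `decode_footer` below):
//   u64: column_meta_start
//   u64: column_meta_offsets_start
//   u64: global_buff_offsets_start
//   u32: num_global_buffers
//   u32: num_columns
//   u16: major_version
//   u16: minor_version
//   4 bytes: magic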
const FOOTER_LEN: usize = 40;

impl FileReader {
    pub fn with_scheduler(&self, scheduler: Arc<dyn EncodingsIo>) -> Self {
        Self {
            scheduler,
            base_projection: self.base_projection.clone(),
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            metadata: self.metadata.clone(),
            options: self.options.clone(),
            num_rows: self.num_rows,
        }
    }

    pub fn num_rows(&self) -> u64 {
        self.num_rows
    }

    pub fn metadata(&self) -> &Arc<CachedFileMetadata> {
        &self.metadata
    }

    pub fn file_statistics(&self) -> FileStatistics {
        let column_metadatas = &self.metadata().column_metadatas;

        let column_stats = column_metadatas
            .iter()
            .map(|col_metadata| {
                let num_pages = col_metadata.pages.len();
                let size_bytes = col_metadata
                    .pages
                    .iter()
                    .map(|page| page.buffer_sizes.iter().sum::<u64>())
                    .sum::<u64>();
                ColumnStatistics {
                    num_pages,
                    size_bytes,
                }
            })
            .collect();

        FileStatistics {
            columns: column_stats,
        }
    }

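    /// Reads one of the file's global buffers from storage.
    ///
    /// Index 0 holds the file schema; higher indices hold user-provided buffers
    /// such as those added through the writer's `add_global_buffer`.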
    pub async fn read_global_buffer(&self, index: u32) -> Result<Bytes> {
        let buffer_desc = self
            .metadata
            .file_buffers
            .get(index as usize)
            .ok_or_else(|| {
                Error::invalid_input(format!(
                    "request for global buffer at index {} but there were only {} global buffers in the file",
                    index,
                    self.metadata.file_buffers.len()
                ))
            })?;
        self.scheduler
            .submit_single(
                buffer_desc.position..buffer_desc.position + buffer_desc.size,
                0,
            )
            .await
    }

    async fn read_tail(scheduler: &FileScheduler) -> Result<(Bytes, u64)> {
        let file_size = scheduler.reader().size().await? as u64;
        let begin = if file_size < scheduler.reader().block_size() as u64 {
            0
        } else {
            file_size - scheduler.reader().block_size() as u64
        };
        let tail_bytes = scheduler.submit_single(begin..file_size, 0).await?;
        Ok((tail_bytes, file_size))
    }

    fn decode_footer(footer_bytes: &Bytes) -> Result<Footer> {
        let len = footer_bytes.len();
        if len < FOOTER_LEN {
            return Err(Error::invalid_input(format!(
                "does not have sufficient data, len: {}, bytes: {:?}",
                len, footer_bytes
            )));
        }
        let mut cursor = Cursor::new(footer_bytes.slice(len - FOOTER_LEN..));

        let column_meta_start = cursor.read_u64::<LittleEndian>()?;
        let column_meta_offsets_start = cursor.read_u64::<LittleEndian>()?;
        let global_buff_offsets_start = cursor.read_u64::<LittleEndian>()?;
        let num_global_buffers = cursor.read_u32::<LittleEndian>()?;
        let num_columns = cursor.read_u32::<LittleEndian>()?;
        let major_version = cursor.read_u16::<LittleEndian>()?;
        let minor_version = cursor.read_u16::<LittleEndian>()?;

        if major_version == MAJOR_VERSION as u16 && minor_version == MINOR_VERSION as u16 {
            return Err(Error::version_conflict(
                "Attempt to use the lance v2 reader to read a legacy file".to_string(),
                major_version,
                minor_version,
            ));
        }

        let magic_bytes = footer_bytes.slice(len - 4..);
        if magic_bytes.as_ref() != MAGIC {
            return Err(Error::invalid_input(format!(
                "file does not appear to be a Lance file (invalid magic: expected {:?}, found {:?})",
                MAGIC,
                magic_bytes.as_ref()
            )));
        }
        Ok(Footer {
            column_meta_start,
            column_meta_offsets_start,
            global_buff_offsets_start,
            num_global_buffers,
            num_columns,
            major_version,
            minor_version,
        })
    }

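    // The column metadata offset (CMO) table sits at the end of the column
    // metadata section: one 16-byte entry per column holding the absolute
    // position (u64) and length (u64) of that column's metadata.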
    fn read_all_column_metadata(
        column_metadata_bytes: Bytes,
        footer: &Footer,
    ) -> Result<Vec<pbfile::ColumnMetadata>> {
        let column_metadata_start = footer.column_meta_start;
        let cmo_table_size = 16 * footer.num_columns as usize;
        let cmo_table = column_metadata_bytes.slice(column_metadata_bytes.len() - cmo_table_size..);

        (0..footer.num_columns)
            .map(|col_idx| {
                let offset = (col_idx * 16) as usize;
                let position = LittleEndian::read_u64(&cmo_table[offset..offset + 8]);
                let length = LittleEndian::read_u64(&cmo_table[offset + 8..offset + 16]);
                let normalized_position = (position - column_metadata_start) as usize;
                let normalized_end = normalized_position + (length as usize);
                Ok(pbfile::ColumnMetadata::decode(
                    &column_metadata_bytes[normalized_position..normalized_end],
                )?)
            })
            .collect::<Result<Vec<_>>>()
    }

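    // Returns the bytes from `start_pos` to the end of the file, reusing the
    // already-fetched tail where possible and issuing one extra read only for
    // whatever falls before it.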
    async fn optimistic_tail_read(
        data: &Bytes,
        start_pos: u64,
        scheduler: &FileScheduler,
        file_len: u64,
    ) -> Result<Bytes> {
        let num_bytes_needed = (file_len - start_pos) as usize;
        if data.len() >= num_bytes_needed {
            Ok(data.slice((data.len() - num_bytes_needed)..))
        } else {
            let num_bytes_missing = (num_bytes_needed - data.len()) as u64;
            let start = file_len - num_bytes_needed as u64;
            let missing_bytes = scheduler
                .submit_single(start..start + num_bytes_missing, 0)
                .await?;
            let mut combined = BytesMut::with_capacity(data.len() + num_bytes_missing as usize);
            combined.extend(missing_bytes);
            combined.extend(data);
            Ok(combined.freeze())
        }
    }

    fn do_decode_gbo_table(
        gbo_bytes: &Bytes,
        footer: &Footer,
        version: LanceFileVersion,
    ) -> Result<Vec<BufferDescriptor>> {
        let mut global_bufs_cursor = Cursor::new(gbo_bytes);

        let mut global_buffers = Vec::with_capacity(footer.num_global_buffers as usize);
        for _ in 0..footer.num_global_buffers {
            let buf_pos = global_bufs_cursor.read_u64::<LittleEndian>()?;
            assert!(
                version < LanceFileVersion::V2_1 || buf_pos % PAGE_BUFFER_ALIGNMENT as u64 == 0
            );
            let buf_size = global_bufs_cursor.read_u64::<LittleEndian>()?;
            global_buffers.push(BufferDescriptor {
                position: buf_pos,
                size: buf_size,
            });
        }

        Ok(global_buffers)
    }

    async fn decode_gbo_table(
        tail_bytes: &Bytes,
        file_len: u64,
        scheduler: &FileScheduler,
        footer: &Footer,
        version: LanceFileVersion,
    ) -> Result<Vec<BufferDescriptor>> {
        let gbo_bytes = Self::optimistic_tail_read(
            tail_bytes,
            footer.global_buff_offsets_start,
            scheduler,
            file_len,
        )
        .await?;
        Self::do_decode_gbo_table(&gbo_bytes, footer, version)
    }

    fn decode_schema(schema_bytes: Bytes) -> Result<(u64, lance_core::datatypes::Schema)> {
        let file_descriptor = pb::FileDescriptor::decode(schema_bytes)?;
        let pb_schema = file_descriptor.schema.unwrap();
        let num_rows = file_descriptor.length;
        let fields_with_meta = FieldsWithMeta {
            fields: Fields(pb_schema.fields),
            metadata: pb_schema.metadata,
        };
        let schema = lance_core::datatypes::Schema::from(fields_with_meta);
        Ok((num_rows, schema))
    }

    pub async fn read_all_metadata(scheduler: &FileScheduler) -> Result<CachedFileMetadata> {
        let (tail_bytes, file_len) = Self::read_tail(scheduler).await?;
        let footer = Self::decode_footer(&tail_bytes)?;

        let file_version = LanceFileVersion::try_from_major_minor(
            footer.major_version as u32,
            footer.minor_version as u32,
        )?;

        let gbo_table =
            Self::decode_gbo_table(&tail_bytes, file_len, scheduler, &footer, file_version).await?;
        if gbo_table.is_empty() {
            return Err(Error::internal(
                "File did not contain any global buffers, schema expected".to_string(),
            ));
        }
        let schema_start = gbo_table[0].position;
        let schema_size = gbo_table[0].size;

        let num_footer_bytes = file_len - schema_start;

        let all_metadata_bytes =
            Self::optimistic_tail_read(&tail_bytes, schema_start, scheduler, file_len).await?;

        let schema_bytes = all_metadata_bytes.slice(0..schema_size as usize);
        let (num_rows, schema) = Self::decode_schema(schema_bytes)?;

        let column_metadata_start = (footer.column_meta_start - schema_start) as usize;
        let column_metadata_end = (footer.global_buff_offsets_start - schema_start) as usize;
        let column_metadata_bytes =
            all_metadata_bytes.slice(column_metadata_start..column_metadata_end);
        let column_metadatas = Self::read_all_column_metadata(column_metadata_bytes, &footer)?;

        let num_global_buffer_bytes = gbo_table.iter().map(|buf| buf.size).sum::<u64>();
        let num_data_bytes = footer.column_meta_start - num_global_buffer_bytes;
        let num_column_metadata_bytes = footer.global_buff_offsets_start - footer.column_meta_start;

        let column_infos = Self::meta_to_col_infos(column_metadatas.as_slice(), file_version);

        Ok(CachedFileMetadata {
            file_schema: Arc::new(schema),
            column_metadatas,
            column_infos,
            num_rows,
            num_data_bytes,
            num_column_metadata_bytes,
            num_global_buffer_bytes,
            num_footer_bytes,
            file_buffers: gbo_table,
            major_version: footer.major_version,
            minor_version: footer.minor_version,
        })
    }

    fn fetch_encoding<M: Default + Name + Sized>(encoding: &pbfile::Encoding) -> M {
        match &encoding.location {
            Some(pbfile::encoding::Location::Indirect(_)) => todo!(),
            Some(pbfile::encoding::Location::Direct(encoding)) => {
                let encoding_buf = Bytes::from(encoding.encoding.clone());
                let encoding_any = prost_types::Any::decode(encoding_buf).unwrap();
                encoding_any.to_msg::<M>().unwrap()
            }
            Some(pbfile::encoding::Location::None(_)) => panic!(),
            None => panic!(),
        }
    }

    fn meta_to_col_infos(
        column_metadatas: &[pbfile::ColumnMetadata],
        file_version: LanceFileVersion,
    ) -> Vec<Arc<ColumnInfo>> {
        column_metadatas
            .iter()
            .enumerate()
            .map(|(col_idx, col_meta)| {
                let page_infos = col_meta
                    .pages
                    .iter()
                    .map(|page| {
                        let num_rows = page.length;
                        let encoding = match file_version {
                            LanceFileVersion::V2_0 => {
                                PageEncoding::Legacy(Self::fetch_encoding::<pbenc::ArrayEncoding>(
                                    page.encoding.as_ref().unwrap(),
                                ))
                            }
                            _ => PageEncoding::Structural(Self::fetch_encoding::<
                                pbenc21::PageLayout,
                            >(
                                page.encoding.as_ref().unwrap()
                            )),
                        };
                        let buffer_offsets_and_sizes = Arc::from(
                            page.buffer_offsets
                                .iter()
                                .zip(page.buffer_sizes.iter())
                                .map(|(offset, size)| {
                                    assert!(
                                        file_version < LanceFileVersion::V2_1
                                            || offset % PAGE_BUFFER_ALIGNMENT as u64 == 0
                                    );
                                    (*offset, *size)
                                })
                                .collect::<Vec<_>>(),
                        );
                        PageInfo {
                            buffer_offsets_and_sizes,
                            encoding,
                            num_rows,
                            priority: page.priority,
                        }
                    })
                    .collect::<Vec<_>>();
                let buffer_offsets_and_sizes = Arc::from(
                    col_meta
                        .buffer_offsets
                        .iter()
                        .zip(col_meta.buffer_sizes.iter())
                        .map(|(offset, size)| (*offset, *size))
                        .collect::<Vec<_>>(),
                );
                Arc::new(ColumnInfo {
                    index: col_idx as u32,
                    page_infos: Arc::from(page_infos),
                    buffer_offsets_and_sizes,
                    encoding: Self::fetch_encoding(col_meta.encoding.as_ref().unwrap()),
                })
            })
            .collect::<Vec<_>>()
    }

    fn validate_projection(
        projection: &ReaderProjection,
        metadata: &CachedFileMetadata,
    ) -> Result<()> {
        if projection.schema.fields.is_empty() {
            return Err(Error::invalid_input(
                "Attempt to read zero columns from the file, at least one column must be specified"
                    .to_string(),
            ));
        }
        let mut column_indices_seen = BTreeSet::new();
        for column_index in &projection.column_indices {
            if !column_indices_seen.insert(*column_index) {
                return Err(Error::invalid_input(format!(
                    "The projection specified the column index {} more than once",
                    column_index
                )));
            }
            if *column_index >= metadata.column_infos.len() as u32 {
                return Err(Error::invalid_input(format!(
                    "The projection specified the column index {} but there are only {} columns in the file",
                    column_index,
                    metadata.column_infos.len()
                )));
            }
        }
        Ok(())
    }

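    /// Opens the file, reading its metadata and preparing it for reads.
    ///
    /// A usage sketch (assumes `file_scheduler` is a `FileScheduler` and `cache`
    /// a `LanceCache`, as in the tests below):
    ///
    /// ```ignore
    /// let file_reader = FileReader::try_open(
    ///     file_scheduler,
    ///     /* base_projection */ None,
    ///     Arc::<DecoderPlugins>::default(),
    ///     &cache,
    ///     FileReaderOptions::default(),
    /// )
    /// .await?;
    /// ```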
    pub async fn try_open(
        scheduler: FileScheduler,
        base_projection: Option<ReaderProjection>,
        decoder_plugins: Arc<DecoderPlugins>,
        cache: &LanceCache,
        options: FileReaderOptions,
    ) -> Result<Self> {
        let file_metadata = Arc::new(Self::read_all_metadata(&scheduler).await?);
        let path = scheduler.reader().path().clone();

        let encodings_io =
            LanceEncodingsIo::new(scheduler).with_read_chunk_size(options.read_chunk_size);

        Self::try_open_with_file_metadata(
            Arc::new(encodings_io),
            path,
            base_projection,
            decoder_plugins,
            file_metadata,
            cache,
            options,
        )
        .await
    }

    pub async fn try_open_with_file_metadata(
        scheduler: Arc<dyn EncodingsIo>,
        path: Path,
        base_projection: Option<ReaderProjection>,
        decoder_plugins: Arc<DecoderPlugins>,
        file_metadata: Arc<CachedFileMetadata>,
        cache: &LanceCache,
        options: FileReaderOptions,
    ) -> Result<Self> {
        let cache = Arc::new(cache.with_key_prefix(path.as_ref()));

        if let Some(base_projection) = base_projection.as_ref() {
            Self::validate_projection(base_projection, &file_metadata)?;
        }
        let num_rows = file_metadata.num_rows;
        Ok(Self {
            scheduler,
            base_projection: base_projection.unwrap_or(ReaderProjection::from_whole_schema(
                file_metadata.file_schema.as_ref(),
                file_metadata.version(),
            )),
            num_rows,
            metadata: file_metadata,
            decoder_plugins,
            cache,
            options,
        })
    }

    fn collect_columns_from_projection(
        &self,
        _projection: &ReaderProjection,
    ) -> Result<Vec<Arc<ColumnInfo>>> {
        Ok(self.metadata.column_infos.clone())
    }

    #[allow(clippy::too_many_arguments)]
    fn do_read_range(
        column_infos: Vec<Arc<ColumnInfo>>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<LanceCache>,
        num_rows: u64,
        decoder_plugins: Arc<DecoderPlugins>,
        range: Range<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
        decoder_config: DecoderConfig,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        debug!(
            "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
            range,
            batch_size,
            num_rows,
            column_infos.len(),
            projection.schema.fields.len(),
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache,
            decoder_plugins,
            io,
            decoder_config,
        };

        let requested_rows = RequestedRows::Ranges(vec![range]);

        Ok(schedule_and_decode(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        ))
    }

    fn read_range(
        &self,
        range: Range<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        Self::do_read_range(
            self.collect_columns_from_projection(&projection)?,
            self.scheduler.clone(),
            self.cache.clone(),
            self.num_rows,
            self.decoder_plugins.clone(),
            range,
            batch_size,
            projection,
            filter,
            self.options.decoder_config.clone(),
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn do_take_rows(
        column_infos: Vec<Arc<ColumnInfo>>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<LanceCache>,
        decoder_plugins: Arc<DecoderPlugins>,
        indices: Vec<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
        decoder_config: DecoderConfig,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        debug!(
            "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
            indices.len(),
            indices[0],
            indices[indices.len() - 1],
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache,
            decoder_plugins,
            io,
            decoder_config,
        };

        let requested_rows = RequestedRows::Indices(indices);

        Ok(schedule_and_decode(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        ))
    }

    fn take_rows(
        &self,
        indices: Vec<u64>,
        batch_size: u32,
        projection: ReaderProjection,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        Self::do_take_rows(
            self.collect_columns_from_projection(&projection)?,
            self.scheduler.clone(),
            self.cache.clone(),
            self.decoder_plugins.clone(),
            indices,
            batch_size,
            projection,
            FilterExpression::no_filter(),
            self.options.decoder_config.clone(),
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn do_read_ranges(
        column_infos: Vec<Arc<ColumnInfo>>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<LanceCache>,
        decoder_plugins: Arc<DecoderPlugins>,
        ranges: Vec<Range<u64>>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
        decoder_config: DecoderConfig,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        debug!(
            "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
            ranges.len(),
            num_rows,
            ranges[0].start,
            ranges[ranges.len() - 1].end,
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache,
            decoder_plugins,
            io,
            decoder_config,
        };

        let requested_rows = RequestedRows::Ranges(ranges);

        Ok(schedule_and_decode(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        ))
    }

    fn read_ranges(
        &self,
        ranges: Vec<Range<u64>>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        Self::do_read_ranges(
            self.collect_columns_from_projection(&projection)?,
            self.scheduler.clone(),
            self.cache.clone(),
            self.decoder_plugins.clone(),
            ranges,
            batch_size,
            projection,
            filter,
            self.options.decoder_config.clone(),
        )
    }

    pub fn read_tasks(
        &self,
        params: ReadBatchParams,
        batch_size: u32,
        projection: Option<ReaderProjection>,
        filter: FilterExpression,
    ) -> Result<Pin<Box<dyn Stream<Item = ReadBatchTask> + Send>>> {
        let projection = projection.unwrap_or_else(|| self.base_projection.clone());
        Self::validate_projection(&projection, &self.metadata)?;
        let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
            if bound > self.num_rows || (bound == self.num_rows && inclusive) {
                Err(Error::invalid_input(format!(
                    "cannot read {:?} from file with {} rows",
                    params, self.num_rows
                )))
            } else {
                Ok(())
            }
        };
        match &params {
            ReadBatchParams::Indices(indices) => {
                for idx in indices {
                    match idx {
                        None => {
                            return Err(Error::invalid_input("Null value in indices array"));
                        }
                        Some(idx) => {
                            verify_bound(&params, idx as u64, true)?;
                        }
                    }
                }
                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
                self.take_rows(indices, batch_size, projection)
            }
            ReadBatchParams::Range(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range(
                    range.start as u64..range.end as u64,
                    batch_size,
                    projection,
                    filter,
                )
            }
            ReadBatchParams::Ranges(ranges) => {
                let mut ranges_u64 = Vec::with_capacity(ranges.len());
                for range in ranges.as_ref() {
                    verify_bound(&params, range.end, false)?;
                    ranges_u64.push(range.start..range.end);
                }
                self.read_ranges(ranges_u64, batch_size, projection, filter)
            }
            ReadBatchParams::RangeFrom(range) => {
                verify_bound(&params, range.start as u64, true)?;
                self.read_range(
                    range.start as u64..self.num_rows,
                    batch_size,
                    projection,
                    filter,
                )
            }
            ReadBatchParams::RangeTo(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range(0..range.end as u64, batch_size, projection, filter)
            }
            ReadBatchParams::RangeFull => {
                self.read_range(0..self.num_rows, batch_size, projection, filter)
            }
        }
    }

    pub fn read_stream_projected(
        &self,
        params: ReadBatchParams,
        batch_size: u32,
        batch_readahead: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
        let arrow_schema = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
        let tasks_stream = self.read_tasks(params, batch_size, Some(projection), filter)?;
        let batch_stream = tasks_stream
            .map(|task| task.task)
            .buffered(batch_readahead as usize)
            .boxed();
        Ok(Box::pin(RecordBatchStreamAdapter::new(
            arrow_schema,
            batch_stream,
        )))
    }

    fn take_rows_blocking(
        &self,
        indices: Vec<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
        let column_infos = self.collect_columns_from_projection(&projection)?;
        debug!(
            "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
            indices.len(),
            indices[0],
            indices[indices.len() - 1],
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            io: self.scheduler.clone(),
            decoder_config: self.options.decoder_config.clone(),
        };

        let requested_rows = RequestedRows::Indices(indices);

        schedule_and_decode_blocking(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        )
    }

    fn read_ranges_blocking(
        &self,
        ranges: Vec<Range<u64>>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
        let column_infos = self.collect_columns_from_projection(&projection)?;
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        debug!(
            "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
            ranges.len(),
            num_rows,
            ranges[0].start,
            ranges[ranges.len() - 1].end,
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            io: self.scheduler.clone(),
            decoder_config: self.options.decoder_config.clone(),
        };

        let requested_rows = RequestedRows::Ranges(ranges);

        schedule_and_decode_blocking(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        )
    }

    fn read_range_blocking(
        &self,
        range: Range<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
        let column_infos = self.collect_columns_from_projection(&projection)?;
        let num_rows = self.num_rows;

        debug!(
            "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
            range,
            batch_size,
            num_rows,
            column_infos.len(),
            projection.schema.fields.len(),
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            io: self.scheduler.clone(),
            decoder_config: self.options.decoder_config.clone(),
        };

        let requested_rows = RequestedRows::Ranges(vec![range]);

        schedule_and_decode_blocking(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        )
    }

    pub fn read_stream_projected_blocking(
        &self,
        params: ReadBatchParams,
        batch_size: u32,
        projection: Option<ReaderProjection>,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
        let projection = projection.unwrap_or_else(|| self.base_projection.clone());
        Self::validate_projection(&projection, &self.metadata)?;
        let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
            if bound > self.num_rows || (bound == self.num_rows && inclusive) {
                Err(Error::invalid_input(format!(
                    "cannot read {:?} from file with {} rows",
                    params, self.num_rows
                )))
            } else {
                Ok(())
            }
        };
        match &params {
            ReadBatchParams::Indices(indices) => {
                for idx in indices {
                    match idx {
                        None => {
                            return Err(Error::invalid_input("Null value in indices array"));
                        }
                        Some(idx) => {
                            verify_bound(&params, idx as u64, true)?;
                        }
                    }
                }
                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
                self.take_rows_blocking(indices, batch_size, projection, filter)
            }
            ReadBatchParams::Range(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range_blocking(
                    range.start as u64..range.end as u64,
                    batch_size,
                    projection,
                    filter,
                )
            }
            ReadBatchParams::Ranges(ranges) => {
                let mut ranges_u64 = Vec::with_capacity(ranges.len());
                for range in ranges.as_ref() {
                    verify_bound(&params, range.end, false)?;
                    ranges_u64.push(range.start..range.end);
                }
                self.read_ranges_blocking(ranges_u64, batch_size, projection, filter)
            }
            ReadBatchParams::RangeFrom(range) => {
                verify_bound(&params, range.start as u64, true)?;
                self.read_range_blocking(
                    range.start as u64..self.num_rows,
                    batch_size,
                    projection,
                    filter,
                )
            }
            ReadBatchParams::RangeTo(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range_blocking(0..range.end as u64, batch_size, projection, filter)
            }
            ReadBatchParams::RangeFull => {
                self.read_range_blocking(0..self.num_rows, batch_size, projection, filter)
            }
        }
    }

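    /// Reads data from the file as a stream of record batches, using the base
    /// projection the reader was created with.
    ///
    /// A usage sketch, mirroring the tests below:
    ///
    /// ```ignore
    /// use lance_encoding::decoder::FilterExpression;
    ///
    /// let batch_stream = file_reader.read_stream(
    ///     lance_io::ReadBatchParams::RangeFull,
    ///     /* batch_size */ 1024,
    ///     /* batch_readahead */ 16,
    ///     FilterExpression::no_filter(),
    /// )?;
    /// ```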
    pub fn read_stream(
        &self,
        params: ReadBatchParams,
        batch_size: u32,
        batch_readahead: u32,
        filter: FilterExpression,
    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
        self.read_stream_projected(
            params,
            batch_size,
            batch_readahead,
            self.base_projection.clone(),
            filter,
        )
    }

    pub fn schema(&self) -> &Arc<Schema> {
        &self.metadata.file_schema
    }
}

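/// Renders a human-readable description of a page's encoding, for inspection
/// and debugging.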
pub fn describe_encoding(page: &pbfile::column_metadata::Page) -> String {
    if let Some(encoding) = &page.encoding {
        if let Some(style) = &encoding.location {
            match style {
                pbfile::encoding::Location::Indirect(indirect) => {
                    format!(
                        "IndirectEncoding(pos={},size={})",
                        indirect.buffer_location, indirect.buffer_length
                    )
                }
                pbfile::encoding::Location::Direct(direct) => {
                    let encoding_any =
                        prost_types::Any::decode(Bytes::from(direct.encoding.clone()))
                            .expect("failed to deserialize encoding as protobuf");
                    if encoding_any.type_url == "/lance.encodings.ArrayEncoding" {
                        let encoding = encoding_any.to_msg::<pbenc::ArrayEncoding>();
                        match encoding {
                            Ok(encoding) => {
                                format!("{:#?}", encoding)
                            }
                            Err(err) => {
                                format!("Unsupported(decode_err={})", err)
                            }
                        }
                    } else if encoding_any.type_url == "/lance.encodings21.PageLayout" {
                        let encoding = encoding_any.to_msg::<pbenc21::PageLayout>();
                        match encoding {
                            Ok(encoding) => {
                                format!("{:#?}", encoding)
                            }
                            Err(err) => {
                                format!("Unsupported(decode_err={})", err)
                            }
                        }
                    } else {
                        format!("Unrecognized(type_url={})", encoding_any.type_url)
                    }
                }
                pbfile::encoding::Location::None(_) => "NoEncodingDescription".to_string(),
            }
        } else {
            "MISSING STYLE".to_string()
        }
    } else {
        "MISSING".to_string()
    }
}

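/// Extensions for decoding an [`EncodedBatch`] from Lance-formatted bytes.
///
/// A round-trip sketch, mirroring the tests below (`bytes` produced by the
/// writer's `try_to_self_described_lance`):
///
/// ```ignore
/// let decoded_batch = EncodedBatch::try_from_self_described_lance(bytes)?;
/// ```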
pub trait EncodedBatchReaderExt {
    fn try_from_mini_lance(
        bytes: Bytes,
        schema: &Schema,
        version: LanceFileVersion,
    ) -> Result<Self>
    where
        Self: Sized;
    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
    where
        Self: Sized;
}

impl EncodedBatchReaderExt for EncodedBatch {
    fn try_from_mini_lance(
        bytes: Bytes,
        schema: &Schema,
        file_version: LanceFileVersion,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        let projection = ReaderProjection::from_whole_schema(schema, file_version);
        let footer = FileReader::decode_footer(&bytes)?;

        let column_metadata_start = footer.column_meta_start as usize;
        let column_metadata_end = footer.global_buff_offsets_start as usize;
        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
        let column_metadatas =
            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;

        let file_version = LanceFileVersion::try_from_major_minor(
            footer.major_version as u32,
            footer.minor_version as u32,
        )?;

        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);

        Ok(Self {
            data: bytes,
            num_rows: page_table
                .first()
                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
                .unwrap_or(0),
            page_table,
            top_level_columns: projection.column_indices,
            schema: Arc::new(schema.clone()),
        })
    }

    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
    where
        Self: Sized,
    {
        let footer = FileReader::decode_footer(&bytes)?;
        let file_version = LanceFileVersion::try_from_major_minor(
            footer.major_version as u32,
            footer.minor_version as u32,
        )?;

        let gbo_table = FileReader::do_decode_gbo_table(
            &bytes.slice(footer.global_buff_offsets_start as usize..),
            &footer,
            file_version,
        )?;
        if gbo_table.is_empty() {
            return Err(Error::internal(
                "File did not contain any global buffers, schema expected".to_string(),
            ));
        }
        let schema_start = gbo_table[0].position as usize;
        let schema_size = gbo_table[0].size as usize;

        let schema_bytes = bytes.slice(schema_start..(schema_start + schema_size));
        let (_, schema) = FileReader::decode_schema(schema_bytes)?;
        let projection = ReaderProjection::from_whole_schema(&schema, file_version);

        let column_metadata_start = footer.column_meta_start as usize;
        let column_metadata_end = footer.global_buff_offsets_start as usize;
        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
        let column_metadatas =
            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;

        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);

        Ok(Self {
            data: bytes,
            num_rows: page_table
                .first()
                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
                .unwrap_or(0),
            page_table,
            top_level_columns: projection.column_indices,
            schema: Arc::new(schema),
        })
    }
}

#[cfg(test)]
pub mod tests {
    use std::{collections::BTreeMap, pin::Pin, sync::Arc};

    use arrow_array::{
        RecordBatch, UInt32Array,
        types::{Float64Type, Int32Type},
    };
    use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema};
    use bytes::Bytes;
    use futures::{StreamExt, prelude::stream::TryStreamExt};
    use lance_arrow::RecordBatchExt;
    use lance_core::{ArrowResult, datatypes::Schema};
    use lance_datagen::{BatchCount, ByteCount, RowCount, array, gen_batch};
    use lance_encoding::{
        decoder::{DecodeBatchScheduler, DecoderPlugins, FilterExpression, decode_batch},
        encoder::{EncodedBatch, EncodingOptions, default_encoding_strategy, encode_batch},
        version::LanceFileVersion,
    };
    use lance_io::{stream::RecordBatchStream, utils::CachedFileSize};
    use log::debug;
    use rstest::rstest;
    use tokio::sync::mpsc;

    use crate::reader::{EncodedBatchReaderExt, FileReader, FileReaderOptions, ReaderProjection};
    use crate::testing::{FsFixture, WrittenFile, test_cache, write_lance_file};
    use crate::writer::{EncodedBatchWriteExt, FileWriter, FileWriterOptions};
    use lance_encoding::decoder::DecoderConfig;

    async fn create_some_file(fs: &FsFixture, version: LanceFileVersion) -> WrittenFile {
        let location_type = DataType::Struct(Fields::from(vec![
            Field::new("x", DataType::Float64, true),
            Field::new("y", DataType::Float64, true),
        ]));
        let categories_type = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)));

        let mut reader = gen_batch()
            .col("score", array::rand::<Float64Type>())
            .col("location", array::rand_type(&location_type))
            .col("categories", array::rand_type(&categories_type))
            .col("binary", array::rand_type(&DataType::Binary));
        if version <= LanceFileVersion::V2_0 {
            reader = reader.col("large_bin", array::rand_type(&DataType::LargeBinary));
        }
        let reader = reader.into_reader_rows(RowCount::from(1000), BatchCount::from(100));

        write_lance_file(
            reader,
            fs,
            FileWriterOptions {
                format_version: Some(version),
                ..Default::default()
            },
        )
        .await
    }

    type Transformer = Box<dyn Fn(&RecordBatch) -> RecordBatch>;

    async fn verify_expected(
        expected: &[RecordBatch],
        mut actual: Pin<Box<dyn RecordBatchStream>>,
        read_size: u32,
        transform: Option<Transformer>,
    ) {
        let mut remaining = expected.iter().map(|batch| batch.num_rows()).sum::<usize>() as u32;
        let mut expected_iter = expected.iter().map(|batch| {
            if let Some(transform) = &transform {
                transform(batch)
            } else {
                batch.clone()
            }
        });
        let mut next_expected = expected_iter.next().unwrap().clone();
        while let Some(actual) = actual.next().await {
            let mut actual = actual.unwrap();
            let mut rows_to_verify = actual.num_rows() as u32;
            let expected_length = remaining.min(read_size);
            assert_eq!(expected_length, rows_to_verify);

            while rows_to_verify > 0 {
                let next_slice_len = (next_expected.num_rows() as u32).min(rows_to_verify);
                assert_eq!(
                    next_expected.slice(0, next_slice_len as usize),
                    actual.slice(0, next_slice_len as usize)
                );
                remaining -= next_slice_len;
                rows_to_verify -= next_slice_len;
                if remaining > 0 {
                    if next_slice_len == next_expected.num_rows() as u32 {
                        next_expected = expected_iter.next().unwrap().clone();
                    } else {
                        next_expected = next_expected.slice(
                            next_slice_len as usize,
                            next_expected.num_rows() - next_slice_len as usize,
                        );
                    }
                }
                if rows_to_verify > 0 {
                    actual = actual.slice(
                        next_slice_len as usize,
                        actual.num_rows() - next_slice_len as usize,
                    );
                }
            }
        }
        assert_eq!(remaining, 0);
    }

    #[tokio::test]
    async fn test_round_trip() {
        let fs = FsFixture::default();

        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;

        for read_size in [32, 1024, 1024 * 1024] {
            let file_scheduler = fs
                .scheduler
                .open_file(&fs.tmp_path, &CachedFileSize::unknown())
                .await
                .unwrap();
            let file_reader = FileReader::try_open(
                file_scheduler,
                None,
                Arc::<DecoderPlugins>::default(),
                &test_cache(),
                FileReaderOptions::default(),
            )
            .await
            .unwrap();

            let schema = file_reader.schema();
            assert_eq!(schema.metadata.get("foo").unwrap(), "bar");

            let batch_stream = file_reader
                .read_stream(
                    lance_io::ReadBatchParams::RangeFull,
                    read_size,
                    16,
                    FilterExpression::no_filter(),
                )
                .unwrap();

            verify_expected(&data, batch_stream, read_size, None).await;
        }
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_encoded_batch_round_trip(
        #[values(LanceFileVersion::V2_0)] version: LanceFileVersion,
    ) {
        let data = gen_batch()
            .col("x", array::rand::<Int32Type>())
            .col("y", array::rand_utf8(ByteCount::from(16), false))
            .into_batch_rows(RowCount::from(10000))
            .unwrap();

        let lance_schema = Arc::new(Schema::try_from(data.schema().as_ref()).unwrap());

        let encoding_options = EncodingOptions {
            cache_bytes_per_column: 4096,
            max_page_bytes: 32 * 1024 * 1024,
            keep_original_array: true,
            buffer_alignment: 64,
            version,
        };

        let encoding_strategy = default_encoding_strategy(version);

        let encoded_batch = encode_batch(
            &data,
            lance_schema.clone(),
            encoding_strategy.as_ref(),
            &encoding_options,
        )
        .await
        .unwrap();

        let bytes = encoded_batch.try_to_self_described_lance(version).unwrap();

        let decoded_batch = EncodedBatch::try_from_self_described_lance(bytes).unwrap();

        let decoded = decode_batch(
            &decoded_batch,
            &FilterExpression::no_filter(),
            Arc::<DecoderPlugins>::default(),
            false,
            version,
            None,
        )
        .await
        .unwrap();

        assert_eq!(data, decoded);

        let bytes = encoded_batch.try_to_mini_lance(version).unwrap();
        let decoded_batch =
            EncodedBatch::try_from_mini_lance(bytes, lance_schema.as_ref(), LanceFileVersion::V2_0)
                .unwrap();
        let decoded = decode_batch(
            &decoded_batch,
            &FilterExpression::no_filter(),
            Arc::<DecoderPlugins>::default(),
            false,
            version,
            None,
        )
        .await
        .unwrap();

        assert_eq!(data, decoded);
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_projection(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1, LanceFileVersion::V2_2)]
        version: LanceFileVersion,
    ) {
        let fs = FsFixture::default();

        let written_file = create_some_file(&fs, version).await;
        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();

        let field_id_mapping = written_file
            .field_id_mapping
            .iter()
            .copied()
            .collect::<BTreeMap<_, _>>();

        let empty_projection = ReaderProjection {
            column_indices: Vec::default(),
            schema: Arc::new(Schema::default()),
        };

        for columns in [
            vec!["score"],
            vec!["location"],
            vec!["categories"],
            vec!["location.x"],
            vec!["score", "categories"],
            vec!["score", "location"],
            vec!["location", "categories"],
            vec!["score", "location.y", "categories"],
        ] {
            debug!("Testing round trip with projection {:?}", columns);
            for use_field_ids in [true, false] {
                let file_reader = FileReader::try_open(
                    file_scheduler.clone(),
                    None,
                    Arc::<DecoderPlugins>::default(),
                    &test_cache(),
                    FileReaderOptions::default(),
                )
                .await
                .unwrap();

                let projected_schema = written_file.schema.project(&columns).unwrap();
                let projection = if use_field_ids {
                    ReaderProjection::from_field_ids(
                        file_reader.metadata.version(),
                        &projected_schema,
                        &field_id_mapping,
                    )
                    .unwrap()
                } else {
                    ReaderProjection::from_column_names(
                        file_reader.metadata.version(),
                        &written_file.schema,
                        &columns,
                    )
                    .unwrap()
                };

                let batch_stream = file_reader
                    .read_stream_projected(
                        lance_io::ReadBatchParams::RangeFull,
                        1024,
                        16,
                        projection.clone(),
                        FilterExpression::no_filter(),
                    )
                    .unwrap();

                let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
                verify_expected(
                    &written_file.data,
                    batch_stream,
                    1024,
                    Some(Box::new(move |batch: &RecordBatch| {
                        batch.project_by_schema(&projection_arrow).unwrap()
                    })),
                )
                .await;

                let file_reader = FileReader::try_open(
                    file_scheduler.clone(),
                    Some(projection.clone()),
                    Arc::<DecoderPlugins>::default(),
                    &test_cache(),
                    FileReaderOptions::default(),
                )
                .await
                .unwrap();

                let batch_stream = file_reader
                    .read_stream(
                        lance_io::ReadBatchParams::RangeFull,
                        1024,
                        16,
                        FilterExpression::no_filter(),
                    )
                    .unwrap();

                let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
                verify_expected(
                    &written_file.data,
                    batch_stream,
                    1024,
                    Some(Box::new(move |batch: &RecordBatch| {
                        batch.project_by_schema(&projection_arrow).unwrap()
                    })),
                )
                .await;

                assert!(
                    file_reader
                        .read_stream_projected(
                            lance_io::ReadBatchParams::RangeFull,
                            1024,
                            16,
                            empty_projection.clone(),
                            FilterExpression::no_filter(),
                        )
                        .is_err()
                );
            }
        }

        assert!(
            FileReader::try_open(
                file_scheduler.clone(),
                Some(empty_projection),
                Arc::<DecoderPlugins>::default(),
                &test_cache(),
                FileReaderOptions::default(),
            )
            .await
            .is_err()
        );

        let arrow_schema = ArrowSchema::new(vec![
            Field::new("x", DataType::Int32, true),
            Field::new("y", DataType::Int32, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        let projection_with_dupes = ReaderProjection {
            column_indices: vec![0, 0],
            schema: Arc::new(schema),
        };

        assert!(
            FileReader::try_open(
                file_scheduler.clone(),
                Some(projection_with_dupes),
                Arc::<DecoderPlugins>::default(),
                &test_cache(),
                FileReaderOptions::default(),
            )
            .await
            .is_err()
        );
    }

    #[test_log::test(tokio::test)]
    async fn test_compressing_buffer() {
        let fs = FsFixture::default();

        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();

        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let mut projection = written_file.schema.project(&["score"]).unwrap();
        for field in projection.fields.iter_mut() {
            field
                .metadata
                .insert("lance:compression".to_string(), "zstd".to_string());
        }
        let projection = ReaderProjection {
            column_indices: projection.fields.iter().map(|f| f.id as u32).collect(),
            schema: Arc::new(projection),
        };

        let batch_stream = file_reader
            .read_stream_projected(
                lance_io::ReadBatchParams::RangeFull,
                1024,
                16,
                projection.clone(),
                FilterExpression::no_filter(),
            )
            .unwrap();

        let projection_arrow = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
        verify_expected(
            &written_file.data,
            batch_stream,
            1024,
            Some(Box::new(move |batch: &RecordBatch| {
                batch.project_by_schema(&projection_arrow).unwrap()
            })),
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_all() {
        let fs = FsFixture::default();
        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();

        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let batches = file_reader
            .read_stream(
                lance_io::ReadBatchParams::RangeFull,
                total_rows as u32,
                16,
                FilterExpression::no_filter(),
            )
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), total_rows);
    }

    #[rstest]
    #[tokio::test]
    async fn test_blocking_take(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1, LanceFileVersion::V2_2)]
        version: LanceFileVersion,
    ) {
        let fs = FsFixture::default();
        let WrittenFile { data, schema, .. } = create_some_file(&fs, version).await;
        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();

        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            Some(ReaderProjection::from_column_names(version, &schema, &["score"]).unwrap()),
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let batches = tokio::task::spawn_blocking(move || {
            file_reader
                .read_stream_projected_blocking(
                    lance_io::ReadBatchParams::Indices(UInt32Array::from(vec![0, 1, 2, 3, 4])),
                    total_rows as u32,
                    None,
                    FilterExpression::no_filter(),
                )
                .unwrap()
                .collect::<ArrowResult<Vec<_>>>()
                .unwrap()
        })
        .await
        .unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 5);
        assert_eq!(batches[0].num_columns(), 1);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_drop_in_progress() {
        let fs = FsFixture::default();
        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();

        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let mut batches = file_reader
            .read_stream(
                lance_io::ReadBatchParams::RangeFull,
                (total_rows / 10) as u32,
                16,
                FilterExpression::no_filter(),
            )
            .unwrap();

        drop(file_reader);

        let batch = batches.next().await.unwrap().unwrap();
        assert!(batch.num_rows() > 0);

        drop(batches);
    }

    #[tokio::test]
    async fn drop_while_scheduling() {
        let fs = FsFixture::default();
        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
        let total_rows = written_file
            .data
            .iter()
            .map(|batch| batch.num_rows())
            .sum::<usize>();

        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let projection =
            ReaderProjection::from_whole_schema(&written_file.schema, LanceFileVersion::V2_0);
        let column_infos = file_reader
            .collect_columns_from_projection(&projection)
            .unwrap();
        let mut decode_scheduler = DecodeBatchScheduler::try_new(
            &projection.schema,
            &projection.column_indices,
            &column_infos,
            &vec![],
            total_rows as u64,
            Arc::<DecoderPlugins>::default(),
            file_reader.scheduler.clone(),
            test_cache(),
            &FilterExpression::no_filter(),
            &DecoderConfig::default(),
        )
        .await
        .unwrap();

        let range = 0..total_rows as u64;

        let (tx, rx) = mpsc::unbounded_channel();

        drop(rx);

        decode_scheduler.schedule_range(
            range,
            &FilterExpression::no_filter(),
            tx,
            file_reader.scheduler.clone(),
        )
    }

    #[tokio::test]
    async fn test_read_empty_range() {
        let fs = FsFixture::default();
        create_some_file(&fs, LanceFileVersion::V2_0).await;

        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let batches = file_reader
            .read_stream(
                lance_io::ReadBatchParams::Range(0..0),
                1024,
                16,
                FilterExpression::no_filter(),
            )
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        assert_eq!(batches.len(), 0);

        let batches = file_reader
            .read_stream(
                lance_io::ReadBatchParams::Ranges(Arc::new([0..1, 2..2])),
                1024,
                16,
                FilterExpression::no_filter(),
            )
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(batches.len(), 1);
    }

    #[tokio::test]
    async fn test_global_buffers() {
        let fs = FsFixture::default();

        let lance_schema =
            lance_core::datatypes::Schema::try_from(&ArrowSchema::new(vec![Field::new(
                "foo",
                DataType::Int32,
                true,
            )]))
            .unwrap();

        let mut file_writer = FileWriter::try_new(
            fs.object_store.create(&fs.tmp_path).await.unwrap(),
            lance_schema.clone(),
            FileWriterOptions::default(),
        )
        .unwrap();

        let test_bytes = Bytes::from_static(b"hello");

        let buf_index = file_writer
            .add_global_buffer(test_bytes.clone())
            .await
            .unwrap();

        assert_eq!(buf_index, 1);

        file_writer.finish().await.unwrap();

        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let buf = file_reader.read_global_buffer(1).await.unwrap();
        assert_eq!(buf, test_bytes);
    }
}