1use std::{
5 collections::{BTreeMap, BTreeSet},
6 io::Cursor,
7 ops::Range,
8 pin::Pin,
9 sync::Arc,
10};
11
12use arrow_array::RecordBatchReader;
13use arrow_schema::Schema as ArrowSchema;
14use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
15use bytes::{Bytes, BytesMut};
16use deepsize::{Context, DeepSizeOf};
17use futures::{stream::BoxStream, Stream, StreamExt};
18use lance_encoding::{
19 decoder::{
20 schedule_and_decode, schedule_and_decode_blocking, ColumnInfo, DecoderConfig,
21 DecoderPlugins, FilterExpression, PageEncoding, PageInfo, ReadBatchTask, RequestedRows,
22 SchedulerDecoderConfig,
23 },
24 encoder::EncodedBatch,
25 version::LanceFileVersion,
26 EncodingsIo,
27};
28use log::debug;
29use object_store::path::Path;
30use prost::{Message, Name};
31use snafu::location;
32
33use lance_core::{
34 cache::LanceCache,
35 datatypes::{Field, Schema},
36 Error, Result,
37};
38use lance_encoding::format::pb as pbenc;
39use lance_encoding::format::pb21 as pbenc21;
40use lance_io::{
41 scheduler::FileScheduler,
42 stream::{RecordBatchStream, RecordBatchStreamAdapter},
43 ReadBatchParams,
44};
45
46use crate::{
47 datatypes::{Fields, FieldsWithMeta},
48 format::{pb, pbfile, MAGIC, MAJOR_VERSION, MINOR_VERSION},
49 io::LanceEncodingsIo,
50 writer::PAGE_BUFFER_ALIGNMENT,
51};
52
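/// Default size, in bytes, of the chunks requested when reading data from storage (8 MiB).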
53pub const DEFAULT_READ_CHUNK_SIZE: u64 = 8 * 1024 * 1024;
56
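/// Describes a buffer within the file by its absolute byte position and size.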
57#[derive(Debug, DeepSizeOf)]
62pub struct BufferDescriptor {
63 pub position: u64,
64 pub size: u64,
65}
66
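/// Statistics summarizing the contents of a file, one entry per column.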
67#[derive(Debug)]
69pub struct FileStatistics {
70 pub columns: Vec<ColumnStatistics>,
72}
73
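/// Statistics for a single column: the number of pages and the total size of its page buffers in bytes.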
74#[derive(Debug)]
76pub struct ColumnStatistics {
77 pub num_pages: usize,
79 pub size_bytes: u64,
83}
84
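/// All of the file metadata (schema, column metadata, buffer locations, and size breakdowns)
/// decoded from the file's footer region, cached so it only needs to be parsed once.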
85#[derive(Debug)]
87pub struct CachedFileMetadata {
88 pub file_schema: Arc<Schema>,
90 pub column_metadatas: Vec<pbfile::ColumnMetadata>,
92 pub column_infos: Vec<Arc<ColumnInfo>>,
93 pub num_rows: u64,
95 pub file_buffers: Vec<BufferDescriptor>,
96 pub num_data_bytes: u64,
98 pub num_column_metadata_bytes: u64,
101 pub num_global_buffer_bytes: u64,
103 pub num_footer_bytes: u64,
105 pub major_version: u16,
106 pub minor_version: u16,
107}
108
109impl DeepSizeOf for CachedFileMetadata {
110 fn deep_size_of_children(&self, context: &mut Context) -> usize {
112 self.file_schema.deep_size_of_children(context)
113 + self
114 .file_buffers
115 .iter()
116 .map(|file_buffer| file_buffer.deep_size_of_children(context))
117 .sum::<usize>()
118 }
119}
120
121impl CachedFileMetadata {
122 pub fn version(&self) -> LanceFileVersion {
123 match (self.major_version, self.minor_version) {
124 (0, 3) => LanceFileVersion::V2_0,
125 (2, 0) => LanceFileVersion::V2_0,
126 (2, 1) => LanceFileVersion::V2_1,
127 (2, 2) => LanceFileVersion::V2_2,
128 _ => panic!(
129 "Unsupported version: {}.{}",
130 self.major_version, self.minor_version
131 ),
132 }
133 }
134}
135
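/// Describes a projection to apply while reading: the projected schema plus the indices of the
/// file columns that must be read to materialize it.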
136#[derive(Debug, Clone)]
158pub struct ReaderProjection {
159 pub schema: Arc<Schema>,
162 pub column_indices: Vec<u32>,
202}
203
204impl ReaderProjection {
205 fn from_field_ids_helper<'a>(
206 file_version: LanceFileVersion,
207 fields: impl Iterator<Item = &'a Field>,
208 field_id_to_column_index: &BTreeMap<u32, u32>,
209 column_indices: &mut Vec<u32>,
210 ) -> Result<()> {
211 for field in fields {
212 let is_structural = file_version >= LanceFileVersion::V2_1;
213 if !is_structural
216 || field.children.is_empty()
217 || field.is_blob()
218 || field.is_packed_struct()
219 {
220 if let Some(column_idx) = field_id_to_column_index.get(&(field.id as u32)).copied()
221 {
222 column_indices.push(column_idx);
223 }
224 }
225 if !is_structural || (!field.is_blob() && !field.is_packed_struct()) {
227 Self::from_field_ids_helper(
228 file_version,
229 field.children.iter(),
230 field_id_to_column_index,
231 column_indices,
232 )?;
233 }
234 }
235 Ok(())
236 }
237
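    /// Creates a projection from a schema and a mapping from field id to column index.
    ///
    /// For structural (2.1+) files only leaf fields (plus blob and packed-struct fields) map to
    /// columns; for older files every field maps to a column.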
238 pub fn from_field_ids(
243 file_version: LanceFileVersion,
244 schema: &Schema,
245 field_id_to_column_index: &BTreeMap<u32, u32>,
246 ) -> Result<Self> {
247 let mut column_indices = Vec::new();
248 Self::from_field_ids_helper(
249 file_version,
250 schema.fields.iter(),
251 field_id_to_column_index,
252 &mut column_indices,
253 )?;
254 Ok(Self {
255 schema: Arc::new(schema.clone()),
256 column_indices,
257 })
258 }
259
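    /// Creates a projection that reads the entire file.
    ///
    /// Column indices are assigned in schema pre-order; for structural (2.1+) files only leaf
    /// fields and packed structs consume a column index.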
260 pub fn from_whole_schema(schema: &Schema, version: LanceFileVersion) -> Self {
268 let schema = Arc::new(schema.clone());
269 let is_structural = version >= LanceFileVersion::V2_1;
270 let mut column_indices = vec![];
271 let mut curr_column_idx = 0;
272 let mut packed_struct_fields_num = 0;
273 for field in schema.fields_pre_order() {
274 if packed_struct_fields_num > 0 {
275 packed_struct_fields_num -= 1;
276 continue;
277 }
278 if field.is_packed_struct() {
279 column_indices.push(curr_column_idx);
280 curr_column_idx += 1;
281 packed_struct_fields_num = field.children.len();
282 } else if field.children.is_empty() || !is_structural {
283 column_indices.push(curr_column_idx);
284 curr_column_idx += 1;
285 }
286 }
287 Self {
288 schema,
289 column_indices,
290 }
291 }
292
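    /// Creates a projection that reads the named columns (nested fields can be addressed with
    /// dotted paths such as `"location.x"`).
    ///
    /// A minimal usage sketch; the file version, schema, and column names are illustrative:
    ///
    /// ```ignore
    /// let projection = ReaderProjection::from_column_names(
    ///     LanceFileVersion::V2_0,
    ///     &schema,
    ///     &["score", "location"],
    /// )?;
    /// ```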
293 pub fn from_column_names(
300 file_version: LanceFileVersion,
301 schema: &Schema,
302 column_names: &[&str],
303 ) -> Result<Self> {
304 let field_id_to_column_index = schema
305 .fields_pre_order()
306 .filter(|field| {
309 file_version < LanceFileVersion::V2_1 || field.is_leaf() || field.is_packed_struct()
310 })
311 .enumerate()
312 .map(|(idx, field)| (field.id as u32, idx as u32))
313 .collect::<BTreeMap<_, _>>();
314 let projected = schema.project(column_names)?;
315 let mut column_indices = Vec::new();
316 Self::from_field_ids_helper(
317 file_version,
318 projected.fields.iter(),
319 &field_id_to_column_index,
320 &mut column_indices,
321 )?;
322 Ok(Self {
323 schema: Arc::new(projected),
324 column_indices,
325 })
326 }
327}
328
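/// Options controlling how a [`FileReader`] decodes data and how large its read requests are.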
329#[derive(Clone, Debug)]
331pub struct FileReaderOptions {
332 pub decoder_config: DecoderConfig,
333 pub read_chunk_size: u64,
337}
338
339impl Default for FileReaderOptions {
340 fn default() -> Self {
341 Self {
342 decoder_config: DecoderConfig::default(),
343 read_chunk_size: DEFAULT_READ_CHUNK_SIZE,
344 }
345 }
346}
347
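/// A reader for Lance files (format version 2.0 and newer).
///
/// The reader caches the decoded file metadata and hands out streams of record batches via
/// [`FileReader::read_stream`] and related methods.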
348#[derive(Debug)]
349pub struct FileReader {
350 scheduler: Arc<dyn EncodingsIo>,
351 base_projection: ReaderProjection,
353 num_rows: u64,
354 metadata: Arc<CachedFileMetadata>,
355 decoder_plugins: Arc<DecoderPlugins>,
356 cache: Arc<LanceCache>,
357 options: FileReaderOptions,
358}
359#[derive(Debug)]
360struct Footer {
361 #[allow(dead_code)]
362 column_meta_start: u64,
363 #[allow(dead_code)]
366 column_meta_offsets_start: u64,
367 global_buff_offsets_start: u64,
368 num_global_buffers: u32,
369 num_columns: u32,
370 major_version: u16,
371 minor_version: u16,
372}
373
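// The footer is 40 bytes: three u64 offsets, two u32 counts, two u16 version fields, and the
// 4-byte magic number.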
374const FOOTER_LEN: usize = 40;
375
376impl FileReader {
377 pub fn with_scheduler(&self, scheduler: Arc<dyn EncodingsIo>) -> Self {
378 Self {
379 scheduler,
380 base_projection: self.base_projection.clone(),
381 cache: self.cache.clone(),
382 decoder_plugins: self.decoder_plugins.clone(),
383 metadata: self.metadata.clone(),
384 options: self.options.clone(),
385 num_rows: self.num_rows,
386 }
387 }
388
389 pub fn num_rows(&self) -> u64 {
390 self.num_rows
391 }
392
393 pub fn metadata(&self) -> &Arc<CachedFileMetadata> {
394 &self.metadata
395 }
396
397 pub fn file_statistics(&self) -> FileStatistics {
398 let column_metadatas = &self.metadata().column_metadatas;
399
400 let column_stats = column_metadatas
401 .iter()
402 .map(|col_metadata| {
403 let num_pages = col_metadata.pages.len();
404 let size_bytes = col_metadata
405 .pages
406 .iter()
407 .map(|page| page.buffer_sizes.iter().sum::<u64>())
408 .sum::<u64>();
409 ColumnStatistics {
410 num_pages,
411 size_bytes,
412 }
413 })
414 .collect();
415
416 FileStatistics {
417 columns: column_stats,
418 }
419 }
420
421 pub async fn read_global_buffer(&self, index: u32) -> Result<Bytes> {
        let buffer_desc = self.metadata.file_buffers.get(index as usize).ok_or_else(|| {
            Error::invalid_input(
                format!(
                    "request for global buffer at index {} but there were only {} global buffers in the file",
                    index,
                    self.metadata.file_buffers.len()
                ),
                location!(),
            )
        })?;
423 self.scheduler
424 .submit_single(
425 buffer_desc.position..buffer_desc.position + buffer_desc.size,
426 0,
427 )
428 .await
429 }
430
431 async fn read_tail(scheduler: &FileScheduler) -> Result<(Bytes, u64)> {
432 let file_size = scheduler.reader().size().await? as u64;
433 let begin = if file_size < scheduler.reader().block_size() as u64 {
434 0
435 } else {
436 file_size - scheduler.reader().block_size() as u64
437 };
438 let tail_bytes = scheduler.submit_single(begin..file_size, 0).await?;
439 Ok((tail_bytes, file_size))
440 }
441
442 fn decode_footer(footer_bytes: &Bytes) -> Result<Footer> {
445 let len = footer_bytes.len();
446 if len < FOOTER_LEN {
447 return Err(Error::io(
448 format!(
                    "file tail does not have sufficient data to decode the footer, len: {}, bytes: {:?}",
450 len, footer_bytes
451 ),
452 location!(),
453 ));
454 }
455 let mut cursor = Cursor::new(footer_bytes.slice(len - FOOTER_LEN..));
456
457 let column_meta_start = cursor.read_u64::<LittleEndian>()?;
458 let column_meta_offsets_start = cursor.read_u64::<LittleEndian>()?;
459 let global_buff_offsets_start = cursor.read_u64::<LittleEndian>()?;
460 let num_global_buffers = cursor.read_u32::<LittleEndian>()?;
461 let num_columns = cursor.read_u32::<LittleEndian>()?;
462 let major_version = cursor.read_u16::<LittleEndian>()?;
463 let minor_version = cursor.read_u16::<LittleEndian>()?;
464
465 if major_version == MAJOR_VERSION as u16 && minor_version == MINOR_VERSION as u16 {
466 return Err(Error::version_conflict(
467 "Attempt to use the lance v2 reader to read a legacy file".to_string(),
468 major_version,
469 minor_version,
470 location!(),
471 ));
472 }
473
        let magic_bytes = footer_bytes.slice(len - 4..);
        if magic_bytes.as_ref() != MAGIC {
            return Err(Error::io(
                format!(
                    "file does not appear to be a Lance file (expected magic: {:?}, found: {:?})",
                    MAGIC, magic_bytes
                ),
                location!(),
            ));
        }
484 Ok(Footer {
485 column_meta_start,
486 column_meta_offsets_start,
487 global_buff_offsets_start,
488 num_global_buffers,
489 num_columns,
490 major_version,
491 minor_version,
492 })
493 }
494
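    /// Decodes every column's metadata using the column metadata offset table, a table of
    /// 16-byte entries (position: u64, length: u64) located at the end of the metadata section.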
495 fn read_all_column_metadata(
497 column_metadata_bytes: Bytes,
498 footer: &Footer,
499 ) -> Result<Vec<pbfile::ColumnMetadata>> {
500 let column_metadata_start = footer.column_meta_start;
501 let cmo_table_size = 16 * footer.num_columns as usize;
503 let cmo_table = column_metadata_bytes.slice(column_metadata_bytes.len() - cmo_table_size..);
504
505 (0..footer.num_columns)
506 .map(|col_idx| {
507 let offset = (col_idx * 16) as usize;
508 let position = LittleEndian::read_u64(&cmo_table[offset..offset + 8]);
509 let length = LittleEndian::read_u64(&cmo_table[offset + 8..offset + 16]);
510 let normalized_position = (position - column_metadata_start) as usize;
511 let normalized_end = normalized_position + (length as usize);
512 Ok(pbfile::ColumnMetadata::decode(
513 &column_metadata_bytes[normalized_position..normalized_end],
514 )?)
515 })
516 .collect::<Result<Vec<_>>>()
517 }
518
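    /// Returns the bytes from `start_pos` to the end of the file, reusing the tail bytes that
    /// were already fetched and issuing an additional read only for whatever is missing.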
519 async fn optimistic_tail_read(
520 data: &Bytes,
521 start_pos: u64,
522 scheduler: &FileScheduler,
523 file_len: u64,
524 ) -> Result<Bytes> {
525 let num_bytes_needed = (file_len - start_pos) as usize;
526 if data.len() >= num_bytes_needed {
527 Ok(data.slice((data.len() - num_bytes_needed)..))
528 } else {
529 let num_bytes_missing = (num_bytes_needed - data.len()) as u64;
530 let start = file_len - num_bytes_needed as u64;
531 let missing_bytes = scheduler
532 .submit_single(start..start + num_bytes_missing, 0)
533 .await?;
534 let mut combined = BytesMut::with_capacity(data.len() + num_bytes_missing as usize);
535 combined.extend(missing_bytes);
536 combined.extend(data);
537 Ok(combined.freeze())
538 }
539 }
540
541 fn do_decode_gbo_table(
542 gbo_bytes: &Bytes,
543 footer: &Footer,
544 version: LanceFileVersion,
545 ) -> Result<Vec<BufferDescriptor>> {
546 let mut global_bufs_cursor = Cursor::new(gbo_bytes);
547
548 let mut global_buffers = Vec::with_capacity(footer.num_global_buffers as usize);
549 for _ in 0..footer.num_global_buffers {
550 let buf_pos = global_bufs_cursor.read_u64::<LittleEndian>()?;
551 assert!(
552 version < LanceFileVersion::V2_1 || buf_pos % PAGE_BUFFER_ALIGNMENT as u64 == 0
553 );
554 let buf_size = global_bufs_cursor.read_u64::<LittleEndian>()?;
555 global_buffers.push(BufferDescriptor {
556 position: buf_pos,
557 size: buf_size,
558 });
559 }
560
561 Ok(global_buffers)
562 }
563
564 async fn decode_gbo_table(
565 tail_bytes: &Bytes,
566 file_len: u64,
567 scheduler: &FileScheduler,
568 footer: &Footer,
569 version: LanceFileVersion,
570 ) -> Result<Vec<BufferDescriptor>> {
571 let gbo_bytes = Self::optimistic_tail_read(
574 tail_bytes,
575 footer.global_buff_offsets_start,
576 scheduler,
577 file_len,
578 )
579 .await?;
580 Self::do_decode_gbo_table(&gbo_bytes, footer, version)
581 }
582
583 fn decode_schema(schema_bytes: Bytes) -> Result<(u64, lance_core::datatypes::Schema)> {
584 let file_descriptor = pb::FileDescriptor::decode(schema_bytes)?;
585 let pb_schema = file_descriptor.schema.unwrap();
586 let num_rows = file_descriptor.length;
587 let fields_with_meta = FieldsWithMeta {
588 fields: Fields(pb_schema.fields),
589 metadata: pb_schema.metadata,
590 };
591 let schema = lance_core::datatypes::Schema::from(fields_with_meta);
592 Ok((num_rows, schema))
593 }
594
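    /// Reads and decodes all of the file metadata: the footer, the global buffer table, the
    /// schema (stored in the first global buffer), and the per-column metadata.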
595 pub async fn read_all_metadata(scheduler: &FileScheduler) -> Result<CachedFileMetadata> {
609 let (tail_bytes, file_len) = Self::read_tail(scheduler).await?;
611 let footer = Self::decode_footer(&tail_bytes)?;
612
613 let file_version = LanceFileVersion::try_from_major_minor(
614 footer.major_version as u32,
615 footer.minor_version as u32,
616 )?;
617
618 let gbo_table =
619 Self::decode_gbo_table(&tail_bytes, file_len, scheduler, &footer, file_version).await?;
620 if gbo_table.is_empty() {
621 return Err(Error::Internal {
622 message: "File did not contain any global buffers, schema expected".to_string(),
623 location: location!(),
624 });
625 }
626 let schema_start = gbo_table[0].position;
627 let schema_size = gbo_table[0].size;
628
629 let num_footer_bytes = file_len - schema_start;
630
631 let all_metadata_bytes =
634 Self::optimistic_tail_read(&tail_bytes, schema_start, scheduler, file_len).await?;
635
636 let schema_bytes = all_metadata_bytes.slice(0..schema_size as usize);
637 let (num_rows, schema) = Self::decode_schema(schema_bytes)?;
638
639 let column_metadata_start = (footer.column_meta_start - schema_start) as usize;
642 let column_metadata_end = (footer.global_buff_offsets_start - schema_start) as usize;
643 let column_metadata_bytes =
644 all_metadata_bytes.slice(column_metadata_start..column_metadata_end);
645 let column_metadatas = Self::read_all_column_metadata(column_metadata_bytes, &footer)?;
646
647 let num_global_buffer_bytes = gbo_table.iter().map(|buf| buf.size).sum::<u64>();
648 let num_data_bytes = footer.column_meta_start - num_global_buffer_bytes;
649 let num_column_metadata_bytes = footer.global_buff_offsets_start - footer.column_meta_start;
650
651 let column_infos = Self::meta_to_col_infos(column_metadatas.as_slice(), file_version);
652
653 Ok(CachedFileMetadata {
654 file_schema: Arc::new(schema),
655 column_metadatas,
656 column_infos,
657 num_rows,
658 num_data_bytes,
659 num_column_metadata_bytes,
660 num_global_buffer_bytes,
661 num_footer_bytes,
662 file_buffers: gbo_table,
663 major_version: footer.major_version,
664 minor_version: footer.minor_version,
665 })
666 }
667
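    /// Decodes an encoding description stored directly (as a protobuf `Any`) in the metadata.
    /// Indirectly stored encodings are not supported yet (`todo!`) and a missing location panics.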
668 fn fetch_encoding<M: Default + Name + Sized>(encoding: &pbfile::Encoding) -> M {
669 match &encoding.location {
670 Some(pbfile::encoding::Location::Indirect(_)) => todo!(),
671 Some(pbfile::encoding::Location::Direct(encoding)) => {
672 let encoding_buf = Bytes::from(encoding.encoding.clone());
673 let encoding_any = prost_types::Any::decode(encoding_buf).unwrap();
674 encoding_any.to_msg::<M>().unwrap()
675 }
676 Some(pbfile::encoding::Location::None(_)) => panic!(),
677 None => panic!(),
678 }
679 }
680
681 fn meta_to_col_infos(
682 column_metadatas: &[pbfile::ColumnMetadata],
683 file_version: LanceFileVersion,
684 ) -> Vec<Arc<ColumnInfo>> {
685 column_metadatas
686 .iter()
687 .enumerate()
688 .map(|(col_idx, col_meta)| {
689 let page_infos = col_meta
690 .pages
691 .iter()
692 .map(|page| {
693 let num_rows = page.length;
694 let encoding = match file_version {
695 LanceFileVersion::V2_0 => {
696 PageEncoding::Legacy(Self::fetch_encoding::<pbenc::ArrayEncoding>(
697 page.encoding.as_ref().unwrap(),
698 ))
699 }
700 _ => PageEncoding::Structural(Self::fetch_encoding::<
701 pbenc21::PageLayout,
702 >(
703 page.encoding.as_ref().unwrap()
704 )),
705 };
706 let buffer_offsets_and_sizes = Arc::from(
707 page.buffer_offsets
708 .iter()
709 .zip(page.buffer_sizes.iter())
710 .map(|(offset, size)| {
711 assert!(
713 file_version < LanceFileVersion::V2_1
714 || offset % PAGE_BUFFER_ALIGNMENT as u64 == 0
715 );
716 (*offset, *size)
717 })
718 .collect::<Vec<_>>(),
719 );
720 PageInfo {
721 buffer_offsets_and_sizes,
722 encoding,
723 num_rows,
724 priority: page.priority,
725 }
726 })
727 .collect::<Vec<_>>();
728 let buffer_offsets_and_sizes = Arc::from(
729 col_meta
730 .buffer_offsets
731 .iter()
732 .zip(col_meta.buffer_sizes.iter())
733 .map(|(offset, size)| (*offset, *size))
734 .collect::<Vec<_>>(),
735 );
736 Arc::new(ColumnInfo {
737 index: col_idx as u32,
738 page_infos: Arc::from(page_infos),
739 buffer_offsets_and_sizes,
740 encoding: Self::fetch_encoding(col_meta.encoding.as_ref().unwrap()),
741 })
742 })
743 .collect::<Vec<_>>()
744 }
745
746 fn validate_projection(
747 projection: &ReaderProjection,
748 metadata: &CachedFileMetadata,
749 ) -> Result<()> {
750 if projection.schema.fields.is_empty() {
751 return Err(Error::invalid_input(
752 "Attempt to read zero columns from the file, at least one column must be specified"
753 .to_string(),
754 location!(),
755 ));
756 }
757 let mut column_indices_seen = BTreeSet::new();
758 for column_index in &projection.column_indices {
759 if !column_indices_seen.insert(*column_index) {
760 return Err(Error::invalid_input(
761 format!(
762 "The projection specified the column index {} more than once",
763 column_index
764 ),
765 location!(),
766 ));
767 }
768 if *column_index >= metadata.column_infos.len() as u32 {
                return Err(Error::invalid_input(
                    format!(
                        "The projection specified the column index {} but there are only {} columns in the file",
                        column_index,
                        metadata.column_infos.len()
                    ),
                    location!(),
                ));
770 }
771 }
772 Ok(())
773 }
774
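    /// Opens a file for reading.
    ///
    /// If `base_projection` is `None` the whole schema is read. A minimal usage sketch; the
    /// scheduler, path, and cache setup are assumed to exist elsewhere (as in the tests below):
    ///
    /// ```ignore
    /// let file_scheduler = scheduler.open_file(&path, &CachedFileSize::unknown()).await?;
    /// let reader = FileReader::try_open(
    ///     file_scheduler,
    ///     None, // read every column
    ///     Arc::<DecoderPlugins>::default(),
    ///     &cache,
    ///     FileReaderOptions::default(),
    /// )
    /// .await?;
    /// ```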
775 pub async fn try_open(
782 scheduler: FileScheduler,
783 base_projection: Option<ReaderProjection>,
784 decoder_plugins: Arc<DecoderPlugins>,
785 cache: &LanceCache,
786 options: FileReaderOptions,
787 ) -> Result<Self> {
788 let file_metadata = Arc::new(Self::read_all_metadata(&scheduler).await?);
789 let path = scheduler.reader().path().clone();
790
791 let encodings_io =
793 LanceEncodingsIo::new(scheduler).with_read_chunk_size(options.read_chunk_size);
794
795 Self::try_open_with_file_metadata(
796 Arc::new(encodings_io),
797 path,
798 base_projection,
799 decoder_plugins,
800 file_metadata,
801 cache,
802 options,
803 )
804 .await
805 }
806
807 pub async fn try_open_with_file_metadata(
813 scheduler: Arc<dyn EncodingsIo>,
814 path: Path,
815 base_projection: Option<ReaderProjection>,
816 decoder_plugins: Arc<DecoderPlugins>,
817 file_metadata: Arc<CachedFileMetadata>,
818 cache: &LanceCache,
819 options: FileReaderOptions,
820 ) -> Result<Self> {
821 let cache = Arc::new(cache.with_key_prefix(path.as_ref()));
822
823 if let Some(base_projection) = base_projection.as_ref() {
824 Self::validate_projection(base_projection, &file_metadata)?;
825 }
826 let num_rows = file_metadata.num_rows;
827 Ok(Self {
828 scheduler,
829 base_projection: base_projection.unwrap_or(ReaderProjection::from_whole_schema(
830 file_metadata.file_schema.as_ref(),
831 file_metadata.version(),
832 )),
833 num_rows,
834 metadata: file_metadata,
835 decoder_plugins,
836 cache,
837 options,
838 })
839 }
840
841 fn collect_columns_from_projection(
855 &self,
856 _projection: &ReaderProjection,
857 ) -> Result<Vec<Arc<ColumnInfo>>> {
858 Ok(self.metadata.column_infos.to_vec())
859 }
860
861 #[allow(clippy::too_many_arguments)]
862 fn do_read_range(
863 column_infos: Vec<Arc<ColumnInfo>>,
864 io: Arc<dyn EncodingsIo>,
865 cache: Arc<LanceCache>,
866 num_rows: u64,
867 decoder_plugins: Arc<DecoderPlugins>,
868 range: Range<u64>,
869 batch_size: u32,
870 projection: ReaderProjection,
871 filter: FilterExpression,
872 decoder_config: DecoderConfig,
873 ) -> Result<BoxStream<'static, ReadBatchTask>> {
874 debug!(
875 "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
876 range,
877 batch_size,
878 num_rows,
879 column_infos.len(),
880 projection.schema.fields.len(),
881 );
882
883 let config = SchedulerDecoderConfig {
884 batch_size,
885 cache,
886 decoder_plugins,
887 io,
888 decoder_config,
889 };
890
891 let requested_rows = RequestedRows::Ranges(vec![range]);
892
893 Ok(schedule_and_decode(
894 column_infos,
895 requested_rows,
896 filter,
897 projection.column_indices,
898 projection.schema,
899 config,
900 ))
901 }
902
903 fn read_range(
904 &self,
905 range: Range<u64>,
906 batch_size: u32,
907 projection: ReaderProjection,
908 filter: FilterExpression,
909 ) -> Result<BoxStream<'static, ReadBatchTask>> {
910 Self::do_read_range(
912 self.collect_columns_from_projection(&projection)?,
913 self.scheduler.clone(),
914 self.cache.clone(),
915 self.num_rows,
916 self.decoder_plugins.clone(),
917 range,
918 batch_size,
919 projection,
920 filter,
921 self.options.decoder_config.clone(),
922 )
923 }
924
925 #[allow(clippy::too_many_arguments)]
926 fn do_take_rows(
927 column_infos: Vec<Arc<ColumnInfo>>,
928 io: Arc<dyn EncodingsIo>,
929 cache: Arc<LanceCache>,
930 decoder_plugins: Arc<DecoderPlugins>,
931 indices: Vec<u64>,
932 batch_size: u32,
933 projection: ReaderProjection,
934 filter: FilterExpression,
935 decoder_config: DecoderConfig,
936 ) -> Result<BoxStream<'static, ReadBatchTask>> {
937 debug!(
938 "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
939 indices.len(),
940 indices[0],
941 indices[indices.len() - 1],
942 batch_size,
943 column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
944 );
945
946 let config = SchedulerDecoderConfig {
947 batch_size,
948 cache,
949 decoder_plugins,
950 io,
951 decoder_config,
952 };
953
954 let requested_rows = RequestedRows::Indices(indices);
955
956 Ok(schedule_and_decode(
957 column_infos,
958 requested_rows,
959 filter,
960 projection.column_indices,
961 projection.schema,
962 config,
963 ))
964 }
965
966 fn take_rows(
967 &self,
968 indices: Vec<u64>,
969 batch_size: u32,
970 projection: ReaderProjection,
971 ) -> Result<BoxStream<'static, ReadBatchTask>> {
972 Self::do_take_rows(
974 self.collect_columns_from_projection(&projection)?,
975 self.scheduler.clone(),
976 self.cache.clone(),
977 self.decoder_plugins.clone(),
978 indices,
979 batch_size,
980 projection,
981 FilterExpression::no_filter(),
982 self.options.decoder_config.clone(),
983 )
984 }
985
986 #[allow(clippy::too_many_arguments)]
987 fn do_read_ranges(
988 column_infos: Vec<Arc<ColumnInfo>>,
989 io: Arc<dyn EncodingsIo>,
990 cache: Arc<LanceCache>,
991 decoder_plugins: Arc<DecoderPlugins>,
992 ranges: Vec<Range<u64>>,
993 batch_size: u32,
994 projection: ReaderProjection,
995 filter: FilterExpression,
996 decoder_config: DecoderConfig,
997 ) -> Result<BoxStream<'static, ReadBatchTask>> {
998 let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
999 debug!(
1000 "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
1001 ranges.len(),
1002 num_rows,
1003 ranges[0].start,
1004 ranges[ranges.len() - 1].end,
1005 batch_size,
1006 column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1007 );
1008
1009 let config = SchedulerDecoderConfig {
1010 batch_size,
1011 cache,
1012 decoder_plugins,
1013 io,
1014 decoder_config,
1015 };
1016
1017 let requested_rows = RequestedRows::Ranges(ranges);
1018
1019 Ok(schedule_and_decode(
1020 column_infos,
1021 requested_rows,
1022 filter,
1023 projection.column_indices,
1024 projection.schema,
1025 config,
1026 ))
1027 }
1028
1029 fn read_ranges(
1030 &self,
1031 ranges: Vec<Range<u64>>,
1032 batch_size: u32,
1033 projection: ReaderProjection,
1034 filter: FilterExpression,
1035 ) -> Result<BoxStream<'static, ReadBatchTask>> {
1036 Self::do_read_ranges(
1037 self.collect_columns_from_projection(&projection)?,
1038 self.scheduler.clone(),
1039 self.cache.clone(),
1040 self.decoder_plugins.clone(),
1041 ranges,
1042 batch_size,
1043 projection,
1044 filter,
1045 self.options.decoder_config.clone(),
1046 )
1047 }
1048
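    /// Creates a stream of read tasks, one per batch, for the requested rows.
    ///
    /// Each task decodes a single batch; callers control how many tasks run concurrently.
    /// [`Self::read_stream_projected`] is a convenience wrapper that buffers the tasks into a
    /// [`RecordBatchStream`].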
1049 pub fn read_tasks(
1060 &self,
1061 params: ReadBatchParams,
1062 batch_size: u32,
1063 projection: Option<ReaderProjection>,
1064 filter: FilterExpression,
1065 ) -> Result<Pin<Box<dyn Stream<Item = ReadBatchTask> + Send>>> {
1066 let projection = projection.unwrap_or_else(|| self.base_projection.clone());
1067 Self::validate_projection(&projection, &self.metadata)?;
1068 let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
1069 if bound > self.num_rows || bound == self.num_rows && inclusive {
1070 Err(Error::invalid_input(
1071 format!(
1072 "cannot read {:?} from file with {} rows",
1073 params, self.num_rows
1074 ),
1075 location!(),
1076 ))
1077 } else {
1078 Ok(())
1079 }
1080 };
1081 match ¶ms {
1082 ReadBatchParams::Indices(indices) => {
1083 for idx in indices {
1084 match idx {
1085 None => {
1086 return Err(Error::invalid_input(
1087 "Null value in indices array",
1088 location!(),
1089 ));
1090 }
1091 Some(idx) => {
1092 verify_bound(¶ms, idx as u64, true)?;
1093 }
1094 }
1095 }
1096 let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
1097 self.take_rows(indices, batch_size, projection)
1098 }
1099 ReadBatchParams::Range(range) => {
1100 verify_bound(¶ms, range.end as u64, false)?;
1101 self.read_range(
1102 range.start as u64..range.end as u64,
1103 batch_size,
1104 projection,
1105 filter,
1106 )
1107 }
1108 ReadBatchParams::Ranges(ranges) => {
1109 let mut ranges_u64 = Vec::with_capacity(ranges.len());
1110 for range in ranges.as_ref() {
1111 verify_bound(¶ms, range.end, false)?;
1112 ranges_u64.push(range.start..range.end);
1113 }
1114 self.read_ranges(ranges_u64, batch_size, projection, filter)
1115 }
1116 ReadBatchParams::RangeFrom(range) => {
1117 verify_bound(¶ms, range.start as u64, true)?;
1118 self.read_range(
1119 range.start as u64..self.num_rows,
1120 batch_size,
1121 projection,
1122 filter,
1123 )
1124 }
1125 ReadBatchParams::RangeTo(range) => {
1126 verify_bound(¶ms, range.end as u64, false)?;
1127 self.read_range(0..range.end as u64, batch_size, projection, filter)
1128 }
1129 ReadBatchParams::RangeFull => {
1130 self.read_range(0..self.num_rows, batch_size, projection, filter)
1131 }
1132 }
1133 }
1134
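    /// Reads the requested rows as a stream of record batches using the given projection.
    ///
    /// A minimal usage sketch; the projection and parameters are illustrative:
    ///
    /// ```ignore
    /// let stream = reader.read_stream_projected(
    ///     ReadBatchParams::RangeFull,
    ///     1024, // batch size
    ///     16,   // batch readahead
    ///     projection,
    ///     FilterExpression::no_filter(),
    /// )?;
    /// ```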
1135 pub fn read_stream_projected(
1157 &self,
1158 params: ReadBatchParams,
1159 batch_size: u32,
1160 batch_readahead: u32,
1161 projection: ReaderProjection,
1162 filter: FilterExpression,
1163 ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1164 let arrow_schema = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
1165 let tasks_stream = self.read_tasks(params, batch_size, Some(projection), filter)?;
1166 let batch_stream = tasks_stream
1167 .map(|task| task.task)
1168 .buffered(batch_readahead as usize)
1169 .boxed();
1170 Ok(Box::pin(RecordBatchStreamAdapter::new(
1171 arrow_schema,
1172 batch_stream,
1173 )))
1174 }
1175
1176 fn take_rows_blocking(
1177 &self,
1178 indices: Vec<u64>,
1179 batch_size: u32,
1180 projection: ReaderProjection,
1181 filter: FilterExpression,
1182 ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1183 let column_infos = self.collect_columns_from_projection(&projection)?;
1184 debug!(
1185 "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
1186 indices.len(),
1187 indices[0],
1188 indices[indices.len() - 1],
1189 batch_size,
1190 column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1191 );
1192
1193 let config = SchedulerDecoderConfig {
1194 batch_size,
1195 cache: self.cache.clone(),
1196 decoder_plugins: self.decoder_plugins.clone(),
1197 io: self.scheduler.clone(),
1198 decoder_config: self.options.decoder_config.clone(),
1199 };
1200
1201 let requested_rows = RequestedRows::Indices(indices);
1202
1203 schedule_and_decode_blocking(
1204 column_infos,
1205 requested_rows,
1206 filter,
1207 projection.column_indices,
1208 projection.schema,
1209 config,
1210 )
1211 }
1212
1213 fn read_ranges_blocking(
1214 &self,
1215 ranges: Vec<Range<u64>>,
1216 batch_size: u32,
1217 projection: ReaderProjection,
1218 filter: FilterExpression,
1219 ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1220 let column_infos = self.collect_columns_from_projection(&projection)?;
1221 let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1222 debug!(
1223 "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
1224 ranges.len(),
1225 num_rows,
1226 ranges[0].start,
1227 ranges[ranges.len() - 1].end,
1228 batch_size,
1229 column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1230 );
1231
1232 let config = SchedulerDecoderConfig {
1233 batch_size,
1234 cache: self.cache.clone(),
1235 decoder_plugins: self.decoder_plugins.clone(),
1236 io: self.scheduler.clone(),
1237 decoder_config: self.options.decoder_config.clone(),
1238 };
1239
1240 let requested_rows = RequestedRows::Ranges(ranges);
1241
1242 schedule_and_decode_blocking(
1243 column_infos,
1244 requested_rows,
1245 filter,
1246 projection.column_indices,
1247 projection.schema,
1248 config,
1249 )
1250 }
1251
1252 fn read_range_blocking(
1253 &self,
1254 range: Range<u64>,
1255 batch_size: u32,
1256 projection: ReaderProjection,
1257 filter: FilterExpression,
1258 ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1259 let column_infos = self.collect_columns_from_projection(&projection)?;
1260 let num_rows = self.num_rows;
1261
1262 debug!(
1263 "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
1264 range,
1265 batch_size,
1266 num_rows,
1267 column_infos.len(),
1268 projection.schema.fields.len(),
1269 );
1270
1271 let config = SchedulerDecoderConfig {
1272 batch_size,
1273 cache: self.cache.clone(),
1274 decoder_plugins: self.decoder_plugins.clone(),
1275 io: self.scheduler.clone(),
1276 decoder_config: self.options.decoder_config.clone(),
1277 };
1278
1279 let requested_rows = RequestedRows::Ranges(vec![range]);
1280
1281 schedule_and_decode_blocking(
1282 column_infos,
1283 requested_rows,
1284 filter,
1285 projection.column_indices,
1286 projection.schema,
1287 config,
1288 )
1289 }
1290
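    /// Blocking variant of [`Self::read_stream_projected`] that returns a synchronous
    /// [`RecordBatchReader`]; intended for non-async callers (e.g. inside
    /// `tokio::task::spawn_blocking`, as in the tests below).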
1291 pub fn read_stream_projected_blocking(
1303 &self,
1304 params: ReadBatchParams,
1305 batch_size: u32,
1306 projection: Option<ReaderProjection>,
1307 filter: FilterExpression,
1308 ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1309 let projection = projection.unwrap_or_else(|| self.base_projection.clone());
1310 Self::validate_projection(&projection, &self.metadata)?;
1311 let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
1312 if bound > self.num_rows || bound == self.num_rows && inclusive {
1313 Err(Error::invalid_input(
1314 format!(
1315 "cannot read {:?} from file with {} rows",
1316 params, self.num_rows
1317 ),
1318 location!(),
1319 ))
1320 } else {
1321 Ok(())
1322 }
1323 };
1324 match ¶ms {
1325 ReadBatchParams::Indices(indices) => {
1326 for idx in indices {
1327 match idx {
1328 None => {
1329 return Err(Error::invalid_input(
1330 "Null value in indices array",
1331 location!(),
1332 ));
1333 }
1334 Some(idx) => {
1335 verify_bound(¶ms, idx as u64, true)?;
1336 }
1337 }
1338 }
1339 let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
1340 self.take_rows_blocking(indices, batch_size, projection, filter)
1341 }
1342 ReadBatchParams::Range(range) => {
1343 verify_bound(¶ms, range.end as u64, false)?;
1344 self.read_range_blocking(
1345 range.start as u64..range.end as u64,
1346 batch_size,
1347 projection,
1348 filter,
1349 )
1350 }
1351 ReadBatchParams::Ranges(ranges) => {
1352 let mut ranges_u64 = Vec::with_capacity(ranges.len());
1353 for range in ranges.as_ref() {
1354 verify_bound(¶ms, range.end, false)?;
1355 ranges_u64.push(range.start..range.end);
1356 }
1357 self.read_ranges_blocking(ranges_u64, batch_size, projection, filter)
1358 }
1359 ReadBatchParams::RangeFrom(range) => {
1360 verify_bound(¶ms, range.start as u64, true)?;
1361 self.read_range_blocking(
1362 range.start as u64..self.num_rows,
1363 batch_size,
1364 projection,
1365 filter,
1366 )
1367 }
1368 ReadBatchParams::RangeTo(range) => {
1369 verify_bound(¶ms, range.end as u64, false)?;
1370 self.read_range_blocking(0..range.end as u64, batch_size, projection, filter)
1371 }
1372 ReadBatchParams::RangeFull => {
1373 self.read_range_blocking(0..self.num_rows, batch_size, projection, filter)
1374 }
1375 }
1376 }
1377
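    /// Reads the requested rows as a stream of record batches using the reader's base
    /// projection; a convenience wrapper around [`Self::read_stream_projected`].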
1378 pub fn read_stream(
1384 &self,
1385 params: ReadBatchParams,
1386 batch_size: u32,
1387 batch_readahead: u32,
1388 filter: FilterExpression,
1389 ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1390 self.read_stream_projected(
1391 params,
1392 batch_size,
1393 batch_readahead,
1394 self.base_projection.clone(),
1395 filter,
1396 )
1397 }
1398
1399 pub fn schema(&self) -> &Arc<Schema> {
1400 &self.metadata.file_schema
1401 }
1402}
1403
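/// Renders a human-readable description of a page's encoding, primarily for debugging and
/// inspection tooling.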
1404pub fn describe_encoding(page: &pbfile::column_metadata::Page) -> String {
1406 if let Some(encoding) = &page.encoding {
1407 if let Some(style) = &encoding.location {
1408 match style {
1409 pbfile::encoding::Location::Indirect(indirect) => {
1410 format!(
1411 "IndirectEncoding(pos={},size={})",
1412 indirect.buffer_location, indirect.buffer_length
1413 )
1414 }
1415 pbfile::encoding::Location::Direct(direct) => {
1416 let encoding_any =
1417 prost_types::Any::decode(Bytes::from(direct.encoding.clone()))
1418 .expect("failed to deserialize encoding as protobuf");
1419 if encoding_any.type_url == "/lance.encodings.ArrayEncoding" {
1420 let encoding = encoding_any.to_msg::<pbenc::ArrayEncoding>();
1421 match encoding {
1422 Ok(encoding) => {
1423 format!("{:#?}", encoding)
1424 }
1425 Err(err) => {
1426 format!("Unsupported(decode_err={})", err)
1427 }
1428 }
1429 } else if encoding_any.type_url == "/lance.encodings21.PageLayout" {
1430 let encoding = encoding_any.to_msg::<pbenc21::PageLayout>();
1431 match encoding {
1432 Ok(encoding) => {
1433 format!("{:#?}", encoding)
1434 }
1435 Err(err) => {
1436 format!("Unsupported(decode_err={})", err)
1437 }
1438 }
1439 } else {
1440 format!("Unrecognized(type_url={})", encoding_any.type_url)
1441 }
1442 }
1443 pbfile::encoding::Location::None(_) => "NoEncodingDescription".to_string(),
1444 }
1445 } else {
1446 "MISSING STYLE".to_string()
1447 }
1448 } else {
1449 "MISSING".to_string()
1450 }
1451}
1452
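/// Extension trait for decoding an [`EncodedBatch`] from Lance-formatted bytes, either in the
/// "mini" form (no embedded schema, the caller supplies it) or the self-described form (schema
/// stored in the first global buffer).
///
/// A round-trip sketch, mirroring the tests below:
///
/// ```ignore
/// let bytes = encoded_batch.try_to_self_described_lance(version)?;
/// let decoded = EncodedBatch::try_from_self_described_lance(bytes)?;
/// ```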
1453pub trait EncodedBatchReaderExt {
1454 fn try_from_mini_lance(
1455 bytes: Bytes,
1456 schema: &Schema,
1457 version: LanceFileVersion,
1458 ) -> Result<Self>
1459 where
1460 Self: Sized;
1461 fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1462 where
1463 Self: Sized;
1464}
1465
1466impl EncodedBatchReaderExt for EncodedBatch {
1467 fn try_from_mini_lance(
1468 bytes: Bytes,
1469 schema: &Schema,
1470 file_version: LanceFileVersion,
1471 ) -> Result<Self>
1472 where
1473 Self: Sized,
1474 {
1475 let projection = ReaderProjection::from_whole_schema(schema, file_version);
1476 let footer = FileReader::decode_footer(&bytes)?;
1477
1478 let column_metadata_start = footer.column_meta_start as usize;
1481 let column_metadata_end = footer.global_buff_offsets_start as usize;
1482 let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1483 let column_metadatas =
1484 FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1485
1486 let file_version = LanceFileVersion::try_from_major_minor(
1487 footer.major_version as u32,
1488 footer.minor_version as u32,
1489 )?;
1490
1491 let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1492
1493 Ok(Self {
1494 data: bytes,
1495 num_rows: page_table
1496 .first()
1497 .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1498 .unwrap_or(0),
1499 page_table,
1500 top_level_columns: projection.column_indices,
1501 schema: Arc::new(schema.clone()),
1502 })
1503 }
1504
1505 fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1506 where
1507 Self: Sized,
1508 {
1509 let footer = FileReader::decode_footer(&bytes)?;
1510 let file_version = LanceFileVersion::try_from_major_minor(
1511 footer.major_version as u32,
1512 footer.minor_version as u32,
1513 )?;
1514
1515 let gbo_table = FileReader::do_decode_gbo_table(
1516 &bytes.slice(footer.global_buff_offsets_start as usize..),
1517 &footer,
1518 file_version,
1519 )?;
1520 if gbo_table.is_empty() {
1521 return Err(Error::Internal {
1522 message: "File did not contain any global buffers, schema expected".to_string(),
1523 location: location!(),
1524 });
1525 }
1526 let schema_start = gbo_table[0].position as usize;
1527 let schema_size = gbo_table[0].size as usize;
1528
1529 let schema_bytes = bytes.slice(schema_start..(schema_start + schema_size));
1530 let (_, schema) = FileReader::decode_schema(schema_bytes)?;
1531 let projection = ReaderProjection::from_whole_schema(&schema, file_version);
1532
1533 let column_metadata_start = footer.column_meta_start as usize;
1536 let column_metadata_end = footer.global_buff_offsets_start as usize;
1537 let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1538 let column_metadatas =
1539 FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1540
1541 let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1542
1543 Ok(Self {
1544 data: bytes,
1545 num_rows: page_table
1546 .first()
1547 .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1548 .unwrap_or(0),
1549 page_table,
1550 top_level_columns: projection.column_indices,
1551 schema: Arc::new(schema),
1552 })
1553 }
1554}
1555
1556#[cfg(test)]
1557pub mod tests {
1558 use std::{collections::BTreeMap, pin::Pin, sync::Arc};
1559
1560 use arrow_array::{
1561 types::{Float64Type, Int32Type},
1562 RecordBatch, UInt32Array,
1563 };
1564 use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema};
1565 use bytes::Bytes;
1566 use futures::{prelude::stream::TryStreamExt, StreamExt};
1567 use lance_arrow::RecordBatchExt;
1568 use lance_core::{datatypes::Schema, ArrowResult};
1569 use lance_datagen::{array, gen_batch, BatchCount, ByteCount, RowCount};
1570 use lance_encoding::{
1571 decoder::{decode_batch, DecodeBatchScheduler, DecoderPlugins, FilterExpression},
1572 encoder::{default_encoding_strategy, encode_batch, EncodedBatch, EncodingOptions},
1573 version::LanceFileVersion,
1574 };
1575 use lance_io::{stream::RecordBatchStream, utils::CachedFileSize};
1576 use log::debug;
1577 use rstest::rstest;
1578 use tokio::sync::mpsc;
1579
1580 use crate::reader::{EncodedBatchReaderExt, FileReader, FileReaderOptions, ReaderProjection};
1581 use crate::testing::{test_cache, write_lance_file, FsFixture, WrittenFile};
1582 use crate::writer::{EncodedBatchWriteExt, FileWriter, FileWriterOptions};
1583 use lance_encoding::decoder::DecoderConfig;
1584
1585 async fn create_some_file(fs: &FsFixture, version: LanceFileVersion) -> WrittenFile {
1586 let location_type = DataType::Struct(Fields::from(vec![
1587 Field::new("x", DataType::Float64, true),
1588 Field::new("y", DataType::Float64, true),
1589 ]));
1590 let categories_type = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)));
1591
1592 let mut reader = gen_batch()
1593 .col("score", array::rand::<Float64Type>())
1594 .col("location", array::rand_type(&location_type))
1595 .col("categories", array::rand_type(&categories_type))
1596 .col("binary", array::rand_type(&DataType::Binary));
1597 if version <= LanceFileVersion::V2_0 {
1598 reader = reader.col("large_bin", array::rand_type(&DataType::LargeBinary));
1599 }
1600 let reader = reader.into_reader_rows(RowCount::from(1000), BatchCount::from(100));
1601
1602 write_lance_file(
1603 reader,
1604 fs,
1605 FileWriterOptions {
1606 format_version: Some(version),
1607 ..Default::default()
1608 },
1609 )
1610 .await
1611 }
1612
1613 type Transformer = Box<dyn Fn(&RecordBatch) -> RecordBatch>;
1614
1615 async fn verify_expected(
1616 expected: &[RecordBatch],
1617 mut actual: Pin<Box<dyn RecordBatchStream>>,
1618 read_size: u32,
1619 transform: Option<Transformer>,
1620 ) {
1621 let mut remaining = expected.iter().map(|batch| batch.num_rows()).sum::<usize>() as u32;
1622 let mut expected_iter = expected.iter().map(|batch| {
1623 if let Some(transform) = &transform {
1624 transform(batch)
1625 } else {
1626 batch.clone()
1627 }
1628 });
1629 let mut next_expected = expected_iter.next().unwrap().clone();
1630 while let Some(actual) = actual.next().await {
1631 let mut actual = actual.unwrap();
1632 let mut rows_to_verify = actual.num_rows() as u32;
1633 let expected_length = remaining.min(read_size);
1634 assert_eq!(expected_length, rows_to_verify);
1635
1636 while rows_to_verify > 0 {
1637 let next_slice_len = (next_expected.num_rows() as u32).min(rows_to_verify);
1638 assert_eq!(
1639 next_expected.slice(0, next_slice_len as usize),
1640 actual.slice(0, next_slice_len as usize)
1641 );
1642 remaining -= next_slice_len;
1643 rows_to_verify -= next_slice_len;
1644 if remaining > 0 {
1645 if next_slice_len == next_expected.num_rows() as u32 {
1646 next_expected = expected_iter.next().unwrap().clone();
1647 } else {
1648 next_expected = next_expected.slice(
1649 next_slice_len as usize,
1650 next_expected.num_rows() - next_slice_len as usize,
1651 );
1652 }
1653 }
1654 if rows_to_verify > 0 {
1655 actual = actual.slice(
1656 next_slice_len as usize,
1657 actual.num_rows() - next_slice_len as usize,
1658 );
1659 }
1660 }
1661 }
1662 assert_eq!(remaining, 0);
1663 }
1664
1665 #[tokio::test]
1666 async fn test_round_trip() {
1667 let fs = FsFixture::default();
1668
1669 let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1670
1671 for read_size in [32, 1024, 1024 * 1024] {
1672 let file_scheduler = fs
1673 .scheduler
1674 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1675 .await
1676 .unwrap();
1677 let file_reader = FileReader::try_open(
1678 file_scheduler,
1679 None,
1680 Arc::<DecoderPlugins>::default(),
1681 &test_cache(),
1682 FileReaderOptions::default(),
1683 )
1684 .await
1685 .unwrap();
1686
1687 let schema = file_reader.schema();
1688 assert_eq!(schema.metadata.get("foo").unwrap(), "bar");
1689
1690 let batch_stream = file_reader
1691 .read_stream(
1692 lance_io::ReadBatchParams::RangeFull,
1693 read_size,
1694 16,
1695 FilterExpression::no_filter(),
1696 )
1697 .unwrap();
1698
1699 verify_expected(&data, batch_stream, read_size, None).await;
1700 }
1701 }
1702
1703 #[rstest]
1704 #[test_log::test(tokio::test)]
1705 async fn test_encoded_batch_round_trip(
1706 #[values(LanceFileVersion::V2_0)] version: LanceFileVersion,
1708 ) {
1709 let data = gen_batch()
1710 .col("x", array::rand::<Int32Type>())
1711 .col("y", array::rand_utf8(ByteCount::from(16), false))
1712 .into_batch_rows(RowCount::from(10000))
1713 .unwrap();
1714
1715 let lance_schema = Arc::new(Schema::try_from(data.schema().as_ref()).unwrap());
1716
1717 let encoding_options = EncodingOptions {
1718 cache_bytes_per_column: 4096,
1719 max_page_bytes: 32 * 1024 * 1024,
1720 keep_original_array: true,
1721 buffer_alignment: 64,
1722 };
1723
1724 let encoding_strategy = default_encoding_strategy(version);
1725
1726 let encoded_batch = encode_batch(
1727 &data,
1728 lance_schema.clone(),
1729 encoding_strategy.as_ref(),
1730 &encoding_options,
1731 )
1732 .await
1733 .unwrap();
1734
1735 let bytes = encoded_batch.try_to_self_described_lance(version).unwrap();
1737
1738 let decoded_batch = EncodedBatch::try_from_self_described_lance(bytes).unwrap();
1739
1740 let decoded = decode_batch(
1741 &decoded_batch,
1742 &FilterExpression::no_filter(),
1743 Arc::<DecoderPlugins>::default(),
1744 false,
1745 version,
1746 None,
1747 )
1748 .await
1749 .unwrap();
1750
1751 assert_eq!(data, decoded);
1752
1753 let bytes = encoded_batch.try_to_mini_lance(version).unwrap();
1755 let decoded_batch =
1756 EncodedBatch::try_from_mini_lance(bytes, lance_schema.as_ref(), LanceFileVersion::V2_0)
1757 .unwrap();
1758 let decoded = decode_batch(
1759 &decoded_batch,
1760 &FilterExpression::no_filter(),
1761 Arc::<DecoderPlugins>::default(),
1762 false,
1763 version,
1764 None,
1765 )
1766 .await
1767 .unwrap();
1768
1769 assert_eq!(data, decoded);
1770 }
1771
1772 #[rstest]
1773 #[test_log::test(tokio::test)]
1774 async fn test_projection(
1775 #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
1776 ) {
1777 let fs = FsFixture::default();
1778
1779 let written_file = create_some_file(&fs, version).await;
1780 let file_scheduler = fs
1781 .scheduler
1782 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1783 .await
1784 .unwrap();
1785
1786 let field_id_mapping = written_file
1787 .field_id_mapping
1788 .iter()
1789 .copied()
1790 .collect::<BTreeMap<_, _>>();
1791
1792 let empty_projection = ReaderProjection {
1793 column_indices: Vec::default(),
1794 schema: Arc::new(Schema::default()),
1795 };
1796
1797 for columns in [
1798 vec!["score"],
1799 vec!["location"],
1800 vec!["categories"],
            vec!["location.x"],
1802 vec!["score", "categories"],
1803 vec!["score", "location"],
1804 vec!["location", "categories"],
            vec!["score", "location.y", "categories"],
1806 ] {
1807 debug!("Testing round trip with projection {:?}", columns);
1808 for use_field_ids in [true, false] {
1809 let file_reader = FileReader::try_open(
1811 file_scheduler.clone(),
1812 None,
1813 Arc::<DecoderPlugins>::default(),
1814 &test_cache(),
1815 FileReaderOptions::default(),
1816 )
1817 .await
1818 .unwrap();
1819
1820 let projected_schema = written_file.schema.project(&columns).unwrap();
1821 let projection = if use_field_ids {
1822 ReaderProjection::from_field_ids(
1823 file_reader.metadata.version(),
1824 &projected_schema,
1825 &field_id_mapping,
1826 )
1827 .unwrap()
1828 } else {
1829 ReaderProjection::from_column_names(
1830 file_reader.metadata.version(),
1831 &written_file.schema,
1832 &columns,
1833 )
1834 .unwrap()
1835 };
1836
1837 let batch_stream = file_reader
1838 .read_stream_projected(
1839 lance_io::ReadBatchParams::RangeFull,
1840 1024,
1841 16,
1842 projection.clone(),
1843 FilterExpression::no_filter(),
1844 )
1845 .unwrap();
1846
1847 let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1848 verify_expected(
1849 &written_file.data,
1850 batch_stream,
1851 1024,
1852 Some(Box::new(move |batch: &RecordBatch| {
1853 batch.project_by_schema(&projection_arrow).unwrap()
1854 })),
1855 )
1856 .await;
1857
1858 let file_reader = FileReader::try_open(
1860 file_scheduler.clone(),
1861 Some(projection.clone()),
1862 Arc::<DecoderPlugins>::default(),
1863 &test_cache(),
1864 FileReaderOptions::default(),
1865 )
1866 .await
1867 .unwrap();
1868
1869 let batch_stream = file_reader
1870 .read_stream(
1871 lance_io::ReadBatchParams::RangeFull,
1872 1024,
1873 16,
1874 FilterExpression::no_filter(),
1875 )
1876 .unwrap();
1877
1878 let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1879 verify_expected(
1880 &written_file.data,
1881 batch_stream,
1882 1024,
1883 Some(Box::new(move |batch: &RecordBatch| {
1884 batch.project_by_schema(&projection_arrow).unwrap()
1885 })),
1886 )
1887 .await;
1888
1889 assert!(file_reader
1890 .read_stream_projected(
1891 lance_io::ReadBatchParams::RangeFull,
1892 1024,
1893 16,
1894 empty_projection.clone(),
1895 FilterExpression::no_filter(),
1896 )
1897 .is_err());
1898 }
1899 }
1900
1901 assert!(FileReader::try_open(
1902 file_scheduler.clone(),
1903 Some(empty_projection),
1904 Arc::<DecoderPlugins>::default(),
1905 &test_cache(),
1906 FileReaderOptions::default(),
1907 )
1908 .await
1909 .is_err());
1910
1911 let arrow_schema = ArrowSchema::new(vec![
1912 Field::new("x", DataType::Int32, true),
1913 Field::new("y", DataType::Int32, true),
1914 ]);
1915 let schema = Schema::try_from(&arrow_schema).unwrap();
1916
1917 let projection_with_dupes = ReaderProjection {
1918 column_indices: vec![0, 0],
1919 schema: Arc::new(schema),
1920 };
1921
1922 assert!(FileReader::try_open(
1923 file_scheduler.clone(),
1924 Some(projection_with_dupes),
1925 Arc::<DecoderPlugins>::default(),
1926 &test_cache(),
1927 FileReaderOptions::default(),
1928 )
1929 .await
1930 .is_err());
1931 }
1932
1933 #[test_log::test(tokio::test)]
1934 async fn test_compressing_buffer() {
1935 let fs = FsFixture::default();
1936
1937 let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
1938 let file_scheduler = fs
1939 .scheduler
1940 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1941 .await
1942 .unwrap();
1943
1944 let file_reader = FileReader::try_open(
1946 file_scheduler.clone(),
1947 None,
1948 Arc::<DecoderPlugins>::default(),
1949 &test_cache(),
1950 FileReaderOptions::default(),
1951 )
1952 .await
1953 .unwrap();
1954
1955 let mut projection = written_file.schema.project(&["score"]).unwrap();
1956 for field in projection.fields.iter_mut() {
1957 field
1958 .metadata
1959 .insert("lance:compression".to_string(), "zstd".to_string());
1960 }
1961 let projection = ReaderProjection {
1962 column_indices: projection.fields.iter().map(|f| f.id as u32).collect(),
1963 schema: Arc::new(projection),
1964 };
1965
1966 let batch_stream = file_reader
1967 .read_stream_projected(
1968 lance_io::ReadBatchParams::RangeFull,
1969 1024,
1970 16,
1971 projection.clone(),
1972 FilterExpression::no_filter(),
1973 )
1974 .unwrap();
1975
1976 let projection_arrow = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
1977 verify_expected(
1978 &written_file.data,
1979 batch_stream,
1980 1024,
1981 Some(Box::new(move |batch: &RecordBatch| {
1982 batch.project_by_schema(&projection_arrow).unwrap()
1983 })),
1984 )
1985 .await;
1986 }
1987
1988 #[tokio::test]
1989 async fn test_read_all() {
1990 let fs = FsFixture::default();
1991 let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1992 let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
1993
1994 let file_scheduler = fs
1995 .scheduler
1996 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1997 .await
1998 .unwrap();
1999 let file_reader = FileReader::try_open(
2000 file_scheduler.clone(),
2001 None,
2002 Arc::<DecoderPlugins>::default(),
2003 &test_cache(),
2004 FileReaderOptions::default(),
2005 )
2006 .await
2007 .unwrap();
2008
2009 let batches = file_reader
2010 .read_stream(
2011 lance_io::ReadBatchParams::RangeFull,
2012 total_rows as u32,
2013 16,
2014 FilterExpression::no_filter(),
2015 )
2016 .unwrap()
2017 .try_collect::<Vec<_>>()
2018 .await
2019 .unwrap();
2020 assert_eq!(batches.len(), 1);
2021 assert_eq!(batches[0].num_rows(), total_rows);
2022 }
2023
2024 #[rstest]
2025 #[tokio::test]
2026 async fn test_blocking_take(
2027 #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
2028 ) {
2029 let fs = FsFixture::default();
2030 let WrittenFile { data, schema, .. } = create_some_file(&fs, version).await;
2031 let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2032
2033 let file_scheduler = fs
2034 .scheduler
2035 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2036 .await
2037 .unwrap();
2038 let file_reader = FileReader::try_open(
2039 file_scheduler.clone(),
2040 Some(ReaderProjection::from_column_names(version, &schema, &["score"]).unwrap()),
2041 Arc::<DecoderPlugins>::default(),
2042 &test_cache(),
2043 FileReaderOptions::default(),
2044 )
2045 .await
2046 .unwrap();
2047
2048 let batches = tokio::task::spawn_blocking(move || {
2049 file_reader
2050 .read_stream_projected_blocking(
2051 lance_io::ReadBatchParams::Indices(UInt32Array::from(vec![0, 1, 2, 3, 4])),
2052 total_rows as u32,
2053 None,
2054 FilterExpression::no_filter(),
2055 )
2056 .unwrap()
2057 .collect::<ArrowResult<Vec<_>>>()
2058 .unwrap()
2059 })
2060 .await
2061 .unwrap();
2062
2063 assert_eq!(batches.len(), 1);
2064 assert_eq!(batches[0].num_rows(), 5);
2065 assert_eq!(batches[0].num_columns(), 1);
2066 }
2067
2068 #[tokio::test(flavor = "multi_thread")]
2069 async fn test_drop_in_progress() {
2070 let fs = FsFixture::default();
2071 let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
2072 let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2073
2074 let file_scheduler = fs
2075 .scheduler
2076 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2077 .await
2078 .unwrap();
2079 let file_reader = FileReader::try_open(
2080 file_scheduler.clone(),
2081 None,
2082 Arc::<DecoderPlugins>::default(),
2083 &test_cache(),
2084 FileReaderOptions::default(),
2085 )
2086 .await
2087 .unwrap();
2088
2089 let mut batches = file_reader
2090 .read_stream(
2091 lance_io::ReadBatchParams::RangeFull,
2092 (total_rows / 10) as u32,
2093 16,
2094 FilterExpression::no_filter(),
2095 )
2096 .unwrap();
2097
2098 drop(file_reader);
2099
2100 let batch = batches.next().await.unwrap().unwrap();
2101 assert!(batch.num_rows() > 0);
2102
2103 drop(batches);
2105 }
2106
2107 #[tokio::test]
2108 async fn drop_while_scheduling() {
2109 let fs = FsFixture::default();
2119 let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
2120 let total_rows = written_file
2121 .data
2122 .iter()
2123 .map(|batch| batch.num_rows())
2124 .sum::<usize>();
2125
2126 let file_scheduler = fs
2127 .scheduler
2128 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2129 .await
2130 .unwrap();
2131 let file_reader = FileReader::try_open(
2132 file_scheduler.clone(),
2133 None,
2134 Arc::<DecoderPlugins>::default(),
2135 &test_cache(),
2136 FileReaderOptions::default(),
2137 )
2138 .await
2139 .unwrap();
2140
2141 let projection =
2142 ReaderProjection::from_whole_schema(&written_file.schema, LanceFileVersion::V2_0);
2143 let column_infos = file_reader
2144 .collect_columns_from_projection(&projection)
2145 .unwrap();
2146 let mut decode_scheduler = DecodeBatchScheduler::try_new(
2147 &projection.schema,
2148 &projection.column_indices,
2149 &column_infos,
2150 &vec![],
2151 total_rows as u64,
2152 Arc::<DecoderPlugins>::default(),
2153 file_reader.scheduler.clone(),
2154 test_cache(),
2155 &FilterExpression::no_filter(),
2156 &DecoderConfig::default(),
2157 )
2158 .await
2159 .unwrap();
2160
2161 let range = 0..total_rows as u64;
2162
2163 let (tx, rx) = mpsc::unbounded_channel();
2164
2165 drop(rx);
2167
2168 decode_scheduler.schedule_range(
2170 range,
2171 &FilterExpression::no_filter(),
2172 tx,
2173 file_reader.scheduler.clone(),
2174 )
2175 }
2176
2177 #[tokio::test]
2178 async fn test_read_empty_range() {
2179 let fs = FsFixture::default();
2180 create_some_file(&fs, LanceFileVersion::V2_0).await;
2181
2182 let file_scheduler = fs
2183 .scheduler
2184 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2185 .await
2186 .unwrap();
2187 let file_reader = FileReader::try_open(
2188 file_scheduler.clone(),
2189 None,
2190 Arc::<DecoderPlugins>::default(),
2191 &test_cache(),
2192 FileReaderOptions::default(),
2193 )
2194 .await
2195 .unwrap();
2196
2197 let batches = file_reader
2199 .read_stream(
2200 lance_io::ReadBatchParams::Range(0..0),
2201 1024,
2202 16,
2203 FilterExpression::no_filter(),
2204 )
2205 .unwrap()
2206 .try_collect::<Vec<_>>()
2207 .await
2208 .unwrap();
2209
2210 assert_eq!(batches.len(), 0);
2211
2212 let batches = file_reader
2214 .read_stream(
2215 lance_io::ReadBatchParams::Ranges(Arc::new([0..1, 2..2])),
2216 1024,
2217 16,
2218 FilterExpression::no_filter(),
2219 )
2220 .unwrap()
2221 .try_collect::<Vec<_>>()
2222 .await
2223 .unwrap();
2224 assert_eq!(batches.len(), 1);
2225 }
2226
2227 #[tokio::test]
2228 async fn test_global_buffers() {
2229 let fs = FsFixture::default();
2230
2231 let lance_schema =
2232 lance_core::datatypes::Schema::try_from(&ArrowSchema::new(vec![Field::new(
2233 "foo",
2234 DataType::Int32,
2235 true,
2236 )]))
2237 .unwrap();
2238
2239 let mut file_writer = FileWriter::try_new(
2240 fs.object_store.create(&fs.tmp_path).await.unwrap(),
2241 lance_schema.clone(),
2242 FileWriterOptions::default(),
2243 )
2244 .unwrap();
2245
2246 let test_bytes = Bytes::from_static(b"hello");
2247
2248 let buf_index = file_writer
2249 .add_global_buffer(test_bytes.clone())
2250 .await
2251 .unwrap();
2252
2253 assert_eq!(buf_index, 1);
2254
2255 file_writer.finish().await.unwrap();
2256
2257 let file_scheduler = fs
2258 .scheduler
2259 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2260 .await
2261 .unwrap();
2262 let file_reader = FileReader::try_open(
2263 file_scheduler.clone(),
2264 None,
2265 Arc::<DecoderPlugins>::default(),
2266 &test_cache(),
2267 FileReaderOptions::default(),
2268 )
2269 .await
2270 .unwrap();
2271
2272 let buf = file_reader.read_global_buffer(1).await.unwrap();
2273 assert_eq!(buf, test_bytes);
2274 }
2275}