use std::{
    collections::{BTreeMap, BTreeSet},
    io::Cursor,
    ops::Range,
    pin::Pin,
    sync::Arc,
};

use arrow_array::RecordBatchReader;
use arrow_schema::Schema as ArrowSchema;
use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
use bytes::{Bytes, BytesMut};
use deepsize::{Context, DeepSizeOf};
use futures::{stream::BoxStream, Stream, StreamExt};
use lance_encoding::{
    decoder::{
        schedule_and_decode, schedule_and_decode_blocking, ColumnInfo, DecoderConfig,
        DecoderPlugins, FilterExpression, PageEncoding, PageInfo, ReadBatchTask, RequestedRows,
        SchedulerDecoderConfig,
    },
    encoder::EncodedBatch,
    version::LanceFileVersion,
    EncodingsIo,
};
use log::debug;
use object_store::path::Path;
use prost::{Message, Name};
use snafu::location;

use lance_core::{
    cache::LanceCache,
    datatypes::{Field, Schema},
    Error, Result,
};
use lance_encoding::format::pb as pbenc;
use lance_encoding::format::pb21 as pbenc21;
use lance_io::{
    scheduler::FileScheduler,
    stream::{RecordBatchStream, RecordBatchStreamAdapter},
    ReadBatchParams,
};

use crate::{
    datatypes::{Fields, FieldsWithMeta},
    format::{pb, pbfile, MAGIC, MAJOR_VERSION, MINOR_VERSION},
    io::LanceEncodingsIo,
    writer::PAGE_BUFFER_ALIGNMENT,
};
52
/// Default chunk size (8 MiB) for reads issued by the reader's I/O layer.
pub const DEFAULT_READ_CHUNK_SIZE: u64 = 8 * 1024 * 1024;

/// The position and size of a buffer stored in the file.
#[derive(Debug, DeepSizeOf)]
pub struct BufferDescriptor {
    pub position: u64,
    pub size: u64,
}

/// Statistics summarizing the layout of a Lance file.
#[derive(Debug)]
pub struct FileStatistics {
    /// Statistics for each column in the file
    pub columns: Vec<ColumnStatistics>,
}

/// Statistics for a single column.
#[derive(Debug)]
pub struct ColumnStatistics {
    /// The number of pages in the column
    pub num_pages: usize,
    /// The total size, in bytes, of the column's data buffers
    pub size_bytes: u64,
}

/// File metadata that is read when the file is opened and cached for reuse.
#[derive(Debug)]
pub struct CachedFileMetadata {
    /// The schema of the file
    pub file_schema: Arc<Schema>,
    /// The raw column metadata, one entry per column
    pub column_metadatas: Vec<pbfile::ColumnMetadata>,
    /// Decoded column infos consumed by the decoder
    pub column_infos: Vec<Arc<ColumnInfo>>,
    /// The number of rows in the file
    pub num_rows: u64,
    /// The global buffers (the first is always the schema)
    pub file_buffers: Vec<BufferDescriptor>,
    /// The number of bytes in the data section
    pub num_data_bytes: u64,
    /// The number of bytes in the column metadata section
    pub num_column_metadata_bytes: u64,
    /// The number of bytes occupied by global buffers
    pub num_global_buffer_bytes: u64,
    /// The number of bytes from the start of the schema buffer to the end of the file
    pub num_footer_bytes: u64,
    pub major_version: u16,
    pub minor_version: u16,
}
108
109impl DeepSizeOf for CachedFileMetadata {
110 fn deep_size_of_children(&self, context: &mut Context) -> usize {
112 self.file_schema.deep_size_of_children(context)
113 + self
114 .file_buffers
115 .iter()
116 .map(|file_buffer| file_buffer.deep_size_of_children(context))
117 .sum::<usize>()
118 }
119}
120
121impl CachedFileMetadata {
122 pub fn version(&self) -> LanceFileVersion {
123 match (self.major_version, self.minor_version) {
124 (0, 3) => LanceFileVersion::V2_0,
125 (2, 0) => LanceFileVersion::V2_0,
126 (2, 1) => LanceFileVersion::V2_1,
127 (2, 2) => LanceFileVersion::V2_2,
128 _ => panic!(
129 "Unsupported version: {}.{}",
130 self.major_version, self.minor_version
131 ),
132 }
133 }
134}
135
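/// Describes a projection to apply while reading a Lance file: the subset of the file's
/// schema to load plus the top-level column indices that back each projected field.
///
/// A minimal sketch of building a projection by column name (the `schema` and
/// `file_version` bindings are illustrative; assumes the named column exists in the
/// file's schema):
///
/// ```ignore
/// let projection =
///     ReaderProjection::from_column_names(file_version, &schema, &["score"])?;
/// assert_eq!(projection.schema.fields.len(), 1);
/// ```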
136#[derive(Debug, Clone)]
158pub struct ReaderProjection {
159 pub schema: Arc<Schema>,
162 pub column_indices: Vec<u32>,
202}
203
204impl ReaderProjection {
205 fn from_field_ids_helper<'a>(
206 file_version: LanceFileVersion,
207 fields: impl Iterator<Item = &'a Field>,
208 field_id_to_column_index: &BTreeMap<u32, u32>,
209 column_indices: &mut Vec<u32>,
210 ) -> Result<()> {
211 for field in fields {
212 let is_structural = file_version >= LanceFileVersion::V2_1;
213 if !is_structural
216 || field.children.is_empty()
217 || field.is_blob()
218 || field.is_packed_struct()
219 {
220 if let Some(column_idx) = field_id_to_column_index.get(&(field.id as u32)).copied()
221 {
222 column_indices.push(column_idx);
223 }
224 }
225 if !is_structural || (!field.is_blob() && !field.is_packed_struct()) {
227 Self::from_field_ids_helper(
228 file_version,
229 field.children.iter(),
230 field_id_to_column_index,
231 column_indices,
232 )?;
233 }
234 }
235 Ok(())
236 }
237
238 pub fn from_field_ids(
243 file_version: LanceFileVersion,
244 schema: &Schema,
245 field_id_to_column_index: &BTreeMap<u32, u32>,
246 ) -> Result<Self> {
247 let mut column_indices = Vec::new();
248 Self::from_field_ids_helper(
249 file_version,
250 schema.fields.iter(),
251 field_id_to_column_index,
252 &mut column_indices,
253 )?;
254 let projection = Self {
255 schema: Arc::new(schema.clone()),
256 column_indices,
257 };
258 Ok(projection)
259 }
260
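    /// Builds a projection that covers the entire schema.
    ///
    /// Column indices are assigned in pre-order: in 2.0 files essentially every field
    /// maps to a column, while for structural versions (2.1+) only leaf fields and
    /// packed structs consume a column index (children of a packed struct are skipped).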
261 pub fn from_whole_schema(schema: &Schema, version: LanceFileVersion) -> Self {
269 let schema = Arc::new(schema.clone());
270 let is_structural = version >= LanceFileVersion::V2_1;
271 let mut column_indices = vec![];
272 let mut curr_column_idx = 0;
273 let mut packed_struct_fields_num = 0;
274 for field in schema.fields_pre_order() {
275 if packed_struct_fields_num > 0 {
276 packed_struct_fields_num -= 1;
277 continue;
278 }
279 if field.is_packed_struct() {
280 column_indices.push(curr_column_idx);
281 curr_column_idx += 1;
282 packed_struct_fields_num = field.children.len();
283 } else if field.children.is_empty() || !is_structural {
284 column_indices.push(curr_column_idx);
285 curr_column_idx += 1;
286 }
287 }
288 Self {
289 schema,
290 column_indices,
291 }
292 }
293
294 pub fn from_column_names(
301 file_version: LanceFileVersion,
302 schema: &Schema,
303 column_names: &[&str],
304 ) -> Result<Self> {
305 let field_id_to_column_index = schema
306 .fields_pre_order()
307 .filter(|field| {
310 file_version < LanceFileVersion::V2_1 || field.is_leaf() || field.is_packed_struct()
311 })
312 .enumerate()
313 .map(|(idx, field)| (field.id as u32, idx as u32))
314 .collect::<BTreeMap<_, _>>();
315 let projected = schema.project(column_names)?;
316 let mut column_indices = Vec::new();
317 Self::from_field_ids_helper(
318 file_version,
319 projected.fields.iter(),
320 &field_id_to_column_index,
321 &mut column_indices,
322 )?;
323 Ok(Self {
324 schema: Arc::new(projected),
325 column_indices,
326 })
327 }
328}
329
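/// Options controlling how a [`FileReader`] schedules I/O and decodes data.
///
/// `decoder_config` is passed through to the decoder, and `read_chunk_size` (default
/// [`DEFAULT_READ_CHUNK_SIZE`], 8 MiB) is forwarded to the I/O layer via
/// `LanceEncodingsIo::with_read_chunk_size`.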
330#[derive(Clone, Debug)]
332pub struct FileReaderOptions {
333 pub decoder_config: DecoderConfig,
334 pub read_chunk_size: u64,
338}
339
340impl Default for FileReaderOptions {
341 fn default() -> Self {
342 Self {
343 decoder_config: DecoderConfig::default(),
344 read_chunk_size: DEFAULT_READ_CHUNK_SIZE,
345 }
346 }
347}
348
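/// A reader for Lance v2+ files.
///
/// Holds the I/O scheduler, the cached file metadata, the default (base) projection,
/// the decoder plugins, and a metadata cache keyed by the file's path.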
349#[derive(Debug)]
350pub struct FileReader {
351 scheduler: Arc<dyn EncodingsIo>,
352 base_projection: ReaderProjection,
354 num_rows: u64,
355 metadata: Arc<CachedFileMetadata>,
356 decoder_plugins: Arc<DecoderPlugins>,
357 cache: Arc<LanceCache>,
358 options: FileReaderOptions,
359}
/// The fixed-size footer found at the very end of every Lance v2+ file.
#[derive(Debug)]
struct Footer {
    #[allow(dead_code)]
    column_meta_start: u64,
    #[allow(dead_code)]
    column_meta_offsets_start: u64,
    global_buff_offsets_start: u64,
    num_global_buffers: u32,
    num_columns: u32,
    major_version: u16,
    minor_version: u16,
}

/// Size of the footer in bytes: three u64 offsets, two u32 counts, two u16 version
/// fields, and the 4-byte magic (3 * 8 + 2 * 4 + 2 * 2 + 4 = 40).
const FOOTER_LEN: usize = 40;
376
377impl FileReader {
378 pub fn with_scheduler(&self, scheduler: Arc<dyn EncodingsIo>) -> Self {
379 Self {
380 scheduler,
381 base_projection: self.base_projection.clone(),
382 cache: self.cache.clone(),
383 decoder_plugins: self.decoder_plugins.clone(),
384 metadata: self.metadata.clone(),
385 options: self.options.clone(),
386 num_rows: self.num_rows,
387 }
388 }
389
390 pub fn num_rows(&self) -> u64 {
391 self.num_rows
392 }
393
394 pub fn metadata(&self) -> &Arc<CachedFileMetadata> {
395 &self.metadata
396 }
397
398 pub fn file_statistics(&self) -> FileStatistics {
399 let column_metadatas = &self.metadata().column_metadatas;
400
401 let column_stats = column_metadatas
402 .iter()
403 .map(|col_metadata| {
404 let num_pages = col_metadata.pages.len();
405 let size_bytes = col_metadata
406 .pages
407 .iter()
408 .map(|page| page.buffer_sizes.iter().sum::<u64>())
409 .sum::<u64>();
410 ColumnStatistics {
411 num_pages,
412 size_bytes,
413 }
414 })
415 .collect();
416
417 FileStatistics {
418 columns: column_stats,
419 }
420 }
421
    pub async fn read_global_buffer(&self, index: u32) -> Result<Bytes> {
        let buffer_desc = self.metadata.file_buffers.get(index as usize).ok_or_else(|| {
            Error::invalid_input(
                format!(
                    "request for global buffer at index {} but there were only {} global buffers in the file",
                    index,
                    self.metadata.file_buffers.len()
                ),
                location!(),
            )
        })?;
        self.scheduler
            .submit_single(
                buffer_desc.position..buffer_desc.position + buffer_desc.size,
                0,
            )
            .await
    }
431
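    /// Reads the last `block_size` bytes of the file (or the whole file if it is
    /// smaller), returning the bytes along with the total file size.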
432 async fn read_tail(scheduler: &FileScheduler) -> Result<(Bytes, u64)> {
433 let file_size = scheduler.reader().size().await? as u64;
434 let begin = if file_size < scheduler.reader().block_size() as u64 {
435 0
436 } else {
437 file_size - scheduler.reader().block_size() as u64
438 };
439 let tail_bytes = scheduler.submit_single(begin..file_size, 0).await?;
440 Ok((tail_bytes, file_size))
441 }
442
443 fn decode_footer(footer_bytes: &Bytes) -> Result<Footer> {
446 let len = footer_bytes.len();
447 if len < FOOTER_LEN {
448 return Err(Error::invalid_input(
449 format!(
450 "does not have sufficient data, len: {}, bytes: {:?}",
451 len, footer_bytes
452 ),
453 location!(),
454 ));
455 }
456 let mut cursor = Cursor::new(footer_bytes.slice(len - FOOTER_LEN..));
457
458 let column_meta_start = cursor.read_u64::<LittleEndian>()?;
459 let column_meta_offsets_start = cursor.read_u64::<LittleEndian>()?;
460 let global_buff_offsets_start = cursor.read_u64::<LittleEndian>()?;
461 let num_global_buffers = cursor.read_u32::<LittleEndian>()?;
462 let num_columns = cursor.read_u32::<LittleEndian>()?;
463 let major_version = cursor.read_u16::<LittleEndian>()?;
464 let minor_version = cursor.read_u16::<LittleEndian>()?;
465
466 if major_version == MAJOR_VERSION as u16 && minor_version == MINOR_VERSION as u16 {
467 return Err(Error::version_conflict(
468 "Attempt to use the lance v2 reader to read a legacy file".to_string(),
469 major_version,
470 minor_version,
471 location!(),
472 ));
473 }
474
        let magic_bytes = footer_bytes.slice(len - 4..);
        if magic_bytes.as_ref() != MAGIC {
            return Err(Error::invalid_input(
                format!(
                    "file does not appear to be a Lance file (magic: {:?}, expected: {:?})",
                    magic_bytes, MAGIC
                ),
                location!(),
            ));
        }
485 Ok(Footer {
486 column_meta_start,
487 column_meta_offsets_start,
488 global_buff_offsets_start,
489 num_global_buffers,
490 num_columns,
491 major_version,
492 minor_version,
493 })
494 }
495
496 fn read_all_column_metadata(
498 column_metadata_bytes: Bytes,
499 footer: &Footer,
500 ) -> Result<Vec<pbfile::ColumnMetadata>> {
501 let column_metadata_start = footer.column_meta_start;
502 let cmo_table_size = 16 * footer.num_columns as usize;
504 let cmo_table = column_metadata_bytes.slice(column_metadata_bytes.len() - cmo_table_size..);
505
506 (0..footer.num_columns)
507 .map(|col_idx| {
508 let offset = (col_idx * 16) as usize;
509 let position = LittleEndian::read_u64(&cmo_table[offset..offset + 8]);
510 let length = LittleEndian::read_u64(&cmo_table[offset + 8..offset + 16]);
511 let normalized_position = (position - column_metadata_start) as usize;
512 let normalized_end = normalized_position + (length as usize);
513 Ok(pbfile::ColumnMetadata::decode(
514 &column_metadata_bytes[normalized_position..normalized_end],
515 )?)
516 })
517 .collect::<Result<Vec<_>>>()
518 }
519
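    /// Returns the bytes from `start_pos` to the end of the file.
    ///
    /// If the previously fetched tail (`data`) already covers that range it is simply
    /// sliced; otherwise the missing prefix is fetched and prepended, avoiding a second
    /// round trip in the common case where all metadata fits in one tail read.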
520 async fn optimistic_tail_read(
521 data: &Bytes,
522 start_pos: u64,
523 scheduler: &FileScheduler,
524 file_len: u64,
525 ) -> Result<Bytes> {
526 let num_bytes_needed = (file_len - start_pos) as usize;
527 if data.len() >= num_bytes_needed {
528 Ok(data.slice((data.len() - num_bytes_needed)..))
529 } else {
530 let num_bytes_missing = (num_bytes_needed - data.len()) as u64;
531 let start = file_len - num_bytes_needed as u64;
532 let missing_bytes = scheduler
533 .submit_single(start..start + num_bytes_missing, 0)
534 .await?;
535 let mut combined = BytesMut::with_capacity(data.len() + num_bytes_missing as usize);
536 combined.extend(missing_bytes);
537 combined.extend(data);
538 Ok(combined.freeze())
539 }
540 }
541
542 fn do_decode_gbo_table(
543 gbo_bytes: &Bytes,
544 footer: &Footer,
545 version: LanceFileVersion,
546 ) -> Result<Vec<BufferDescriptor>> {
547 let mut global_bufs_cursor = Cursor::new(gbo_bytes);
548
549 let mut global_buffers = Vec::with_capacity(footer.num_global_buffers as usize);
550 for _ in 0..footer.num_global_buffers {
551 let buf_pos = global_bufs_cursor.read_u64::<LittleEndian>()?;
552 assert!(
553 version < LanceFileVersion::V2_1 || buf_pos % PAGE_BUFFER_ALIGNMENT as u64 == 0
554 );
555 let buf_size = global_bufs_cursor.read_u64::<LittleEndian>()?;
556 global_buffers.push(BufferDescriptor {
557 position: buf_pos,
558 size: buf_size,
559 });
560 }
561
562 Ok(global_buffers)
563 }
564
565 async fn decode_gbo_table(
566 tail_bytes: &Bytes,
567 file_len: u64,
568 scheduler: &FileScheduler,
569 footer: &Footer,
570 version: LanceFileVersion,
571 ) -> Result<Vec<BufferDescriptor>> {
572 let gbo_bytes = Self::optimistic_tail_read(
575 tail_bytes,
576 footer.global_buff_offsets_start,
577 scheduler,
578 file_len,
579 )
580 .await?;
581 Self::do_decode_gbo_table(&gbo_bytes, footer, version)
582 }
583
584 fn decode_schema(schema_bytes: Bytes) -> Result<(u64, lance_core::datatypes::Schema)> {
585 let file_descriptor = pb::FileDescriptor::decode(schema_bytes)?;
586 let pb_schema = file_descriptor.schema.unwrap();
587 let num_rows = file_descriptor.length;
588 let fields_with_meta = FieldsWithMeta {
589 fields: Fields(pb_schema.fields),
590 metadata: pb_schema.metadata,
591 };
592 let schema = lance_core::datatypes::Schema::from(fields_with_meta);
593 Ok((num_rows, schema))
594 }
595
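    /// Reads all of the file's metadata in (typically) a single tail read.
    ///
    /// This decodes the footer, the global buffer offset table, the schema (stored in
    /// global buffer 0), and every column's metadata, and assembles them into a
    /// [`CachedFileMetadata`].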
596 pub async fn read_all_metadata(scheduler: &FileScheduler) -> Result<CachedFileMetadata> {
610 let (tail_bytes, file_len) = Self::read_tail(scheduler).await?;
612 let footer = Self::decode_footer(&tail_bytes)?;
613
614 let file_version = LanceFileVersion::try_from_major_minor(
615 footer.major_version as u32,
616 footer.minor_version as u32,
617 )?;
618
619 let gbo_table =
620 Self::decode_gbo_table(&tail_bytes, file_len, scheduler, &footer, file_version).await?;
621 if gbo_table.is_empty() {
622 return Err(Error::Internal {
623 message: "File did not contain any global buffers, schema expected".to_string(),
624 location: location!(),
625 });
626 }
627 let schema_start = gbo_table[0].position;
628 let schema_size = gbo_table[0].size;
629
630 let num_footer_bytes = file_len - schema_start;
631
632 let all_metadata_bytes =
635 Self::optimistic_tail_read(&tail_bytes, schema_start, scheduler, file_len).await?;
636
637 let schema_bytes = all_metadata_bytes.slice(0..schema_size as usize);
638 let (num_rows, schema) = Self::decode_schema(schema_bytes)?;
639
640 let column_metadata_start = (footer.column_meta_start - schema_start) as usize;
643 let column_metadata_end = (footer.global_buff_offsets_start - schema_start) as usize;
644 let column_metadata_bytes =
645 all_metadata_bytes.slice(column_metadata_start..column_metadata_end);
646 let column_metadatas = Self::read_all_column_metadata(column_metadata_bytes, &footer)?;
647
648 let num_global_buffer_bytes = gbo_table.iter().map(|buf| buf.size).sum::<u64>();
649 let num_data_bytes = footer.column_meta_start - num_global_buffer_bytes;
650 let num_column_metadata_bytes = footer.global_buff_offsets_start - footer.column_meta_start;
651
652 let column_infos = Self::meta_to_col_infos(column_metadatas.as_slice(), file_version);
653
654 Ok(CachedFileMetadata {
655 file_schema: Arc::new(schema),
656 column_metadatas,
657 column_infos,
658 num_rows,
659 num_data_bytes,
660 num_column_metadata_bytes,
661 num_global_buffer_bytes,
662 num_footer_bytes,
663 file_buffers: gbo_table,
664 major_version: footer.major_version,
665 minor_version: footer.minor_version,
666 })
667 }
668
669 fn fetch_encoding<M: Default + Name + Sized>(encoding: &pbfile::Encoding) -> M {
670 match &encoding.location {
671 Some(pbfile::encoding::Location::Indirect(_)) => todo!(),
672 Some(pbfile::encoding::Location::Direct(encoding)) => {
673 let encoding_buf = Bytes::from(encoding.encoding.clone());
674 let encoding_any = prost_types::Any::decode(encoding_buf).unwrap();
675 encoding_any.to_msg::<M>().unwrap()
676 }
677 Some(pbfile::encoding::Location::None(_)) => panic!(),
678 None => panic!(),
679 }
680 }
681
682 fn meta_to_col_infos(
683 column_metadatas: &[pbfile::ColumnMetadata],
684 file_version: LanceFileVersion,
685 ) -> Vec<Arc<ColumnInfo>> {
686 column_metadatas
687 .iter()
688 .enumerate()
689 .map(|(col_idx, col_meta)| {
690 let page_infos = col_meta
691 .pages
692 .iter()
693 .map(|page| {
694 let num_rows = page.length;
695 let encoding = match file_version {
696 LanceFileVersion::V2_0 => {
697 PageEncoding::Legacy(Self::fetch_encoding::<pbenc::ArrayEncoding>(
698 page.encoding.as_ref().unwrap(),
699 ))
700 }
701 _ => PageEncoding::Structural(Self::fetch_encoding::<
702 pbenc21::PageLayout,
703 >(
704 page.encoding.as_ref().unwrap()
705 )),
706 };
707 let buffer_offsets_and_sizes = Arc::from(
708 page.buffer_offsets
709 .iter()
710 .zip(page.buffer_sizes.iter())
711 .map(|(offset, size)| {
712 assert!(
714 file_version < LanceFileVersion::V2_1
715 || offset % PAGE_BUFFER_ALIGNMENT as u64 == 0
716 );
717 (*offset, *size)
718 })
719 .collect::<Vec<_>>(),
720 );
721 PageInfo {
722 buffer_offsets_and_sizes,
723 encoding,
724 num_rows,
725 priority: page.priority,
726 }
727 })
728 .collect::<Vec<_>>();
729 let buffer_offsets_and_sizes = Arc::from(
730 col_meta
731 .buffer_offsets
732 .iter()
733 .zip(col_meta.buffer_sizes.iter())
734 .map(|(offset, size)| (*offset, *size))
735 .collect::<Vec<_>>(),
736 );
737 Arc::new(ColumnInfo {
738 index: col_idx as u32,
739 page_infos: Arc::from(page_infos),
740 buffer_offsets_and_sizes,
741 encoding: Self::fetch_encoding(col_meta.encoding.as_ref().unwrap()),
742 })
743 })
744 .collect::<Vec<_>>()
745 }
746
747 fn validate_projection(
748 projection: &ReaderProjection,
749 metadata: &CachedFileMetadata,
750 ) -> Result<()> {
751 if projection.schema.fields.is_empty() {
752 return Err(Error::invalid_input(
753 "Attempt to read zero columns from the file, at least one column must be specified"
754 .to_string(),
755 location!(),
756 ));
757 }
758 let mut column_indices_seen = BTreeSet::new();
759 for column_index in &projection.column_indices {
760 if !column_indices_seen.insert(*column_index) {
761 return Err(Error::invalid_input(
762 format!(
763 "The projection specified the column index {} more than once",
764 column_index
765 ),
766 location!(),
767 ));
768 }
769 if *column_index >= metadata.column_infos.len() as u32 {
770 return Err(Error::invalid_input(format!("The projection specified the column index {} but there are only {} columns in the file", column_index, metadata.column_infos.len()), location!()));
771 }
772 }
773 Ok(())
774 }
775
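    /// Opens a file reader from a [`FileScheduler`], reading and caching all file
    /// metadata up front.
    ///
    /// A minimal usage sketch, assuming an already-opened `FileScheduler` named
    /// `file_scheduler`, a `LanceCache` named `cache`, and an async context (imports
    /// and error handling elided):
    ///
    /// ```ignore
    /// let reader = FileReader::try_open(
    ///     file_scheduler,
    ///     None, // default projection: the whole schema
    ///     Arc::<DecoderPlugins>::default(),
    ///     &cache,
    ///     FileReaderOptions::default(),
    /// )
    /// .await?;
    /// let mut stream = reader.read_stream(
    ///     ReadBatchParams::RangeFull,
    ///     1024, // batch size
    ///     16,   // batch readahead
    ///     FilterExpression::no_filter(),
    /// )?;
    /// while let Some(batch) = stream.next().await {
    ///     println!("read {} rows", batch?.num_rows());
    /// }
    /// ```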
776 pub async fn try_open(
783 scheduler: FileScheduler,
784 base_projection: Option<ReaderProjection>,
785 decoder_plugins: Arc<DecoderPlugins>,
786 cache: &LanceCache,
787 options: FileReaderOptions,
788 ) -> Result<Self> {
789 let file_metadata = Arc::new(Self::read_all_metadata(&scheduler).await?);
790 let path = scheduler.reader().path().clone();
791
792 let encodings_io =
794 LanceEncodingsIo::new(scheduler).with_read_chunk_size(options.read_chunk_size);
795
796 Self::try_open_with_file_metadata(
797 Arc::new(encodings_io),
798 path,
799 base_projection,
800 decoder_plugins,
801 file_metadata,
802 cache,
803 options,
804 )
805 .await
806 }
807
808 pub async fn try_open_with_file_metadata(
814 scheduler: Arc<dyn EncodingsIo>,
815 path: Path,
816 base_projection: Option<ReaderProjection>,
817 decoder_plugins: Arc<DecoderPlugins>,
818 file_metadata: Arc<CachedFileMetadata>,
819 cache: &LanceCache,
820 options: FileReaderOptions,
821 ) -> Result<Self> {
822 let cache = Arc::new(cache.with_key_prefix(path.as_ref()));
823
824 if let Some(base_projection) = base_projection.as_ref() {
825 Self::validate_projection(base_projection, &file_metadata)?;
826 }
827 let num_rows = file_metadata.num_rows;
828 Ok(Self {
829 scheduler,
830 base_projection: base_projection.unwrap_or(ReaderProjection::from_whole_schema(
831 file_metadata.file_schema.as_ref(),
832 file_metadata.version(),
833 )),
834 num_rows,
835 metadata: file_metadata,
836 decoder_plugins,
837 cache,
838 options,
839 })
840 }
841
842 fn collect_columns_from_projection(
856 &self,
857 _projection: &ReaderProjection,
858 ) -> Result<Vec<Arc<ColumnInfo>>> {
859 Ok(self.metadata.column_infos.clone())
860 }
861
862 #[allow(clippy::too_many_arguments)]
863 fn do_read_range(
864 column_infos: Vec<Arc<ColumnInfo>>,
865 io: Arc<dyn EncodingsIo>,
866 cache: Arc<LanceCache>,
867 num_rows: u64,
868 decoder_plugins: Arc<DecoderPlugins>,
869 range: Range<u64>,
870 batch_size: u32,
871 projection: ReaderProjection,
872 filter: FilterExpression,
873 decoder_config: DecoderConfig,
874 ) -> Result<BoxStream<'static, ReadBatchTask>> {
875 debug!(
876 "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
877 range,
878 batch_size,
879 num_rows,
880 column_infos.len(),
881 projection.schema.fields.len(),
882 );
883
884 let config = SchedulerDecoderConfig {
885 batch_size,
886 cache,
887 decoder_plugins,
888 io,
889 decoder_config,
890 };
891
892 let requested_rows = RequestedRows::Ranges(vec![range]);
893
894 Ok(schedule_and_decode(
895 column_infos,
896 requested_rows,
897 filter,
898 projection.column_indices,
899 projection.schema,
900 config,
901 ))
902 }
903
904 fn read_range(
905 &self,
906 range: Range<u64>,
907 batch_size: u32,
908 projection: ReaderProjection,
909 filter: FilterExpression,
910 ) -> Result<BoxStream<'static, ReadBatchTask>> {
911 Self::do_read_range(
913 self.collect_columns_from_projection(&projection)?,
914 self.scheduler.clone(),
915 self.cache.clone(),
916 self.num_rows,
917 self.decoder_plugins.clone(),
918 range,
919 batch_size,
920 projection,
921 filter,
922 self.options.decoder_config.clone(),
923 )
924 }
925
926 #[allow(clippy::too_many_arguments)]
927 fn do_take_rows(
928 column_infos: Vec<Arc<ColumnInfo>>,
929 io: Arc<dyn EncodingsIo>,
930 cache: Arc<LanceCache>,
931 decoder_plugins: Arc<DecoderPlugins>,
932 indices: Vec<u64>,
933 batch_size: u32,
934 projection: ReaderProjection,
935 filter: FilterExpression,
936 decoder_config: DecoderConfig,
937 ) -> Result<BoxStream<'static, ReadBatchTask>> {
938 debug!(
939 "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
940 indices.len(),
941 indices[0],
942 indices[indices.len() - 1],
943 batch_size,
944 column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
945 );
946
947 let config = SchedulerDecoderConfig {
948 batch_size,
949 cache,
950 decoder_plugins,
951 io,
952 decoder_config,
953 };
954
955 let requested_rows = RequestedRows::Indices(indices);
956
957 Ok(schedule_and_decode(
958 column_infos,
959 requested_rows,
960 filter,
961 projection.column_indices,
962 projection.schema,
963 config,
964 ))
965 }
966
967 fn take_rows(
968 &self,
969 indices: Vec<u64>,
970 batch_size: u32,
971 projection: ReaderProjection,
972 ) -> Result<BoxStream<'static, ReadBatchTask>> {
973 Self::do_take_rows(
975 self.collect_columns_from_projection(&projection)?,
976 self.scheduler.clone(),
977 self.cache.clone(),
978 self.decoder_plugins.clone(),
979 indices,
980 batch_size,
981 projection,
982 FilterExpression::no_filter(),
983 self.options.decoder_config.clone(),
984 )
985 }
986
987 #[allow(clippy::too_many_arguments)]
988 fn do_read_ranges(
989 column_infos: Vec<Arc<ColumnInfo>>,
990 io: Arc<dyn EncodingsIo>,
991 cache: Arc<LanceCache>,
992 decoder_plugins: Arc<DecoderPlugins>,
993 ranges: Vec<Range<u64>>,
994 batch_size: u32,
995 projection: ReaderProjection,
996 filter: FilterExpression,
997 decoder_config: DecoderConfig,
998 ) -> Result<BoxStream<'static, ReadBatchTask>> {
999 let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1000 debug!(
1001 "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
1002 ranges.len(),
1003 num_rows,
1004 ranges[0].start,
1005 ranges[ranges.len() - 1].end,
1006 batch_size,
1007 column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1008 );
1009
1010 let config = SchedulerDecoderConfig {
1011 batch_size,
1012 cache,
1013 decoder_plugins,
1014 io,
1015 decoder_config,
1016 };
1017
1018 let requested_rows = RequestedRows::Ranges(ranges);
1019
1020 Ok(schedule_and_decode(
1021 column_infos,
1022 requested_rows,
1023 filter,
1024 projection.column_indices,
1025 projection.schema,
1026 config,
1027 ))
1028 }
1029
1030 fn read_ranges(
1031 &self,
1032 ranges: Vec<Range<u64>>,
1033 batch_size: u32,
1034 projection: ReaderProjection,
1035 filter: FilterExpression,
1036 ) -> Result<BoxStream<'static, ReadBatchTask>> {
1037 Self::do_read_ranges(
1038 self.collect_columns_from_projection(&projection)?,
1039 self.scheduler.clone(),
1040 self.cache.clone(),
1041 self.decoder_plugins.clone(),
1042 ranges,
1043 batch_size,
1044 projection,
1045 filter,
1046 self.options.decoder_config.clone(),
1047 )
1048 }
1049
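    /// Creates a stream of decode tasks for the requested rows.
    ///
    /// The projection (or the reader's base projection) is validated, the requested
    /// indices/ranges are bounds-checked against the file's row count, and the request
    /// is dispatched to the appropriate take/range path.  Each emitted [`ReadBatchTask`]
    /// decodes one batch of up to `batch_size` rows.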
1050 pub fn read_tasks(
1061 &self,
1062 params: ReadBatchParams,
1063 batch_size: u32,
1064 projection: Option<ReaderProjection>,
1065 filter: FilterExpression,
1066 ) -> Result<Pin<Box<dyn Stream<Item = ReadBatchTask> + Send>>> {
1067 let projection = projection.unwrap_or_else(|| self.base_projection.clone());
1068 Self::validate_projection(&projection, &self.metadata)?;
1069 let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
1070 if bound > self.num_rows || bound == self.num_rows && inclusive {
1071 Err(Error::invalid_input(
1072 format!(
1073 "cannot read {:?} from file with {} rows",
1074 params, self.num_rows
1075 ),
1076 location!(),
1077 ))
1078 } else {
1079 Ok(())
1080 }
1081 };
1082 match ¶ms {
1083 ReadBatchParams::Indices(indices) => {
1084 for idx in indices {
1085 match idx {
1086 None => {
1087 return Err(Error::invalid_input(
1088 "Null value in indices array",
1089 location!(),
1090 ));
1091 }
1092 Some(idx) => {
1093 verify_bound(¶ms, idx as u64, true)?;
1094 }
1095 }
1096 }
1097 let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
1098 self.take_rows(indices, batch_size, projection)
1099 }
1100 ReadBatchParams::Range(range) => {
1101 verify_bound(¶ms, range.end as u64, false)?;
1102 self.read_range(
1103 range.start as u64..range.end as u64,
1104 batch_size,
1105 projection,
1106 filter,
1107 )
1108 }
1109 ReadBatchParams::Ranges(ranges) => {
1110 let mut ranges_u64 = Vec::with_capacity(ranges.len());
1111 for range in ranges.as_ref() {
1112 verify_bound(¶ms, range.end, false)?;
1113 ranges_u64.push(range.start..range.end);
1114 }
1115 self.read_ranges(ranges_u64, batch_size, projection, filter)
1116 }
1117 ReadBatchParams::RangeFrom(range) => {
1118 verify_bound(¶ms, range.start as u64, true)?;
1119 self.read_range(
1120 range.start as u64..self.num_rows,
1121 batch_size,
1122 projection,
1123 filter,
1124 )
1125 }
1126 ReadBatchParams::RangeTo(range) => {
1127 verify_bound(¶ms, range.end as u64, false)?;
1128 self.read_range(0..range.end as u64, batch_size, projection, filter)
1129 }
1130 ReadBatchParams::RangeFull => {
1131 self.read_range(0..self.num_rows, batch_size, projection, filter)
1132 }
1133 }
1134 }
1135
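    /// Reads the requested rows as a stream of record batches using the given
    /// projection.
    ///
    /// This wraps [`Self::read_tasks`], keeping up to `batch_readahead` decode tasks in
    /// flight, and adapts the result to a [`RecordBatchStream`] with the projected
    /// Arrow schema.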
1136 pub fn read_stream_projected(
1158 &self,
1159 params: ReadBatchParams,
1160 batch_size: u32,
1161 batch_readahead: u32,
1162 projection: ReaderProjection,
1163 filter: FilterExpression,
1164 ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1165 let arrow_schema = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
1166 let tasks_stream = self.read_tasks(params, batch_size, Some(projection), filter)?;
1167 let batch_stream = tasks_stream
1168 .map(|task| task.task)
1169 .buffered(batch_readahead as usize)
1170 .boxed();
1171 Ok(Box::pin(RecordBatchStreamAdapter::new(
1172 arrow_schema,
1173 batch_stream,
1174 )))
1175 }
1176
1177 fn take_rows_blocking(
1178 &self,
1179 indices: Vec<u64>,
1180 batch_size: u32,
1181 projection: ReaderProjection,
1182 filter: FilterExpression,
1183 ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1184 let column_infos = self.collect_columns_from_projection(&projection)?;
1185 debug!(
1186 "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
1187 indices.len(),
1188 indices[0],
1189 indices[indices.len() - 1],
1190 batch_size,
1191 column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1192 );
1193
1194 let config = SchedulerDecoderConfig {
1195 batch_size,
1196 cache: self.cache.clone(),
1197 decoder_plugins: self.decoder_plugins.clone(),
1198 io: self.scheduler.clone(),
1199 decoder_config: self.options.decoder_config.clone(),
1200 };
1201
1202 let requested_rows = RequestedRows::Indices(indices);
1203
1204 schedule_and_decode_blocking(
1205 column_infos,
1206 requested_rows,
1207 filter,
1208 projection.column_indices,
1209 projection.schema,
1210 config,
1211 )
1212 }
1213
1214 fn read_ranges_blocking(
1215 &self,
1216 ranges: Vec<Range<u64>>,
1217 batch_size: u32,
1218 projection: ReaderProjection,
1219 filter: FilterExpression,
1220 ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1221 let column_infos = self.collect_columns_from_projection(&projection)?;
1222 let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1223 debug!(
1224 "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
1225 ranges.len(),
1226 num_rows,
1227 ranges[0].start,
1228 ranges[ranges.len() - 1].end,
1229 batch_size,
1230 column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1231 );
1232
1233 let config = SchedulerDecoderConfig {
1234 batch_size,
1235 cache: self.cache.clone(),
1236 decoder_plugins: self.decoder_plugins.clone(),
1237 io: self.scheduler.clone(),
1238 decoder_config: self.options.decoder_config.clone(),
1239 };
1240
1241 let requested_rows = RequestedRows::Ranges(ranges);
1242
1243 schedule_and_decode_blocking(
1244 column_infos,
1245 requested_rows,
1246 filter,
1247 projection.column_indices,
1248 projection.schema,
1249 config,
1250 )
1251 }
1252
1253 fn read_range_blocking(
1254 &self,
1255 range: Range<u64>,
1256 batch_size: u32,
1257 projection: ReaderProjection,
1258 filter: FilterExpression,
1259 ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1260 let column_infos = self.collect_columns_from_projection(&projection)?;
1261 let num_rows = self.num_rows;
1262
1263 debug!(
1264 "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
1265 range,
1266 batch_size,
1267 num_rows,
1268 column_infos.len(),
1269 projection.schema.fields.len(),
1270 );
1271
1272 let config = SchedulerDecoderConfig {
1273 batch_size,
1274 cache: self.cache.clone(),
1275 decoder_plugins: self.decoder_plugins.clone(),
1276 io: self.scheduler.clone(),
1277 decoder_config: self.options.decoder_config.clone(),
1278 };
1279
1280 let requested_rows = RequestedRows::Ranges(vec![range]);
1281
1282 schedule_and_decode_blocking(
1283 column_infos,
1284 requested_rows,
1285 filter,
1286 projection.column_indices,
1287 projection.schema,
1288 config,
1289 )
1290 }
1291
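    /// Blocking variant of [`Self::read_stream_projected`].
    ///
    /// Performs the same validation and dispatch but uses the blocking
    /// schedule-and-decode path and returns a synchronous [`RecordBatchReader`].  It is
    /// intended to be called from synchronous code (the tests below invoke it inside
    /// `tokio::task::spawn_blocking`).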
1292 pub fn read_stream_projected_blocking(
1304 &self,
1305 params: ReadBatchParams,
1306 batch_size: u32,
1307 projection: Option<ReaderProjection>,
1308 filter: FilterExpression,
1309 ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1310 let projection = projection.unwrap_or_else(|| self.base_projection.clone());
1311 Self::validate_projection(&projection, &self.metadata)?;
1312 let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
1313 if bound > self.num_rows || bound == self.num_rows && inclusive {
1314 Err(Error::invalid_input(
1315 format!(
1316 "cannot read {:?} from file with {} rows",
1317 params, self.num_rows
1318 ),
1319 location!(),
1320 ))
1321 } else {
1322 Ok(())
1323 }
1324 };
1325 match ¶ms {
1326 ReadBatchParams::Indices(indices) => {
1327 for idx in indices {
1328 match idx {
1329 None => {
1330 return Err(Error::invalid_input(
1331 "Null value in indices array",
1332 location!(),
1333 ));
1334 }
1335 Some(idx) => {
1336 verify_bound(¶ms, idx as u64, true)?;
1337 }
1338 }
1339 }
1340 let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
1341 self.take_rows_blocking(indices, batch_size, projection, filter)
1342 }
1343 ReadBatchParams::Range(range) => {
1344 verify_bound(¶ms, range.end as u64, false)?;
1345 self.read_range_blocking(
1346 range.start as u64..range.end as u64,
1347 batch_size,
1348 projection,
1349 filter,
1350 )
1351 }
1352 ReadBatchParams::Ranges(ranges) => {
1353 let mut ranges_u64 = Vec::with_capacity(ranges.len());
1354 for range in ranges.as_ref() {
1355 verify_bound(¶ms, range.end, false)?;
1356 ranges_u64.push(range.start..range.end);
1357 }
1358 self.read_ranges_blocking(ranges_u64, batch_size, projection, filter)
1359 }
1360 ReadBatchParams::RangeFrom(range) => {
1361 verify_bound(¶ms, range.start as u64, true)?;
1362 self.read_range_blocking(
1363 range.start as u64..self.num_rows,
1364 batch_size,
1365 projection,
1366 filter,
1367 )
1368 }
1369 ReadBatchParams::RangeTo(range) => {
1370 verify_bound(¶ms, range.end as u64, false)?;
1371 self.read_range_blocking(0..range.end as u64, batch_size, projection, filter)
1372 }
1373 ReadBatchParams::RangeFull => {
1374 self.read_range_blocking(0..self.num_rows, batch_size, projection, filter)
1375 }
1376 }
1377 }
1378
1379 pub fn read_stream(
1385 &self,
1386 params: ReadBatchParams,
1387 batch_size: u32,
1388 batch_readahead: u32,
1389 filter: FilterExpression,
1390 ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1391 self.read_stream_projected(
1392 params,
1393 batch_size,
1394 batch_readahead,
1395 self.base_projection.clone(),
1396 filter,
1397 )
1398 }
1399
1400 pub fn schema(&self) -> &Arc<Schema> {
1401 &self.metadata.file_schema
1402 }
1403}
1404
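/// Renders a human-readable description of a page's encoding for debugging and
/// inspection tools, decoding the embedded protobuf when the encoding is stored
/// directly in the page metadata.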
1405pub fn describe_encoding(page: &pbfile::column_metadata::Page) -> String {
1407 if let Some(encoding) = &page.encoding {
1408 if let Some(style) = &encoding.location {
1409 match style {
1410 pbfile::encoding::Location::Indirect(indirect) => {
1411 format!(
1412 "IndirectEncoding(pos={},size={})",
1413 indirect.buffer_location, indirect.buffer_length
1414 )
1415 }
1416 pbfile::encoding::Location::Direct(direct) => {
1417 let encoding_any =
1418 prost_types::Any::decode(Bytes::from(direct.encoding.clone()))
1419 .expect("failed to deserialize encoding as protobuf");
1420 if encoding_any.type_url == "/lance.encodings.ArrayEncoding" {
1421 let encoding = encoding_any.to_msg::<pbenc::ArrayEncoding>();
1422 match encoding {
1423 Ok(encoding) => {
1424 format!("{:#?}", encoding)
1425 }
1426 Err(err) => {
1427 format!("Unsupported(decode_err={})", err)
1428 }
1429 }
1430 } else if encoding_any.type_url == "/lance.encodings21.PageLayout" {
1431 let encoding = encoding_any.to_msg::<pbenc21::PageLayout>();
1432 match encoding {
1433 Ok(encoding) => {
1434 format!("{:#?}", encoding)
1435 }
1436 Err(err) => {
1437 format!("Unsupported(decode_err={})", err)
1438 }
1439 }
1440 } else {
1441 format!("Unrecognized(type_url={})", encoding_any.type_url)
1442 }
1443 }
1444 pbfile::encoding::Location::None(_) => "NoEncodingDescription".to_string(),
1445 }
1446 } else {
1447 "MISSING STYLE".to_string()
1448 }
1449 } else {
1450 "MISSING".to_string()
1451 }
1452}
1453
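/// Helpers for turning serialized bytes back into an [`EncodedBatch`].
///
/// "Mini lance" bytes contain only data pages plus a footer and must be paired with an
/// externally supplied schema, while "self described" bytes also embed the schema in
/// the file's first global buffer.  A minimal round-trip sketch (assumes `bytes` were
/// produced by the matching `EncodedBatchWriteExt` method, as in the tests below):
///
/// ```ignore
/// let decoded = EncodedBatch::try_from_self_described_lance(bytes)?;
/// ```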
1454pub trait EncodedBatchReaderExt {
1455 fn try_from_mini_lance(
1456 bytes: Bytes,
1457 schema: &Schema,
1458 version: LanceFileVersion,
1459 ) -> Result<Self>
1460 where
1461 Self: Sized;
1462 fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1463 where
1464 Self: Sized;
1465}
1466
1467impl EncodedBatchReaderExt for EncodedBatch {
1468 fn try_from_mini_lance(
1469 bytes: Bytes,
1470 schema: &Schema,
1471 file_version: LanceFileVersion,
1472 ) -> Result<Self>
1473 where
1474 Self: Sized,
1475 {
1476 let projection = ReaderProjection::from_whole_schema(schema, file_version);
1477 let footer = FileReader::decode_footer(&bytes)?;
1478
1479 let column_metadata_start = footer.column_meta_start as usize;
1482 let column_metadata_end = footer.global_buff_offsets_start as usize;
1483 let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1484 let column_metadatas =
1485 FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1486
1487 let file_version = LanceFileVersion::try_from_major_minor(
1488 footer.major_version as u32,
1489 footer.minor_version as u32,
1490 )?;
1491
1492 let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1493
1494 Ok(Self {
1495 data: bytes,
1496 num_rows: page_table
1497 .first()
1498 .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1499 .unwrap_or(0),
1500 page_table,
1501 top_level_columns: projection.column_indices,
1502 schema: Arc::new(schema.clone()),
1503 })
1504 }
1505
1506 fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1507 where
1508 Self: Sized,
1509 {
1510 let footer = FileReader::decode_footer(&bytes)?;
1511 let file_version = LanceFileVersion::try_from_major_minor(
1512 footer.major_version as u32,
1513 footer.minor_version as u32,
1514 )?;
1515
1516 let gbo_table = FileReader::do_decode_gbo_table(
1517 &bytes.slice(footer.global_buff_offsets_start as usize..),
1518 &footer,
1519 file_version,
1520 )?;
1521 if gbo_table.is_empty() {
1522 return Err(Error::Internal {
1523 message: "File did not contain any global buffers, schema expected".to_string(),
1524 location: location!(),
1525 });
1526 }
1527 let schema_start = gbo_table[0].position as usize;
1528 let schema_size = gbo_table[0].size as usize;
1529
1530 let schema_bytes = bytes.slice(schema_start..(schema_start + schema_size));
1531 let (_, schema) = FileReader::decode_schema(schema_bytes)?;
1532 let projection = ReaderProjection::from_whole_schema(&schema, file_version);
1533
1534 let column_metadata_start = footer.column_meta_start as usize;
1537 let column_metadata_end = footer.global_buff_offsets_start as usize;
1538 let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1539 let column_metadatas =
1540 FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1541
1542 let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1543
1544 Ok(Self {
1545 data: bytes,
1546 num_rows: page_table
1547 .first()
1548 .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1549 .unwrap_or(0),
1550 page_table,
1551 top_level_columns: projection.column_indices,
1552 schema: Arc::new(schema),
1553 })
1554 }
1555}
1556
1557#[cfg(test)]
1558pub mod tests {
1559 use std::{collections::BTreeMap, pin::Pin, sync::Arc};
1560
1561 use arrow_array::{
1562 types::{Float64Type, Int32Type},
1563 RecordBatch, UInt32Array,
1564 };
1565 use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema};
1566 use bytes::Bytes;
1567 use futures::{prelude::stream::TryStreamExt, StreamExt};
1568 use lance_arrow::RecordBatchExt;
1569 use lance_core::{datatypes::Schema, ArrowResult};
1570 use lance_datagen::{array, gen_batch, BatchCount, ByteCount, RowCount};
1571 use lance_encoding::{
1572 decoder::{decode_batch, DecodeBatchScheduler, DecoderPlugins, FilterExpression},
1573 encoder::{default_encoding_strategy, encode_batch, EncodedBatch, EncodingOptions},
1574 version::LanceFileVersion,
1575 };
1576 use lance_io::{stream::RecordBatchStream, utils::CachedFileSize};
1577 use log::debug;
1578 use rstest::rstest;
1579 use tokio::sync::mpsc;
1580
1581 use crate::reader::{EncodedBatchReaderExt, FileReader, FileReaderOptions, ReaderProjection};
1582 use crate::testing::{test_cache, write_lance_file, FsFixture, WrittenFile};
1583 use crate::writer::{EncodedBatchWriteExt, FileWriter, FileWriterOptions};
1584 use lance_encoding::decoder::DecoderConfig;
1585
1586 async fn create_some_file(fs: &FsFixture, version: LanceFileVersion) -> WrittenFile {
1587 let location_type = DataType::Struct(Fields::from(vec![
1588 Field::new("x", DataType::Float64, true),
1589 Field::new("y", DataType::Float64, true),
1590 ]));
1591 let categories_type = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)));
1592
1593 let mut reader = gen_batch()
1594 .col("score", array::rand::<Float64Type>())
1595 .col("location", array::rand_type(&location_type))
1596 .col("categories", array::rand_type(&categories_type))
1597 .col("binary", array::rand_type(&DataType::Binary));
1598 if version <= LanceFileVersion::V2_0 {
1599 reader = reader.col("large_bin", array::rand_type(&DataType::LargeBinary));
1600 }
1601 let reader = reader.into_reader_rows(RowCount::from(1000), BatchCount::from(100));
1602
1603 write_lance_file(
1604 reader,
1605 fs,
1606 FileWriterOptions {
1607 format_version: Some(version),
1608 ..Default::default()
1609 },
1610 )
1611 .await
1612 }
1613
1614 type Transformer = Box<dyn Fn(&RecordBatch) -> RecordBatch>;
1615
1616 async fn verify_expected(
1617 expected: &[RecordBatch],
1618 mut actual: Pin<Box<dyn RecordBatchStream>>,
1619 read_size: u32,
1620 transform: Option<Transformer>,
1621 ) {
1622 let mut remaining = expected.iter().map(|batch| batch.num_rows()).sum::<usize>() as u32;
1623 let mut expected_iter = expected.iter().map(|batch| {
1624 if let Some(transform) = &transform {
1625 transform(batch)
1626 } else {
1627 batch.clone()
1628 }
1629 });
1630 let mut next_expected = expected_iter.next().unwrap().clone();
1631 while let Some(actual) = actual.next().await {
1632 let mut actual = actual.unwrap();
1633 let mut rows_to_verify = actual.num_rows() as u32;
1634 let expected_length = remaining.min(read_size);
1635 assert_eq!(expected_length, rows_to_verify);
1636
1637 while rows_to_verify > 0 {
1638 let next_slice_len = (next_expected.num_rows() as u32).min(rows_to_verify);
1639 assert_eq!(
1640 next_expected.slice(0, next_slice_len as usize),
1641 actual.slice(0, next_slice_len as usize)
1642 );
1643 remaining -= next_slice_len;
1644 rows_to_verify -= next_slice_len;
1645 if remaining > 0 {
1646 if next_slice_len == next_expected.num_rows() as u32 {
1647 next_expected = expected_iter.next().unwrap().clone();
1648 } else {
1649 next_expected = next_expected.slice(
1650 next_slice_len as usize,
1651 next_expected.num_rows() - next_slice_len as usize,
1652 );
1653 }
1654 }
1655 if rows_to_verify > 0 {
1656 actual = actual.slice(
1657 next_slice_len as usize,
1658 actual.num_rows() - next_slice_len as usize,
1659 );
1660 }
1661 }
1662 }
1663 assert_eq!(remaining, 0);
1664 }
1665
1666 #[tokio::test]
1667 async fn test_round_trip() {
1668 let fs = FsFixture::default();
1669
1670 let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1671
1672 for read_size in [32, 1024, 1024 * 1024] {
1673 let file_scheduler = fs
1674 .scheduler
1675 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1676 .await
1677 .unwrap();
1678 let file_reader = FileReader::try_open(
1679 file_scheduler,
1680 None,
1681 Arc::<DecoderPlugins>::default(),
1682 &test_cache(),
1683 FileReaderOptions::default(),
1684 )
1685 .await
1686 .unwrap();
1687
1688 let schema = file_reader.schema();
1689 assert_eq!(schema.metadata.get("foo").unwrap(), "bar");
1690
1691 let batch_stream = file_reader
1692 .read_stream(
1693 lance_io::ReadBatchParams::RangeFull,
1694 read_size,
1695 16,
1696 FilterExpression::no_filter(),
1697 )
1698 .unwrap();
1699
1700 verify_expected(&data, batch_stream, read_size, None).await;
1701 }
1702 }
1703
1704 #[rstest]
1705 #[test_log::test(tokio::test)]
1706 async fn test_encoded_batch_round_trip(
1707 #[values(LanceFileVersion::V2_0)] version: LanceFileVersion,
1709 ) {
1710 let data = gen_batch()
1711 .col("x", array::rand::<Int32Type>())
1712 .col("y", array::rand_utf8(ByteCount::from(16), false))
1713 .into_batch_rows(RowCount::from(10000))
1714 .unwrap();
1715
1716 let lance_schema = Arc::new(Schema::try_from(data.schema().as_ref()).unwrap());
1717
1718 let encoding_options = EncodingOptions {
1719 cache_bytes_per_column: 4096,
1720 max_page_bytes: 32 * 1024 * 1024,
1721 keep_original_array: true,
1722 buffer_alignment: 64,
1723 version,
1724 };
1725
1726 let encoding_strategy = default_encoding_strategy(version);
1727
1728 let encoded_batch = encode_batch(
1729 &data,
1730 lance_schema.clone(),
1731 encoding_strategy.as_ref(),
1732 &encoding_options,
1733 )
1734 .await
1735 .unwrap();
1736
1737 let bytes = encoded_batch.try_to_self_described_lance(version).unwrap();
1739
1740 let decoded_batch = EncodedBatch::try_from_self_described_lance(bytes).unwrap();
1741
1742 let decoded = decode_batch(
1743 &decoded_batch,
1744 &FilterExpression::no_filter(),
1745 Arc::<DecoderPlugins>::default(),
1746 false,
1747 version,
1748 None,
1749 )
1750 .await
1751 .unwrap();
1752
1753 assert_eq!(data, decoded);
1754
1755 let bytes = encoded_batch.try_to_mini_lance(version).unwrap();
1757 let decoded_batch =
1758 EncodedBatch::try_from_mini_lance(bytes, lance_schema.as_ref(), LanceFileVersion::V2_0)
1759 .unwrap();
1760 let decoded = decode_batch(
1761 &decoded_batch,
1762 &FilterExpression::no_filter(),
1763 Arc::<DecoderPlugins>::default(),
1764 false,
1765 version,
1766 None,
1767 )
1768 .await
1769 .unwrap();
1770
1771 assert_eq!(data, decoded);
1772 }
1773
1774 #[rstest]
1775 #[test_log::test(tokio::test)]
1776 async fn test_projection(
1777 #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
1778 ) {
1779 let fs = FsFixture::default();
1780
1781 let written_file = create_some_file(&fs, version).await;
1782 let file_scheduler = fs
1783 .scheduler
1784 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1785 .await
1786 .unwrap();
1787
1788 let field_id_mapping = written_file
1789 .field_id_mapping
1790 .iter()
1791 .copied()
1792 .collect::<BTreeMap<_, _>>();
1793
1794 let empty_projection = ReaderProjection {
1795 column_indices: Vec::default(),
1796 schema: Arc::new(Schema::default()),
1797 };
1798
1799 for columns in [
1800 vec!["score"],
1801 vec!["location"],
1802 vec!["categories"],
1803 vec!["score.x"],
1804 vec!["score", "categories"],
1805 vec!["score", "location"],
1806 vec!["location", "categories"],
1807 vec!["score.y", "location", "categories"],
1808 ] {
1809 debug!("Testing round trip with projection {:?}", columns);
1810 for use_field_ids in [true, false] {
1811 let file_reader = FileReader::try_open(
1813 file_scheduler.clone(),
1814 None,
1815 Arc::<DecoderPlugins>::default(),
1816 &test_cache(),
1817 FileReaderOptions::default(),
1818 )
1819 .await
1820 .unwrap();
1821
1822 let projected_schema = written_file.schema.project(&columns).unwrap();
1823 let projection = if use_field_ids {
1824 ReaderProjection::from_field_ids(
1825 file_reader.metadata.version(),
1826 &projected_schema,
1827 &field_id_mapping,
1828 )
1829 .unwrap()
1830 } else {
1831 ReaderProjection::from_column_names(
1832 file_reader.metadata.version(),
1833 &written_file.schema,
1834 &columns,
1835 )
1836 .unwrap()
1837 };
1838
1839 let batch_stream = file_reader
1840 .read_stream_projected(
1841 lance_io::ReadBatchParams::RangeFull,
1842 1024,
1843 16,
1844 projection.clone(),
1845 FilterExpression::no_filter(),
1846 )
1847 .unwrap();
1848
1849 let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1850 verify_expected(
1851 &written_file.data,
1852 batch_stream,
1853 1024,
1854 Some(Box::new(move |batch: &RecordBatch| {
1855 batch.project_by_schema(&projection_arrow).unwrap()
1856 })),
1857 )
1858 .await;
1859
1860 let file_reader = FileReader::try_open(
1862 file_scheduler.clone(),
1863 Some(projection.clone()),
1864 Arc::<DecoderPlugins>::default(),
1865 &test_cache(),
1866 FileReaderOptions::default(),
1867 )
1868 .await
1869 .unwrap();
1870
1871 let batch_stream = file_reader
1872 .read_stream(
1873 lance_io::ReadBatchParams::RangeFull,
1874 1024,
1875 16,
1876 FilterExpression::no_filter(),
1877 )
1878 .unwrap();
1879
1880 let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1881 verify_expected(
1882 &written_file.data,
1883 batch_stream,
1884 1024,
1885 Some(Box::new(move |batch: &RecordBatch| {
1886 batch.project_by_schema(&projection_arrow).unwrap()
1887 })),
1888 )
1889 .await;
1890
1891 assert!(file_reader
1892 .read_stream_projected(
1893 lance_io::ReadBatchParams::RangeFull,
1894 1024,
1895 16,
1896 empty_projection.clone(),
1897 FilterExpression::no_filter(),
1898 )
1899 .is_err());
1900 }
1901 }
1902
1903 assert!(FileReader::try_open(
1904 file_scheduler.clone(),
1905 Some(empty_projection),
1906 Arc::<DecoderPlugins>::default(),
1907 &test_cache(),
1908 FileReaderOptions::default(),
1909 )
1910 .await
1911 .is_err());
1912
1913 let arrow_schema = ArrowSchema::new(vec![
1914 Field::new("x", DataType::Int32, true),
1915 Field::new("y", DataType::Int32, true),
1916 ]);
1917 let schema = Schema::try_from(&arrow_schema).unwrap();
1918
1919 let projection_with_dupes = ReaderProjection {
1920 column_indices: vec![0, 0],
1921 schema: Arc::new(schema),
1922 };
1923
1924 assert!(FileReader::try_open(
1925 file_scheduler.clone(),
1926 Some(projection_with_dupes),
1927 Arc::<DecoderPlugins>::default(),
1928 &test_cache(),
1929 FileReaderOptions::default(),
1930 )
1931 .await
1932 .is_err());
1933 }
1934
1935 #[test_log::test(tokio::test)]
1936 async fn test_compressing_buffer() {
1937 let fs = FsFixture::default();
1938
1939 let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
1940 let file_scheduler = fs
1941 .scheduler
1942 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1943 .await
1944 .unwrap();
1945
1946 let file_reader = FileReader::try_open(
1948 file_scheduler.clone(),
1949 None,
1950 Arc::<DecoderPlugins>::default(),
1951 &test_cache(),
1952 FileReaderOptions::default(),
1953 )
1954 .await
1955 .unwrap();
1956
1957 let mut projection = written_file.schema.project(&["score"]).unwrap();
1958 for field in projection.fields.iter_mut() {
1959 field
1960 .metadata
1961 .insert("lance:compression".to_string(), "zstd".to_string());
1962 }
1963 let projection = ReaderProjection {
1964 column_indices: projection.fields.iter().map(|f| f.id as u32).collect(),
1965 schema: Arc::new(projection),
1966 };
1967
1968 let batch_stream = file_reader
1969 .read_stream_projected(
1970 lance_io::ReadBatchParams::RangeFull,
1971 1024,
1972 16,
1973 projection.clone(),
1974 FilterExpression::no_filter(),
1975 )
1976 .unwrap();
1977
1978 let projection_arrow = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
1979 verify_expected(
1980 &written_file.data,
1981 batch_stream,
1982 1024,
1983 Some(Box::new(move |batch: &RecordBatch| {
1984 batch.project_by_schema(&projection_arrow).unwrap()
1985 })),
1986 )
1987 .await;
1988 }
1989
1990 #[tokio::test]
1991 async fn test_read_all() {
1992 let fs = FsFixture::default();
1993 let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1994 let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
1995
1996 let file_scheduler = fs
1997 .scheduler
1998 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1999 .await
2000 .unwrap();
2001 let file_reader = FileReader::try_open(
2002 file_scheduler.clone(),
2003 None,
2004 Arc::<DecoderPlugins>::default(),
2005 &test_cache(),
2006 FileReaderOptions::default(),
2007 )
2008 .await
2009 .unwrap();
2010
2011 let batches = file_reader
2012 .read_stream(
2013 lance_io::ReadBatchParams::RangeFull,
2014 total_rows as u32,
2015 16,
2016 FilterExpression::no_filter(),
2017 )
2018 .unwrap()
2019 .try_collect::<Vec<_>>()
2020 .await
2021 .unwrap();
2022 assert_eq!(batches.len(), 1);
2023 assert_eq!(batches[0].num_rows(), total_rows);
2024 }
2025
2026 #[rstest]
2027 #[tokio::test]
2028 async fn test_blocking_take(
2029 #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
2030 ) {
2031 let fs = FsFixture::default();
2032 let WrittenFile { data, schema, .. } = create_some_file(&fs, version).await;
2033 let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2034
2035 let file_scheduler = fs
2036 .scheduler
2037 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2038 .await
2039 .unwrap();
2040 let file_reader = FileReader::try_open(
2041 file_scheduler.clone(),
2042 Some(ReaderProjection::from_column_names(version, &schema, &["score"]).unwrap()),
2043 Arc::<DecoderPlugins>::default(),
2044 &test_cache(),
2045 FileReaderOptions::default(),
2046 )
2047 .await
2048 .unwrap();
2049
2050 let batches = tokio::task::spawn_blocking(move || {
2051 file_reader
2052 .read_stream_projected_blocking(
2053 lance_io::ReadBatchParams::Indices(UInt32Array::from(vec![0, 1, 2, 3, 4])),
2054 total_rows as u32,
2055 None,
2056 FilterExpression::no_filter(),
2057 )
2058 .unwrap()
2059 .collect::<ArrowResult<Vec<_>>>()
2060 .unwrap()
2061 })
2062 .await
2063 .unwrap();
2064
2065 assert_eq!(batches.len(), 1);
2066 assert_eq!(batches[0].num_rows(), 5);
2067 assert_eq!(batches[0].num_columns(), 1);
2068 }
2069
2070 #[tokio::test(flavor = "multi_thread")]
2071 async fn test_drop_in_progress() {
2072 let fs = FsFixture::default();
2073 let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
2074 let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2075
2076 let file_scheduler = fs
2077 .scheduler
2078 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2079 .await
2080 .unwrap();
2081 let file_reader = FileReader::try_open(
2082 file_scheduler.clone(),
2083 None,
2084 Arc::<DecoderPlugins>::default(),
2085 &test_cache(),
2086 FileReaderOptions::default(),
2087 )
2088 .await
2089 .unwrap();
2090
2091 let mut batches = file_reader
2092 .read_stream(
2093 lance_io::ReadBatchParams::RangeFull,
2094 (total_rows / 10) as u32,
2095 16,
2096 FilterExpression::no_filter(),
2097 )
2098 .unwrap();
2099
2100 drop(file_reader);
2101
2102 let batch = batches.next().await.unwrap().unwrap();
2103 assert!(batch.num_rows() > 0);
2104
2105 drop(batches);
2107 }
2108
2109 #[tokio::test]
2110 async fn drop_while_scheduling() {
2111 let fs = FsFixture::default();
2121 let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
2122 let total_rows = written_file
2123 .data
2124 .iter()
2125 .map(|batch| batch.num_rows())
2126 .sum::<usize>();
2127
2128 let file_scheduler = fs
2129 .scheduler
2130 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2131 .await
2132 .unwrap();
2133 let file_reader = FileReader::try_open(
2134 file_scheduler.clone(),
2135 None,
2136 Arc::<DecoderPlugins>::default(),
2137 &test_cache(),
2138 FileReaderOptions::default(),
2139 )
2140 .await
2141 .unwrap();
2142
2143 let projection =
2144 ReaderProjection::from_whole_schema(&written_file.schema, LanceFileVersion::V2_0);
2145 let column_infos = file_reader
2146 .collect_columns_from_projection(&projection)
2147 .unwrap();
2148 let mut decode_scheduler = DecodeBatchScheduler::try_new(
2149 &projection.schema,
2150 &projection.column_indices,
2151 &column_infos,
2152 &vec![],
2153 total_rows as u64,
2154 Arc::<DecoderPlugins>::default(),
2155 file_reader.scheduler.clone(),
2156 test_cache(),
2157 &FilterExpression::no_filter(),
2158 &DecoderConfig::default(),
2159 )
2160 .await
2161 .unwrap();
2162
2163 let range = 0..total_rows as u64;
2164
2165 let (tx, rx) = mpsc::unbounded_channel();
2166
2167 drop(rx);
2169
2170 decode_scheduler.schedule_range(
2172 range,
2173 &FilterExpression::no_filter(),
2174 tx,
2175 file_reader.scheduler.clone(),
2176 )
2177 }
2178
2179 #[tokio::test]
2180 async fn test_read_empty_range() {
2181 let fs = FsFixture::default();
2182 create_some_file(&fs, LanceFileVersion::V2_0).await;
2183
2184 let file_scheduler = fs
2185 .scheduler
2186 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2187 .await
2188 .unwrap();
2189 let file_reader = FileReader::try_open(
2190 file_scheduler.clone(),
2191 None,
2192 Arc::<DecoderPlugins>::default(),
2193 &test_cache(),
2194 FileReaderOptions::default(),
2195 )
2196 .await
2197 .unwrap();
2198
2199 let batches = file_reader
2201 .read_stream(
2202 lance_io::ReadBatchParams::Range(0..0),
2203 1024,
2204 16,
2205 FilterExpression::no_filter(),
2206 )
2207 .unwrap()
2208 .try_collect::<Vec<_>>()
2209 .await
2210 .unwrap();
2211
2212 assert_eq!(batches.len(), 0);
2213
2214 let batches = file_reader
2216 .read_stream(
2217 lance_io::ReadBatchParams::Ranges(Arc::new([0..1, 2..2])),
2218 1024,
2219 16,
2220 FilterExpression::no_filter(),
2221 )
2222 .unwrap()
2223 .try_collect::<Vec<_>>()
2224 .await
2225 .unwrap();
2226 assert_eq!(batches.len(), 1);
2227 }
2228
2229 #[tokio::test]
2230 async fn test_global_buffers() {
2231 let fs = FsFixture::default();
2232
2233 let lance_schema =
2234 lance_core::datatypes::Schema::try_from(&ArrowSchema::new(vec![Field::new(
2235 "foo",
2236 DataType::Int32,
2237 true,
2238 )]))
2239 .unwrap();
2240
2241 let mut file_writer = FileWriter::try_new(
2242 fs.object_store.create(&fs.tmp_path).await.unwrap(),
2243 lance_schema.clone(),
2244 FileWriterOptions::default(),
2245 )
2246 .unwrap();
2247
2248 let test_bytes = Bytes::from_static(b"hello");
2249
2250 let buf_index = file_writer
2251 .add_global_buffer(test_bytes.clone())
2252 .await
2253 .unwrap();
2254
2255 assert_eq!(buf_index, 1);
2256
2257 file_writer.finish().await.unwrap();
2258
2259 let file_scheduler = fs
2260 .scheduler
2261 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2262 .await
2263 .unwrap();
2264 let file_reader = FileReader::try_open(
2265 file_scheduler.clone(),
2266 None,
2267 Arc::<DecoderPlugins>::default(),
2268 &test_cache(),
2269 FileReaderOptions::default(),
2270 )
2271 .await
2272 .unwrap();
2273
2274 let buf = file_reader.read_global_buffer(1).await.unwrap();
2275 assert_eq!(buf, test_bytes);
2276 }
2277}