1use std::{
5 collections::{BTreeMap, BTreeSet},
6 io::Cursor,
7 ops::Range,
8 pin::Pin,
9 sync::Arc,
10};
11
12use arrow_array::RecordBatchReader;
13use arrow_schema::Schema as ArrowSchema;
14use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
15use bytes::{Bytes, BytesMut};
16use deepsize::{Context, DeepSizeOf};
17use futures::{stream::BoxStream, Stream, StreamExt};
18use lance_encoding::{
19 decoder::{
20 schedule_and_decode, schedule_and_decode_blocking, ColumnInfo, DecoderConfig,
21 DecoderPlugins, FilterExpression, PageEncoding, PageInfo, ReadBatchTask, RequestedRows,
22 SchedulerDecoderConfig,
23 },
24 encoder::EncodedBatch,
25 version::LanceFileVersion,
26 EncodingsIo,
27};
28use log::debug;
29use object_store::path::Path;
30use prost::{Message, Name};
31use snafu::location;
32
33use lance_core::{
34 cache::LanceCache,
35 datatypes::{Field, Schema},
36 Error, Result,
37};
38use lance_encoding::format::pb as pbenc;
39use lance_encoding::format::pb21 as pbenc21;
40use lance_io::{
41 scheduler::FileScheduler,
42 stream::{RecordBatchStream, RecordBatchStreamAdapter},
43 ReadBatchParams,
44};
45
46use crate::{
47 datatypes::{Fields, FieldsWithMeta},
48 format::{pb, pbfile, MAGIC, MAJOR_VERSION, MINOR_VERSION},
49 v2::writer::PAGE_BUFFER_ALIGNMENT,
50};
51
52use super::io::LanceEncodingsIo;
53
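/// Default size, in bytes, of each read request issued while reading data pages (8 MiB).
/// Used as the default for [`FileReaderOptions::read_chunk_size`].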
54pub const DEFAULT_READ_CHUNK_SIZE: u64 = 8 * 1024 * 1024;
57
58#[derive(Debug, DeepSizeOf)]
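/// Describes the position and size of a buffer stored within a Lance file.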
63pub struct BufferDescriptor {
64 pub position: u64,
65 pub size: u64,
66}
67
68#[derive(Debug)]
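/// Statistics summarizing the layout of a Lance file.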
70pub struct FileStatistics {
71 pub columns: Vec<ColumnStatistics>,
73}
74
75#[derive(Debug)]
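/// Statistics for a single column: the number of pages and the total size of all
/// page buffers, in bytes.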
77pub struct ColumnStatistics {
78 pub num_pages: usize,
80 pub size_bytes: u64,
84}
85
86#[derive(Debug)]
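/// All of the file metadata (schema, column metadata, buffer locations, and size
/// statistics) decoded from the footer region and cached for reuse by the reader.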
88pub struct CachedFileMetadata {
89 pub file_schema: Arc<Schema>,
91 pub column_metadatas: Vec<pbfile::ColumnMetadata>,
93 pub column_infos: Vec<Arc<ColumnInfo>>,
94 pub num_rows: u64,
96 pub file_buffers: Vec<BufferDescriptor>,
97 pub num_data_bytes: u64,
99 pub num_column_metadata_bytes: u64,
102 pub num_global_buffer_bytes: u64,
104 pub num_footer_bytes: u64,
106 pub major_version: u16,
107 pub minor_version: u16,
108}
109
110impl DeepSizeOf for CachedFileMetadata {
111 fn deep_size_of_children(&self, context: &mut Context) -> usize {
113 self.file_schema.deep_size_of_children(context)
114 + self
115 .file_buffers
116 .iter()
117 .map(|file_buffer| file_buffer.deep_size_of_children(context))
118 .sum::<usize>()
119 }
120}
121
122impl CachedFileMetadata {
123 pub fn version(&self) -> LanceFileVersion {
124 match (self.major_version, self.minor_version) {
125 (0, 3) => LanceFileVersion::V2_0,
126 (2, 1) => LanceFileVersion::V2_1,
127 (2, 2) => LanceFileVersion::V2_2,
128 _ => panic!(
129 "Unsupported version: {}.{}",
130 self.major_version, self.minor_version
131 ),
132 }
133 }
134}
135
136#[derive(Debug, Clone)]
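/// Describes which parts of a file to read: the projected `schema` together with
/// the `column_indices` in the file that back those fields.
///
/// Projections are normally created with [`ReaderProjection::from_whole_schema`],
/// [`ReaderProjection::from_column_names`], or [`ReaderProjection::from_field_ids`]
/// rather than assembled by hand.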
158pub struct ReaderProjection {
159 pub schema: Arc<Schema>,
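    /// The file column indices that back the projected fields.
    ///
    /// For files older than version 2.1 every field (visited in pre-order) consumes
    /// one entry.  From 2.1 onward only leaf fields, blob fields, and packed structs
    /// consume an entry, since nested non-leaf fields do not get their own column.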
162 pub column_indices: Vec<u32>,
202}
203
204impl ReaderProjection {
205 fn from_field_ids_helper<'a>(
206 file_version: LanceFileVersion,
207 fields: impl Iterator<Item = &'a Field>,
208 field_id_to_column_index: &BTreeMap<u32, u32>,
209 column_indices: &mut Vec<u32>,
210 ) -> Result<()> {
211 for field in fields {
212 let is_structural = file_version >= LanceFileVersion::V2_1;
213 if !is_structural
216 || field.children.is_empty()
217 || field.is_blob()
218 || field.is_packed_struct()
219 {
220 if let Some(column_idx) = field_id_to_column_index.get(&(field.id as u32)).copied()
221 {
222 column_indices.push(column_idx);
223 }
224 }
225 if !is_structural || (!field.is_blob() && !field.is_packed_struct()) {
227 Self::from_field_ids_helper(
228 file_version,
229 field.children.iter(),
230 field_id_to_column_index,
231 column_indices,
232 )?;
233 }
234 }
235 Ok(())
236 }
237
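    /// Creates a projection for `schema` using a caller-provided mapping from field
    /// id to column index (e.g. one recorded when the file was written).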
238 pub fn from_field_ids(
243 file_version: LanceFileVersion,
244 schema: &Schema,
245 field_id_to_column_index: &BTreeMap<u32, u32>,
246 ) -> Result<Self> {
247 let mut column_indices = Vec::new();
248 Self::from_field_ids_helper(
249 file_version,
250 schema.fields.iter(),
251 field_id_to_column_index,
252 &mut column_indices,
253 )?;
254 Ok(Self {
255 schema: Arc::new(schema.clone()),
256 column_indices,
257 })
258 }
259
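    /// Creates a projection that reads the entire schema, assigning column indices
    /// by walking the fields in pre-order.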
260 pub fn from_whole_schema(schema: &Schema, version: LanceFileVersion) -> Self {
268 let schema = Arc::new(schema.clone());
269 let is_structural = version >= LanceFileVersion::V2_1;
270 let mut column_indices = vec![];
271 let mut curr_column_idx = 0;
272 let mut packed_struct_fields_num = 0;
273 for field in schema.fields_pre_order() {
274 if packed_struct_fields_num > 0 {
275 packed_struct_fields_num -= 1;
276 continue;
277 }
278 if field.is_packed_struct() {
279 column_indices.push(curr_column_idx);
280 curr_column_idx += 1;
281 packed_struct_fields_num = field.children.len();
282 } else if field.children.is_empty() || !is_structural {
283 column_indices.push(curr_column_idx);
284 curr_column_idx += 1;
285 }
286 }
287 Self {
288 schema,
289 column_indices,
290 }
291 }
292
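    /// Creates a projection that reads only the given columns (resolved through
    /// `Schema::project`).
    ///
    /// A minimal usage sketch, assuming `schema` contains a column named `"score"`
    /// (this mirrors how the tests below construct projections):
    ///
    /// ```ignore
    /// let projection = ReaderProjection::from_column_names(
    ///     LanceFileVersion::V2_0,
    ///     &schema,
    ///     &["score"],
    /// )?;
    /// ```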
293 pub fn from_column_names(
300 file_version: LanceFileVersion,
301 schema: &Schema,
302 column_names: &[&str],
303 ) -> Result<Self> {
304 let field_id_to_column_index = schema
305 .fields_pre_order()
306 .filter(|field| {
309 file_version < LanceFileVersion::V2_1 || field.is_leaf() || field.is_packed_struct()
310 })
311 .enumerate()
312 .map(|(idx, field)| (field.id as u32, idx as u32))
313 .collect::<BTreeMap<_, _>>();
314 let projected = schema.project(column_names)?;
315 let mut column_indices = Vec::new();
316 Self::from_field_ids_helper(
317 file_version,
318 projected.fields.iter(),
319 &field_id_to_column_index,
320 &mut column_indices,
321 )?;
322 Ok(Self {
323 schema: Arc::new(projected),
324 column_indices,
325 })
326 }
327}
328
329#[derive(Clone, Debug)]
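/// Options controlling how a [`FileReader`] performs I/O and decoding.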
331pub struct FileReaderOptions {
332 pub decoder_config: DecoderConfig,
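    /// Size, in bytes, used to chunk read requests for page data; defaults to
    /// [`DEFAULT_READ_CHUNK_SIZE`].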
333 pub read_chunk_size: u64,
337}
338
339impl Default for FileReaderOptions {
340 fn default() -> Self {
341 Self {
342 decoder_config: DecoderConfig::default(),
343 read_chunk_size: DEFAULT_READ_CHUNK_SIZE,
344 }
345 }
346}
347
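/// A reader for Lance v2+ files.
///
/// The reader loads and caches all file metadata up front (see
/// [`FileReader::read_all_metadata`]) and then serves range or take style reads as
/// streams of record batches.
///
/// A sketch of typical usage, assuming a `FileScheduler` and `LanceCache` are
/// already available (the calls mirror the tests at the bottom of this file):
///
/// ```ignore
/// let reader = FileReader::try_open(
///     file_scheduler,
///     None, // read the whole schema
///     Arc::<DecoderPlugins>::default(),
///     &cache,
///     FileReaderOptions::default(),
/// )
/// .await?;
/// let stream = reader.read_stream(
///     ReadBatchParams::RangeFull,
///     1024, // batch size
///     16,   // batch readahead
///     FilterExpression::no_filter(),
/// )?;
/// ```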
348#[derive(Debug)]
349pub struct FileReader {
350 scheduler: Arc<dyn EncodingsIo>,
351 base_projection: ReaderProjection,
353 num_rows: u64,
354 metadata: Arc<CachedFileMetadata>,
355 decoder_plugins: Arc<DecoderPlugins>,
356 cache: Arc<LanceCache>,
357 options: FileReaderOptions,
358}
359#[derive(Debug)]
360struct Footer {
361 #[allow(dead_code)]
362 column_meta_start: u64,
363 #[allow(dead_code)]
366 column_meta_offsets_start: u64,
367 global_buff_offsets_start: u64,
368 num_global_buffers: u32,
369 num_columns: u32,
370 major_version: u16,
371 minor_version: u16,
372}
373
374const FOOTER_LEN: usize = 40;
375
376impl FileReader {
377 pub fn with_scheduler(&self, scheduler: Arc<dyn EncodingsIo>) -> Self {
378 Self {
379 scheduler,
380 base_projection: self.base_projection.clone(),
381 cache: self.cache.clone(),
382 decoder_plugins: self.decoder_plugins.clone(),
383 metadata: self.metadata.clone(),
384 options: self.options.clone(),
385 num_rows: self.num_rows,
386 }
387 }
388
389 pub fn num_rows(&self) -> u64 {
390 self.num_rows
391 }
392
393 pub fn metadata(&self) -> &Arc<CachedFileMetadata> {
394 &self.metadata
395 }
396
397 pub fn file_statistics(&self) -> FileStatistics {
398 let column_metadatas = &self.metadata().column_metadatas;
399
400 let column_stats = column_metadatas
401 .iter()
402 .map(|col_metadata| {
403 let num_pages = col_metadata.pages.len();
404 let size_bytes = col_metadata
405 .pages
406 .iter()
407 .map(|page| page.buffer_sizes.iter().sum::<u64>())
408 .sum::<u64>();
409 ColumnStatistics {
410 num_pages,
411 size_bytes,
412 }
413 })
414 .collect();
415
416 FileStatistics {
417 columns: column_stats,
418 }
419 }
420
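    /// Reads the global buffer at `index`, returning its bytes.
    ///
    /// Returns an error if the file does not contain that many global buffers.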
421 pub async fn read_global_buffer(&self, index: u32) -> Result<Bytes> {
        let buffer_desc = self.metadata.file_buffers.get(index as usize).ok_or_else(|| {
            Error::invalid_input(
                format!(
                    "request for global buffer at index {} but there were only {} global buffers in the file",
                    index,
                    self.metadata.file_buffers.len()
                ),
                location!(),
            )
        })?;
423 self.scheduler
424 .submit_single(
425 buffer_desc.position..buffer_desc.position + buffer_desc.size,
426 0,
427 )
428 .await
429 }
430
431 async fn read_tail(scheduler: &FileScheduler) -> Result<(Bytes, u64)> {
432 let file_size = scheduler.reader().size().await? as u64;
433 let begin = if file_size < scheduler.reader().block_size() as u64 {
434 0
435 } else {
436 file_size - scheduler.reader().block_size() as u64
437 };
438 let tail_bytes = scheduler.submit_single(begin..file_size, 0).await?;
439 Ok((tail_bytes, file_size))
440 }
441
442 fn decode_footer(footer_bytes: &Bytes) -> Result<Footer> {
445 let len = footer_bytes.len();
446 if len < FOOTER_LEN {
447 return Err(Error::io(
448 format!(
449 "does not have sufficient data, len: {}, bytes: {:?}",
450 len, footer_bytes
451 ),
452 location!(),
453 ));
454 }
455 let mut cursor = Cursor::new(footer_bytes.slice(len - FOOTER_LEN..));
456
457 let column_meta_start = cursor.read_u64::<LittleEndian>()?;
458 let column_meta_offsets_start = cursor.read_u64::<LittleEndian>()?;
459 let global_buff_offsets_start = cursor.read_u64::<LittleEndian>()?;
460 let num_global_buffers = cursor.read_u32::<LittleEndian>()?;
461 let num_columns = cursor.read_u32::<LittleEndian>()?;
462 let major_version = cursor.read_u16::<LittleEndian>()?;
463 let minor_version = cursor.read_u16::<LittleEndian>()?;
464
465 if major_version == MAJOR_VERSION as u16 && minor_version == MINOR_VERSION as u16 {
466 return Err(Error::version_conflict(
467 "Attempt to use the lance v2 reader to read a legacy file".to_string(),
468 major_version,
469 minor_version,
470 location!(),
471 ));
472 }
473
474 let magic_bytes = footer_bytes.slice(len - 4..);
475 if magic_bytes.as_ref() != MAGIC {
476 return Err(Error::io(
477 format!(
                    "file does not appear to be a Lance file (invalid magic: {:?})",
                    magic_bytes
480 ),
481 location!(),
482 ));
483 }
484 Ok(Footer {
485 column_meta_start,
486 column_meta_offsets_start,
487 global_buff_offsets_start,
488 num_global_buffers,
489 num_columns,
490 major_version,
491 minor_version,
492 })
493 }
494
495 fn read_all_column_metadata(
497 column_metadata_bytes: Bytes,
498 footer: &Footer,
499 ) -> Result<Vec<pbfile::ColumnMetadata>> {
500 let column_metadata_start = footer.column_meta_start;
501 let cmo_table_size = 16 * footer.num_columns as usize;
503 let cmo_table = column_metadata_bytes.slice(column_metadata_bytes.len() - cmo_table_size..);
504
505 (0..footer.num_columns)
506 .map(|col_idx| {
507 let offset = (col_idx * 16) as usize;
508 let position = LittleEndian::read_u64(&cmo_table[offset..offset + 8]);
509 let length = LittleEndian::read_u64(&cmo_table[offset + 8..offset + 16]);
510 let normalized_position = (position - column_metadata_start) as usize;
511 let normalized_end = normalized_position + (length as usize);
512 Ok(pbfile::ColumnMetadata::decode(
513 &column_metadata_bytes[normalized_position..normalized_end],
514 )?)
515 })
516 .collect::<Result<Vec<_>>>()
517 }
518
519 async fn optimistic_tail_read(
520 data: &Bytes,
521 start_pos: u64,
522 scheduler: &FileScheduler,
523 file_len: u64,
524 ) -> Result<Bytes> {
525 let num_bytes_needed = (file_len - start_pos) as usize;
526 if data.len() >= num_bytes_needed {
527 Ok(data.slice((data.len() - num_bytes_needed)..))
528 } else {
529 let num_bytes_missing = (num_bytes_needed - data.len()) as u64;
530 let start = file_len - num_bytes_needed as u64;
531 let missing_bytes = scheduler
532 .submit_single(start..start + num_bytes_missing, 0)
533 .await?;
534 let mut combined = BytesMut::with_capacity(data.len() + num_bytes_missing as usize);
535 combined.extend(missing_bytes);
536 combined.extend(data);
537 Ok(combined.freeze())
538 }
539 }
540
541 fn do_decode_gbo_table(
542 gbo_bytes: &Bytes,
543 footer: &Footer,
544 version: LanceFileVersion,
545 ) -> Result<Vec<BufferDescriptor>> {
546 let mut global_bufs_cursor = Cursor::new(gbo_bytes);
547
548 let mut global_buffers = Vec::with_capacity(footer.num_global_buffers as usize);
549 for _ in 0..footer.num_global_buffers {
550 let buf_pos = global_bufs_cursor.read_u64::<LittleEndian>()?;
551 assert!(
552 version < LanceFileVersion::V2_1 || buf_pos % PAGE_BUFFER_ALIGNMENT as u64 == 0
553 );
554 let buf_size = global_bufs_cursor.read_u64::<LittleEndian>()?;
555 global_buffers.push(BufferDescriptor {
556 position: buf_pos,
557 size: buf_size,
558 });
559 }
560
561 Ok(global_buffers)
562 }
563
564 async fn decode_gbo_table(
565 tail_bytes: &Bytes,
566 file_len: u64,
567 scheduler: &FileScheduler,
568 footer: &Footer,
569 version: LanceFileVersion,
570 ) -> Result<Vec<BufferDescriptor>> {
571 let gbo_bytes = Self::optimistic_tail_read(
574 tail_bytes,
575 footer.global_buff_offsets_start,
576 scheduler,
577 file_len,
578 )
579 .await?;
580 Self::do_decode_gbo_table(&gbo_bytes, footer, version)
581 }
582
583 fn decode_schema(schema_bytes: Bytes) -> Result<(u64, lance_core::datatypes::Schema)> {
584 let file_descriptor = pb::FileDescriptor::decode(schema_bytes)?;
585 let pb_schema = file_descriptor.schema.unwrap();
586 let num_rows = file_descriptor.length;
587 let fields_with_meta = FieldsWithMeta {
588 fields: Fields(pb_schema.fields),
589 metadata: pb_schema.metadata,
590 };
591 let schema = lance_core::datatypes::Schema::from(fields_with_meta);
592 Ok((num_rows, schema))
593 }
594
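    /// Reads all of the file's metadata: footer, global buffer table, schema, and
    /// per-column metadata.
    ///
    /// The initial tail read is reused where possible so that, for typical files,
    /// only a small number of additional reads are needed.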
595 pub async fn read_all_metadata(scheduler: &FileScheduler) -> Result<CachedFileMetadata> {
609 let (tail_bytes, file_len) = Self::read_tail(scheduler).await?;
611 let footer = Self::decode_footer(&tail_bytes)?;
612
613 let file_version = LanceFileVersion::try_from_major_minor(
614 footer.major_version as u32,
615 footer.minor_version as u32,
616 )?;
617
618 let gbo_table =
619 Self::decode_gbo_table(&tail_bytes, file_len, scheduler, &footer, file_version).await?;
620 if gbo_table.is_empty() {
621 return Err(Error::Internal {
622 message: "File did not contain any global buffers, schema expected".to_string(),
623 location: location!(),
624 });
625 }
626 let schema_start = gbo_table[0].position;
627 let schema_size = gbo_table[0].size;
628
629 let num_footer_bytes = file_len - schema_start;
630
631 let all_metadata_bytes =
634 Self::optimistic_tail_read(&tail_bytes, schema_start, scheduler, file_len).await?;
635
636 let schema_bytes = all_metadata_bytes.slice(0..schema_size as usize);
637 let (num_rows, schema) = Self::decode_schema(schema_bytes)?;
638
639 let column_metadata_start = (footer.column_meta_start - schema_start) as usize;
642 let column_metadata_end = (footer.global_buff_offsets_start - schema_start) as usize;
643 let column_metadata_bytes =
644 all_metadata_bytes.slice(column_metadata_start..column_metadata_end);
645 let column_metadatas = Self::read_all_column_metadata(column_metadata_bytes, &footer)?;
646
647 let num_global_buffer_bytes = gbo_table.iter().map(|buf| buf.size).sum::<u64>();
648 let num_data_bytes = footer.column_meta_start - num_global_buffer_bytes;
649 let num_column_metadata_bytes = footer.global_buff_offsets_start - footer.column_meta_start;
650
651 let column_infos = Self::meta_to_col_infos(column_metadatas.as_slice(), file_version);
652
653 Ok(CachedFileMetadata {
654 file_schema: Arc::new(schema),
655 column_metadatas,
656 column_infos,
657 num_rows,
658 num_data_bytes,
659 num_column_metadata_bytes,
660 num_global_buffer_bytes,
661 num_footer_bytes,
662 file_buffers: gbo_table,
663 major_version: footer.major_version,
664 minor_version: footer.minor_version,
665 })
666 }
667
668 fn fetch_encoding<M: Default + Name + Sized>(encoding: &pbfile::Encoding) -> M {
669 match &encoding.location {
670 Some(pbfile::encoding::Location::Indirect(_)) => todo!(),
671 Some(pbfile::encoding::Location::Direct(encoding)) => {
672 let encoding_buf = Bytes::from(encoding.encoding.clone());
673 let encoding_any = prost_types::Any::decode(encoding_buf).unwrap();
674 encoding_any.to_msg::<M>().unwrap()
675 }
676 Some(pbfile::encoding::Location::None(_)) => panic!(),
677 None => panic!(),
678 }
679 }
680
681 fn meta_to_col_infos(
682 column_metadatas: &[pbfile::ColumnMetadata],
683 file_version: LanceFileVersion,
684 ) -> Vec<Arc<ColumnInfo>> {
685 column_metadatas
686 .iter()
687 .enumerate()
688 .map(|(col_idx, col_meta)| {
689 let page_infos = col_meta
690 .pages
691 .iter()
692 .map(|page| {
693 let num_rows = page.length;
694 let encoding = match file_version {
695 LanceFileVersion::V2_0 => {
696 PageEncoding::Legacy(Self::fetch_encoding::<pbenc::ArrayEncoding>(
697 page.encoding.as_ref().unwrap(),
698 ))
699 }
700 _ => PageEncoding::Structural(Self::fetch_encoding::<
701 pbenc21::PageLayout,
702 >(
703 page.encoding.as_ref().unwrap()
704 )),
705 };
706 let buffer_offsets_and_sizes = Arc::from(
707 page.buffer_offsets
708 .iter()
709 .zip(page.buffer_sizes.iter())
710 .map(|(offset, size)| {
711 assert!(
713 file_version < LanceFileVersion::V2_1
714 || offset % PAGE_BUFFER_ALIGNMENT as u64 == 0
715 );
716 (*offset, *size)
717 })
718 .collect::<Vec<_>>(),
719 );
720 PageInfo {
721 buffer_offsets_and_sizes,
722 encoding,
723 num_rows,
724 priority: page.priority,
725 }
726 })
727 .collect::<Vec<_>>();
728 let buffer_offsets_and_sizes = Arc::from(
729 col_meta
730 .buffer_offsets
731 .iter()
732 .zip(col_meta.buffer_sizes.iter())
733 .map(|(offset, size)| (*offset, *size))
734 .collect::<Vec<_>>(),
735 );
736 Arc::new(ColumnInfo {
737 index: col_idx as u32,
738 page_infos: Arc::from(page_infos),
739 buffer_offsets_and_sizes,
740 encoding: Self::fetch_encoding(col_meta.encoding.as_ref().unwrap()),
741 })
742 })
743 .collect::<Vec<_>>()
744 }
745
746 fn validate_projection(
747 projection: &ReaderProjection,
748 metadata: &CachedFileMetadata,
749 ) -> Result<()> {
750 if projection.schema.fields.is_empty() {
751 return Err(Error::invalid_input(
752 "Attempt to read zero columns from the file, at least one column must be specified"
753 .to_string(),
754 location!(),
755 ));
756 }
757 let mut column_indices_seen = BTreeSet::new();
758 for column_index in &projection.column_indices {
759 if !column_indices_seen.insert(*column_index) {
760 return Err(Error::invalid_input(
761 format!(
762 "The projection specified the column index {} more than once",
763 column_index
764 ),
765 location!(),
766 ));
767 }
768 if *column_index >= metadata.column_infos.len() as u32 {
                return Err(Error::invalid_input(
                    format!(
                        "The projection specified the column index {} but there are only {} columns in the file",
                        column_index,
                        metadata.column_infos.len()
                    ),
                    location!(),
                ));
770 }
771 }
772 Ok(())
773 }
774
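    /// Opens the file, reading and caching all metadata.
    ///
    /// If `base_projection` is `None` the entire schema is used as the default
    /// projection.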
775 pub async fn try_open(
782 scheduler: FileScheduler,
783 base_projection: Option<ReaderProjection>,
784 decoder_plugins: Arc<DecoderPlugins>,
785 cache: &LanceCache,
786 options: FileReaderOptions,
787 ) -> Result<Self> {
788 let file_metadata = Arc::new(Self::read_all_metadata(&scheduler).await?);
789 let path = scheduler.reader().path().clone();
790
791 let encodings_io =
793 LanceEncodingsIo::new(scheduler).with_read_chunk_size(options.read_chunk_size);
794
795 Self::try_open_with_file_metadata(
796 Arc::new(encodings_io),
797 path,
798 base_projection,
799 decoder_plugins,
800 file_metadata,
801 cache,
802 options,
803 )
804 .await
805 }
806
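    /// Same as [`FileReader::try_open`] but for callers that already have the file
    /// metadata (for example from a cache), avoiding the metadata reads.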
807 pub async fn try_open_with_file_metadata(
813 scheduler: Arc<dyn EncodingsIo>,
814 path: Path,
815 base_projection: Option<ReaderProjection>,
816 decoder_plugins: Arc<DecoderPlugins>,
817 file_metadata: Arc<CachedFileMetadata>,
818 cache: &LanceCache,
819 options: FileReaderOptions,
820 ) -> Result<Self> {
821 let cache = Arc::new(cache.with_key_prefix(path.as_ref()));
822
823 if let Some(base_projection) = base_projection.as_ref() {
824 Self::validate_projection(base_projection, &file_metadata)?;
825 }
826 let num_rows = file_metadata.num_rows;
827 Ok(Self {
828 scheduler,
829 base_projection: base_projection.unwrap_or(ReaderProjection::from_whole_schema(
830 file_metadata.file_schema.as_ref(),
831 file_metadata.version(),
832 )),
833 num_rows,
834 metadata: file_metadata,
835 decoder_plugins,
836 cache,
837 options,
838 })
839 }
840
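    /// Currently returns the column infos for every column in the file; the columns
    /// actually read are selected later via `projection.column_indices`.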
841 fn collect_columns_from_projection(
855 &self,
856 _projection: &ReaderProjection,
857 ) -> Result<Vec<Arc<ColumnInfo>>> {
858 Ok(self.metadata.column_infos.to_vec())
859 }
860
861 #[allow(clippy::too_many_arguments)]
862 fn do_read_range(
863 column_infos: Vec<Arc<ColumnInfo>>,
864 io: Arc<dyn EncodingsIo>,
865 cache: Arc<LanceCache>,
866 num_rows: u64,
867 decoder_plugins: Arc<DecoderPlugins>,
868 range: Range<u64>,
869 batch_size: u32,
870 projection: ReaderProjection,
871 filter: FilterExpression,
872 decoder_config: DecoderConfig,
873 ) -> Result<BoxStream<'static, ReadBatchTask>> {
874 debug!(
875 "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
876 range,
877 batch_size,
878 num_rows,
879 column_infos.len(),
880 projection.schema.fields.len(),
881 );
882
883 let config = SchedulerDecoderConfig {
884 batch_size,
885 cache,
886 decoder_plugins,
887 io,
888 decoder_config,
889 };
890
891 let requested_rows = RequestedRows::Ranges(vec![range]);
892
893 Ok(schedule_and_decode(
894 column_infos,
895 requested_rows,
896 filter,
897 projection.column_indices,
898 projection.schema,
899 config,
900 ))
901 }
902
903 fn read_range(
904 &self,
905 range: Range<u64>,
906 batch_size: u32,
907 projection: ReaderProjection,
908 filter: FilterExpression,
909 ) -> Result<BoxStream<'static, ReadBatchTask>> {
910 Self::do_read_range(
912 self.collect_columns_from_projection(&projection)?,
913 self.scheduler.clone(),
914 self.cache.clone(),
915 self.num_rows,
916 self.decoder_plugins.clone(),
917 range,
918 batch_size,
919 projection,
920 filter,
921 self.options.decoder_config.clone(),
922 )
923 }
924
925 #[allow(clippy::too_many_arguments)]
926 fn do_take_rows(
927 column_infos: Vec<Arc<ColumnInfo>>,
928 io: Arc<dyn EncodingsIo>,
929 cache: Arc<LanceCache>,
930 decoder_plugins: Arc<DecoderPlugins>,
931 indices: Vec<u64>,
932 batch_size: u32,
933 projection: ReaderProjection,
934 filter: FilterExpression,
935 decoder_config: DecoderConfig,
936 ) -> Result<BoxStream<'static, ReadBatchTask>> {
937 debug!(
938 "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
939 indices.len(),
940 indices[0],
941 indices[indices.len() - 1],
942 batch_size,
943 column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
944 );
945
946 let config = SchedulerDecoderConfig {
947 batch_size,
948 cache,
949 decoder_plugins,
950 io,
951 decoder_config,
952 };
953
954 let requested_rows = RequestedRows::Indices(indices);
955
956 Ok(schedule_and_decode(
957 column_infos,
958 requested_rows,
959 filter,
960 projection.column_indices,
961 projection.schema,
962 config,
963 ))
964 }
965
966 fn take_rows(
967 &self,
968 indices: Vec<u64>,
969 batch_size: u32,
970 projection: ReaderProjection,
971 ) -> Result<BoxStream<'static, ReadBatchTask>> {
972 Self::do_take_rows(
974 self.collect_columns_from_projection(&projection)?,
975 self.scheduler.clone(),
976 self.cache.clone(),
977 self.decoder_plugins.clone(),
978 indices,
979 batch_size,
980 projection,
981 FilterExpression::no_filter(),
982 self.options.decoder_config.clone(),
983 )
984 }
985
986 #[allow(clippy::too_many_arguments)]
987 fn do_read_ranges(
988 column_infos: Vec<Arc<ColumnInfo>>,
989 io: Arc<dyn EncodingsIo>,
990 cache: Arc<LanceCache>,
991 decoder_plugins: Arc<DecoderPlugins>,
992 ranges: Vec<Range<u64>>,
993 batch_size: u32,
994 projection: ReaderProjection,
995 filter: FilterExpression,
996 decoder_config: DecoderConfig,
997 ) -> Result<BoxStream<'static, ReadBatchTask>> {
998 let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
999 debug!(
1000 "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
1001 ranges.len(),
1002 num_rows,
1003 ranges[0].start,
1004 ranges[ranges.len() - 1].end,
1005 batch_size,
1006 column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1007 );
1008
1009 let config = SchedulerDecoderConfig {
1010 batch_size,
1011 cache,
1012 decoder_plugins,
1013 io,
1014 decoder_config,
1015 };
1016
1017 let requested_rows = RequestedRows::Ranges(ranges);
1018
1019 Ok(schedule_and_decode(
1020 column_infos,
1021 requested_rows,
1022 filter,
1023 projection.column_indices,
1024 projection.schema,
1025 config,
1026 ))
1027 }
1028
1029 fn read_ranges(
1030 &self,
1031 ranges: Vec<Range<u64>>,
1032 batch_size: u32,
1033 projection: ReaderProjection,
1034 filter: FilterExpression,
1035 ) -> Result<BoxStream<'static, ReadBatchTask>> {
1036 Self::do_read_ranges(
1037 self.collect_columns_from_projection(&projection)?,
1038 self.scheduler.clone(),
1039 self.cache.clone(),
1040 self.decoder_plugins.clone(),
1041 ranges,
1042 batch_size,
1043 projection,
1044 filter,
1045 self.options.decoder_config.clone(),
1046 )
1047 }
1048
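    /// Creates a stream of "read tasks", one per batch of up to `batch_size` rows,
    /// for the rows selected by `params`.
    ///
    /// If `projection` is `None` the reader's base projection is used.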
1049 pub fn read_tasks(
1060 &self,
1061 params: ReadBatchParams,
1062 batch_size: u32,
1063 projection: Option<ReaderProjection>,
1064 filter: FilterExpression,
1065 ) -> Result<Pin<Box<dyn Stream<Item = ReadBatchTask> + Send>>> {
1066 let projection = projection.unwrap_or_else(|| self.base_projection.clone());
1067 Self::validate_projection(&projection, &self.metadata)?;
1068 let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
1069 if bound > self.num_rows || bound == self.num_rows && inclusive {
1070 Err(Error::invalid_input(
1071 format!(
1072 "cannot read {:?} from file with {} rows",
1073 params, self.num_rows
1074 ),
1075 location!(),
1076 ))
1077 } else {
1078 Ok(())
1079 }
1080 };
        match &params {
            ReadBatchParams::Indices(indices) => {
                for idx in indices {
                    match idx {
                        None => {
                            return Err(Error::invalid_input(
                                "Null value in indices array",
                                location!(),
                            ));
                        }
                        Some(idx) => {
                            verify_bound(&params, idx as u64, true)?;
                        }
                    }
                }
                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
                self.take_rows(indices, batch_size, projection)
            }
            ReadBatchParams::Range(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range(
                    range.start as u64..range.end as u64,
                    batch_size,
                    projection,
                    filter,
                )
            }
            ReadBatchParams::Ranges(ranges) => {
                let mut ranges_u64 = Vec::with_capacity(ranges.len());
                for range in ranges.as_ref() {
                    verify_bound(&params, range.end, false)?;
                    ranges_u64.push(range.start..range.end);
                }
                self.read_ranges(ranges_u64, batch_size, projection, filter)
            }
            ReadBatchParams::RangeFrom(range) => {
                verify_bound(&params, range.start as u64, true)?;
                self.read_range(
                    range.start as u64..self.num_rows,
                    batch_size,
                    projection,
                    filter,
                )
            }
            ReadBatchParams::RangeTo(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range(0..range.end as u64, batch_size, projection, filter)
            }
            ReadBatchParams::RangeFull => {
                self.read_range(0..self.num_rows, batch_size, projection, filter)
            }
        }
1133 }
1134
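    /// Reads the selected rows as a stream of record batches containing only the
    /// columns in `projection`, decoding up to `batch_readahead` batches concurrently.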
1135 pub fn read_stream_projected(
1157 &self,
1158 params: ReadBatchParams,
1159 batch_size: u32,
1160 batch_readahead: u32,
1161 projection: ReaderProjection,
1162 filter: FilterExpression,
1163 ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1164 let arrow_schema = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
1165 let tasks_stream = self.read_tasks(params, batch_size, Some(projection), filter)?;
1166 let batch_stream = tasks_stream
1167 .map(|task| task.task)
1168 .buffered(batch_readahead as usize)
1169 .boxed();
1170 Ok(Box::pin(RecordBatchStreamAdapter::new(
1171 arrow_schema,
1172 batch_stream,
1173 )))
1174 }
1175
1176 fn take_rows_blocking(
1177 &self,
1178 indices: Vec<u64>,
1179 batch_size: u32,
1180 projection: ReaderProjection,
1181 filter: FilterExpression,
1182 ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1183 let column_infos = self.collect_columns_from_projection(&projection)?;
1184 debug!(
1185 "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
1186 indices.len(),
1187 indices[0],
1188 indices[indices.len() - 1],
1189 batch_size,
1190 column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1191 );
1192
1193 let config = SchedulerDecoderConfig {
1194 batch_size,
1195 cache: self.cache.clone(),
1196 decoder_plugins: self.decoder_plugins.clone(),
1197 io: self.scheduler.clone(),
1198 decoder_config: self.options.decoder_config.clone(),
1199 };
1200
1201 let requested_rows = RequestedRows::Indices(indices);
1202
1203 schedule_and_decode_blocking(
1204 column_infos,
1205 requested_rows,
1206 filter,
1207 projection.column_indices,
1208 projection.schema,
1209 config,
1210 )
1211 }
1212
1213 fn read_ranges_blocking(
1214 &self,
1215 ranges: Vec<Range<u64>>,
1216 batch_size: u32,
1217 projection: ReaderProjection,
1218 filter: FilterExpression,
1219 ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1220 let column_infos = self.collect_columns_from_projection(&projection)?;
1221 let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1222 debug!(
1223 "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
1224 ranges.len(),
1225 num_rows,
1226 ranges[0].start,
1227 ranges[ranges.len() - 1].end,
1228 batch_size,
1229 column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1230 );
1231
1232 let config = SchedulerDecoderConfig {
1233 batch_size,
1234 cache: self.cache.clone(),
1235 decoder_plugins: self.decoder_plugins.clone(),
1236 io: self.scheduler.clone(),
1237 decoder_config: self.options.decoder_config.clone(),
1238 };
1239
1240 let requested_rows = RequestedRows::Ranges(ranges);
1241
1242 schedule_and_decode_blocking(
1243 column_infos,
1244 requested_rows,
1245 filter,
1246 projection.column_indices,
1247 projection.schema,
1248 config,
1249 )
1250 }
1251
1252 fn read_range_blocking(
1253 &self,
1254 range: Range<u64>,
1255 batch_size: u32,
1256 projection: ReaderProjection,
1257 filter: FilterExpression,
1258 ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1259 let column_infos = self.collect_columns_from_projection(&projection)?;
1260 let num_rows = self.num_rows;
1261
1262 debug!(
1263 "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
1264 range,
1265 batch_size,
1266 num_rows,
1267 column_infos.len(),
1268 projection.schema.fields.len(),
1269 );
1270
1271 let config = SchedulerDecoderConfig {
1272 batch_size,
1273 cache: self.cache.clone(),
1274 decoder_plugins: self.decoder_plugins.clone(),
1275 io: self.scheduler.clone(),
1276 decoder_config: self.options.decoder_config.clone(),
1277 };
1278
1279 let requested_rows = RequestedRows::Ranges(vec![range]);
1280
1281 schedule_and_decode_blocking(
1282 column_infos,
1283 requested_rows,
1284 filter,
1285 projection.column_indices,
1286 projection.schema,
1287 config,
1288 )
1289 }
1290
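    /// Blocking counterpart of [`FileReader::read_stream_projected`], returning a
    /// synchronous [`RecordBatchReader`] (built on `schedule_and_decode_blocking`).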
1291 pub fn read_stream_projected_blocking(
1303 &self,
1304 params: ReadBatchParams,
1305 batch_size: u32,
1306 projection: Option<ReaderProjection>,
1307 filter: FilterExpression,
1308 ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1309 let projection = projection.unwrap_or_else(|| self.base_projection.clone());
1310 Self::validate_projection(&projection, &self.metadata)?;
1311 let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
1312 if bound > self.num_rows || bound == self.num_rows && inclusive {
1313 Err(Error::invalid_input(
1314 format!(
1315 "cannot read {:?} from file with {} rows",
1316 params, self.num_rows
1317 ),
1318 location!(),
1319 ))
1320 } else {
1321 Ok(())
1322 }
1323 };
        match &params {
            ReadBatchParams::Indices(indices) => {
                for idx in indices {
                    match idx {
                        None => {
                            return Err(Error::invalid_input(
                                "Null value in indices array",
                                location!(),
                            ));
                        }
                        Some(idx) => {
                            verify_bound(&params, idx as u64, true)?;
                        }
                    }
                }
                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
                self.take_rows_blocking(indices, batch_size, projection, filter)
            }
            ReadBatchParams::Range(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range_blocking(
                    range.start as u64..range.end as u64,
                    batch_size,
                    projection,
                    filter,
                )
            }
            ReadBatchParams::Ranges(ranges) => {
                let mut ranges_u64 = Vec::with_capacity(ranges.len());
                for range in ranges.as_ref() {
                    verify_bound(&params, range.end, false)?;
                    ranges_u64.push(range.start..range.end);
                }
                self.read_ranges_blocking(ranges_u64, batch_size, projection, filter)
            }
            ReadBatchParams::RangeFrom(range) => {
                verify_bound(&params, range.start as u64, true)?;
                self.read_range_blocking(
                    range.start as u64..self.num_rows,
                    batch_size,
                    projection,
                    filter,
                )
            }
            ReadBatchParams::RangeTo(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range_blocking(0..range.end as u64, batch_size, projection, filter)
            }
            ReadBatchParams::RangeFull => {
                self.read_range_blocking(0..self.num_rows, batch_size, projection, filter)
            }
        }
1376 }
1377
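    /// Reads the selected rows as a stream of record batches using the reader's base
    /// projection.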
1378 pub fn read_stream(
1384 &self,
1385 params: ReadBatchParams,
1386 batch_size: u32,
1387 batch_readahead: u32,
1388 filter: FilterExpression,
1389 ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1390 self.read_stream_projected(
1391 params,
1392 batch_size,
1393 batch_readahead,
1394 self.base_projection.clone(),
1395 filter,
1396 )
1397 }
1398
1399 pub fn schema(&self) -> &Arc<Schema> {
1400 &self.metadata.file_schema
1401 }
1402}
1403
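/// Produces a human-readable description of a page's encoding, mainly for debugging
/// and file inspection.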
1404pub fn describe_encoding(page: &pbfile::column_metadata::Page) -> String {
1406 if let Some(encoding) = &page.encoding {
1407 if let Some(style) = &encoding.location {
1408 match style {
1409 pbfile::encoding::Location::Indirect(indirect) => {
1410 format!(
1411 "IndirectEncoding(pos={},size={})",
1412 indirect.buffer_location, indirect.buffer_length
1413 )
1414 }
1415 pbfile::encoding::Location::Direct(direct) => {
1416 let encoding_any =
1417 prost_types::Any::decode(Bytes::from(direct.encoding.clone()))
1418 .expect("failed to deserialize encoding as protobuf");
1419 if encoding_any.type_url == "/lance.encodings.ArrayEncoding" {
1420 let encoding = encoding_any.to_msg::<pbenc::ArrayEncoding>();
1421 match encoding {
1422 Ok(encoding) => {
1423 format!("{:#?}", encoding)
1424 }
1425 Err(err) => {
1426 format!("Unsupported(decode_err={})", err)
1427 }
1428 }
1429 } else if encoding_any.type_url == "/lance.encodings21.PageLayout" {
1430 let encoding = encoding_any.to_msg::<pbenc21::PageLayout>();
1431 match encoding {
1432 Ok(encoding) => {
1433 format!("{:#?}", encoding)
1434 }
1435 Err(err) => {
1436 format!("Unsupported(decode_err={})", err)
1437 }
1438 }
1439 } else {
1440 format!("Unrecognized(type_url={})", encoding_any.type_url)
1441 }
1442 }
1443 pbfile::encoding::Location::None(_) => "NoEncodingDescription".to_string(),
1444 }
1445 } else {
1446 "MISSING STYLE".to_string()
1447 }
1448 } else {
1449 "MISSING".to_string()
1450 }
1451}
1452
1453pub trait EncodedBatchReaderExt {
1454 fn try_from_mini_lance(
1455 bytes: Bytes,
1456 schema: &Schema,
1457 version: LanceFileVersion,
1458 ) -> Result<Self>
1459 where
1460 Self: Sized;
1461 fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1462 where
1463 Self: Sized;
1464}
1465
1466impl EncodedBatchReaderExt for EncodedBatch {
1467 fn try_from_mini_lance(
1468 bytes: Bytes,
1469 schema: &Schema,
1470 file_version: LanceFileVersion,
1471 ) -> Result<Self>
1472 where
1473 Self: Sized,
1474 {
1475 let projection = ReaderProjection::from_whole_schema(schema, file_version);
1476 let footer = FileReader::decode_footer(&bytes)?;
1477
1478 let column_metadata_start = footer.column_meta_start as usize;
1481 let column_metadata_end = footer.global_buff_offsets_start as usize;
1482 let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1483 let column_metadatas =
1484 FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1485
1486 let file_version = LanceFileVersion::try_from_major_minor(
1487 footer.major_version as u32,
1488 footer.minor_version as u32,
1489 )?;
1490
1491 let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1492
1493 Ok(Self {
1494 data: bytes,
1495 num_rows: page_table
1496 .first()
1497 .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1498 .unwrap_or(0),
1499 page_table,
1500 top_level_columns: projection.column_indices,
1501 schema: Arc::new(schema.clone()),
1502 })
1503 }
1504
1505 fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1506 where
1507 Self: Sized,
1508 {
1509 let footer = FileReader::decode_footer(&bytes)?;
1510 let file_version = LanceFileVersion::try_from_major_minor(
1511 footer.major_version as u32,
1512 footer.minor_version as u32,
1513 )?;
1514
1515 let gbo_table = FileReader::do_decode_gbo_table(
1516 &bytes.slice(footer.global_buff_offsets_start as usize..),
1517 &footer,
1518 file_version,
1519 )?;
1520 if gbo_table.is_empty() {
1521 return Err(Error::Internal {
1522 message: "File did not contain any global buffers, schema expected".to_string(),
1523 location: location!(),
1524 });
1525 }
1526 let schema_start = gbo_table[0].position as usize;
1527 let schema_size = gbo_table[0].size as usize;
1528
1529 let schema_bytes = bytes.slice(schema_start..(schema_start + schema_size));
1530 let (_, schema) = FileReader::decode_schema(schema_bytes)?;
1531 let projection = ReaderProjection::from_whole_schema(&schema, file_version);
1532
1533 let column_metadata_start = footer.column_meta_start as usize;
1536 let column_metadata_end = footer.global_buff_offsets_start as usize;
1537 let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1538 let column_metadatas =
1539 FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1540
1541 let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1542
1543 Ok(Self {
1544 data: bytes,
1545 num_rows: page_table
1546 .first()
1547 .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1548 .unwrap_or(0),
1549 page_table,
1550 top_level_columns: projection.column_indices,
1551 schema: Arc::new(schema),
1552 })
1553 }
1554}
1555
1556#[cfg(test)]
1557pub mod tests {
1558 use std::{collections::BTreeMap, pin::Pin, sync::Arc};
1559
1560 use arrow_array::{
1561 types::{Float64Type, Int32Type},
1562 RecordBatch, UInt32Array,
1563 };
1564 use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema};
1565 use bytes::Bytes;
1566 use futures::{prelude::stream::TryStreamExt, StreamExt};
1567 use lance_arrow::RecordBatchExt;
1568 use lance_core::{datatypes::Schema, ArrowResult};
1569 use lance_datagen::{array, gen_batch, BatchCount, ByteCount, RowCount};
1570 use lance_encoding::{
1571 decoder::{decode_batch, DecodeBatchScheduler, DecoderPlugins, FilterExpression},
1572 encoder::{default_encoding_strategy, encode_batch, EncodedBatch, EncodingOptions},
1573 version::LanceFileVersion,
1574 };
1575 use lance_io::{stream::RecordBatchStream, utils::CachedFileSize};
1576 use log::debug;
1577 use rstest::rstest;
1578 use tokio::sync::mpsc;
1579
1580 use crate::v2::{
1581 reader::{EncodedBatchReaderExt, FileReader, FileReaderOptions, ReaderProjection},
1582 testing::{test_cache, write_lance_file, FsFixture, WrittenFile},
1583 writer::{EncodedBatchWriteExt, FileWriter, FileWriterOptions},
1584 };
1585 use lance_encoding::decoder::DecoderConfig;
1586
1587 async fn create_some_file(fs: &FsFixture, version: LanceFileVersion) -> WrittenFile {
1588 let location_type = DataType::Struct(Fields::from(vec![
1589 Field::new("x", DataType::Float64, true),
1590 Field::new("y", DataType::Float64, true),
1591 ]));
1592 let categories_type = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)));
1593
1594 let mut reader = gen_batch()
1595 .col("score", array::rand::<Float64Type>())
1596 .col("location", array::rand_type(&location_type))
1597 .col("categories", array::rand_type(&categories_type))
1598 .col("binary", array::rand_type(&DataType::Binary));
1599 if version <= LanceFileVersion::V2_0 {
1600 reader = reader.col("large_bin", array::rand_type(&DataType::LargeBinary));
1601 }
1602 let reader = reader.into_reader_rows(RowCount::from(1000), BatchCount::from(100));
1603
1604 write_lance_file(
1605 reader,
1606 fs,
1607 FileWriterOptions {
1608 format_version: Some(version),
1609 ..Default::default()
1610 },
1611 )
1612 .await
1613 }
1614
1615 type Transformer = Box<dyn Fn(&RecordBatch) -> RecordBatch>;
1616
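    // Drains `actual` and asserts that, after re-slicing batches as needed, it matches
    // the expected data (optionally transformed first, e.g. projected to fewer columns).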
1617 async fn verify_expected(
1618 expected: &[RecordBatch],
1619 mut actual: Pin<Box<dyn RecordBatchStream>>,
1620 read_size: u32,
1621 transform: Option<Transformer>,
1622 ) {
1623 let mut remaining = expected.iter().map(|batch| batch.num_rows()).sum::<usize>() as u32;
1624 let mut expected_iter = expected.iter().map(|batch| {
1625 if let Some(transform) = &transform {
1626 transform(batch)
1627 } else {
1628 batch.clone()
1629 }
1630 });
1631 let mut next_expected = expected_iter.next().unwrap().clone();
1632 while let Some(actual) = actual.next().await {
1633 let mut actual = actual.unwrap();
1634 let mut rows_to_verify = actual.num_rows() as u32;
1635 let expected_length = remaining.min(read_size);
1636 assert_eq!(expected_length, rows_to_verify);
1637
1638 while rows_to_verify > 0 {
1639 let next_slice_len = (next_expected.num_rows() as u32).min(rows_to_verify);
1640 assert_eq!(
1641 next_expected.slice(0, next_slice_len as usize),
1642 actual.slice(0, next_slice_len as usize)
1643 );
1644 remaining -= next_slice_len;
1645 rows_to_verify -= next_slice_len;
1646 if remaining > 0 {
1647 if next_slice_len == next_expected.num_rows() as u32 {
1648 next_expected = expected_iter.next().unwrap().clone();
1649 } else {
1650 next_expected = next_expected.slice(
1651 next_slice_len as usize,
1652 next_expected.num_rows() - next_slice_len as usize,
1653 );
1654 }
1655 }
1656 if rows_to_verify > 0 {
1657 actual = actual.slice(
1658 next_slice_len as usize,
1659 actual.num_rows() - next_slice_len as usize,
1660 );
1661 }
1662 }
1663 }
1664 assert_eq!(remaining, 0);
1665 }
1666
1667 #[tokio::test]
1668 async fn test_round_trip() {
1669 let fs = FsFixture::default();
1670
1671 let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1672
1673 for read_size in [32, 1024, 1024 * 1024] {
1674 let file_scheduler = fs
1675 .scheduler
1676 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1677 .await
1678 .unwrap();
1679 let file_reader = FileReader::try_open(
1680 file_scheduler,
1681 None,
1682 Arc::<DecoderPlugins>::default(),
1683 &test_cache(),
1684 FileReaderOptions::default(),
1685 )
1686 .await
1687 .unwrap();
1688
1689 let schema = file_reader.schema();
1690 assert_eq!(schema.metadata.get("foo").unwrap(), "bar");
1691
1692 let batch_stream = file_reader
1693 .read_stream(
1694 lance_io::ReadBatchParams::RangeFull,
1695 read_size,
1696 16,
1697 FilterExpression::no_filter(),
1698 )
1699 .unwrap();
1700
1701 verify_expected(&data, batch_stream, read_size, None).await;
1702 }
1703 }
1704
1705 #[rstest]
1706 #[test_log::test(tokio::test)]
1707 async fn test_encoded_batch_round_trip(
1708 #[values(LanceFileVersion::V2_0)] version: LanceFileVersion,
1710 ) {
1711 let data = gen_batch()
1712 .col("x", array::rand::<Int32Type>())
1713 .col("y", array::rand_utf8(ByteCount::from(16), false))
1714 .into_batch_rows(RowCount::from(10000))
1715 .unwrap();
1716
1717 let lance_schema = Arc::new(Schema::try_from(data.schema().as_ref()).unwrap());
1718
1719 let encoding_options = EncodingOptions {
1720 cache_bytes_per_column: 4096,
1721 max_page_bytes: 32 * 1024 * 1024,
1722 keep_original_array: true,
1723 buffer_alignment: 64,
1724 };
1725
1726 let encoding_strategy = default_encoding_strategy(version);
1727
1728 let encoded_batch = encode_batch(
1729 &data,
1730 lance_schema.clone(),
1731 encoding_strategy.as_ref(),
1732 &encoding_options,
1733 )
1734 .await
1735 .unwrap();
1736
1737 let bytes = encoded_batch.try_to_self_described_lance(version).unwrap();
1739
1740 let decoded_batch = EncodedBatch::try_from_self_described_lance(bytes).unwrap();
1741
1742 let decoded = decode_batch(
1743 &decoded_batch,
1744 &FilterExpression::no_filter(),
1745 Arc::<DecoderPlugins>::default(),
1746 false,
1747 version,
1748 None,
1749 )
1750 .await
1751 .unwrap();
1752
1753 assert_eq!(data, decoded);
1754
1755 let bytes = encoded_batch.try_to_mini_lance(version).unwrap();
1757 let decoded_batch =
1758 EncodedBatch::try_from_mini_lance(bytes, lance_schema.as_ref(), LanceFileVersion::V2_0)
1759 .unwrap();
1760 let decoded = decode_batch(
1761 &decoded_batch,
1762 &FilterExpression::no_filter(),
1763 Arc::<DecoderPlugins>::default(),
1764 false,
1765 version,
1766 None,
1767 )
1768 .await
1769 .unwrap();
1770
1771 assert_eq!(data, decoded);
1772 }
1773
1774 #[rstest]
1775 #[test_log::test(tokio::test)]
1776 async fn test_projection(
1777 #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
1778 ) {
1779 let fs = FsFixture::default();
1780
1781 let written_file = create_some_file(&fs, version).await;
1782 let file_scheduler = fs
1783 .scheduler
1784 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1785 .await
1786 .unwrap();
1787
1788 let field_id_mapping = written_file
1789 .field_id_mapping
1790 .iter()
1791 .copied()
1792 .collect::<BTreeMap<_, _>>();
1793
1794 let empty_projection = ReaderProjection {
1795 column_indices: Vec::default(),
1796 schema: Arc::new(Schema::default()),
1797 };
1798
1799 for columns in [
1800 vec!["score"],
1801 vec!["location"],
1802 vec!["categories"],
1803 vec!["score.x"],
1804 vec!["score", "categories"],
1805 vec!["score", "location"],
1806 vec!["location", "categories"],
1807 vec!["score.y", "location", "categories"],
1808 ] {
1809 debug!("Testing round trip with projection {:?}", columns);
1810 for use_field_ids in [true, false] {
1811 let file_reader = FileReader::try_open(
1813 file_scheduler.clone(),
1814 None,
1815 Arc::<DecoderPlugins>::default(),
1816 &test_cache(),
1817 FileReaderOptions::default(),
1818 )
1819 .await
1820 .unwrap();
1821
1822 let projected_schema = written_file.schema.project(&columns).unwrap();
1823 let projection = if use_field_ids {
1824 ReaderProjection::from_field_ids(
1825 file_reader.metadata.version(),
1826 &projected_schema,
1827 &field_id_mapping,
1828 )
1829 .unwrap()
1830 } else {
1831 ReaderProjection::from_column_names(
1832 file_reader.metadata.version(),
1833 &written_file.schema,
1834 &columns,
1835 )
1836 .unwrap()
1837 };
1838
1839 let batch_stream = file_reader
1840 .read_stream_projected(
1841 lance_io::ReadBatchParams::RangeFull,
1842 1024,
1843 16,
1844 projection.clone(),
1845 FilterExpression::no_filter(),
1846 )
1847 .unwrap();
1848
1849 let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1850 verify_expected(
1851 &written_file.data,
1852 batch_stream,
1853 1024,
1854 Some(Box::new(move |batch: &RecordBatch| {
1855 batch.project_by_schema(&projection_arrow).unwrap()
1856 })),
1857 )
1858 .await;
1859
1860 let file_reader = FileReader::try_open(
1862 file_scheduler.clone(),
1863 Some(projection.clone()),
1864 Arc::<DecoderPlugins>::default(),
1865 &test_cache(),
1866 FileReaderOptions::default(),
1867 )
1868 .await
1869 .unwrap();
1870
1871 let batch_stream = file_reader
1872 .read_stream(
1873 lance_io::ReadBatchParams::RangeFull,
1874 1024,
1875 16,
1876 FilterExpression::no_filter(),
1877 )
1878 .unwrap();
1879
1880 let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1881 verify_expected(
1882 &written_file.data,
1883 batch_stream,
1884 1024,
1885 Some(Box::new(move |batch: &RecordBatch| {
1886 batch.project_by_schema(&projection_arrow).unwrap()
1887 })),
1888 )
1889 .await;
1890
1891 assert!(file_reader
1892 .read_stream_projected(
1893 lance_io::ReadBatchParams::RangeFull,
1894 1024,
1895 16,
1896 empty_projection.clone(),
1897 FilterExpression::no_filter(),
1898 )
1899 .is_err());
1900 }
1901 }
1902
1903 assert!(FileReader::try_open(
1904 file_scheduler.clone(),
1905 Some(empty_projection),
1906 Arc::<DecoderPlugins>::default(),
1907 &test_cache(),
1908 FileReaderOptions::default(),
1909 )
1910 .await
1911 .is_err());
1912
1913 let arrow_schema = ArrowSchema::new(vec![
1914 Field::new("x", DataType::Int32, true),
1915 Field::new("y", DataType::Int32, true),
1916 ]);
1917 let schema = Schema::try_from(&arrow_schema).unwrap();
1918
1919 let projection_with_dupes = ReaderProjection {
1920 column_indices: vec![0, 0],
1921 schema: Arc::new(schema),
1922 };
1923
1924 assert!(FileReader::try_open(
1925 file_scheduler.clone(),
1926 Some(projection_with_dupes),
1927 Arc::<DecoderPlugins>::default(),
1928 &test_cache(),
1929 FileReaderOptions::default(),
1930 )
1931 .await
1932 .is_err());
1933 }
1934
1935 #[test_log::test(tokio::test)]
1936 async fn test_compressing_buffer() {
1937 let fs = FsFixture::default();
1938
1939 let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
1940 let file_scheduler = fs
1941 .scheduler
1942 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1943 .await
1944 .unwrap();
1945
1946 let file_reader = FileReader::try_open(
1948 file_scheduler.clone(),
1949 None,
1950 Arc::<DecoderPlugins>::default(),
1951 &test_cache(),
1952 FileReaderOptions::default(),
1953 )
1954 .await
1955 .unwrap();
1956
1957 let mut projection = written_file.schema.project(&["score"]).unwrap();
1958 for field in projection.fields.iter_mut() {
1959 field
1960 .metadata
1961 .insert("lance:compression".to_string(), "zstd".to_string());
1962 }
1963 let projection = ReaderProjection {
1964 column_indices: projection.fields.iter().map(|f| f.id as u32).collect(),
1965 schema: Arc::new(projection),
1966 };
1967
1968 let batch_stream = file_reader
1969 .read_stream_projected(
1970 lance_io::ReadBatchParams::RangeFull,
1971 1024,
1972 16,
1973 projection.clone(),
1974 FilterExpression::no_filter(),
1975 )
1976 .unwrap();
1977
1978 let projection_arrow = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
1979 verify_expected(
1980 &written_file.data,
1981 batch_stream,
1982 1024,
1983 Some(Box::new(move |batch: &RecordBatch| {
1984 batch.project_by_schema(&projection_arrow).unwrap()
1985 })),
1986 )
1987 .await;
1988 }
1989
1990 #[tokio::test]
1991 async fn test_read_all() {
1992 let fs = FsFixture::default();
1993 let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1994 let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
1995
1996 let file_scheduler = fs
1997 .scheduler
1998 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1999 .await
2000 .unwrap();
2001 let file_reader = FileReader::try_open(
2002 file_scheduler.clone(),
2003 None,
2004 Arc::<DecoderPlugins>::default(),
2005 &test_cache(),
2006 FileReaderOptions::default(),
2007 )
2008 .await
2009 .unwrap();
2010
2011 let batches = file_reader
2012 .read_stream(
2013 lance_io::ReadBatchParams::RangeFull,
2014 total_rows as u32,
2015 16,
2016 FilterExpression::no_filter(),
2017 )
2018 .unwrap()
2019 .try_collect::<Vec<_>>()
2020 .await
2021 .unwrap();
2022 assert_eq!(batches.len(), 1);
2023 assert_eq!(batches[0].num_rows(), total_rows);
2024 }
2025
2026 #[rstest]
2027 #[tokio::test]
2028 async fn test_blocking_take(
2029 #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
2030 ) {
2031 let fs = FsFixture::default();
2032 let WrittenFile { data, schema, .. } = create_some_file(&fs, version).await;
2033 let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2034
2035 let file_scheduler = fs
2036 .scheduler
2037 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2038 .await
2039 .unwrap();
2040 let file_reader = FileReader::try_open(
2041 file_scheduler.clone(),
2042 Some(ReaderProjection::from_column_names(version, &schema, &["score"]).unwrap()),
2043 Arc::<DecoderPlugins>::default(),
2044 &test_cache(),
2045 FileReaderOptions::default(),
2046 )
2047 .await
2048 .unwrap();
2049
2050 let batches = tokio::task::spawn_blocking(move || {
2051 file_reader
2052 .read_stream_projected_blocking(
2053 lance_io::ReadBatchParams::Indices(UInt32Array::from(vec![0, 1, 2, 3, 4])),
2054 total_rows as u32,
2055 None,
2056 FilterExpression::no_filter(),
2057 )
2058 .unwrap()
2059 .collect::<ArrowResult<Vec<_>>>()
2060 .unwrap()
2061 })
2062 .await
2063 .unwrap();
2064
2065 assert_eq!(batches.len(), 1);
2066 assert_eq!(batches[0].num_rows(), 5);
2067 assert_eq!(batches[0].num_columns(), 1);
2068 }
2069
2070 #[tokio::test(flavor = "multi_thread")]
2071 async fn test_drop_in_progress() {
2072 let fs = FsFixture::default();
2073 let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
2074 let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2075
2076 let file_scheduler = fs
2077 .scheduler
2078 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2079 .await
2080 .unwrap();
2081 let file_reader = FileReader::try_open(
2082 file_scheduler.clone(),
2083 None,
2084 Arc::<DecoderPlugins>::default(),
2085 &test_cache(),
2086 FileReaderOptions::default(),
2087 )
2088 .await
2089 .unwrap();
2090
2091 let mut batches = file_reader
2092 .read_stream(
2093 lance_io::ReadBatchParams::RangeFull,
2094 (total_rows / 10) as u32,
2095 16,
2096 FilterExpression::no_filter(),
2097 )
2098 .unwrap();
2099
2100 drop(file_reader);
2101
2102 let batch = batches.next().await.unwrap().unwrap();
2103 assert!(batch.num_rows() > 0);
2104
2105 drop(batches);
2107 }
2108
2109 #[tokio::test]
2110 async fn drop_while_scheduling() {
2111 let fs = FsFixture::default();
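        // This test appears to exercise dropping the decode receiver while scheduling is
        // still in progress; the schedule_range call below should complete without panicking.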
2121 let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
2122 let total_rows = written_file
2123 .data
2124 .iter()
2125 .map(|batch| batch.num_rows())
2126 .sum::<usize>();
2127
2128 let file_scheduler = fs
2129 .scheduler
2130 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2131 .await
2132 .unwrap();
2133 let file_reader = FileReader::try_open(
2134 file_scheduler.clone(),
2135 None,
2136 Arc::<DecoderPlugins>::default(),
2137 &test_cache(),
2138 FileReaderOptions::default(),
2139 )
2140 .await
2141 .unwrap();
2142
2143 let projection =
2144 ReaderProjection::from_whole_schema(&written_file.schema, LanceFileVersion::V2_0);
2145 let column_infos = file_reader
2146 .collect_columns_from_projection(&projection)
2147 .unwrap();
2148 let mut decode_scheduler = DecodeBatchScheduler::try_new(
2149 &projection.schema,
2150 &projection.column_indices,
2151 &column_infos,
2152 &vec![],
2153 total_rows as u64,
2154 Arc::<DecoderPlugins>::default(),
2155 file_reader.scheduler.clone(),
2156 test_cache(),
2157 &FilterExpression::no_filter(),
2158 &DecoderConfig::default(),
2159 )
2160 .await
2161 .unwrap();
2162
2163 let range = 0..total_rows as u64;
2164
2165 let (tx, rx) = mpsc::unbounded_channel();
2166
2167 drop(rx);
2169
2170 decode_scheduler.schedule_range(
2172 range,
2173 &FilterExpression::no_filter(),
2174 tx,
2175 file_reader.scheduler.clone(),
2176 )
2177 }
2178
2179 #[tokio::test]
2180 async fn test_read_empty_range() {
2181 let fs = FsFixture::default();
2182 create_some_file(&fs, LanceFileVersion::V2_0).await;
2183
2184 let file_scheduler = fs
2185 .scheduler
2186 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2187 .await
2188 .unwrap();
2189 let file_reader = FileReader::try_open(
2190 file_scheduler.clone(),
2191 None,
2192 Arc::<DecoderPlugins>::default(),
2193 &test_cache(),
2194 FileReaderOptions::default(),
2195 )
2196 .await
2197 .unwrap();
2198
2199 let batches = file_reader
2201 .read_stream(
2202 lance_io::ReadBatchParams::Range(0..0),
2203 1024,
2204 16,
2205 FilterExpression::no_filter(),
2206 )
2207 .unwrap()
2208 .try_collect::<Vec<_>>()
2209 .await
2210 .unwrap();
2211
2212 assert_eq!(batches.len(), 0);
2213
2214 let batches = file_reader
2216 .read_stream(
2217 lance_io::ReadBatchParams::Ranges(Arc::new([0..1, 2..2])),
2218 1024,
2219 16,
2220 FilterExpression::no_filter(),
2221 )
2222 .unwrap()
2223 .try_collect::<Vec<_>>()
2224 .await
2225 .unwrap();
2226 assert_eq!(batches.len(), 1);
2227 }
2228
2229 #[tokio::test]
2230 async fn test_global_buffers() {
2231 let fs = FsFixture::default();
2232
2233 let lance_schema =
2234 lance_core::datatypes::Schema::try_from(&ArrowSchema::new(vec![Field::new(
2235 "foo",
2236 DataType::Int32,
2237 true,
2238 )]))
2239 .unwrap();
2240
2241 let mut file_writer = FileWriter::try_new(
2242 fs.object_store.create(&fs.tmp_path).await.unwrap(),
2243 lance_schema.clone(),
2244 FileWriterOptions::default(),
2245 )
2246 .unwrap();
2247
2248 let test_bytes = Bytes::from_static(b"hello");
2249
2250 let buf_index = file_writer
2251 .add_global_buffer(test_bytes.clone())
2252 .await
2253 .unwrap();
2254
2255 assert_eq!(buf_index, 1);
2256
2257 file_writer.finish().await.unwrap();
2258
2259 let file_scheduler = fs
2260 .scheduler
2261 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2262 .await
2263 .unwrap();
2264 let file_reader = FileReader::try_open(
2265 file_scheduler.clone(),
2266 None,
2267 Arc::<DecoderPlugins>::default(),
2268 &test_cache(),
2269 FileReaderOptions::default(),
2270 )
2271 .await
2272 .unwrap();
2273
2274 let buf = file_reader.read_global_buffer(1).await.unwrap();
2275 assert_eq!(buf, test_bytes);
2276 }
2277}