1use std::{
5 collections::{BTreeMap, BTreeSet},
6 io::Cursor,
7 ops::Range,
8 pin::Pin,
9 sync::Arc,
10};
11
12use arrow_array::RecordBatchReader;
13use arrow_schema::Schema as ArrowSchema;
14use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
15use bytes::{Bytes, BytesMut};
16use deepsize::{Context, DeepSizeOf};
17use futures::{stream::BoxStream, Stream, StreamExt};
18use lance_encoding::{
19 decoder::{
20 schedule_and_decode, schedule_and_decode_blocking, ColumnInfo, DecoderConfig,
21 DecoderPlugins, FilterExpression, PageEncoding, PageInfo, ReadBatchTask, RequestedRows,
22 SchedulerDecoderConfig,
23 },
24 encoder::EncodedBatch,
25 version::LanceFileVersion,
26 EncodingsIo,
27};
28use log::debug;
29use object_store::path::Path;
30use prost::{Message, Name};
31use snafu::location;
32
33use lance_core::{
34 cache::LanceCache,
35 datatypes::{Field, Schema},
36 Error, Result,
37};
38use lance_encoding::format::pb as pbenc;
39use lance_encoding::format::pb21 as pbenc21;
40use lance_io::{
41 scheduler::FileScheduler,
42 stream::{RecordBatchStream, RecordBatchStreamAdapter},
43 ReadBatchParams,
44};
45
46use crate::{
47 datatypes::{Fields, FieldsWithMeta},
48 format::{pb, pbfile, MAGIC, MAJOR_VERSION, MINOR_VERSION},
49 v2::writer::PAGE_BUFFER_ALIGNMENT,
50};
51
52use super::io::LanceEncodingsIo;
53
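/// Describes a buffer (such as a global buffer) stored in the file.
///
/// `position` is the byte offset of the buffer from the start of the file and `size` is its
/// length in bytes; `read_global_buffer` reads it back as `position..position + size`.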
54#[derive(Debug, DeepSizeOf)]
59pub struct BufferDescriptor {
60 pub position: u64,
61 pub size: u64,
62}
63
64#[derive(Debug)]
66pub struct FileStatistics {
67 pub columns: Vec<ColumnStatistics>,
69}
70
71#[derive(Debug)]
73pub struct ColumnStatistics {
74 pub num_pages: usize,
76 pub size_bytes: u64,
80}
81
82#[derive(Debug)]
84pub struct CachedFileMetadata {
85 pub file_schema: Arc<Schema>,
87 pub column_metadatas: Vec<pbfile::ColumnMetadata>,
89 pub column_infos: Vec<Arc<ColumnInfo>>,
90 pub num_rows: u64,
92 pub file_buffers: Vec<BufferDescriptor>,
93 pub num_data_bytes: u64,
95 pub num_column_metadata_bytes: u64,
98 pub num_global_buffer_bytes: u64,
100 pub num_footer_bytes: u64,
102 pub major_version: u16,
103 pub minor_version: u16,
104}
105
106impl DeepSizeOf for CachedFileMetadata {
107 fn deep_size_of_children(&self, context: &mut Context) -> usize {
109 self.file_schema.deep_size_of_children(context)
110 + self
111 .file_buffers
112 .iter()
113 .map(|file_buffer| file_buffer.deep_size_of_children(context))
114 .sum::<usize>()
115 }
116}
117
118impl CachedFileMetadata {
119 pub fn version(&self) -> LanceFileVersion {
120 match (self.major_version, self.minor_version) {
121 (0, 3) => LanceFileVersion::V2_0,
122 (2, 1) => LanceFileVersion::V2_1,
123 _ => panic!(
124 "Unsupported version: {}.{}",
125 self.major_version, self.minor_version
126 ),
127 }
128 }
129}
130
131#[derive(Debug, Clone)]
153pub struct ReaderProjection {
154 pub schema: Arc<Schema>,
157 pub column_indices: Vec<u32>,
197}
198
199impl ReaderProjection {
200 fn from_field_ids_helper<'a>(
201 file_version: LanceFileVersion,
202 fields: impl Iterator<Item = &'a Field>,
203 field_id_to_column_index: &BTreeMap<u32, u32>,
204 column_indices: &mut Vec<u32>,
205 ) -> Result<()> {
206 for field in fields {
207 let is_structural = file_version >= LanceFileVersion::V2_1;
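            // In the 2.1+ structural encodings only leaf fields are assigned column indices;
            // in 2.0 every field (including structs) has its own column.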
208 if !is_structural || field.children.is_empty() {
211 if let Some(column_idx) = field_id_to_column_index.get(&(field.id as u32)).copied()
212 {
213 column_indices.push(column_idx);
214 }
215 }
216 Self::from_field_ids_helper(
217 file_version,
218 field.children.iter(),
219 field_id_to_column_index,
220 column_indices,
221 )?;
222 }
223 Ok(())
224 }
225
226 pub fn from_field_ids(
231 file_version: LanceFileVersion,
232 schema: &Schema,
233 field_id_to_column_index: &BTreeMap<u32, u32>,
234 ) -> Result<Self> {
235 let mut column_indices = Vec::new();
236 Self::from_field_ids_helper(
237 file_version,
238 schema.fields.iter(),
239 field_id_to_column_index,
240 &mut column_indices,
241 )?;
242 Ok(Self {
243 schema: Arc::new(schema.clone()),
244 column_indices,
245 })
246 }
247
248 pub fn from_whole_schema(schema: &Schema, version: LanceFileVersion) -> Self {
256 let schema = Arc::new(schema.clone());
257 let is_structural = version >= LanceFileVersion::V2_1;
258 let mut column_indices = vec![];
259 let mut curr_column_idx = 0;
260 let mut packed_struct_fields_num = 0;
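        // Walk the schema in pre-order.  In 2.0 every field gets a column; in 2.1+ only leaf
        // fields do, except for packed structs, which take a single column while their
        // children are skipped via `packed_struct_fields_num`.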
261 for field in schema.fields_pre_order() {
262 if packed_struct_fields_num > 0 {
263 packed_struct_fields_num -= 1;
264 continue;
265 }
266 if field.is_packed_struct() {
267 column_indices.push(curr_column_idx);
268 curr_column_idx += 1;
269 packed_struct_fields_num = field.children.len();
270 } else if field.children.is_empty() || !is_structural {
271 column_indices.push(curr_column_idx);
272 curr_column_idx += 1;
273 }
274 }
275 Self {
276 schema,
277 column_indices,
278 }
279 }
280
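    /// Builds a projection from a list of column names (nested columns use dotted paths).
    ///
    /// A small sketch modeled on the tests below, assuming the schema has a top-level
    /// "score" column:
    ///
    /// ```ignore
    /// let projection =
    ///     ReaderProjection::from_column_names(LanceFileVersion::V2_1, &schema, &["score"])?;
    /// ```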
281 pub fn from_column_names(
288 file_version: LanceFileVersion,
289 schema: &Schema,
290 column_names: &[&str],
291 ) -> Result<Self> {
292 let field_id_to_column_index = schema
293 .fields_pre_order()
294 .filter(|field| file_version < LanceFileVersion::V2_1 || field.is_leaf())
297 .enumerate()
298 .map(|(idx, field)| (field.id as u32, idx as u32))
299 .collect::<BTreeMap<_, _>>();
300 let projected = schema.project(column_names)?;
301 let mut column_indices = Vec::new();
302 Self::from_field_ids_helper(
303 file_version,
304 projected.fields.iter(),
305 &field_id_to_column_index,
306 &mut column_indices,
307 )?;
308 Ok(Self {
309 schema: Arc::new(projected),
310 column_indices,
311 })
312 }
313}
314
315#[derive(Clone, Debug, Default)]
317pub struct FileReaderOptions {
318 pub decoder_config: DecoderConfig,
319}
320
321#[derive(Debug)]
322pub struct FileReader {
323 scheduler: Arc<dyn EncodingsIo>,
324 base_projection: ReaderProjection,
326 num_rows: u64,
327 metadata: Arc<CachedFileMetadata>,
328 decoder_plugins: Arc<DecoderPlugins>,
329 cache: Arc<LanceCache>,
330 options: FileReaderOptions,
331}
332#[derive(Debug)]
333struct Footer {
334 #[allow(dead_code)]
335 column_meta_start: u64,
336 #[allow(dead_code)]
339 column_meta_offsets_start: u64,
340 global_buff_offsets_start: u64,
341 num_global_buffers: u32,
342 num_columns: u32,
343 major_version: u16,
344 minor_version: u16,
345}
346
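// The fixed-size footer occupies the last FOOTER_LEN bytes of the file.  Based on how
// `decode_footer` parses it, the layout is (all little-endian):
//   u64 column_meta_start | u64 column_meta_offsets_start | u64 global_buff_offsets_start
//   | u32 num_global_buffers | u32 num_columns | u16 major_version | u16 minor_version
//   | 4-byte magic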
347const FOOTER_LEN: usize = 40;
348
349impl FileReader {
350 pub fn with_scheduler(&self, scheduler: Arc<dyn EncodingsIo>) -> Self {
351 Self {
352 scheduler,
353 base_projection: self.base_projection.clone(),
354 cache: self.cache.clone(),
355 decoder_plugins: self.decoder_plugins.clone(),
356 metadata: self.metadata.clone(),
357 options: self.options.clone(),
358 num_rows: self.num_rows,
359 }
360 }
361
362 pub fn num_rows(&self) -> u64 {
363 self.num_rows
364 }
365
366 pub fn metadata(&self) -> &Arc<CachedFileMetadata> {
367 &self.metadata
368 }
369
370 pub fn file_statistics(&self) -> FileStatistics {
371 let column_metadatas = &self.metadata().column_metadatas;
372
373 let column_stats = column_metadatas
374 .iter()
375 .map(|col_metadata| {
376 let num_pages = col_metadata.pages.len();
377 let size_bytes = col_metadata
378 .pages
379 .iter()
380 .map(|page| page.buffer_sizes.iter().sum::<u64>())
381 .sum::<u64>();
382 ColumnStatistics {
383 num_pages,
384 size_bytes,
385 }
386 })
387 .collect();
388
389 FileStatistics {
390 columns: column_stats,
391 }
392 }
393
394 pub async fn read_global_buffer(&self, index: u32) -> Result<Bytes> {
395 let buffer_desc = self.metadata.file_buffers.get(index as usize).ok_or_else(||Error::invalid_input(format!("request for global buffer at index {} but there were only {} global buffers in the file", index, self.metadata.file_buffers.len()), location!()))?;
396 self.scheduler
397 .submit_single(
398 buffer_desc.position..buffer_desc.position + buffer_desc.size,
399 0,
400 )
401 .await
402 }
403
404 async fn read_tail(scheduler: &FileScheduler) -> Result<(Bytes, u64)> {
405 let file_size = scheduler.reader().size().await? as u64;
406 let begin = if file_size < scheduler.reader().block_size() as u64 {
407 0
408 } else {
409 file_size - scheduler.reader().block_size() as u64
410 };
411 let tail_bytes = scheduler.submit_single(begin..file_size, 0).await?;
412 Ok((tail_bytes, file_size))
413 }
414
415 fn decode_footer(footer_bytes: &Bytes) -> Result<Footer> {
418 let len = footer_bytes.len();
419 if len < FOOTER_LEN {
420 return Err(Error::io(
421 format!(
422 "does not have sufficient data, len: {}, bytes: {:?}",
423 len, footer_bytes
424 ),
425 location!(),
426 ));
427 }
428 let mut cursor = Cursor::new(footer_bytes.slice(len - FOOTER_LEN..));
429
430 let column_meta_start = cursor.read_u64::<LittleEndian>()?;
431 let column_meta_offsets_start = cursor.read_u64::<LittleEndian>()?;
432 let global_buff_offsets_start = cursor.read_u64::<LittleEndian>()?;
433 let num_global_buffers = cursor.read_u32::<LittleEndian>()?;
434 let num_columns = cursor.read_u32::<LittleEndian>()?;
435 let major_version = cursor.read_u16::<LittleEndian>()?;
436 let minor_version = cursor.read_u16::<LittleEndian>()?;
437
438 if major_version == MAJOR_VERSION as u16 && minor_version == MINOR_VERSION as u16 {
439 return Err(Error::version_conflict(
440 "Attempt to use the lance v2 reader to read a legacy file".to_string(),
441 major_version,
442 minor_version,
443 location!(),
444 ));
445 }
446
447 let magic_bytes = footer_bytes.slice(len - 4..);
448 if magic_bytes.as_ref() != MAGIC {
449 return Err(Error::io(
450 format!(
451 "file does not appear to be a Lance file (invalid magic: {:?})",
452 MAGIC
453 ),
454 location!(),
455 ));
456 }
457 Ok(Footer {
458 column_meta_start,
459 column_meta_offsets_start,
460 global_buff_offsets_start,
461 num_global_buffers,
462 num_columns,
463 major_version,
464 minor_version,
465 })
466 }
467
468 fn read_all_column_metadata(
470 column_metadata_bytes: Bytes,
471 footer: &Footer,
472 ) -> Result<Vec<pbfile::ColumnMetadata>> {
473 let column_metadata_start = footer.column_meta_start;
474 let cmo_table_size = 16 * footer.num_columns as usize;
476 let cmo_table = column_metadata_bytes.slice(column_metadata_bytes.len() - cmo_table_size..);
477
478 (0..footer.num_columns)
479 .map(|col_idx| {
480 let offset = (col_idx * 16) as usize;
481 let position = LittleEndian::read_u64(&cmo_table[offset..offset + 8]);
482 let length = LittleEndian::read_u64(&cmo_table[offset + 8..offset + 16]);
483 let normalized_position = (position - column_metadata_start) as usize;
484 let normalized_end = normalized_position + (length as usize);
485 Ok(pbfile::ColumnMetadata::decode(
486 &column_metadata_bytes[normalized_position..normalized_end],
487 )?)
488 })
489 .collect::<Result<Vec<_>>>()
490 }
491
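    // `read_tail` grabs one block-size chunk from the end of the file.  If that chunk already
    // covers everything from `start_pos` onward we just slice it; otherwise we issue a single
    // extra read for the missing prefix and stitch the two together.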
492 async fn optimistic_tail_read(
493 data: &Bytes,
494 start_pos: u64,
495 scheduler: &FileScheduler,
496 file_len: u64,
497 ) -> Result<Bytes> {
498 let num_bytes_needed = (file_len - start_pos) as usize;
499 if data.len() >= num_bytes_needed {
500 Ok(data.slice((data.len() - num_bytes_needed)..))
501 } else {
502 let num_bytes_missing = (num_bytes_needed - data.len()) as u64;
503 let start = file_len - num_bytes_needed as u64;
504 let missing_bytes = scheduler
505 .submit_single(start..start + num_bytes_missing, 0)
506 .await?;
507 let mut combined = BytesMut::with_capacity(data.len() + num_bytes_missing as usize);
508 combined.extend(missing_bytes);
509 combined.extend(data);
510 Ok(combined.freeze())
511 }
512 }
513
514 fn do_decode_gbo_table(
515 gbo_bytes: &Bytes,
516 footer: &Footer,
517 version: LanceFileVersion,
518 ) -> Result<Vec<BufferDescriptor>> {
519 let mut global_bufs_cursor = Cursor::new(gbo_bytes);
520
521 let mut global_buffers = Vec::with_capacity(footer.num_global_buffers as usize);
522 for _ in 0..footer.num_global_buffers {
523 let buf_pos = global_bufs_cursor.read_u64::<LittleEndian>()?;
524 assert!(
525 version < LanceFileVersion::V2_1 || buf_pos % PAGE_BUFFER_ALIGNMENT as u64 == 0
526 );
527 let buf_size = global_bufs_cursor.read_u64::<LittleEndian>()?;
528 global_buffers.push(BufferDescriptor {
529 position: buf_pos,
530 size: buf_size,
531 });
532 }
533
534 Ok(global_buffers)
535 }
536
537 async fn decode_gbo_table(
538 tail_bytes: &Bytes,
539 file_len: u64,
540 scheduler: &FileScheduler,
541 footer: &Footer,
542 version: LanceFileVersion,
543 ) -> Result<Vec<BufferDescriptor>> {
544 let gbo_bytes = Self::optimistic_tail_read(
547 tail_bytes,
548 footer.global_buff_offsets_start,
549 scheduler,
550 file_len,
551 )
552 .await?;
553 Self::do_decode_gbo_table(&gbo_bytes, footer, version)
554 }
555
556 fn decode_schema(schema_bytes: Bytes) -> Result<(u64, lance_core::datatypes::Schema)> {
557 let file_descriptor = pb::FileDescriptor::decode(schema_bytes)?;
558 let pb_schema = file_descriptor.schema.unwrap();
559 let num_rows = file_descriptor.length;
560 let fields_with_meta = FieldsWithMeta {
561 fields: Fields(pb_schema.fields),
562 metadata: pb_schema.metadata,
563 };
564 let schema = lance_core::datatypes::Schema::from(fields_with_meta);
565 Ok((num_rows, schema))
566 }
567
568 pub async fn read_all_metadata(scheduler: &FileScheduler) -> Result<CachedFileMetadata> {
582 let (tail_bytes, file_len) = Self::read_tail(scheduler).await?;
584 let footer = Self::decode_footer(&tail_bytes)?;
585
586 let file_version = LanceFileVersion::try_from_major_minor(
587 footer.major_version as u32,
588 footer.minor_version as u32,
589 )?;
590
591 let gbo_table =
592 Self::decode_gbo_table(&tail_bytes, file_len, scheduler, &footer, file_version).await?;
593 if gbo_table.is_empty() {
594 return Err(Error::Internal {
595 message: "File did not contain any global buffers, schema expected".to_string(),
596 location: location!(),
597 });
598 }
599 let schema_start = gbo_table[0].position;
600 let schema_size = gbo_table[0].size;
601
602 let num_footer_bytes = file_len - schema_start;
603
604 let all_metadata_bytes =
607 Self::optimistic_tail_read(&tail_bytes, schema_start, scheduler, file_len).await?;
608
609 let schema_bytes = all_metadata_bytes.slice(0..schema_size as usize);
610 let (num_rows, schema) = Self::decode_schema(schema_bytes)?;
611
612 let column_metadata_start = (footer.column_meta_start - schema_start) as usize;
615 let column_metadata_end = (footer.global_buff_offsets_start - schema_start) as usize;
616 let column_metadata_bytes =
617 all_metadata_bytes.slice(column_metadata_start..column_metadata_end);
618 let column_metadatas = Self::read_all_column_metadata(column_metadata_bytes, &footer)?;
619
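        // Everything before `column_meta_start` is page data plus the global buffers, so the
        // data size reported in the statistics is that prefix minus the global buffers;
        // column metadata then runs from `column_meta_start` to `global_buff_offsets_start`.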
620 let num_global_buffer_bytes = gbo_table.iter().map(|buf| buf.size).sum::<u64>();
621 let num_data_bytes = footer.column_meta_start - num_global_buffer_bytes;
622 let num_column_metadata_bytes = footer.global_buff_offsets_start - footer.column_meta_start;
623
624 let column_infos = Self::meta_to_col_infos(column_metadatas.as_slice(), file_version);
625
626 Ok(CachedFileMetadata {
627 file_schema: Arc::new(schema),
628 column_metadatas,
629 column_infos,
630 num_rows,
631 num_data_bytes,
632 num_column_metadata_bytes,
633 num_global_buffer_bytes,
634 num_footer_bytes,
635 file_buffers: gbo_table,
636 major_version: footer.major_version,
637 minor_version: footer.minor_version,
638 })
639 }
640
641 fn fetch_encoding<M: Default + Name + Sized>(encoding: &pbfile::Encoding) -> M {
642 match &encoding.location {
643 Some(pbfile::encoding::Location::Indirect(_)) => todo!(),
644 Some(pbfile::encoding::Location::Direct(encoding)) => {
645 let encoding_buf = Bytes::from(encoding.encoding.clone());
646 let encoding_any = prost_types::Any::decode(encoding_buf).unwrap();
647 encoding_any.to_msg::<M>().unwrap()
648 }
649 Some(pbfile::encoding::Location::None(_)) => panic!(),
650 None => panic!(),
651 }
652 }
653
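    // Converts the protobuf column metadata into decoder `ColumnInfo`s.  2.0 pages carry a
    // legacy `ArrayEncoding` while 2.1+ pages carry a structural `PageLayout`, and 2.1+ also
    // requires page buffers to be aligned to `PAGE_BUFFER_ALIGNMENT`.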
654 fn meta_to_col_infos(
655 column_metadatas: &[pbfile::ColumnMetadata],
656 file_version: LanceFileVersion,
657 ) -> Vec<Arc<ColumnInfo>> {
658 column_metadatas
659 .iter()
660 .enumerate()
661 .map(|(col_idx, col_meta)| {
662 let page_infos = col_meta
663 .pages
664 .iter()
665 .map(|page| {
666 let num_rows = page.length;
667 let encoding = match file_version {
668 LanceFileVersion::V2_0 => {
669 PageEncoding::Legacy(Self::fetch_encoding::<pbenc::ArrayEncoding>(
670 page.encoding.as_ref().unwrap(),
671 ))
672 }
673 _ => PageEncoding::Structural(Self::fetch_encoding::<
674 pbenc21::PageLayout,
675 >(
676 page.encoding.as_ref().unwrap()
677 )),
678 };
679 let buffer_offsets_and_sizes = Arc::from(
680 page.buffer_offsets
681 .iter()
682 .zip(page.buffer_sizes.iter())
683 .map(|(offset, size)| {
684 assert!(
686 file_version < LanceFileVersion::V2_1
687 || offset % PAGE_BUFFER_ALIGNMENT as u64 == 0
688 );
689 (*offset, *size)
690 })
691 .collect::<Vec<_>>(),
692 );
693 PageInfo {
694 buffer_offsets_and_sizes,
695 encoding,
696 num_rows,
697 priority: page.priority,
698 }
699 })
700 .collect::<Vec<_>>();
701 let buffer_offsets_and_sizes = Arc::from(
702 col_meta
703 .buffer_offsets
704 .iter()
705 .zip(col_meta.buffer_sizes.iter())
706 .map(|(offset, size)| (*offset, *size))
707 .collect::<Vec<_>>(),
708 );
709 Arc::new(ColumnInfo {
710 index: col_idx as u32,
711 page_infos: Arc::from(page_infos),
712 buffer_offsets_and_sizes,
713 encoding: Self::fetch_encoding(col_meta.encoding.as_ref().unwrap()),
714 })
715 })
716 .collect::<Vec<_>>()
717 }
718
719 fn validate_projection(
720 projection: &ReaderProjection,
721 metadata: &CachedFileMetadata,
722 ) -> Result<()> {
723 if projection.schema.fields.is_empty() {
724 return Err(Error::invalid_input(
725 "Attempt to read zero columns from the file, at least one column must be specified"
726 .to_string(),
727 location!(),
728 ));
729 }
730 let mut column_indices_seen = BTreeSet::new();
731 for column_index in &projection.column_indices {
732 if !column_indices_seen.insert(*column_index) {
733 return Err(Error::invalid_input(
734 format!(
735 "The projection specified the column index {} more than once",
736 column_index
737 ),
738 location!(),
739 ));
740 }
741 if *column_index >= metadata.column_infos.len() as u32 {
742 return Err(Error::invalid_input(format!("The projection specified the column index {} but there are only {} columns in the file", column_index, metadata.column_infos.len()), location!()));
743 }
744 }
745 Ok(())
746 }
747
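    /// Opens a file for reading: fetches and caches the file metadata from `scheduler` and
    /// validates `base_projection` (if provided) against it.
    ///
    /// A minimal usage sketch (the scheduler and cache setup are illustrative, not prescribed
    /// by this module):
    ///
    /// ```ignore
    /// let reader = FileReader::try_open(
    ///     file_scheduler,
    ///     None, // default to projecting the whole schema
    ///     Arc::<DecoderPlugins>::default(),
    ///     &cache,
    ///     FileReaderOptions::default(),
    /// )
    /// .await?;
    /// let stream = reader.read_stream(
    ///     ReadBatchParams::RangeFull,
    ///     1024, // batch size
    ///     16,   // batch readahead
    ///     FilterExpression::no_filter(),
    /// )?;
    /// ```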
748 pub async fn try_open(
755 scheduler: FileScheduler,
756 base_projection: Option<ReaderProjection>,
757 decoder_plugins: Arc<DecoderPlugins>,
758 cache: &LanceCache,
759 options: FileReaderOptions,
760 ) -> Result<Self> {
761 let file_metadata = Arc::new(Self::read_all_metadata(&scheduler).await?);
762 let path = scheduler.reader().path().clone();
763 Self::try_open_with_file_metadata(
764 Arc::new(LanceEncodingsIo(scheduler)),
765 path,
766 base_projection,
767 decoder_plugins,
768 file_metadata,
769 cache,
770 options,
771 )
772 .await
773 }
774
775 pub async fn try_open_with_file_metadata(
781 scheduler: Arc<dyn EncodingsIo>,
782 path: Path,
783 base_projection: Option<ReaderProjection>,
784 decoder_plugins: Arc<DecoderPlugins>,
785 file_metadata: Arc<CachedFileMetadata>,
786 cache: &LanceCache,
787 options: FileReaderOptions,
788 ) -> Result<Self> {
789 let cache = Arc::new(cache.with_key_prefix(path.as_ref()));
790
791 if let Some(base_projection) = base_projection.as_ref() {
792 Self::validate_projection(base_projection, &file_metadata)?;
793 }
794 let num_rows = file_metadata.num_rows;
795 Ok(Self {
796 scheduler,
797 base_projection: base_projection.unwrap_or(ReaderProjection::from_whole_schema(
798 file_metadata.file_schema.as_ref(),
799 file_metadata.version(),
800 )),
801 num_rows,
802 metadata: file_metadata,
803 decoder_plugins,
804 cache,
805 options,
806 })
807 }
808
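    // Note: this currently hands back every column's metadata regardless of the projection;
    // the projection is applied later via `projection.column_indices` when the decode is
    // scheduled.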
809 fn collect_columns_from_projection(
823 &self,
824 _projection: &ReaderProjection,
825 ) -> Result<Vec<Arc<ColumnInfo>>> {
826 Ok(self.metadata.column_infos.to_vec())
827 }
828
829 #[allow(clippy::too_many_arguments)]
830 fn do_read_range(
831 column_infos: Vec<Arc<ColumnInfo>>,
832 io: Arc<dyn EncodingsIo>,
833 cache: Arc<LanceCache>,
834 num_rows: u64,
835 decoder_plugins: Arc<DecoderPlugins>,
836 range: Range<u64>,
837 batch_size: u32,
838 projection: ReaderProjection,
839 filter: FilterExpression,
840 decoder_config: DecoderConfig,
841 ) -> Result<BoxStream<'static, ReadBatchTask>> {
842 debug!(
843 "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
844 range,
845 batch_size,
846 num_rows,
847 column_infos.len(),
848 projection.schema.fields.len(),
849 );
850
851 let config = SchedulerDecoderConfig {
852 batch_size,
853 cache,
854 decoder_plugins,
855 io,
856 decoder_config,
857 };
858
859 let requested_rows = RequestedRows::Ranges(vec![range]);
860
861 Ok(schedule_and_decode(
862 column_infos,
863 requested_rows,
864 filter,
865 projection.column_indices,
866 projection.schema,
867 config,
868 ))
869 }
870
871 fn read_range(
872 &self,
873 range: Range<u64>,
874 batch_size: u32,
875 projection: ReaderProjection,
876 filter: FilterExpression,
877 ) -> Result<BoxStream<'static, ReadBatchTask>> {
878 Self::do_read_range(
880 self.collect_columns_from_projection(&projection)?,
881 self.scheduler.clone(),
882 self.cache.clone(),
883 self.num_rows,
884 self.decoder_plugins.clone(),
885 range,
886 batch_size,
887 projection,
888 filter,
889 self.options.decoder_config.clone(),
890 )
891 }
892
893 #[allow(clippy::too_many_arguments)]
894 fn do_take_rows(
895 column_infos: Vec<Arc<ColumnInfo>>,
896 io: Arc<dyn EncodingsIo>,
897 cache: Arc<LanceCache>,
898 decoder_plugins: Arc<DecoderPlugins>,
899 indices: Vec<u64>,
900 batch_size: u32,
901 projection: ReaderProjection,
902 filter: FilterExpression,
903 decoder_config: DecoderConfig,
904 ) -> Result<BoxStream<'static, ReadBatchTask>> {
905 debug!(
906 "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
907 indices.len(),
908 indices[0],
909 indices[indices.len() - 1],
910 batch_size,
911 column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
912 );
913
914 let config = SchedulerDecoderConfig {
915 batch_size,
916 cache,
917 decoder_plugins,
918 io,
919 decoder_config,
920 };
921
922 let requested_rows = RequestedRows::Indices(indices);
923
924 Ok(schedule_and_decode(
925 column_infos,
926 requested_rows,
927 filter,
928 projection.column_indices,
929 projection.schema,
930 config,
931 ))
932 }
933
934 fn take_rows(
935 &self,
936 indices: Vec<u64>,
937 batch_size: u32,
938 projection: ReaderProjection,
939 ) -> Result<BoxStream<'static, ReadBatchTask>> {
940 Self::do_take_rows(
942 self.collect_columns_from_projection(&projection)?,
943 self.scheduler.clone(),
944 self.cache.clone(),
945 self.decoder_plugins.clone(),
946 indices,
947 batch_size,
948 projection,
949 FilterExpression::no_filter(),
950 self.options.decoder_config.clone(),
951 )
952 }
953
954 #[allow(clippy::too_many_arguments)]
955 fn do_read_ranges(
956 column_infos: Vec<Arc<ColumnInfo>>,
957 io: Arc<dyn EncodingsIo>,
958 cache: Arc<LanceCache>,
959 decoder_plugins: Arc<DecoderPlugins>,
960 ranges: Vec<Range<u64>>,
961 batch_size: u32,
962 projection: ReaderProjection,
963 filter: FilterExpression,
964 decoder_config: DecoderConfig,
965 ) -> Result<BoxStream<'static, ReadBatchTask>> {
966 let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
967 debug!(
968 "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
969 ranges.len(),
970 num_rows,
971 ranges[0].start,
972 ranges[ranges.len() - 1].end,
973 batch_size,
974 column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
975 );
976
977 let config = SchedulerDecoderConfig {
978 batch_size,
979 cache,
980 decoder_plugins,
981 io,
982 decoder_config,
983 };
984
985 let requested_rows = RequestedRows::Ranges(ranges);
986
987 Ok(schedule_and_decode(
988 column_infos,
989 requested_rows,
990 filter,
991 projection.column_indices,
992 projection.schema,
993 config,
994 ))
995 }
996
997 fn read_ranges(
998 &self,
999 ranges: Vec<Range<u64>>,
1000 batch_size: u32,
1001 projection: ReaderProjection,
1002 filter: FilterExpression,
1003 ) -> Result<BoxStream<'static, ReadBatchTask>> {
1004 Self::do_read_ranges(
1005 self.collect_columns_from_projection(&projection)?,
1006 self.scheduler.clone(),
1007 self.cache.clone(),
1008 self.decoder_plugins.clone(),
1009 ranges,
1010 batch_size,
1011 projection,
1012 filter,
1013 self.options.decoder_config.clone(),
1014 )
1015 }
1016
1017 pub fn read_tasks(
1028 &self,
1029 params: ReadBatchParams,
1030 batch_size: u32,
1031 projection: Option<ReaderProjection>,
1032 filter: FilterExpression,
1033 ) -> Result<Pin<Box<dyn Stream<Item = ReadBatchTask> + Send>>> {
1034 let projection = projection.unwrap_or_else(|| self.base_projection.clone());
1035 Self::validate_projection(&projection, &self.metadata)?;
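        // Row indices must be strictly less than the row count (checked with `inclusive`
        // set to true) while range ends may equal it (checked with `inclusive` set to false).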
1036 let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
1037 if bound > self.num_rows || bound == self.num_rows && inclusive {
1038 Err(Error::invalid_input(
1039 format!(
1040 "cannot read {:?} from file with {} rows",
1041 params, self.num_rows
1042 ),
1043 location!(),
1044 ))
1045 } else {
1046 Ok(())
1047 }
1048 };
        match &params {
1050 ReadBatchParams::Indices(indices) => {
1051 for idx in indices {
1052 match idx {
1053 None => {
1054 return Err(Error::invalid_input(
1055 "Null value in indices array",
1056 location!(),
1057 ));
1058 }
1059 Some(idx) => {
                            verify_bound(&params, idx as u64, true)?;
1061 }
1062 }
1063 }
1064 let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
1065 self.take_rows(indices, batch_size, projection)
1066 }
1067 ReadBatchParams::Range(range) => {
                verify_bound(&params, range.end as u64, false)?;
1069 self.read_range(
1070 range.start as u64..range.end as u64,
1071 batch_size,
1072 projection,
1073 filter,
1074 )
1075 }
1076 ReadBatchParams::Ranges(ranges) => {
1077 let mut ranges_u64 = Vec::with_capacity(ranges.len());
1078 for range in ranges.as_ref() {
                    verify_bound(&params, range.end, false)?;
1080 ranges_u64.push(range.start..range.end);
1081 }
1082 self.read_ranges(ranges_u64, batch_size, projection, filter)
1083 }
1084 ReadBatchParams::RangeFrom(range) => {
                verify_bound(&params, range.start as u64, true)?;
1086 self.read_range(
1087 range.start as u64..self.num_rows,
1088 batch_size,
1089 projection,
1090 filter,
1091 )
1092 }
1093 ReadBatchParams::RangeTo(range) => {
                verify_bound(&params, range.end as u64, false)?;
1095 self.read_range(0..range.end as u64, batch_size, projection, filter)
1096 }
1097 ReadBatchParams::RangeFull => {
1098 self.read_range(0..self.num_rows, batch_size, projection, filter)
1099 }
1100 }
1101 }
1102
1103 pub fn read_stream_projected(
1125 &self,
1126 params: ReadBatchParams,
1127 batch_size: u32,
1128 batch_readahead: u32,
1129 projection: ReaderProjection,
1130 filter: FilterExpression,
1131 ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1132 let arrow_schema = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
1133 let tasks_stream = self.read_tasks(params, batch_size, Some(projection), filter)?;
1134 let batch_stream = tasks_stream
1135 .map(|task| task.task)
1136 .buffered(batch_readahead as usize)
1137 .boxed();
1138 Ok(Box::pin(RecordBatchStreamAdapter::new(
1139 arrow_schema,
1140 batch_stream,
1141 )))
1142 }
1143
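    // The *_blocking methods below mirror the async read paths but drive the decode with
    // `schedule_and_decode_blocking`, so they can be called from synchronous code (e.g.
    // inside `tokio::task::spawn_blocking`, as the tests do).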
1144 fn take_rows_blocking(
1145 &self,
1146 indices: Vec<u64>,
1147 batch_size: u32,
1148 projection: ReaderProjection,
1149 filter: FilterExpression,
1150 ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1151 let column_infos = self.collect_columns_from_projection(&projection)?;
1152 debug!(
1153 "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
1154 indices.len(),
1155 indices[0],
1156 indices[indices.len() - 1],
1157 batch_size,
1158 column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1159 );
1160
1161 let config = SchedulerDecoderConfig {
1162 batch_size,
1163 cache: self.cache.clone(),
1164 decoder_plugins: self.decoder_plugins.clone(),
1165 io: self.scheduler.clone(),
1166 decoder_config: self.options.decoder_config.clone(),
1167 };
1168
1169 let requested_rows = RequestedRows::Indices(indices);
1170
1171 schedule_and_decode_blocking(
1172 column_infos,
1173 requested_rows,
1174 filter,
1175 projection.column_indices,
1176 projection.schema,
1177 config,
1178 )
1179 }
1180
1181 fn read_ranges_blocking(
1182 &self,
1183 ranges: Vec<Range<u64>>,
1184 batch_size: u32,
1185 projection: ReaderProjection,
1186 filter: FilterExpression,
1187 ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1188 let column_infos = self.collect_columns_from_projection(&projection)?;
1189 let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1190 debug!(
1191 "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
1192 ranges.len(),
1193 num_rows,
1194 ranges[0].start,
1195 ranges[ranges.len() - 1].end,
1196 batch_size,
1197 column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1198 );
1199
1200 let config = SchedulerDecoderConfig {
1201 batch_size,
1202 cache: self.cache.clone(),
1203 decoder_plugins: self.decoder_plugins.clone(),
1204 io: self.scheduler.clone(),
1205 decoder_config: self.options.decoder_config.clone(),
1206 };
1207
1208 let requested_rows = RequestedRows::Ranges(ranges);
1209
1210 schedule_and_decode_blocking(
1211 column_infos,
1212 requested_rows,
1213 filter,
1214 projection.column_indices,
1215 projection.schema,
1216 config,
1217 )
1218 }
1219
1220 fn read_range_blocking(
1221 &self,
1222 range: Range<u64>,
1223 batch_size: u32,
1224 projection: ReaderProjection,
1225 filter: FilterExpression,
1226 ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1227 let column_infos = self.collect_columns_from_projection(&projection)?;
1228 let num_rows = self.num_rows;
1229
1230 debug!(
1231 "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
1232 range,
1233 batch_size,
1234 num_rows,
1235 column_infos.len(),
1236 projection.schema.fields.len(),
1237 );
1238
1239 let config = SchedulerDecoderConfig {
1240 batch_size,
1241 cache: self.cache.clone(),
1242 decoder_plugins: self.decoder_plugins.clone(),
1243 io: self.scheduler.clone(),
1244 decoder_config: self.options.decoder_config.clone(),
1245 };
1246
1247 let requested_rows = RequestedRows::Ranges(vec![range]);
1248
1249 schedule_and_decode_blocking(
1250 column_infos,
1251 requested_rows,
1252 filter,
1253 projection.column_indices,
1254 projection.schema,
1255 config,
1256 )
1257 }
1258
1259 pub fn read_stream_projected_blocking(
1271 &self,
1272 params: ReadBatchParams,
1273 batch_size: u32,
1274 projection: Option<ReaderProjection>,
1275 filter: FilterExpression,
1276 ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1277 let projection = projection.unwrap_or_else(|| self.base_projection.clone());
1278 Self::validate_projection(&projection, &self.metadata)?;
1279 let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
1280 if bound > self.num_rows || bound == self.num_rows && inclusive {
1281 Err(Error::invalid_input(
1282 format!(
1283 "cannot read {:?} from file with {} rows",
1284 params, self.num_rows
1285 ),
1286 location!(),
1287 ))
1288 } else {
1289 Ok(())
1290 }
1291 };
        match &params {
1293 ReadBatchParams::Indices(indices) => {
1294 for idx in indices {
1295 match idx {
1296 None => {
1297 return Err(Error::invalid_input(
1298 "Null value in indices array",
1299 location!(),
1300 ));
1301 }
1302 Some(idx) => {
                            verify_bound(&params, idx as u64, true)?;
1304 }
1305 }
1306 }
1307 let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
1308 self.take_rows_blocking(indices, batch_size, projection, filter)
1309 }
1310 ReadBatchParams::Range(range) => {
                verify_bound(&params, range.end as u64, false)?;
1312 self.read_range_blocking(
1313 range.start as u64..range.end as u64,
1314 batch_size,
1315 projection,
1316 filter,
1317 )
1318 }
1319 ReadBatchParams::Ranges(ranges) => {
1320 let mut ranges_u64 = Vec::with_capacity(ranges.len());
1321 for range in ranges.as_ref() {
                    verify_bound(&params, range.end, false)?;
1323 ranges_u64.push(range.start..range.end);
1324 }
1325 self.read_ranges_blocking(ranges_u64, batch_size, projection, filter)
1326 }
1327 ReadBatchParams::RangeFrom(range) => {
                verify_bound(&params, range.start as u64, true)?;
1329 self.read_range_blocking(
1330 range.start as u64..self.num_rows,
1331 batch_size,
1332 projection,
1333 filter,
1334 )
1335 }
1336 ReadBatchParams::RangeTo(range) => {
                verify_bound(&params, range.end as u64, false)?;
1338 self.read_range_blocking(0..range.end as u64, batch_size, projection, filter)
1339 }
1340 ReadBatchParams::RangeFull => {
1341 self.read_range_blocking(0..self.num_rows, batch_size, projection, filter)
1342 }
1343 }
1344 }
1345
1346 pub fn read_stream(
1352 &self,
1353 params: ReadBatchParams,
1354 batch_size: u32,
1355 batch_readahead: u32,
1356 filter: FilterExpression,
1357 ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1358 self.read_stream_projected(
1359 params,
1360 batch_size,
1361 batch_readahead,
1362 self.base_projection.clone(),
1363 filter,
1364 )
1365 }
1366
1367 pub fn schema(&self) -> &Arc<Schema> {
1368 &self.metadata.file_schema
1369 }
1370}
1371
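// Debug helper that renders a page's encoding descriptor as a human-readable string,
// decoding direct encodings as either a legacy `ArrayEncoding` or a 2.1 `PageLayout`
// depending on the protobuf type URL.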
1372pub fn describe_encoding(page: &pbfile::column_metadata::Page) -> String {
1374 if let Some(encoding) = &page.encoding {
1375 if let Some(style) = &encoding.location {
1376 match style {
1377 pbfile::encoding::Location::Indirect(indirect) => {
1378 format!(
1379 "IndirectEncoding(pos={},size={})",
1380 indirect.buffer_location, indirect.buffer_length
1381 )
1382 }
1383 pbfile::encoding::Location::Direct(direct) => {
1384 let encoding_any =
1385 prost_types::Any::decode(Bytes::from(direct.encoding.clone()))
1386 .expect("failed to deserialize encoding as protobuf");
1387 if encoding_any.type_url == "/lance.encodings.ArrayEncoding" {
1388 let encoding = encoding_any.to_msg::<pbenc::ArrayEncoding>();
1389 match encoding {
1390 Ok(encoding) => {
1391 format!("{:#?}", encoding)
1392 }
1393 Err(err) => {
1394 format!("Unsupported(decode_err={})", err)
1395 }
1396 }
1397 } else if encoding_any.type_url == "/lance.encodings21.PageLayout" {
1398 let encoding = encoding_any.to_msg::<pbenc21::PageLayout>();
1399 match encoding {
1400 Ok(encoding) => {
1401 format!("{:#?}", encoding)
1402 }
1403 Err(err) => {
1404 format!("Unsupported(decode_err={})", err)
1405 }
1406 }
1407 } else {
1408 format!("Unrecognized(type_url={})", encoding_any.type_url)
1409 }
1410 }
1411 pbfile::encoding::Location::None(_) => "NoEncodingDescription".to_string(),
1412 }
1413 } else {
1414 "MISSING STYLE".to_string()
1415 }
1416 } else {
1417 "MISSING".to_string()
1418 }
1419}
1420
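/// Reconstructs an [`EncodedBatch`] from bytes produced by the writer-side
/// `EncodedBatchWriteExt` helpers.
///
/// A round-trip sketch based on `test_encoded_batch_round_trip` below (error handling
/// elided):
///
/// ```ignore
/// let bytes = encoded_batch.try_to_self_described_lance(version)?;
/// let decoded = EncodedBatch::try_from_self_described_lance(bytes)?;
/// ```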
1421pub trait EncodedBatchReaderExt {
1422 fn try_from_mini_lance(
1423 bytes: Bytes,
1424 schema: &Schema,
1425 version: LanceFileVersion,
1426 ) -> Result<Self>
1427 where
1428 Self: Sized;
1429 fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1430 where
1431 Self: Sized;
1432}
1433
1434impl EncodedBatchReaderExt for EncodedBatch {
1435 fn try_from_mini_lance(
1436 bytes: Bytes,
1437 schema: &Schema,
1438 file_version: LanceFileVersion,
1439 ) -> Result<Self>
1440 where
1441 Self: Sized,
1442 {
1443 let projection = ReaderProjection::from_whole_schema(schema, file_version);
1444 let footer = FileReader::decode_footer(&bytes)?;
1445
1446 let column_metadata_start = footer.column_meta_start as usize;
1449 let column_metadata_end = footer.global_buff_offsets_start as usize;
1450 let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1451 let column_metadatas =
1452 FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1453
1454 let file_version = LanceFileVersion::try_from_major_minor(
1455 footer.major_version as u32,
1456 footer.minor_version as u32,
1457 )?;
1458
1459 let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1460
1461 Ok(Self {
1462 data: bytes,
1463 num_rows: page_table
1464 .first()
1465 .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1466 .unwrap_or(0),
1467 page_table,
1468 top_level_columns: projection.column_indices,
1469 schema: Arc::new(schema.clone()),
1470 })
1471 }
1472
1473 fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1474 where
1475 Self: Sized,
1476 {
1477 let footer = FileReader::decode_footer(&bytes)?;
1478 let file_version = LanceFileVersion::try_from_major_minor(
1479 footer.major_version as u32,
1480 footer.minor_version as u32,
1481 )?;
1482
1483 let gbo_table = FileReader::do_decode_gbo_table(
1484 &bytes.slice(footer.global_buff_offsets_start as usize..),
1485 &footer,
1486 file_version,
1487 )?;
1488 if gbo_table.is_empty() {
1489 return Err(Error::Internal {
1490 message: "File did not contain any global buffers, schema expected".to_string(),
1491 location: location!(),
1492 });
1493 }
1494 let schema_start = gbo_table[0].position as usize;
1495 let schema_size = gbo_table[0].size as usize;
1496
1497 let schema_bytes = bytes.slice(schema_start..(schema_start + schema_size));
1498 let (_, schema) = FileReader::decode_schema(schema_bytes)?;
1499 let projection = ReaderProjection::from_whole_schema(&schema, file_version);
1500
1501 let column_metadata_start = footer.column_meta_start as usize;
1504 let column_metadata_end = footer.global_buff_offsets_start as usize;
1505 let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1506 let column_metadatas =
1507 FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1508
1509 let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1510
1511 Ok(Self {
1512 data: bytes,
1513 num_rows: page_table
1514 .first()
1515 .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1516 .unwrap_or(0),
1517 page_table,
1518 top_level_columns: projection.column_indices,
1519 schema: Arc::new(schema),
1520 })
1521 }
1522}
1523
1524#[cfg(test)]
1525pub mod tests {
1526 use std::{collections::BTreeMap, pin::Pin, sync::Arc};
1527
1528 use arrow_array::{
1529 types::{Float64Type, Int32Type},
1530 RecordBatch, UInt32Array,
1531 };
1532 use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema};
1533 use bytes::Bytes;
1534 use futures::{prelude::stream::TryStreamExt, StreamExt};
1535 use lance_arrow::RecordBatchExt;
1536 use lance_core::{datatypes::Schema, ArrowResult};
1537 use lance_datagen::{array, gen_batch, BatchCount, ByteCount, RowCount};
1538 use lance_encoding::{
1539 decoder::{decode_batch, DecodeBatchScheduler, DecoderPlugins, FilterExpression},
1540 encoder::{default_encoding_strategy, encode_batch, EncodedBatch, EncodingOptions},
1541 version::LanceFileVersion,
1542 };
1543 use lance_io::{stream::RecordBatchStream, utils::CachedFileSize};
1544 use log::debug;
1545 use rstest::rstest;
1546 use tokio::sync::mpsc;
1547
1548 use crate::v2::{
1549 reader::{EncodedBatchReaderExt, FileReader, FileReaderOptions, ReaderProjection},
1550 testing::{test_cache, write_lance_file, FsFixture, WrittenFile},
1551 writer::{EncodedBatchWriteExt, FileWriter, FileWriterOptions},
1552 };
1553 use lance_encoding::decoder::DecoderConfig;
1554
1555 async fn create_some_file(fs: &FsFixture, version: LanceFileVersion) -> WrittenFile {
1556 let location_type = DataType::Struct(Fields::from(vec![
1557 Field::new("x", DataType::Float64, true),
1558 Field::new("y", DataType::Float64, true),
1559 ]));
1560 let categories_type = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)));
1561
1562 let mut reader = gen_batch()
1563 .col("score", array::rand::<Float64Type>())
1564 .col("location", array::rand_type(&location_type))
1565 .col("categories", array::rand_type(&categories_type))
1566 .col("binary", array::rand_type(&DataType::Binary));
1567 if version <= LanceFileVersion::V2_0 {
1568 reader = reader.col("large_bin", array::rand_type(&DataType::LargeBinary));
1569 }
1570 let reader = reader.into_reader_rows(RowCount::from(1000), BatchCount::from(100));
1571
1572 write_lance_file(
1573 reader,
1574 fs,
1575 FileWriterOptions {
1576 format_version: Some(version),
1577 ..Default::default()
1578 },
1579 )
1580 .await
1581 }
1582
1583 type Transformer = Box<dyn Fn(&RecordBatch) -> RecordBatch>;
1584
1585 async fn verify_expected(
1586 expected: &[RecordBatch],
1587 mut actual: Pin<Box<dyn RecordBatchStream>>,
1588 read_size: u32,
1589 transform: Option<Transformer>,
1590 ) {
1591 let mut remaining = expected.iter().map(|batch| batch.num_rows()).sum::<usize>() as u32;
1592 let mut expected_iter = expected.iter().map(|batch| {
1593 if let Some(transform) = &transform {
1594 transform(batch)
1595 } else {
1596 batch.clone()
1597 }
1598 });
1599 let mut next_expected = expected_iter.next().unwrap().clone();
1600 while let Some(actual) = actual.next().await {
1601 let mut actual = actual.unwrap();
1602 let mut rows_to_verify = actual.num_rows() as u32;
1603 let expected_length = remaining.min(read_size);
1604 assert_eq!(expected_length, rows_to_verify);
1605
1606 while rows_to_verify > 0 {
1607 let next_slice_len = (next_expected.num_rows() as u32).min(rows_to_verify);
1608 assert_eq!(
1609 next_expected.slice(0, next_slice_len as usize),
1610 actual.slice(0, next_slice_len as usize)
1611 );
1612 remaining -= next_slice_len;
1613 rows_to_verify -= next_slice_len;
1614 if remaining > 0 {
1615 if next_slice_len == next_expected.num_rows() as u32 {
1616 next_expected = expected_iter.next().unwrap().clone();
1617 } else {
1618 next_expected = next_expected.slice(
1619 next_slice_len as usize,
1620 next_expected.num_rows() - next_slice_len as usize,
1621 );
1622 }
1623 }
1624 if rows_to_verify > 0 {
1625 actual = actual.slice(
1626 next_slice_len as usize,
1627 actual.num_rows() - next_slice_len as usize,
1628 );
1629 }
1630 }
1631 }
1632 assert_eq!(remaining, 0);
1633 }
1634
1635 #[tokio::test]
1636 async fn test_round_trip() {
1637 let fs = FsFixture::default();
1638
1639 let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1640
1641 for read_size in [32, 1024, 1024 * 1024] {
1642 let file_scheduler = fs
1643 .scheduler
1644 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1645 .await
1646 .unwrap();
1647 let file_reader = FileReader::try_open(
1648 file_scheduler,
1649 None,
1650 Arc::<DecoderPlugins>::default(),
1651 &test_cache(),
1652 FileReaderOptions::default(),
1653 )
1654 .await
1655 .unwrap();
1656
1657 let schema = file_reader.schema();
1658 assert_eq!(schema.metadata.get("foo").unwrap(), "bar");
1659
1660 let batch_stream = file_reader
1661 .read_stream(
1662 lance_io::ReadBatchParams::RangeFull,
1663 read_size,
1664 16,
1665 FilterExpression::no_filter(),
1666 )
1667 .unwrap();
1668
1669 verify_expected(&data, batch_stream, read_size, None).await;
1670 }
1671 }
1672
1673 #[rstest]
1674 #[test_log::test(tokio::test)]
1675 async fn test_encoded_batch_round_trip(
1676 #[values(LanceFileVersion::V2_0)] version: LanceFileVersion,
1678 ) {
1679 let data = gen_batch()
1680 .col("x", array::rand::<Int32Type>())
1681 .col("y", array::rand_utf8(ByteCount::from(16), false))
1682 .into_batch_rows(RowCount::from(10000))
1683 .unwrap();
1684
1685 let lance_schema = Arc::new(Schema::try_from(data.schema().as_ref()).unwrap());
1686
1687 let encoding_options = EncodingOptions {
1688 cache_bytes_per_column: 4096,
1689 max_page_bytes: 32 * 1024 * 1024,
1690 keep_original_array: true,
1691 buffer_alignment: 64,
1692 };
1693
1694 let encoding_strategy = default_encoding_strategy(version);
1695
1696 let encoded_batch = encode_batch(
1697 &data,
1698 lance_schema.clone(),
1699 encoding_strategy.as_ref(),
1700 &encoding_options,
1701 )
1702 .await
1703 .unwrap();
1704
1705 let bytes = encoded_batch.try_to_self_described_lance(version).unwrap();
1707
1708 let decoded_batch = EncodedBatch::try_from_self_described_lance(bytes).unwrap();
1709
1710 let decoded = decode_batch(
1711 &decoded_batch,
1712 &FilterExpression::no_filter(),
1713 Arc::<DecoderPlugins>::default(),
1714 false,
1715 LanceFileVersion::default(),
1716 None,
1717 )
1718 .await
1719 .unwrap();
1720
1721 assert_eq!(data, decoded);
1722
1723 let bytes = encoded_batch.try_to_mini_lance(version).unwrap();
1725 let decoded_batch =
1726 EncodedBatch::try_from_mini_lance(bytes, lance_schema.as_ref(), LanceFileVersion::V2_0)
1727 .unwrap();
1728 let decoded = decode_batch(
1729 &decoded_batch,
1730 &FilterExpression::no_filter(),
1731 Arc::<DecoderPlugins>::default(),
1732 false,
1733 LanceFileVersion::default(),
1734 None,
1735 )
1736 .await
1737 .unwrap();
1738
1739 assert_eq!(data, decoded);
1740 }
1741
1742 #[rstest]
1743 #[test_log::test(tokio::test)]
1744 async fn test_projection(
1745 #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
1746 ) {
1747 let fs = FsFixture::default();
1748
1749 let written_file = create_some_file(&fs, version).await;
1750 let file_scheduler = fs
1751 .scheduler
1752 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1753 .await
1754 .unwrap();
1755
1756 let field_id_mapping = written_file
1757 .field_id_mapping
1758 .iter()
1759 .copied()
1760 .collect::<BTreeMap<_, _>>();
1761
1762 let empty_projection = ReaderProjection {
1763 column_indices: Vec::default(),
1764 schema: Arc::new(Schema::default()),
1765 };
1766
1767 for columns in [
1768 vec!["score"],
1769 vec!["location"],
1770 vec!["categories"],
            vec!["location.x"],
1772 vec!["score", "categories"],
1773 vec!["score", "location"],
1774 vec!["location", "categories"],
            vec!["score", "location", "categories"],
1776 ] {
1777 debug!("Testing round trip with projection {:?}", columns);
1778 for use_field_ids in [true, false] {
1779 let file_reader = FileReader::try_open(
1781 file_scheduler.clone(),
1782 None,
1783 Arc::<DecoderPlugins>::default(),
1784 &test_cache(),
1785 FileReaderOptions::default(),
1786 )
1787 .await
1788 .unwrap();
1789
1790 let projected_schema = written_file.schema.project(&columns).unwrap();
1791 let projection = if use_field_ids {
1792 ReaderProjection::from_field_ids(
1793 file_reader.metadata.version(),
1794 &projected_schema,
1795 &field_id_mapping,
1796 )
1797 .unwrap()
1798 } else {
1799 ReaderProjection::from_column_names(
1800 file_reader.metadata.version(),
1801 &written_file.schema,
1802 &columns,
1803 )
1804 .unwrap()
1805 };
1806
1807 let batch_stream = file_reader
1808 .read_stream_projected(
1809 lance_io::ReadBatchParams::RangeFull,
1810 1024,
1811 16,
1812 projection.clone(),
1813 FilterExpression::no_filter(),
1814 )
1815 .unwrap();
1816
1817 let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1818 verify_expected(
1819 &written_file.data,
1820 batch_stream,
1821 1024,
1822 Some(Box::new(move |batch: &RecordBatch| {
1823 batch.project_by_schema(&projection_arrow).unwrap()
1824 })),
1825 )
1826 .await;
1827
1828 let file_reader = FileReader::try_open(
1830 file_scheduler.clone(),
1831 Some(projection.clone()),
1832 Arc::<DecoderPlugins>::default(),
1833 &test_cache(),
1834 FileReaderOptions::default(),
1835 )
1836 .await
1837 .unwrap();
1838
1839 let batch_stream = file_reader
1840 .read_stream(
1841 lance_io::ReadBatchParams::RangeFull,
1842 1024,
1843 16,
1844 FilterExpression::no_filter(),
1845 )
1846 .unwrap();
1847
1848 let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1849 verify_expected(
1850 &written_file.data,
1851 batch_stream,
1852 1024,
1853 Some(Box::new(move |batch: &RecordBatch| {
1854 batch.project_by_schema(&projection_arrow).unwrap()
1855 })),
1856 )
1857 .await;
1858
1859 assert!(file_reader
1860 .read_stream_projected(
1861 lance_io::ReadBatchParams::RangeFull,
1862 1024,
1863 16,
1864 empty_projection.clone(),
1865 FilterExpression::no_filter(),
1866 )
1867 .is_err());
1868 }
1869 }
1870
1871 assert!(FileReader::try_open(
1872 file_scheduler.clone(),
1873 Some(empty_projection),
1874 Arc::<DecoderPlugins>::default(),
1875 &test_cache(),
1876 FileReaderOptions::default(),
1877 )
1878 .await
1879 .is_err());
1880
1881 let arrow_schema = ArrowSchema::new(vec![
1882 Field::new("x", DataType::Int32, true),
1883 Field::new("y", DataType::Int32, true),
1884 ]);
1885 let schema = Schema::try_from(&arrow_schema).unwrap();
1886
1887 let projection_with_dupes = ReaderProjection {
1888 column_indices: vec![0, 0],
1889 schema: Arc::new(schema),
1890 };
1891
1892 assert!(FileReader::try_open(
1893 file_scheduler.clone(),
1894 Some(projection_with_dupes),
1895 Arc::<DecoderPlugins>::default(),
1896 &test_cache(),
1897 FileReaderOptions::default(),
1898 )
1899 .await
1900 .is_err());
1901 }
1902
1903 #[test_log::test(tokio::test)]
1904 async fn test_compressing_buffer() {
1905 let fs = FsFixture::default();
1906
1907 let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
1908 let file_scheduler = fs
1909 .scheduler
1910 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1911 .await
1912 .unwrap();
1913
1914 let file_reader = FileReader::try_open(
1916 file_scheduler.clone(),
1917 None,
1918 Arc::<DecoderPlugins>::default(),
1919 &test_cache(),
1920 FileReaderOptions::default(),
1921 )
1922 .await
1923 .unwrap();
1924
1925 let mut projection = written_file.schema.project(&["score"]).unwrap();
1926 for field in projection.fields.iter_mut() {
1927 field
1928 .metadata
1929 .insert("lance:compression".to_string(), "zstd".to_string());
1930 }
1931 let projection = ReaderProjection {
1932 column_indices: projection.fields.iter().map(|f| f.id as u32).collect(),
1933 schema: Arc::new(projection),
1934 };
1935
1936 let batch_stream = file_reader
1937 .read_stream_projected(
1938 lance_io::ReadBatchParams::RangeFull,
1939 1024,
1940 16,
1941 projection.clone(),
1942 FilterExpression::no_filter(),
1943 )
1944 .unwrap();
1945
1946 let projection_arrow = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
1947 verify_expected(
1948 &written_file.data,
1949 batch_stream,
1950 1024,
1951 Some(Box::new(move |batch: &RecordBatch| {
1952 batch.project_by_schema(&projection_arrow).unwrap()
1953 })),
1954 )
1955 .await;
1956 }
1957
1958 #[tokio::test]
1959 async fn test_read_all() {
1960 let fs = FsFixture::default();
1961 let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1962 let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
1963
1964 let file_scheduler = fs
1965 .scheduler
1966 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
1967 .await
1968 .unwrap();
1969 let file_reader = FileReader::try_open(
1970 file_scheduler.clone(),
1971 None,
1972 Arc::<DecoderPlugins>::default(),
1973 &test_cache(),
1974 FileReaderOptions::default(),
1975 )
1976 .await
1977 .unwrap();
1978
1979 let batches = file_reader
1980 .read_stream(
1981 lance_io::ReadBatchParams::RangeFull,
1982 total_rows as u32,
1983 16,
1984 FilterExpression::no_filter(),
1985 )
1986 .unwrap()
1987 .try_collect::<Vec<_>>()
1988 .await
1989 .unwrap();
1990 assert_eq!(batches.len(), 1);
1991 assert_eq!(batches[0].num_rows(), total_rows);
1992 }
1993
1994 #[tokio::test]
1995 async fn test_blocking_take() {
1996 let fs = FsFixture::default();
1997 let WrittenFile { data, schema, .. } = create_some_file(&fs, LanceFileVersion::V2_1).await;
1998 let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
1999
2000 let file_scheduler = fs
2001 .scheduler
2002 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2003 .await
2004 .unwrap();
2005 let file_reader = FileReader::try_open(
2006 file_scheduler.clone(),
2007 Some(
2008 ReaderProjection::from_column_names(LanceFileVersion::V2_1, &schema, &["score"])
2009 .unwrap(),
2010 ),
2011 Arc::<DecoderPlugins>::default(),
2012 &test_cache(),
2013 FileReaderOptions::default(),
2014 )
2015 .await
2016 .unwrap();
2017
2018 let batches = tokio::task::spawn_blocking(move || {
2019 file_reader
2020 .read_stream_projected_blocking(
2021 lance_io::ReadBatchParams::Indices(UInt32Array::from(vec![0, 1, 2, 3, 4])),
2022 total_rows as u32,
2023 None,
2024 FilterExpression::no_filter(),
2025 )
2026 .unwrap()
2027 .collect::<ArrowResult<Vec<_>>>()
2028 .unwrap()
2029 })
2030 .await
2031 .unwrap();
2032
2033 assert_eq!(batches.len(), 1);
2034 assert_eq!(batches[0].num_rows(), 5);
2035 assert_eq!(batches[0].num_columns(), 1);
2036 }
2037
2038 #[tokio::test(flavor = "multi_thread")]
2039 async fn test_drop_in_progress() {
2040 let fs = FsFixture::default();
2041 let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
2042 let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
2043
2044 let file_scheduler = fs
2045 .scheduler
2046 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2047 .await
2048 .unwrap();
2049 let file_reader = FileReader::try_open(
2050 file_scheduler.clone(),
2051 None,
2052 Arc::<DecoderPlugins>::default(),
2053 &test_cache(),
2054 FileReaderOptions::default(),
2055 )
2056 .await
2057 .unwrap();
2058
2059 let mut batches = file_reader
2060 .read_stream(
2061 lance_io::ReadBatchParams::RangeFull,
2062 (total_rows / 10) as u32,
2063 16,
2064 FilterExpression::no_filter(),
2065 )
2066 .unwrap();
2067
2068 drop(file_reader);
2069
2070 let batch = batches.next().await.unwrap().unwrap();
2071 assert!(batch.num_rows() > 0);
2072
2073 drop(batches);
2075 }
2076
2077 #[tokio::test]
2078 async fn drop_while_scheduling() {
2079 let fs = FsFixture::default();
2089 let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
2090 let total_rows = written_file
2091 .data
2092 .iter()
2093 .map(|batch| batch.num_rows())
2094 .sum::<usize>();
2095
2096 let file_scheduler = fs
2097 .scheduler
2098 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2099 .await
2100 .unwrap();
2101 let file_reader = FileReader::try_open(
2102 file_scheduler.clone(),
2103 None,
2104 Arc::<DecoderPlugins>::default(),
2105 &test_cache(),
2106 FileReaderOptions::default(),
2107 )
2108 .await
2109 .unwrap();
2110
2111 let projection =
2112 ReaderProjection::from_whole_schema(&written_file.schema, LanceFileVersion::V2_0);
2113 let column_infos = file_reader
2114 .collect_columns_from_projection(&projection)
2115 .unwrap();
2116 let mut decode_scheduler = DecodeBatchScheduler::try_new(
2117 &projection.schema,
2118 &projection.column_indices,
2119 &column_infos,
2120 &vec![],
2121 total_rows as u64,
2122 Arc::<DecoderPlugins>::default(),
2123 file_reader.scheduler.clone(),
2124 test_cache(),
2125 &FilterExpression::no_filter(),
2126 &DecoderConfig::default(),
2127 )
2128 .await
2129 .unwrap();
2130
2131 let range = 0..total_rows as u64;
2132
2133 let (tx, rx) = mpsc::unbounded_channel();
2134
2135 drop(rx);
2137
2138 decode_scheduler.schedule_range(
2140 range,
2141 &FilterExpression::no_filter(),
2142 tx,
2143 file_reader.scheduler.clone(),
2144 )
2145 }
2146
2147 #[tokio::test]
2148 async fn test_read_empty_range() {
2149 let fs = FsFixture::default();
2150 create_some_file(&fs, LanceFileVersion::V2_0).await;
2151
2152 let file_scheduler = fs
2153 .scheduler
2154 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2155 .await
2156 .unwrap();
2157 let file_reader = FileReader::try_open(
2158 file_scheduler.clone(),
2159 None,
2160 Arc::<DecoderPlugins>::default(),
2161 &test_cache(),
2162 FileReaderOptions::default(),
2163 )
2164 .await
2165 .unwrap();
2166
2167 let batches = file_reader
2169 .read_stream(
2170 lance_io::ReadBatchParams::Range(0..0),
2171 1024,
2172 16,
2173 FilterExpression::no_filter(),
2174 )
2175 .unwrap()
2176 .try_collect::<Vec<_>>()
2177 .await
2178 .unwrap();
2179
2180 assert_eq!(batches.len(), 0);
2181
2182 let batches = file_reader
2184 .read_stream(
2185 lance_io::ReadBatchParams::Ranges(Arc::new([0..1, 2..2])),
2186 1024,
2187 16,
2188 FilterExpression::no_filter(),
2189 )
2190 .unwrap()
2191 .try_collect::<Vec<_>>()
2192 .await
2193 .unwrap();
2194 assert_eq!(batches.len(), 1);
2195 }
2196
2197 #[tokio::test]
2198 async fn test_global_buffers() {
2199 let fs = FsFixture::default();
2200
2201 let lance_schema =
2202 lance_core::datatypes::Schema::try_from(&ArrowSchema::new(vec![Field::new(
2203 "foo",
2204 DataType::Int32,
2205 true,
2206 )]))
2207 .unwrap();
2208
2209 let mut file_writer = FileWriter::try_new(
2210 fs.object_store.create(&fs.tmp_path).await.unwrap(),
2211 lance_schema.clone(),
2212 FileWriterOptions::default(),
2213 )
2214 .unwrap();
2215
2216 let test_bytes = Bytes::from_static(b"hello");
2217
2218 let buf_index = file_writer
2219 .add_global_buffer(test_bytes.clone())
2220 .await
2221 .unwrap();
2222
2223 assert_eq!(buf_index, 1);
2224
2225 file_writer.finish().await.unwrap();
2226
2227 let file_scheduler = fs
2228 .scheduler
2229 .open_file(&fs.tmp_path, &CachedFileSize::unknown())
2230 .await
2231 .unwrap();
2232 let file_reader = FileReader::try_open(
2233 file_scheduler.clone(),
2234 None,
2235 Arc::<DecoderPlugins>::default(),
2236 &test_cache(),
2237 FileReaderOptions::default(),
2238 )
2239 .await
2240 .unwrap();
2241
2242 let buf = file_reader.read_global_buffer(1).await.unwrap();
2243 assert_eq!(buf, test_bytes);
2244 }
2245}