use std::{
    collections::{BTreeMap, BTreeSet},
    io::Cursor,
    ops::Range,
    pin::Pin,
    sync::Arc,
};

use arrow_array::RecordBatchReader;
use arrow_schema::Schema as ArrowSchema;
use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
use bytes::{Bytes, BytesMut};
use deepsize::{Context, DeepSizeOf};
use futures::{stream::BoxStream, Stream, StreamExt};
use lance_encoding::{
    decoder::{
        schedule_and_decode, schedule_and_decode_blocking, ColumnInfo, DecoderPlugins,
        FilterExpression, PageEncoding, PageInfo, ReadBatchTask, RequestedRows,
        SchedulerDecoderConfig,
    },
    encoder::EncodedBatch,
    version::LanceFileVersion,
    EncodingsIo,
};
use log::debug;
use object_store::path::Path;
use prost::{Message, Name};
use snafu::location;

use lance_core::{
    cache::FileMetadataCache,
    datatypes::{Field, Schema},
    Error, Result,
};
use lance_encoding::format::pb as pbenc;
use lance_io::{
    scheduler::FileScheduler,
    stream::{RecordBatchStream, RecordBatchStreamAdapter},
    ReadBatchParams,
};

use crate::{
    datatypes::{Fields, FieldsWithMeta},
    format::{pb, pbfile, MAGIC, MAJOR_VERSION, MINOR_VERSION},
    v2::writer::PAGE_BUFFER_ALIGNMENT,
};

use super::io::LanceEncodingsIo;

#[derive(Debug, DeepSizeOf)]
pub struct BufferDescriptor {
    pub position: u64,
    pub size: u64,
}

#[derive(Debug)]
pub struct FileStatistics {
    pub columns: Vec<ColumnStatistics>,
}

#[derive(Debug)]
pub struct ColumnStatistics {
    pub num_pages: usize,
    pub size_bytes: u64,
}

#[derive(Debug)]
pub struct CachedFileMetadata {
    pub file_schema: Arc<Schema>,
    pub column_metadatas: Vec<pbfile::ColumnMetadata>,
    pub column_infos: Vec<Arc<ColumnInfo>>,
    pub num_rows: u64,
    pub file_buffers: Vec<BufferDescriptor>,
    pub num_data_bytes: u64,
    pub num_column_metadata_bytes: u64,
    pub num_global_buffer_bytes: u64,
    pub num_footer_bytes: u64,
    pub major_version: u16,
    pub minor_version: u16,
}

impl DeepSizeOf for CachedFileMetadata {
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.file_schema.deep_size_of_children(context)
            + self
                .file_buffers
                .iter()
                .map(|file_buffer| file_buffer.deep_size_of_children(context))
                .sum::<usize>()
    }
}

impl CachedFileMetadata {
    pub fn version(&self) -> LanceFileVersion {
        match (self.major_version, self.minor_version) {
            (0, 3) => LanceFileVersion::V2_0,
            (2, 1) => LanceFileVersion::V2_1,
            _ => panic!(
                "Unsupported version: {}.{}",
                self.major_version, self.minor_version
            ),
        }
    }
}

#[derive(Debug, Clone)]
pub struct ReaderProjection {
    pub schema: Arc<Schema>,
    pub column_indices: Vec<u32>,
}

impl ReaderProjection {
    fn from_field_ids_helper<'a>(
        reader: &FileReader,
        fields: impl Iterator<Item = &'a Field>,
        field_id_to_column_index: &BTreeMap<u32, u32>,
        column_indices: &mut Vec<u32>,
    ) -> Result<()> {
        for field in fields {
            let is_structural = reader.metadata.version() >= LanceFileVersion::V2_1;
            if !is_structural || field.children.is_empty() {
                if let Some(column_idx) = field_id_to_column_index.get(&(field.id as u32)).copied()
                {
                    column_indices.push(column_idx);
                }
            }
            Self::from_field_ids_helper(
                reader,
                field.children.iter(),
                field_id_to_column_index,
                column_indices,
            )?;
        }
        Ok(())
    }

    pub fn from_field_ids(
        reader: &FileReader,
        schema: &Schema,
        field_id_to_column_index: &BTreeMap<u32, u32>,
    ) -> Result<Self> {
        let mut column_indices = Vec::new();
        Self::from_field_ids_helper(
            reader,
            schema.fields.iter(),
            field_id_to_column_index,
            &mut column_indices,
        )?;
        Ok(Self {
            schema: Arc::new(schema.clone()),
            column_indices,
        })
    }

    pub fn from_whole_schema(schema: &Schema, version: LanceFileVersion) -> Self {
        let schema = Arc::new(schema.clone());
        let is_structural = version >= LanceFileVersion::V2_1;
        let mut column_indices = vec![];
        let mut curr_column_idx = 0;
        let mut packed_struct_fields_num = 0;
        for field in schema.fields_pre_order() {
            if packed_struct_fields_num > 0 {
                packed_struct_fields_num -= 1;
                continue;
            }
            if field.is_packed_struct() {
                column_indices.push(curr_column_idx);
                curr_column_idx += 1;
                packed_struct_fields_num = field.children.len();
            } else if field.children.is_empty() || !is_structural {
                column_indices.push(curr_column_idx);
                curr_column_idx += 1;
            }
        }
        Self {
            schema,
            column_indices,
        }
    }

    pub fn from_column_names(schema: &Schema, column_names: &[&str]) -> Result<Self> {
        let field_id_to_column_index = schema
            .fields_pre_order()
            .enumerate()
            .map(|(idx, field)| (field.id as u32, idx as u32))
            .collect::<BTreeMap<_, _>>();
        let projected = schema.project(column_names)?;
        let column_indices = projected
            .fields_pre_order()
            .map(|f| field_id_to_column_index[&(f.id as u32)])
            .collect::<Vec<_>>();
        Ok(Self {
            schema: Arc::new(projected),
            column_indices,
        })
    }
}
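
// A minimal usage sketch (not part of the original source) of how a projection is typically
// built and consumed. `reader` is assumed to be an already-opened `FileReader` and the column
// name is hypothetical; see `read_stream_projected` below for the full signature.
//
//     let projection = ReaderProjection::from_column_names(reader.schema(), &["score"])?;
//     let stream = reader.read_stream_projected(
//         ReadBatchParams::RangeFull,
//         1024,
//         16,
//         projection,
//         FilterExpression::no_filter(),
//     )?;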

#[derive(Clone, Debug, Default)]
pub struct FileReaderOptions {
    validate_on_decode: bool,
}

#[derive(Debug)]
pub struct FileReader {
    scheduler: Arc<dyn EncodingsIo>,
    base_projection: ReaderProjection,
    num_rows: u64,
    metadata: Arc<CachedFileMetadata>,
    decoder_plugins: Arc<DecoderPlugins>,
    cache: Arc<FileMetadataCache>,
    options: FileReaderOptions,
}

#[derive(Debug)]
struct Footer {
    #[allow(dead_code)]
    column_meta_start: u64,
    #[allow(dead_code)]
    column_meta_offsets_start: u64,
    global_buff_offsets_start: u64,
    num_global_buffers: u32,
    num_columns: u32,
    major_version: u16,
    minor_version: u16,
}

const FOOTER_LEN: usize = 40;
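
// A sketch of the 40-byte footer layout implied by `decode_footer` below (all fields
// little-endian). This comment is descriptive only; the Lance format specification is
// authoritative:
//
//     u64     column_meta_start
//     u64     column_meta_offsets_start
//     u64     global_buff_offsets_start
//     u32     num_global_buffers
//     u32     num_columns
//     u16     major_version
//     u16     minor_version
//     [u8; 4] magic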

impl FileReader {
    pub fn with_scheduler(&self, scheduler: Arc<dyn EncodingsIo>) -> Self {
        Self {
            scheduler,
            base_projection: self.base_projection.clone(),
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            metadata: self.metadata.clone(),
            options: self.options.clone(),
            num_rows: self.num_rows,
        }
    }

    pub fn num_rows(&self) -> u64 {
        self.num_rows
    }

    pub fn metadata(&self) -> &Arc<CachedFileMetadata> {
        &self.metadata
    }

    pub fn file_statistics(&self) -> FileStatistics {
        let column_metadatas = &self.metadata().column_metadatas;

        let column_stats = column_metadatas
            .iter()
            .map(|col_metadata| {
                let num_pages = col_metadata.pages.len();
                let size_bytes = col_metadata
                    .pages
                    .iter()
                    .map(|page| page.buffer_sizes.iter().sum::<u64>())
                    .sum::<u64>();
                ColumnStatistics {
                    num_pages,
                    size_bytes,
                }
            })
            .collect();

        FileStatistics {
            columns: column_stats,
        }
    }

    pub async fn read_global_buffer(&self, index: u32) -> Result<Bytes> {
        let buffer_desc = self.metadata.file_buffers.get(index as usize).ok_or_else(||Error::invalid_input(format!("request for global buffer at index {} but there were only {} global buffers in the file", index, self.metadata.file_buffers.len()), location!()))?;
        self.scheduler
            .submit_single(
                buffer_desc.position..buffer_desc.position + buffer_desc.size,
                0,
            )
            .await
    }

    async fn read_tail(scheduler: &FileScheduler) -> Result<(Bytes, u64)> {
        let file_size = scheduler.reader().size().await? as u64;
        let begin = if file_size < scheduler.reader().block_size() as u64 {
            0
        } else {
            file_size - scheduler.reader().block_size() as u64
        };
        let tail_bytes = scheduler.submit_single(begin..file_size, 0).await?;
        Ok((tail_bytes, file_size))
    }

    fn decode_footer(footer_bytes: &Bytes) -> Result<Footer> {
        let len = footer_bytes.len();
        if len < FOOTER_LEN {
            return Err(Error::io(
                format!(
                    "does not have sufficient data, len: {}, bytes: {:?}",
                    len, footer_bytes
                ),
                location!(),
            ));
        }
        let mut cursor = Cursor::new(footer_bytes.slice(len - FOOTER_LEN..));

        let column_meta_start = cursor.read_u64::<LittleEndian>()?;
        let column_meta_offsets_start = cursor.read_u64::<LittleEndian>()?;
        let global_buff_offsets_start = cursor.read_u64::<LittleEndian>()?;
        let num_global_buffers = cursor.read_u32::<LittleEndian>()?;
        let num_columns = cursor.read_u32::<LittleEndian>()?;
        let major_version = cursor.read_u16::<LittleEndian>()?;
        let minor_version = cursor.read_u16::<LittleEndian>()?;

        if major_version == MAJOR_VERSION as u16 && minor_version == MINOR_VERSION as u16 {
            return Err(Error::version_conflict(
                "Attempt to use the lance v2 reader to read a legacy file".to_string(),
                major_version,
                minor_version,
                location!(),
            ));
        }

        let magic_bytes = footer_bytes.slice(len - 4..);
        if magic_bytes.as_ref() != MAGIC {
            return Err(Error::io(
                format!(
                    "file does not appear to be a Lance file (invalid magic: {:?})",
                    MAGIC
                ),
                location!(),
            ));
        }
        Ok(Footer {
            column_meta_start,
            column_meta_offsets_start,
            global_buff_offsets_start,
            num_global_buffers,
            num_columns,
            major_version,
            minor_version,
        })
    }

    fn read_all_column_metadata(
        column_metadata_bytes: Bytes,
        footer: &Footer,
    ) -> Result<Vec<pbfile::ColumnMetadata>> {
        let column_metadata_start = footer.column_meta_start;
        let cmo_table_size = 16 * footer.num_columns as usize;
        let cmo_table = column_metadata_bytes.slice(column_metadata_bytes.len() - cmo_table_size..);

        (0..footer.num_columns)
            .map(|col_idx| {
                let offset = (col_idx * 16) as usize;
                let position = LittleEndian::read_u64(&cmo_table[offset..offset + 8]);
                let length = LittleEndian::read_u64(&cmo_table[offset + 8..offset + 16]);
                let normalized_position = (position - column_metadata_start) as usize;
                let normalized_end = normalized_position + (length as usize);
                Ok(pbfile::ColumnMetadata::decode(
                    &column_metadata_bytes[normalized_position..normalized_end],
                )?)
            })
            .collect::<Result<Vec<_>>>()
    }

    async fn optimistic_tail_read(
        data: &Bytes,
        start_pos: u64,
        scheduler: &FileScheduler,
        file_len: u64,
    ) -> Result<Bytes> {
        let num_bytes_needed = (file_len - start_pos) as usize;
        if data.len() >= num_bytes_needed {
            Ok(data.slice((data.len() - num_bytes_needed)..))
        } else {
            let num_bytes_missing = (num_bytes_needed - data.len()) as u64;
            let start = file_len - num_bytes_needed as u64;
            let missing_bytes = scheduler
                .submit_single(start..start + num_bytes_missing, 0)
                .await?;
            let mut combined = BytesMut::with_capacity(data.len() + num_bytes_missing as usize);
            combined.extend(missing_bytes);
            combined.extend(data);
            Ok(combined.freeze())
        }
    }

    fn do_decode_gbo_table(
        gbo_bytes: &Bytes,
        footer: &Footer,
        version: LanceFileVersion,
    ) -> Result<Vec<BufferDescriptor>> {
        let mut global_bufs_cursor = Cursor::new(gbo_bytes);

        let mut global_buffers = Vec::with_capacity(footer.num_global_buffers as usize);
        for _ in 0..footer.num_global_buffers {
            let buf_pos = global_bufs_cursor.read_u64::<LittleEndian>()?;
            assert!(
                version < LanceFileVersion::V2_1 || buf_pos % PAGE_BUFFER_ALIGNMENT as u64 == 0
            );
            let buf_size = global_bufs_cursor.read_u64::<LittleEndian>()?;
            global_buffers.push(BufferDescriptor {
                position: buf_pos,
                size: buf_size,
            });
        }

        Ok(global_buffers)
    }

    async fn decode_gbo_table(
        tail_bytes: &Bytes,
        file_len: u64,
        scheduler: &FileScheduler,
        footer: &Footer,
        version: LanceFileVersion,
    ) -> Result<Vec<BufferDescriptor>> {
        let gbo_bytes = Self::optimistic_tail_read(
            tail_bytes,
            footer.global_buff_offsets_start,
            scheduler,
            file_len,
        )
        .await?;
        Self::do_decode_gbo_table(&gbo_bytes, footer, version)
    }

    fn decode_schema(schema_bytes: Bytes) -> Result<(u64, lance_core::datatypes::Schema)> {
        let file_descriptor = pb::FileDescriptor::decode(schema_bytes)?;
        let pb_schema = file_descriptor.schema.unwrap();
        let num_rows = file_descriptor.length;
        let fields_with_meta = FieldsWithMeta {
            fields: Fields(pb_schema.fields),
            metadata: pb_schema.metadata,
        };
        let schema = lance_core::datatypes::Schema::from(fields_with_meta);
        Ok((num_rows, schema))
    }

    pub async fn read_all_metadata(scheduler: &FileScheduler) -> Result<CachedFileMetadata> {
        let (tail_bytes, file_len) = Self::read_tail(scheduler).await?;
        let footer = Self::decode_footer(&tail_bytes)?;

        let file_version = LanceFileVersion::try_from_major_minor(
            footer.major_version as u32,
            footer.minor_version as u32,
        )?;

        let gbo_table =
            Self::decode_gbo_table(&tail_bytes, file_len, scheduler, &footer, file_version).await?;
        if gbo_table.is_empty() {
            return Err(Error::Internal {
                message: "File did not contain any global buffers, schema expected".to_string(),
                location: location!(),
            });
        }
        let schema_start = gbo_table[0].position;
        let schema_size = gbo_table[0].size;

        let num_footer_bytes = file_len - schema_start;

        let all_metadata_bytes =
            Self::optimistic_tail_read(&tail_bytes, schema_start, scheduler, file_len).await?;

        let schema_bytes = all_metadata_bytes.slice(0..schema_size as usize);
        let (num_rows, schema) = Self::decode_schema(schema_bytes)?;

        let column_metadata_start = (footer.column_meta_start - schema_start) as usize;
        let column_metadata_end = (footer.global_buff_offsets_start - schema_start) as usize;
        let column_metadata_bytes =
            all_metadata_bytes.slice(column_metadata_start..column_metadata_end);
        let column_metadatas = Self::read_all_column_metadata(column_metadata_bytes, &footer)?;

        let num_global_buffer_bytes = gbo_table.iter().map(|buf| buf.size).sum::<u64>();
        let num_data_bytes = footer.column_meta_start - num_global_buffer_bytes;
        let num_column_metadata_bytes = footer.global_buff_offsets_start - footer.column_meta_start;

        let column_infos = Self::meta_to_col_infos(column_metadatas.as_slice(), file_version);

        Ok(CachedFileMetadata {
            file_schema: Arc::new(schema),
            column_metadatas,
            column_infos,
            num_rows,
            num_data_bytes,
            num_column_metadata_bytes,
            num_global_buffer_bytes,
            num_footer_bytes,
            file_buffers: gbo_table,
            major_version: footer.major_version,
            minor_version: footer.minor_version,
        })
    }

    fn fetch_encoding<M: Default + Name + Sized>(encoding: &pbfile::Encoding) -> M {
        match &encoding.location {
            Some(pbfile::encoding::Location::Indirect(_)) => todo!(),
            Some(pbfile::encoding::Location::Direct(encoding)) => {
                let encoding_buf = Bytes::from(encoding.encoding.clone());
                let encoding_any = prost_types::Any::decode(encoding_buf).unwrap();
                encoding_any.to_msg::<M>().unwrap()
            }
            Some(pbfile::encoding::Location::None(_)) => panic!(),
            None => panic!(),
        }
    }

    fn meta_to_col_infos(
        column_metadatas: &[pbfile::ColumnMetadata],
        file_version: LanceFileVersion,
    ) -> Vec<Arc<ColumnInfo>> {
        column_metadatas
            .iter()
            .enumerate()
            .map(|(col_idx, col_meta)| {
                let page_infos = col_meta
                    .pages
                    .iter()
                    .map(|page| {
                        let num_rows = page.length;
                        let encoding = match file_version {
                            LanceFileVersion::V2_0 => {
                                PageEncoding::Legacy(Self::fetch_encoding::<pbenc::ArrayEncoding>(
                                    page.encoding.as_ref().unwrap(),
                                ))
                            }
                            _ => {
                                PageEncoding::Structural(Self::fetch_encoding::<pbenc::PageLayout>(
                                    page.encoding.as_ref().unwrap(),
                                ))
                            }
                        };
                        let buffer_offsets_and_sizes = Arc::from(
                            page.buffer_offsets
                                .iter()
                                .zip(page.buffer_sizes.iter())
                                .map(|(offset, size)| {
                                    assert!(
                                        file_version < LanceFileVersion::V2_1
                                            || offset % PAGE_BUFFER_ALIGNMENT as u64 == 0
                                    );
                                    (*offset, *size)
                                })
                                .collect::<Vec<_>>(),
                        );
                        PageInfo {
                            buffer_offsets_and_sizes,
                            encoding,
                            num_rows,
                            priority: page.priority,
                        }
                    })
                    .collect::<Vec<_>>();
                let buffer_offsets_and_sizes = Arc::from(
                    col_meta
                        .buffer_offsets
                        .iter()
                        .zip(col_meta.buffer_sizes.iter())
                        .map(|(offset, size)| (*offset, *size))
                        .collect::<Vec<_>>(),
                );
                Arc::new(ColumnInfo {
                    index: col_idx as u32,
                    page_infos: Arc::from(page_infos),
                    buffer_offsets_and_sizes,
                    encoding: Self::fetch_encoding(col_meta.encoding.as_ref().unwrap()),
                })
            })
            .collect::<Vec<_>>()
    }

    fn validate_projection(
        projection: &ReaderProjection,
        metadata: &CachedFileMetadata,
    ) -> Result<()> {
        if projection.schema.fields.is_empty() {
            return Err(Error::invalid_input(
                "Attempt to read zero columns from the file, at least one column must be specified"
                    .to_string(),
                location!(),
            ));
        }
        let mut column_indices_seen = BTreeSet::new();
        for column_index in &projection.column_indices {
            if !column_indices_seen.insert(*column_index) {
                return Err(Error::invalid_input(
                    format!(
                        "The projection specified the column index {} more than once",
                        column_index
                    ),
                    location!(),
                ));
            }
            if *column_index >= metadata.column_infos.len() as u32 {
                return Err(Error::invalid_input(format!("The projection specified the column index {} but there are only {} columns in the file", column_index, metadata.column_infos.len()), location!()));
            }
        }
        Ok(())
    }

    pub async fn try_open(
        scheduler: FileScheduler,
        base_projection: Option<ReaderProjection>,
        decoder_plugins: Arc<DecoderPlugins>,
        cache: &FileMetadataCache,
        options: FileReaderOptions,
    ) -> Result<Self> {
        let file_metadata = Arc::new(Self::read_all_metadata(&scheduler).await?);
        let path = scheduler.reader().path().clone();
        Self::try_open_with_file_metadata(
            Arc::new(LanceEncodingsIo(scheduler)),
            path,
            base_projection,
            decoder_plugins,
            file_metadata,
            cache,
            options,
        )
        .await
    }

    pub async fn try_open_with_file_metadata(
        scheduler: Arc<dyn EncodingsIo>,
        path: Path,
        base_projection: Option<ReaderProjection>,
        decoder_plugins: Arc<DecoderPlugins>,
        file_metadata: Arc<CachedFileMetadata>,
        cache: &FileMetadataCache,
        options: FileReaderOptions,
    ) -> Result<Self> {
        let cache = Arc::new(cache.with_base_path(path));

        if let Some(base_projection) = base_projection.as_ref() {
            Self::validate_projection(base_projection, &file_metadata)?;
        }
        let num_rows = file_metadata.num_rows;
        Ok(Self {
            scheduler,
            base_projection: base_projection.unwrap_or(ReaderProjection::from_whole_schema(
                file_metadata.file_schema.as_ref(),
                file_metadata.version(),
            )),
            num_rows,
            metadata: file_metadata,
            decoder_plugins,
            cache,
            options,
        })
    }

    fn collect_columns_from_projection(
        &self,
        _projection: &ReaderProjection,
    ) -> Result<Vec<Arc<ColumnInfo>>> {
        Ok(self.metadata.column_infos.to_vec())
    }

    #[allow(clippy::too_many_arguments)]
    fn do_read_range(
        column_infos: Vec<Arc<ColumnInfo>>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<FileMetadataCache>,
        num_rows: u64,
        decoder_plugins: Arc<DecoderPlugins>,
        range: Range<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
        should_validate: bool,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        debug!(
            "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
            range,
            batch_size,
            num_rows,
            column_infos.len(),
            projection.schema.fields.len(),
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache,
            decoder_plugins,
            io,
            should_validate,
        };

        let requested_rows = RequestedRows::Ranges(vec![range]);

        Ok(schedule_and_decode(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        ))
    }

    fn read_range(
        &self,
        range: Range<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        Self::do_read_range(
            self.collect_columns_from_projection(&projection)?,
            self.scheduler.clone(),
            self.cache.clone(),
            self.num_rows,
            self.decoder_plugins.clone(),
            range,
            batch_size,
            projection,
            filter,
            self.options.validate_on_decode,
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn do_take_rows(
        column_infos: Vec<Arc<ColumnInfo>>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<FileMetadataCache>,
        decoder_plugins: Arc<DecoderPlugins>,
        indices: Vec<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
        should_validate: bool,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        debug!(
            "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
            indices.len(),
            indices[0],
            indices[indices.len() - 1],
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache,
            decoder_plugins,
            io,
            should_validate,
        };

        let requested_rows = RequestedRows::Indices(indices);

        Ok(schedule_and_decode(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        ))
    }

    fn take_rows(
        &self,
        indices: Vec<u64>,
        batch_size: u32,
        projection: ReaderProjection,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        Self::do_take_rows(
            self.collect_columns_from_projection(&projection)?,
            self.scheduler.clone(),
            self.cache.clone(),
            self.decoder_plugins.clone(),
            indices,
            batch_size,
            projection,
            FilterExpression::no_filter(),
            self.options.validate_on_decode,
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn do_read_ranges(
        column_infos: Vec<Arc<ColumnInfo>>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<FileMetadataCache>,
        decoder_plugins: Arc<DecoderPlugins>,
        ranges: Vec<Range<u64>>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
        should_validate: bool,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        debug!(
            "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
            ranges.len(),
            num_rows,
            ranges[0].start,
            ranges[ranges.len() - 1].end,
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache,
            decoder_plugins,
            io,
            should_validate,
        };

        let requested_rows = RequestedRows::Ranges(ranges);

        Ok(schedule_and_decode(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        ))
    }

    fn read_ranges(
        &self,
        ranges: Vec<Range<u64>>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        Self::do_read_ranges(
            self.collect_columns_from_projection(&projection)?,
            self.scheduler.clone(),
            self.cache.clone(),
            self.decoder_plugins.clone(),
            ranges,
            batch_size,
            projection,
            filter,
            self.options.validate_on_decode,
        )
    }

    pub fn read_tasks(
        &self,
        params: ReadBatchParams,
        batch_size: u32,
        projection: Option<ReaderProjection>,
        filter: FilterExpression,
    ) -> Result<Pin<Box<dyn Stream<Item = ReadBatchTask> + Send>>> {
        let projection = projection.unwrap_or_else(|| self.base_projection.clone());
        Self::validate_projection(&projection, &self.metadata)?;
        let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
            if bound > self.num_rows || bound == self.num_rows && inclusive {
                Err(Error::invalid_input(
                    format!(
                        "cannot read {:?} from file with {} rows",
                        params, self.num_rows
                    ),
                    location!(),
                ))
            } else {
                Ok(())
            }
        };
        match &params {
            ReadBatchParams::Indices(indices) => {
                for idx in indices {
                    match idx {
                        None => {
                            return Err(Error::invalid_input(
                                "Null value in indices array",
                                location!(),
                            ));
                        }
                        Some(idx) => {
                            verify_bound(&params, idx as u64, true)?;
                        }
                    }
                }
                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
                self.take_rows(indices, batch_size, projection)
            }
            ReadBatchParams::Range(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range(
                    range.start as u64..range.end as u64,
                    batch_size,
                    projection,
                    filter,
                )
            }
            ReadBatchParams::Ranges(ranges) => {
                let mut ranges_u64 = Vec::with_capacity(ranges.len());
                for range in ranges.as_ref() {
                    verify_bound(&params, range.end, false)?;
                    ranges_u64.push(range.start..range.end);
                }
                self.read_ranges(ranges_u64, batch_size, projection, filter)
            }
            ReadBatchParams::RangeFrom(range) => {
                verify_bound(&params, range.start as u64, true)?;
                self.read_range(
                    range.start as u64..self.num_rows,
                    batch_size,
                    projection,
                    filter,
                )
            }
            ReadBatchParams::RangeTo(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range(0..range.end as u64, batch_size, projection, filter)
            }
            ReadBatchParams::RangeFull => {
                self.read_range(0..self.num_rows, batch_size, projection, filter)
            }
        }
    }

    pub fn read_stream_projected(
        &self,
        params: ReadBatchParams,
        batch_size: u32,
        batch_readahead: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
        let arrow_schema = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
        let tasks_stream = self.read_tasks(params, batch_size, Some(projection), filter)?;
        let batch_stream = tasks_stream
            .map(|task| task.task)
            .buffered(batch_readahead as usize)
            .boxed();
        Ok(Box::pin(RecordBatchStreamAdapter::new(
            arrow_schema,
            batch_stream,
        )))
    }
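
    // A minimal usage sketch (not from the original source): stream the first 1000 rows of a
    // projection in batches of 100. `reader` and `projection` are assumed to already exist, and
    // the numeric arguments are illustrative only.
    //
    //     let mut stream = reader.read_stream_projected(
    //         ReadBatchParams::Range(0..1000),
    //         /*batch_size=*/ 100,
    //         /*batch_readahead=*/ 4,
    //         projection,
    //         FilterExpression::no_filter(),
    //     )?;
    //     while let Some(batch) = stream.next().await {
    //         // process each RecordBatch as it is decoded
    //     }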

    fn take_rows_blocking(
        &self,
        indices: Vec<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
        let column_infos = self.collect_columns_from_projection(&projection)?;
        debug!(
            "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
            indices.len(),
            indices[0],
            indices[indices.len() - 1],
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            io: self.scheduler.clone(),
            should_validate: self.options.validate_on_decode,
        };

        let requested_rows = RequestedRows::Indices(indices);

        schedule_and_decode_blocking(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        )
    }

    fn read_ranges_blocking(
        &self,
        ranges: Vec<Range<u64>>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
        let column_infos = self.collect_columns_from_projection(&projection)?;
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        debug!(
            "Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
            ranges.len(),
            num_rows,
            ranges[0].start,
            ranges[ranges.len() - 1].end,
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            io: self.scheduler.clone(),
            should_validate: self.options.validate_on_decode,
        };

        let requested_rows = RequestedRows::Ranges(ranges);

        schedule_and_decode_blocking(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        )
    }

    fn read_range_blocking(
        &self,
        range: Range<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
        let column_infos = self.collect_columns_from_projection(&projection)?;
        let num_rows = self.num_rows;

        debug!(
            "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
            range,
            batch_size,
            num_rows,
            column_infos.len(),
            projection.schema.fields.len(),
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            io: self.scheduler.clone(),
            should_validate: self.options.validate_on_decode,
        };

        let requested_rows = RequestedRows::Ranges(vec![range]);

        schedule_and_decode_blocking(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        )
    }

    pub fn read_stream_projected_blocking(
        &self,
        params: ReadBatchParams,
        batch_size: u32,
        projection: Option<ReaderProjection>,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
        let projection = projection.unwrap_or_else(|| self.base_projection.clone());
        Self::validate_projection(&projection, &self.metadata)?;
        let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
            if bound > self.num_rows || bound == self.num_rows && inclusive {
                Err(Error::invalid_input(
                    format!(
                        "cannot read {:?} from file with {} rows",
                        params, self.num_rows
                    ),
                    location!(),
                ))
            } else {
                Ok(())
            }
        };
        match &params {
            ReadBatchParams::Indices(indices) => {
                for idx in indices {
                    match idx {
                        None => {
                            return Err(Error::invalid_input(
                                "Null value in indices array",
                                location!(),
                            ));
                        }
                        Some(idx) => {
                            verify_bound(&params, idx as u64, true)?;
                        }
                    }
                }
                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
                self.take_rows_blocking(indices, batch_size, projection, filter)
            }
            ReadBatchParams::Range(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range_blocking(
                    range.start as u64..range.end as u64,
                    batch_size,
                    projection,
                    filter,
                )
            }
            ReadBatchParams::Ranges(ranges) => {
                let mut ranges_u64 = Vec::with_capacity(ranges.len());
                for range in ranges.as_ref() {
                    verify_bound(&params, range.end, false)?;
                    ranges_u64.push(range.start..range.end);
                }
                self.read_ranges_blocking(ranges_u64, batch_size, projection, filter)
            }
            ReadBatchParams::RangeFrom(range) => {
                verify_bound(&params, range.start as u64, true)?;
                self.read_range_blocking(
                    range.start as u64..self.num_rows,
                    batch_size,
                    projection,
                    filter,
                )
            }
            ReadBatchParams::RangeTo(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range_blocking(0..range.end as u64, batch_size, projection, filter)
            }
            ReadBatchParams::RangeFull => {
                self.read_range_blocking(0..self.num_rows, batch_size, projection, filter)
            }
        }
    }

    pub fn read_stream(
        &self,
        params: ReadBatchParams,
        batch_size: u32,
        batch_readahead: u32,
        filter: FilterExpression,
    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
        self.read_stream_projected(
            params,
            batch_size,
            batch_readahead,
            self.base_projection.clone(),
            filter,
        )
    }

    pub fn schema(&self) -> &Arc<Schema> {
        &self.metadata.file_schema
    }
}

pub fn describe_encoding(page: &pbfile::column_metadata::Page) -> String {
    if let Some(encoding) = &page.encoding {
        if let Some(style) = &encoding.location {
            match style {
                pbfile::encoding::Location::Indirect(indirect) => {
                    format!(
                        "IndirectEncoding(pos={},size={})",
                        indirect.buffer_location, indirect.buffer_length
                    )
                }
                pbfile::encoding::Location::Direct(direct) => {
                    let encoding_any =
                        prost_types::Any::decode(Bytes::from(direct.encoding.clone()))
                            .expect("failed to deserialize encoding as protobuf");
                    if encoding_any.type_url == "/lance.encodings.ArrayEncoding" {
                        let encoding = encoding_any.to_msg::<pbenc::ArrayEncoding>();
                        match encoding {
                            Ok(encoding) => {
                                format!("{:#?}", encoding)
                            }
                            Err(err) => {
                                format!("Unsupported(decode_err={})", err)
                            }
                        }
                    } else if encoding_any.type_url == "/lance.encodings.PageLayout" {
                        let encoding = encoding_any.to_msg::<pbenc::PageLayout>();
                        match encoding {
                            Ok(encoding) => {
                                format!("{:#?}", encoding)
                            }
                            Err(err) => {
                                format!("Unsupported(decode_err={})", err)
                            }
                        }
                    } else {
                        format!("Unrecognized(type_url={})", encoding_any.type_url)
                    }
                }
                pbfile::encoding::Location::None(_) => "NoEncodingDescription".to_string(),
            }
        } else {
            "MISSING STYLE".to_string()
        }
    } else {
        "MISSING".to_string()
    }
}

pub trait EncodedBatchReaderExt {
    fn try_from_mini_lance(
        bytes: Bytes,
        schema: &Schema,
        version: LanceFileVersion,
    ) -> Result<Self>
    where
        Self: Sized;
    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
    where
        Self: Sized;
}

impl EncodedBatchReaderExt for EncodedBatch {
    fn try_from_mini_lance(
        bytes: Bytes,
        schema: &Schema,
        file_version: LanceFileVersion,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        let projection = ReaderProjection::from_whole_schema(schema, file_version);
        let footer = FileReader::decode_footer(&bytes)?;

        let column_metadata_start = footer.column_meta_start as usize;
        let column_metadata_end = footer.global_buff_offsets_start as usize;
        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
        let column_metadatas =
            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;

        let file_version = LanceFileVersion::try_from_major_minor(
            footer.major_version as u32,
            footer.minor_version as u32,
        )?;

        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);

        Ok(Self {
            data: bytes,
            num_rows: page_table
                .first()
                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
                .unwrap_or(0),
            page_table,
            top_level_columns: projection.column_indices,
            schema: Arc::new(schema.clone()),
        })
    }

    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
    where
        Self: Sized,
    {
        let footer = FileReader::decode_footer(&bytes)?;
        let file_version = LanceFileVersion::try_from_major_minor(
            footer.major_version as u32,
            footer.minor_version as u32,
        )?;

        let gbo_table = FileReader::do_decode_gbo_table(
            &bytes.slice(footer.global_buff_offsets_start as usize..),
            &footer,
            file_version,
        )?;
        if gbo_table.is_empty() {
            return Err(Error::Internal {
                message: "File did not contain any global buffers, schema expected".to_string(),
                location: location!(),
            });
        }
        let schema_start = gbo_table[0].position as usize;
        let schema_size = gbo_table[0].size as usize;

        let schema_bytes = bytes.slice(schema_start..(schema_start + schema_size));
        let (_, schema) = FileReader::decode_schema(schema_bytes)?;
        let projection = ReaderProjection::from_whole_schema(&schema, file_version);

        let column_metadata_start = footer.column_meta_start as usize;
        let column_metadata_end = footer.global_buff_offsets_start as usize;
        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
        let column_metadatas =
            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;

        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);

        Ok(Self {
            data: bytes,
            num_rows: page_table
                .first()
                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
                .unwrap_or(0),
            page_table,
            top_level_columns: projection.column_indices,
            schema: Arc::new(schema),
        })
    }
}
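
// A condensed sketch of the encode/decode round trip (mirroring `test_encoded_batch_round_trip`
// below; this is not additional API surface). `data` is assumed to be an Arrow `RecordBatch`,
// `lance_schema` an `Arc<Schema>` derived from it, and `options` an `EncodingOptions` value:
//
//     let encoded = encode_batch(&data, lance_schema, &CoreFieldEncodingStrategy::default(), &options).await?;
//     let bytes = encoded.try_to_self_described_lance()?;
//     let decoded_batch = EncodedBatch::try_from_self_described_lance(bytes)?;
//     let batch = decode_batch(
//         &decoded_batch,
//         &FilterExpression::no_filter(),
//         Arc::<DecoderPlugins>::default(),
//         false,
//         LanceFileVersion::default(),
//         None,
//     )
//     .await?;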

#[cfg(test)]
pub mod tests {
    use std::{collections::BTreeMap, pin::Pin, sync::Arc};

    use arrow_array::{
        types::{Float64Type, Int32Type},
        RecordBatch, UInt32Array,
    };
    use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema};
    use bytes::Bytes;
    use futures::{prelude::stream::TryStreamExt, StreamExt};
    use lance_arrow::RecordBatchExt;
    use lance_core::{datatypes::Schema, ArrowResult};
    use lance_datagen::{array, gen, BatchCount, ByteCount, RowCount};
    use lance_encoding::{
        decoder::{decode_batch, DecodeBatchScheduler, DecoderPlugins, FilterExpression},
        encoder::{encode_batch, CoreFieldEncodingStrategy, EncodedBatch, EncodingOptions},
        version::LanceFileVersion,
    };
    use lance_io::{stream::RecordBatchStream, utils::CachedFileSize};
    use log::debug;
    use tokio::sync::mpsc;

    use crate::v2::{
        reader::{EncodedBatchReaderExt, FileReader, FileReaderOptions, ReaderProjection},
        testing::{test_cache, write_lance_file, FsFixture, WrittenFile},
        writer::{EncodedBatchWriteExt, FileWriter, FileWriterOptions},
    };

    async fn create_some_file(fs: &FsFixture, version: LanceFileVersion) -> WrittenFile {
        let location_type = DataType::Struct(Fields::from(vec![
            Field::new("x", DataType::Float64, true),
            Field::new("y", DataType::Float64, true),
        ]));
        let categories_type = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)));

        let mut reader = gen()
            .col("score", array::rand::<Float64Type>())
            .col("location", array::rand_type(&location_type))
            .col("categories", array::rand_type(&categories_type))
            .col("binary", array::rand_type(&DataType::Binary));
        if version <= LanceFileVersion::V2_0 {
            reader = reader.col("large_bin", array::rand_type(&DataType::LargeBinary));
        }
        let reader = reader.into_reader_rows(RowCount::from(1000), BatchCount::from(100));

        write_lance_file(
            reader,
            fs,
            FileWriterOptions {
                format_version: Some(version),
                ..Default::default()
            },
        )
        .await
    }

    type Transformer = Box<dyn Fn(&RecordBatch) -> RecordBatch>;

    async fn verify_expected(
        expected: &[RecordBatch],
        mut actual: Pin<Box<dyn RecordBatchStream>>,
        read_size: u32,
        transform: Option<Transformer>,
    ) {
        let mut remaining = expected.iter().map(|batch| batch.num_rows()).sum::<usize>() as u32;
        let mut expected_iter = expected.iter().map(|batch| {
            if let Some(transform) = &transform {
                transform(batch)
            } else {
                batch.clone()
            }
        });
        let mut next_expected = expected_iter.next().unwrap().clone();
        while let Some(actual) = actual.next().await {
            let mut actual = actual.unwrap();
            let mut rows_to_verify = actual.num_rows() as u32;
            let expected_length = remaining.min(read_size);
            assert_eq!(expected_length, rows_to_verify);

            while rows_to_verify > 0 {
                let next_slice_len = (next_expected.num_rows() as u32).min(rows_to_verify);
                assert_eq!(
                    next_expected.slice(0, next_slice_len as usize),
                    actual.slice(0, next_slice_len as usize)
                );
                remaining -= next_slice_len;
                rows_to_verify -= next_slice_len;
                if remaining > 0 {
                    if next_slice_len == next_expected.num_rows() as u32 {
                        next_expected = expected_iter.next().unwrap().clone();
                    } else {
                        next_expected = next_expected.slice(
                            next_slice_len as usize,
                            next_expected.num_rows() - next_slice_len as usize,
                        );
                    }
                }
                if rows_to_verify > 0 {
                    actual = actual.slice(
                        next_slice_len as usize,
                        actual.num_rows() - next_slice_len as usize,
                    );
                }
            }
        }
        assert_eq!(remaining, 0);
    }

    #[tokio::test]
    async fn test_round_trip() {
        let fs = FsFixture::default();

        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;

        for read_size in [32, 1024, 1024 * 1024] {
            let file_scheduler = fs
                .scheduler
                .open_file(&fs.tmp_path, &CachedFileSize::unknown())
                .await
                .unwrap();
            let file_reader = FileReader::try_open(
                file_scheduler,
                None,
                Arc::<DecoderPlugins>::default(),
                &test_cache(),
                FileReaderOptions::default(),
            )
            .await
            .unwrap();

            let schema = file_reader.schema();
            assert_eq!(schema.metadata.get("foo").unwrap(), "bar");

            let batch_stream = file_reader
                .read_stream(
                    lance_io::ReadBatchParams::RangeFull,
                    read_size,
                    16,
                    FilterExpression::no_filter(),
                )
                .unwrap();

            verify_expected(&data, batch_stream, read_size, None).await;
        }
    }

    #[test_log::test(tokio::test)]
    async fn test_encoded_batch_round_trip() {
        let data = gen()
            .col("x", array::rand::<Int32Type>())
            .col("y", array::rand_utf8(ByteCount::from(16), false))
            .into_batch_rows(RowCount::from(10000))
            .unwrap();

        let lance_schema = Arc::new(Schema::try_from(data.schema().as_ref()).unwrap());

        let encoding_options = EncodingOptions {
            cache_bytes_per_column: 4096,
            max_page_bytes: 32 * 1024 * 1024,
            keep_original_array: true,
            buffer_alignment: 64,
        };
        let encoded_batch = encode_batch(
            &data,
            lance_schema.clone(),
            &CoreFieldEncodingStrategy::default(),
            &encoding_options,
        )
        .await
        .unwrap();

        let bytes = encoded_batch.try_to_self_described_lance().unwrap();

        let decoded_batch = EncodedBatch::try_from_self_described_lance(bytes).unwrap();

        let decoded = decode_batch(
            &decoded_batch,
            &FilterExpression::no_filter(),
            Arc::<DecoderPlugins>::default(),
            false,
            LanceFileVersion::default(),
            None,
        )
        .await
        .unwrap();

        assert_eq!(data, decoded);

        let bytes = encoded_batch.try_to_mini_lance().unwrap();
        let decoded_batch =
            EncodedBatch::try_from_mini_lance(bytes, lance_schema.as_ref(), LanceFileVersion::V2_0)
                .unwrap();
        let decoded = decode_batch(
            &decoded_batch,
            &FilterExpression::no_filter(),
            Arc::<DecoderPlugins>::default(),
            false,
            LanceFileVersion::default(),
            None,
        )
        .await
        .unwrap();

        assert_eq!(data, decoded);
    }

    #[test_log::test(tokio::test)]
    async fn test_projection() {
        let fs = FsFixture::default();

        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();

        let field_id_mapping = written_file
            .field_id_mapping
            .iter()
            .copied()
            .collect::<BTreeMap<_, _>>();

        let empty_projection = ReaderProjection {
            column_indices: Vec::default(),
            schema: Arc::new(Schema::default()),
        };

        for columns in [
            vec!["score"],
            vec!["location"],
            vec!["categories"],
            vec!["location.x"],
            vec!["score", "categories"],
            vec!["score", "location"],
            vec!["location", "categories"],
            vec!["location.y", "location", "categories"],
        ] {
            debug!("Testing round trip with projection {:?}", columns);
            let file_reader = FileReader::try_open(
                file_scheduler.clone(),
                None,
                Arc::<DecoderPlugins>::default(),
                &test_cache(),
                FileReaderOptions::default(),
            )
            .await
            .unwrap();

            let projected_schema = written_file.schema.project(&columns).unwrap();
            let projection = ReaderProjection::from_field_ids(
                &file_reader,
                &projected_schema,
                &field_id_mapping,
            )
            .unwrap();

            let batch_stream = file_reader
                .read_stream_projected(
                    lance_io::ReadBatchParams::RangeFull,
                    1024,
                    16,
                    projection.clone(),
                    FilterExpression::no_filter(),
                )
                .unwrap();

            let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
            verify_expected(
                &written_file.data,
                batch_stream,
                1024,
                Some(Box::new(move |batch: &RecordBatch| {
                    batch.project_by_schema(&projection_arrow).unwrap()
                })),
            )
            .await;

            let file_reader = FileReader::try_open(
                file_scheduler.clone(),
                Some(projection.clone()),
                Arc::<DecoderPlugins>::default(),
                &test_cache(),
                FileReaderOptions::default(),
            )
            .await
            .unwrap();

            let batch_stream = file_reader
                .read_stream(
                    lance_io::ReadBatchParams::RangeFull,
                    1024,
                    16,
                    FilterExpression::no_filter(),
                )
                .unwrap();

            let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
            verify_expected(
                &written_file.data,
                batch_stream,
                1024,
                Some(Box::new(move |batch: &RecordBatch| {
                    batch.project_by_schema(&projection_arrow).unwrap()
                })),
            )
            .await;

            assert!(file_reader
                .read_stream_projected(
                    lance_io::ReadBatchParams::RangeFull,
                    1024,
                    16,
                    empty_projection.clone(),
                    FilterExpression::no_filter(),
                )
                .is_err());
        }

        assert!(FileReader::try_open(
            file_scheduler.clone(),
            Some(empty_projection),
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .is_err());

        let arrow_schema = ArrowSchema::new(vec![
            Field::new("x", DataType::Int32, true),
            Field::new("y", DataType::Int32, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        let projection_with_dupes = ReaderProjection {
            column_indices: vec![0, 0],
            schema: Arc::new(schema),
        };

        assert!(FileReader::try_open(
            file_scheduler.clone(),
            Some(projection_with_dupes),
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .is_err());
    }

    #[test_log::test(tokio::test)]
    async fn test_compressing_buffer() {
        let fs = FsFixture::default();

        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();

        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let mut projection = written_file.schema.project(&["score"]).unwrap();
        for field in projection.fields.iter_mut() {
            field
                .metadata
                .insert("lance:compression".to_string(), "zstd".to_string());
        }
        let projection = ReaderProjection {
            column_indices: projection.fields.iter().map(|f| f.id as u32).collect(),
            schema: Arc::new(projection),
        };

        let batch_stream = file_reader
            .read_stream_projected(
                lance_io::ReadBatchParams::RangeFull,
                1024,
                16,
                projection.clone(),
                FilterExpression::no_filter(),
            )
            .unwrap();

        let projection_arrow = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
        verify_expected(
            &written_file.data,
            batch_stream,
            1024,
            Some(Box::new(move |batch: &RecordBatch| {
                batch.project_by_schema(&projection_arrow).unwrap()
            })),
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_all() {
        let fs = FsFixture::default();
        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();

        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let batches = file_reader
            .read_stream(
                lance_io::ReadBatchParams::RangeFull,
                total_rows as u32,
                16,
                FilterExpression::no_filter(),
            )
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), total_rows);
    }

    #[tokio::test]
    async fn test_blocking_take() {
        let fs = FsFixture::default();
        let WrittenFile { data, schema, .. } = create_some_file(&fs, LanceFileVersion::V2_1).await;
        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();

        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            Some(ReaderProjection::from_column_names(&schema, &["score"]).unwrap()),
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let batches = tokio::task::spawn_blocking(move || {
            file_reader
                .read_stream_projected_blocking(
                    lance_io::ReadBatchParams::Indices(UInt32Array::from(vec![0, 1, 2, 3, 4])),
                    total_rows as u32,
                    None,
                    FilterExpression::no_filter(),
                )
                .unwrap()
                .collect::<ArrowResult<Vec<_>>>()
                .unwrap()
        })
        .await
        .unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 5);
        assert_eq!(batches[0].num_columns(), 1);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_drop_in_progress() {
        let fs = FsFixture::default();
        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();

        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let mut batches = file_reader
            .read_stream(
                lance_io::ReadBatchParams::RangeFull,
                (total_rows / 10) as u32,
                16,
                FilterExpression::no_filter(),
            )
            .unwrap();

        drop(file_reader);

        let batch = batches.next().await.unwrap().unwrap();
        assert!(batch.num_rows() > 0);

        drop(batches);
    }

    #[tokio::test]
    async fn drop_while_scheduling() {
        let fs = FsFixture::default();
        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
        let total_rows = written_file
            .data
            .iter()
            .map(|batch| batch.num_rows())
            .sum::<usize>();

        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let projection =
            ReaderProjection::from_whole_schema(&written_file.schema, LanceFileVersion::V2_0);
        let column_infos = file_reader
            .collect_columns_from_projection(&projection)
            .unwrap();
        let mut decode_scheduler = DecodeBatchScheduler::try_new(
            &projection.schema,
            &projection.column_indices,
            &column_infos,
            &vec![],
            total_rows as u64,
            Arc::<DecoderPlugins>::default(),
            file_reader.scheduler.clone(),
            test_cache(),
            &FilterExpression::no_filter(),
        )
        .await
        .unwrap();

        let range = 0..total_rows as u64;

        let (tx, rx) = mpsc::unbounded_channel();

        drop(rx);

        decode_scheduler.schedule_range(
            range,
            &FilterExpression::no_filter(),
            tx,
            file_reader.scheduler.clone(),
        )
    }

    #[tokio::test]
    async fn test_global_buffers() {
        let fs = FsFixture::default();

        let lance_schema =
            lance_core::datatypes::Schema::try_from(&ArrowSchema::new(vec![Field::new(
                "foo",
                DataType::Int32,
                true,
            )]))
            .unwrap();

        let mut file_writer = FileWriter::try_new(
            fs.object_store.create(&fs.tmp_path).await.unwrap(),
            lance_schema.clone(),
            FileWriterOptions::default(),
        )
        .unwrap();

        let test_bytes = Bytes::from_static(b"hello");

        let buf_index = file_writer
            .add_global_buffer(test_bytes.clone())
            .await
            .unwrap();

        assert_eq!(buf_index, 1);

        file_writer.finish().await.unwrap();

        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let buf = file_reader.read_global_buffer(1).await.unwrap();
        assert_eq!(buf, test_bytes);
    }
}