use std::ops::{Range, RangeTo};
use std::sync::Arc;

use arrow_arith::numeric::sub;
use arrow_array::{
    builder::PrimitiveBuilder,
    cast::AsArray,
    types::{Int32Type, Int64Type},
    ArrayRef, ArrowNativeTypeOp, ArrowNumericType, NullArray, OffsetSizeTrait, PrimitiveArray,
    RecordBatch, StructArray, UInt32Array,
};
use arrow_buffer::ArrowNativeType;
use arrow_schema::{DataType, FieldRef, Schema as ArrowSchema};
use arrow_select::concat::{self, concat_batches};
use async_recursion::async_recursion;
use deepsize::DeepSizeOf;
use futures::{stream, Future, FutureExt, StreamExt, TryStreamExt};
use lance_arrow::*;
use lance_core::cache::FileMetadataCache;
use lance_core::datatypes::{Field, Schema};
use lance_core::{Error, Result};
use lance_io::encodings::dictionary::DictionaryDecoder;
use lance_io::encodings::AsyncIndex;
use lance_io::stream::{RecordBatchStream, RecordBatchStreamAdapter};
use lance_io::traits::Reader;
use lance_io::utils::{
    read_fixed_stride_array, read_metadata_offset, read_struct, read_struct_from_buf,
};
use lance_io::{object_store::ObjectStore, ReadBatchParams};

use object_store::path::Path;
use snafu::location;
use tracing::instrument;

use crate::format::metadata::Metadata;
use crate::page_table::{PageInfo, PageTable};
/// Lance file reader.
#[derive(Clone, DeepSizeOf)]
pub struct FileReader {
    pub object_reader: Arc<dyn Reader>,
    metadata: Arc<Metadata>,
    page_table: Arc<PageTable>,
    schema: Schema,

    /// ID of the fragment this file belongs to.
    fragment_id: u64,

    /// Page table of the statistics pages, if the file has any.
    stats_page_table: Arc<Option<PageTable>>,
}

impl std::fmt::Debug for FileReader {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("FileReader")
            .field("fragment", &self.fragment_id)
            .field("path", &self.object_reader.path())
            .finish()
    }
}

impl FileReader {
    /// Open a file reader for a single data file.
    ///
    /// `field_id_offset` and `max_field_id` bound the field ids covered by
    /// this file's page table; `session` optionally caches file metadata.
    #[instrument(level = "debug", skip(object_store, schema, session))]
    pub async fn try_new_with_fragment_id(
        object_store: &ObjectStore,
        path: &Path,
        schema: Schema,
        fragment_id: u32,
        field_id_offset: i32,
        max_field_id: i32,
        session: Option<&FileMetadataCache>,
    ) -> Result<Self> {
        let object_reader = object_store.open(path).await?;

        let metadata = Self::read_metadata(object_reader.as_ref(), session).await?;

        Self::try_new_from_reader(
            path,
            object_reader.into(),
            Some(metadata),
            schema,
            fragment_id,
            field_id_offset,
            max_field_id,
            session,
        )
        .await
    }

    #[allow(clippy::too_many_arguments)]
    pub async fn try_new_from_reader(
        path: &Path,
        object_reader: Arc<dyn Reader>,
        metadata: Option<Arc<Metadata>>,
        schema: Schema,
        fragment_id: u32,
        field_id_offset: i32,
        max_field_id: i32,
        session: Option<&FileMetadataCache>,
    ) -> Result<Self> {
        let metadata = match metadata {
            Some(metadata) => metadata,
            None => Self::read_metadata(object_reader.as_ref(), session).await?,
        };

        let page_table = async {
            Self::load_from_cache(session, path, |_| async {
                PageTable::load(
                    object_reader.as_ref(),
                    metadata.page_table_position,
                    field_id_offset,
                    max_field_id,
                    metadata.num_batches() as i32,
                )
                .await
            })
            .await
        };

        let stats_page_table = Self::read_stats_page_table(object_reader.as_ref(), session);

        // Load the data page table and the statistics page table concurrently.
        let (page_table, stats_page_table) = futures::try_join!(page_table, stats_page_table)?;

        Ok(Self {
            object_reader,
            metadata,
            schema,
            page_table,
            fragment_id: fragment_id as u64,
            stats_page_table,
        })
    }

    pub async fn read_metadata(
        object_reader: &dyn Reader,
        cache: Option<&FileMetadataCache>,
    ) -> Result<Arc<Metadata>> {
        Self::load_from_cache(cache, object_reader.path(), |_| async {
            let file_size = object_reader.size().await?;
            let begin = if file_size < object_reader.block_size() {
                0
            } else {
                file_size - object_reader.block_size()
            };
            let tail_bytes = object_reader.get_range(begin..file_size).await?;
            let metadata_pos = read_metadata_offset(&tail_bytes)?;

            let metadata: Metadata = if metadata_pos < file_size - tail_bytes.len() {
                // The metadata starts before the tail we read, so issue a second read.
                read_struct(object_reader, metadata_pos).await?
            } else {
                // The metadata is already in the tail bytes; parse it in place.
                let offset = tail_bytes.len() - (file_size - metadata_pos);
                read_struct_from_buf(&tail_bytes.slice(offset..))?
            };
            Ok(metadata)
        })
        .await
    }
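
    // Worked example of the tail read above (illustrative numbers, not taken
    // from any real file): with file_size = 1000 and block_size = 64, the tail
    // read covers bytes [936, 1000). If read_metadata_offset reports
    // metadata_pos = 950, the metadata lies inside the tail and the slice
    // offset is 64 - (1000 - 950) = 14; if it reports metadata_pos = 100, the
    // metadata falls before the tail and a second ranged read is issued.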

    /// Load the page table of the statistics pages, if the file has any.
    async fn read_stats_page_table(
        reader: &dyn Reader,
        cache: Option<&FileMetadataCache>,
    ) -> Result<Arc<Option<PageTable>>> {
        // Use a distinct cache key (the "stats" child path) so this entry does
        // not collide with the data page table cached under the file path.
        Self::load_from_cache(cache, &reader.path().child("stats"), |_| async {
            let metadata = Self::read_metadata(reader, cache).await?;

            if let Some(stats_meta) = metadata.stats_metadata.as_ref() {
                Ok(Some(
                    PageTable::load(
                        reader,
                        stats_meta.page_table_position,
                        /*field_id_offset=*/ 0,
                        *stats_meta.leaf_field_ids.iter().max().unwrap(),
                        /*num_batches=*/ 1,
                    )
                    .await?,
                ))
            } else {
                Ok(None)
            }
        })
        .await
    }

    /// Load a value from the cache at `path`, computing it with `loader` on a miss.
    async fn load_from_cache<T: DeepSizeOf + Send + Sync + 'static, F, Fut>(
        cache: Option<&FileMetadataCache>,
        path: &Path,
        loader: F,
    ) -> Result<Arc<T>>
    where
        F: Fn(&Path) -> Fut,
        Fut: Future<Output = Result<T>>,
    {
        if let Some(cache) = cache {
            cache.get_or_insert(path, loader).await
        } else {
            Ok(Arc::new(loader(path).await?))
        }
    }

    /// Open one Lance data file for reading.
    pub async fn try_new(object_store: &ObjectStore, path: &Path, schema: Schema) -> Result<Self> {
        let max_field_id = schema.max_field_id().unwrap_or_default();
        Self::try_new_with_fragment_id(object_store, path, schema, 0, 0, max_field_id, None).await
    }
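
    // Example usage (a sketch, not part of the original file: it assumes a
    // file was previously written at `path` with a matching `schema`):
    //
    //     let store = ObjectStore::memory();
    //     let path = Path::from("/data.lance");
    //     let reader = FileReader::try_new(&store, &path, schema).await?;
    //     let batch = reader.read_range(0..10, reader.schema()).await?;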

    fn io_parallelism(&self) -> usize {
        self.object_reader.io_parallelism()
    }

    /// Schema of the returned record batches.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn num_batches(&self) -> usize {
        self.metadata.num_batches()
    }

    /// Number of rows in the batch `batch_id`, or 0 if the batch does not exist.
    pub fn num_rows_in_batch(&self, batch_id: i32) -> usize {
        self.metadata.get_batch_length(batch_id).unwrap_or_default() as usize
    }

    /// Total number of rows in the file.
    pub fn len(&self) -> usize {
        self.metadata.len()
    }

    pub fn is_empty(&self) -> bool {
        self.metadata.is_empty()
    }

    /// Read a batch of data from the file.
    ///
    /// The schema of the returned [RecordBatch] is set by `projection`.
    #[instrument(level = "debug", skip(self, params, projection))]
    pub async fn read_batch(
        &self,
        batch_id: i32,
        params: impl Into<ReadBatchParams>,
        projection: &Schema,
    ) -> Result<RecordBatch> {
        read_batch(self, &params.into(), projection, batch_id).await
    }
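
    // `params` accepts anything convertible into ReadBatchParams. Illustrative
    // calls (a sketch, assuming a reader and projection are in scope):
    //
    //     reader.read_batch(0, .., &projection).await?;      // the whole batch
    //     reader.read_batch(0, 10..20, &projection).await?;  // a row range
    //     reader.read_batch(0, &[1_u32, 5, 7][..], &projection).await?; // row indices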

    /// Read a contiguous range of rows, possibly spanning multiple batches.
    ///
    /// The per-batch reads are issued concurrently and concatenated into a
    /// single [RecordBatch].
    #[instrument(level = "debug", skip(self, projection))]
    pub async fn read_range(
        &self,
        range: Range<usize>,
        projection: &Schema,
    ) -> Result<RecordBatch> {
        if range.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(projection.into())));
        }
        let range_in_batches = self.metadata.range_to_batches(range)?;
        let batches = stream::iter(range_in_batches)
            .map(|(batch_id, range)| async move { self.read_batch(batch_id, range, projection).await })
            .buffered(self.io_parallelism())
            .try_collect::<Vec<_>>()
            .await?;
        if batches.len() == 1 {
            return Ok(batches[0].clone());
        }
        let schema = batches[0].schema();
        Ok(tokio::task::spawn_blocking(move || concat_batches(&schema, &batches)).await??)
    }

    /// Take rows by their offsets within the file.
    ///
    /// The indices are expected to be sorted in ascending order.
    #[instrument(level = "debug", skip_all)]
    pub async fn take(&self, indices: &[u32], projection: &Schema) -> Result<RecordBatch> {
        let num_batches = self.num_batches();
        let num_rows = self.len() as u32;
        let indices_in_batches = self.metadata.group_indices_to_batches(indices);
        let batches = stream::iter(indices_in_batches)
            .map(|batch| async move {
                if batch.batch_id >= num_batches as i32 {
                    Err(Error::InvalidInput {
                        source: format!("batch_id: {} out of bounds", batch.batch_id).into(),
                        location: location!(),
                    })
                } else if *batch.offsets.last().expect("got empty batch") > num_rows {
                    Err(Error::InvalidInput {
                        source: format!("indices: {:?} out of bounds", batch.offsets).into(),
                        location: location!(),
                    })
                } else {
                    self.read_batch(batch.batch_id, batch.offsets.as_slice(), projection)
                        .await
                }
            })
            .buffered(self.io_parallelism())
            .try_collect::<Vec<_>>()
            .await?;

        let schema = Arc::new(ArrowSchema::from(projection));

        Ok(tokio::task::spawn_blocking(move || concat_batches(&schema, &batches)).await??)
    }

    /// Schema of the page statistics covering the given field ids, if any.
    pub fn page_stats_schema(&self, field_ids: &[i32]) -> Option<Schema> {
        self.metadata.stats_metadata.as_ref().map(|meta| {
            let mut stats_field_ids = vec![];
            for stats_field in &meta.schema.fields {
                // Each top-level stats field is named after the id of the data
                // field it describes.
                if let Ok(stats_field_id) = stats_field.name.parse::<i32>() {
                    if field_ids.contains(&stats_field_id) {
                        stats_field_ids.push(stats_field.id);
                        for child in &stats_field.children {
                            stats_field_ids.push(child.id);
                        }
                    }
                }
            }
            meta.schema.project_by_ids(&stats_field_ids, true)
        })
    }
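
    // Illustrative shape of the stats schema assumed above (hypothetical field
    // names; only the "name = data field id" convention is relied upon):
    //
    //     "0": struct { null_count, min_value, max_value }
    //     "3": struct { null_count, min_value, max_value }
    //
    // With that layout, page_stats_schema(&[3]) keeps the "3" subtree (and its
    // children) and drops the rest.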

    /// Read the page statistics for the given field ids, if the file has any.
    pub async fn read_page_stats(&self, field_ids: &[i32]) -> Result<Option<RecordBatch>> {
        if let Some(stats_page_table) = self.stats_page_table.as_ref() {
            let projection = self.page_stats_schema(field_ids).unwrap();
            // It's possible none of the requested fields have statistics.
            if projection.fields.is_empty() {
                return Ok(None);
            }
            let arrays = futures::stream::iter(projection.fields.iter().cloned())
                .map(|field| async move {
                    read_array(
                        self,
                        &field,
                        0,
                        stats_page_table,
                        &ReadBatchParams::RangeFull,
                    )
                    .await
                })
                .buffered(self.io_parallelism())
                .try_collect::<Vec<_>>()
                .await?;

            let schema = ArrowSchema::from(&projection);
            let batch = RecordBatch::try_new(Arc::new(schema), arrays)?;
            Ok(Some(batch))
        } else {
            Ok(None)
        }
    }
}

/// Stream full batches from the file, filtered by batch id.
///
/// Parameters:
/// - `reader`: an opened [FileReader].
/// - `projection`: the schema of the returned batches.
/// - `predicate`: called with each batch id; only batches for which it returns
///   `true` are read.
pub fn batches_stream(
    reader: FileReader,
    projection: Schema,
    predicate: impl FnMut(&i32) -> bool + Send + Sync + 'static,
) -> impl RecordBatchStream {
    let projection = Arc::new(projection);
    let arrow_schema = ArrowSchema::from(projection.as_ref());

    let total_batches = reader.num_batches() as i32;
    let batches = (0..total_batches).filter(predicate);
    // Share the reader and projection across all reads instead of cloning
    // them per batch.
    let this = Arc::new(reader);
    let inner = stream::iter(batches)
        .zip(stream::repeat_with(move || {
            (this.clone(), projection.clone())
        }))
        .map(move |(batch_id, (reader, projection))| async move {
            reader
                .read_batch(batch_id, ReadBatchParams::RangeFull, &projection)
                .await
        })
        .buffered(2)
        .boxed();
    RecordBatchStreamAdapter::new(Arc::new(arrow_schema), inner)
}
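
// Example (a sketch mirroring the test at the bottom of this file): stream
// only the even-numbered batches and collect them.
//
//     let stream = batches_stream(reader, schema, |id| id % 2 == 0);
//     let batches: Vec<RecordBatch> = stream.try_collect().await?;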

/// Read one batch, reading each of the projected columns concurrently.
pub async fn read_batch(
    reader: &FileReader,
    params: &ReadBatchParams,
    schema: &Schema,
    batch_id: i32,
) -> Result<RecordBatch> {
    if !schema.fields.is_empty() {
        let arrs = stream::iter(&schema.fields)
            .map(|f| async { read_array(reader, f, batch_id, &reader.page_table, params).await })
            .buffered(reader.io_parallelism())
            .try_collect::<Vec<_>>()
            .boxed();
        let arrs = arrs.await?;
        Ok(RecordBatch::try_new(Arc::new(schema.into()), arrs)?)
    } else {
        Err(Error::invalid_input("no fields requested", location!()))
    }
}

#[async_recursion]
async fn read_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let data_type = field.data_type();

    use DataType::*;

    if data_type.is_fixed_stride() {
        _read_fixed_stride_array(reader, field, batch_id, page_table, params).await
    } else {
        match data_type {
            Null => read_null_array(field, batch_id, page_table, params),
            Utf8 | LargeUtf8 | Binary | LargeBinary => {
                read_binary_array(reader, field, batch_id, page_table, params).await
            }
            Struct(_) => read_struct_array(reader, field, batch_id, page_table, params).await,
            Dictionary(_, _) => {
                read_dictionary_array(reader, field, batch_id, page_table, params).await
            }
            List(_) => {
                read_list_array::<Int32Type>(reader, field, batch_id, page_table, params).await
            }
            LargeList(_) => {
                read_list_array::<Int64Type>(reader, field, batch_id, page_table, params).await
            }
            _ => {
                unimplemented!("No support for {data_type} yet");
            }
        }
    }
}

fn get_page_info<'a>(
    page_table: &'a PageTable,
    field: &'a Field,
    batch_id: i32,
) -> Result<&'a PageInfo> {
    page_table.get(field.id, batch_id).ok_or_else(|| {
        Error::io(
            format!(
                "No page info found for field: {}, field_id={} batch={}",
                field.name, field.id, batch_id
            ),
            location!(),
        )
    })
}

/// Read a fixed-stride (primitive-width) array for one page of `field`.
async fn _read_fixed_stride_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;
    read_fixed_stride_array(
        reader.object_reader.as_ref(),
        &field.data_type(),
        page_info.position,
        page_info.length,
        params.clone(),
    )
    .await
}

fn read_null_array(
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;

    let length_output = match params {
        ReadBatchParams::Indices(indices) => {
            if indices.is_empty() {
                0
            } else {
                let idx_max = *indices.values().iter().max().unwrap() as u64;
                if idx_max >= page_info.length as u64 {
                    return Err(Error::io(
                        format!(
                            "NullArray Reader: request([{}]) out of range: [0..{}]",
                            idx_max, page_info.length
                        ),
                        location!(),
                    ));
                }
                indices.len()
            }
        }
        _ => {
            let (idx_start, idx_end) = match params {
                ReadBatchParams::Range(r) => (r.start, r.end),
                ReadBatchParams::RangeFull => (0, page_info.length),
                ReadBatchParams::RangeTo(r) => (0, r.end),
                ReadBatchParams::RangeFrom(r) => (r.start, page_info.length),
                _ => unreachable!(),
            };
            if idx_end > page_info.length {
                return Err(Error::io(
                    format!(
                        "NullArray Reader: request([{}..{}]) out of range: [0..{}]",
                        idx_start, idx_end, page_info.length
                    ),
                    location!(),
                ));
            }
            idx_end - idx_start
        }
    };

    Ok(Arc::new(NullArray::new(length_output)))
}
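
// Worked example of the bounds logic above (illustrative): for a null page of
// length 100, RangeFull yields a NullArray of 100 rows, RangeTo(..25) yields
// 25, and Indices([1, 9, 30, 72]) yields 4, while Indices containing 100 or
// RangeTo(..107) are rejected as out of range.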

async fn read_binary_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;

    lance_io::utils::read_binary_array(
        reader.object_reader.as_ref(),
        &field.data_type(),
        field.nullable,
        page_info.position,
        page_info.length,
        params,
    )
    .await
}

async fn read_dictionary_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;
    let data_type = field.data_type();
    let decoder = DictionaryDecoder::new(
        reader.object_reader.as_ref(),
        page_info.position,
        page_info.length,
        &data_type,
        field
            .dictionary
            .as_ref()
            .unwrap()
            .values
            .as_ref()
            .unwrap()
            .clone(),
    );
    decoder.get(params.clone()).await
}

async fn read_struct_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    // Read each child field and re-assemble them into a StructArray.
    let mut sub_arrays: Vec<(FieldRef, ArrayRef)> = vec![];

    for child in field.children.as_slice() {
        let arr = read_array(reader, child, batch_id, page_table, params).await?;
        sub_arrays.push((Arc::new(child.into()), arr));
    }

    Ok(Arc::new(StructArray::from(sub_arrays)))
}

async fn take_list_array<T: ArrowNumericType>(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    positions: &PrimitiveArray<T>,
    indices: &UInt32Array,
) -> Result<ArrayRef>
where
    T::Native: ArrowNativeTypeOp + OffsetSizeTrait,
{
    let first_idx = indices.value(0);
    // Value range of each requested list, looked up via the positions array
    // (which is relative to the first requested index).
    let ranges = indices
        .values()
        .iter()
        .map(|i| (*i - first_idx).as_usize())
        .map(|idx| positions.value(idx).as_usize()..positions.value(idx + 1).as_usize())
        .collect::<Vec<_>>();
    let field = field.clone();
    let mut list_values: Vec<ArrayRef> = vec![];
    // Read the value ranges one at a time, then rebuild contiguous offsets.
    for range in ranges.iter() {
        list_values.push(
            read_array(
                reader,
                &field.children[0],
                batch_id,
                page_table,
                &(range.clone()).into(),
            )
            .await?,
        );
    }

    let value_refs = list_values
        .iter()
        .map(|arr| arr.as_ref())
        .collect::<Vec<_>>();
    let mut offsets_builder = PrimitiveBuilder::<T>::new();
    offsets_builder.append_value(T::Native::usize_as(0));
    let mut off = 0_usize;
    for range in ranges {
        off += range.len();
        offsets_builder.append_value(T::Native::usize_as(off));
    }
    let all_values = concat::concat(value_refs.as_slice())?;
    let offset_arr = offsets_builder.finish();
    let arr = try_new_generic_list_array(all_values, &offset_arr)?;
    Ok(Arc::new(arr) as ArrayRef)
}
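
// Worked example of the rebasing above (illustrative numbers): taking list
// indices [5, 6] with first_idx = 5 against a positions slice of [10, 13, 20]
// produces value ranges [10..13, 13..20]; after concatenating the two value
// reads, the rebuilt offsets are [0, 3, 10].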

async fn read_list_array<T: ArrowNumericType>(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef>
where
    T::Native: ArrowNativeTypeOp + OffsetSizeTrait,
{
    // Extend the position range by one entry so the end offset of the last
    // requested list is included.
    let positions_params = match params {
        ReadBatchParams::Range(range) => ReadBatchParams::from(range.start..(range.end + 1)),
        ReadBatchParams::RangeTo(range) => ReadBatchParams::from(..range.end + 1),
        ReadBatchParams::Indices(indices) => {
            (indices.value(0).as_usize()..indices.value(indices.len() - 1).as_usize() + 2).into()
        }
        p => p.clone(),
    };

    let page_info = get_page_info(&reader.page_table, field, batch_id)?;
    let position_arr = read_fixed_stride_array(
        reader.object_reader.as_ref(),
        &T::DATA_TYPE,
        page_info.position,
        page_info.length,
        positions_params,
    )
    .await?;

    let positions: &PrimitiveArray<T> = position_arr.as_primitive();

    // Recompute the params so they cover the range of values to read.
    let value_params = match params {
        ReadBatchParams::Range(range) => ReadBatchParams::from(
            positions.value(0).as_usize()..positions.value(range.end - range.start).as_usize(),
        ),
        ReadBatchParams::RangeTo(RangeTo { end }) => {
            ReadBatchParams::from(..positions.value(*end).as_usize())
        }
        ReadBatchParams::RangeFrom(_) => ReadBatchParams::from(positions.value(0).as_usize()..),
        ReadBatchParams::RangeFull => ReadBatchParams::from(
            positions.value(0).as_usize()..positions.value(positions.len() - 1).as_usize(),
        ),
        ReadBatchParams::Indices(indices) => {
            return take_list_array(reader, field, batch_id, page_table, positions, indices).await;
        }
    };

    // Rebase the offsets so the returned list array starts at zero.
    let start_position = PrimitiveArray::<T>::new_scalar(positions.value(0));
    let offset_arr = sub(positions, &start_position)?;
    let offset_arr_ref = offset_arr.as_primitive::<T>();
    let value_arrs = read_array(
        reader,
        &field.children[0],
        batch_id,
        page_table,
        &value_params,
    )
    .await?;
    let arr = try_new_generic_list_array(value_arrs, offset_arr_ref)?;
    Ok(Arc::new(arr) as ArrayRef)
}
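
// Worked example (illustrative): reading lists 2..4 from a page whose raw
// positions are [0, 2, 4, 7, 9] first fetches positions 2..5 = [4, 7, 9],
// then values 4..9, and rebases the offsets to [0, 3, 5] for the returned
// array of two lists.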

#[cfg(test)]
mod tests {
    use crate::writer::{FileWriter, NotSelfDescribing};

    use super::*;

    use arrow_array::{
        builder::{Int32Builder, LargeListBuilder, ListBuilder, StringBuilder},
        cast::{as_string_array, as_struct_array},
        types::UInt8Type,
        Array, DictionaryArray, Float32Array, Int64Array, LargeListArray, ListArray, StringArray,
        UInt8Array,
    };
    use arrow_array::{BooleanArray, Int32Array};
    use arrow_schema::{Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema};
    use lance_io::object_store::ObjectStoreParams;

    #[tokio::test]
    async fn test_take() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("i", DataType::Int64, true),
            ArrowField::new("f", DataType::Float32, false),
            ArrowField::new("s", DataType::Utf8, false),
            ArrowField::new(
                "d",
                DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
                false,
            ),
        ]);
        let mut schema = Schema::try_from(&arrow_schema).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/take_test");

        let values = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);
        let values_ref = Arc::new(values);
        let mut batches = vec![];
        for batch_id in 0..10 {
            let value_range: Range<i64> = batch_id * 10..batch_id * 10 + 10;
            let keys = UInt8Array::from_iter_values(value_range.clone().map(|v| (v % 7) as u8));
            let columns: Vec<ArrayRef> = vec![
                Arc::new(Int64Array::from_iter(
                    value_range.clone().collect::<Vec<_>>(),
                )),
                Arc::new(Float32Array::from_iter(
                    value_range.clone().map(|n| n as f32).collect::<Vec<_>>(),
                )),
                Arc::new(StringArray::from_iter_values(
                    value_range.clone().map(|n| format!("str-{}", n)),
                )),
                Arc::new(DictionaryArray::<UInt8Type>::try_new(keys, values_ref.clone()).unwrap()),
            ];
            batches.push(RecordBatch::try_new(Arc::new(arrow_schema.clone()), columns).unwrap());
        }
        schema.set_dictionary(&batches[0]).unwrap();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        for batch in batches.iter() {
            file_writer.write(&[batch.clone()]).await.unwrap();
        }
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let batch = reader
            .take(&[1, 15, 20, 25, 30, 48, 90], reader.schema())
            .await
            .unwrap();
        let dict_keys = UInt8Array::from_iter_values([1, 1, 6, 4, 2, 6, 6]);
        assert_eq!(
            batch,
            RecordBatch::try_new(
                batch.schema(),
                vec![
                    Arc::new(Int64Array::from_iter_values([1, 15, 20, 25, 30, 48, 90])),
                    Arc::new(Float32Array::from_iter_values([
                        1.0, 15.0, 20.0, 25.0, 30.0, 48.0, 90.0
                    ])),
                    Arc::new(StringArray::from_iter_values([
                        "str-1", "str-15", "str-20", "str-25", "str-30", "str-48", "str-90"
                    ])),
                    Arc::new(DictionaryArray::try_new(dict_keys, values_ref.clone()).unwrap()),
                ]
            )
            .unwrap()
        );
    }

    async fn test_write_null_string_in_struct(field_nullable: bool) {
        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "parent",
            DataType::Struct(ArrowFields::from(vec![ArrowField::new(
                "str",
                DataType::Utf8,
                field_nullable,
            )])),
            true,
        )]));

        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let string_arr = Arc::new(StringArray::from_iter([Some("a"), Some(""), Some("b")]));
        let struct_arr = Arc::new(StructArray::from(vec![(
            Arc::new(ArrowField::new("str", DataType::Utf8, field_nullable)),
            string_arr.clone() as ArrayRef,
        )]));
        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_arr]).unwrap();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch.clone()]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();

        if field_nullable {
            assert_eq!(
                &StringArray::from_iter(vec![Some("a"), None, Some("b")]),
                as_string_array(
                    as_struct_array(actual_batch.column_by_name("parent").unwrap().as_ref())
                        .column_by_name("str")
                        .unwrap()
                        .as_ref()
                )
            );
        } else {
            assert_eq!(actual_batch, batch);
        }
    }

    #[tokio::test]
    async fn read_nullable_string_in_struct() {
        test_write_null_string_in_struct(true).await;
        test_write_null_string_in_struct(false).await;
    }

    #[tokio::test]
    async fn test_read_struct_of_list_arrays() {
        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let arrow_schema = make_schema_of_list_array();
        let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();

        let batches = (0..3)
            .map(|_| {
                let struct_array = make_struct_of_list_array(10, 10);
                RecordBatch::try_new(arrow_schema.clone(), vec![struct_array]).unwrap()
            })
            .collect::<Vec<_>>();
        let batches_ref = batches.iter().collect::<Vec<_>>();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&batches).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
        let expected = concat_batches(&arrow_schema, batches_ref).unwrap();
        assert_eq!(expected, actual_batch);
    }

    #[tokio::test]
    async fn test_scan_struct_of_list_arrays() {
        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let arrow_schema = make_schema_of_list_array();
        let struct_array = make_struct_of_list_array(3, 10);
        let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_array.clone()]).unwrap();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let mut expected_columns: Vec<ArrayRef> = Vec::new();
        for c in struct_array.columns().iter() {
            expected_columns.push(c.slice(1, 1));
        }

        let expected_struct = match arrow_schema.fields[0].data_type() {
            DataType::Struct(subfields) => subfields
                .iter()
                .zip(expected_columns)
                .map(|(f, d)| (f.clone(), d))
                .collect::<Vec<_>>(),
            _ => panic!("unexpected field"),
        };

        let expected_struct_array = StructArray::from(expected_struct);
        let expected_batch = RecordBatch::from(&StructArray::from(vec![(
            Arc::new(arrow_schema.fields[0].as_ref().clone()),
            Arc::new(expected_struct_array) as ArrayRef,
        )]));

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let params = ReadBatchParams::Range(1..2);
        let slice_of_batch = reader.read_batch(0, params, reader.schema()).await.unwrap();
        assert_eq!(expected_batch, slice_of_batch);
    }

    fn make_schema_of_list_array() -> Arc<arrow_schema::Schema> {
        Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "s",
            DataType::Struct(ArrowFields::from(vec![
                ArrowField::new(
                    "li",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    true,
                ),
                ArrowField::new(
                    "ls",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))),
                    true,
                ),
                ArrowField::new(
                    "ll",
                    DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    false,
                ),
            ])),
            true,
        )]))
    }

    fn make_struct_of_list_array(rows: i32, num_items: i32) -> Arc<StructArray> {
        let mut li_builder = ListBuilder::new(Int32Builder::new());
        let mut ls_builder = ListBuilder::new(StringBuilder::new());
        let ll_value_builder = Int32Builder::new();
        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
        for i in 0..rows {
            for j in 0..num_items {
                li_builder.values().append_value(i * 10 + j);
                ls_builder
                    .values()
                    .append_value(format!("str-{}", i * 10 + j));
                large_list_builder.values().append_value(i * 10 + j);
            }
            li_builder.append(true);
            ls_builder.append(true);
            large_list_builder.append(true);
        }
        Arc::new(StructArray::from(vec![
            (
                Arc::new(ArrowField::new(
                    "li",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    true,
                )),
                Arc::new(li_builder.finish()) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new(
                    "ls",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))),
                    true,
                )),
                Arc::new(ls_builder.finish()) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new(
                    "ll",
                    DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    false,
                )),
                Arc::new(large_list_builder.finish()) as ArrayRef,
            ),
        ]))
    }

    #[tokio::test]
    async fn test_read_nullable_arrays() {
        use arrow_array::Array;

        // A batch with both a regular column and an all-null column.
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("i", DataType::Int64, false),
            ArrowField::new("n", DataType::Null, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from_iter_values(0..100)),
            Arc::new(NullArray::new(100)),
        ];
        let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/takes");
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();

        async fn read_array_w_params(
            reader: &FileReader,
            field: &Field,
            params: ReadBatchParams,
        ) -> ArrayRef {
            read_array(reader, field, 0, reader.page_table.as_ref(), &params)
                .await
                .expect("Error reading back the null array from file")
        }

        let arr = read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFull).await;
        assert_eq!(100, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::Range(10..25)).await;
        assert_eq!(15, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFrom(60..)).await;
        assert_eq!(40, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeTo(..25)).await;
        assert_eq!(25, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr = read_array_w_params(
            &reader,
            &schema.fields[1],
            ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72])),
        )
        .await;
        assert_eq!(4, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        // Index 100 is out of range: the file has exactly 100 rows.
        let params = ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72, 100]));
        let arr = read_array(
            &reader,
            &schema.fields[1],
            0,
            reader.page_table.as_ref(),
            &params,
        );
        assert!(arr.await.is_err());

        // A range end past the file length is rejected as well.
        let params = ReadBatchParams::RangeTo(..107);
        let arr = read_array(
            &reader,
            &schema.fields[1],
            0,
            reader.page_table.as_ref(),
            &params,
        );
        assert!(arr.await.is_err());
    }

    #[tokio::test]
    async fn test_take_lists() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "l",
                DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
            ArrowField::new(
                "ll",
                DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
        ]);

        let value_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(value_builder);
        let ll_value_builder = Int32Builder::new();
        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
        for i in 0..100 {
            list_builder.values().append_value(i);
            large_list_builder.values().append_value(i);
            if (i + 1) % 10 == 0 {
                list_builder.append(true);
                large_list_builder.append(true);
            }
        }
        let list_arr = Arc::new(list_builder.finish());
        let large_list_arr = Arc::new(large_list_builder.finish());

        let batch = RecordBatch::try_new(
            Arc::new(arrow_schema.clone()),
            vec![list_arr as ArrayRef, large_list_arr as ArrayRef],
        )
        .unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/take_list");
        let schema: Schema = (&arrow_schema).try_into().unwrap();
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();
        let actual = reader.take(&[1, 3, 5, 9], &schema).await.unwrap();

        let value_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(value_builder);
        let ll_value_builder = Int32Builder::new();
        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
        for i in [1, 3, 5, 9] {
            for j in 0..10 {
                list_builder.values().append_value(i * 10 + j);
                large_list_builder.values().append_value(i * 10 + j);
            }
            list_builder.append(true);
            large_list_builder.append(true);
        }
        let expected_list = list_builder.finish();
        let expected_large_list = large_list_builder.finish();

        assert_eq!(actual.column_by_name("l").unwrap().as_ref(), &expected_list);
        assert_eq!(
            actual.column_by_name("ll").unwrap().as_ref(),
            &expected_large_list
        );
    }

    #[tokio::test]
    async fn test_list_array_with_offsets() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "l",
                DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
            ArrowField::new(
                "ll",
                DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
        ]);

        let store = ObjectStore::memory();
        let path = Path::from("/lists");

        let list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2)]),
            Some(vec![Some(3), Some(4)]),
            Some((0..2_000).map(Some).collect::<Vec<_>>()),
        ])
        .slice(1, 1);
        let large_list_array = LargeListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(10), Some(11)]),
            Some(vec![Some(12), Some(13)]),
            Some((0..2_000).map(Some).collect::<Vec<_>>()),
        ])
        .slice(1, 1);

        let batch = RecordBatch::try_new(
            Arc::new(arrow_schema.clone()),
            vec![Arc::new(list_array), Arc::new(large_list_array)],
        )
        .unwrap();

        let schema: Schema = (&arrow_schema).try_into().unwrap();
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch.clone()]).await.unwrap();
        file_writer.finish().await.unwrap();

        // The sliced-away 2,000-element lists must not end up in the file.
        let file_size_bytes = store.size(&path).await.unwrap();
        assert!(file_size_bytes < 1_000);

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
        assert_eq!(batch, actual_batch);
    }

    #[tokio::test]
    async fn test_read_ranges() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int64, false)]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let columns: Vec<ArrayRef> = vec![Arc::new(Int64Array::from_iter_values(0..100))];
        let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/read_range");
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_range(7..25, reader.schema()).await.unwrap();

        assert_eq!(
            actual_batch.column_by_name("i").unwrap().as_ref(),
            &Int64Array::from_iter_values(7..25)
        );
    }

    #[tokio::test]
    async fn test_batches_stream() {
        let store = ObjectStore::memory();
        let path = Path::from("/batch_stream");

        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int32, true)]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let mut writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        for i in 0..10 {
            let batch = RecordBatch::try_new(
                Arc::new(arrow_schema.clone()),
                vec![Arc::new(Int32Array::from_iter_values(i * 10..(i + 1) * 10))],
            )
            .unwrap();
            writer.write(&[batch]).await.unwrap();
        }
        writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();
        let stream = batches_stream(reader, schema, |id| id % 2 == 0);
        let batches = stream.try_collect::<Vec<_>>().await.unwrap();

        assert_eq!(batches.len(), 5);
        for (i, batch) in batches.iter().enumerate() {
            assert_eq!(
                batch,
                &RecordBatch::try_new(
                    Arc::new(arrow_schema.clone()),
                    vec![Arc::new(Int32Array::from_iter_values(
                        i as i32 * 2 * 10..(i as i32 * 2 + 1) * 10
                    ))],
                )
                .unwrap()
            )
        }
    }

    #[tokio::test]
    async fn test_take_boolean_beyond_chunk() {
        let store = ObjectStore::from_uri_and_params(
            Arc::new(Default::default()),
            "memory://",
            &ObjectStoreParams {
                block_size: Some(256),
                ..Default::default()
            },
        )
        .await
        .unwrap()
        .0;
        let path = Path::from("/take_bools");

        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "b",
            DataType::Boolean,
            false,
        )]));
        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();

        let array = BooleanArray::from((0..5000).map(|v| v % 5 == 0).collect::<Vec<_>>());
        let batch =
            RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(array.clone())]).unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();
        let actual = reader.take(&[2, 4, 5, 8, 4555], &schema).await.unwrap();

        assert_eq!(
            actual.column_by_name("b").unwrap().as_ref(),
            &BooleanArray::from(vec![false, false, true, false, true])
        );
    }

    #[tokio::test]
    async fn test_read_projection() {
        let store = ObjectStore::memory();
        let path = Path::from("/partial_read");

        // Build a wide schema (f0..f99), but write a file containing only f50.
        let mut fields = vec![];
        for i in 0..100 {
            fields.push(ArrowField::new(format!("f{}", i), DataType::Int32, false));
        }
        let arrow_schema = ArrowSchema::new(fields);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        let partial_schema = schema.project(&["f50"]).unwrap();
        let partial_arrow: ArrowSchema = (&partial_schema).into();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            partial_schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();

        let array = Int32Array::from(vec![0; 15]);
        let batch =
            RecordBatch::try_new(Arc::new(partial_arrow), vec![Arc::new(array.clone())]).unwrap();
        file_writer.write(&[batch.clone()]).await.unwrap();
        file_writer.finish().await.unwrap();

        let field_id = partial_schema.fields.first().unwrap().id;
        let reader = FileReader::try_new_with_fragment_id(
            &store,
            &path,
            schema.clone(),
            0,
            field_id,
            field_id,
            None,
        )
        .await
        .unwrap();
        let actual = reader
            .read_batch(0, ReadBatchParams::RangeFull, &partial_schema)
            .await
            .unwrap();

        assert_eq!(actual, batch);
    }
}