use std::ops::{Range, RangeTo};
use std::sync::Arc;

use arrow_arith::numeric::sub;
use arrow_array::{
    builder::PrimitiveBuilder,
    cast::AsArray,
    types::{Int32Type, Int64Type},
    ArrayRef, ArrowNativeTypeOp, ArrowNumericType, NullArray, OffsetSizeTrait, PrimitiveArray,
    RecordBatch, StructArray, UInt32Array,
};
use arrow_buffer::ArrowNativeType;
use arrow_schema::{DataType, FieldRef, Schema as ArrowSchema};
use arrow_select::concat::{self, concat_batches};
use async_recursion::async_recursion;
use deepsize::DeepSizeOf;
use futures::{stream, Future, FutureExt, StreamExt, TryStreamExt};
use lance_arrow::*;
use lance_core::cache::LanceCache;
use lance_core::datatypes::{Field, Schema};
use lance_core::{Error, Result};
use lance_io::encodings::dictionary::DictionaryDecoder;
use lance_io::encodings::AsyncIndex;
use lance_io::stream::{RecordBatchStream, RecordBatchStreamAdapter};
use lance_io::traits::Reader;
use lance_io::utils::{
    read_fixed_stride_array, read_metadata_offset, read_struct, read_struct_from_buf,
};
use lance_io::{object_store::ObjectStore, ReadBatchParams};

use object_store::path::Path;
use snafu::location;
use tracing::instrument;

use crate::format::metadata::Metadata;
use crate::page_table::{PageInfo, PageTable};

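/// Lance data file reader.
///
/// Holds the opened object-store [`Reader`] together with the file's decoded
/// [`Metadata`] and [`PageTable`], and decodes pages back into Arrow arrays
/// on demand.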
#[derive(Clone, DeepSizeOf)]
pub struct FileReader {
    pub object_reader: Arc<dyn Reader>,
    metadata: Arc<Metadata>,
    page_table: Arc<PageTable>,
    schema: Schema,

    /// The id of the fragment this file belongs to.
    fragment_id: u64,

    /// Page table for the statistics pages, if the file has any.
    stats_page_table: Arc<Option<PageTable>>,
}

impl std::fmt::Debug for FileReader {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("FileReader")
            .field("fragment", &self.fragment_id)
            .field("path", &self.object_reader.path())
            .finish()
    }
}

impl FileReader {
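    /// Open a file reader for a file that belongs to a fragment.
    ///
    /// `field_id_offset` and `max_field_id` bound the field ids covered by
    /// this file's page table; `session` optionally caches the decoded
    /// metadata and page tables across opens.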
    #[instrument(level = "debug", skip(object_store, schema, session))]
    pub async fn try_new_with_fragment_id(
        object_store: &ObjectStore,
        path: &Path,
        schema: Schema,
        fragment_id: u32,
        field_id_offset: i32,
        max_field_id: i32,
        session: Option<&LanceCache>,
    ) -> Result<Self> {
        let object_reader = object_store.open(path).await?;

        let metadata = Self::read_metadata(object_reader.as_ref(), session).await?;

        Self::try_new_from_reader(
            path,
            object_reader.into(),
            Some(metadata),
            schema,
            fragment_id,
            field_id_offset,
            max_field_id,
            session,
        )
        .await
    }

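    /// Create a [FileReader] from an already-opened [Reader].
    ///
    /// If `metadata` is `None`, it is read from the tail of the file.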
    #[allow(clippy::too_many_arguments)]
    pub async fn try_new_from_reader(
        path: &Path,
        object_reader: Arc<dyn Reader>,
        metadata: Option<Arc<Metadata>>,
        schema: Schema,
        fragment_id: u32,
        field_id_offset: i32,
        max_field_id: i32,
        session: Option<&LanceCache>,
    ) -> Result<Self> {
        let metadata = match metadata {
            Some(metadata) => metadata,
            None => Self::read_metadata(object_reader.as_ref(), session).await?,
        };

        let page_table = async {
            Self::load_from_cache(session, path.to_string(), |_| async {
                PageTable::load(
                    object_reader.as_ref(),
                    metadata.page_table_position,
                    field_id_offset,
                    max_field_id,
                    metadata.num_batches() as i32,
                )
                .await
            })
            .await
        };

        let stats_page_table = Self::read_stats_page_table(object_reader.as_ref(), session);

        // Load the main page table and the optional stats page table concurrently.
        let (page_table, stats_page_table) = futures::try_join!(page_table, stats_page_table)?;

        Ok(Self {
            object_reader,
            metadata,
            schema,
            page_table,
            fragment_id: fragment_id as u64,
            stats_page_table,
        })
    }

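    /// Read the file metadata from the tail of the file.
    ///
    /// The last block of the file is fetched first; it always contains the
    /// metadata offset and usually the metadata itself, in which case no
    /// second read is needed.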
    pub async fn read_metadata(
        object_reader: &dyn Reader,
        cache: Option<&LanceCache>,
    ) -> Result<Arc<Metadata>> {
        Self::load_from_cache(cache, object_reader.path().to_string(), |_| async {
            let file_size = object_reader.size().await?;
            let begin = if file_size < object_reader.block_size() {
                0
            } else {
                file_size - object_reader.block_size()
            };
            let tail_bytes = object_reader.get_range(begin..file_size).await?;
            let metadata_pos = read_metadata_offset(&tail_bytes)?;

            let metadata: Metadata = if metadata_pos < file_size - tail_bytes.len() {
                // The metadata starts before the tail block; read it separately.
                read_struct(object_reader, metadata_pos).await?
            } else {
                // The metadata is fully contained in the tail bytes already fetched.
                let offset = tail_bytes.len() - (file_size - metadata_pos);
                read_struct_from_buf(&tail_bytes.slice(offset..))?
            };
            Ok(metadata)
        })
        .await
    }

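    /// Load the page table for the statistics pages, if the file has any.
    ///
    /// Returns `Arc<None>` for files written without statistics.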
    async fn read_stats_page_table(
        reader: &dyn Reader,
        cache: Option<&LanceCache>,
    ) -> Result<Arc<Option<PageTable>>> {
        Self::load_from_cache(cache, reader.path().child("stats").to_string(), |_| async {
            let metadata = Self::read_metadata(reader, cache).await?;

            if let Some(stats_meta) = metadata.stats_metadata.as_ref() {
                Ok(Some(
                    PageTable::load(
                        reader,
                        stats_meta.page_table_position,
                        // Statistics pages start at field id 0 and form a single batch.
                        0,
                        *stats_meta.leaf_field_ids.iter().max().unwrap(),
                        1,
                    )
                    .await?,
                ))
            } else {
                Ok(None)
            }
        })
        .await
    }

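    /// Get an object from the cache, or load it with `loader` and cache the result.
    ///
    /// When no cache is provided, the loader always runs.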
    async fn load_from_cache<T: DeepSizeOf + Send + Sync + 'static, F, Fut>(
        cache: Option<&LanceCache>,
        key: String,
        loader: F,
    ) -> Result<Arc<T>>
    where
        F: Fn(&str) -> Fut,
        Fut: Future<Output = Result<T>>,
    {
        if let Some(cache) = cache {
            cache.get_or_insert(key, loader).await
        } else {
            Ok(Arc::new(loader(key.as_str()).await?))
        }
    }

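    /// Open one Lance data file for read, assuming fragment id 0.
    ///
    /// A minimal usage sketch (hypothetical in-memory store and path; the file
    /// must already have been written by a `FileWriter`, hence `ignore`):
    ///
    /// ```ignore
    /// let store = ObjectStore::memory();
    /// let path = Path::from("/data.lance");
    /// let reader = FileReader::try_new(&store, &path, schema).await?;
    /// println!("{} rows in {} batches", reader.len(), reader.num_batches());
    /// ```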
    pub async fn try_new(object_store: &ObjectStore, path: &Path, schema: Schema) -> Result<Self> {
        let max_field_id = schema.max_field_id().unwrap_or_default();
        Self::try_new_with_fragment_id(object_store, path, schema, 0, 0, max_field_id, None).await
    }

    fn io_parallelism(&self) -> usize {
        self.object_reader.io_parallelism()
    }

    /// The schema of the data in this file.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn num_batches(&self) -> usize {
        self.metadata.num_batches()
    }

    /// Get the number of rows in the batch identified by `batch_id`.
    pub fn num_rows_in_batch(&self, batch_id: i32) -> usize {
        self.metadata.get_batch_length(batch_id).unwrap_or_default() as usize
    }

    /// Count the number of rows in this file.
    pub fn len(&self) -> usize {
        self.metadata.len()
    }

    pub fn is_empty(&self) -> bool {
        self.metadata.is_empty()
    }

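    /// Read a batch of data from the file, with the given projection.
    ///
    /// `params` selects rows within the batch: a range, a set of indices, or
    /// `..` for the whole batch.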
    #[instrument(level = "debug", skip(self, params, projection))]
    pub async fn read_batch(
        &self,
        batch_id: i32,
        params: impl Into<ReadBatchParams>,
        projection: &Schema,
    ) -> Result<RecordBatch> {
        read_batch(self, &params.into(), projection, batch_id).await
    }

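    /// Read a contiguous range of rows, stitching across batch boundaries as needed.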
    #[instrument(level = "debug", skip(self, projection))]
    pub async fn read_range(
        &self,
        range: Range<usize>,
        projection: &Schema,
    ) -> Result<RecordBatch> {
        if range.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(projection.into())));
        }
        let range_in_batches = self.metadata.range_to_batches(range)?;
        let batches =
            stream::iter(range_in_batches)
                .map(|(batch_id, range)| async move {
                    self.read_batch(batch_id, range, projection).await
                })
                .buffered(self.io_parallelism())
                .try_collect::<Vec<_>>()
                .await?;
        if batches.len() == 1 {
            return Ok(batches[0].clone());
        }
        let schema = batches[0].schema();
        // Concatenation is CPU-bound, so move it off the async executor.
        Ok(tokio::task::spawn_blocking(move || concat_batches(&schema, &batches)).await??)
    }

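    /// Take rows by their absolute offsets within the file.
    ///
    /// A hedged call-shape sketch (hypothetical reader; `ignore`d for rustdoc):
    ///
    /// ```ignore
    /// let batch = reader.take(&[1, 15, 90], reader.schema()).await?;
    /// ```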
    #[instrument(level = "debug", skip_all)]
    pub async fn take(&self, indices: &[u32], projection: &Schema) -> Result<RecordBatch> {
        let num_batches = self.num_batches();
        let num_rows = self.len() as u32;
        let indices_in_batches = self.metadata.group_indices_to_batches(indices);
        let batches = stream::iter(indices_in_batches)
            .map(|batch| async move {
                if batch.batch_id >= num_batches as i32 {
                    Err(Error::InvalidInput {
                        source: format!("batch_id: {} out of bounds", batch.batch_id).into(),
                        location: location!(),
                    })
                } else if *batch.offsets.last().expect("got empty batch") > num_rows {
                    Err(Error::InvalidInput {
                        source: format!("indices: {:?} out of bounds", batch.offsets).into(),
                        location: location!(),
                    })
                } else {
                    self.read_batch(batch.batch_id, batch.offsets.as_slice(), projection)
                        .await
                }
            })
            .buffered(self.io_parallelism())
            .try_collect::<Vec<_>>()
            .await?;

        let schema = Arc::new(ArrowSchema::from(projection));

        Ok(tokio::task::spawn_blocking(move || concat_batches(&schema, &batches)).await??)
    }

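    /// Get the schema of the statistics pages that cover the given field ids.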
    pub fn page_stats_schema(&self, field_ids: &[i32]) -> Option<Schema> {
        self.metadata.stats_metadata.as_ref().map(|meta| {
            let mut stats_field_ids = vec![];
            for stats_field in &meta.schema.fields {
                // Top-level stats fields are named after the field id they describe.
                if let Ok(stats_field_id) = stats_field.name.parse::<i32>() {
                    if field_ids.contains(&stats_field_id) {
                        stats_field_ids.push(stats_field.id);
                        for child in &stats_field.children {
                            stats_field_ids.push(child.id);
                        }
                    }
                }
            }
            meta.schema.project_by_ids(&stats_field_ids, true)
        })
    }

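    /// Read the page statistics arrays for the given field ids, if any exist.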
    pub async fn read_page_stats(&self, field_ids: &[i32]) -> Result<Option<RecordBatch>> {
        if let Some(stats_page_table) = self.stats_page_table.as_ref() {
            let projection = self.page_stats_schema(field_ids).unwrap();
            if projection.fields.is_empty() {
                return Ok(None);
            }
            let arrays = futures::stream::iter(projection.fields.iter().cloned())
                .map(|field| async move {
                    read_array(
                        self,
                        &field,
                        0,
                        stats_page_table,
                        &ReadBatchParams::RangeFull,
                    )
                    .await
                })
                .buffered(self.io_parallelism())
                .try_collect::<Vec<_>>()
                .await?;

            let schema = ArrowSchema::from(&projection);
            let batch = RecordBatch::try_new(Arc::new(schema), arrays)?;
            Ok(Some(batch))
        } else {
            Ok(None)
        }
    }
}

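/// Stream the record batches of a file, reading only the batch ids that
/// satisfy `predicate`.
///
/// A minimal usage sketch (hypothetical reader and projection; `ignore`d for
/// rustdoc):
///
/// ```ignore
/// use futures::TryStreamExt;
///
/// let stream = batches_stream(reader, projection, |batch_id| batch_id % 2 == 0);
/// let batches = stream.try_collect::<Vec<_>>().await?;
/// ```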
pub fn batches_stream(
    reader: FileReader,
    projection: Schema,
    predicate: impl FnMut(&i32) -> bool + Send + Sync + 'static,
) -> impl RecordBatchStream {
    let projection = Arc::new(projection);
    let arrow_schema = ArrowSchema::from(projection.as_ref());

    let total_batches = reader.num_batches() as i32;
    let batches = (0..total_batches).filter(predicate);
    let this = Arc::new(reader);
    let inner = stream::iter(batches)
        .zip(stream::repeat_with(move || {
            (this.clone(), projection.clone())
        }))
        .map(move |(batch_id, (reader, projection))| async move {
            reader
                .read_batch(batch_id, ReadBatchParams::RangeFull, &projection)
                .await
        })
        // Read ahead by up to two batches.
        .buffered(2)
        .boxed();
    RecordBatchStreamAdapter::new(Arc::new(arrow_schema), inner)
}

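/// Read one batch from the file with the projection `schema`.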
pub async fn read_batch(
    reader: &FileReader,
    params: &ReadBatchParams,
    schema: &Schema,
    batch_id: i32,
) -> Result<RecordBatch> {
    if !schema.fields.is_empty() {
        let arrs = stream::iter(&schema.fields)
            .map(|f| async { read_array(reader, f, batch_id, &reader.page_table, params).await })
            .buffered(reader.io_parallelism())
            .try_collect::<Vec<_>>()
            .boxed();
        let arrs = arrs.await?;
        Ok(RecordBatch::try_new(Arc::new(schema.into()), arrs)?)
    } else {
        Err(Error::invalid_input("no fields requested", location!()))
    }
}

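/// Dispatch on the field's data type and read one array from the page
/// identified by `(field, batch_id)`.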
#[async_recursion]
async fn read_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let data_type = field.data_type();

    use DataType::*;

    if data_type.is_fixed_stride() {
        _read_fixed_stride_array(reader, field, batch_id, page_table, params).await
    } else {
        match data_type {
            Null => read_null_array(field, batch_id, page_table, params),
            Utf8 | LargeUtf8 | Binary | LargeBinary => {
                read_binary_array(reader, field, batch_id, page_table, params).await
            }
            Struct(_) => read_struct_array(reader, field, batch_id, page_table, params).await,
            Dictionary(_, _) => {
                read_dictionary_array(reader, field, batch_id, page_table, params).await
            }
            List(_) => {
                read_list_array::<Int32Type>(reader, field, batch_id, page_table, params).await
            }
            LargeList(_) => {
                read_list_array::<Int64Type>(reader, field, batch_id, page_table, params).await
            }
            _ => {
                unimplemented!("No support for {data_type} yet");
            }
        }
    }
}

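/// Look up the [PageInfo] for `(field, batch_id)`, erroring if the page table
/// has no entry for it.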
fn get_page_info<'a>(
    page_table: &'a PageTable,
    field: &'a Field,
    batch_id: i32,
) -> Result<&'a PageInfo> {
    page_table.get(field.id, batch_id).ok_or_else(|| {
        Error::io(
            format!(
                "No page info found for field: {}, field_id={} batch={}",
                field.name, field.id, batch_id
            ),
            location!(),
        )
    })
}

// The leading underscore avoids a name clash with the imported
// `lance_io::utils::read_fixed_stride_array` this function delegates to.
async fn _read_fixed_stride_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;
    read_fixed_stride_array(
        reader.object_reader.as_ref(),
        &field.data_type(),
        page_info.position,
        page_info.length,
        params.clone(),
    )
    .await
}

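/// "Read" a null array: nothing is fetched from storage; the output length is
/// computed (and bounds-checked) from `params` and the page length.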
fn read_null_array(
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;

    let length_output = match params {
        ReadBatchParams::Indices(indices) => {
            if indices.is_empty() {
                0
            } else {
                let idx_max = *indices.values().iter().max().unwrap() as u64;
                if idx_max >= page_info.length as u64 {
                    return Err(Error::io(
                        format!(
                            "NullArray Reader: request([{}]) out of range: [0..{}]",
                            idx_max, page_info.length
                        ),
                        location!(),
                    ));
                }
                indices.len()
            }
        }
        _ => {
            let (idx_start, idx_end) = match params {
                ReadBatchParams::Range(r) => (r.start, r.end),
                ReadBatchParams::RangeFull => (0, page_info.length),
                ReadBatchParams::RangeTo(r) => (0, r.end),
                ReadBatchParams::RangeFrom(r) => (r.start, page_info.length),
                _ => unreachable!(),
            };
            if idx_end > page_info.length {
                return Err(Error::io(
                    format!(
                        "NullArray Reader: request([{}..{}]) out of range: [0..{}]",
                        idx_start, idx_end, page_info.length
                    ),
                    location!(),
                ));
            }
            idx_end - idx_start
        }
    };

    Ok(Arc::new(NullArray::new(length_output)))
}

async fn read_binary_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;

    lance_io::utils::read_binary_array(
        reader.object_reader.as_ref(),
        &field.data_type(),
        field.nullable,
        page_info.position,
        page_info.length,
        params,
    )
    .await
}

async fn read_dictionary_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;
    let data_type = field.data_type();
    let decoder = DictionaryDecoder::new(
        reader.object_reader.as_ref(),
        page_info.position,
        page_info.length,
        &data_type,
        field
            .dictionary
            .as_ref()
            .unwrap()
            .values
            .as_ref()
            .unwrap()
            .clone(),
    );
    decoder.get(params.clone()).await
}

async fn read_struct_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    // Child arrays are read sequentially; the parallelism lives one level up,
    // across the top-level fields of the batch.
    let mut sub_arrays: Vec<(FieldRef, ArrayRef)> = vec![];

    for child in field.children.as_slice() {
        let arr = read_array(reader, child, batch_id, page_table, params).await?;
        sub_arrays.push((Arc::new(child.into()), arr));
    }

    Ok(Arc::new(StructArray::from(sub_arrays)))
}

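/// Take selected list elements: read one value range per requested index and
/// rebuild the offsets array from the range lengths.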
async fn take_list_array<T: ArrowNumericType>(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    positions: &PrimitiveArray<T>,
    indices: &UInt32Array,
) -> Result<ArrayRef>
where
    T::Native: ArrowNativeTypeOp + OffsetSizeTrait,
{
    let first_idx = indices.value(0);
    // `positions` starts at the first requested index, so rebase each index
    // before looking up its value range.
    let ranges = indices
        .values()
        .iter()
        .map(|i| (*i - first_idx).as_usize())
        .map(|idx| positions.value(idx).as_usize()..positions.value(idx + 1).as_usize())
        .collect::<Vec<_>>();
    let field = field.clone();
    let mut list_values: Vec<ArrayRef> = vec![];
    for range in ranges.iter() {
        list_values.push(
            read_array(
                reader,
                &field.children[0],
                batch_id,
                page_table,
                &(range.clone()).into(),
            )
            .await?,
        );
    }

    let value_refs = list_values
        .iter()
        .map(|arr| arr.as_ref())
        .collect::<Vec<_>>();
    let mut offsets_builder = PrimitiveBuilder::<T>::new();
    offsets_builder.append_value(T::Native::usize_as(0));
    let mut off = 0_usize;
    for range in ranges {
        off += range.len();
        offsets_builder.append_value(T::Native::usize_as(off));
    }
    let all_values = concat::concat(value_refs.as_slice())?;
    let offset_arr = offsets_builder.finish();
    let arr = try_new_generic_list_array(all_values, &offset_arr)?;
    Ok(Arc::new(arr) as ArrayRef)
}

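/// Read a list array. Offsets and values are stored separately: the offsets
/// page is read with one extra trailing position so the end of the last
/// requested list is known, then the value range it spans is read.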
async fn read_list_array<T: ArrowNumericType>(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef>
where
    T::Native: ArrowNativeTypeOp + OffsetSizeTrait,
{
    // Extend the offsets read by one position to cover the end of the last list.
    let positions_params = match params {
        ReadBatchParams::Range(range) => ReadBatchParams::from(range.start..(range.end + 1)),
        ReadBatchParams::RangeTo(range) => ReadBatchParams::from(..range.end + 1),
        ReadBatchParams::Indices(indices) => {
            (indices.value(0).as_usize()..indices.value(indices.len() - 1).as_usize() + 2).into()
        }
        p => p.clone(),
    };

    let page_info = get_page_info(&reader.page_table, field, batch_id)?;
    let position_arr = read_fixed_stride_array(
        reader.object_reader.as_ref(),
        &T::DATA_TYPE,
        page_info.position,
        page_info.length,
        positions_params,
    )
    .await?;

    let positions: &PrimitiveArray<T> = position_arr.as_primitive();

    let value_params = match params {
        ReadBatchParams::Range(range) => ReadBatchParams::from(
            positions.value(0).as_usize()..positions.value(range.end - range.start).as_usize(),
        ),
        ReadBatchParams::Ranges(_) => {
            return Err(Error::Internal {
                message: "ReadBatchParams::Ranges should not be used in v1 files".to_string(),
                location: location!(),
            })
        }
        ReadBatchParams::RangeTo(RangeTo { end }) => {
            ReadBatchParams::from(..positions.value(*end).as_usize())
        }
        ReadBatchParams::RangeFrom(_) => ReadBatchParams::from(positions.value(0).as_usize()..),
        ReadBatchParams::RangeFull => ReadBatchParams::from(
            positions.value(0).as_usize()..positions.value(positions.len() - 1).as_usize(),
        ),
        ReadBatchParams::Indices(indices) => {
            return take_list_array(reader, field, batch_id, page_table, positions, indices).await;
        }
    };

    // Rebase the offsets to start at zero for the newly assembled array.
    let start_position = PrimitiveArray::<T>::new_scalar(positions.value(0));
    let offset_arr = sub(positions, &start_position)?;
    let offset_arr_ref = offset_arr.as_primitive::<T>();
    let value_arrs = read_array(
        reader,
        &field.children[0],
        batch_id,
        page_table,
        &value_params,
    )
    .await?;
    let arr = try_new_generic_list_array(value_arrs, offset_arr_ref)?;
    Ok(Arc::new(arr) as ArrayRef)
}

#[cfg(test)]
mod tests {
    use crate::writer::{FileWriter, NotSelfDescribing};

    use super::*;

    use arrow_array::{
        builder::{Int32Builder, LargeListBuilder, ListBuilder, StringBuilder},
        cast::{as_string_array, as_struct_array},
        types::UInt8Type,
        Array, DictionaryArray, Float32Array, Int64Array, LargeListArray, ListArray, StringArray,
        UInt8Array,
    };
    use arrow_array::{BooleanArray, Int32Array};
    use arrow_schema::{Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema};
    use lance_io::object_store::ObjectStoreParams;

    #[tokio::test]
    async fn test_take() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("i", DataType::Int64, true),
            ArrowField::new("f", DataType::Float32, false),
            ArrowField::new("s", DataType::Utf8, false),
            ArrowField::new(
                "d",
                DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
                false,
            ),
        ]);
        let mut schema = Schema::try_from(&arrow_schema).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/take_test");

        let values = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);
        let values_ref = Arc::new(values);
        let mut batches = vec![];
        for batch_id in 0..10 {
            let value_range: Range<i64> = batch_id * 10..batch_id * 10 + 10;
            let keys = UInt8Array::from_iter_values(value_range.clone().map(|v| (v % 7) as u8));
            let columns: Vec<ArrayRef> = vec![
                Arc::new(Int64Array::from_iter(
                    value_range.clone().collect::<Vec<_>>(),
                )),
                Arc::new(Float32Array::from_iter(
                    value_range.clone().map(|n| n as f32).collect::<Vec<_>>(),
                )),
                Arc::new(StringArray::from_iter_values(
                    value_range.clone().map(|n| format!("str-{}", n)),
                )),
                Arc::new(DictionaryArray::<UInt8Type>::try_new(keys, values_ref.clone()).unwrap()),
            ];
            batches.push(RecordBatch::try_new(Arc::new(arrow_schema.clone()), columns).unwrap());
        }
        schema.set_dictionary(&batches[0]).unwrap();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        for batch in batches.iter() {
            file_writer.write(&[batch.clone()]).await.unwrap();
        }
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let batch = reader
            .take(&[1, 15, 20, 25, 30, 48, 90], reader.schema())
            .await
            .unwrap();
        let dict_keys = UInt8Array::from_iter_values([1, 1, 6, 4, 2, 6, 6]);
        assert_eq!(
            batch,
            RecordBatch::try_new(
                batch.schema(),
                vec![
                    Arc::new(Int64Array::from_iter_values([1, 15, 20, 25, 30, 48, 90])),
                    Arc::new(Float32Array::from_iter_values([
                        1.0, 15.0, 20.0, 25.0, 30.0, 48.0, 90.0
                    ])),
                    Arc::new(StringArray::from_iter_values([
                        "str-1", "str-15", "str-20", "str-25", "str-30", "str-48", "str-90"
                    ])),
                    Arc::new(DictionaryArray::try_new(dict_keys, values_ref.clone()).unwrap()),
                ]
            )
            .unwrap()
        );
    }

    async fn test_write_null_string_in_struct(field_nullable: bool) {
        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "parent",
            DataType::Struct(ArrowFields::from(vec![ArrowField::new(
                "str",
                DataType::Utf8,
                field_nullable,
            )])),
            true,
        )]));

        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let string_arr = Arc::new(StringArray::from_iter([Some("a"), Some(""), Some("b")]));
        let struct_arr = Arc::new(StructArray::from(vec![(
            Arc::new(ArrowField::new("str", DataType::Utf8, field_nullable)),
            string_arr.clone() as ArrayRef,
        )]));
        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_arr]).unwrap();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch.clone()]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();

        if field_nullable {
            // When the field is nullable, the empty string reads back as a null.
            assert_eq!(
                &StringArray::from_iter(vec![Some("a"), None, Some("b")]),
                as_string_array(
                    as_struct_array(actual_batch.column_by_name("parent").unwrap().as_ref())
                        .column_by_name("str")
                        .unwrap()
                        .as_ref()
                )
            );
        } else {
            assert_eq!(actual_batch, batch);
        }
    }

    #[tokio::test]
    async fn read_nullable_string_in_struct() {
        test_write_null_string_in_struct(true).await;
        test_write_null_string_in_struct(false).await;
    }

    #[tokio::test]
    async fn test_read_struct_of_list_arrays() {
        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let arrow_schema = make_schema_of_list_array();
        let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();

        let batches = (0..3)
            .map(|_| {
                let struct_array = make_struct_of_list_array(10, 10);
                RecordBatch::try_new(arrow_schema.clone(), vec![struct_array]).unwrap()
            })
            .collect::<Vec<_>>();
        let batches_ref = batches.iter().collect::<Vec<_>>();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&batches).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
        let expected = concat_batches(&arrow_schema, batches_ref).unwrap();
        assert_eq!(expected, actual_batch);
    }

    #[tokio::test]
    async fn test_scan_struct_of_list_arrays() {
        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let arrow_schema = make_schema_of_list_array();
        let struct_array = make_struct_of_list_array(3, 10);
        let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_array.clone()]).unwrap();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let mut expected_columns: Vec<ArrayRef> = Vec::new();
        for c in struct_array.columns().iter() {
            expected_columns.push(c.slice(1, 1));
        }

        let expected_struct = match arrow_schema.fields[0].data_type() {
            DataType::Struct(subfields) => subfields
                .iter()
                .zip(expected_columns)
                .map(|(f, d)| (f.clone(), d))
                .collect::<Vec<_>>(),
            _ => panic!("unexpected field"),
        };

        let expected_struct_array = StructArray::from(expected_struct);
        let expected_batch = RecordBatch::from(&StructArray::from(vec![(
            Arc::new(arrow_schema.fields[0].as_ref().clone()),
            Arc::new(expected_struct_array) as ArrayRef,
        )]));

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let params = ReadBatchParams::Range(1..2);
        let slice_of_batch = reader.read_batch(0, params, reader.schema()).await.unwrap();
        assert_eq!(expected_batch, slice_of_batch);
    }

    fn make_schema_of_list_array() -> Arc<arrow_schema::Schema> {
        Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "s",
            DataType::Struct(ArrowFields::from(vec![
                ArrowField::new(
                    "li",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    true,
                ),
                ArrowField::new(
                    "ls",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))),
                    true,
                ),
                ArrowField::new(
                    "ll",
                    DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    false,
                ),
            ])),
            true,
        )]))
    }

    fn make_struct_of_list_array(rows: i32, num_items: i32) -> Arc<StructArray> {
        let mut li_builder = ListBuilder::new(Int32Builder::new());
        let mut ls_builder = ListBuilder::new(StringBuilder::new());
        let ll_value_builder = Int32Builder::new();
        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
        for i in 0..rows {
            for j in 0..num_items {
                li_builder.values().append_value(i * 10 + j);
                ls_builder
                    .values()
                    .append_value(format!("str-{}", i * 10 + j));
                large_list_builder.values().append_value(i * 10 + j);
            }
            li_builder.append(true);
            ls_builder.append(true);
            large_list_builder.append(true);
        }
        Arc::new(StructArray::from(vec![
            (
                Arc::new(ArrowField::new(
                    "li",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    true,
                )),
                Arc::new(li_builder.finish()) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new(
                    "ls",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))),
                    true,
                )),
                Arc::new(ls_builder.finish()) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new(
                    "ll",
                    DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    false,
                )),
                Arc::new(large_list_builder.finish()) as ArrayRef,
            ),
        ]))
    }

    #[tokio::test]
    async fn test_read_nullable_arrays() {
        use arrow_array::Array;

        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("i", DataType::Int64, false),
            ArrowField::new("n", DataType::Null, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from_iter_values(0..100)),
            Arc::new(NullArray::new(100)),
        ];
        let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/takes");
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();

        async fn read_array_w_params(
            reader: &FileReader,
            field: &Field,
            params: ReadBatchParams,
        ) -> ArrayRef {
            read_array(reader, field, 0, reader.page_table.as_ref(), &params)
                .await
                .expect("Error reading back the null array from file")
        }

        let arr = read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFull).await;
        assert_eq!(100, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::Range(10..25)).await;
        assert_eq!(15, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFrom(60..)).await;
        assert_eq!(40, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeTo(..25)).await;
        assert_eq!(25, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr = read_array_w_params(
            &reader,
            &schema.fields[1],
            ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72])),
        )
        .await;
        assert_eq!(4, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        // Out-of-bounds indices should be rejected.
        let params = ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72, 100]));
        let arr = read_array(
            &reader,
            &schema.fields[1],
            0,
            reader.page_table.as_ref(),
            &params,
        );
        assert!(arr.await.is_err());

        // An out-of-bounds range end should also be rejected.
        let params = ReadBatchParams::RangeTo(..107);
        let arr = read_array(
            &reader,
            &schema.fields[1],
            0,
            reader.page_table.as_ref(),
            &params,
        );
        assert!(arr.await.is_err());
    }

    #[tokio::test]
    async fn test_take_lists() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "l",
                DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
            ArrowField::new(
                "ll",
                DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
        ]);

        let value_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(value_builder);
        let ll_value_builder = Int32Builder::new();
        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
        for i in 0..100 {
            list_builder.values().append_value(i);
            large_list_builder.values().append_value(i);
            if (i + 1) % 10 == 0 {
                list_builder.append(true);
                large_list_builder.append(true);
            }
        }
        let list_arr = Arc::new(list_builder.finish());
        let large_list_arr = Arc::new(large_list_builder.finish());

        let batch = RecordBatch::try_new(
            Arc::new(arrow_schema.clone()),
            vec![list_arr as ArrayRef, large_list_arr as ArrayRef],
        )
        .unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/take_list");
        let schema: Schema = (&arrow_schema).try_into().unwrap();
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();
        let actual = reader.take(&[1, 3, 5, 9], &schema).await.unwrap();

        let value_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(value_builder);
        let ll_value_builder = Int32Builder::new();
        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
        for i in [1, 3, 5, 9] {
            for j in 0..10 {
                list_builder.values().append_value(i * 10 + j);
                large_list_builder.values().append_value(i * 10 + j);
            }
            list_builder.append(true);
            large_list_builder.append(true);
        }
        let expected_list = list_builder.finish();
        let expected_large_list = large_list_builder.finish();

        assert_eq!(actual.column_by_name("l").unwrap().as_ref(), &expected_list);
        assert_eq!(
            actual.column_by_name("ll").unwrap().as_ref(),
            &expected_large_list
        );
    }

    #[tokio::test]
    async fn test_list_array_with_offsets() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "l",
                DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
            ArrowField::new(
                "ll",
                DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
        ]);

        let store = ObjectStore::memory();
        let path = Path::from("/lists");

        let list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2)]),
            Some(vec![Some(3), Some(4)]),
            Some((0..2_000).map(Some).collect::<Vec<_>>()),
        ])
        .slice(1, 1);
        let large_list_array = LargeListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(10), Some(11)]),
            Some(vec![Some(12), Some(13)]),
            Some((0..2_000).map(Some).collect::<Vec<_>>()),
        ])
        .slice(1, 1);

        let batch = RecordBatch::try_new(
            Arc::new(arrow_schema.clone()),
            vec![Arc::new(list_array), Arc::new(large_list_array)],
        )
        .unwrap();

        let schema: Schema = (&arrow_schema).try_into().unwrap();
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch.clone()]).await.unwrap();
        file_writer.finish().await.unwrap();

        // Only the sliced rows should have been written, not the 2,000-element
        // backing lists.
        let file_size_bytes = store.size(&path).await.unwrap();
        assert!(file_size_bytes < 1_000);

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
        assert_eq!(batch, actual_batch);
    }

    #[tokio::test]
    async fn test_read_ranges() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int64, false)]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let columns: Vec<ArrayRef> = vec![Arc::new(Int64Array::from_iter_values(0..100))];
        let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/read_range");
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_range(7..25, reader.schema()).await.unwrap();

        assert_eq!(
            actual_batch.column_by_name("i").unwrap().as_ref(),
            &Int64Array::from_iter_values(7..25)
        );
    }

    #[tokio::test]
    async fn test_batches_stream() {
        let store = ObjectStore::memory();
        let path = Path::from("/batch_stream");

        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int32, true)]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let mut writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        for i in 0..10 {
            let batch = RecordBatch::try_new(
                Arc::new(arrow_schema.clone()),
                vec![Arc::new(Int32Array::from_iter_values(i * 10..(i + 1) * 10))],
            )
            .unwrap();
            writer.write(&[batch]).await.unwrap();
        }
        writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();
        let stream = batches_stream(reader, schema, |id| id % 2 == 0);
        let batches = stream.try_collect::<Vec<_>>().await.unwrap();

        assert_eq!(batches.len(), 5);
        for (i, batch) in batches.iter().enumerate() {
            assert_eq!(
                batch,
                &RecordBatch::try_new(
                    Arc::new(arrow_schema.clone()),
                    vec![Arc::new(Int32Array::from_iter_values(
                        i as i32 * 2 * 10..(i as i32 * 2 + 1) * 10
                    ))],
                )
                .unwrap()
            )
        }
    }

    #[tokio::test]
    async fn test_take_boolean_beyond_chunk() {
        // A small block size forces the take to read beyond a single chunk.
        let store = ObjectStore::from_uri_and_params(
            Arc::new(Default::default()),
            "memory://",
            &ObjectStoreParams {
                block_size: Some(256),
                ..Default::default()
            },
        )
        .await
        .unwrap()
        .0;
        let path = Path::from("/take_bools");

        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "b",
            DataType::Boolean,
            false,
        )]));
        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();

        let array = BooleanArray::from((0..5000).map(|v| v % 5 == 0).collect::<Vec<_>>());
        let batch =
            RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(array.clone())]).unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();
        let actual = reader.take(&[2, 4, 5, 8, 4555], &schema).await.unwrap();

        assert_eq!(
            actual.column_by_name("b").unwrap().as_ref(),
            &BooleanArray::from(vec![false, false, true, false, true])
        );
    }

    #[tokio::test]
    async fn test_read_projection() {
        let store = ObjectStore::memory();
        let path = Path::from("/partial_read");

        // Write only one of the 100 fields, then open the reader with the full
        // schema and matching field id offsets.
        let mut fields = vec![];
        for i in 0..100 {
            fields.push(ArrowField::new(format!("f{}", i), DataType::Int32, false));
        }
        let arrow_schema = ArrowSchema::new(fields);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        let partial_schema = schema.project(&["f50"]).unwrap();
        let partial_arrow: ArrowSchema = (&partial_schema).into();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            partial_schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();

        let array = Int32Array::from(vec![0; 15]);
        let batch =
            RecordBatch::try_new(Arc::new(partial_arrow), vec![Arc::new(array.clone())]).unwrap();
        file_writer.write(&[batch.clone()]).await.unwrap();
        file_writer.finish().await.unwrap();

        let field_id = partial_schema.fields.first().unwrap().id;
        let reader = FileReader::try_new_with_fragment_id(
            &store,
            &path,
            schema.clone(),
            0,
            field_id,
            field_id,
            None,
        )
        .await
        .unwrap();
        let actual = reader
            .read_batch(0, ReadBatchParams::RangeFull, &partial_schema)
            .await
            .unwrap();

        assert_eq!(actual, batch);
    }
}