use std::borrow::Cow;
use std::ops::{Range, RangeTo};
use std::sync::Arc;

use arrow_arith::numeric::sub;
use arrow_array::{
    builder::PrimitiveBuilder,
    cast::AsArray,
    types::{Int32Type, Int64Type},
    ArrayRef, ArrowNativeTypeOp, ArrowNumericType, NullArray, OffsetSizeTrait, PrimitiveArray,
    RecordBatch, StructArray, UInt32Array,
};
use arrow_buffer::ArrowNativeType;
use arrow_schema::{DataType, FieldRef, Schema as ArrowSchema};
use arrow_select::concat::{self, concat_batches};
use async_recursion::async_recursion;
use deepsize::DeepSizeOf;
use futures::{stream, Future, FutureExt, StreamExt, TryStreamExt};
use lance_arrow::*;
use lance_core::cache::{CacheKey, LanceCache};
use lance_core::datatypes::{Field, Schema};
use lance_core::{Error, Result};
use lance_io::encodings::dictionary::DictionaryDecoder;
use lance_io::encodings::AsyncIndex;
use lance_io::stream::{RecordBatchStream, RecordBatchStreamAdapter};
use lance_io::traits::Reader;
use lance_io::utils::{
    read_fixed_stride_array, read_metadata_offset, read_struct, read_struct_from_buf,
};
use lance_io::{object_store::ObjectStore, ReadBatchParams};

use object_store::path::Path;
use snafu::location;
use tracing::instrument;

use crate::format::metadata::Metadata;
use crate::page_table::{PageInfo, PageTable};

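/// Lance (v1 format) file reader.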
#[derive(Clone, DeepSizeOf)]
pub struct FileReader {
    pub object_reader: Arc<dyn Reader>,
    metadata: Arc<Metadata>,
    page_table: Arc<PageTable>,
    schema: Schema,

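    /// The id of the fragment this file belongs to.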
    fragment_id: u64,

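    /// Page table for the statistics pages, if the file has them.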
    stats_page_table: Arc<Option<PageTable>>,
}

impl std::fmt::Debug for FileReader {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("FileReader")
            .field("fragment", &self.fragment_id)
            .field("path", &self.object_reader.path())
            .finish()
    }
}

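/// Cache key over a borrowed string, typed by the value it caches.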
struct StringCacheKey<'a, T> {
    key: &'a str,
    _phantom: std::marker::PhantomData<T>,
}

impl<'a, T> StringCacheKey<'a, T> {
    fn new(key: &'a str) -> Self {
        Self {
            key,
            _phantom: std::marker::PhantomData,
        }
    }
}

impl<T> CacheKey for StringCacheKey<'_, T> {
    type ValueType = T;

    fn key(&self) -> Cow<'_, str> {
        self.key.into()
    }
}

impl FileReader {
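    /// Open a file reader.
    ///
    /// `fragment_id` identifies the fragment this file belongs to.
    /// `field_id_offset` and `max_field_id` bound the field ids read from the
    /// page table. When a `session` cache is provided, file metadata and page
    /// tables are cached across readers.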
    #[instrument(level = "debug", skip(object_store, schema, session))]
    pub async fn try_new_with_fragment_id(
        object_store: &ObjectStore,
        path: &Path,
        schema: Schema,
        fragment_id: u32,
        field_id_offset: i32,
        max_field_id: i32,
        session: Option<&LanceCache>,
    ) -> Result<Self> {
        let object_reader = object_store.open(path).await?;

        let metadata = Self::read_metadata(object_reader.as_ref(), session).await?;

        Self::try_new_from_reader(
            path,
            object_reader.into(),
            Some(metadata),
            schema,
            fragment_id,
            field_id_offset,
            max_field_id,
            session,
        )
        .await
    }

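    /// Like [`Self::try_new_with_fragment_id`], but reuses an already opened
    /// `object_reader` and, when provided, previously read `metadata`.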
    #[allow(clippy::too_many_arguments)]
    pub async fn try_new_from_reader(
        path: &Path,
        object_reader: Arc<dyn Reader>,
        metadata: Option<Arc<Metadata>>,
        schema: Schema,
        fragment_id: u32,
        field_id_offset: i32,
        max_field_id: i32,
        session: Option<&LanceCache>,
    ) -> Result<Self> {
        let metadata = match metadata {
            Some(metadata) => metadata,
            None => Self::read_metadata(object_reader.as_ref(), session).await?,
        };

        let page_table = async {
            Self::load_from_cache(session, path.to_string(), |_| async {
                PageTable::load(
                    object_reader.as_ref(),
                    metadata.page_table_position,
                    field_id_offset,
                    max_field_id,
                    metadata.num_batches() as i32,
                )
                .await
            })
            .await
        };

        let stats_page_table = Self::read_stats_page_table(object_reader.as_ref(), session);

        let (page_table, stats_page_table) = futures::try_join!(page_table, stats_page_table)?;

        Ok(Self {
            object_reader,
            metadata,
            schema,
            page_table,
            fragment_id: fragment_id as u64,
            stats_page_table,
        })
    }

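    /// Read the file metadata, which is stored at the tail of the file,
    /// consulting `cache` first when one is provided.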
    pub async fn read_metadata(
        object_reader: &dyn Reader,
        cache: Option<&LanceCache>,
    ) -> Result<Arc<Metadata>> {
        Self::load_from_cache(cache, object_reader.path().to_string(), |_| async {
            let file_size = object_reader.size().await?;
            let begin = if file_size < object_reader.block_size() {
                0
            } else {
                file_size - object_reader.block_size()
            };
            let tail_bytes = object_reader.get_range(begin..file_size).await?;
            let metadata_pos = read_metadata_offset(&tail_bytes)?;

            let metadata: Metadata = if metadata_pos < file_size - tail_bytes.len() {
                read_struct(object_reader, metadata_pos).await?
            } else {
                let offset = tail_bytes.len() - (file_size - metadata_pos);
                read_struct_from_buf(&tail_bytes.slice(offset..))?
            };
            Ok(metadata)
        })
        .await
    }

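    /// Load the page table for the statistics pages.
    ///
    /// Resolves to `None` (inside the `Arc`) when the file carries no
    /// statistics metadata.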
    async fn read_stats_page_table(
        reader: &dyn Reader,
        cache: Option<&LanceCache>,
    ) -> Result<Arc<Option<PageTable>>> {
        Self::load_from_cache(cache, reader.path().child("stats").to_string(), |_| async {
            let metadata = Self::read_metadata(reader, cache).await?;

            if let Some(stats_meta) = metadata.stats_metadata.as_ref() {
                Ok(Some(
                    PageTable::load(
                        reader,
                        stats_meta.page_table_position,
                        0,
                        *stats_meta.leaf_field_ids.iter().max().unwrap(),
                        1,
                    )
                    .await?,
                ))
            } else {
                Ok(None)
            }
        })
        .await
    }

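    /// Get a value from the cache under `key`, computing it with `loader` on a
    /// miss. Without a cache, the loader always runs.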
    async fn load_from_cache<T: DeepSizeOf + Send + Sync + 'static, F, Fut>(
        cache: Option<&LanceCache>,
        key: String,
        loader: F,
    ) -> Result<Arc<T>>
    where
        F: Fn(&str) -> Fut,
        Fut: Future<Output = Result<T>> + Send,
    {
        if let Some(cache) = cache {
            let cache_key = StringCacheKey::<T>::new(key.as_str());
            cache
                .get_or_insert_with_key(cache_key, || loader(key.as_str()))
                .await
        } else {
            Ok(Arc::new(loader(key.as_str()).await?))
        }
    }

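    /// Open a file reader with fragment id 0 and without a metadata cache.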
    pub async fn try_new(object_store: &ObjectStore, path: &Path, schema: Schema) -> Result<Self> {
        let max_field_id = schema.max_field_id().unwrap_or_default();
        Self::try_new_with_fragment_id(object_store, path, schema, 0, 0, max_field_id, None).await
    }

    fn io_parallelism(&self) -> usize {
        self.object_reader.io_parallelism()
    }

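    /// Schema of this file.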
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

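    /// Number of record batches in the file.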
    pub fn num_batches(&self) -> usize {
        self.metadata.num_batches()
    }

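    /// Number of rows in batch `batch_id`, or 0 if the batch does not exist.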
    pub fn num_rows_in_batch(&self, batch_id: i32) -> usize {
        self.metadata.get_batch_length(batch_id).unwrap_or_default() as usize
    }

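    /// Total number of rows in the file.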
    pub fn len(&self) -> usize {
        self.metadata.len()
    }

    pub fn is_empty(&self) -> bool {
        self.metadata.is_empty()
    }

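    /// Read a batch of data from the file.
    ///
    /// `params` selects the rows of the batch to read; `projection` selects
    /// the columns.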
    #[instrument(level = "debug", skip(self, params, projection))]
    pub async fn read_batch(
        &self,
        batch_id: i32,
        params: impl Into<ReadBatchParams>,
        projection: &Schema,
    ) -> Result<RecordBatch> {
        read_batch(self, &params.into(), projection, batch_id).await
    }

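    /// Read a contiguous range of rows; the range may span several batches.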
    #[instrument(level = "debug", skip(self, projection))]
    pub async fn read_range(
        &self,
        range: Range<usize>,
        projection: &Schema,
    ) -> Result<RecordBatch> {
        if range.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(projection.into())));
        }
        let range_in_batches = self.metadata.range_to_batches(range)?;
        let batches = stream::iter(range_in_batches)
            .map(|(batch_id, range)| async move {
                self.read_batch(batch_id, range, projection).await
            })
            .buffered(self.io_parallelism())
            .try_collect::<Vec<_>>()
            .await?;
        if batches.len() == 1 {
            return Ok(batches[0].clone());
        }
        let schema = batches[0].schema();
        Ok(tokio::task::spawn_blocking(move || concat_batches(&schema, &batches)).await??)
    }

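    /// Take rows by absolute indices within the file.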
    #[instrument(level = "debug", skip_all)]
    pub async fn take(&self, indices: &[u32], projection: &Schema) -> Result<RecordBatch> {
        let num_batches = self.num_batches();
        let num_rows = self.len() as u32;
        let indices_in_batches = self.metadata.group_indices_to_batches(indices);
        let batches = stream::iter(indices_in_batches)
            .map(|batch| async move {
                if batch.batch_id >= num_batches as i32 {
                    Err(Error::InvalidInput {
                        source: format!("batch_id: {} out of bounds", batch.batch_id).into(),
                        location: location!(),
                    })
                } else if *batch.offsets.last().expect("got empty batch") > num_rows {
                    Err(Error::InvalidInput {
                        source: format!("indices: {:?} out of bounds", batch.offsets).into(),
                        location: location!(),
                    })
                } else {
                    self.read_batch(batch.batch_id, batch.offsets.as_slice(), projection)
                        .await
                }
            })
            .buffered(self.io_parallelism())
            .try_collect::<Vec<_>>()
            .await?;

        let schema = Arc::new(ArrowSchema::from(projection));

        Ok(tokio::task::spawn_blocking(move || concat_batches(&schema, &batches)).await??)
    }

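    /// Schema of the page statistics for the given field ids, if the file has
    /// statistics.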
    pub fn page_stats_schema(&self, field_ids: &[i32]) -> Option<Schema> {
        self.metadata.stats_metadata.as_ref().map(|meta| {
            let mut stats_field_ids = vec![];
            for stats_field in &meta.schema.fields {
                if let Ok(stats_field_id) = stats_field.name.parse::<i32>() {
                    if field_ids.contains(&stats_field_id) {
                        stats_field_ids.push(stats_field.id);
                        for child in &stats_field.children {
                            stats_field_ids.push(child.id);
                        }
                    }
                }
            }
            meta.schema.project_by_ids(&stats_field_ids, true)
        })
    }

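    /// Read the page statistics arrays for the given field ids, returning
    /// `None` when no statistics are available for them.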
    pub async fn read_page_stats(&self, field_ids: &[i32]) -> Result<Option<RecordBatch>> {
        if let Some(stats_page_table) = self.stats_page_table.as_ref() {
            let projection = self.page_stats_schema(field_ids).unwrap();
            if projection.fields.is_empty() {
                return Ok(None);
            }
            let arrays = futures::stream::iter(projection.fields.iter().cloned())
                .map(|field| async move {
                    read_array(
                        self,
                        &field,
                        0,
                        stats_page_table,
                        &ReadBatchParams::RangeFull,
                    )
                    .await
                })
                .buffered(self.io_parallelism())
                .try_collect::<Vec<_>>()
                .await?;

            let schema = ArrowSchema::from(&projection);
            let batch = RecordBatch::try_new(Arc::new(schema), arrays)?;
            Ok(Some(batch))
        } else {
            Ok(None)
        }
    }
}

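/// Stream the batches of `reader`, in order, keeping only the batch ids that
/// `predicate` accepts and projecting each batch to `projection`.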
pub fn batches_stream(
    reader: FileReader,
    projection: Schema,
    predicate: impl FnMut(&i32) -> bool + Send + Sync + 'static,
) -> impl RecordBatchStream {
    let projection = Arc::new(projection);
    let arrow_schema = ArrowSchema::from(projection.as_ref());

    let total_batches = reader.num_batches() as i32;
    let batches = (0..total_batches).filter(predicate);
    let this = Arc::new(reader);
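    // Pair each batch id with a clone of the reader and projection, then
    // prefetch up to two batches at a time.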
    let inner = stream::iter(batches)
        .zip(stream::repeat_with(move || {
            (this.clone(), projection.clone())
        }))
        .map(move |(batch_id, (reader, projection))| async move {
            reader
                .read_batch(batch_id, ReadBatchParams::RangeFull, &projection)
                .await
        })
        .buffered(2)
        .boxed();
    RecordBatchStreamAdapter::new(Arc::new(arrow_schema), inner)
}

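/// Read batch `batch_id` from `reader`, selecting rows with `params` and
/// columns with `schema`.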
pub async fn read_batch(
    reader: &FileReader,
    params: &ReadBatchParams,
    schema: &Schema,
    batch_id: i32,
) -> Result<RecordBatch> {
    if !schema.fields.is_empty() {
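        // Box the stream; otherwise borrowing `schema.fields` across the
        // await points trips a higher-order lifetime error.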
        let arrs = stream::iter(&schema.fields)
            .map(|f| async { read_array(reader, f, batch_id, &reader.page_table, params).await })
            .buffered(reader.io_parallelism())
            .try_collect::<Vec<_>>()
            .boxed();
        let arrs = arrs.await?;
        Ok(RecordBatch::try_new(Arc::new(schema.into()), arrs)?)
    } else {
        Err(Error::invalid_input("no fields requested", location!()))
    }
}

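/// Read a single field's array from a batch, dispatching on the field's data
/// type.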
#[async_recursion]
async fn read_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let data_type = field.data_type();

    use DataType::*;

    if data_type.is_fixed_stride() {
        _read_fixed_stride_array(reader, field, batch_id, page_table, params).await
    } else {
        match data_type {
            Null => read_null_array(field, batch_id, page_table, params),
            Utf8 | LargeUtf8 | Binary | LargeBinary => {
                read_binary_array(reader, field, batch_id, page_table, params).await
            }
            Struct(_) => read_struct_array(reader, field, batch_id, page_table, params).await,
            Dictionary(_, _) => {
                read_dictionary_array(reader, field, batch_id, page_table, params).await
            }
            List(_) => {
                read_list_array::<Int32Type>(reader, field, batch_id, page_table, params).await
            }
            LargeList(_) => {
                read_list_array::<Int64Type>(reader, field, batch_id, page_table, params).await
            }
            _ => {
                unimplemented!("No support for {data_type} yet");
            }
        }
    }
}

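/// Look up the page info for `field` in batch `batch_id`.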
fn get_page_info<'a>(
    page_table: &'a PageTable,
    field: &'a Field,
    batch_id: i32,
) -> Result<&'a PageInfo> {
    page_table.get(field.id, batch_id).ok_or_else(|| {
        Error::io(
            format!(
                "No page info found for field: {}, field_id={} batch={}",
                field.name, field.id, batch_id
            ),
            location!(),
        )
    })
}

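/// Read a fixed-stride (primitive / fixed-width) array for the batch.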
async fn _read_fixed_stride_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;
    read_fixed_stride_array(
        reader.object_reader.as_ref(),
        &field.data_type(),
        page_info.position,
        page_info.length,
        params.clone(),
    )
    .await
}

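/// "Read" a null array: no I/O is needed, only validation of `params` against
/// the page length.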
fn read_null_array(
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;

    let length_output = match params {
        ReadBatchParams::Indices(indices) => {
            if indices.is_empty() {
                0
            } else {
                let idx_max = *indices.values().iter().max().unwrap() as u64;
                if idx_max >= page_info.length as u64 {
                    return Err(Error::io(
                        format!(
                            "NullArray Reader: request([{}]) out of range: [0..{}]",
                            idx_max, page_info.length
                        ),
                        location!(),
                    ));
                }
                indices.len()
            }
        }
        _ => {
            let (idx_start, idx_end) = match params {
                ReadBatchParams::Range(r) => (r.start, r.end),
                ReadBatchParams::RangeFull => (0, page_info.length),
                ReadBatchParams::RangeTo(r) => (0, r.end),
                ReadBatchParams::RangeFrom(r) => (r.start, page_info.length),
                _ => unreachable!(),
            };
            if idx_end > page_info.length {
                return Err(Error::io(
                    format!(
                        "NullArray Reader: request([{}..{}]) out of range: [0..{}]",
                        idx_start, idx_end, page_info.length
                    ),
                    location!(),
                ));
            }
            idx_end - idx_start
        }
    };

    Ok(Arc::new(NullArray::new(length_output)))
}

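/// Read a variable-length binary or string array for the batch.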
async fn read_binary_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;

    lance_io::utils::read_binary_array(
        reader.object_reader.as_ref(),
        &field.data_type(),
        field.nullable,
        page_info.position,
        page_info.length,
        params,
    )
    .await
}

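/// Read a dictionary-encoded array. The dictionary values must already be
/// loaded on the field.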
async fn read_dictionary_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;
    let data_type = field.data_type();
    let decoder = DictionaryDecoder::new(
        reader.object_reader.as_ref(),
        page_info.position,
        page_info.length,
        &data_type,
        field
            .dictionary
            .as_ref()
            .unwrap()
            .values
            .as_ref()
            .unwrap()
            .clone(),
    );
    decoder.get(params.clone()).await
}

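/// Read a struct array by reading each child array and reassembling them.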
async fn read_struct_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let mut sub_arrays: Vec<(FieldRef, ArrayRef)> = vec![];

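    // Children are read sequentially, one await per child field.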
    for child in field.children.as_slice() {
        let arr = read_array(reader, child, batch_id, page_table, params).await?;
        sub_arrays.push((Arc::new(child.into()), arr));
    }

    Ok(Arc::new(StructArray::from(sub_arrays)))
}

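/// Take specific rows of a list array: read each selected list's value range
/// and rebuild the offsets so they start at zero.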
async fn take_list_array<T: ArrowNumericType>(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    positions: &PrimitiveArray<T>,
    indices: &UInt32Array,
) -> Result<ArrayRef>
where
    T::Native: ArrowNativeTypeOp + OffsetSizeTrait,
{
    let first_idx = indices.value(0);
    let ranges = indices
        .values()
        .iter()
        .map(|i| (*i - first_idx).as_usize())
        .map(|idx| positions.value(idx).as_usize()..positions.value(idx + 1).as_usize())
        .collect::<Vec<_>>();
    let field = field.clone();
    let mut list_values: Vec<ArrayRef> = vec![];
    for range in ranges.iter() {
        list_values.push(
            read_array(
                reader,
                &field.children[0],
                batch_id,
                page_table,
                &(range.clone()).into(),
            )
            .await?,
        );
    }

    let value_refs = list_values
        .iter()
        .map(|arr| arr.as_ref())
        .collect::<Vec<_>>();
    let mut offsets_builder = PrimitiveBuilder::<T>::new();
    offsets_builder.append_value(T::Native::usize_as(0));
    let mut off = 0_usize;
    for range in ranges {
        off += range.len();
        offsets_builder.append_value(T::Native::usize_as(off));
    }
    let all_values = concat::concat(value_refs.as_slice())?;
    let offset_arr = offsets_builder.finish();
    let arr = try_new_generic_list_array(all_values, &offset_arr)?;
    Ok(Arc::new(arr) as ArrayRef)
}

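/// Read a list array: the offsets are read first, then the child values for
/// the range those offsets cover.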
async fn read_list_array<T: ArrowNumericType>(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef>
where
    T::Native: ArrowNativeTypeOp + OffsetSizeTrait,
{
    let positions_params = match params {
        ReadBatchParams::Range(range) => ReadBatchParams::from(range.start..(range.end + 1)),
        ReadBatchParams::RangeTo(range) => ReadBatchParams::from(..range.end + 1),
        ReadBatchParams::Indices(indices) => {
            (indices.value(0).as_usize()..indices.value(indices.len() - 1).as_usize() + 2).into()
        }
        p => p.clone(),
    };

    let page_info = get_page_info(&reader.page_table, field, batch_id)?;
    let position_arr = read_fixed_stride_array(
        reader.object_reader.as_ref(),
        &T::DATA_TYPE,
        page_info.position,
        page_info.length,
        positions_params,
    )
    .await?;

    let positions: &PrimitiveArray<T> = position_arr.as_primitive();

    let value_params = match params {
        ReadBatchParams::Range(range) => ReadBatchParams::from(
            positions.value(0).as_usize()..positions.value(range.end - range.start).as_usize(),
        ),
        ReadBatchParams::Ranges(_) => {
            return Err(Error::Internal {
                message: "ReadBatchParams::Ranges should not be used in v1 files".to_string(),
                location: location!(),
            })
        }
        ReadBatchParams::RangeTo(RangeTo { end }) => {
            ReadBatchParams::from(..positions.value(*end).as_usize())
        }
        ReadBatchParams::RangeFrom(_) => ReadBatchParams::from(positions.value(0).as_usize()..),
        ReadBatchParams::RangeFull => ReadBatchParams::from(
            positions.value(0).as_usize()..positions.value(positions.len() - 1).as_usize(),
        ),
        ReadBatchParams::Indices(indices) => {
            return take_list_array(reader, field, batch_id, page_table, positions, indices).await;
        }
    };

    let start_position = PrimitiveArray::<T>::new_scalar(positions.value(0));
    let offset_arr = sub(positions, &start_position)?;
    let offset_arr_ref = offset_arr.as_primitive::<T>();
    let value_arrs = read_array(
        reader,
        &field.children[0],
        batch_id,
        page_table,
        &value_params,
    )
    .await?;
    let arr = try_new_generic_list_array(value_arrs, offset_arr_ref)?;
    Ok(Arc::new(arr) as ArrayRef)
}

#[cfg(test)]
mod tests {
    use crate::writer::{FileWriter, NotSelfDescribing};

    use super::*;

    use arrow_array::{
        builder::{Int32Builder, LargeListBuilder, ListBuilder, StringBuilder},
        cast::{as_string_array, as_struct_array},
        types::UInt8Type,
        Array, DictionaryArray, Float32Array, Int64Array, LargeListArray, ListArray, StringArray,
        UInt8Array,
    };
    use arrow_array::{BooleanArray, Int32Array};
    use arrow_schema::{Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema};
    use lance_io::object_store::ObjectStoreParams;

    #[tokio::test]
    async fn test_take() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("i", DataType::Int64, true),
            ArrowField::new("f", DataType::Float32, false),
            ArrowField::new("s", DataType::Utf8, false),
            ArrowField::new(
                "d",
                DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
                false,
            ),
        ]);
        let mut schema = Schema::try_from(&arrow_schema).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/take_test");

        let values = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);
        let values_ref = Arc::new(values);
        let mut batches = vec![];
        for batch_id in 0..10 {
            let value_range: Range<i64> = batch_id * 10..batch_id * 10 + 10;
            let keys = UInt8Array::from_iter_values(value_range.clone().map(|v| (v % 7) as u8));
            let columns: Vec<ArrayRef> = vec![
                Arc::new(Int64Array::from_iter(
                    value_range.clone().collect::<Vec<_>>(),
                )),
                Arc::new(Float32Array::from_iter(
                    value_range.clone().map(|n| n as f32).collect::<Vec<_>>(),
                )),
                Arc::new(StringArray::from_iter_values(
                    value_range.clone().map(|n| format!("str-{}", n)),
                )),
                Arc::new(DictionaryArray::<UInt8Type>::try_new(keys, values_ref.clone()).unwrap()),
            ];
            batches.push(RecordBatch::try_new(Arc::new(arrow_schema.clone()), columns).unwrap());
        }
        schema.set_dictionary(&batches[0]).unwrap();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        for batch in batches.iter() {
            file_writer
                .write(std::slice::from_ref(batch))
                .await
                .unwrap();
        }
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let batch = reader
            .take(&[1, 15, 20, 25, 30, 48, 90], reader.schema())
            .await
            .unwrap();
        let dict_keys = UInt8Array::from_iter_values([1, 1, 6, 4, 2, 6, 6]);
        assert_eq!(
            batch,
            RecordBatch::try_new(
                batch.schema(),
                vec![
                    Arc::new(Int64Array::from_iter_values([1, 15, 20, 25, 30, 48, 90])),
                    Arc::new(Float32Array::from_iter_values([
                        1.0, 15.0, 20.0, 25.0, 30.0, 48.0, 90.0
                    ])),
                    Arc::new(StringArray::from_iter_values([
                        "str-1", "str-15", "str-20", "str-25", "str-30", "str-48", "str-90"
                    ])),
                    Arc::new(DictionaryArray::try_new(dict_keys, values_ref.clone()).unwrap()),
                ]
            )
            .unwrap()
        );
    }

    async fn test_write_null_string_in_struct(field_nullable: bool) {
        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "parent",
            DataType::Struct(ArrowFields::from(vec![ArrowField::new(
                "str",
                DataType::Utf8,
                field_nullable,
            )])),
            true,
        )]));

        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let string_arr = Arc::new(StringArray::from_iter([Some("a"), Some(""), Some("b")]));
        let struct_arr = Arc::new(StructArray::from(vec![(
            Arc::new(ArrowField::new("str", DataType::Utf8, field_nullable)),
            string_arr.clone() as ArrayRef,
        )]));
        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_arr]).unwrap();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer
            .write(std::slice::from_ref(&batch))
            .await
            .unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();

        if field_nullable {
            assert_eq!(
                &StringArray::from_iter(vec![Some("a"), None, Some("b")]),
                as_string_array(
                    as_struct_array(actual_batch.column_by_name("parent").unwrap().as_ref())
                        .column_by_name("str")
                        .unwrap()
                        .as_ref()
                )
            );
        } else {
            assert_eq!(actual_batch, batch);
        }
    }

    #[tokio::test]
    async fn read_nullable_string_in_struct() {
        test_write_null_string_in_struct(true).await;
        test_write_null_string_in_struct(false).await;
    }

    #[tokio::test]
    async fn test_read_struct_of_list_arrays() {
        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let arrow_schema = make_schema_of_list_array();
        let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();

        let batches = (0..3)
            .map(|_| {
                let struct_array = make_struct_of_list_array(10, 10);
                RecordBatch::try_new(arrow_schema.clone(), vec![struct_array]).unwrap()
            })
            .collect::<Vec<_>>();
        let batches_ref = batches.iter().collect::<Vec<_>>();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&batches).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
        let expected = concat_batches(&arrow_schema, batches_ref).unwrap();
        assert_eq!(expected, actual_batch);
    }

    #[tokio::test]
    async fn test_scan_struct_of_list_arrays() {
        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let arrow_schema = make_schema_of_list_array();
        let struct_array = make_struct_of_list_array(3, 10);
        let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_array.clone()]).unwrap();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let mut expected_columns: Vec<ArrayRef> = Vec::new();
        for c in struct_array.columns().iter() {
            expected_columns.push(c.slice(1, 1));
        }

        let expected_struct = match arrow_schema.fields[0].data_type() {
            DataType::Struct(subfields) => subfields
                .iter()
                .zip(expected_columns)
                .map(|(f, d)| (f.clone(), d))
                .collect::<Vec<_>>(),
            _ => panic!("unexpected field"),
        };

        let expected_struct_array = StructArray::from(expected_struct);
        let expected_batch = RecordBatch::from(&StructArray::from(vec![(
            Arc::new(arrow_schema.fields[0].as_ref().clone()),
            Arc::new(expected_struct_array) as ArrayRef,
        )]));

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let params = ReadBatchParams::Range(1..2);
        let slice_of_batch = reader.read_batch(0, params, reader.schema()).await.unwrap();
        assert_eq!(expected_batch, slice_of_batch);
    }

    fn make_schema_of_list_array() -> Arc<arrow_schema::Schema> {
        Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "s",
            DataType::Struct(ArrowFields::from(vec![
                ArrowField::new(
                    "li",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    true,
                ),
                ArrowField::new(
                    "ls",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))),
                    true,
                ),
                ArrowField::new(
                    "ll",
                    DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    false,
                ),
            ])),
            true,
        )]))
    }

    fn make_struct_of_list_array(rows: i32, num_items: i32) -> Arc<StructArray> {
        let mut li_builder = ListBuilder::new(Int32Builder::new());
        let mut ls_builder = ListBuilder::new(StringBuilder::new());
        let ll_value_builder = Int32Builder::new();
        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
        for i in 0..rows {
            for j in 0..num_items {
                li_builder.values().append_value(i * 10 + j);
                ls_builder
                    .values()
                    .append_value(format!("str-{}", i * 10 + j));
                large_list_builder.values().append_value(i * 10 + j);
            }
            li_builder.append(true);
            ls_builder.append(true);
            large_list_builder.append(true);
        }
        Arc::new(StructArray::from(vec![
            (
                Arc::new(ArrowField::new(
                    "li",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    true,
                )),
                Arc::new(li_builder.finish()) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new(
                    "ls",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))),
                    true,
                )),
                Arc::new(ls_builder.finish()) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new(
                    "ll",
                    DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    false,
                )),
                Arc::new(large_list_builder.finish()) as ArrayRef,
            ),
        ]))
    }

    #[tokio::test]
    async fn test_read_nullable_arrays() {
        use arrow_array::Array;

        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("i", DataType::Int64, false),
            ArrowField::new("n", DataType::Null, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from_iter_values(0..100)),
            Arc::new(NullArray::new(100)),
        ];
        let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/takes");
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();

        async fn read_array_w_params(
            reader: &FileReader,
            field: &Field,
            params: ReadBatchParams,
        ) -> ArrayRef {
            read_array(reader, field, 0, reader.page_table.as_ref(), &params)
                .await
                .expect("Error reading back the null array from file")
        }

        let arr = read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFull).await;
        assert_eq!(100, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::Range(10..25)).await;
        assert_eq!(15, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFrom(60..)).await;
        assert_eq!(40, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeTo(..25)).await;
        assert_eq!(25, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr = read_array_w_params(
            &reader,
            &schema.fields[1],
            ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72])),
        )
        .await;
        assert_eq!(4, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let params = ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72, 100]));
        let arr = read_array(
            &reader,
            &schema.fields[1],
            0,
            reader.page_table.as_ref(),
            &params,
        );
        assert!(arr.await.is_err());

        let params = ReadBatchParams::RangeTo(..107);
        let arr = read_array(
            &reader,
            &schema.fields[1],
            0,
            reader.page_table.as_ref(),
            &params,
        );
        assert!(arr.await.is_err());
    }

    #[tokio::test]
    async fn test_take_lists() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "l",
                DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
            ArrowField::new(
                "ll",
                DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
        ]);

        let value_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(value_builder);
        let ll_value_builder = Int32Builder::new();
        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
        for i in 0..100 {
            list_builder.values().append_value(i);
            large_list_builder.values().append_value(i);
            if (i + 1) % 10 == 0 {
                list_builder.append(true);
                large_list_builder.append(true);
            }
        }
        let list_arr = Arc::new(list_builder.finish());
        let large_list_arr = Arc::new(large_list_builder.finish());

        let batch = RecordBatch::try_new(
            Arc::new(arrow_schema.clone()),
            vec![list_arr as ArrayRef, large_list_arr as ArrayRef],
        )
        .unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/take_list");
        let schema: Schema = (&arrow_schema).try_into().unwrap();
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();
        let actual = reader.take(&[1, 3, 5, 9], &schema).await.unwrap();

        let value_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(value_builder);
        let ll_value_builder = Int32Builder::new();
        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
        for i in [1, 3, 5, 9] {
            for j in 0..10 {
                list_builder.values().append_value(i * 10 + j);
                large_list_builder.values().append_value(i * 10 + j);
            }
            list_builder.append(true);
            large_list_builder.append(true);
        }
        let expected_list = list_builder.finish();
        let expected_large_list = large_list_builder.finish();

        assert_eq!(actual.column_by_name("l").unwrap().as_ref(), &expected_list);
        assert_eq!(
            actual.column_by_name("ll").unwrap().as_ref(),
            &expected_large_list
        );
    }

    #[tokio::test]
    async fn test_list_array_with_offsets() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "l",
                DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
            ArrowField::new(
                "ll",
                DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
        ]);

        let store = ObjectStore::memory();
        let path = Path::from("/lists");

        let list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2)]),
            Some(vec![Some(3), Some(4)]),
            Some((0..2_000).map(Some).collect::<Vec<_>>()),
        ])
        .slice(1, 1);
        let large_list_array = LargeListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(10), Some(11)]),
            Some(vec![Some(12), Some(13)]),
            Some((0..2_000).map(Some).collect::<Vec<_>>()),
        ])
        .slice(1, 1);

        let batch = RecordBatch::try_new(
            Arc::new(arrow_schema.clone()),
            vec![Arc::new(list_array), Arc::new(large_list_array)],
        )
        .unwrap();

        let schema: Schema = (&arrow_schema).try_into().unwrap();
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer
            .write(std::slice::from_ref(&batch))
            .await
            .unwrap();
        file_writer.finish().await.unwrap();

        let file_size_bytes = store.size(&path).await.unwrap();
        assert!(file_size_bytes < 1_000);

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
        assert_eq!(batch, actual_batch);
    }

    #[tokio::test]
    async fn test_read_ranges() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int64, false)]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let columns: Vec<ArrayRef> = vec![Arc::new(Int64Array::from_iter_values(0..100))];
        let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/read_range");
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_range(7..25, reader.schema()).await.unwrap();

        assert_eq!(
            actual_batch.column_by_name("i").unwrap().as_ref(),
            &Int64Array::from_iter_values(7..25)
        );
    }

    #[tokio::test]
    async fn test_batches_stream() {
        let store = ObjectStore::memory();
        let path = Path::from("/batch_stream");

        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int32, true)]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let mut writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        for i in 0..10 {
            let batch = RecordBatch::try_new(
                Arc::new(arrow_schema.clone()),
                vec![Arc::new(Int32Array::from_iter_values(i * 10..(i + 1) * 10))],
            )
            .unwrap();
            writer.write(&[batch]).await.unwrap();
        }
        writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();
        let stream = batches_stream(reader, schema, |id| id % 2 == 0);
        let batches = stream.try_collect::<Vec<_>>().await.unwrap();

        assert_eq!(batches.len(), 5);
        for (i, batch) in batches.iter().enumerate() {
            assert_eq!(
                batch,
                &RecordBatch::try_new(
                    Arc::new(arrow_schema.clone()),
                    vec![Arc::new(Int32Array::from_iter_values(
                        i as i32 * 2 * 10..(i as i32 * 2 + 1) * 10
                    ))],
                )
                .unwrap()
            )
        }
    }

    #[tokio::test]
    async fn test_take_boolean_beyond_chunk() {
        let store = ObjectStore::from_uri_and_params(
            Arc::new(Default::default()),
            "memory://",
            &ObjectStoreParams {
                block_size: Some(256),
                ..Default::default()
            },
        )
        .await
        .unwrap()
        .0;
        let path = Path::from("/take_bools");

        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "b",
            DataType::Boolean,
            false,
        )]));
        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();

        let array = BooleanArray::from((0..5000).map(|v| v % 5 == 0).collect::<Vec<_>>());
        let batch =
            RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(array.clone())]).unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();
        let actual = reader.take(&[2, 4, 5, 8, 4555], &schema).await.unwrap();

        assert_eq!(
            actual.column_by_name("b").unwrap().as_ref(),
            &BooleanArray::from(vec![false, false, true, false, true])
        );
    }

    #[tokio::test]
    async fn test_read_projection() {
        let store = ObjectStore::memory();
        let path = Path::from("/partial_read");

        let mut fields = vec![];
        for i in 0..100 {
            fields.push(ArrowField::new(format!("f{}", i), DataType::Int32, false));
        }
        let arrow_schema = ArrowSchema::new(fields);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        let partial_schema = schema.project(&["f50"]).unwrap();
        let partial_arrow: ArrowSchema = (&partial_schema).into();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            partial_schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();

        let array = Int32Array::from(vec![0; 15]);
        let batch =
            RecordBatch::try_new(Arc::new(partial_arrow), vec![Arc::new(array.clone())]).unwrap();
        file_writer
            .write(std::slice::from_ref(&batch))
            .await
            .unwrap();
        file_writer.finish().await.unwrap();

        let field_id = partial_schema.fields.first().unwrap().id;
        let reader = FileReader::try_new_with_fragment_id(
            &store,
            &path,
            schema.clone(),
            0,
            field_id,
            field_id,
            None,
        )
        .await
        .unwrap();
        let actual = reader
            .read_batch(0, ReadBatchParams::RangeFull, &partial_schema)
            .await
            .unwrap();

        assert_eq!(actual, batch);
    }
}