1use std::ops::{Range, RangeTo};
8use std::sync::Arc;
9
10use arrow_arith::numeric::sub;
11use arrow_array::{
12 ArrayRef, ArrowNativeTypeOp, ArrowNumericType, NullArray, OffsetSizeTrait, PrimitiveArray,
13 RecordBatch, StructArray, UInt32Array,
14 builder::PrimitiveBuilder,
15 cast::AsArray,
16 types::{Int32Type, Int64Type},
17};
18use arrow_buffer::ArrowNativeType;
19use arrow_schema::{DataType, FieldRef, Schema as ArrowSchema};
20use arrow_select::concat::{self, concat_batches};
21use async_recursion::async_recursion;
22use deepsize::DeepSizeOf;
23use futures::{Future, FutureExt, StreamExt, TryStreamExt, stream};
24use lance_arrow::*;
25use lance_core::cache::{CacheKey, LanceCache};
26use lance_core::datatypes::{Field, Schema};
27use lance_core::{Error, Result};
28use lance_io::encodings::AsyncIndex;
29use lance_io::encodings::dictionary::DictionaryDecoder;
30use lance_io::stream::{RecordBatchStream, RecordBatchStreamAdapter};
31use lance_io::traits::Reader;
32use lance_io::utils::{
33 read_fixed_stride_array, read_metadata_offset, read_struct, read_struct_from_buf,
34};
35use lance_io::{ReadBatchParams, object_store::ObjectStore};
36use std::borrow::Cow;
37
38use object_store::path::Path;
39use tracing::instrument;
40
41use crate::previous::format::metadata::Metadata;
42use crate::previous::page_table::{PageInfo, PageTable};
43
/// Reader for the legacy (v1) Lance file format.
///
/// Holds the opened object reader together with the file metadata and the
/// page tables loaded at construction time, so batches and columns can be
/// read without re-reading the file footer.
#[derive(Clone, DeepSizeOf)]
pub struct FileReader {
    /// Handle used for all I/O against the underlying file.
    pub object_reader: Arc<dyn Reader>,
    /// File metadata: batch layout, page table position and optional
    /// statistics metadata.
    metadata: Arc<Metadata>,
    /// Locations of the data pages, looked up by field id and batch id.
    page_table: Arc<PageTable>,
    /// Schema supplied by the caller at construction; returned by `schema()`.
    schema: Schema,

    /// Id of the fragment this file belongs to (shown in the `Debug` output).
    fragment_id: u64,

    /// Page table of the statistics columns; `None` when the file has no
    /// statistics metadata.
    stats_page_table: Arc<Option<PageTable>>,
}
61
62impl std::fmt::Debug for FileReader {
63 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64 f.debug_struct("FileReader")
65 .field("fragment", &self.fragment_id)
66 .field("path", &self.object_reader.path())
67 .finish()
68 }
69}
70
/// Cache key made of a borrowed string plus the type of the cached value.
///
/// `T` is never stored; it only selects `CacheKey::ValueType` and the type
/// name reported for this key.
struct StringCacheKey<'a, T> {
    /// The string key (in this file: a file path, or a path joined with `stats`).
    key: &'a str,
    _phantom: std::marker::PhantomData<T>,
}
76
77impl<'a, T> StringCacheKey<'a, T> {
78 fn new(key: &'a str) -> Self {
79 Self {
80 key,
81 _phantom: std::marker::PhantomData,
82 }
83 }
84}
85
86impl<T: 'static> CacheKey for StringCacheKey<'_, T> {
87 type ValueType = T;
88
89 fn key(&self) -> Cow<'_, str> {
90 self.key.into()
91 }
92
93 fn type_name() -> &'static str {
94 std::any::type_name::<T>()
98 }
99}
100
101impl FileReader {
102 #[instrument(level = "debug", skip(object_store, schema, session))]
115 pub async fn try_new_with_fragment_id(
116 object_store: &ObjectStore,
117 path: &Path,
118 schema: Schema,
119 fragment_id: u32,
120 field_id_offset: i32,
121 max_field_id: i32,
122 session: Option<&LanceCache>,
123 ) -> Result<Self> {
124 let object_reader = object_store.open(path).await?;
125
126 let metadata = Self::read_metadata(object_reader.as_ref(), session).await?;
127
128 Self::try_new_from_reader(
129 path,
130 object_reader.into(),
131 Some(metadata),
132 schema,
133 fragment_id,
134 field_id_offset,
135 max_field_id,
136 session,
137 )
138 .await
139 }
140
141 #[allow(clippy::too_many_arguments)]
142 pub async fn try_new_from_reader(
143 path: &Path,
144 object_reader: Arc<dyn Reader>,
145 metadata: Option<Arc<Metadata>>,
146 schema: Schema,
147 fragment_id: u32,
148 field_id_offset: i32,
149 max_field_id: i32,
150 session: Option<&LanceCache>,
151 ) -> Result<Self> {
152 let metadata = match metadata {
153 Some(metadata) => metadata,
154 None => Self::read_metadata(object_reader.as_ref(), session).await?,
155 };
156
157 let page_table = async {
158 Self::load_from_cache(session, path.to_string(), |_| async {
159 PageTable::load(
160 object_reader.as_ref(),
161 metadata.page_table_position,
162 field_id_offset,
163 max_field_id,
164 metadata.num_batches() as i32,
165 )
166 .await
167 })
168 .await
169 };
170
171 let stats_page_table = Self::read_stats_page_table(object_reader.as_ref(), session);
172
173 let (page_table, stats_page_table) = futures::try_join!(page_table, stats_page_table)?;
175
176 Ok(Self {
177 object_reader,
178 metadata,
179 schema,
180 page_table,
181 fragment_id: fragment_id as u64,
182 stats_page_table,
183 })
184 }
185
186 pub async fn read_metadata(
187 object_reader: &dyn Reader,
188 cache: Option<&LanceCache>,
189 ) -> Result<Arc<Metadata>> {
190 Self::load_from_cache(cache, object_reader.path().to_string(), |_| async {
191 let file_size = object_reader.size().await?;
192 let begin = if file_size < object_reader.block_size() {
193 0
194 } else {
195 file_size - object_reader.block_size()
196 };
197 let tail_bytes = object_reader.get_range(begin..file_size).await?;
198 let metadata_pos = read_metadata_offset(&tail_bytes)?;
199
200 let metadata: Metadata = if metadata_pos < file_size - tail_bytes.len() {
201 read_struct(object_reader, metadata_pos).await?
203 } else {
204 let offset = tail_bytes.len() - (file_size - metadata_pos);
205 read_struct_from_buf(&tail_bytes.slice(offset..))?
206 };
207 Ok(metadata)
208 })
209 .await
210 }
211
212 async fn read_stats_page_table(
216 reader: &dyn Reader,
217 cache: Option<&LanceCache>,
218 ) -> Result<Arc<Option<PageTable>>> {
219 Self::load_from_cache(
221 cache,
222 reader.path().clone().join("stats").to_string(),
223 |_| async {
224 let metadata = Self::read_metadata(reader, cache).await?;
225
226 if let Some(stats_meta) = metadata.stats_metadata.as_ref() {
227 Ok(Some(
228 PageTable::load(
229 reader,
230 stats_meta.page_table_position,
231 0,
232 *stats_meta.leaf_field_ids.iter().max().unwrap(),
234 1,
235 )
236 .await?,
237 ))
238 } else {
239 Ok(None)
240 }
241 },
242 )
243 .await
244 }
245
246 async fn load_from_cache<T: DeepSizeOf + Send + Sync + 'static, F, Fut>(
248 cache: Option<&LanceCache>,
249 key: String,
250 loader: F,
251 ) -> Result<Arc<T>>
252 where
253 F: Fn(&str) -> Fut + Send + Sync,
254 Fut: Future<Output = Result<T>> + Send,
255 {
256 if let Some(cache) = cache {
257 let cache_key = StringCacheKey::<T>::new(key.as_str());
258 cache
259 .get_or_insert_with_key(cache_key, || loader(key.as_str()))
260 .await
261 } else {
262 Ok(Arc::new(loader(key.as_str()).await?))
263 }
264 }
265
266 pub async fn try_new(object_store: &ObjectStore, path: &Path, schema: Schema) -> Result<Self> {
268 let max_field_id = schema.max_field_id().unwrap_or_default();
270 Self::try_new_with_fragment_id(object_store, path, schema, 0, 0, max_field_id, None).await
271 }
272
273 fn io_parallelism(&self) -> usize {
274 self.object_reader.io_parallelism()
275 }
276
277 pub fn schema(&self) -> &Schema {
279 &self.schema
280 }
281
282 pub fn num_batches(&self) -> usize {
283 self.metadata.num_batches()
284 }
285
286 pub fn num_rows_in_batch(&self, batch_id: i32) -> usize {
288 self.metadata.get_batch_length(batch_id).unwrap_or_default() as usize
289 }
290
291 pub fn len(&self) -> usize {
293 self.metadata.len()
294 }
295
296 pub fn is_empty(&self) -> bool {
297 self.metadata.is_empty()
298 }
299
300 #[instrument(level = "debug", skip(self, params, projection))]
304 pub async fn read_batch(
305 &self,
306 batch_id: i32,
307 params: impl Into<ReadBatchParams>,
308 projection: &Schema,
309 ) -> Result<RecordBatch> {
310 read_batch(self, ¶ms.into(), projection, batch_id).await
311 }
312
313 #[instrument(level = "debug", skip(self, projection))]
318 pub async fn read_range(
319 &self,
320 range: Range<usize>,
321 projection: &Schema,
322 ) -> Result<RecordBatch> {
323 if range.is_empty() {
324 return Ok(RecordBatch::new_empty(Arc::new(projection.into())));
325 }
326 let range_in_batches = self.metadata.range_to_batches(range)?;
327 let batches =
328 stream::iter(range_in_batches)
329 .map(|(batch_id, range)| async move {
330 self.read_batch(batch_id, range, projection).await
331 })
332 .buffered(self.io_parallelism())
333 .try_collect::<Vec<_>>()
334 .await?;
335 if batches.len() == 1 {
336 return Ok(batches[0].clone());
337 }
338 let schema = batches[0].schema();
339 Ok(tokio::task::spawn_blocking(move || concat_batches(&schema, &batches)).await??)
340 }
341
342 #[instrument(level = "debug", skip_all)]
346 pub async fn take(&self, indices: &[u32], projection: &Schema) -> Result<RecordBatch> {
347 let num_batches = self.num_batches();
348 let num_rows = self.len() as u32;
349 let indices_in_batches = self.metadata.group_indices_to_batches(indices);
350 let batches = stream::iter(indices_in_batches)
351 .map(|batch| async move {
352 if batch.batch_id >= num_batches as i32 {
353 Err(Error::invalid_input_source(
354 format!("batch_id: {} out of bounds", batch.batch_id).into(),
355 ))
356 } else if *batch.offsets.last().expect("got empty batch") > num_rows {
357 Err(Error::invalid_input_source(
358 format!("indices: {:?} out of bounds", batch.offsets).into(),
359 ))
360 } else {
361 self.read_batch(batch.batch_id, batch.offsets.as_slice(), projection)
362 .await
363 }
364 })
365 .buffered(self.io_parallelism())
366 .try_collect::<Vec<_>>()
367 .await?;
368
369 let schema = Arc::new(ArrowSchema::from(projection));
370
371 Ok(tokio::task::spawn_blocking(move || concat_batches(&schema, &batches)).await??)
372 }
373
374 pub fn page_stats_schema(&self, field_ids: &[i32]) -> Option<Schema> {
376 self.metadata.stats_metadata.as_ref().map(|meta| {
377 let mut stats_field_ids = vec![];
378 for stats_field in &meta.schema.fields {
379 if let Ok(stats_field_id) = stats_field.name.parse::<i32>()
380 && field_ids.contains(&stats_field_id)
381 {
382 stats_field_ids.push(stats_field.id);
383 for child in &stats_field.children {
384 stats_field_ids.push(child.id);
385 }
386 }
387 }
388 meta.schema.project_by_ids(&stats_field_ids, true)
389 })
390 }
391
392 pub async fn read_page_stats(&self, field_ids: &[i32]) -> Result<Option<RecordBatch>> {
394 if let Some(stats_page_table) = self.stats_page_table.as_ref() {
395 let projection = self.page_stats_schema(field_ids).unwrap();
396 if projection.fields.is_empty() {
398 return Ok(None);
399 }
400 let arrays = futures::stream::iter(projection.fields.iter().cloned())
401 .map(|field| async move {
402 read_array(
403 self,
404 &field,
405 0,
406 stats_page_table,
407 &ReadBatchParams::RangeFull,
408 )
409 .await
410 })
411 .buffered(self.io_parallelism())
412 .try_collect::<Vec<_>>()
413 .await?;
414
415 let schema = ArrowSchema::from(&projection);
416 let batch = RecordBatch::try_new(Arc::new(schema), arrays)?;
417 Ok(Some(batch))
418 } else {
419 Ok(None)
420 }
421 }
422}
423
424pub fn batches_stream(
435 reader: FileReader,
436 projection: Schema,
437 predicate: impl FnMut(&i32) -> bool + Send + Sync + 'static,
438) -> impl RecordBatchStream {
439 let projection = Arc::new(projection);
441 let arrow_schema = ArrowSchema::from(projection.as_ref());
442
443 let total_batches = reader.num_batches() as i32;
444 let batches = (0..total_batches).filter(predicate);
445 let this = Arc::new(reader);
447 let inner = stream::iter(batches)
448 .zip(stream::repeat_with(move || {
449 (this.clone(), projection.clone())
450 }))
451 .map(move |(batch_id, (reader, projection))| async move {
452 reader
453 .read_batch(batch_id, ReadBatchParams::RangeFull, &projection)
454 .await
455 })
456 .buffered(2)
457 .boxed();
458 RecordBatchStreamAdapter::new(Arc::new(arrow_schema), inner)
459}
460
461pub async fn read_batch(
466 reader: &FileReader,
467 params: &ReadBatchParams,
468 schema: &Schema,
469 batch_id: i32,
470) -> Result<RecordBatch> {
471 if !schema.fields.is_empty() {
472 let arrs = stream::iter(&schema.fields)
474 .map(|f| async { read_array(reader, f, batch_id, &reader.page_table, params).await })
475 .buffered(reader.io_parallelism())
476 .try_collect::<Vec<_>>()
477 .boxed();
478 let arrs = arrs.await?;
479 Ok(RecordBatch::try_new(Arc::new(schema.into()), arrs)?)
480 } else {
481 Err(Error::invalid_input("no fields requested"))
482 }
483}
484
485#[async_recursion]
486async fn read_array(
487 reader: &FileReader,
488 field: &Field,
489 batch_id: i32,
490 page_table: &PageTable,
491 params: &ReadBatchParams,
492) -> Result<ArrayRef> {
493 let data_type = field.data_type();
494
495 use DataType::*;
496
497 if data_type.is_fixed_stride() {
498 _read_fixed_stride_array(reader, field, batch_id, page_table, params).await
499 } else {
500 match data_type {
501 Null => read_null_array(field, batch_id, page_table, params),
502 Utf8 | LargeUtf8 | Binary | LargeBinary => {
503 read_binary_array(reader, field, batch_id, page_table, params).await
504 }
505 Struct(_) => read_struct_array(reader, field, batch_id, page_table, params).await,
506 Dictionary(_, _) => {
507 read_dictionary_array(reader, field, batch_id, page_table, params).await
508 }
509 List(_) => {
510 read_list_array::<Int32Type>(reader, field, batch_id, page_table, params).await
511 }
512 LargeList(_) => {
513 read_list_array::<Int64Type>(reader, field, batch_id, page_table, params).await
514 }
515 _ => {
516 unimplemented!("{}", format!("No support for {data_type} yet"));
517 }
518 }
519 }
520}
521
522fn get_page_info<'a>(
523 page_table: &'a PageTable,
524 field: &'a Field,
525 batch_id: i32,
526) -> Result<&'a PageInfo> {
527 page_table.get(field.id, batch_id).ok_or_else(|| {
528 Error::invalid_input(format!(
529 "No page info found for field: {}, field_id={} batch={}",
530 field.name, field.id, batch_id
531 ))
532 })
533}
534
535async fn _read_fixed_stride_array(
537 reader: &FileReader,
538 field: &Field,
539 batch_id: i32,
540 page_table: &PageTable,
541 params: &ReadBatchParams,
542) -> Result<ArrayRef> {
543 let page_info = get_page_info(page_table, field, batch_id)?;
544 read_fixed_stride_array(
545 reader.object_reader.as_ref(),
546 &field.data_type(),
547 page_info.position,
548 page_info.length,
549 params.clone(),
550 )
551 .await
552}
553
554fn read_null_array(
555 field: &Field,
556 batch_id: i32,
557 page_table: &PageTable,
558 params: &ReadBatchParams,
559) -> Result<ArrayRef> {
560 let page_info = get_page_info(page_table, field, batch_id)?;
561
562 let length_output = match params {
563 ReadBatchParams::Indices(indices) => {
564 if indices.is_empty() {
565 0
566 } else {
567 let idx_max = *indices.values().iter().max().unwrap() as u64;
568 if idx_max >= page_info.length as u64 {
569 return Err(Error::invalid_input(format!(
570 "NullArray Reader: request([{}]) out of range: [0..{}]",
571 idx_max, page_info.length
572 )));
573 }
574 indices.len()
575 }
576 }
577 _ => {
578 let (idx_start, idx_end) = match params {
579 ReadBatchParams::Range(r) => (r.start, r.end),
580 ReadBatchParams::RangeFull => (0, page_info.length),
581 ReadBatchParams::RangeTo(r) => (0, r.end),
582 ReadBatchParams::RangeFrom(r) => (r.start, page_info.length),
583 _ => unreachable!(),
584 };
585 if idx_end > page_info.length {
586 return Err(Error::invalid_input(format!(
587 "NullArray Reader: request([{}..{}]) out of range: [0..{}]",
588 idx_start,
590 idx_end,
591 page_info.length
592 )));
593 }
594 idx_end - idx_start
595 }
596 };
597
598 Ok(Arc::new(NullArray::new(length_output)))
599}
600
601async fn read_binary_array(
602 reader: &FileReader,
603 field: &Field,
604 batch_id: i32,
605 page_table: &PageTable,
606 params: &ReadBatchParams,
607) -> Result<ArrayRef> {
608 let page_info = get_page_info(page_table, field, batch_id)?;
609
610 lance_io::utils::read_binary_array(
611 reader.object_reader.as_ref(),
612 &field.data_type(),
613 field.nullable,
614 page_info.position,
615 page_info.length,
616 params,
617 )
618 .await
619}
620
621async fn read_dictionary_array(
622 reader: &FileReader,
623 field: &Field,
624 batch_id: i32,
625 page_table: &PageTable,
626 params: &ReadBatchParams,
627) -> Result<ArrayRef> {
628 let page_info = get_page_info(page_table, field, batch_id)?;
629 let data_type = field.data_type();
630 let decoder = DictionaryDecoder::new(
631 reader.object_reader.as_ref(),
632 page_info.position,
633 page_info.length,
634 &data_type,
635 field
636 .dictionary
637 .as_ref()
638 .unwrap()
639 .values
640 .as_ref()
641 .unwrap()
642 .clone(),
643 );
644 decoder.get(params.clone()).await
645}
646
647async fn read_struct_array(
648 reader: &FileReader,
649 field: &Field,
650 batch_id: i32,
651 page_table: &PageTable,
652 params: &ReadBatchParams,
653) -> Result<ArrayRef> {
654 let mut sub_arrays: Vec<(FieldRef, ArrayRef)> = vec![];
656
657 for child in field.children.as_slice() {
658 let arr = read_array(reader, child, batch_id, page_table, params).await?;
659 sub_arrays.push((Arc::new(child.into()), arr));
660 }
661
662 Ok(Arc::new(StructArray::from(sub_arrays)))
663}
664
665async fn take_list_array<T: ArrowNumericType>(
666 reader: &FileReader,
667 field: &Field,
668 batch_id: i32,
669 page_table: &PageTable,
670 positions: &PrimitiveArray<T>,
671 indices: &UInt32Array,
672) -> Result<ArrayRef>
673where
674 T::Native: ArrowNativeTypeOp + OffsetSizeTrait,
675{
676 let first_idx = indices.value(0);
677 let ranges = indices
679 .values()
680 .iter()
681 .map(|i| (*i - first_idx).as_usize())
682 .map(|idx| positions.value(idx).as_usize()..positions.value(idx + 1).as_usize())
683 .collect::<Vec<_>>();
684 let field = field.clone();
685 let mut list_values: Vec<ArrayRef> = vec![];
686 for range in ranges.iter() {
688 list_values.push(
689 read_array(
690 reader,
691 &field.children[0],
692 batch_id,
693 page_table,
694 &(range.clone()).into(),
695 )
696 .await?,
697 );
698 }
699
700 let value_refs = list_values
701 .iter()
702 .map(|arr| arr.as_ref())
703 .collect::<Vec<_>>();
704 let mut offsets_builder = PrimitiveBuilder::<T>::new();
705 offsets_builder.append_value(T::Native::usize_as(0));
706 let mut off = 0_usize;
707 for range in ranges {
708 off += range.len();
709 offsets_builder.append_value(T::Native::usize_as(off));
710 }
711 let all_values = concat::concat(value_refs.as_slice())?;
712 let offset_arr = offsets_builder.finish();
713 let arr = try_new_generic_list_array(all_values, &offset_arr)?;
714 Ok(Arc::new(arr) as ArrayRef)
715}
716
717async fn read_list_array<T: ArrowNumericType>(
718 reader: &FileReader,
719 field: &Field,
720 batch_id: i32,
721 page_table: &PageTable,
722 params: &ReadBatchParams,
723) -> Result<ArrayRef>
724where
725 T::Native: ArrowNativeTypeOp + OffsetSizeTrait,
726{
727 let positions_params = match params {
729 ReadBatchParams::Range(range) => ReadBatchParams::from(range.start..(range.end + 1)),
730 ReadBatchParams::RangeTo(range) => ReadBatchParams::from(..range.end + 1),
731 ReadBatchParams::Indices(indices) => {
732 (indices.value(0).as_usize()..indices.value(indices.len() - 1).as_usize() + 2).into()
733 }
734 p => p.clone(),
735 };
736
737 let page_info = get_page_info(&reader.page_table, field, batch_id)?;
738 let position_arr = read_fixed_stride_array(
739 reader.object_reader.as_ref(),
740 &T::DATA_TYPE,
741 page_info.position,
742 page_info.length,
743 positions_params,
744 )
745 .await?;
746
747 let positions: &PrimitiveArray<T> = position_arr.as_primitive();
748
749 let value_params = match params {
751 ReadBatchParams::Range(range) => ReadBatchParams::from(
752 positions.value(0).as_usize()..positions.value(range.end - range.start).as_usize(),
753 ),
754 ReadBatchParams::Ranges(_) => {
755 return Err(Error::internal(
756 "ReadBatchParams::Ranges should not be used in v1 files".to_string(),
757 ));
758 }
759 ReadBatchParams::RangeTo(RangeTo { end }) => {
760 ReadBatchParams::from(..positions.value(*end).as_usize())
761 }
762 ReadBatchParams::RangeFrom(_) => ReadBatchParams::from(positions.value(0).as_usize()..),
763 ReadBatchParams::RangeFull => ReadBatchParams::from(
764 positions.value(0).as_usize()..positions.value(positions.len() - 1).as_usize(),
765 ),
766 ReadBatchParams::Indices(indices) => {
767 return take_list_array(reader, field, batch_id, page_table, positions, indices).await;
768 }
769 };
770
771 let start_position = PrimitiveArray::<T>::new_scalar(positions.value(0));
772 let offset_arr = sub(positions, &start_position)?;
773 let offset_arr_ref = offset_arr.as_primitive::<T>();
774 let value_arrs = read_array(
775 reader,
776 &field.children[0],
777 batch_id,
778 page_table,
779 &value_params,
780 )
781 .await?;
782 let arr = try_new_generic_list_array(value_arrs, offset_arr_ref)?;
783 Ok(Arc::new(arr) as ArrayRef)
784}
785
786#[cfg(test)]
787mod tests {
788 use crate::previous::writer::{FileWriter as PreviousFileWriter, NotSelfDescribing};
789
790 use super::*;
791
792 use arrow_array::{
793 Array, DictionaryArray, Float32Array, Int64Array, LargeListArray, ListArray, StringArray,
794 UInt8Array,
795 builder::{Int32Builder, LargeListBuilder, ListBuilder, StringBuilder},
796 cast::{as_string_array, as_struct_array},
797 types::UInt8Type,
798 };
799 use arrow_array::{BooleanArray, Int32Array};
800 use arrow_schema::{Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema};
801 use lance_io::object_store::ObjectStoreParams;
802
803 #[tokio::test]
804 async fn test_take() {
805 let arrow_schema = ArrowSchema::new(vec![
806 ArrowField::new("i", DataType::Int64, true),
807 ArrowField::new("f", DataType::Float32, false),
808 ArrowField::new("s", DataType::Utf8, false),
809 ArrowField::new(
810 "d",
811 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
812 false,
813 ),
814 ]);
815 let mut schema = Schema::try_from(&arrow_schema).unwrap();
816
817 let store = ObjectStore::memory();
818 let path = Path::from("/take_test");
819
820 let values = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);
822 let values_ref = Arc::new(values);
823 let mut batches = vec![];
824 for batch_id in 0..10 {
825 let value_range: Range<i64> = batch_id * 10..batch_id * 10 + 10;
826 let keys = UInt8Array::from_iter_values(value_range.clone().map(|v| (v % 7) as u8));
827 let columns: Vec<ArrayRef> = vec![
828 Arc::new(Int64Array::from_iter(
829 value_range.clone().collect::<Vec<_>>(),
830 )),
831 Arc::new(Float32Array::from_iter(
832 value_range.clone().map(|n| n as f32).collect::<Vec<_>>(),
833 )),
834 Arc::new(StringArray::from_iter_values(
835 value_range.clone().map(|n| format!("str-{}", n)),
836 )),
837 Arc::new(DictionaryArray::<UInt8Type>::try_new(keys, values_ref.clone()).unwrap()),
838 ];
839 batches.push(RecordBatch::try_new(Arc::new(arrow_schema.clone()), columns).unwrap());
840 }
841 schema.set_dictionary(&batches[0]).unwrap();
842
843 let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
844 &store,
845 &path,
846 schema.clone(),
847 &Default::default(),
848 )
849 .await
850 .unwrap();
851 for batch in batches.iter() {
852 file_writer
853 .write(std::slice::from_ref(batch))
854 .await
855 .unwrap();
856 }
857 file_writer.finish().await.unwrap();
858
859 let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
860 let batch = reader
861 .take(&[1, 15, 20, 25, 30, 48, 90], reader.schema())
862 .await
863 .unwrap();
864 let dict_keys = UInt8Array::from_iter_values([1, 1, 6, 4, 2, 6, 6]);
865 assert_eq!(
866 batch,
867 RecordBatch::try_new(
868 batch.schema(),
869 vec![
870 Arc::new(Int64Array::from_iter_values([1, 15, 20, 25, 30, 48, 90])),
871 Arc::new(Float32Array::from_iter_values([
872 1.0, 15.0, 20.0, 25.0, 30.0, 48.0, 90.0
873 ])),
874 Arc::new(StringArray::from_iter_values([
875 "str-1", "str-15", "str-20", "str-25", "str-30", "str-48", "str-90"
876 ])),
877 Arc::new(DictionaryArray::try_new(dict_keys, values_ref.clone()).unwrap()),
878 ]
879 )
880 .unwrap()
881 );
882 }
883
884 async fn test_write_null_string_in_struct(field_nullable: bool) {
885 let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
886 "parent",
887 DataType::Struct(ArrowFields::from(vec![ArrowField::new(
888 "str",
889 DataType::Utf8,
890 field_nullable,
891 )])),
892 true,
893 )]));
894
895 let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
896
897 let store = ObjectStore::memory();
898 let path = Path::from("/null_strings");
899
900 let string_arr = Arc::new(StringArray::from_iter([Some("a"), Some(""), Some("b")]));
901 let struct_arr = Arc::new(StructArray::from(vec![(
902 Arc::new(ArrowField::new("str", DataType::Utf8, field_nullable)),
903 string_arr.clone() as ArrayRef,
904 )]));
905 let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_arr]).unwrap();
906
907 let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
908 &store,
909 &path,
910 schema.clone(),
911 &Default::default(),
912 )
913 .await
914 .unwrap();
915 file_writer
916 .write(std::slice::from_ref(&batch))
917 .await
918 .unwrap();
919 file_writer.finish().await.unwrap();
920
921 let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
922 let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
923
924 if field_nullable {
925 assert_eq!(
926 &StringArray::from_iter(vec![Some("a"), None, Some("b")]),
927 as_string_array(
928 as_struct_array(actual_batch.column_by_name("parent").unwrap().as_ref())
929 .column_by_name("str")
930 .unwrap()
931 .as_ref()
932 )
933 );
934 } else {
935 assert_eq!(actual_batch, batch);
936 }
937 }
938
939 #[tokio::test]
940 async fn read_nullable_string_in_struct() {
941 test_write_null_string_in_struct(true).await;
942 test_write_null_string_in_struct(false).await;
943 }
944
945 #[tokio::test]
946 async fn test_read_struct_of_list_arrays() {
947 let store = ObjectStore::memory();
948 let path = Path::from("/null_strings");
949
950 let arrow_schema = make_schema_of_list_array();
951 let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
952
953 let batches = (0..3)
954 .map(|_| {
955 let struct_array = make_struct_of_list_array(10, 10);
956 RecordBatch::try_new(arrow_schema.clone(), vec![struct_array]).unwrap()
957 })
958 .collect::<Vec<_>>();
959 let batches_ref = batches.iter().collect::<Vec<_>>();
960
961 let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
962 &store,
963 &path,
964 schema.clone(),
965 &Default::default(),
966 )
967 .await
968 .unwrap();
969 file_writer.write(&batches).await.unwrap();
970 file_writer.finish().await.unwrap();
971
972 let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
973 let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
974 let expected = concat_batches(&arrow_schema, batches_ref).unwrap();
975 assert_eq!(expected, actual_batch);
976 }
977
978 #[tokio::test]
979 async fn test_scan_struct_of_list_arrays() {
980 let store = ObjectStore::memory();
981 let path = Path::from("/null_strings");
982
983 let arrow_schema = make_schema_of_list_array();
984 let struct_array = make_struct_of_list_array(3, 10);
985 let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
986 let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_array.clone()]).unwrap();
987
988 let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
989 &store,
990 &path,
991 schema.clone(),
992 &Default::default(),
993 )
994 .await
995 .unwrap();
996 file_writer.write(&[batch]).await.unwrap();
997 file_writer.finish().await.unwrap();
998
999 let mut expected_columns: Vec<ArrayRef> = Vec::new();
1000 for c in struct_array.columns().iter() {
1001 expected_columns.push(c.slice(1, 1));
1002 }
1003
1004 let expected_struct = match arrow_schema.fields[0].data_type() {
1005 DataType::Struct(subfields) => subfields
1006 .iter()
1007 .zip(expected_columns)
1008 .map(|(f, d)| (f.clone(), d))
1009 .collect::<Vec<_>>(),
1010 _ => panic!("unexpected field"),
1011 };
1012
1013 let expected_struct_array = StructArray::from(expected_struct);
1014 let expected_batch = RecordBatch::from(&StructArray::from(vec![(
1015 Arc::new(arrow_schema.fields[0].as_ref().clone()),
1016 Arc::new(expected_struct_array) as ArrayRef,
1017 )]));
1018
1019 let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
1020 let params = ReadBatchParams::Range(1..2);
1021 let slice_of_batch = reader.read_batch(0, params, reader.schema()).await.unwrap();
1022 assert_eq!(expected_batch, slice_of_batch);
1023 }
1024
1025 fn make_schema_of_list_array() -> Arc<arrow_schema::Schema> {
1026 Arc::new(ArrowSchema::new(vec![ArrowField::new(
1027 "s",
1028 DataType::Struct(ArrowFields::from(vec![
1029 ArrowField::new(
1030 "li",
1031 DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1032 true,
1033 ),
1034 ArrowField::new(
1035 "ls",
1036 DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))),
1037 true,
1038 ),
1039 ArrowField::new(
1040 "ll",
1041 DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1042 false,
1043 ),
1044 ])),
1045 true,
1046 )]))
1047 }
1048
1049 fn make_struct_of_list_array(rows: i32, num_items: i32) -> Arc<StructArray> {
1050 let mut li_builder = ListBuilder::new(Int32Builder::new());
1051 let mut ls_builder = ListBuilder::new(StringBuilder::new());
1052 let ll_value_builder = Int32Builder::new();
1053 let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
1054 for i in 0..rows {
1055 for j in 0..num_items {
1056 li_builder.values().append_value(i * 10 + j);
1057 ls_builder
1058 .values()
1059 .append_value(format!("str-{}", i * 10 + j));
1060 large_list_builder.values().append_value(i * 10 + j);
1061 }
1062 li_builder.append(true);
1063 ls_builder.append(true);
1064 large_list_builder.append(true);
1065 }
1066 Arc::new(StructArray::from(vec![
1067 (
1068 Arc::new(ArrowField::new(
1069 "li",
1070 DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1071 true,
1072 )),
1073 Arc::new(li_builder.finish()) as ArrayRef,
1074 ),
1075 (
1076 Arc::new(ArrowField::new(
1077 "ls",
1078 DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))),
1079 true,
1080 )),
1081 Arc::new(ls_builder.finish()) as ArrayRef,
1082 ),
1083 (
1084 Arc::new(ArrowField::new(
1085 "ll",
1086 DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1087 false,
1088 )),
1089 Arc::new(large_list_builder.finish()) as ArrayRef,
1090 ),
1091 ]))
1092 }
1093
1094 #[tokio::test]
1095 async fn test_read_nullable_arrays() {
1096 use arrow_array::Array;
1097
1098 let arrow_schema = ArrowSchema::new(vec![
1100 ArrowField::new("i", DataType::Int64, false),
1101 ArrowField::new("n", DataType::Null, true),
1102 ]);
1103 let schema = Schema::try_from(&arrow_schema).unwrap();
1104 let columns: Vec<ArrayRef> = vec![
1105 Arc::new(Int64Array::from_iter_values(0..100)),
1106 Arc::new(NullArray::new(100)),
1107 ];
1108 let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap();
1109
1110 let store = ObjectStore::memory();
1112 let path = Path::from("/takes");
1113 let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
1114 &store,
1115 &path,
1116 schema.clone(),
1117 &Default::default(),
1118 )
1119 .await
1120 .unwrap();
1121 file_writer.write(&[batch]).await.unwrap();
1122 file_writer.finish().await.unwrap();
1123
1124 let reader = FileReader::try_new(&store, &path, schema.clone())
1126 .await
1127 .unwrap();
1128
1129 async fn read_array_w_params(
1130 reader: &FileReader,
1131 field: &Field,
1132 params: ReadBatchParams,
1133 ) -> ArrayRef {
1134 read_array(reader, field, 0, reader.page_table.as_ref(), ¶ms)
1135 .await
1136 .expect("Error reading back the null array from file") as _
1137 }
1138
1139 let arr = read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFull).await;
1140 assert_eq!(100, arr.len());
1141 assert_eq!(arr.data_type(), &DataType::Null);
1142
1143 let arr =
1144 read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::Range(10..25)).await;
1145 assert_eq!(15, arr.len());
1146 assert_eq!(arr.data_type(), &DataType::Null);
1147
1148 let arr =
1149 read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFrom(60..)).await;
1150 assert_eq!(40, arr.len());
1151 assert_eq!(arr.data_type(), &DataType::Null);
1152
1153 let arr =
1154 read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeTo(..25)).await;
1155 assert_eq!(25, arr.len());
1156 assert_eq!(arr.data_type(), &DataType::Null);
1157
1158 let arr = read_array_w_params(
1159 &reader,
1160 &schema.fields[1],
1161 ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72])),
1162 )
1163 .await;
1164 assert_eq!(4, arr.len());
1165 assert_eq!(arr.data_type(), &DataType::Null);
1166
1167 let params = ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72, 100]));
1169 let arr = read_array(
1170 &reader,
1171 &schema.fields[1],
1172 0,
1173 reader.page_table.as_ref(),
1174 ¶ms,
1175 );
1176 assert!(arr.await.is_err());
1177
1178 let params = ReadBatchParams::RangeTo(..107);
1180 let arr = read_array(
1181 &reader,
1182 &schema.fields[1],
1183 0,
1184 reader.page_table.as_ref(),
1185 ¶ms,
1186 );
1187 assert!(arr.await.is_err());
1188 }
1189
1190 #[tokio::test]
1191 async fn test_take_lists() {
1192 let arrow_schema = ArrowSchema::new(vec![
1193 ArrowField::new(
1194 "l",
1195 DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1196 false,
1197 ),
1198 ArrowField::new(
1199 "ll",
1200 DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1201 false,
1202 ),
1203 ]);
1204
1205 let value_builder = Int32Builder::new();
1206 let mut list_builder = ListBuilder::new(value_builder);
1207 let ll_value_builder = Int32Builder::new();
1208 let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
1209 for i in 0..100 {
1210 list_builder.values().append_value(i);
1211 large_list_builder.values().append_value(i);
1212 if (i + 1) % 10 == 0 {
1213 list_builder.append(true);
1214 large_list_builder.append(true);
1215 }
1216 }
1217 let list_arr = Arc::new(list_builder.finish());
1218 let large_list_arr = Arc::new(large_list_builder.finish());
1219
1220 let batch = RecordBatch::try_new(
1221 Arc::new(arrow_schema.clone()),
1222 vec![list_arr as ArrayRef, large_list_arr as ArrayRef],
1223 )
1224 .unwrap();
1225
1226 let store = ObjectStore::memory();
1228 let path = Path::from("/take_list");
1229 let schema: Schema = (&arrow_schema).try_into().unwrap();
1230 let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
1231 &store,
1232 &path,
1233 schema.clone(),
1234 &Default::default(),
1235 )
1236 .await
1237 .unwrap();
1238 file_writer.write(&[batch]).await.unwrap();
1239 file_writer.finish().await.unwrap();
1240
1241 let reader = FileReader::try_new(&store, &path, schema.clone())
1243 .await
1244 .unwrap();
1245 let actual = reader.take(&[1, 3, 5, 9], &schema).await.unwrap();
1246
1247 let value_builder = Int32Builder::new();
1248 let mut list_builder = ListBuilder::new(value_builder);
1249 let ll_value_builder = Int32Builder::new();
1250 let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
1251 for i in [1, 3, 5, 9] {
1252 for j in 0..10 {
1253 list_builder.values().append_value(i * 10 + j);
1254 large_list_builder.values().append_value(i * 10 + j);
1255 }
1256 list_builder.append(true);
1257 large_list_builder.append(true);
1258 }
1259 let expected_list = list_builder.finish();
1260 let expected_large_list = large_list_builder.finish();
1261
1262 assert_eq!(actual.column_by_name("l").unwrap().as_ref(), &expected_list);
1263 assert_eq!(
1264 actual.column_by_name("ll").unwrap().as_ref(),
1265 &expected_large_list
1266 );
1267 }
1268
1269 #[tokio::test]
1270 async fn test_list_array_with_offsets() {
1271 let arrow_schema = ArrowSchema::new(vec![
1272 ArrowField::new(
1273 "l",
1274 DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1275 false,
1276 ),
1277 ArrowField::new(
1278 "ll",
1279 DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1280 false,
1281 ),
1282 ]);
1283
1284 let store = ObjectStore::memory();
1285 let path = Path::from("/lists");
1286
1287 let list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1288 Some(vec![Some(1), Some(2)]),
1289 Some(vec![Some(3), Some(4)]),
1290 Some((0..2_000).map(Some).collect::<Vec<_>>()),
1291 ])
1292 .slice(1, 1);
1293 let large_list_array = LargeListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1294 Some(vec![Some(10), Some(11)]),
1295 Some(vec![Some(12), Some(13)]),
1296 Some((0..2_000).map(Some).collect::<Vec<_>>()),
1297 ])
1298 .slice(1, 1);
1299
1300 let batch = RecordBatch::try_new(
1301 Arc::new(arrow_schema.clone()),
1302 vec![Arc::new(list_array), Arc::new(large_list_array)],
1303 )
1304 .unwrap();
1305
1306 let schema: Schema = (&arrow_schema).try_into().unwrap();
1307 let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
1308 &store,
1309 &path,
1310 schema.clone(),
1311 &Default::default(),
1312 )
1313 .await
1314 .unwrap();
1315 file_writer
1316 .write(std::slice::from_ref(&batch))
1317 .await
1318 .unwrap();
1319 file_writer.finish().await.unwrap();
1320
1321 let file_size_bytes = store.size(&path).await.unwrap();
1323 assert!(file_size_bytes < 1_000);
1324
1325 let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
1326 let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
1327 assert_eq!(batch, actual_batch);
1328 }
1329
1330 #[tokio::test]
1331 async fn test_read_ranges() {
1332 let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int64, false)]);
1334 let schema = Schema::try_from(&arrow_schema).unwrap();
1335 let columns: Vec<ArrayRef> = vec![Arc::new(Int64Array::from_iter_values(0..100))];
1336 let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap();
1337
1338 let store = ObjectStore::memory();
1340 let path = Path::from("/read_range");
1341 let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
1342 &store,
1343 &path,
1344 schema.clone(),
1345 &Default::default(),
1346 )
1347 .await
1348 .unwrap();
1349 file_writer.write(&[batch]).await.unwrap();
1350 file_writer.finish().await.unwrap();
1351
1352 let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
1353 let actual_batch = reader.read_range(7..25, reader.schema()).await.unwrap();
1354
1355 assert_eq!(
1356 actual_batch.column_by_name("i").unwrap().as_ref(),
1357 &Int64Array::from_iter_values(7..25)
1358 );
1359 }
1360
1361 #[tokio::test]
1362 async fn test_batches_stream() {
1363 let store = ObjectStore::memory();
1364 let path = Path::from("/batch_stream");
1365
1366 let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int32, true)]);
1367 let schema = Schema::try_from(&arrow_schema).unwrap();
1368 let mut writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
1369 &store,
1370 &path,
1371 schema.clone(),
1372 &Default::default(),
1373 )
1374 .await
1375 .unwrap();
1376 for i in 0..10 {
1377 let batch = RecordBatch::try_new(
1378 Arc::new(arrow_schema.clone()),
1379 vec![Arc::new(Int32Array::from_iter_values(i * 10..(i + 1) * 10))],
1380 )
1381 .unwrap();
1382 writer.write(&[batch]).await.unwrap();
1383 }
1384 writer.finish().await.unwrap();
1385
1386 let reader = FileReader::try_new(&store, &path, schema.clone())
1387 .await
1388 .unwrap();
1389 let stream = batches_stream(reader, schema, |id| id % 2 == 0);
1390 let batches = stream.try_collect::<Vec<_>>().await.unwrap();
1391
1392 assert_eq!(batches.len(), 5);
1393 for (i, batch) in batches.iter().enumerate() {
1394 assert_eq!(
1395 batch,
1396 &RecordBatch::try_new(
1397 Arc::new(arrow_schema.clone()),
1398 vec![Arc::new(Int32Array::from_iter_values(
1399 i as i32 * 2 * 10..(i as i32 * 2 + 1) * 10
1400 ))],
1401 )
1402 .unwrap()
1403 )
1404 }
1405 }
1406
1407 #[tokio::test]
1408 async fn test_take_boolean_beyond_chunk() {
1409 let store = ObjectStore::from_uri_and_params(
1410 Arc::new(Default::default()),
1411 "memory://",
1412 &ObjectStoreParams {
1413 block_size: Some(256),
1414 ..Default::default()
1415 },
1416 )
1417 .await
1418 .unwrap()
1419 .0;
1420 let path = Path::from("/take_bools");
1421
1422 let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
1423 "b",
1424 DataType::Boolean,
1425 false,
1426 )]));
1427 let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
1428 let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
1429 &store,
1430 &path,
1431 schema.clone(),
1432 &Default::default(),
1433 )
1434 .await
1435 .unwrap();
1436
1437 let array = BooleanArray::from((0..5000).map(|v| v % 5 == 0).collect::<Vec<_>>());
1438 let batch =
1439 RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(array.clone())]).unwrap();
1440 file_writer.write(&[batch]).await.unwrap();
1441 file_writer.finish().await.unwrap();
1442
1443 let reader = FileReader::try_new(&store, &path, schema.clone())
1444 .await
1445 .unwrap();
1446 let actual = reader.take(&[2, 4, 5, 8, 4555], &schema).await.unwrap();
1447
1448 assert_eq!(
1449 actual.column_by_name("b").unwrap().as_ref(),
1450 &BooleanArray::from(vec![false, false, true, false, true])
1451 );
1452 }
1453
1454 #[tokio::test]
1455 async fn test_read_projection() {
1456 let store = ObjectStore::memory();
1460 let path = Path::from("/partial_read");
1461
1462 let mut fields = vec![];
1464 for i in 0..100 {
1465 fields.push(ArrowField::new(format!("f{}", i), DataType::Int32, false));
1466 }
1467 let arrow_schema = ArrowSchema::new(fields);
1468 let schema = Schema::try_from(&arrow_schema).unwrap();
1469
1470 let partial_schema = schema.project(&["f50"]).unwrap();
1471 let partial_arrow: ArrowSchema = (&partial_schema).into();
1472
1473 let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
1474 &store,
1475 &path,
1476 partial_schema.clone(),
1477 &Default::default(),
1478 )
1479 .await
1480 .unwrap();
1481
1482 let array = Int32Array::from(vec![0; 15]);
1483 let batch =
1484 RecordBatch::try_new(Arc::new(partial_arrow), vec![Arc::new(array.clone())]).unwrap();
1485 file_writer
1486 .write(std::slice::from_ref(&batch))
1487 .await
1488 .unwrap();
1489 file_writer.finish().await.unwrap();
1490
1491 let field_id = partial_schema.fields.first().unwrap().id;
1492 let reader = FileReader::try_new_with_fragment_id(
1493 &store,
1494 &path,
1495 schema.clone(),
1496 0,
1497 field_id,
1498 field_id,
1499 None,
1500 )
1501 .await
1502 .unwrap();
1503 let actual = reader
1504 .read_batch(0, ReadBatchParams::RangeFull, &partial_schema)
1505 .await
1506 .unwrap();
1507
1508 assert_eq!(actual, batch);
1509 }
1510}