1use std::ops::{Range, RangeTo};
8use std::sync::Arc;
9
10use arrow_arith::numeric::sub;
11use arrow_array::{
12 ArrayRef, ArrowNativeTypeOp, ArrowNumericType, NullArray, OffsetSizeTrait, PrimitiveArray,
13 RecordBatch, StructArray, UInt32Array,
14 builder::PrimitiveBuilder,
15 cast::AsArray,
16 types::{Int32Type, Int64Type},
17};
18use arrow_buffer::ArrowNativeType;
19use arrow_schema::{DataType, FieldRef, Schema as ArrowSchema};
20use arrow_select::concat::{self, concat_batches};
21use async_recursion::async_recursion;
22use deepsize::DeepSizeOf;
23use futures::{Future, FutureExt, StreamExt, TryStreamExt, stream};
24use lance_arrow::*;
25use lance_core::cache::{CacheKey, LanceCache};
26use lance_core::datatypes::{Field, Schema};
27use lance_core::{Error, Result};
28use lance_io::encodings::AsyncIndex;
29use lance_io::encodings::dictionary::DictionaryDecoder;
30use lance_io::stream::{RecordBatchStream, RecordBatchStreamAdapter};
31use lance_io::traits::Reader;
32use lance_io::utils::{
33 read_fixed_stride_array, read_metadata_offset, read_struct, read_struct_from_buf,
34};
35use lance_io::{ReadBatchParams, object_store::ObjectStore};
36use std::borrow::Cow;
37
38use object_store::path::Path;
39use tracing::instrument;
40
41use crate::previous::format::metadata::Metadata;
42use crate::previous::page_table::{PageInfo, PageTable};
43
/// Reader for a single Lance v1 (legacy format) data file.
#[derive(Clone, DeepSizeOf)]
pub struct FileReader {
    pub object_reader: Arc<dyn Reader>,
    // Decoded file metadata (batch lengths, page table position, stats metadata).
    metadata: Arc<Metadata>,
    // Maps (field_id, batch_id) -> page position/length for data pages.
    page_table: Arc<PageTable>,
    schema: Schema,

    // Fragment this file belongs to; only used for Debug output here.
    fragment_id: u64,

    // Page table for the statistics pages; `None` when the file carries no stats.
    stats_page_table: Arc<Option<PageTable>>,
}
61
62impl std::fmt::Debug for FileReader {
63 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64 f.debug_struct("FileReader")
65 .field("fragment", &self.fragment_id)
66 .field("path", &self.object_reader.path())
67 .finish()
68 }
69}
70
/// Cache key pairing a borrowed string with a target value type `T`, so
/// entries of different value types never collide even under the same string.
struct StringCacheKey<'a, T> {
    key: &'a str,
    // Zero-sized marker tying the key to the cached value type without owning a T.
    _phantom: std::marker::PhantomData<T>,
}
76
77impl<'a, T> StringCacheKey<'a, T> {
78 fn new(key: &'a str) -> Self {
79 Self {
80 key,
81 _phantom: std::marker::PhantomData,
82 }
83 }
84}
85
86impl<T: 'static> CacheKey for StringCacheKey<'_, T> {
87 type ValueType = T;
88
89 fn key(&self) -> Cow<'_, str> {
90 self.key.into()
91 }
92
93 fn type_name() -> &'static str {
94 std::any::type_name::<T>()
98 }
99}
100
impl FileReader {
    /// Open a reader for a file belonging to fragment `fragment_id`.
    ///
    /// `field_id_offset` and `max_field_id` bound the field ids whose page
    /// table entries are loaded. `session`, when provided, caches the decoded
    /// metadata and page tables.
    #[instrument(level = "debug", skip(object_store, schema, session))]
    pub async fn try_new_with_fragment_id(
        object_store: &ObjectStore,
        path: &Path,
        schema: Schema,
        fragment_id: u32,
        field_id_offset: i32,
        max_field_id: i32,
        session: Option<&LanceCache>,
    ) -> Result<Self> {
        let object_reader = object_store.open(path).await?;

        let metadata = Self::read_metadata(object_reader.as_ref(), session).await?;

        Self::try_new_from_reader(
            path,
            object_reader.into(),
            Some(metadata),
            schema,
            fragment_id,
            field_id_offset,
            max_field_id,
            session,
        )
        .await
    }

    /// Build a reader from an already-opened [`Reader`].
    ///
    /// If `metadata` is `None` it is read from the file tail (or the cache).
    /// The main page table and the optional statistics page table are loaded
    /// concurrently.
    #[allow(clippy::too_many_arguments)]
    pub async fn try_new_from_reader(
        path: &Path,
        object_reader: Arc<dyn Reader>,
        metadata: Option<Arc<Metadata>>,
        schema: Schema,
        fragment_id: u32,
        field_id_offset: i32,
        max_field_id: i32,
        session: Option<&LanceCache>,
    ) -> Result<Self> {
        let metadata = match metadata {
            Some(metadata) => metadata,
            None => Self::read_metadata(object_reader.as_ref(), session).await?,
        };

        let page_table = async {
            // Cached under the bare file path; the cache key is additionally
            // namespaced by the value type (see `StringCacheKey`), so it does
            // not collide with the metadata entry under the same path.
            Self::load_from_cache(session, path.to_string(), |_| async {
                PageTable::load(
                    object_reader.as_ref(),
                    metadata.page_table_position,
                    field_id_offset,
                    max_field_id,
                    metadata.num_batches() as i32,
                )
                .await
            })
            .await
        };

        let stats_page_table = Self::read_stats_page_table(object_reader.as_ref(), session);

        // Load both page tables concurrently.
        let (page_table, stats_page_table) = futures::try_join!(page_table, stats_page_table)?;

        Ok(Self {
            object_reader,
            metadata,
            schema,
            page_table,
            fragment_id: fragment_id as u64,
            stats_page_table,
        })
    }

    /// Read (or fetch from `cache`) the file metadata stored at the tail of
    /// the file.
    pub async fn read_metadata(
        object_reader: &dyn Reader,
        cache: Option<&LanceCache>,
    ) -> Result<Arc<Metadata>> {
        Self::load_from_cache(cache, object_reader.path().to_string(), |_| async {
            let file_size = object_reader.size().await?;
            // Fetch the last block of the file; it always contains the
            // metadata offset and usually the metadata itself.
            let begin = if file_size < object_reader.block_size() {
                0
            } else {
                file_size - object_reader.block_size()
            };
            let tail_bytes = object_reader.get_range(begin..file_size).await?;
            let metadata_pos = read_metadata_offset(&tail_bytes)?;

            let metadata: Metadata = if metadata_pos < file_size - tail_bytes.len() {
                // Metadata starts before the tail we already have; issue a
                // second read.
                read_struct(object_reader, metadata_pos).await?
            } else {
                // Metadata is fully contained in the tail; decode in place.
                let offset = tail_bytes.len() - (file_size - metadata_pos);
                read_struct_from_buf(&tail_bytes.slice(offset..))?
            };
            Ok(metadata)
        })
        .await
    }

    /// Load (or fetch from `cache`) the page table for the statistics pages;
    /// resolves to `None` when the file carries no statistics metadata.
    async fn read_stats_page_table(
        reader: &dyn Reader,
        cache: Option<&LanceCache>,
    ) -> Result<Arc<Option<PageTable>>> {
        // Keyed under "<path>/stats" so it cannot collide with the main page
        // table cached under the bare file path.
        Self::load_from_cache(cache, reader.path().child("stats").to_string(), |_| async {
            let metadata = Self::read_metadata(reader, cache).await?;

            if let Some(stats_meta) = metadata.stats_metadata.as_ref() {
                Ok(Some(
                    PageTable::load(
                        reader,
                        stats_meta.page_table_position,
                        0,
                        *stats_meta.leaf_field_ids.iter().max().unwrap(),
                        1,
                    )
                    .await?,
                ))
            } else {
                Ok(None)
            }
        })
        .await
    }

    /// Get a value from `cache` keyed by `key` (namespaced by `T`), computing
    /// and inserting it via `loader` on a miss. Without a cache the loader is
    /// always invoked.
    async fn load_from_cache<T: DeepSizeOf + Send + Sync + 'static, F, Fut>(
        cache: Option<&LanceCache>,
        key: String,
        loader: F,
    ) -> Result<Arc<T>>
    where
        F: Fn(&str) -> Fut + Send + Sync,
        Fut: Future<Output = Result<T>> + Send,
    {
        if let Some(cache) = cache {
            let cache_key = StringCacheKey::<T>::new(key.as_str());
            cache
                .get_or_insert_with_key(cache_key, || loader(key.as_str()))
                .await
        } else {
            Ok(Arc::new(loader(key.as_str()).await?))
        }
    }

    /// Open a reader with fragment id 0, no field-id offset and no cache.
    pub async fn try_new(object_store: &ObjectStore, path: &Path, schema: Schema) -> Result<Self> {
        let max_field_id = schema.max_field_id().unwrap_or_default();
        Self::try_new_with_fragment_id(object_store, path, schema, 0, 0, max_field_id, None).await
    }

    /// Preferred number of concurrent I/O requests for this reader's store.
    fn io_parallelism(&self) -> usize {
        self.object_reader.io_parallelism()
    }

    /// Schema of the file.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Number of record batches stored in the file.
    pub fn num_batches(&self) -> usize {
        self.metadata.num_batches()
    }

    /// Number of rows in batch `batch_id` (0 if the batch does not exist).
    pub fn num_rows_in_batch(&self, batch_id: i32) -> usize {
        self.metadata.get_batch_length(batch_id).unwrap_or_default() as usize
    }

    /// Total number of rows in the file.
    pub fn len(&self) -> usize {
        self.metadata.len()
    }

    /// True if the file contains no rows.
    pub fn is_empty(&self) -> bool {
        self.metadata.is_empty()
    }

    /// Read one batch, selecting rows with `params` and columns with
    /// `projection`.
    #[instrument(level = "debug", skip(self, params, projection))]
    pub async fn read_batch(
        &self,
        batch_id: i32,
        params: impl Into<ReadBatchParams>,
        projection: &Schema,
    ) -> Result<RecordBatch> {
        read_batch(self, &params.into(), projection, batch_id).await
    }

    /// Read a contiguous row range (which may span multiple batches) and
    /// concatenate the pieces into a single batch.
    #[instrument(level = "debug", skip(self, projection))]
    pub async fn read_range(
        &self,
        range: Range<usize>,
        projection: &Schema,
    ) -> Result<RecordBatch> {
        if range.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(projection.into())));
        }
        let range_in_batches = self.metadata.range_to_batches(range)?;
        let batches =
            stream::iter(range_in_batches)
                .map(|(batch_id, range)| async move {
                    self.read_batch(batch_id, range, projection).await
                })
                .buffered(self.io_parallelism())
                .try_collect::<Vec<_>>()
                .await?;
        if batches.len() == 1 {
            // Common case: the range fell inside a single batch; skip concat.
            return Ok(batches[0].clone());
        }
        let schema = batches[0].schema();
        // Concatenation is CPU-bound; run it off the async executor.
        Ok(tokio::task::spawn_blocking(move || concat_batches(&schema, &batches)).await??)
    }

    /// Take rows at the given file-relative `indices`, reading each touched
    /// batch concurrently and concatenating the results.
    #[instrument(level = "debug", skip_all)]
    pub async fn take(&self, indices: &[u32], projection: &Schema) -> Result<RecordBatch> {
        let num_batches = self.num_batches();
        let num_rows = self.len() as u32;
        let indices_in_batches = self.metadata.group_indices_to_batches(indices);
        let batches = stream::iter(indices_in_batches)
            .map(|batch| async move {
                if batch.batch_id >= num_batches as i32 {
                    Err(Error::invalid_input_source(
                        format!("batch_id: {} out of bounds", batch.batch_id).into(),
                    ))
                // NOTE(review): this rejects offsets > num_rows but accepts
                // == num_rows; presumably offsets here are batch-relative —
                // confirm against `group_indices_to_batches`.
                } else if *batch.offsets.last().expect("got empty batch") > num_rows {
                    Err(Error::invalid_input_source(
                        format!("indices: {:?} out of bounds", batch.offsets).into(),
                    ))
                } else {
                    self.read_batch(batch.batch_id, batch.offsets.as_slice(), projection)
                        .await
                }
            })
            .buffered(self.io_parallelism())
            .try_collect::<Vec<_>>()
            .await?;

        let schema = Arc::new(ArrowSchema::from(projection));

        Ok(tokio::task::spawn_blocking(move || concat_batches(&schema, &batches)).await??)
    }

    /// Schema of the statistics pages restricted to `field_ids`, or `None`
    /// if the file has no statistics metadata.
    pub fn page_stats_schema(&self, field_ids: &[i32]) -> Option<Schema> {
        self.metadata.stats_metadata.as_ref().map(|meta| {
            let mut stats_field_ids = vec![];
            for stats_field in &meta.schema.fields {
                // Stats fields are named after the data field id they describe.
                if let Ok(stats_field_id) = stats_field.name.parse::<i32>()
                    && field_ids.contains(&stats_field_id)
                {
                    stats_field_ids.push(stats_field.id);
                    for child in &stats_field.children {
                        stats_field_ids.push(child.id);
                    }
                }
            }
            meta.schema.project_by_ids(&stats_field_ids, true)
        })
    }

    /// Read the page statistics for `field_ids`, returning `None` when the
    /// file has no statistics or none of the requested fields carry any.
    pub async fn read_page_stats(&self, field_ids: &[i32]) -> Result<Option<RecordBatch>> {
        if let Some(stats_page_table) = self.stats_page_table.as_ref() {
            let projection = self.page_stats_schema(field_ids).unwrap();
            if projection.fields.is_empty() {
                // None of the requested fields have stats pages.
                return Ok(None);
            }
            let arrays = futures::stream::iter(projection.fields.iter().cloned())
                .map(|field| async move {
                    read_array(
                        self,
                        &field,
                        0,
                        stats_page_table,
                        &ReadBatchParams::RangeFull,
                    )
                    .await
                })
                .buffered(self.io_parallelism())
                .try_collect::<Vec<_>>()
                .await?;

            let schema = ArrowSchema::from(&projection);
            let batch = RecordBatch::try_new(Arc::new(schema), arrays)?;
            Ok(Some(batch))
        } else {
            Ok(None)
        }
    }
}
418
419pub fn batches_stream(
430 reader: FileReader,
431 projection: Schema,
432 predicate: impl FnMut(&i32) -> bool + Send + Sync + 'static,
433) -> impl RecordBatchStream {
434 let projection = Arc::new(projection);
436 let arrow_schema = ArrowSchema::from(projection.as_ref());
437
438 let total_batches = reader.num_batches() as i32;
439 let batches = (0..total_batches).filter(predicate);
440 let this = Arc::new(reader);
442 let inner = stream::iter(batches)
443 .zip(stream::repeat_with(move || {
444 (this.clone(), projection.clone())
445 }))
446 .map(move |(batch_id, (reader, projection))| async move {
447 reader
448 .read_batch(batch_id, ReadBatchParams::RangeFull, &projection)
449 .await
450 })
451 .buffered(2)
452 .boxed();
453 RecordBatchStreamAdapter::new(Arc::new(arrow_schema), inner)
454}
455
456pub async fn read_batch(
461 reader: &FileReader,
462 params: &ReadBatchParams,
463 schema: &Schema,
464 batch_id: i32,
465) -> Result<RecordBatch> {
466 if !schema.fields.is_empty() {
467 let arrs = stream::iter(&schema.fields)
469 .map(|f| async { read_array(reader, f, batch_id, &reader.page_table, params).await })
470 .buffered(reader.io_parallelism())
471 .try_collect::<Vec<_>>()
472 .boxed();
473 let arrs = arrs.await?;
474 Ok(RecordBatch::try_new(Arc::new(schema.into()), arrs)?)
475 } else {
476 Err(Error::invalid_input("no fields requested"))
477 }
478}
479
480#[async_recursion]
481async fn read_array(
482 reader: &FileReader,
483 field: &Field,
484 batch_id: i32,
485 page_table: &PageTable,
486 params: &ReadBatchParams,
487) -> Result<ArrayRef> {
488 let data_type = field.data_type();
489
490 use DataType::*;
491
492 if data_type.is_fixed_stride() {
493 _read_fixed_stride_array(reader, field, batch_id, page_table, params).await
494 } else {
495 match data_type {
496 Null => read_null_array(field, batch_id, page_table, params),
497 Utf8 | LargeUtf8 | Binary | LargeBinary => {
498 read_binary_array(reader, field, batch_id, page_table, params).await
499 }
500 Struct(_) => read_struct_array(reader, field, batch_id, page_table, params).await,
501 Dictionary(_, _) => {
502 read_dictionary_array(reader, field, batch_id, page_table, params).await
503 }
504 List(_) => {
505 read_list_array::<Int32Type>(reader, field, batch_id, page_table, params).await
506 }
507 LargeList(_) => {
508 read_list_array::<Int64Type>(reader, field, batch_id, page_table, params).await
509 }
510 _ => {
511 unimplemented!("{}", format!("No support for {data_type} yet"));
512 }
513 }
514 }
515}
516
517fn get_page_info<'a>(
518 page_table: &'a PageTable,
519 field: &'a Field,
520 batch_id: i32,
521) -> Result<&'a PageInfo> {
522 page_table.get(field.id, batch_id).ok_or_else(|| {
523 Error::invalid_input(format!(
524 "No page info found for field: {}, field_id={} batch={}",
525 field.name, field.id, batch_id
526 ))
527 })
528}
529
530async fn _read_fixed_stride_array(
532 reader: &FileReader,
533 field: &Field,
534 batch_id: i32,
535 page_table: &PageTable,
536 params: &ReadBatchParams,
537) -> Result<ArrayRef> {
538 let page_info = get_page_info(page_table, field, batch_id)?;
539 read_fixed_stride_array(
540 reader.object_reader.as_ref(),
541 &field.data_type(),
542 page_info.position,
543 page_info.length,
544 params.clone(),
545 )
546 .await
547}
548
549fn read_null_array(
550 field: &Field,
551 batch_id: i32,
552 page_table: &PageTable,
553 params: &ReadBatchParams,
554) -> Result<ArrayRef> {
555 let page_info = get_page_info(page_table, field, batch_id)?;
556
557 let length_output = match params {
558 ReadBatchParams::Indices(indices) => {
559 if indices.is_empty() {
560 0
561 } else {
562 let idx_max = *indices.values().iter().max().unwrap() as u64;
563 if idx_max >= page_info.length as u64 {
564 return Err(Error::invalid_input(format!(
565 "NullArray Reader: request([{}]) out of range: [0..{}]",
566 idx_max, page_info.length
567 )));
568 }
569 indices.len()
570 }
571 }
572 _ => {
573 let (idx_start, idx_end) = match params {
574 ReadBatchParams::Range(r) => (r.start, r.end),
575 ReadBatchParams::RangeFull => (0, page_info.length),
576 ReadBatchParams::RangeTo(r) => (0, r.end),
577 ReadBatchParams::RangeFrom(r) => (r.start, page_info.length),
578 _ => unreachable!(),
579 };
580 if idx_end > page_info.length {
581 return Err(Error::invalid_input(format!(
582 "NullArray Reader: request([{}..{}]) out of range: [0..{}]",
583 idx_start,
585 idx_end,
586 page_info.length
587 )));
588 }
589 idx_end - idx_start
590 }
591 };
592
593 Ok(Arc::new(NullArray::new(length_output)))
594}
595
596async fn read_binary_array(
597 reader: &FileReader,
598 field: &Field,
599 batch_id: i32,
600 page_table: &PageTable,
601 params: &ReadBatchParams,
602) -> Result<ArrayRef> {
603 let page_info = get_page_info(page_table, field, batch_id)?;
604
605 lance_io::utils::read_binary_array(
606 reader.object_reader.as_ref(),
607 &field.data_type(),
608 field.nullable,
609 page_info.position,
610 page_info.length,
611 params,
612 )
613 .await
614}
615
616async fn read_dictionary_array(
617 reader: &FileReader,
618 field: &Field,
619 batch_id: i32,
620 page_table: &PageTable,
621 params: &ReadBatchParams,
622) -> Result<ArrayRef> {
623 let page_info = get_page_info(page_table, field, batch_id)?;
624 let data_type = field.data_type();
625 let decoder = DictionaryDecoder::new(
626 reader.object_reader.as_ref(),
627 page_info.position,
628 page_info.length,
629 &data_type,
630 field
631 .dictionary
632 .as_ref()
633 .unwrap()
634 .values
635 .as_ref()
636 .unwrap()
637 .clone(),
638 );
639 decoder.get(params.clone()).await
640}
641
642async fn read_struct_array(
643 reader: &FileReader,
644 field: &Field,
645 batch_id: i32,
646 page_table: &PageTable,
647 params: &ReadBatchParams,
648) -> Result<ArrayRef> {
649 let mut sub_arrays: Vec<(FieldRef, ArrayRef)> = vec![];
651
652 for child in field.children.as_slice() {
653 let arr = read_array(reader, child, batch_id, page_table, params).await?;
654 sub_arrays.push((Arc::new(child.into()), arr));
655 }
656
657 Ok(Arc::new(StructArray::from(sub_arrays)))
658}
659
660async fn take_list_array<T: ArrowNumericType>(
661 reader: &FileReader,
662 field: &Field,
663 batch_id: i32,
664 page_table: &PageTable,
665 positions: &PrimitiveArray<T>,
666 indices: &UInt32Array,
667) -> Result<ArrayRef>
668where
669 T::Native: ArrowNativeTypeOp + OffsetSizeTrait,
670{
671 let first_idx = indices.value(0);
672 let ranges = indices
674 .values()
675 .iter()
676 .map(|i| (*i - first_idx).as_usize())
677 .map(|idx| positions.value(idx).as_usize()..positions.value(idx + 1).as_usize())
678 .collect::<Vec<_>>();
679 let field = field.clone();
680 let mut list_values: Vec<ArrayRef> = vec![];
681 for range in ranges.iter() {
683 list_values.push(
684 read_array(
685 reader,
686 &field.children[0],
687 batch_id,
688 page_table,
689 &(range.clone()).into(),
690 )
691 .await?,
692 );
693 }
694
695 let value_refs = list_values
696 .iter()
697 .map(|arr| arr.as_ref())
698 .collect::<Vec<_>>();
699 let mut offsets_builder = PrimitiveBuilder::<T>::new();
700 offsets_builder.append_value(T::Native::usize_as(0));
701 let mut off = 0_usize;
702 for range in ranges {
703 off += range.len();
704 offsets_builder.append_value(T::Native::usize_as(off));
705 }
706 let all_values = concat::concat(value_refs.as_slice())?;
707 let offset_arr = offsets_builder.finish();
708 let arr = try_new_generic_list_array(all_values, &offset_arr)?;
709 Ok(Arc::new(arr) as ArrayRef)
710}
711
712async fn read_list_array<T: ArrowNumericType>(
713 reader: &FileReader,
714 field: &Field,
715 batch_id: i32,
716 page_table: &PageTable,
717 params: &ReadBatchParams,
718) -> Result<ArrayRef>
719where
720 T::Native: ArrowNativeTypeOp + OffsetSizeTrait,
721{
722 let positions_params = match params {
724 ReadBatchParams::Range(range) => ReadBatchParams::from(range.start..(range.end + 1)),
725 ReadBatchParams::RangeTo(range) => ReadBatchParams::from(..range.end + 1),
726 ReadBatchParams::Indices(indices) => {
727 (indices.value(0).as_usize()..indices.value(indices.len() - 1).as_usize() + 2).into()
728 }
729 p => p.clone(),
730 };
731
732 let page_info = get_page_info(&reader.page_table, field, batch_id)?;
733 let position_arr = read_fixed_stride_array(
734 reader.object_reader.as_ref(),
735 &T::DATA_TYPE,
736 page_info.position,
737 page_info.length,
738 positions_params,
739 )
740 .await?;
741
742 let positions: &PrimitiveArray<T> = position_arr.as_primitive();
743
744 let value_params = match params {
746 ReadBatchParams::Range(range) => ReadBatchParams::from(
747 positions.value(0).as_usize()..positions.value(range.end - range.start).as_usize(),
748 ),
749 ReadBatchParams::Ranges(_) => {
750 return Err(Error::internal(
751 "ReadBatchParams::Ranges should not be used in v1 files".to_string(),
752 ));
753 }
754 ReadBatchParams::RangeTo(RangeTo { end }) => {
755 ReadBatchParams::from(..positions.value(*end).as_usize())
756 }
757 ReadBatchParams::RangeFrom(_) => ReadBatchParams::from(positions.value(0).as_usize()..),
758 ReadBatchParams::RangeFull => ReadBatchParams::from(
759 positions.value(0).as_usize()..positions.value(positions.len() - 1).as_usize(),
760 ),
761 ReadBatchParams::Indices(indices) => {
762 return take_list_array(reader, field, batch_id, page_table, positions, indices).await;
763 }
764 };
765
766 let start_position = PrimitiveArray::<T>::new_scalar(positions.value(0));
767 let offset_arr = sub(positions, &start_position)?;
768 let offset_arr_ref = offset_arr.as_primitive::<T>();
769 let value_arrs = read_array(
770 reader,
771 &field.children[0],
772 batch_id,
773 page_table,
774 &value_params,
775 )
776 .await?;
777 let arr = try_new_generic_list_array(value_arrs, offset_arr_ref)?;
778 Ok(Arc::new(arr) as ArrayRef)
779}
780
781#[cfg(test)]
782mod tests {
783 use crate::previous::writer::{FileWriter as PreviousFileWriter, NotSelfDescribing};
784
785 use super::*;
786
787 use arrow_array::{
788 Array, DictionaryArray, Float32Array, Int64Array, LargeListArray, ListArray, StringArray,
789 UInt8Array,
790 builder::{Int32Builder, LargeListBuilder, ListBuilder, StringBuilder},
791 cast::{as_string_array, as_struct_array},
792 types::UInt8Type,
793 };
794 use arrow_array::{BooleanArray, Int32Array};
795 use arrow_schema::{Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema};
796 use lance_io::object_store::ObjectStoreParams;
797
    /// End-to-end round trip: write ten 10-row batches with int, float,
    /// string and dictionary columns, then `take` scattered indices and
    /// verify every column's values.
    #[tokio::test]
    async fn test_take() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("i", DataType::Int64, true),
            ArrowField::new("f", DataType::Float32, false),
            ArrowField::new("s", DataType::Utf8, false),
            ArrowField::new(
                "d",
                DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
                false,
            ),
        ]);
        let mut schema = Schema::try_from(&arrow_schema).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/take_test");

        let values = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);
        let values_ref = Arc::new(values);
        let mut batches = vec![];
        for batch_id in 0..10 {
            let value_range: Range<i64> = batch_id * 10..batch_id * 10 + 10;
            // Dictionary key for row v is v % 7 (seven dictionary values).
            let keys = UInt8Array::from_iter_values(value_range.clone().map(|v| (v % 7) as u8));
            let columns: Vec<ArrayRef> = vec![
                Arc::new(Int64Array::from_iter(
                    value_range.clone().collect::<Vec<_>>(),
                )),
                Arc::new(Float32Array::from_iter(
                    value_range.clone().map(|n| n as f32).collect::<Vec<_>>(),
                )),
                Arc::new(StringArray::from_iter_values(
                    value_range.clone().map(|n| format!("str-{}", n)),
                )),
                Arc::new(DictionaryArray::<UInt8Type>::try_new(keys, values_ref.clone()).unwrap()),
            ];
            batches.push(RecordBatch::try_new(Arc::new(arrow_schema.clone()), columns).unwrap());
        }
        // Dictionary fields need their value arrays registered on the schema
        // before writing.
        schema.set_dictionary(&batches[0]).unwrap();

        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        for batch in batches.iter() {
            file_writer
                .write(std::slice::from_ref(batch))
                .await
                .unwrap();
        }
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let batch = reader
            .take(&[1, 15, 20, 25, 30, 48, 90], reader.schema())
            .await
            .unwrap();
        // Expected dictionary keys for the taken rows: value % 7.
        let dict_keys = UInt8Array::from_iter_values([1, 1, 6, 4, 2, 6, 6]);
        assert_eq!(
            batch,
            RecordBatch::try_new(
                batch.schema(),
                vec![
                    Arc::new(Int64Array::from_iter_values([1, 15, 20, 25, 30, 48, 90])),
                    Arc::new(Float32Array::from_iter_values([
                        1.0, 15.0, 20.0, 25.0, 30.0, 48.0, 90.0
                    ])),
                    Arc::new(StringArray::from_iter_values([
                        "str-1", "str-15", "str-20", "str-25", "str-30", "str-48", "str-90"
                    ])),
                    Arc::new(DictionaryArray::try_new(dict_keys, values_ref.clone()).unwrap()),
                ]
            )
            .unwrap()
        );
    }
878
    /// Round-trip a struct column with a string child containing an empty
    /// string. As asserted below, when the child is nullable the empty
    /// string comes back as null; when non-nullable the batch is unchanged.
    async fn test_write_null_string_in_struct(field_nullable: bool) {
        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "parent",
            DataType::Struct(ArrowFields::from(vec![ArrowField::new(
                "str",
                DataType::Utf8,
                field_nullable,
            )])),
            true,
        )]));

        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let string_arr = Arc::new(StringArray::from_iter([Some("a"), Some(""), Some("b")]));
        let struct_arr = Arc::new(StructArray::from(vec![(
            Arc::new(ArrowField::new("str", DataType::Utf8, field_nullable)),
            string_arr.clone() as ArrayRef,
        )]));
        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_arr]).unwrap();

        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer
            .write(std::slice::from_ref(&batch))
            .await
            .unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();

        if field_nullable {
            // The empty string at index 1 is expected to read back as null.
            assert_eq!(
                &StringArray::from_iter(vec![Some("a"), None, Some("b")]),
                as_string_array(
                    as_struct_array(actual_batch.column_by_name("parent").unwrap().as_ref())
                        .column_by_name("str")
                        .unwrap()
                        .as_ref()
                )
            );
        } else {
            assert_eq!(actual_batch, batch);
        }
    }
933
    /// Run the struct-of-string round trip for both nullable and
    /// non-nullable child fields.
    #[tokio::test]
    async fn read_nullable_string_in_struct() {
        test_write_null_string_in_struct(true).await;
        test_write_null_string_in_struct(false).await;
    }
939
    /// Write three batches of a struct-of-lists column into one file and
    /// verify a full read of batch 0 matches the concatenated input.
    #[tokio::test]
    async fn test_read_struct_of_list_arrays() {
        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let arrow_schema = make_schema_of_list_array();
        let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();

        let batches = (0..3)
            .map(|_| {
                let struct_array = make_struct_of_list_array(10, 10);
                RecordBatch::try_new(arrow_schema.clone(), vec![struct_array]).unwrap()
            })
            .collect::<Vec<_>>();
        let batches_ref = batches.iter().collect::<Vec<_>>();

        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        // All three identical batches are written in a single call.
        file_writer.write(&batches).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
        let expected = concat_batches(&arrow_schema, batches_ref).unwrap();
        assert_eq!(expected, actual_batch);
    }
972
    /// Write a 3-row struct-of-lists batch and verify that reading row range
    /// 1..2 returns exactly the slice of every child column.
    #[tokio::test]
    async fn test_scan_struct_of_list_arrays() {
        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let arrow_schema = make_schema_of_list_array();
        let struct_array = make_struct_of_list_array(3, 10);
        let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_array.clone()]).unwrap();

        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        // Expected result: every child column sliced to row 1 only.
        let mut expected_columns: Vec<ArrayRef> = Vec::new();
        for c in struct_array.columns().iter() {
            expected_columns.push(c.slice(1, 1));
        }

        let expected_struct = match arrow_schema.fields[0].data_type() {
            DataType::Struct(subfields) => subfields
                .iter()
                .zip(expected_columns)
                .map(|(f, d)| (f.clone(), d))
                .collect::<Vec<_>>(),
            _ => panic!("unexpected field"),
        };

        let expected_struct_array = StructArray::from(expected_struct);
        let expected_batch = RecordBatch::from(&StructArray::from(vec![(
            Arc::new(arrow_schema.fields[0].as_ref().clone()),
            Arc::new(expected_struct_array) as ArrayRef,
        )]));

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let params = ReadBatchParams::Range(1..2);
        let slice_of_batch = reader.read_batch(0, params, reader.schema()).await.unwrap();
        assert_eq!(expected_batch, slice_of_batch);
    }
1019
    /// Schema fixture: a single struct column "s" with list<int32>,
    /// list<utf8> and large_list<int32> children.
    fn make_schema_of_list_array() -> Arc<arrow_schema::Schema> {
        Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "s",
            DataType::Struct(ArrowFields::from(vec![
                ArrowField::new(
                    "li",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    true,
                ),
                ArrowField::new(
                    "ls",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))),
                    true,
                ),
                ArrowField::new(
                    "ll",
                    DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    false,
                ),
            ])),
            true,
        )]))
    }
1043
    /// Data fixture matching `make_schema_of_list_array`: `rows` lists per
    /// child, each with `num_items` elements; row i holds values
    /// i*10 .. i*10+num_items.
    fn make_struct_of_list_array(rows: i32, num_items: i32) -> Arc<StructArray> {
        let mut li_builder = ListBuilder::new(Int32Builder::new());
        let mut ls_builder = ListBuilder::new(StringBuilder::new());
        let ll_value_builder = Int32Builder::new();
        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
        for i in 0..rows {
            for j in 0..num_items {
                li_builder.values().append_value(i * 10 + j);
                ls_builder
                    .values()
                    .append_value(format!("str-{}", i * 10 + j));
                large_list_builder.values().append_value(i * 10 + j);
            }
            li_builder.append(true);
            ls_builder.append(true);
            large_list_builder.append(true);
        }
        Arc::new(StructArray::from(vec![
            (
                Arc::new(ArrowField::new(
                    "li",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    true,
                )),
                Arc::new(li_builder.finish()) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new(
                    "ls",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))),
                    true,
                )),
                Arc::new(ls_builder.finish()) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new(
                    "ll",
                    DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    false,
                )),
                Arc::new(large_list_builder.finish()) as ArrayRef,
            ),
        ]))
    }
1088
    /// Read a 100-row `NullArray` column back with every `ReadBatchParams`
    /// variant, and check out-of-range indices / ranges produce errors.
    #[tokio::test]
    async fn test_read_nullable_arrays() {
        use arrow_array::Array;

        // A batch with an Int64 column and a 100-element Null column.
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("i", DataType::Int64, false),
            ArrowField::new("n", DataType::Null, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from_iter_values(0..100)),
            Arc::new(NullArray::new(100)),
        ];
        let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap();

        // Write to a memory store.
        let store = ObjectStore::memory();
        let path = Path::from("/takes");
        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        // Read the file back.
        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();

        // Helper: read the null column (field index 1) with the given params.
        async fn read_array_w_params(
            reader: &FileReader,
            field: &Field,
            params: ReadBatchParams,
        ) -> ArrayRef {
            read_array(reader, field, 0, reader.page_table.as_ref(), &params)
                .await
                .expect("Error reading back the null array from file") as _
        }

        let arr = read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFull).await;
        assert_eq!(100, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::Range(10..25)).await;
        assert_eq!(15, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFrom(60..)).await;
        assert_eq!(40, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeTo(..25)).await;
        assert_eq!(25, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr = read_array_w_params(
            &reader,
            &schema.fields[1],
            ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72])),
        )
        .await;
        assert_eq!(4, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        // Index 100 is out of range for a 100-row page: must error.
        let params = ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72, 100]));
        let arr = read_array(
            &reader,
            &schema.fields[1],
            0,
            reader.page_table.as_ref(),
            &params,
        );
        assert!(arr.await.is_err());

        // Range end 107 exceeds the 100-row page: must error.
        let params = ReadBatchParams::RangeTo(..107);
        let arr = read_array(
            &reader,
            &schema.fields[1],
            0,
            reader.page_table.as_ref(),
            &params,
        );
        assert!(arr.await.is_err());
    }
1184
1185 #[tokio::test]
1186 async fn test_take_lists() {
1187 let arrow_schema = ArrowSchema::new(vec![
1188 ArrowField::new(
1189 "l",
1190 DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1191 false,
1192 ),
1193 ArrowField::new(
1194 "ll",
1195 DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1196 false,
1197 ),
1198 ]);
1199
1200 let value_builder = Int32Builder::new();
1201 let mut list_builder = ListBuilder::new(value_builder);
1202 let ll_value_builder = Int32Builder::new();
1203 let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
1204 for i in 0..100 {
1205 list_builder.values().append_value(i);
1206 large_list_builder.values().append_value(i);
1207 if (i + 1) % 10 == 0 {
1208 list_builder.append(true);
1209 large_list_builder.append(true);
1210 }
1211 }
1212 let list_arr = Arc::new(list_builder.finish());
1213 let large_list_arr = Arc::new(large_list_builder.finish());
1214
1215 let batch = RecordBatch::try_new(
1216 Arc::new(arrow_schema.clone()),
1217 vec![list_arr as ArrayRef, large_list_arr as ArrayRef],
1218 )
1219 .unwrap();
1220
1221 let store = ObjectStore::memory();
1223 let path = Path::from("/take_list");
1224 let schema: Schema = (&arrow_schema).try_into().unwrap();
1225 let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
1226 &store,
1227 &path,
1228 schema.clone(),
1229 &Default::default(),
1230 )
1231 .await
1232 .unwrap();
1233 file_writer.write(&[batch]).await.unwrap();
1234 file_writer.finish().await.unwrap();
1235
1236 let reader = FileReader::try_new(&store, &path, schema.clone())
1238 .await
1239 .unwrap();
1240 let actual = reader.take(&[1, 3, 5, 9], &schema).await.unwrap();
1241
1242 let value_builder = Int32Builder::new();
1243 let mut list_builder = ListBuilder::new(value_builder);
1244 let ll_value_builder = Int32Builder::new();
1245 let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
1246 for i in [1, 3, 5, 9] {
1247 for j in 0..10 {
1248 list_builder.values().append_value(i * 10 + j);
1249 large_list_builder.values().append_value(i * 10 + j);
1250 }
1251 list_builder.append(true);
1252 large_list_builder.append(true);
1253 }
1254 let expected_list = list_builder.finish();
1255 let expected_large_list = large_list_builder.finish();
1256
1257 assert_eq!(actual.column_by_name("l").unwrap().as_ref(), &expected_list);
1258 assert_eq!(
1259 actual.column_by_name("ll").unwrap().as_ref(),
1260 &expected_large_list
1261 );
1262 }
1263
1264 #[tokio::test]
1265 async fn test_list_array_with_offsets() {
1266 let arrow_schema = ArrowSchema::new(vec![
1267 ArrowField::new(
1268 "l",
1269 DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1270 false,
1271 ),
1272 ArrowField::new(
1273 "ll",
1274 DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1275 false,
1276 ),
1277 ]);
1278
1279 let store = ObjectStore::memory();
1280 let path = Path::from("/lists");
1281
1282 let list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1283 Some(vec![Some(1), Some(2)]),
1284 Some(vec![Some(3), Some(4)]),
1285 Some((0..2_000).map(Some).collect::<Vec<_>>()),
1286 ])
1287 .slice(1, 1);
1288 let large_list_array = LargeListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1289 Some(vec![Some(10), Some(11)]),
1290 Some(vec![Some(12), Some(13)]),
1291 Some((0..2_000).map(Some).collect::<Vec<_>>()),
1292 ])
1293 .slice(1, 1);
1294
1295 let batch = RecordBatch::try_new(
1296 Arc::new(arrow_schema.clone()),
1297 vec![Arc::new(list_array), Arc::new(large_list_array)],
1298 )
1299 .unwrap();
1300
1301 let schema: Schema = (&arrow_schema).try_into().unwrap();
1302 let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
1303 &store,
1304 &path,
1305 schema.clone(),
1306 &Default::default(),
1307 )
1308 .await
1309 .unwrap();
1310 file_writer
1311 .write(std::slice::from_ref(&batch))
1312 .await
1313 .unwrap();
1314 file_writer.finish().await.unwrap();
1315
1316 let file_size_bytes = store.size(&path).await.unwrap();
1318 assert!(file_size_bytes < 1_000);
1319
1320 let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
1321 let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
1322 assert_eq!(batch, actual_batch);
1323 }
1324
1325 #[tokio::test]
1326 async fn test_read_ranges() {
1327 let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int64, false)]);
1329 let schema = Schema::try_from(&arrow_schema).unwrap();
1330 let columns: Vec<ArrayRef> = vec![Arc::new(Int64Array::from_iter_values(0..100))];
1331 let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap();
1332
1333 let store = ObjectStore::memory();
1335 let path = Path::from("/read_range");
1336 let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
1337 &store,
1338 &path,
1339 schema.clone(),
1340 &Default::default(),
1341 )
1342 .await
1343 .unwrap();
1344 file_writer.write(&[batch]).await.unwrap();
1345 file_writer.finish().await.unwrap();
1346
1347 let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
1348 let actual_batch = reader.read_range(7..25, reader.schema()).await.unwrap();
1349
1350 assert_eq!(
1351 actual_batch.column_by_name("i").unwrap().as_ref(),
1352 &Int64Array::from_iter_values(7..25)
1353 );
1354 }
1355
1356 #[tokio::test]
1357 async fn test_batches_stream() {
1358 let store = ObjectStore::memory();
1359 let path = Path::from("/batch_stream");
1360
1361 let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int32, true)]);
1362 let schema = Schema::try_from(&arrow_schema).unwrap();
1363 let mut writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
1364 &store,
1365 &path,
1366 schema.clone(),
1367 &Default::default(),
1368 )
1369 .await
1370 .unwrap();
1371 for i in 0..10 {
1372 let batch = RecordBatch::try_new(
1373 Arc::new(arrow_schema.clone()),
1374 vec![Arc::new(Int32Array::from_iter_values(i * 10..(i + 1) * 10))],
1375 )
1376 .unwrap();
1377 writer.write(&[batch]).await.unwrap();
1378 }
1379 writer.finish().await.unwrap();
1380
1381 let reader = FileReader::try_new(&store, &path, schema.clone())
1382 .await
1383 .unwrap();
1384 let stream = batches_stream(reader, schema, |id| id % 2 == 0);
1385 let batches = stream.try_collect::<Vec<_>>().await.unwrap();
1386
1387 assert_eq!(batches.len(), 5);
1388 for (i, batch) in batches.iter().enumerate() {
1389 assert_eq!(
1390 batch,
1391 &RecordBatch::try_new(
1392 Arc::new(arrow_schema.clone()),
1393 vec![Arc::new(Int32Array::from_iter_values(
1394 i as i32 * 2 * 10..(i as i32 * 2 + 1) * 10
1395 ))],
1396 )
1397 .unwrap()
1398 )
1399 }
1400 }
1401
1402 #[tokio::test]
1403 async fn test_take_boolean_beyond_chunk() {
1404 let store = ObjectStore::from_uri_and_params(
1405 Arc::new(Default::default()),
1406 "memory://",
1407 &ObjectStoreParams {
1408 block_size: Some(256),
1409 ..Default::default()
1410 },
1411 )
1412 .await
1413 .unwrap()
1414 .0;
1415 let path = Path::from("/take_bools");
1416
1417 let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
1418 "b",
1419 DataType::Boolean,
1420 false,
1421 )]));
1422 let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
1423 let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
1424 &store,
1425 &path,
1426 schema.clone(),
1427 &Default::default(),
1428 )
1429 .await
1430 .unwrap();
1431
1432 let array = BooleanArray::from((0..5000).map(|v| v % 5 == 0).collect::<Vec<_>>());
1433 let batch =
1434 RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(array.clone())]).unwrap();
1435 file_writer.write(&[batch]).await.unwrap();
1436 file_writer.finish().await.unwrap();
1437
1438 let reader = FileReader::try_new(&store, &path, schema.clone())
1439 .await
1440 .unwrap();
1441 let actual = reader.take(&[2, 4, 5, 8, 4555], &schema).await.unwrap();
1442
1443 assert_eq!(
1444 actual.column_by_name("b").unwrap().as_ref(),
1445 &BooleanArray::from(vec![false, false, true, false, true])
1446 );
1447 }
1448
1449 #[tokio::test]
1450 async fn test_read_projection() {
1451 let store = ObjectStore::memory();
1455 let path = Path::from("/partial_read");
1456
1457 let mut fields = vec![];
1459 for i in 0..100 {
1460 fields.push(ArrowField::new(format!("f{}", i), DataType::Int32, false));
1461 }
1462 let arrow_schema = ArrowSchema::new(fields);
1463 let schema = Schema::try_from(&arrow_schema).unwrap();
1464
1465 let partial_schema = schema.project(&["f50"]).unwrap();
1466 let partial_arrow: ArrowSchema = (&partial_schema).into();
1467
1468 let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
1469 &store,
1470 &path,
1471 partial_schema.clone(),
1472 &Default::default(),
1473 )
1474 .await
1475 .unwrap();
1476
1477 let array = Int32Array::from(vec![0; 15]);
1478 let batch =
1479 RecordBatch::try_new(Arc::new(partial_arrow), vec![Arc::new(array.clone())]).unwrap();
1480 file_writer
1481 .write(std::slice::from_ref(&batch))
1482 .await
1483 .unwrap();
1484 file_writer.finish().await.unwrap();
1485
1486 let field_id = partial_schema.fields.first().unwrap().id;
1487 let reader = FileReader::try_new_with_fragment_id(
1488 &store,
1489 &path,
1490 schema.clone(),
1491 0,
1492 field_id,
1493 field_id,
1494 None,
1495 )
1496 .await
1497 .unwrap();
1498 let actual = reader
1499 .read_batch(0, ReadBatchParams::RangeFull, &partial_schema)
1500 .await
1501 .unwrap();
1502
1503 assert_eq!(actual, batch);
1504 }
1505}