use std::ops::{Range, RangeTo};
use std::sync::Arc;

use arrow_arith::numeric::sub;
use arrow_array::{
    ArrayRef, ArrowNativeTypeOp, ArrowNumericType, NullArray, OffsetSizeTrait, PrimitiveArray,
    RecordBatch, StructArray, UInt32Array,
    builder::PrimitiveBuilder,
    cast::AsArray,
    types::{Int32Type, Int64Type},
};
use arrow_buffer::ArrowNativeType;
use arrow_schema::{DataType, FieldRef, Schema as ArrowSchema};
use arrow_select::concat::{self, concat_batches};
use async_recursion::async_recursion;
use deepsize::DeepSizeOf;
use futures::{Future, FutureExt, StreamExt, TryStreamExt, stream};
use lance_arrow::*;
use lance_core::cache::{CacheKey, LanceCache};
use lance_core::datatypes::{Field, Schema};
use lance_core::{Error, Result};
use lance_io::encodings::AsyncIndex;
use lance_io::encodings::dictionary::DictionaryDecoder;
use lance_io::stream::{RecordBatchStream, RecordBatchStreamAdapter};
use lance_io::traits::Reader;
use lance_io::utils::{
    read_fixed_stride_array, read_metadata_offset, read_struct, read_struct_from_buf,
};
use lance_io::{ReadBatchParams, object_store::ObjectStore};
use std::borrow::Cow;

use object_store::path::Path;
use tracing::instrument;

use crate::previous::format::metadata::Metadata;
use crate::previous::page_table::{PageInfo, PageTable};

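/// Reader for the legacy (v1) Lance file format.
///
/// Wraps an object-store [`Reader`] together with the parsed file
/// [`Metadata`] and [`PageTable`], and serves batch, range, and take reads.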
#[derive(Clone, DeepSizeOf)]
pub struct FileReader {
    pub object_reader: Arc<dyn Reader>,
    metadata: Arc<Metadata>,
    page_table: Arc<PageTable>,
    schema: Schema,

    /// The id of the fragment this file belongs to.
    fragment_id: u64,

    /// Page table for the page-level statistics, if the file has any.
    stats_page_table: Arc<Option<PageTable>>,
}

impl std::fmt::Debug for FileReader {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("FileReader")
            .field("fragment", &self.fragment_id)
            .field("path", &self.object_reader.path())
            .finish()
    }
}

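/// A typed cache key backed by a plain string, used to store values of type
/// `T` in the [`LanceCache`].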
struct StringCacheKey<'a, T> {
    key: &'a str,
    _phantom: std::marker::PhantomData<T>,
}

impl<'a, T> StringCacheKey<'a, T> {
    fn new(key: &'a str) -> Self {
        Self {
            key,
            _phantom: std::marker::PhantomData,
        }
    }
}

impl<T> CacheKey for StringCacheKey<'_, T> {
    type ValueType = T;

    fn key(&self) -> Cow<'_, str> {
        self.key.into()
    }
}

impl FileReader {
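    /// Open a reader for one data file of a fragment.
    ///
    /// `field_id_offset` is the id of the first field stored in this file and
    /// `max_field_id` bounds the field ids to load from the page table (files
    /// in a multi-file fragment each hold a range of field ids).
    ///
    /// A hedged usage sketch; the argument values mirror the
    /// `test_read_projection` test below:
    ///
    /// ```ignore
    /// let reader = FileReader::try_new_with_fragment_id(
    ///     &store, &path, schema.clone(), 0, field_id, field_id, None,
    /// )
    /// .await?;
    /// ```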
    #[instrument(level = "debug", skip(object_store, schema, session))]
    pub async fn try_new_with_fragment_id(
        object_store: &ObjectStore,
        path: &Path,
        schema: Schema,
        fragment_id: u32,
        field_id_offset: i32,
        max_field_id: i32,
        session: Option<&LanceCache>,
    ) -> Result<Self> {
        let object_reader = object_store.open(path).await?;

        let metadata = Self::read_metadata(object_reader.as_ref(), session).await?;

        Self::try_new_from_reader(
            path,
            object_reader.into(),
            Some(metadata),
            schema,
            fragment_id,
            field_id_offset,
            max_field_id,
            session,
        )
        .await
    }

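    /// Like [`Self::try_new_with_fragment_id`], but reuses an already opened
    /// `object_reader` and, when available, previously parsed metadata.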
    #[allow(clippy::too_many_arguments)]
    pub async fn try_new_from_reader(
        path: &Path,
        object_reader: Arc<dyn Reader>,
        metadata: Option<Arc<Metadata>>,
        schema: Schema,
        fragment_id: u32,
        field_id_offset: i32,
        max_field_id: i32,
        session: Option<&LanceCache>,
    ) -> Result<Self> {
        let metadata = match metadata {
            Some(metadata) => metadata,
            None => Self::read_metadata(object_reader.as_ref(), session).await?,
        };

        let page_table = async {
            Self::load_from_cache(session, path.to_string(), |_| async {
                PageTable::load(
                    object_reader.as_ref(),
                    metadata.page_table_position,
                    field_id_offset,
                    max_field_id,
                    metadata.num_batches() as i32,
                )
                .await
            })
            .await
        };

        let stats_page_table = Self::read_stats_page_table(object_reader.as_ref(), session);

        let (page_table, stats_page_table) = futures::try_join!(page_table, stats_page_table)?;

        Ok(Self {
            object_reader,
            metadata,
            schema,
            page_table,
            fragment_id: fragment_id as u64,
            stats_page_table,
        })
    }

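    /// Read the file metadata, going through the session cache when one is
    /// provided. The footer is located by reading one block from the tail of
    /// the file.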
    pub async fn read_metadata(
        object_reader: &dyn Reader,
        cache: Option<&LanceCache>,
    ) -> Result<Arc<Metadata>> {
        Self::load_from_cache(cache, object_reader.path().to_string(), |_| async {
            let file_size = object_reader.size().await?;
            let begin = if file_size < object_reader.block_size() {
                0
            } else {
                file_size - object_reader.block_size()
            };
            let tail_bytes = object_reader.get_range(begin..file_size).await?;
            let metadata_pos = read_metadata_offset(&tail_bytes)?;

            let metadata: Metadata = if metadata_pos < file_size - tail_bytes.len() {
                // The metadata starts before the tail bytes we already have,
                // so fetch it with a separate request.
                read_struct(object_reader, metadata_pos).await?
            } else {
                // The metadata is fully contained in the tail bytes.
                let offset = tail_bytes.len() - (file_size - metadata_pos);
                read_struct_from_buf(&tail_bytes.slice(offset..))?
            };
            Ok(metadata)
        })
        .await
    }

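    /// Load the page table for page-level statistics, if the file has any.
    /// The result is cached under the file path's "stats" child key.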
    async fn read_stats_page_table(
        reader: &dyn Reader,
        cache: Option<&LanceCache>,
    ) -> Result<Arc<Option<PageTable>>> {
        Self::load_from_cache(cache, reader.path().child("stats").to_string(), |_| async {
            let metadata = Self::read_metadata(reader, cache).await?;

            if let Some(stats_meta) = metadata.stats_metadata.as_ref() {
                Ok(Some(
                    PageTable::load(
                        reader,
                        stats_meta.page_table_position,
                        0,
                        *stats_meta.leaf_field_ids.iter().max().unwrap(),
                        1,
                    )
                    .await?,
                ))
            } else {
                Ok(None)
            }
        })
        .await
    }

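    /// Fetch a value through the cache when one is provided; otherwise invoke
    /// `loader` directly and wrap the result in an `Arc`.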
    async fn load_from_cache<T: DeepSizeOf + Send + Sync + 'static, F, Fut>(
        cache: Option<&LanceCache>,
        key: String,
        loader: F,
    ) -> Result<Arc<T>>
    where
        F: Fn(&str) -> Fut,
        Fut: Future<Output = Result<T>> + Send,
    {
        if let Some(cache) = cache {
            let cache_key = StringCacheKey::<T>::new(key.as_str());
            cache
                .get_or_insert_with_key(cache_key, || loader(key.as_str()))
                .await
        } else {
            Ok(Arc::new(loader(key.as_str()).await?))
        }
    }

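    /// Open a single-fragment file at `path` with the given schema.
    ///
    /// A hedged usage sketch, mirroring the tests in this module:
    ///
    /// ```ignore
    /// let store = ObjectStore::memory();
    /// let reader = FileReader::try_new(&store, &path, schema).await?;
    /// ```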
    pub async fn try_new(object_store: &ObjectStore, path: &Path, schema: Schema) -> Result<Self> {
        let max_field_id = schema.max_field_id().unwrap_or_default();
        Self::try_new_with_fragment_id(object_store, path, schema, 0, 0, max_field_id, None).await
    }

    fn io_parallelism(&self) -> usize {
        self.object_reader.io_parallelism()
    }

    /// Schema of the returned record batches.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn num_batches(&self) -> usize {
        self.metadata.num_batches()
    }

    /// Number of rows in the batch identified by `batch_id`.
    pub fn num_rows_in_batch(&self, batch_id: i32) -> usize {
        self.metadata.get_batch_length(batch_id).unwrap_or_default() as usize
    }

    /// Total number of rows in this file.
    pub fn len(&self) -> usize {
        self.metadata.len()
    }

    pub fn is_empty(&self) -> bool {
        self.metadata.is_empty()
    }

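    /// Read a [`RecordBatch`] by batch id, selecting rows with `params` and
    /// columns with `projection`.
    ///
    /// A hedged usage sketch (a full-range read of the first batch, as in the
    /// tests below):
    ///
    /// ```ignore
    /// let batch = reader.read_batch(0, .., reader.schema()).await?;
    /// ```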
    #[instrument(level = "debug", skip(self, params, projection))]
    pub async fn read_batch(
        &self,
        batch_id: i32,
        params: impl Into<ReadBatchParams>,
        projection: &Schema,
    ) -> Result<RecordBatch> {
        read_batch(self, &params.into(), projection, batch_id).await
    }

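    /// Read a contiguous row range that may span several batches; the pieces
    /// are concatenated into a single [`RecordBatch`].
    ///
    /// A hedged sketch mirroring `test_read_ranges` below:
    ///
    /// ```ignore
    /// let rows = reader.read_range(7..25, reader.schema()).await?;
    /// ```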
    #[instrument(level = "debug", skip(self, projection))]
    pub async fn read_range(
        &self,
        range: Range<usize>,
        projection: &Schema,
    ) -> Result<RecordBatch> {
        if range.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(projection.into())));
        }
        let range_in_batches = self.metadata.range_to_batches(range)?;
        let batches = stream::iter(range_in_batches)
            .map(|(batch_id, range)| async move {
                self.read_batch(batch_id, range, projection).await
            })
            .buffered(self.io_parallelism())
            .try_collect::<Vec<_>>()
            .await?;
        if batches.len() == 1 {
            return Ok(batches[0].clone());
        }
        let schema = batches[0].schema();
        Ok(tokio::task::spawn_blocking(move || concat_batches(&schema, &batches)).await??)
    }

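    /// Take rows by absolute row indices across the whole file.
    ///
    /// A hedged sketch mirroring `test_take` below:
    ///
    /// ```ignore
    /// let batch = reader.take(&[1, 15, 20, 25], reader.schema()).await?;
    /// ```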
    #[instrument(level = "debug", skip_all)]
    pub async fn take(&self, indices: &[u32], projection: &Schema) -> Result<RecordBatch> {
        let num_batches = self.num_batches();
        let num_rows = self.len() as u32;
        let indices_in_batches = self.metadata.group_indices_to_batches(indices);
        let batches = stream::iter(indices_in_batches)
            .map(|batch| async move {
                if batch.batch_id >= num_batches as i32 {
                    Err(Error::invalid_input_source(
                        format!("batch_id: {} out of bounds", batch.batch_id).into(),
                    ))
                } else if *batch.offsets.last().expect("got empty batch") > num_rows {
                    Err(Error::invalid_input_source(
                        format!("indices: {:?} out of bounds", batch.offsets).into(),
                    ))
                } else {
                    self.read_batch(batch.batch_id, batch.offsets.as_slice(), projection)
                        .await
                }
            })
            .buffered(self.io_parallelism())
            .try_collect::<Vec<_>>()
            .await?;

        let schema = Arc::new(ArrowSchema::from(projection));

        Ok(tokio::task::spawn_blocking(move || concat_batches(&schema, &batches)).await??)
    }

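    /// Project the statistics schema onto the stats fields whose names parse
    /// to one of the requested `field_ids` (children included).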
    pub fn page_stats_schema(&self, field_ids: &[i32]) -> Option<Schema> {
        self.metadata.stats_metadata.as_ref().map(|meta| {
            let mut stats_field_ids = vec![];
            for stats_field in &meta.schema.fields {
                if let Ok(stats_field_id) = stats_field.name.parse::<i32>()
                    && field_ids.contains(&stats_field_id)
                {
                    stats_field_ids.push(stats_field.id);
                    for child in &stats_field.children {
                        stats_field_ids.push(child.id);
                    }
                }
            }
            meta.schema.project_by_ids(&stats_field_ids, true)
        })
    }

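    /// Read page-level statistics for the given field ids; returns `None`
    /// when the file has no statistics or none of the fields have any.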
    pub async fn read_page_stats(&self, field_ids: &[i32]) -> Result<Option<RecordBatch>> {
        if let Some(stats_page_table) = self.stats_page_table.as_ref() {
            let projection = self.page_stats_schema(field_ids).unwrap();
            if projection.fields.is_empty() {
                return Ok(None);
            }
            let arrays = futures::stream::iter(projection.fields.iter().cloned())
                .map(|field| async move {
                    read_array(
                        self,
                        &field,
                        0,
                        stats_page_table,
                        &ReadBatchParams::RangeFull,
                    )
                    .await
                })
                .buffered(self.io_parallelism())
                .try_collect::<Vec<_>>()
                .await?;

            let schema = ArrowSchema::from(&projection);
            let batch = RecordBatch::try_new(Arc::new(schema), arrays)?;
            Ok(Some(batch))
        } else {
            Ok(None)
        }
    }
}

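/// Stream batches from `reader`, keeping only the batch ids for which
/// `predicate` returns true.
///
/// A hedged sketch mirroring `test_batches_stream` below (read every even
/// batch):
///
/// ```ignore
/// let stream = batches_stream(reader, schema, |id| id % 2 == 0);
/// let batches = stream.try_collect::<Vec<_>>().await?;
/// ```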
pub fn batches_stream(
    reader: FileReader,
    projection: Schema,
    predicate: impl FnMut(&i32) -> bool + Send + Sync + 'static,
) -> impl RecordBatchStream {
    let projection = Arc::new(projection);
    let arrow_schema = ArrowSchema::from(projection.as_ref());

    let total_batches = reader.num_batches() as i32;
    let batches = (0..total_batches).filter(predicate);
    let this = Arc::new(reader);
    let inner = stream::iter(batches)
        .zip(stream::repeat_with(move || {
            (this.clone(), projection.clone())
        }))
        .map(move |(batch_id, (reader, projection))| async move {
            reader
                .read_batch(batch_id, ReadBatchParams::RangeFull, &projection)
                .await
        })
        .buffered(2)
        .boxed();
    RecordBatchStreamAdapter::new(Arc::new(arrow_schema), inner)
}

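/// Read one batch: each projected field is read as a column, with the reads
/// issued at the reader's I/O parallelism.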
pub async fn read_batch(
    reader: &FileReader,
    params: &ReadBatchParams,
    schema: &Schema,
    batch_id: i32,
) -> Result<RecordBatch> {
    if !schema.fields.is_empty() {
        let arrs = stream::iter(&schema.fields)
            .map(|f| async { read_array(reader, f, batch_id, &reader.page_table, params).await })
            .buffered(reader.io_parallelism())
            .try_collect::<Vec<_>>()
            .boxed();
        let arrs = arrs.await?;
        Ok(RecordBatch::try_new(Arc::new(schema.into()), arrs)?)
    } else {
        Err(Error::invalid_input("no fields requested"))
    }
}

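/// Read a single column for one batch, dispatching on the field's data type.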
#[async_recursion]
async fn read_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let data_type = field.data_type();

    use DataType::*;

    if data_type.is_fixed_stride() {
        _read_fixed_stride_array(reader, field, batch_id, page_table, params).await
    } else {
        match data_type {
            Null => read_null_array(field, batch_id, page_table, params),
            Utf8 | LargeUtf8 | Binary | LargeBinary => {
                read_binary_array(reader, field, batch_id, page_table, params).await
            }
            Struct(_) => read_struct_array(reader, field, batch_id, page_table, params).await,
            Dictionary(_, _) => {
                read_dictionary_array(reader, field, batch_id, page_table, params).await
            }
            List(_) => {
                read_list_array::<Int32Type>(reader, field, batch_id, page_table, params).await
            }
            LargeList(_) => {
                read_list_array::<Int64Type>(reader, field, batch_id, page_table, params).await
            }
            _ => {
                unimplemented!("No support for {data_type} yet");
            }
        }
    }
}

fn get_page_info<'a>(
    page_table: &'a PageTable,
    field: &'a Field,
    batch_id: i32,
) -> Result<&'a PageInfo> {
    page_table.get(field.id, batch_id).ok_or_else(|| {
        Error::invalid_input(format!(
            "No page info found for field: {}, field_id={} batch={}",
            field.name, field.id, batch_id
        ))
    })
}

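/// Read a fixed-stride (primitive-like) column directly from its page.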
async fn _read_fixed_stride_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;
    read_fixed_stride_array(
        reader.object_reader.as_ref(),
        &field.data_type(),
        page_info.position,
        page_info.length,
        params.clone(),
    )
    .await
}

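/// "Read" a null column: no I/O is needed, only bounds checks and the output
/// length computed from `params`.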
fn read_null_array(
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;

    let length_output = match params {
        ReadBatchParams::Indices(indices) => {
            if indices.is_empty() {
                0
            } else {
                let idx_max = *indices.values().iter().max().unwrap() as u64;
                if idx_max >= page_info.length as u64 {
                    return Err(Error::invalid_input(format!(
                        "NullArray Reader: request([{}]) out of range: [0..{}]",
                        idx_max, page_info.length
                    )));
                }
                indices.len()
            }
        }
        _ => {
            let (idx_start, idx_end) = match params {
                ReadBatchParams::Range(r) => (r.start, r.end),
                ReadBatchParams::RangeFull => (0, page_info.length),
                ReadBatchParams::RangeTo(r) => (0, r.end),
                ReadBatchParams::RangeFrom(r) => (r.start, page_info.length),
                _ => unreachable!(),
            };
            if idx_end > page_info.length {
                return Err(Error::invalid_input(format!(
                    "NullArray Reader: request([{}..{}]) out of range: [0..{}]",
                    idx_start, idx_end, page_info.length
                )));
            }
            idx_end - idx_start
        }
    };

    Ok(Arc::new(NullArray::new(length_output)))
}

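/// Read a variable-length binary/string column via the shared helper in
/// `lance_io::utils`.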
async fn read_binary_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;

    lance_io::utils::read_binary_array(
        reader.object_reader.as_ref(),
        &field.data_type(),
        field.nullable,
        page_info.position,
        page_info.length,
        params,
    )
    .await
}

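/// Read a dictionary-encoded column; the dictionary values must already be
/// loaded on the field (see `schema.set_dictionary(..)` in the tests below).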
async fn read_dictionary_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;
    let data_type = field.data_type();
    let decoder = DictionaryDecoder::new(
        reader.object_reader.as_ref(),
        page_info.position,
        page_info.length,
        &data_type,
        field
            .dictionary
            .as_ref()
            .unwrap()
            .values
            .as_ref()
            .unwrap()
            .clone(),
    );
    decoder.get(params.clone()).await
}

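/// Read a struct column by recursively reading each child field.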
async fn read_struct_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let mut sub_arrays: Vec<(FieldRef, ArrayRef)> = vec![];

    for child in field.children.as_slice() {
        let arr = read_array(reader, child, batch_id, page_table, params).await?;
        sub_arrays.push((Arc::new(child.into()), arr));
    }

    Ok(Arc::new(StructArray::from(sub_arrays)))
}

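/// Take specific list rows by index: one child read per requested list, then
/// the pieces are concatenated and fresh zero-based offsets are rebuilt.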
async fn take_list_array<T: ArrowNumericType>(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    positions: &PrimitiveArray<T>,
    indices: &UInt32Array,
) -> Result<ArrayRef>
where
    T::Native: ArrowNativeTypeOp + OffsetSizeTrait,
{
    let first_idx = indices.value(0);
    let ranges = indices
        .values()
        .iter()
        .map(|i| (*i - first_idx).as_usize())
        .map(|idx| positions.value(idx).as_usize()..positions.value(idx + 1).as_usize())
        .collect::<Vec<_>>();
    let field = field.clone();
    let mut list_values: Vec<ArrayRef> = vec![];
    for range in ranges.iter() {
        list_values.push(
            read_array(
                reader,
                &field.children[0],
                batch_id,
                page_table,
                &(range.clone()).into(),
            )
            .await?,
        );
    }

    let value_refs = list_values
        .iter()
        .map(|arr| arr.as_ref())
        .collect::<Vec<_>>();
    let mut offsets_builder = PrimitiveBuilder::<T>::new();
    offsets_builder.append_value(T::Native::usize_as(0));
    let mut off = 0_usize;
    for range in ranges {
        off += range.len();
        offsets_builder.append_value(T::Native::usize_as(off));
    }
    let all_values = concat::concat(value_refs.as_slice())?;
    let offset_arr = offsets_builder.finish();
    let arr = try_new_generic_list_array(all_values, &offset_arr)?;
    Ok(Arc::new(arr) as ArrayRef)
}

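/// Read a `List`/`LargeList` column: fetch `n + 1` offsets, read the child
/// values they span, rebase the offsets to zero, and assemble the list array.
/// Index-based takes are routed to [`take_list_array`].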
async fn read_list_array<T: ArrowNumericType>(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef>
where
    T::Native: ArrowNativeTypeOp + OffsetSizeTrait,
{
    // Reading `n` lists takes `n + 1` positions, so widen the request by one.
    let positions_params = match params {
        ReadBatchParams::Range(range) => ReadBatchParams::from(range.start..(range.end + 1)),
        ReadBatchParams::RangeTo(range) => ReadBatchParams::from(..range.end + 1),
        ReadBatchParams::Indices(indices) => {
            (indices.value(0).as_usize()..indices.value(indices.len() - 1).as_usize() + 2).into()
        }
        p => p.clone(),
    };

    let page_info = get_page_info(&reader.page_table, field, batch_id)?;
    let position_arr = read_fixed_stride_array(
        reader.object_reader.as_ref(),
        &T::DATA_TYPE,
        page_info.position,
        page_info.length,
        positions_params,
    )
    .await?;

    let positions: &PrimitiveArray<T> = position_arr.as_primitive();

    // Compute the range of child values covered by the requested lists.
    let value_params = match params {
        ReadBatchParams::Range(range) => ReadBatchParams::from(
            positions.value(0).as_usize()..positions.value(range.end - range.start).as_usize(),
        ),
        ReadBatchParams::Ranges(_) => {
            return Err(Error::internal(
                "ReadBatchParams::Ranges should not be used in v1 files".to_string(),
            ));
        }
        ReadBatchParams::RangeTo(RangeTo { end }) => {
            ReadBatchParams::from(..positions.value(*end).as_usize())
        }
        ReadBatchParams::RangeFrom(_) => ReadBatchParams::from(positions.value(0).as_usize()..),
        ReadBatchParams::RangeFull => ReadBatchParams::from(
            positions.value(0).as_usize()..positions.value(positions.len() - 1).as_usize(),
        ),
        ReadBatchParams::Indices(indices) => {
            return take_list_array(reader, field, batch_id, page_table, positions, indices).await;
        }
    };

    // Rebase the offsets to start at zero before building the list array.
    let start_position = PrimitiveArray::<T>::new_scalar(positions.value(0));
    let offset_arr = sub(positions, &start_position)?;
    let offset_arr_ref = offset_arr.as_primitive::<T>();
    let value_arrs = read_array(
        reader,
        &field.children[0],
        batch_id,
        page_table,
        &value_params,
    )
    .await?;
    let arr = try_new_generic_list_array(value_arrs, offset_arr_ref)?;
    Ok(Arc::new(arr) as ArrayRef)
}

#[cfg(test)]
mod tests {
    use crate::previous::writer::{FileWriter as PreviousFileWriter, NotSelfDescribing};

    use super::*;

    use arrow_array::{
        Array, DictionaryArray, Float32Array, Int64Array, LargeListArray, ListArray, StringArray,
        UInt8Array,
        builder::{Int32Builder, LargeListBuilder, ListBuilder, StringBuilder},
        cast::{as_string_array, as_struct_array},
        types::UInt8Type,
    };
    use arrow_array::{BooleanArray, Int32Array};
    use arrow_schema::{Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema};
    use lance_io::object_store::ObjectStoreParams;

    #[tokio::test]
    async fn test_take() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("i", DataType::Int64, true),
            ArrowField::new("f", DataType::Float32, false),
            ArrowField::new("s", DataType::Utf8, false),
            ArrowField::new(
                "d",
                DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
                false,
            ),
        ]);
        let mut schema = Schema::try_from(&arrow_schema).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/take_test");

        let values = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);
        let values_ref = Arc::new(values);
        let mut batches = vec![];
        for batch_id in 0..10 {
            let value_range: Range<i64> = batch_id * 10..batch_id * 10 + 10;
            let keys = UInt8Array::from_iter_values(value_range.clone().map(|v| (v % 7) as u8));
            let columns: Vec<ArrayRef> = vec![
                Arc::new(Int64Array::from_iter(
                    value_range.clone().collect::<Vec<_>>(),
                )),
                Arc::new(Float32Array::from_iter(
                    value_range.clone().map(|n| n as f32).collect::<Vec<_>>(),
                )),
                Arc::new(StringArray::from_iter_values(
                    value_range.clone().map(|n| format!("str-{}", n)),
                )),
                Arc::new(DictionaryArray::<UInt8Type>::try_new(keys, values_ref.clone()).unwrap()),
            ];
            batches.push(RecordBatch::try_new(Arc::new(arrow_schema.clone()), columns).unwrap());
        }
        schema.set_dictionary(&batches[0]).unwrap();

        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        for batch in batches.iter() {
            file_writer
                .write(std::slice::from_ref(batch))
                .await
                .unwrap();
        }
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let batch = reader
            .take(&[1, 15, 20, 25, 30, 48, 90], reader.schema())
            .await
            .unwrap();
        let dict_keys = UInt8Array::from_iter_values([1, 1, 6, 4, 2, 6, 6]);
        assert_eq!(
            batch,
            RecordBatch::try_new(
                batch.schema(),
                vec![
                    Arc::new(Int64Array::from_iter_values([1, 15, 20, 25, 30, 48, 90])),
                    Arc::new(Float32Array::from_iter_values([
                        1.0, 15.0, 20.0, 25.0, 30.0, 48.0, 90.0
                    ])),
                    Arc::new(StringArray::from_iter_values([
                        "str-1", "str-15", "str-20", "str-25", "str-30", "str-48", "str-90"
                    ])),
                    Arc::new(DictionaryArray::try_new(dict_keys, values_ref.clone()).unwrap()),
                ]
            )
            .unwrap()
        );
    }

    async fn test_write_null_string_in_struct(field_nullable: bool) {
        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "parent",
            DataType::Struct(ArrowFields::from(vec![ArrowField::new(
                "str",
                DataType::Utf8,
                field_nullable,
            )])),
            true,
        )]));

        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let string_arr = Arc::new(StringArray::from_iter([Some("a"), Some(""), Some("b")]));
        let struct_arr = Arc::new(StructArray::from(vec![(
            Arc::new(ArrowField::new("str", DataType::Utf8, field_nullable)),
            string_arr.clone() as ArrayRef,
        )]));
        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_arr]).unwrap();

        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer
            .write(std::slice::from_ref(&batch))
            .await
            .unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();

        if field_nullable {
            assert_eq!(
                &StringArray::from_iter(vec![Some("a"), None, Some("b")]),
                as_string_array(
                    as_struct_array(actual_batch.column_by_name("parent").unwrap().as_ref())
                        .column_by_name("str")
                        .unwrap()
                        .as_ref()
                )
            );
        } else {
            assert_eq!(actual_batch, batch);
        }
    }

    #[tokio::test]
    async fn read_nullable_string_in_struct() {
        test_write_null_string_in_struct(true).await;
        test_write_null_string_in_struct(false).await;
    }

    #[tokio::test]
    async fn test_read_struct_of_list_arrays() {
        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let arrow_schema = make_schema_of_list_array();
        let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();

        let batches = (0..3)
            .map(|_| {
                let struct_array = make_struct_of_list_array(10, 10);
                RecordBatch::try_new(arrow_schema.clone(), vec![struct_array]).unwrap()
            })
            .collect::<Vec<_>>();
        let batches_ref = batches.iter().collect::<Vec<_>>();

        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&batches).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
        let expected = concat_batches(&arrow_schema, batches_ref).unwrap();
        assert_eq!(expected, actual_batch);
    }

    #[tokio::test]
    async fn test_scan_struct_of_list_arrays() {
        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let arrow_schema = make_schema_of_list_array();
        let struct_array = make_struct_of_list_array(3, 10);
        let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_array.clone()]).unwrap();

        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let mut expected_columns: Vec<ArrayRef> = Vec::new();
        for c in struct_array.columns().iter() {
            expected_columns.push(c.slice(1, 1));
        }

        let expected_struct = match arrow_schema.fields[0].data_type() {
            DataType::Struct(subfields) => subfields
                .iter()
                .zip(expected_columns)
                .map(|(f, d)| (f.clone(), d))
                .collect::<Vec<_>>(),
            _ => panic!("unexpected field"),
        };

        let expected_struct_array = StructArray::from(expected_struct);
        let expected_batch = RecordBatch::from(&StructArray::from(vec![(
            Arc::new(arrow_schema.fields[0].as_ref().clone()),
            Arc::new(expected_struct_array) as ArrayRef,
        )]));

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let params = ReadBatchParams::Range(1..2);
        let slice_of_batch = reader.read_batch(0, params, reader.schema()).await.unwrap();
        assert_eq!(expected_batch, slice_of_batch);
    }

    fn make_schema_of_list_array() -> Arc<arrow_schema::Schema> {
        Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "s",
            DataType::Struct(ArrowFields::from(vec![
                ArrowField::new(
                    "li",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    true,
                ),
                ArrowField::new(
                    "ls",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))),
                    true,
                ),
                ArrowField::new(
                    "ll",
                    DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    false,
                ),
            ])),
            true,
        )]))
    }

    fn make_struct_of_list_array(rows: i32, num_items: i32) -> Arc<StructArray> {
        let mut li_builder = ListBuilder::new(Int32Builder::new());
        let mut ls_builder = ListBuilder::new(StringBuilder::new());
        let ll_value_builder = Int32Builder::new();
        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
        for i in 0..rows {
            for j in 0..num_items {
                li_builder.values().append_value(i * 10 + j);
                ls_builder
                    .values()
                    .append_value(format!("str-{}", i * 10 + j));
                large_list_builder.values().append_value(i * 10 + j);
            }
            li_builder.append(true);
            ls_builder.append(true);
            large_list_builder.append(true);
        }
        Arc::new(StructArray::from(vec![
            (
                Arc::new(ArrowField::new(
                    "li",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    true,
                )),
                Arc::new(li_builder.finish()) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new(
                    "ls",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))),
                    true,
                )),
                Arc::new(ls_builder.finish()) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new(
                    "ll",
                    DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    false,
                )),
                Arc::new(large_list_builder.finish()) as ArrayRef,
            ),
        ]))
    }

    #[tokio::test]
    async fn test_read_nullable_arrays() {
        use arrow_array::Array;

        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("i", DataType::Int64, false),
            ArrowField::new("n", DataType::Null, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from_iter_values(0..100)),
            Arc::new(NullArray::new(100)),
        ];
        let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/takes");
        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();

        async fn read_array_w_params(
            reader: &FileReader,
            field: &Field,
            params: ReadBatchParams,
        ) -> ArrayRef {
            read_array(reader, field, 0, reader.page_table.as_ref(), &params)
                .await
                .expect("Error reading back the null array from file")
        }

        let arr = read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFull).await;
        assert_eq!(100, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::Range(10..25)).await;
        assert_eq!(15, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFrom(60..)).await;
        assert_eq!(40, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeTo(..25)).await;
        assert_eq!(25, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr = read_array_w_params(
            &reader,
            &schema.fields[1],
            ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72])),
        )
        .await;
        assert_eq!(4, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        // Requests that run past the end of the page must fail.
        let params = ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72, 100]));
        let arr = read_array(
            &reader,
            &schema.fields[1],
            0,
            reader.page_table.as_ref(),
            &params,
        );
        assert!(arr.await.is_err());

        let params = ReadBatchParams::RangeTo(..107);
        let arr = read_array(
            &reader,
            &schema.fields[1],
            0,
            reader.page_table.as_ref(),
            &params,
        );
        assert!(arr.await.is_err());
    }

    #[tokio::test]
    async fn test_take_lists() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "l",
                DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
            ArrowField::new(
                "ll",
                DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
        ]);

        let value_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(value_builder);
        let ll_value_builder = Int32Builder::new();
        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
        for i in 0..100 {
            list_builder.values().append_value(i);
            large_list_builder.values().append_value(i);
            if (i + 1) % 10 == 0 {
                list_builder.append(true);
                large_list_builder.append(true);
            }
        }
        let list_arr = Arc::new(list_builder.finish());
        let large_list_arr = Arc::new(large_list_builder.finish());

        let batch = RecordBatch::try_new(
            Arc::new(arrow_schema.clone()),
            vec![list_arr as ArrayRef, large_list_arr as ArrayRef],
        )
        .unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/take_list");
        let schema: Schema = (&arrow_schema).try_into().unwrap();
        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();
        let actual = reader.take(&[1, 3, 5, 9], &schema).await.unwrap();

        let value_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(value_builder);
        let ll_value_builder = Int32Builder::new();
        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
        for i in [1, 3, 5, 9] {
            for j in 0..10 {
                list_builder.values().append_value(i * 10 + j);
                large_list_builder.values().append_value(i * 10 + j);
            }
            list_builder.append(true);
            large_list_builder.append(true);
        }
        let expected_list = list_builder.finish();
        let expected_large_list = large_list_builder.finish();

        assert_eq!(actual.column_by_name("l").unwrap().as_ref(), &expected_list);
        assert_eq!(
            actual.column_by_name("ll").unwrap().as_ref(),
            &expected_large_list
        );
    }

    #[tokio::test]
    async fn test_list_array_with_offsets() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "l",
                DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
            ArrowField::new(
                "ll",
                DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
        ]);

        let store = ObjectStore::memory();
        let path = Path::from("/lists");

        let list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2)]),
            Some(vec![Some(3), Some(4)]),
            Some((0..2_000).map(Some).collect::<Vec<_>>()),
        ])
        .slice(1, 1);
        let large_list_array = LargeListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(10), Some(11)]),
            Some(vec![Some(12), Some(13)]),
            Some((0..2_000).map(Some).collect::<Vec<_>>()),
        ])
        .slice(1, 1);

        let batch = RecordBatch::try_new(
            Arc::new(arrow_schema.clone()),
            vec![Arc::new(list_array), Arc::new(large_list_array)],
        )
        .unwrap();

        let schema: Schema = (&arrow_schema).try_into().unwrap();
        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer
            .write(std::slice::from_ref(&batch))
            .await
            .unwrap();
        file_writer.finish().await.unwrap();

        // Writing the sliced batch must not drag the 2,000-element tail lists
        // into the file.
        let file_size_bytes = store.size(&path).await.unwrap();
        assert!(file_size_bytes < 1_000);

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
        assert_eq!(batch, actual_batch);
    }

    #[tokio::test]
    async fn test_read_ranges() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int64, false)]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let columns: Vec<ArrayRef> = vec![Arc::new(Int64Array::from_iter_values(0..100))];
        let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/read_range");
        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_range(7..25, reader.schema()).await.unwrap();

        assert_eq!(
            actual_batch.column_by_name("i").unwrap().as_ref(),
            &Int64Array::from_iter_values(7..25)
        );
    }

    #[tokio::test]
    async fn test_batches_stream() {
        let store = ObjectStore::memory();
        let path = Path::from("/batch_stream");

        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int32, true)]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let mut writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        for i in 0..10 {
            let batch = RecordBatch::try_new(
                Arc::new(arrow_schema.clone()),
                vec![Arc::new(Int32Array::from_iter_values(i * 10..(i + 1) * 10))],
            )
            .unwrap();
            writer.write(&[batch]).await.unwrap();
        }
        writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();
        let stream = batches_stream(reader, schema, |id| id % 2 == 0);
        let batches = stream.try_collect::<Vec<_>>().await.unwrap();

        assert_eq!(batches.len(), 5);
        for (i, batch) in batches.iter().enumerate() {
            assert_eq!(
                batch,
                &RecordBatch::try_new(
                    Arc::new(arrow_schema.clone()),
                    vec![Arc::new(Int32Array::from_iter_values(
                        i as i32 * 2 * 10..(i as i32 * 2 + 1) * 10
                    ))],
                )
                .unwrap()
            )
        }
    }

    #[tokio::test]
    async fn test_take_boolean_beyond_chunk() {
        // A tiny block size forces the boolean column to span multiple chunks.
        let store = ObjectStore::from_uri_and_params(
            Arc::new(Default::default()),
            "memory://",
            &ObjectStoreParams {
                block_size: Some(256),
                ..Default::default()
            },
        )
        .await
        .unwrap()
        .0;
        let path = Path::from("/take_bools");

        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "b",
            DataType::Boolean,
            false,
        )]));
        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();

        let array = BooleanArray::from((0..5000).map(|v| v % 5 == 0).collect::<Vec<_>>());
        let batch =
            RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(array.clone())]).unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();
        let actual = reader.take(&[2, 4, 5, 8, 4555], &schema).await.unwrap();

        assert_eq!(
            actual.column_by_name("b").unwrap().as_ref(),
            &BooleanArray::from(vec![false, false, true, false, true])
        );
    }

    #[tokio::test]
    async fn test_read_projection() {
        let store = ObjectStore::memory();
        let path = Path::from("/partial_read");

        // A wide schema, of which only "f50" is actually written to the file.
        let mut fields = vec![];
        for i in 0..100 {
            fields.push(ArrowField::new(format!("f{}", i), DataType::Int32, false));
        }
        let arrow_schema = ArrowSchema::new(fields);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        let partial_schema = schema.project(&["f50"]).unwrap();
        let partial_arrow: ArrowSchema = (&partial_schema).into();

        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            partial_schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();

        let array = Int32Array::from(vec![0; 15]);
        let batch =
            RecordBatch::try_new(Arc::new(partial_arrow), vec![Arc::new(array.clone())]).unwrap();
        file_writer
            .write(std::slice::from_ref(&batch))
            .await
            .unwrap();
        file_writer.finish().await.unwrap();

        let field_id = partial_schema.fields.first().unwrap().id;
        let reader = FileReader::try_new_with_fragment_id(
            &store,
            &path,
            schema.clone(),
            0,
            field_id,
            field_id,
            None,
        )
        .await
        .unwrap();
        let actual = reader
            .read_batch(0, ReadBatchParams::RangeFull, &partial_schema)
            .await
            .unwrap();

        assert_eq!(actual, batch);
    }
}