use std::borrow::Cow;
use std::ops::{Range, RangeTo};
use std::sync::Arc;

use arrow_arith::numeric::sub;
use arrow_array::{
    builder::PrimitiveBuilder,
    cast::AsArray,
    types::{Int32Type, Int64Type},
    ArrayRef, ArrowNativeTypeOp, ArrowNumericType, NullArray, OffsetSizeTrait, PrimitiveArray,
    RecordBatch, StructArray, UInt32Array,
};
use arrow_buffer::ArrowNativeType;
use arrow_schema::{DataType, FieldRef, Schema as ArrowSchema};
use arrow_select::concat::{self, concat_batches};
use async_recursion::async_recursion;
use deepsize::DeepSizeOf;
use futures::{stream, Future, FutureExt, StreamExt, TryStreamExt};
use lance_arrow::*;
use lance_core::cache::{CacheKey, LanceCache};
use lance_core::datatypes::{Field, Schema};
use lance_core::{Error, Result};
use lance_io::encodings::dictionary::DictionaryDecoder;
use lance_io::encodings::AsyncIndex;
use lance_io::stream::{RecordBatchStream, RecordBatchStreamAdapter};
use lance_io::traits::Reader;
use lance_io::utils::{
    read_fixed_stride_array, read_metadata_offset, read_struct, read_struct_from_buf,
};
use lance_io::{object_store::ObjectStore, ReadBatchParams};
use object_store::path::Path;
use snafu::location;
use tracing::instrument;

use crate::format::metadata::Metadata;
use crate::page_table::{PageInfo, PageTable};

/// Reader for a single Lance (v1) data file.
#[derive(Clone, DeepSizeOf)]
pub struct FileReader {
    pub object_reader: Arc<dyn Reader>,
    metadata: Arc<Metadata>,
    page_table: Arc<PageTable>,
    schema: Schema,

    /// The id of the fragment this file belongs to.
    fragment_id: u64,

    /// Page table for the statistics pages, if the file has any.
    stats_page_table: Arc<Option<PageTable>>,
}

impl std::fmt::Debug for FileReader {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("FileReader")
            .field("fragment", &self.fragment_id)
            .field("path", &self.object_reader.path())
            .finish()
    }
}

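/// Cache key wrapping a borrowed string, with the cached value type carried as
/// a phantom type parameter so plain strings can be used with [`LanceCache`].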
struct StringCacheKey<'a, T> {
    key: &'a str,
    _phantom: std::marker::PhantomData<T>,
}

impl<'a, T> StringCacheKey<'a, T> {
    fn new(key: &'a str) -> Self {
        Self {
            key,
            _phantom: std::marker::PhantomData,
        }
    }
}

impl<T> CacheKey for StringCacheKey<'_, T> {
    type ValueType = T;

    fn key(&self) -> Cow<'_, str> {
        self.key.into()
    }
}

impl FileReader {
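    /// Open a reader for the file at `path`, which belongs to fragment
    /// `fragment_id`.
    ///
    /// `field_id_offset` and `max_field_id` bound the field ids stored in this
    /// file and are forwarded to [`PageTable::load`]; a `session` cache, when
    /// provided, is used to reuse previously loaded metadata and page tables.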
    #[instrument(level = "debug", skip(object_store, schema, session))]
    pub async fn try_new_with_fragment_id(
        object_store: &ObjectStore,
        path: &Path,
        schema: Schema,
        fragment_id: u32,
        field_id_offset: i32,
        max_field_id: i32,
        session: Option<&LanceCache>,
    ) -> Result<Self> {
        let object_reader = object_store.open(path).await?;

        let metadata = Self::read_metadata(object_reader.as_ref(), session).await?;

        Self::try_new_from_reader(
            path,
            object_reader.into(),
            Some(metadata),
            schema,
            fragment_id,
            field_id_offset,
            max_field_id,
            session,
        )
        .await
    }

    #[allow(clippy::too_many_arguments)]
    pub async fn try_new_from_reader(
        path: &Path,
        object_reader: Arc<dyn Reader>,
        metadata: Option<Arc<Metadata>>,
        schema: Schema,
        fragment_id: u32,
        field_id_offset: i32,
        max_field_id: i32,
        session: Option<&LanceCache>,
    ) -> Result<Self> {
        let metadata = match metadata {
            Some(metadata) => metadata,
            None => Self::read_metadata(object_reader.as_ref(), session).await?,
        };

        let page_table = async {
            Self::load_from_cache(session, path.to_string(), |_| async {
                PageTable::load(
                    object_reader.as_ref(),
                    metadata.page_table_position,
                    field_id_offset,
                    max_field_id,
                    metadata.num_batches() as i32,
                )
                .await
            })
            .await
        };

        let stats_page_table = Self::read_stats_page_table(object_reader.as_ref(), session);

        // Load the main page table and the statistics page table concurrently.
        let (page_table, stats_page_table) = futures::try_join!(page_table, stats_page_table)?;

        Ok(Self {
            object_reader,
            metadata,
            schema,
            page_table,
            fragment_id: fragment_id as u64,
            stats_page_table,
        })
    }

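    /// Read the file metadata from the footer.
    ///
    /// The last block of the file is read first; if the metadata starts before
    /// that block, a second read fetches it, otherwise it is decoded directly
    /// from the tail bytes already in memory.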
    pub async fn read_metadata(
        object_reader: &dyn Reader,
        cache: Option<&LanceCache>,
    ) -> Result<Arc<Metadata>> {
        Self::load_from_cache(cache, object_reader.path().to_string(), |_| async {
            let file_size = object_reader.size().await?;
            let begin = if file_size < object_reader.block_size() {
                0
            } else {
                file_size - object_reader.block_size()
            };
            let tail_bytes = object_reader.get_range(begin..file_size).await?;
            let metadata_pos = read_metadata_offset(&tail_bytes)?;

            let metadata: Metadata = if metadata_pos < file_size - tail_bytes.len() {
                // The tail read did not cover the metadata; fetch it directly.
                read_struct(object_reader, metadata_pos).await?
            } else {
                let offset = tail_bytes.len() - (file_size - metadata_pos);
                read_struct_from_buf(&tail_bytes.slice(offset..))?
            };
            Ok(metadata)
        })
        .await
    }

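    /// Load the page table pointing to the statistics pages, if the file has
    /// any. The result is wrapped as `Arc<Option<_>>` rather than
    /// `Option<Arc<_>>` so that the absence of statistics is cached too.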
    async fn read_stats_page_table(
        reader: &dyn Reader,
        cache: Option<&LanceCache>,
    ) -> Result<Arc<Option<PageTable>>> {
        // The cache key is the file path with a "stats" suffix, keeping it
        // distinct from the main page table entry.
        Self::load_from_cache(cache, reader.path().child("stats").to_string(), |_| async {
            let metadata = Self::read_metadata(reader, cache).await?;

            if let Some(stats_meta) = metadata.stats_metadata.as_ref() {
                Ok(Some(
                    PageTable::load(
                        reader,
                        stats_meta.page_table_position,
                        /*field_id_offset=*/ 0,
                        *stats_meta.leaf_field_ids.iter().max().unwrap(),
                        /*num_batches=*/ 1,
                    )
                    .await?,
                ))
            } else {
                Ok(None)
            }
        })
        .await
    }

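    /// Run `loader` through the session cache when one is provided; otherwise
    /// invoke it directly and wrap the result in an `Arc`.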
    async fn load_from_cache<T: DeepSizeOf + Send + Sync + 'static, F, Fut>(
        cache: Option<&LanceCache>,
        key: String,
        loader: F,
    ) -> Result<Arc<T>>
    where
        F: Fn(&str) -> Fut,
        Fut: Future<Output = Result<T>> + Send,
    {
        if let Some(cache) = cache {
            let cache_key = StringCacheKey::<T>::new(key.as_str());
            cache
                .get_or_insert_with_key(cache_key, || loader(key.as_str()))
                .await
        } else {
            Ok(Arc::new(loader(key.as_str()).await?))
        }
    }

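    /// Open a reader for a single file, with fragment id 0 and no cache.
    ///
    /// A minimal usage sketch; the store, path, and `schema` are placeholders:
    ///
    /// ```ignore
    /// let store = ObjectStore::memory();
    /// let path = Path::from("/data.lance");
    /// let reader = FileReader::try_new(&store, &path, schema).await?;
    /// println!("{} rows in {} batches", reader.len(), reader.num_batches());
    /// ```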
    pub async fn try_new(object_store: &ObjectStore, path: &Path, schema: Schema) -> Result<Self> {
        let max_field_id = schema.max_field_id().unwrap_or_default();
        Self::try_new_with_fragment_id(object_store, path, schema, 0, 0, max_field_id, None).await
    }

    fn io_parallelism(&self) -> usize {
        self.object_reader.io_parallelism()
    }

    /// The schema of the file.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn num_batches(&self) -> usize {
        self.metadata.num_batches()
    }

    /// Get the number of rows in the batch, or 0 if the batch does not exist.
    pub fn num_rows_in_batch(&self, batch_id: i32) -> usize {
        self.metadata.get_batch_length(batch_id).unwrap_or_default() as usize
    }

    /// Count the number of rows in this file.
    pub fn len(&self) -> usize {
        self.metadata.len()
    }

    pub fn is_empty(&self) -> bool {
        self.metadata.is_empty()
    }

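    /// Read a batch of data from the file.
    ///
    /// - `batch_id`: the index of the batch to read.
    /// - `params`: the rows within the batch to read, as a range or indices.
    /// - `projection`: the schema of the columns to read.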
    #[instrument(level = "debug", skip(self, params, projection))]
    pub async fn read_batch(
        &self,
        batch_id: i32,
        params: impl Into<ReadBatchParams>,
        projection: &Schema,
    ) -> Result<RecordBatch> {
        read_batch(self, &params.into(), projection, batch_id).await
    }

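    /// Read a contiguous range of rows across batches, concatenating the
    /// results into a single batch. Returns an empty batch for an empty range.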
    #[instrument(level = "debug", skip(self, projection))]
    pub async fn read_range(
        &self,
        range: Range<usize>,
        projection: &Schema,
    ) -> Result<RecordBatch> {
        if range.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(projection.into())));
        }
        let range_in_batches = self.metadata.range_to_batches(range)?;
        let batches = stream::iter(range_in_batches)
            .map(|(batch_id, range)| async move {
                self.read_batch(batch_id, range, projection).await
            })
            .buffered(self.io_parallelism())
            .try_collect::<Vec<_>>()
            .await?;
        if batches.len() == 1 {
            return Ok(batches[0].clone());
        }
        let schema = batches[0].schema();
        Ok(tokio::task::spawn_blocking(move || concat_batches(&schema, &batches)).await??)
    }

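    /// Take rows by their positions in the file and return them as one batch.
    ///
    /// A usage sketch, assuming `reader` was opened as in the `try_new`
    /// example:
    ///
    /// ```ignore
    /// let batch = reader.take(&[1, 15, 90], reader.schema()).await?;
    /// assert_eq!(batch.num_rows(), 3);
    /// ```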
    #[instrument(level = "debug", skip_all)]
    pub async fn take(&self, indices: &[u32], projection: &Schema) -> Result<RecordBatch> {
        let num_batches = self.num_batches();
        let num_rows = self.len() as u32;
        let indices_in_batches = self.metadata.group_indices_to_batches(indices);
        let batches = stream::iter(indices_in_batches)
            .map(|batch| async move {
                if batch.batch_id >= num_batches as i32 {
                    Err(Error::InvalidInput {
                        source: format!("batch_id: {} out of bounds", batch.batch_id).into(),
                        location: location!(),
                    })
                } else if *batch.offsets.last().expect("got empty batch") > num_rows {
                    Err(Error::InvalidInput {
                        source: format!("indices: {:?} out of bounds", batch.offsets).into(),
                        location: location!(),
                    })
                } else {
                    self.read_batch(batch.batch_id, batch.offsets.as_slice(), projection)
                        .await
                }
            })
            .buffered(self.io_parallelism())
            .try_collect::<Vec<_>>()
            .await?;

        let schema = Arc::new(ArrowSchema::from(projection));

        Ok(tokio::task::spawn_blocking(move || concat_batches(&schema, &batches)).await??)
    }

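    /// Get the schema of the statistics pages covering the given field ids,
    /// or `None` if the file stores no statistics.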
    pub fn page_stats_schema(&self, field_ids: &[i32]) -> Option<Schema> {
        self.metadata.stats_metadata.as_ref().map(|meta| {
            let mut stats_field_ids = vec![];
            for stats_field in &meta.schema.fields {
                // Stats fields are named after the field id they describe.
                if let Ok(stats_field_id) = stats_field.name.parse::<i32>() {
                    if field_ids.contains(&stats_field_id) {
                        stats_field_ids.push(stats_field.id);
                        for child in &stats_field.children {
                            stats_field_ids.push(child.id);
                        }
                    }
                }
            }
            meta.schema.project_by_ids(&stats_field_ids, true)
        })
    }

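    /// Read the statistics pages for the given field ids as a single
    /// `RecordBatch`, or `None` if no statistics are stored for them.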
    pub async fn read_page_stats(&self, field_ids: &[i32]) -> Result<Option<RecordBatch>> {
        if let Some(stats_page_table) = self.stats_page_table.as_ref() {
            let projection = self.page_stats_schema(field_ids).unwrap();
            // It's possible none of the requested fields have statistics.
            if projection.fields.is_empty() {
                return Ok(None);
            }
            let arrays = futures::stream::iter(projection.fields.iter().cloned())
                .map(|field| async move {
                    read_array(
                        self,
                        &field,
                        0,
                        stats_page_table,
                        &ReadBatchParams::RangeFull,
                    )
                    .await
                })
                .buffered(self.io_parallelism())
                .try_collect::<Vec<_>>()
                .await?;

            let schema = ArrowSchema::from(&projection);
            let batch = RecordBatch::try_new(Arc::new(schema), arrays)?;
            Ok(Some(batch))
        } else {
            Ok(None)
        }
    }
}

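/// Stream batches from a file, reading ahead up to two batches at a time.
/// Only batches whose id satisfies `predicate` are read.
///
/// A usage sketch that reads only the even-numbered batches (assumes
/// `futures::TryStreamExt` is in scope for `try_collect`):
///
/// ```ignore
/// let stream = batches_stream(reader, schema, |id| id % 2 == 0);
/// let batches: Vec<RecordBatch> = stream.try_collect().await?;
/// ```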
pub fn batches_stream(
    reader: FileReader,
    projection: Schema,
    predicate: impl FnMut(&i32) -> bool + Send + Sync + 'static,
) -> impl RecordBatchStream {
    let projection = Arc::new(projection);
    let arrow_schema = ArrowSchema::from(projection.as_ref());

    let total_batches = reader.num_batches() as i32;
    let batches = (0..total_batches).filter(predicate);
    let this = Arc::new(reader);
    let inner = stream::iter(batches)
        .zip(stream::repeat_with(move || {
            (this.clone(), projection.clone())
        }))
        .map(move |(batch_id, (reader, projection))| async move {
            reader
                .read_batch(batch_id, ReadBatchParams::RangeFull, &projection)
                .await
        })
        .buffered(2)
        .boxed();
    RecordBatchStreamAdapter::new(Arc::new(arrow_schema), inner)
}

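/// Read a batch of data from the file, reading each projected column in
/// parallel.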
pub async fn read_batch(
    reader: &FileReader,
    params: &ReadBatchParams,
    schema: &Schema,
    batch_id: i32,
) -> Result<RecordBatch> {
    if schema.fields.is_empty() {
        return Err(Error::invalid_input("no fields requested", location!()));
    }
    let arrs = stream::iter(&schema.fields)
        .map(|f| async { read_array(reader, f, batch_id, &reader.page_table, params).await })
        .buffered(reader.io_parallelism())
        .try_collect::<Vec<_>>()
        .await?;
    Ok(RecordBatch::try_new(Arc::new(schema.into()), arrs)?)
}

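/// Read an array of values for `field` from the given batch, dispatching on
/// the field's data type.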
#[async_recursion]
async fn read_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let data_type = field.data_type();

    use DataType::*;

    if data_type.is_fixed_stride() {
        _read_fixed_stride_array(reader, field, batch_id, page_table, params).await
    } else {
        match data_type {
            Null => read_null_array(field, batch_id, page_table, params),
            Utf8 | LargeUtf8 | Binary | LargeBinary => {
                read_binary_array(reader, field, batch_id, page_table, params).await
            }
            Struct(_) => read_struct_array(reader, field, batch_id, page_table, params).await,
            Dictionary(_, _) => {
                read_dictionary_array(reader, field, batch_id, page_table, params).await
            }
            List(_) => {
                read_list_array::<Int32Type>(reader, field, batch_id, page_table, params).await
            }
            LargeList(_) => {
                read_list_array::<Int64Type>(reader, field, batch_id, page_table, params).await
            }
            _ => {
                unimplemented!("No support for {data_type} yet");
            }
        }
    }
}

fn get_page_info<'a>(
    page_table: &'a PageTable,
    field: &'a Field,
    batch_id: i32,
) -> Result<&'a PageInfo> {
    page_table.get(field.id, batch_id).ok_or_else(|| {
        Error::io(
            format!(
                "No page info found for field: {}, field_id={} batch={}",
                field.name, field.id, batch_id
            ),
            location!(),
        )
    })
}

/// Read a fixed-stride array for one batch. The leading underscore avoids a
/// name clash with the imported `lance_io::utils::read_fixed_stride_array`.
async fn _read_fixed_stride_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;
    read_fixed_stride_array(
        reader.object_reader.as_ref(),
        &field.data_type(),
        page_info.position,
        page_info.length,
        params.clone(),
    )
    .await
}

fn read_null_array(
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;

    // A NullArray stores no data, so only the output length needs to be
    // computed, after validating the request against the page length.
    let length_output = match params {
        ReadBatchParams::Indices(indices) => {
            if indices.is_empty() {
                0
            } else {
                let idx_max = *indices.values().iter().max().unwrap() as u64;
                if idx_max >= page_info.length as u64 {
                    return Err(Error::io(
                        format!(
                            "NullArray Reader: request([{}]) out of range: [0..{}]",
                            idx_max, page_info.length
                        ),
                        location!(),
                    ));
                }
                indices.len()
            }
        }
        _ => {
            let (idx_start, idx_end) = match params {
                ReadBatchParams::Range(r) => (r.start, r.end),
                ReadBatchParams::RangeFull => (0, page_info.length),
                ReadBatchParams::RangeTo(r) => (0, r.end),
                ReadBatchParams::RangeFrom(r) => (r.start, page_info.length),
                _ => unreachable!(),
            };
            if idx_end > page_info.length {
                return Err(Error::io(
                    format!(
                        "NullArray Reader: request([{}..{}]) out of range: [0..{}]",
                        idx_start, idx_end, page_info.length
                    ),
                    location!(),
                ));
            }
            idx_end - idx_start
        }
    };

    Ok(Arc::new(NullArray::new(length_output)))
}

async fn read_binary_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;

    lance_io::utils::read_binary_array(
        reader.object_reader.as_ref(),
        &field.data_type(),
        field.nullable,
        page_info.position,
        page_info.length,
        params,
    )
    .await
}

async fn read_dictionary_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;
    let data_type = field.data_type();
    let decoder = DictionaryDecoder::new(
        reader.object_reader.as_ref(),
        page_info.position,
        page_info.length,
        &data_type,
        field
            .dictionary
            .as_ref()
            .unwrap()
            .values
            .as_ref()
            .unwrap()
            .clone(),
    );
    decoder.get(params.clone()).await
}

async fn read_struct_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    // Read each child array, then reassemble the struct.
    let mut sub_arrays: Vec<(FieldRef, ArrayRef)> = vec![];

    for child in field.children.as_slice() {
        let arr = read_array(reader, child, batch_id, page_table, params).await?;
        sub_arrays.push((Arc::new(child.into()), arr));
    }

    Ok(Arc::new(StructArray::from(sub_arrays)))
}

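/// Take rows from a list array by index. For each requested index, the offset
/// pair in `positions` gives the value range to read; the value sub-arrays are
/// concatenated and fresh zero-based offsets are built for the result.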
async fn take_list_array<T: ArrowNumericType>(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    positions: &PrimitiveArray<T>,
    indices: &UInt32Array,
) -> Result<ArrayRef>
where
    T::Native: ArrowNativeTypeOp + OffsetSizeTrait,
{
    let first_idx = indices.value(0);
    // `positions` was read starting at `first_idx`, so shift each index before
    // looking up its value range.
    let ranges = indices
        .values()
        .iter()
        .map(|i| (*i - first_idx).as_usize())
        .map(|idx| positions.value(idx).as_usize()..positions.value(idx + 1).as_usize())
        .collect::<Vec<_>>();
    let field = field.clone();
    let mut list_values: Vec<ArrayRef> = vec![];
    // Read the value sub-array for each requested list.
    for range in ranges.iter() {
        list_values.push(
            read_array(
                reader,
                &field.children[0],
                batch_id,
                page_table,
                &(range.clone()).into(),
            )
            .await?,
        );
    }

    let value_refs = list_values
        .iter()
        .map(|arr| arr.as_ref())
        .collect::<Vec<_>>();
    let mut offsets_builder = PrimitiveBuilder::<T>::new();
    offsets_builder.append_value(T::Native::usize_as(0));
    let mut off = 0_usize;
    for range in ranges {
        off += range.len();
        offsets_builder.append_value(T::Native::usize_as(off));
    }
    let all_values = concat::concat(value_refs.as_slice())?;
    let offset_arr = offsets_builder.finish();
    let arr = try_new_generic_list_array(all_values, &offset_arr)?;
    Ok(Arc::new(arr) as ArrayRef)
}

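/// Read a list array for the requested rows.
///
/// A list's offsets array has one more entry than the number of lists, so the
/// offsets read below extend the requested range by one (and, for a take by
/// indices, by two to cover the end offset of the last list). The offsets are
/// then rebased to start at zero before the list array is rebuilt.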
async fn read_list_array<T: ArrowNumericType>(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef>
where
    T::Native: ArrowNativeTypeOp + OffsetSizeTrait,
{
    // Extend the request by one so the end offset of the last list is read.
    let positions_params = match params {
        ReadBatchParams::Range(range) => ReadBatchParams::from(range.start..(range.end + 1)),
        ReadBatchParams::RangeTo(range) => ReadBatchParams::from(..range.end + 1),
        ReadBatchParams::Indices(indices) => {
            (indices.value(0).as_usize()..indices.value(indices.len() - 1).as_usize() + 2).into()
        }
        p => p.clone(),
    };

    let page_info = get_page_info(&reader.page_table, field, batch_id)?;
    let position_arr = read_fixed_stride_array(
        reader.object_reader.as_ref(),
        &T::DATA_TYPE,
        page_info.position,
        page_info.length,
        positions_params,
    )
    .await?;

    let positions: &PrimitiveArray<T> = position_arr.as_primitive();

    // Compute the range of values covered by the requested lists.
    let value_params = match params {
        ReadBatchParams::Range(range) => ReadBatchParams::from(
            positions.value(0).as_usize()..positions.value(range.end - range.start).as_usize(),
        ),
        ReadBatchParams::Ranges(_) => {
            return Err(Error::Internal {
                message: "ReadBatchParams::Ranges should not be used in v1 files".to_string(),
                location: location!(),
            })
        }
        ReadBatchParams::RangeTo(RangeTo { end }) => {
            ReadBatchParams::from(..positions.value(*end).as_usize())
        }
        ReadBatchParams::RangeFrom(_) => ReadBatchParams::from(positions.value(0).as_usize()..),
        ReadBatchParams::RangeFull => ReadBatchParams::from(
            positions.value(0).as_usize()..positions.value(positions.len() - 1).as_usize(),
        ),
        ReadBatchParams::Indices(indices) => {
            // A take is non-contiguous and is handled separately.
            return take_list_array(reader, field, batch_id, page_table, positions, indices).await;
        }
    };

    // Rebase the offsets so they start at zero.
    let start_position = PrimitiveArray::<T>::new_scalar(positions.value(0));
    let offset_arr = sub(positions, &start_position)?;
    let offset_arr_ref = offset_arr.as_primitive::<T>();
    let value_arrs = read_array(
        reader,
        &field.children[0],
        batch_id,
        page_table,
        &value_params,
    )
    .await?;
    let arr = try_new_generic_list_array(value_arrs, offset_arr_ref)?;
    Ok(Arc::new(arr) as ArrayRef)
}

#[cfg(test)]
mod tests {
    use crate::writer::{FileWriter, NotSelfDescribing};

    use super::*;

    use arrow_array::{
        builder::{Int32Builder, LargeListBuilder, ListBuilder, StringBuilder},
        cast::{as_string_array, as_struct_array},
        types::UInt8Type,
        Array, DictionaryArray, Float32Array, Int64Array, LargeListArray, ListArray, StringArray,
        UInt8Array,
    };
    use arrow_array::{BooleanArray, Int32Array};
    use arrow_schema::{Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema};
    use lance_io::object_store::ObjectStoreParams;

    #[tokio::test]
    async fn test_take() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("i", DataType::Int64, true),
            ArrowField::new("f", DataType::Float32, false),
            ArrowField::new("s", DataType::Utf8, false),
            ArrowField::new(
                "d",
                DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
                false,
            ),
        ]);
        let mut schema = Schema::try_from(&arrow_schema).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/take_test");

        let values = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);
        let values_ref = Arc::new(values);
        let mut batches = vec![];
        for batch_id in 0..10 {
            let value_range: Range<i64> = batch_id * 10..batch_id * 10 + 10;
            let keys = UInt8Array::from_iter_values(value_range.clone().map(|v| (v % 7) as u8));
            let columns: Vec<ArrayRef> = vec![
                Arc::new(Int64Array::from_iter(
                    value_range.clone().collect::<Vec<_>>(),
                )),
                Arc::new(Float32Array::from_iter(
                    value_range.clone().map(|n| n as f32).collect::<Vec<_>>(),
                )),
                Arc::new(StringArray::from_iter_values(
                    value_range.clone().map(|n| format!("str-{}", n)),
                )),
                Arc::new(DictionaryArray::<UInt8Type>::try_new(keys, values_ref.clone()).unwrap()),
            ];
            batches.push(RecordBatch::try_new(Arc::new(arrow_schema.clone()), columns).unwrap());
        }
        schema.set_dictionary(&batches[0]).unwrap();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        for batch in batches.iter() {
            file_writer.write(&[batch.clone()]).await.unwrap();
        }
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let batch = reader
            .take(&[1, 15, 20, 25, 30, 48, 90], reader.schema())
            .await
            .unwrap();
        let dict_keys = UInt8Array::from_iter_values([1, 1, 6, 4, 2, 6, 6]);
        assert_eq!(
            batch,
            RecordBatch::try_new(
                batch.schema(),
                vec![
                    Arc::new(Int64Array::from_iter_values([1, 15, 20, 25, 30, 48, 90])),
                    Arc::new(Float32Array::from_iter_values([
                        1.0, 15.0, 20.0, 25.0, 30.0, 48.0, 90.0
                    ])),
                    Arc::new(StringArray::from_iter_values([
                        "str-1", "str-15", "str-20", "str-25", "str-30", "str-48", "str-90"
                    ])),
                    Arc::new(DictionaryArray::try_new(dict_keys, values_ref.clone()).unwrap()),
                ]
            )
            .unwrap()
        );
    }

    async fn test_write_null_string_in_struct(field_nullable: bool) {
        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "parent",
            DataType::Struct(ArrowFields::from(vec![ArrowField::new(
                "str",
                DataType::Utf8,
                field_nullable,
            )])),
            true,
        )]));

        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let string_arr = Arc::new(StringArray::from_iter([Some("a"), Some(""), Some("b")]));
        let struct_arr = Arc::new(StructArray::from(vec![(
            Arc::new(ArrowField::new("str", DataType::Utf8, field_nullable)),
            string_arr.clone() as ArrayRef,
        )]));
        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_arr]).unwrap();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch.clone()]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();

        if field_nullable {
            assert_eq!(
                &StringArray::from_iter(vec![Some("a"), None, Some("b")]),
                as_string_array(
                    as_struct_array(actual_batch.column_by_name("parent").unwrap().as_ref())
                        .column_by_name("str")
                        .unwrap()
                        .as_ref()
                )
            );
        } else {
            assert_eq!(actual_batch, batch);
        }
    }

    #[tokio::test]
    async fn read_nullable_string_in_struct() {
        test_write_null_string_in_struct(true).await;
        test_write_null_string_in_struct(false).await;
    }

    #[tokio::test]
    async fn test_read_struct_of_list_arrays() {
        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let arrow_schema = make_schema_of_list_array();
        let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();

        let batches = (0..3)
            .map(|_| {
                let struct_array = make_struct_of_list_array(10, 10);
                RecordBatch::try_new(arrow_schema.clone(), vec![struct_array]).unwrap()
            })
            .collect::<Vec<_>>();
        let batches_ref = batches.iter().collect::<Vec<_>>();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&batches).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
        let expected = concat_batches(&arrow_schema, batches_ref).unwrap();
        assert_eq!(expected, actual_batch);
    }

    #[tokio::test]
    async fn test_scan_struct_of_list_arrays() {
        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let arrow_schema = make_schema_of_list_array();
        let struct_array = make_struct_of_list_array(3, 10);
        let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_array.clone()]).unwrap();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let mut expected_columns: Vec<ArrayRef> = Vec::new();
        for c in struct_array.columns().iter() {
            expected_columns.push(c.slice(1, 1));
        }

        let expected_struct = match arrow_schema.fields[0].data_type() {
            DataType::Struct(subfields) => subfields
                .iter()
                .zip(expected_columns)
                .map(|(f, d)| (f.clone(), d))
                .collect::<Vec<_>>(),
            _ => panic!("unexpected field"),
        };

        let expected_struct_array = StructArray::from(expected_struct);
        let expected_batch = RecordBatch::from(&StructArray::from(vec![(
            Arc::new(arrow_schema.fields[0].as_ref().clone()),
            Arc::new(expected_struct_array) as ArrayRef,
        )]));

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let params = ReadBatchParams::Range(1..2);
        let slice_of_batch = reader.read_batch(0, params, reader.schema()).await.unwrap();
        assert_eq!(expected_batch, slice_of_batch);
    }

    fn make_schema_of_list_array() -> Arc<arrow_schema::Schema> {
        Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "s",
            DataType::Struct(ArrowFields::from(vec![
                ArrowField::new(
                    "li",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    true,
                ),
                ArrowField::new(
                    "ls",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))),
                    true,
                ),
                ArrowField::new(
                    "ll",
                    DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    false,
                ),
            ])),
            true,
        )]))
    }

    fn make_struct_of_list_array(rows: i32, num_items: i32) -> Arc<StructArray> {
        let mut li_builder = ListBuilder::new(Int32Builder::new());
        let mut ls_builder = ListBuilder::new(StringBuilder::new());
        let ll_value_builder = Int32Builder::new();
        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
        for i in 0..rows {
            for j in 0..num_items {
                li_builder.values().append_value(i * 10 + j);
                ls_builder
                    .values()
                    .append_value(format!("str-{}", i * 10 + j));
                large_list_builder.values().append_value(i * 10 + j);
            }
            li_builder.append(true);
            ls_builder.append(true);
            large_list_builder.append(true);
        }
        Arc::new(StructArray::from(vec![
            (
                Arc::new(ArrowField::new(
                    "li",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    true,
                )),
                Arc::new(li_builder.finish()) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new(
                    "ls",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))),
                    true,
                )),
                Arc::new(ls_builder.finish()) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new(
                    "ll",
                    DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    false,
                )),
                Arc::new(large_list_builder.finish()) as ArrayRef,
            ),
        ]))
    }

    #[tokio::test]
    async fn test_read_nullable_arrays() {
        use arrow_array::Array;

        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("i", DataType::Int64, false),
            ArrowField::new("n", DataType::Null, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from_iter_values(0..100)),
            Arc::new(NullArray::new(100)),
        ];
        let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/takes");
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();

        async fn read_array_w_params(
            reader: &FileReader,
            field: &Field,
            params: ReadBatchParams,
        ) -> ArrayRef {
            read_array(reader, field, 0, reader.page_table.as_ref(), &params)
                .await
                .expect("Error reading back the null array from file")
        }

        let arr = read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFull).await;
        assert_eq!(100, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::Range(10..25)).await;
        assert_eq!(15, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFrom(60..)).await;
        assert_eq!(40, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeTo(..25)).await;
        assert_eq!(25, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr = read_array_w_params(
            &reader,
            &schema.fields[1],
            ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72])),
        )
        .await;
        assert_eq!(4, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        // Out-of-bound indices must fail.
        let params = ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72, 100]));
        let arr = read_array(
            &reader,
            &schema.fields[1],
            0,
            reader.page_table.as_ref(),
            &params,
        );
        assert!(arr.await.is_err());

        // An out-of-bound range must fail as well.
        let params = ReadBatchParams::RangeTo(..107);
        let arr = read_array(
            &reader,
            &schema.fields[1],
            0,
            reader.page_table.as_ref(),
            &params,
        );
        assert!(arr.await.is_err());
    }

    #[tokio::test]
    async fn test_take_lists() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "l",
                DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
            ArrowField::new(
                "ll",
                DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
        ]);

        let value_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(value_builder);
        let ll_value_builder = Int32Builder::new();
        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
        for i in 0..100 {
            list_builder.values().append_value(i);
            large_list_builder.values().append_value(i);
            if (i + 1) % 10 == 0 {
                list_builder.append(true);
                large_list_builder.append(true);
            }
        }
        let list_arr = Arc::new(list_builder.finish());
        let large_list_arr = Arc::new(large_list_builder.finish());

        let batch = RecordBatch::try_new(
            Arc::new(arrow_schema.clone()),
            vec![list_arr as ArrayRef, large_list_arr as ArrayRef],
        )
        .unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/take_list");
        let schema: Schema = (&arrow_schema).try_into().unwrap();
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();
        let actual = reader.take(&[1, 3, 5, 9], &schema).await.unwrap();

        let value_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(value_builder);
        let ll_value_builder = Int32Builder::new();
        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
        for i in [1, 3, 5, 9] {
            for j in 0..10 {
                list_builder.values().append_value(i * 10 + j);
                large_list_builder.values().append_value(i * 10 + j);
            }
            list_builder.append(true);
            large_list_builder.append(true);
        }
        let expected_list = list_builder.finish();
        let expected_large_list = large_list_builder.finish();

        assert_eq!(actual.column_by_name("l").unwrap().as_ref(), &expected_list);
        assert_eq!(
            actual.column_by_name("ll").unwrap().as_ref(),
            &expected_large_list
        );
    }

    #[tokio::test]
    async fn test_list_array_with_offsets() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "l",
                DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
            ArrowField::new(
                "ll",
                DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
        ]);

        let store = ObjectStore::memory();
        let path = Path::from("/lists");

        let list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2)]),
            Some(vec![Some(3), Some(4)]),
            Some((0..2_000).map(Some).collect::<Vec<_>>()),
        ])
        .slice(1, 1);
        let large_list_array = LargeListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(10), Some(11)]),
            Some(vec![Some(12), Some(13)]),
            Some((0..2_000).map(Some).collect::<Vec<_>>()),
        ])
        .slice(1, 1);

        let batch = RecordBatch::try_new(
            Arc::new(arrow_schema.clone()),
            vec![Arc::new(list_array), Arc::new(large_list_array)],
        )
        .unwrap();

        let schema: Schema = (&arrow_schema).try_into().unwrap();
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch.clone()]).await.unwrap();
        file_writer.finish().await.unwrap();

        // The sliced batch contains only the second (tiny) list, so the large
        // backing arrays must not have been written out in full.
        let file_size_bytes = store.size(&path).await.unwrap();
        assert!(file_size_bytes < 1_000);

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
        assert_eq!(batch, actual_batch);
    }

    #[tokio::test]
    async fn test_read_ranges() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int64, false)]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let columns: Vec<ArrayRef> = vec![Arc::new(Int64Array::from_iter_values(0..100))];
        let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/read_range");
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_range(7..25, reader.schema()).await.unwrap();

        assert_eq!(
            actual_batch.column_by_name("i").unwrap().as_ref(),
            &Int64Array::from_iter_values(7..25)
        );
    }

    #[tokio::test]
    async fn test_batches_stream() {
        let store = ObjectStore::memory();
        let path = Path::from("/batch_stream");

        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int32, true)]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let mut writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        for i in 0..10 {
            let batch = RecordBatch::try_new(
                Arc::new(arrow_schema.clone()),
                vec![Arc::new(Int32Array::from_iter_values(i * 10..(i + 1) * 10))],
            )
            .unwrap();
            writer.write(&[batch]).await.unwrap();
        }
        writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();
        let stream = batches_stream(reader, schema, |id| id % 2 == 0);
        let batches = stream.try_collect::<Vec<_>>().await.unwrap();

        assert_eq!(batches.len(), 5);
        for (i, batch) in batches.iter().enumerate() {
            assert_eq!(
                batch,
                &RecordBatch::try_new(
                    Arc::new(arrow_schema.clone()),
                    vec![Arc::new(Int32Array::from_iter_values(
                        i as i32 * 2 * 10..(i as i32 * 2 + 1) * 10
                    ))],
                )
                .unwrap()
            )
        }
    }

    #[tokio::test]
    async fn test_take_boolean_beyond_chunk() {
        // Use a small block size so the boolean column spans multiple chunks.
        let store = ObjectStore::from_uri_and_params(
            Arc::new(Default::default()),
            "memory://",
            &ObjectStoreParams {
                block_size: Some(256),
                ..Default::default()
            },
        )
        .await
        .unwrap()
        .0;
        let path = Path::from("/take_bools");

        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "b",
            DataType::Boolean,
            false,
        )]));
        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();

        let array = BooleanArray::from((0..5000).map(|v| v % 5 == 0).collect::<Vec<_>>());
        let batch =
            RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(array.clone())]).unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();
        let actual = reader.take(&[2, 4, 5, 8, 4555], &schema).await.unwrap();

        assert_eq!(
            actual.column_by_name("b").unwrap().as_ref(),
            &BooleanArray::from(vec![false, false, true, false, true])
        );
    }

    #[tokio::test]
    async fn test_read_projection() {
        // The dataset schema has 100 columns, but this file stores only one of
        // them ("f50"), so reads must resolve field ids through the offset.
        let store = ObjectStore::memory();
        let path = Path::from("/partial_read");

        let mut fields = vec![];
        for i in 0..100 {
            fields.push(ArrowField::new(format!("f{}", i), DataType::Int32, false));
        }
        let arrow_schema = ArrowSchema::new(fields);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        let partial_schema = schema.project(&["f50"]).unwrap();
        let partial_arrow: ArrowSchema = (&partial_schema).into();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            partial_schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();

        let array = Int32Array::from(vec![0; 15]);
        let batch =
            RecordBatch::try_new(Arc::new(partial_arrow), vec![Arc::new(array.clone())]).unwrap();
        file_writer.write(&[batch.clone()]).await.unwrap();
        file_writer.finish().await.unwrap();

        let field_id = partial_schema.fields.first().unwrap().id;
        let reader = FileReader::try_new_with_fragment_id(
            &store,
            &path,
            schema.clone(),
            0,
            field_id,
            field_id,
            None,
        )
        .await
        .unwrap();
        let actual = reader
            .read_batch(0, ReadBatchParams::RangeFull, &partial_schema)
            .await
            .unwrap();

        assert_eq!(actual, batch);
    }
}
1499}