lance_file/reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Lance Data File Reader
5
6// Standard
7use std::ops::{Range, RangeTo};
8use std::sync::Arc;
9
10use arrow_arith::numeric::sub;
11use arrow_array::{
12    builder::PrimitiveBuilder,
13    cast::AsArray,
14    types::{Int32Type, Int64Type},
15    ArrayRef, ArrowNativeTypeOp, ArrowNumericType, NullArray, OffsetSizeTrait, PrimitiveArray,
16    RecordBatch, StructArray, UInt32Array,
17};
18use arrow_buffer::ArrowNativeType;
19use arrow_schema::{DataType, FieldRef, Schema as ArrowSchema};
20use arrow_select::concat::{self, concat_batches};
21use async_recursion::async_recursion;
22use deepsize::DeepSizeOf;
23use futures::{stream, Future, FutureExt, StreamExt, TryStreamExt};
24use lance_arrow::*;
25use lance_core::cache::{CacheKey, LanceCache};
26use lance_core::datatypes::{Field, Schema};
27use lance_core::{Error, Result};
28use lance_io::encodings::dictionary::DictionaryDecoder;
29use lance_io::encodings::AsyncIndex;
30use lance_io::stream::{RecordBatchStream, RecordBatchStreamAdapter};
31use lance_io::traits::Reader;
32use lance_io::utils::{
33    read_fixed_stride_array, read_metadata_offset, read_struct, read_struct_from_buf,
34};
35use lance_io::{object_store::ObjectStore, ReadBatchParams};
36use std::borrow::Cow;
37
38use object_store::path::Path;
39use snafu::location;
40use tracing::instrument;
41
42use crate::format::metadata::Metadata;
43use crate::page_table::{PageInfo, PageTable};
44
/// Lance File Reader.
///
/// Reads Arrow data from a single Lance data file.
48#[derive(Clone, DeepSizeOf)]
49pub struct FileReader {
50    pub object_reader: Arc<dyn Reader>,
51    metadata: Arc<Metadata>,
52    page_table: Arc<PageTable>,
53    schema: Schema,
54
    /// The id of the fragment this file belongs to.
    /// For simple file access, this can just be zero.
57    fragment_id: u64,
58
59    /// Page table for statistics
60    stats_page_table: Arc<Option<PageTable>>,
61}
62
63impl std::fmt::Debug for FileReader {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        f.debug_struct("FileReader")
66            .field("fragment", &self.fragment_id)
67            .field("path", &self.object_reader.path())
68            .finish()
69    }
70}
71
72// Generic cache key for string-based keys
73struct StringCacheKey<'a, T> {
74    key: &'a str,
75    _phantom: std::marker::PhantomData<T>,
76}
77
78impl<'a, T> StringCacheKey<'a, T> {
79    fn new(key: &'a str) -> Self {
80        Self {
81            key,
82            _phantom: std::marker::PhantomData,
83        }
84    }
85}
86
87impl<T> CacheKey for StringCacheKey<'_, T> {
88    type ValueType = T;
89
90    fn key(&self) -> Cow<'_, str> {
91        self.key.into()
92    }
93}
94
95impl FileReader {
96    /// Open file reader
97    ///
98    /// Open the file at the given path using the provided object store.
99    ///
    /// The passed fragment ID determines the first 32 bits of the row IDs.
    ///
    /// The passed schema is typically loaded from the dataset manifest; this is the
    /// usual case when the file is part of a dataset fragment. For a standalone file,
    /// the schema can instead be derived from the file itself.
105    ///
106    /// The session passed in is used to cache metadata about the file. If no session
107    /// is passed in, there will be no caching.
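    ///
    /// A minimal usage sketch (the path and schema below are placeholders, not taken
    /// from the crate's documentation):
    ///
    /// ```ignore
    /// let store = ObjectStore::memory();
    /// let path = Path::from("/data.lance");
    /// // `schema` would normally be loaded from the dataset manifest.
    /// let reader = FileReader::try_new_with_fragment_id(
    ///     &store,
    ///     &path,
    ///     schema,
    ///     /*fragment_id=*/ 0,
    ///     /*field_id_offset=*/ 0,
    ///     /*max_field_id=*/ max_field_id,
    ///     /*session=*/ None,
    /// )
    /// .await?;
    /// ```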
108    #[instrument(level = "debug", skip(object_store, schema, session))]
109    pub async fn try_new_with_fragment_id(
110        object_store: &ObjectStore,
111        path: &Path,
112        schema: Schema,
113        fragment_id: u32,
114        field_id_offset: i32,
115        max_field_id: i32,
116        session: Option<&LanceCache>,
117    ) -> Result<Self> {
118        let object_reader = object_store.open(path).await?;
119
120        let metadata = Self::read_metadata(object_reader.as_ref(), session).await?;
121
122        Self::try_new_from_reader(
123            path,
124            object_reader.into(),
125            Some(metadata),
126            schema,
127            fragment_id,
128            field_id_offset,
129            max_field_id,
130            session,
131        )
132        .await
133    }
134
135    #[allow(clippy::too_many_arguments)]
136    pub async fn try_new_from_reader(
137        path: &Path,
138        object_reader: Arc<dyn Reader>,
139        metadata: Option<Arc<Metadata>>,
140        schema: Schema,
141        fragment_id: u32,
142        field_id_offset: i32,
143        max_field_id: i32,
144        session: Option<&LanceCache>,
145    ) -> Result<Self> {
146        let metadata = match metadata {
147            Some(metadata) => metadata,
148            None => Self::read_metadata(object_reader.as_ref(), session).await?,
149        };
150
151        let page_table = async {
152            Self::load_from_cache(session, path.to_string(), |_| async {
153                PageTable::load(
154                    object_reader.as_ref(),
155                    metadata.page_table_position,
156                    field_id_offset,
157                    max_field_id,
158                    metadata.num_batches() as i32,
159                )
160                .await
161            })
162            .await
163        };
164
165        let stats_page_table = Self::read_stats_page_table(object_reader.as_ref(), session);
166
167        // Can concurrently load page tables
168        let (page_table, stats_page_table) = futures::try_join!(page_table, stats_page_table)?;
169
170        Ok(Self {
171            object_reader,
172            metadata,
173            schema,
174            page_table,
175            fragment_id: fragment_id as u64,
176            stats_page_table,
177        })
178    }
179
180    pub async fn read_metadata(
181        object_reader: &dyn Reader,
182        cache: Option<&LanceCache>,
183    ) -> Result<Arc<Metadata>> {
184        Self::load_from_cache(cache, object_reader.path().to_string(), |_| async {
185            let file_size = object_reader.size().await?;
186            let begin = if file_size < object_reader.block_size() {
187                0
188            } else {
189                file_size - object_reader.block_size()
190            };
191            let tail_bytes = object_reader.get_range(begin..file_size).await?;
192            let metadata_pos = read_metadata_offset(&tail_bytes)?;
193
194            let metadata: Metadata = if metadata_pos < file_size - tail_bytes.len() {
195                // We have not read the metadata bytes yet.
196                read_struct(object_reader, metadata_pos).await?
197            } else {
198                let offset = tail_bytes.len() - (file_size - metadata_pos);
199                read_struct_from_buf(&tail_bytes.slice(offset..))?
200            };
201            Ok(metadata)
202        })
203        .await
204    }
205
    /// Get the statistics page table, reading the file metadata if it is not already
    /// cached.
    ///
    /// The resulting page table is itself cached.
209    async fn read_stats_page_table(
210        reader: &dyn Reader,
211        cache: Option<&LanceCache>,
212    ) -> Result<Arc<Option<PageTable>>> {
213        // To prevent collisions, we cache this at a child path
214        Self::load_from_cache(cache, reader.path().child("stats").to_string(), |_| async {
215            let metadata = Self::read_metadata(reader, cache).await?;
216
217            if let Some(stats_meta) = metadata.stats_metadata.as_ref() {
218                Ok(Some(
219                    PageTable::load(
220                        reader,
221                        stats_meta.page_table_position,
222                        /*min_field_id=*/ 0,
223                        /*max_field_id=*/ *stats_meta.leaf_field_ids.iter().max().unwrap(),
224                        /*num_batches=*/ 1,
225                    )
226                    .await?,
227                ))
228            } else {
229                Ok(None)
230            }
231        })
232        .await
233    }
234
235    /// Load some metadata about the fragment from the cache, if there is one.
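    ///
    /// When a cache is provided, the loaded value is stored under `key` via
    /// `get_or_insert_with_key`; without a cache the `loader` is awaited directly and
    /// nothing is retained.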
236    async fn load_from_cache<T: DeepSizeOf + Send + Sync + 'static, F, Fut>(
237        cache: Option<&LanceCache>,
238        key: String,
239        loader: F,
240    ) -> Result<Arc<T>>
241    where
242        F: Fn(&str) -> Fut,
243        Fut: Future<Output = Result<T>> + Send,
244    {
245        if let Some(cache) = cache {
246            let cache_key = StringCacheKey::<T>::new(key.as_str());
247            cache
248                .get_or_insert_with_key(cache_key, || loader(key.as_str()))
249                .await
250        } else {
251            Ok(Arc::new(loader(key.as_str()).await?))
252        }
253    }
254
255    /// Open one Lance data file for read.
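    ///
    /// A minimal sketch, assuming the file at `path` was written with the same `schema`
    /// (mirrors this module's tests):
    ///
    /// ```ignore
    /// let store = ObjectStore::memory();
    /// let path = Path::from("/data.lance");
    /// let reader = FileReader::try_new(&store, &path, schema).await?;
    /// assert!(!reader.is_empty());
    /// ```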
256    pub async fn try_new(object_store: &ObjectStore, path: &Path, schema: Schema) -> Result<Self> {
257        // If just reading a lance data file we assume the schema is the schema of the data file
258        let max_field_id = schema.max_field_id().unwrap_or_default();
259        Self::try_new_with_fragment_id(object_store, path, schema, 0, 0, max_field_id, None).await
260    }
261
262    fn io_parallelism(&self) -> usize {
263        self.object_reader.io_parallelism()
264    }
265
266    /// Requested projection of the data in this file, excluding the row id column.
267    pub fn schema(&self) -> &Schema {
268        &self.schema
269    }
270
271    pub fn num_batches(&self) -> usize {
272        self.metadata.num_batches()
273    }
274
    /// Get the number of rows in the given batch
276    pub fn num_rows_in_batch(&self, batch_id: i32) -> usize {
277        self.metadata.get_batch_length(batch_id).unwrap_or_default() as usize
278    }
279
280    /// Count the number of rows in this file.
281    pub fn len(&self) -> usize {
282        self.metadata.len()
283    }
284
285    pub fn is_empty(&self) -> bool {
286        self.metadata.is_empty()
287    }
288
289    /// Read a batch of data from the file.
290    ///
    /// The schema of the returned [RecordBatch] is determined by the `projection` argument.
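    ///
    /// A sketch of typical calls (mirrors this module's tests; `reader` is an opened
    /// [`FileReader`]):
    ///
    /// ```ignore
    /// // Read all rows of batch 0, projecting the file's full schema.
    /// let batch = reader
    ///     .read_batch(0, ReadBatchParams::RangeFull, reader.schema())
    ///     .await?;
    /// // Or read a slice of rows within the batch.
    /// let slice = reader
    ///     .read_batch(0, ReadBatchParams::Range(1..3), reader.schema())
    ///     .await?;
    /// ```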
292    #[instrument(level = "debug", skip(self, params, projection))]
293    pub async fn read_batch(
294        &self,
295        batch_id: i32,
296        params: impl Into<ReadBatchParams>,
297        projection: &Schema,
298    ) -> Result<RecordBatch> {
299        read_batch(self, &params.into(), projection, batch_id).await
300    }
301
302    /// Read a range of records into one batch.
303    ///
    /// Note that it may need to concatenate batches if the range crosses multiple
    /// batches, which makes it less efficient than [`FileReader::read_batch()`].
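    ///
    /// A sketch (the range is in file-wide row offsets and may span several batches):
    ///
    /// ```ignore
    /// let batch = reader.read_range(7..25, reader.schema()).await?;
    /// assert_eq!(batch.num_rows(), 18);
    /// ```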
306    #[instrument(level = "debug", skip(self, projection))]
307    pub async fn read_range(
308        &self,
309        range: Range<usize>,
310        projection: &Schema,
311    ) -> Result<RecordBatch> {
312        if range.is_empty() {
313            return Ok(RecordBatch::new_empty(Arc::new(projection.into())));
314        }
315        let range_in_batches = self.metadata.range_to_batches(range)?;
316        let batches =
317            stream::iter(range_in_batches)
318                .map(|(batch_id, range)| async move {
319                    self.read_batch(batch_id, range, projection).await
320                })
321                .buffered(self.io_parallelism())
322                .try_collect::<Vec<_>>()
323                .await?;
324        if batches.len() == 1 {
325            return Ok(batches[0].clone());
326        }
327        let schema = batches[0].schema();
328        Ok(tokio::task::spawn_blocking(move || concat_batches(&schema, &batches)).await??)
329    }
330
    /// Take records by indices within the file.
332    ///
333    /// The indices must be sorted.
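    ///
    /// A sketch (indices are row offsets within the file, in ascending order):
    ///
    /// ```ignore
    /// let batch = reader.take(&[1, 15, 20, 25], reader.schema()).await?;
    /// assert_eq!(batch.num_rows(), 4);
    /// ```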
334    #[instrument(level = "debug", skip_all)]
335    pub async fn take(&self, indices: &[u32], projection: &Schema) -> Result<RecordBatch> {
336        let num_batches = self.num_batches();
337        let num_rows = self.len() as u32;
338        let indices_in_batches = self.metadata.group_indices_to_batches(indices);
339        let batches = stream::iter(indices_in_batches)
340            .map(|batch| async move {
341                if batch.batch_id >= num_batches as i32 {
342                    Err(Error::InvalidInput {
343                        source: format!("batch_id: {} out of bounds", batch.batch_id).into(),
344                        location: location!(),
345                    })
346                } else if *batch.offsets.last().expect("got empty batch") > num_rows {
347                    Err(Error::InvalidInput {
348                        source: format!("indices: {:?} out of bounds", batch.offsets).into(),
349                        location: location!(),
350                    })
351                } else {
352                    self.read_batch(batch.batch_id, batch.offsets.as_slice(), projection)
353                        .await
354                }
355            })
356            .buffered(self.io_parallelism())
357            .try_collect::<Vec<_>>()
358            .await?;
359
360        let schema = Arc::new(ArrowSchema::from(projection));
361
362        Ok(tokio::task::spawn_blocking(move || concat_batches(&schema, &batches)).await??)
363    }
364
365    /// Get the schema of the statistics page table, for the given data field ids.
366    pub fn page_stats_schema(&self, field_ids: &[i32]) -> Option<Schema> {
367        self.metadata.stats_metadata.as_ref().map(|meta| {
368            let mut stats_field_ids = vec![];
369            for stats_field in &meta.schema.fields {
370                if let Ok(stats_field_id) = stats_field.name.parse::<i32>() {
371                    if field_ids.contains(&stats_field_id) {
372                        stats_field_ids.push(stats_field.id);
373                        for child in &stats_field.children {
374                            stats_field_ids.push(child.id);
375                        }
376                    }
377                }
378            }
379            meta.schema.project_by_ids(&stats_field_ids, true)
380        })
381    }
382
383    /// Get the page statistics for the given data field ids.
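    ///
    /// A sketch (the field ids are placeholders; `Ok(None)` is returned when the file
    /// has no statistics for the requested fields):
    ///
    /// ```ignore
    /// if let Some(stats) = reader.read_page_stats(&[0, 1]).await? {
    ///     println!("stats columns: {}", stats.num_columns());
    /// }
    /// ```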
384    pub async fn read_page_stats(&self, field_ids: &[i32]) -> Result<Option<RecordBatch>> {
385        if let Some(stats_page_table) = self.stats_page_table.as_ref() {
386            let projection = self.page_stats_schema(field_ids).unwrap();
387            // It's possible none of the requested fields have stats.
388            if projection.fields.is_empty() {
389                return Ok(None);
390            }
391            let arrays = futures::stream::iter(projection.fields.iter().cloned())
392                .map(|field| async move {
393                    read_array(
394                        self,
395                        &field,
396                        0,
397                        stats_page_table,
398                        &ReadBatchParams::RangeFull,
399                    )
400                    .await
401                })
402                .buffered(self.io_parallelism())
403                .try_collect::<Vec<_>>()
404                .await?;
405
406            let schema = ArrowSchema::from(&projection);
407            let batch = RecordBatch::try_new(Arc::new(schema), arrays)?;
408            Ok(Some(batch))
409        } else {
410            Ok(None)
411        }
412    }
413}
414
415/// Stream desired full batches from the file.
416///
417/// Parameters:
418/// - **reader**: An opened file reader.
/// - **projection**: The schema of each returned [RecordBatch].
420/// - **predicate**: A function that takes a batch ID and returns true if the batch should be
421///   returned.
422///
423/// Returns:
424/// - A stream of [RecordBatch]s, each one corresponding to one full batch in the file.
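///
/// A sketch that reads only the even-numbered batches (mirrors this module's
/// `test_batches_stream`):
///
/// ```ignore
/// use futures::TryStreamExt;
///
/// let stream = batches_stream(reader, projection, |batch_id| batch_id % 2 == 0);
/// let batches = stream.try_collect::<Vec<_>>().await?;
/// ```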
425pub fn batches_stream(
426    reader: FileReader,
427    projection: Schema,
428    predicate: impl FnMut(&i32) -> bool + Send + Sync + 'static,
429) -> impl RecordBatchStream {
430    // Make projection an Arc so we can clone it and pass between threads.
431    let projection = Arc::new(projection);
432    let arrow_schema = ArrowSchema::from(projection.as_ref());
433
434    let total_batches = reader.num_batches() as i32;
435    let batches = (0..total_batches).filter(predicate);
    // Wrap the reader in an Arc so it can be cloned into the stream's futures.
    let this = Arc::new(reader);
438    let inner = stream::iter(batches)
439        .zip(stream::repeat_with(move || {
440            (this.clone(), projection.clone())
441        }))
442        .map(move |(batch_id, (reader, projection))| async move {
443            reader
444                .read_batch(batch_id, ReadBatchParams::RangeFull, &projection)
445                .await
446        })
447        .buffered(2)
448        .boxed();
449    RecordBatchStreamAdapter::new(Arc::new(arrow_schema), inner)
450}
451
452/// Read a batch.
453///
/// `schema` must contain at least one field; otherwise an invalid-input error is
/// returned.
456pub async fn read_batch(
457    reader: &FileReader,
458    params: &ReadBatchParams,
459    schema: &Schema,
460    batch_id: i32,
461) -> Result<RecordBatch> {
462    if !schema.fields.is_empty() {
463        // We box this because otherwise we get a higher-order lifetime error.
464        let arrs = stream::iter(&schema.fields)
465            .map(|f| async { read_array(reader, f, batch_id, &reader.page_table, params).await })
466            .buffered(reader.io_parallelism())
467            .try_collect::<Vec<_>>()
468            .boxed();
469        let arrs = arrs.await?;
470        Ok(RecordBatch::try_new(Arc::new(schema.into()), arrs)?)
471    } else {
472        Err(Error::invalid_input("no fields requested", location!()))
473    }
474}
475
476#[async_recursion]
477async fn read_array(
478    reader: &FileReader,
479    field: &Field,
480    batch_id: i32,
481    page_table: &PageTable,
482    params: &ReadBatchParams,
483) -> Result<ArrayRef> {
484    let data_type = field.data_type();
485
486    use DataType::*;
487
488    if data_type.is_fixed_stride() {
489        _read_fixed_stride_array(reader, field, batch_id, page_table, params).await
490    } else {
491        match data_type {
492            Null => read_null_array(field, batch_id, page_table, params),
493            Utf8 | LargeUtf8 | Binary | LargeBinary => {
494                read_binary_array(reader, field, batch_id, page_table, params).await
495            }
496            Struct(_) => read_struct_array(reader, field, batch_id, page_table, params).await,
497            Dictionary(_, _) => {
498                read_dictionary_array(reader, field, batch_id, page_table, params).await
499            }
500            List(_) => {
501                read_list_array::<Int32Type>(reader, field, batch_id, page_table, params).await
502            }
503            LargeList(_) => {
504                read_list_array::<Int64Type>(reader, field, batch_id, page_table, params).await
505            }
506            _ => {
507                unimplemented!("{}", format!("No support for {data_type} yet"));
508            }
509        }
510    }
511}
512
513fn get_page_info<'a>(
514    page_table: &'a PageTable,
515    field: &'a Field,
516    batch_id: i32,
517) -> Result<&'a PageInfo> {
518    page_table.get(field.id, batch_id).ok_or_else(|| {
519        Error::io(
520            format!(
521                "No page info found for field: {}, field_id={} batch={}",
522                field.name, field.id, batch_id
523            ),
524            location!(),
525        )
526    })
527}
528
/// Read a fixed-stride array for batch `batch_id`.
530async fn _read_fixed_stride_array(
531    reader: &FileReader,
532    field: &Field,
533    batch_id: i32,
534    page_table: &PageTable,
535    params: &ReadBatchParams,
536) -> Result<ArrayRef> {
537    let page_info = get_page_info(page_table, field, batch_id)?;
538    read_fixed_stride_array(
539        reader.object_reader.as_ref(),
540        &field.data_type(),
541        page_info.position,
542        page_info.length,
543        params.clone(),
544    )
545    .await
546}
547
548fn read_null_array(
549    field: &Field,
550    batch_id: i32,
551    page_table: &PageTable,
552    params: &ReadBatchParams,
553) -> Result<ArrayRef> {
554    let page_info = get_page_info(page_table, field, batch_id)?;
555
556    let length_output = match params {
557        ReadBatchParams::Indices(indices) => {
558            if indices.is_empty() {
559                0
560            } else {
561                let idx_max = *indices.values().iter().max().unwrap() as u64;
562                if idx_max >= page_info.length as u64 {
563                    return Err(Error::io(
564                        format!(
565                            "NullArray Reader: request([{}]) out of range: [0..{}]",
566                            idx_max, page_info.length
567                        ),
568                        location!(),
569                    ));
570                }
571                indices.len()
572            }
573        }
574        _ => {
575            let (idx_start, idx_end) = match params {
576                ReadBatchParams::Range(r) => (r.start, r.end),
577                ReadBatchParams::RangeFull => (0, page_info.length),
578                ReadBatchParams::RangeTo(r) => (0, r.end),
579                ReadBatchParams::RangeFrom(r) => (r.start, page_info.length),
580                _ => unreachable!(),
581            };
582            if idx_end > page_info.length {
583                return Err(Error::io(
584                    format!(
585                        "NullArray Reader: request([{}..{}]) out of range: [0..{}]",
587                        idx_start,
588                        idx_end,
589                        page_info.length
590                    ),
591                    location!(),
592                ));
593            }
594            idx_end - idx_start
595        }
596    };
597
598    Ok(Arc::new(NullArray::new(length_output)))
599}
600
601async fn read_binary_array(
602    reader: &FileReader,
603    field: &Field,
604    batch_id: i32,
605    page_table: &PageTable,
606    params: &ReadBatchParams,
607) -> Result<ArrayRef> {
608    let page_info = get_page_info(page_table, field, batch_id)?;
609
610    lance_io::utils::read_binary_array(
611        reader.object_reader.as_ref(),
612        &field.data_type(),
613        field.nullable,
614        page_info.position,
615        page_info.length,
616        params,
617    )
618    .await
619}
620
621async fn read_dictionary_array(
622    reader: &FileReader,
623    field: &Field,
624    batch_id: i32,
625    page_table: &PageTable,
626    params: &ReadBatchParams,
627) -> Result<ArrayRef> {
628    let page_info = get_page_info(page_table, field, batch_id)?;
629    let data_type = field.data_type();
630    let decoder = DictionaryDecoder::new(
631        reader.object_reader.as_ref(),
632        page_info.position,
633        page_info.length,
634        &data_type,
635        field
636            .dictionary
637            .as_ref()
638            .unwrap()
639            .values
640            .as_ref()
641            .unwrap()
642            .clone(),
643    );
644    decoder.get(params.clone()).await
645}
646
647async fn read_struct_array(
648    reader: &FileReader,
649    field: &Field,
650    batch_id: i32,
651    page_table: &PageTable,
652    params: &ReadBatchParams,
653) -> Result<ArrayRef> {
654    // TODO: use tokio to make the reads in parallel.
655    let mut sub_arrays: Vec<(FieldRef, ArrayRef)> = vec![];
656
657    for child in field.children.as_slice() {
658        let arr = read_array(reader, child, batch_id, page_table, params).await?;
659        sub_arrays.push((Arc::new(child.into()), arr));
660    }
661
662    Ok(Arc::new(StructArray::from(sub_arrays)))
663}
664
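/// Take list rows by the (sorted) `indices`, using the already-read `positions` array:
/// the value range of each selected list is read separately, the results are
/// concatenated, and new zero-based offsets are rebuilt.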
665async fn take_list_array<T: ArrowNumericType>(
666    reader: &FileReader,
667    field: &Field,
668    batch_id: i32,
669    page_table: &PageTable,
670    positions: &PrimitiveArray<T>,
671    indices: &UInt32Array,
672) -> Result<ArrayRef>
673where
674    T::Native: ArrowNativeTypeOp + OffsetSizeTrait,
675{
676    let first_idx = indices.value(0);
677    // Range of values for each index
678    let ranges = indices
679        .values()
680        .iter()
681        .map(|i| (*i - first_idx).as_usize())
682        .map(|idx| positions.value(idx).as_usize()..positions.value(idx + 1).as_usize())
683        .collect::<Vec<_>>();
684    let field = field.clone();
685    let mut list_values: Vec<ArrayRef> = vec![];
686    // TODO: read them in parallel.
687    for range in ranges.iter() {
688        list_values.push(
689            read_array(
690                reader,
691                &field.children[0],
692                batch_id,
693                page_table,
694                &(range.clone()).into(),
695            )
696            .await?,
697        );
698    }
699
700    let value_refs = list_values
701        .iter()
702        .map(|arr| arr.as_ref())
703        .collect::<Vec<_>>();
704    let mut offsets_builder = PrimitiveBuilder::<T>::new();
705    offsets_builder.append_value(T::Native::usize_as(0));
706    let mut off = 0_usize;
707    for range in ranges {
708        off += range.len();
709        offsets_builder.append_value(T::Native::usize_as(off));
710    }
711    let all_values = concat::concat(value_refs.as_slice())?;
712    let offset_arr = offsets_builder.finish();
713    let arr = try_new_generic_list_array(all_values, &offset_arr)?;
714    Ok(Arc::new(arr) as ArrayRef)
715}
716
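/// Read a list column for one batch: load the offsets page (with one extra position to
/// bound the last requested element), read the child values for the corresponding
/// range, and rebase the offsets so they start at zero.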
717async fn read_list_array<T: ArrowNumericType>(
718    reader: &FileReader,
719    field: &Field,
720    batch_id: i32,
721    page_table: &PageTable,
722    params: &ReadBatchParams,
723) -> Result<ArrayRef>
724where
725    T::Native: ArrowNativeTypeOp + OffsetSizeTrait,
726{
727    // Offset the position array by 1 in order to include the upper bound of the last element
728    let positions_params = match params {
729        ReadBatchParams::Range(range) => ReadBatchParams::from(range.start..(range.end + 1)),
730        ReadBatchParams::RangeTo(range) => ReadBatchParams::from(..range.end + 1),
731        ReadBatchParams::Indices(indices) => {
732            (indices.value(0).as_usize()..indices.value(indices.len() - 1).as_usize() + 2).into()
733        }
734        p => p.clone(),
735    };
736
737    let page_info = get_page_info(&reader.page_table, field, batch_id)?;
738    let position_arr = read_fixed_stride_array(
739        reader.object_reader.as_ref(),
740        &T::DATA_TYPE,
741        page_info.position,
742        page_info.length,
743        positions_params,
744    )
745    .await?;
746
747    let positions: &PrimitiveArray<T> = position_arr.as_primitive();
748
749    // Recompute params so they align with the offset array
750    let value_params = match params {
751        ReadBatchParams::Range(range) => ReadBatchParams::from(
752            positions.value(0).as_usize()..positions.value(range.end - range.start).as_usize(),
753        ),
754        ReadBatchParams::Ranges(_) => {
755            return Err(Error::Internal {
756                message: "ReadBatchParams::Ranges should not be used in v1 files".to_string(),
757                location: location!(),
758            })
759        }
760        ReadBatchParams::RangeTo(RangeTo { end }) => {
761            ReadBatchParams::from(..positions.value(*end).as_usize())
762        }
763        ReadBatchParams::RangeFrom(_) => ReadBatchParams::from(positions.value(0).as_usize()..),
764        ReadBatchParams::RangeFull => ReadBatchParams::from(
765            positions.value(0).as_usize()..positions.value(positions.len() - 1).as_usize(),
766        ),
767        ReadBatchParams::Indices(indices) => {
768            return take_list_array(reader, field, batch_id, page_table, positions, indices).await;
769        }
770    };
771
772    let start_position = PrimitiveArray::<T>::new_scalar(positions.value(0));
773    let offset_arr = sub(positions, &start_position)?;
774    let offset_arr_ref = offset_arr.as_primitive::<T>();
775    let value_arrs = read_array(
776        reader,
777        &field.children[0],
778        batch_id,
779        page_table,
780        &value_params,
781    )
782    .await?;
783    let arr = try_new_generic_list_array(value_arrs, offset_arr_ref)?;
784    Ok(Arc::new(arr) as ArrayRef)
785}
786
787#[cfg(test)]
788mod tests {
789    use crate::writer::{FileWriter, NotSelfDescribing};
790
791    use super::*;
792
793    use arrow_array::{
794        builder::{Int32Builder, LargeListBuilder, ListBuilder, StringBuilder},
795        cast::{as_string_array, as_struct_array},
796        types::UInt8Type,
797        Array, DictionaryArray, Float32Array, Int64Array, LargeListArray, ListArray, StringArray,
798        UInt8Array,
799    };
800    use arrow_array::{BooleanArray, Int32Array};
801    use arrow_schema::{Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema};
802    use lance_io::object_store::ObjectStoreParams;
803
804    #[tokio::test]
805    async fn test_take() {
806        let arrow_schema = ArrowSchema::new(vec![
807            ArrowField::new("i", DataType::Int64, true),
808            ArrowField::new("f", DataType::Float32, false),
809            ArrowField::new("s", DataType::Utf8, false),
810            ArrowField::new(
811                "d",
812                DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
813                false,
814            ),
815        ]);
816        let mut schema = Schema::try_from(&arrow_schema).unwrap();
817
818        let store = ObjectStore::memory();
819        let path = Path::from("/take_test");
820
821        // Write 10 batches.
822        let values = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);
823        let values_ref = Arc::new(values);
824        let mut batches = vec![];
825        for batch_id in 0..10 {
826            let value_range: Range<i64> = batch_id * 10..batch_id * 10 + 10;
827            let keys = UInt8Array::from_iter_values(value_range.clone().map(|v| (v % 7) as u8));
828            let columns: Vec<ArrayRef> = vec![
829                Arc::new(Int64Array::from_iter(
830                    value_range.clone().collect::<Vec<_>>(),
831                )),
832                Arc::new(Float32Array::from_iter(
833                    value_range.clone().map(|n| n as f32).collect::<Vec<_>>(),
834                )),
835                Arc::new(StringArray::from_iter_values(
836                    value_range.clone().map(|n| format!("str-{}", n)),
837                )),
838                Arc::new(DictionaryArray::<UInt8Type>::try_new(keys, values_ref.clone()).unwrap()),
839            ];
840            batches.push(RecordBatch::try_new(Arc::new(arrow_schema.clone()), columns).unwrap());
841        }
842        schema.set_dictionary(&batches[0]).unwrap();
843
844        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
845            &store,
846            &path,
847            schema.clone(),
848            &Default::default(),
849        )
850        .await
851        .unwrap();
852        for batch in batches.iter() {
853            file_writer
854                .write(std::slice::from_ref(batch))
855                .await
856                .unwrap();
857        }
858        file_writer.finish().await.unwrap();
859
860        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
861        let batch = reader
862            .take(&[1, 15, 20, 25, 30, 48, 90], reader.schema())
863            .await
864            .unwrap();
865        let dict_keys = UInt8Array::from_iter_values([1, 1, 6, 4, 2, 6, 6]);
866        assert_eq!(
867            batch,
868            RecordBatch::try_new(
869                batch.schema(),
870                vec![
871                    Arc::new(Int64Array::from_iter_values([1, 15, 20, 25, 30, 48, 90])),
872                    Arc::new(Float32Array::from_iter_values([
873                        1.0, 15.0, 20.0, 25.0, 30.0, 48.0, 90.0
874                    ])),
875                    Arc::new(StringArray::from_iter_values([
876                        "str-1", "str-15", "str-20", "str-25", "str-30", "str-48", "str-90"
877                    ])),
878                    Arc::new(DictionaryArray::try_new(dict_keys, values_ref.clone()).unwrap()),
879                ]
880            )
881            .unwrap()
882        );
883    }
884
885    async fn test_write_null_string_in_struct(field_nullable: bool) {
886        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
887            "parent",
888            DataType::Struct(ArrowFields::from(vec![ArrowField::new(
889                "str",
890                DataType::Utf8,
891                field_nullable,
892            )])),
893            true,
894        )]));
895
896        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
897
898        let store = ObjectStore::memory();
899        let path = Path::from("/null_strings");
900
901        let string_arr = Arc::new(StringArray::from_iter([Some("a"), Some(""), Some("b")]));
902        let struct_arr = Arc::new(StructArray::from(vec![(
903            Arc::new(ArrowField::new("str", DataType::Utf8, field_nullable)),
904            string_arr.clone() as ArrayRef,
905        )]));
906        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_arr]).unwrap();
907
908        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
909            &store,
910            &path,
911            schema.clone(),
912            &Default::default(),
913        )
914        .await
915        .unwrap();
916        file_writer
917            .write(std::slice::from_ref(&batch))
918            .await
919            .unwrap();
920        file_writer.finish().await.unwrap();
921
922        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
923        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
924
925        if field_nullable {
926            assert_eq!(
927                &StringArray::from_iter(vec![Some("a"), None, Some("b")]),
928                as_string_array(
929                    as_struct_array(actual_batch.column_by_name("parent").unwrap().as_ref())
930                        .column_by_name("str")
931                        .unwrap()
932                        .as_ref()
933                )
934            );
935        } else {
936            assert_eq!(actual_batch, batch);
937        }
938    }
939
940    #[tokio::test]
941    async fn read_nullable_string_in_struct() {
942        test_write_null_string_in_struct(true).await;
943        test_write_null_string_in_struct(false).await;
944    }
945
946    #[tokio::test]
947    async fn test_read_struct_of_list_arrays() {
948        let store = ObjectStore::memory();
949        let path = Path::from("/null_strings");
950
951        let arrow_schema = make_schema_of_list_array();
952        let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
953
954        let batches = (0..3)
955            .map(|_| {
956                let struct_array = make_struct_of_list_array(10, 10);
957                RecordBatch::try_new(arrow_schema.clone(), vec![struct_array]).unwrap()
958            })
959            .collect::<Vec<_>>();
960        let batches_ref = batches.iter().collect::<Vec<_>>();
961
962        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
963            &store,
964            &path,
965            schema.clone(),
966            &Default::default(),
967        )
968        .await
969        .unwrap();
970        file_writer.write(&batches).await.unwrap();
971        file_writer.finish().await.unwrap();
972
973        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
974        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
975        let expected = concat_batches(&arrow_schema, batches_ref).unwrap();
976        assert_eq!(expected, actual_batch);
977    }
978
979    #[tokio::test]
980    async fn test_scan_struct_of_list_arrays() {
981        let store = ObjectStore::memory();
982        let path = Path::from("/null_strings");
983
984        let arrow_schema = make_schema_of_list_array();
985        let struct_array = make_struct_of_list_array(3, 10);
986        let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
987        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_array.clone()]).unwrap();
988
989        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
990            &store,
991            &path,
992            schema.clone(),
993            &Default::default(),
994        )
995        .await
996        .unwrap();
997        file_writer.write(&[batch]).await.unwrap();
998        file_writer.finish().await.unwrap();
999
1000        let mut expected_columns: Vec<ArrayRef> = Vec::new();
1001        for c in struct_array.columns().iter() {
1002            expected_columns.push(c.slice(1, 1));
1003        }
1004
1005        let expected_struct = match arrow_schema.fields[0].data_type() {
1006            DataType::Struct(subfields) => subfields
1007                .iter()
1008                .zip(expected_columns)
1009                .map(|(f, d)| (f.clone(), d))
1010                .collect::<Vec<_>>(),
1011            _ => panic!("unexpected field"),
1012        };
1013
1014        let expected_struct_array = StructArray::from(expected_struct);
1015        let expected_batch = RecordBatch::from(&StructArray::from(vec![(
1016            Arc::new(arrow_schema.fields[0].as_ref().clone()),
1017            Arc::new(expected_struct_array) as ArrayRef,
1018        )]));
1019
1020        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
1021        let params = ReadBatchParams::Range(1..2);
1022        let slice_of_batch = reader.read_batch(0, params, reader.schema()).await.unwrap();
1023        assert_eq!(expected_batch, slice_of_batch);
1024    }
1025
1026    fn make_schema_of_list_array() -> Arc<arrow_schema::Schema> {
1027        Arc::new(ArrowSchema::new(vec![ArrowField::new(
1028            "s",
1029            DataType::Struct(ArrowFields::from(vec![
1030                ArrowField::new(
1031                    "li",
1032                    DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1033                    true,
1034                ),
1035                ArrowField::new(
1036                    "ls",
1037                    DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))),
1038                    true,
1039                ),
1040                ArrowField::new(
1041                    "ll",
1042                    DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1043                    false,
1044                ),
1045            ])),
1046            true,
1047        )]))
1048    }
1049
1050    fn make_struct_of_list_array(rows: i32, num_items: i32) -> Arc<StructArray> {
1051        let mut li_builder = ListBuilder::new(Int32Builder::new());
1052        let mut ls_builder = ListBuilder::new(StringBuilder::new());
1053        let ll_value_builder = Int32Builder::new();
1054        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
1055        for i in 0..rows {
1056            for j in 0..num_items {
1057                li_builder.values().append_value(i * 10 + j);
1058                ls_builder
1059                    .values()
1060                    .append_value(format!("str-{}", i * 10 + j));
1061                large_list_builder.values().append_value(i * 10 + j);
1062            }
1063            li_builder.append(true);
1064            ls_builder.append(true);
1065            large_list_builder.append(true);
1066        }
1067        Arc::new(StructArray::from(vec![
1068            (
1069                Arc::new(ArrowField::new(
1070                    "li",
1071                    DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1072                    true,
1073                )),
1074                Arc::new(li_builder.finish()) as ArrayRef,
1075            ),
1076            (
1077                Arc::new(ArrowField::new(
1078                    "ls",
1079                    DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))),
1080                    true,
1081                )),
1082                Arc::new(ls_builder.finish()) as ArrayRef,
1083            ),
1084            (
1085                Arc::new(ArrowField::new(
1086                    "ll",
1087                    DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1088                    false,
1089                )),
1090                Arc::new(large_list_builder.finish()) as ArrayRef,
1091            ),
1092        ]))
1093    }
1094
1095    #[tokio::test]
1096    async fn test_read_nullable_arrays() {
1097        use arrow_array::Array;
1098
1099        // create a record batch with a null array column
1100        let arrow_schema = ArrowSchema::new(vec![
1101            ArrowField::new("i", DataType::Int64, false),
1102            ArrowField::new("n", DataType::Null, true),
1103        ]);
1104        let schema = Schema::try_from(&arrow_schema).unwrap();
1105        let columns: Vec<ArrayRef> = vec![
1106            Arc::new(Int64Array::from_iter_values(0..100)),
1107            Arc::new(NullArray::new(100)),
1108        ];
1109        let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap();
1110
1111        // write to a lance file
1112        let store = ObjectStore::memory();
1113        let path = Path::from("/takes");
1114        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
1115            &store,
1116            &path,
1117            schema.clone(),
1118            &Default::default(),
1119        )
1120        .await
1121        .unwrap();
1122        file_writer.write(&[batch]).await.unwrap();
1123        file_writer.finish().await.unwrap();
1124
1125        // read the file back
1126        let reader = FileReader::try_new(&store, &path, schema.clone())
1127            .await
1128            .unwrap();
1129
1130        async fn read_array_w_params(
1131            reader: &FileReader,
1132            field: &Field,
1133            params: ReadBatchParams,
1134        ) -> ArrayRef {
1135            read_array(reader, field, 0, reader.page_table.as_ref(), &params)
1136                .await
                .expect("Error reading back the null array from file")
1138        }
1139
1140        let arr = read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFull).await;
1141        assert_eq!(100, arr.len());
1142        assert_eq!(arr.data_type(), &DataType::Null);
1143
1144        let arr =
1145            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::Range(10..25)).await;
1146        assert_eq!(15, arr.len());
1147        assert_eq!(arr.data_type(), &DataType::Null);
1148
1149        let arr =
1150            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFrom(60..)).await;
1151        assert_eq!(40, arr.len());
1152        assert_eq!(arr.data_type(), &DataType::Null);
1153
1154        let arr =
1155            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeTo(..25)).await;
1156        assert_eq!(25, arr.len());
1157        assert_eq!(arr.data_type(), &DataType::Null);
1158
1159        let arr = read_array_w_params(
1160            &reader,
1161            &schema.fields[1],
1162            ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72])),
1163        )
1164        .await;
1165        assert_eq!(4, arr.len());
1166        assert_eq!(arr.data_type(), &DataType::Null);
1167
1168        // raise error if take indices are out of bounds
1169        let params = ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72, 100]));
1170        let arr = read_array(
1171            &reader,
1172            &schema.fields[1],
1173            0,
1174            reader.page_table.as_ref(),
1175            &params,
1176        );
1177        assert!(arr.await.is_err());
1178
1179        // raise error if range indices are out of bounds
1180        let params = ReadBatchParams::RangeTo(..107);
1181        let arr = read_array(
1182            &reader,
1183            &schema.fields[1],
1184            0,
1185            reader.page_table.as_ref(),
1186            &params,
1187        );
1188        assert!(arr.await.is_err());
1189    }
1190
1191    #[tokio::test]
1192    async fn test_take_lists() {
1193        let arrow_schema = ArrowSchema::new(vec![
1194            ArrowField::new(
1195                "l",
1196                DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1197                false,
1198            ),
1199            ArrowField::new(
1200                "ll",
1201                DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1202                false,
1203            ),
1204        ]);
1205
1206        let value_builder = Int32Builder::new();
1207        let mut list_builder = ListBuilder::new(value_builder);
1208        let ll_value_builder = Int32Builder::new();
1209        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
1210        for i in 0..100 {
1211            list_builder.values().append_value(i);
1212            large_list_builder.values().append_value(i);
1213            if (i + 1) % 10 == 0 {
1214                list_builder.append(true);
1215                large_list_builder.append(true);
1216            }
1217        }
1218        let list_arr = Arc::new(list_builder.finish());
1219        let large_list_arr = Arc::new(large_list_builder.finish());
1220
1221        let batch = RecordBatch::try_new(
1222            Arc::new(arrow_schema.clone()),
1223            vec![list_arr as ArrayRef, large_list_arr as ArrayRef],
1224        )
1225        .unwrap();
1226
1227        // write to a lance file
1228        let store = ObjectStore::memory();
1229        let path = Path::from("/take_list");
1230        let schema: Schema = (&arrow_schema).try_into().unwrap();
1231        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
1232            &store,
1233            &path,
1234            schema.clone(),
1235            &Default::default(),
1236        )
1237        .await
1238        .unwrap();
1239        file_writer.write(&[batch]).await.unwrap();
1240        file_writer.finish().await.unwrap();
1241
1242        // read the file back
1243        let reader = FileReader::try_new(&store, &path, schema.clone())
1244            .await
1245            .unwrap();
1246        let actual = reader.take(&[1, 3, 5, 9], &schema).await.unwrap();
1247
1248        let value_builder = Int32Builder::new();
1249        let mut list_builder = ListBuilder::new(value_builder);
1250        let ll_value_builder = Int32Builder::new();
1251        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
1252        for i in [1, 3, 5, 9] {
1253            for j in 0..10 {
1254                list_builder.values().append_value(i * 10 + j);
1255                large_list_builder.values().append_value(i * 10 + j);
1256            }
1257            list_builder.append(true);
1258            large_list_builder.append(true);
1259        }
1260        let expected_list = list_builder.finish();
1261        let expected_large_list = large_list_builder.finish();
1262
1263        assert_eq!(actual.column_by_name("l").unwrap().as_ref(), &expected_list);
1264        assert_eq!(
1265            actual.column_by_name("ll").unwrap().as_ref(),
1266            &expected_large_list
1267        );
1268    }
1269
1270    #[tokio::test]
1271    async fn test_list_array_with_offsets() {
1272        let arrow_schema = ArrowSchema::new(vec![
1273            ArrowField::new(
1274                "l",
1275                DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1276                false,
1277            ),
1278            ArrowField::new(
1279                "ll",
1280                DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1281                false,
1282            ),
1283        ]);
1284
1285        let store = ObjectStore::memory();
1286        let path = Path::from("/lists");
1287
1288        let list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1289            Some(vec![Some(1), Some(2)]),
1290            Some(vec![Some(3), Some(4)]),
1291            Some((0..2_000).map(Some).collect::<Vec<_>>()),
1292        ])
1293        .slice(1, 1);
1294        let large_list_array = LargeListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1295            Some(vec![Some(10), Some(11)]),
1296            Some(vec![Some(12), Some(13)]),
1297            Some((0..2_000).map(Some).collect::<Vec<_>>()),
1298        ])
1299        .slice(1, 1);
1300
1301        let batch = RecordBatch::try_new(
1302            Arc::new(arrow_schema.clone()),
1303            vec![Arc::new(list_array), Arc::new(large_list_array)],
1304        )
1305        .unwrap();
1306
1307        let schema: Schema = (&arrow_schema).try_into().unwrap();
1308        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
1309            &store,
1310            &path,
1311            schema.clone(),
1312            &Default::default(),
1313        )
1314        .await
1315        .unwrap();
1316        file_writer
1317            .write(std::slice::from_ref(&batch))
1318            .await
1319            .unwrap();
1320        file_writer.finish().await.unwrap();
1321
1322        // Make sure the big array was not written to the file
1323        let file_size_bytes = store.size(&path).await.unwrap();
1324        assert!(file_size_bytes < 1_000);
1325
1326        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
1327        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
1328        assert_eq!(batch, actual_batch);
1329    }
1330
1331    #[tokio::test]
1332    async fn test_read_ranges() {
        // create a record batch with a single Int64 column
1334        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int64, false)]);
1335        let schema = Schema::try_from(&arrow_schema).unwrap();
1336        let columns: Vec<ArrayRef> = vec![Arc::new(Int64Array::from_iter_values(0..100))];
1337        let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap();
1338
1339        // write to a lance file
1340        let store = ObjectStore::memory();
1341        let path = Path::from("/read_range");
1342        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
1343            &store,
1344            &path,
1345            schema.clone(),
1346            &Default::default(),
1347        )
1348        .await
1349        .unwrap();
1350        file_writer.write(&[batch]).await.unwrap();
1351        file_writer.finish().await.unwrap();
1352
1353        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
1354        let actual_batch = reader.read_range(7..25, reader.schema()).await.unwrap();
1355
1356        assert_eq!(
1357            actual_batch.column_by_name("i").unwrap().as_ref(),
1358            &Int64Array::from_iter_values(7..25)
1359        );
1360    }
1361
1362    #[tokio::test]
1363    async fn test_batches_stream() {
1364        let store = ObjectStore::memory();
1365        let path = Path::from("/batch_stream");
1366
1367        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int32, true)]);
1368        let schema = Schema::try_from(&arrow_schema).unwrap();
1369        let mut writer = FileWriter::<NotSelfDescribing>::try_new(
1370            &store,
1371            &path,
1372            schema.clone(),
1373            &Default::default(),
1374        )
1375        .await
1376        .unwrap();
1377        for i in 0..10 {
1378            let batch = RecordBatch::try_new(
1379                Arc::new(arrow_schema.clone()),
1380                vec![Arc::new(Int32Array::from_iter_values(i * 10..(i + 1) * 10))],
1381            )
1382            .unwrap();
1383            writer.write(&[batch]).await.unwrap();
1384        }
1385        writer.finish().await.unwrap();
1386
1387        let reader = FileReader::try_new(&store, &path, schema.clone())
1388            .await
1389            .unwrap();
1390        let stream = batches_stream(reader, schema, |id| id % 2 == 0);
1391        let batches = stream.try_collect::<Vec<_>>().await.unwrap();
1392
1393        assert_eq!(batches.len(), 5);
1394        for (i, batch) in batches.iter().enumerate() {
1395            assert_eq!(
1396                batch,
1397                &RecordBatch::try_new(
1398                    Arc::new(arrow_schema.clone()),
1399                    vec![Arc::new(Int32Array::from_iter_values(
1400                        i as i32 * 2 * 10..(i as i32 * 2 + 1) * 10
1401                    ))],
1402                )
1403                .unwrap()
1404            )
1405        }
1406    }
1407
1408    #[tokio::test]
1409    async fn test_take_boolean_beyond_chunk() {
1410        let store = ObjectStore::from_uri_and_params(
1411            Arc::new(Default::default()),
1412            "memory://",
1413            &ObjectStoreParams {
1414                block_size: Some(256),
1415                ..Default::default()
1416            },
1417        )
1418        .await
1419        .unwrap()
1420        .0;
1421        let path = Path::from("/take_bools");
1422
1423        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
1424            "b",
1425            DataType::Boolean,
1426            false,
1427        )]));
1428        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
1429        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
1430            &store,
1431            &path,
1432            schema.clone(),
1433            &Default::default(),
1434        )
1435        .await
1436        .unwrap();
1437
1438        let array = BooleanArray::from((0..5000).map(|v| v % 5 == 0).collect::<Vec<_>>());
1439        let batch =
1440            RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(array.clone())]).unwrap();
1441        file_writer.write(&[batch]).await.unwrap();
1442        file_writer.finish().await.unwrap();
1443
1444        let reader = FileReader::try_new(&store, &path, schema.clone())
1445            .await
1446            .unwrap();
1447        let actual = reader.take(&[2, 4, 5, 8, 4555], &schema).await.unwrap();
1448
1449        assert_eq!(
1450            actual.column_by_name("b").unwrap().as_ref(),
1451            &BooleanArray::from(vec![false, false, true, false, true])
1452        );
1453    }
1454
1455    #[tokio::test]
1456    async fn test_read_projection() {
        // The dataset schema may be very large. The file reader should support reading
        // a small projection of that schema (this just tests the field_id_offset /
        // max_field_id parameters)
1460        let store = ObjectStore::memory();
1461        let path = Path::from("/partial_read");
1462
1463        // Create a large schema
1464        let mut fields = vec![];
1465        for i in 0..100 {
1466            fields.push(ArrowField::new(format!("f{}", i), DataType::Int32, false));
1467        }
1468        let arrow_schema = ArrowSchema::new(fields);
1469        let schema = Schema::try_from(&arrow_schema).unwrap();
1470
1471        let partial_schema = schema.project(&["f50"]).unwrap();
1472        let partial_arrow: ArrowSchema = (&partial_schema).into();
1473
1474        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
1475            &store,
1476            &path,
1477            partial_schema.clone(),
1478            &Default::default(),
1479        )
1480        .await
1481        .unwrap();
1482
1483        let array = Int32Array::from(vec![0; 15]);
1484        let batch =
1485            RecordBatch::try_new(Arc::new(partial_arrow), vec![Arc::new(array.clone())]).unwrap();
1486        file_writer
1487            .write(std::slice::from_ref(&batch))
1488            .await
1489            .unwrap();
1490        file_writer.finish().await.unwrap();
1491
1492        let field_id = partial_schema.fields.first().unwrap().id;
1493        let reader = FileReader::try_new_with_fragment_id(
1494            &store,
1495            &path,
1496            schema.clone(),
1497            0,
            /*field_id_offset=*/ field_id,
1499            /*max_field_id=*/ field_id,
1500            None,
1501        )
1502        .await
1503        .unwrap();
1504        let actual = reader
1505            .read_batch(0, ReadBatchParams::RangeFull, &partial_schema)
1506            .await
1507            .unwrap();
1508
1509        assert_eq!(actual, batch);
1510    }
1511}