lance_file/reader.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Lance Data File Reader

// Standard
use std::ops::{Range, RangeTo};
use std::sync::Arc;

use arrow_arith::numeric::sub;
use arrow_array::{
    builder::PrimitiveBuilder,
    cast::AsArray,
    types::{Int32Type, Int64Type},
    ArrayRef, ArrowNativeTypeOp, ArrowNumericType, NullArray, OffsetSizeTrait, PrimitiveArray,
    RecordBatch, StructArray, UInt32Array,
};
use arrow_buffer::ArrowNativeType;
use arrow_schema::{DataType, FieldRef, Schema as ArrowSchema};
use arrow_select::concat::{self, concat_batches};
use async_recursion::async_recursion;
use deepsize::DeepSizeOf;
use futures::{stream, Future, FutureExt, StreamExt, TryStreamExt};
use lance_arrow::*;
use lance_core::cache::FileMetadataCache;
use lance_core::datatypes::{Field, Schema};
use lance_core::{Error, Result};
use lance_io::encodings::dictionary::DictionaryDecoder;
use lance_io::encodings::AsyncIndex;
use lance_io::stream::{RecordBatchStream, RecordBatchStreamAdapter};
use lance_io::traits::Reader;
use lance_io::utils::{
    read_fixed_stride_array, read_metadata_offset, read_struct, read_struct_from_buf,
};
use lance_io::{object_store::ObjectStore, ReadBatchParams};

use object_store::path::Path;
use snafu::location;
use tracing::instrument;

use crate::format::metadata::Metadata;
use crate::page_table::{PageInfo, PageTable};
/// Lance File Reader.
///
/// It reads Arrow data from one data file.
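///
/// # Example
///
/// A minimal, hedged sketch of opening a file and reading its first batch; the
/// in-memory object store, the path, and the prebuilt `schema` are assumptions
/// made for illustration only:
///
/// ```ignore
/// use lance_io::{object_store::ObjectStore, ReadBatchParams};
/// use object_store::path::Path;
///
/// let store = ObjectStore::memory();
/// let path = Path::from("/data.lance");
/// let reader = FileReader::try_new(&store, &path, schema).await?;
/// let batch = reader
///     .read_batch(0, ReadBatchParams::RangeFull, reader.schema())
///     .await?;
/// ```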
#[derive(Clone, DeepSizeOf)]
pub struct FileReader {
    pub object_reader: Arc<dyn Reader>,
    metadata: Arc<Metadata>,
    page_table: Arc<PageTable>,
    schema: Schema,

    /// The id of the fragment which this file belongs to.
    /// For simple file access, this can just be zero.
    fragment_id: u64,

    /// Page table for statistics
    stats_page_table: Arc<Option<PageTable>>,
}

impl std::fmt::Debug for FileReader {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("FileReader")
            .field("fragment", &self.fragment_id)
            .field("path", &self.object_reader.path())
            .finish()
    }
}

impl FileReader {
    /// Open file reader
    ///
    /// Open the file at the given path using the provided object store.
    ///
    /// The passed fragment ID determines the first 32-bits of the row IDs.
    ///
    /// If a manifest is passed in, it will be used to load the schema and dictionary.
    /// This is typically done if the file is part of a dataset fragment. If no manifest
    /// is passed in, then it is read from the file itself.
    ///
    /// The session passed in is used to cache metadata about the file. If no session
    /// is passed in, there will be no caching.
    #[instrument(level = "debug", skip(object_store, schema, session))]
    pub async fn try_new_with_fragment_id(
        object_store: &ObjectStore,
        path: &Path,
        schema: Schema,
        fragment_id: u32,
        field_id_offset: i32,
        max_field_id: i32,
        session: Option<&FileMetadataCache>,
    ) -> Result<Self> {
        let object_reader = object_store.open(path).await?;

        let metadata = Self::read_metadata(object_reader.as_ref(), session).await?;

        Self::try_new_from_reader(
            path,
            object_reader.into(),
            Some(metadata),
            schema,
            fragment_id,
            field_id_offset,
            max_field_id,
            session,
        )
        .await
    }

    #[allow(clippy::too_many_arguments)]
    pub async fn try_new_from_reader(
        path: &Path,
        object_reader: Arc<dyn Reader>,
        metadata: Option<Arc<Metadata>>,
        schema: Schema,
        fragment_id: u32,
        field_id_offset: i32,
        max_field_id: i32,
        session: Option<&FileMetadataCache>,
    ) -> Result<Self> {
        let metadata = match metadata {
            Some(metadata) => metadata,
            None => Self::read_metadata(object_reader.as_ref(), session).await?,
        };

        let page_table = async {
            Self::load_from_cache(session, path, |_| async {
                PageTable::load(
                    object_reader.as_ref(),
                    metadata.page_table_position,
                    field_id_offset,
                    max_field_id,
                    metadata.num_batches() as i32,
                )
                .await
            })
            .await
        };

        let stats_page_table = Self::read_stats_page_table(object_reader.as_ref(), session);

        // Can concurrently load page tables
        let (page_table, stats_page_table) = futures::try_join!(page_table, stats_page_table)?;

        Ok(Self {
            object_reader,
            metadata,
            schema,
            page_table,
            fragment_id: fragment_id as u64,
            stats_page_table,
        })
    }

    pub async fn read_metadata(
        object_reader: &dyn Reader,
        cache: Option<&FileMetadataCache>,
    ) -> Result<Arc<Metadata>> {
        Self::load_from_cache(cache, object_reader.path(), |_| async {
            let file_size = object_reader.size().await?;
            let begin = if file_size < object_reader.block_size() {
                0
            } else {
                file_size - object_reader.block_size()
            };
            let tail_bytes = object_reader.get_range(begin..file_size).await?;
            let metadata_pos = read_metadata_offset(&tail_bytes)?;

            let metadata: Metadata = if metadata_pos < file_size - tail_bytes.len() {
                // We have not read the metadata bytes yet.
                read_struct(object_reader, metadata_pos).await?
            } else {
                let offset = tail_bytes.len() - (file_size - metadata_pos);
                read_struct_from_buf(&tail_bytes.slice(offset..))?
            };
            Ok(metadata)
        })
        .await
    }

    /// Get the statistics page table. This will read the metadata if it is not cached.
    ///
    /// The page table is cached.
    async fn read_stats_page_table(
        reader: &dyn Reader,
        cache: Option<&FileMetadataCache>,
    ) -> Result<Arc<Option<PageTable>>> {
        // To prevent collisions, we cache this at a child path
        Self::load_from_cache(cache, &reader.path().child("stats"), |_| async {
            let metadata = Self::read_metadata(reader, cache).await?;

            if let Some(stats_meta) = metadata.stats_metadata.as_ref() {
                Ok(Some(
                    PageTable::load(
                        reader,
                        stats_meta.page_table_position,
                        /*min_field_id=*/ 0,
                        /*max_field_id=*/ *stats_meta.leaf_field_ids.iter().max().unwrap(),
                        /*num_batches=*/ 1,
                    )
                    .await?,
                ))
            } else {
                Ok(None)
            }
        })
        .await
    }

    /// Load some metadata about the fragment from the cache, if there is one.
    async fn load_from_cache<T: DeepSizeOf + Send + Sync + 'static, F, Fut>(
        cache: Option<&FileMetadataCache>,
        path: &Path,
        loader: F,
    ) -> Result<Arc<T>>
    where
        F: Fn(&Path) -> Fut,
        Fut: Future<Output = Result<T>>,
    {
        if let Some(cache) = cache {
            cache.get_or_insert(path, loader).await
        } else {
            Ok(Arc::new(loader(path).await?))
        }
    }

    /// Open one Lance data file for read.
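    ///
    /// A minimal, hedged sketch (the in-memory store, the path, and `schema`
    /// are illustrative assumptions):
    ///
    /// ```ignore
    /// let store = ObjectStore::memory();
    /// let path = Path::from("/example.lance");
    /// let reader = FileReader::try_new(&store, &path, schema).await?;
    /// println!("{} rows in {} batches", reader.len(), reader.num_batches());
    /// ```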
    pub async fn try_new(object_store: &ObjectStore, path: &Path, schema: Schema) -> Result<Self> {
        // If just reading a lance data file we assume the schema is the schema of the data file
        let max_field_id = schema.max_field_id().unwrap_or_default();
        Self::try_new_with_fragment_id(object_store, path, schema, 0, 0, max_field_id, None).await
    }

    fn io_parallelism(&self) -> usize {
        self.object_reader.io_parallelism()
    }

    /// Requested projection of the data in this file, excluding the row id column.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn num_batches(&self) -> usize {
        self.metadata.num_batches()
    }

    /// Get the number of rows in the batch with the given id.
    pub fn num_rows_in_batch(&self, batch_id: i32) -> usize {
        self.metadata.get_batch_length(batch_id).unwrap_or_default() as usize
    }

    /// Count the number of rows in this file.
    pub fn len(&self) -> usize {
        self.metadata.len()
    }

    pub fn is_empty(&self) -> bool {
        self.metadata.is_empty()
    }

    /// Read a batch of data from the file.
    ///
    /// The schema of the returned [RecordBatch] is set by [`FileReader::schema()`].
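    ///
    /// A hedged sketch, assuming an already-opened `reader` (the batch id and
    /// row range are illustrative; any `impl Into<ReadBatchParams>` works):
    ///
    /// ```ignore
    /// // Rows 10..20 of batch 0, projected onto the reader's full schema.
    /// let batch = reader.read_batch(0, 10..20, reader.schema()).await?;
    /// ```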
    #[instrument(level = "debug", skip(self, params, projection))]
    pub async fn read_batch(
        &self,
        batch_id: i32,
        params: impl Into<ReadBatchParams>,
        projection: &Schema,
    ) -> Result<RecordBatch> {
        read_batch(self, &params.into(), projection, batch_id).await
    }

    /// Read a range of records into one batch.
    ///
    /// Note that it might call concat if the range crosses multiple batches, which
    /// makes it less efficient than [`FileReader::read_batch()`].
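    ///
    /// A hedged sketch, assuming an already-opened `reader` (the row range is
    /// illustrative and may span batch boundaries):
    ///
    /// ```ignore
    /// // Rows 7..25, possibly concatenated from more than one batch.
    /// let batch = reader.read_range(7..25, reader.schema()).await?;
    /// ```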
    #[instrument(level = "debug", skip(self, projection))]
    pub async fn read_range(
        &self,
        range: Range<usize>,
        projection: &Schema,
    ) -> Result<RecordBatch> {
        if range.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(projection.into())));
        }
        let range_in_batches = self.metadata.range_to_batches(range)?;
        let batches =
            stream::iter(range_in_batches)
                .map(|(batch_id, range)| async move {
                    self.read_batch(batch_id, range, projection).await
                })
                .buffered(self.io_parallelism())
                .try_collect::<Vec<_>>()
                .await?;
        if batches.len() == 1 {
            return Ok(batches[0].clone());
        }
        let schema = batches[0].schema();
        Ok(tokio::task::spawn_blocking(move || concat_batches(&schema, &batches)).await??)
    }

    /// Take records by indices within the file.
    ///
    /// The indices must be sorted.
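    ///
    /// A hedged sketch, assuming an already-opened `reader` (the indices are
    /// illustrative and must be sorted in ascending order):
    ///
    /// ```ignore
    /// let batch = reader.take(&[1, 15, 20, 25], reader.schema()).await?;
    /// ```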
    #[instrument(level = "debug", skip_all)]
    pub async fn take(&self, indices: &[u32], projection: &Schema) -> Result<RecordBatch> {
        let num_batches = self.num_batches();
        let num_rows = self.len() as u32;
        let indices_in_batches = self.metadata.group_indices_to_batches(indices);
        let batches = stream::iter(indices_in_batches)
            .map(|batch| async move {
                if batch.batch_id >= num_batches as i32 {
                    Err(Error::InvalidInput {
                        source: format!("batch_id: {} out of bounds", batch.batch_id).into(),
                        location: location!(),
                    })
                } else if *batch.offsets.last().expect("got empty batch") > num_rows {
                    Err(Error::InvalidInput {
                        source: format!("indices: {:?} out of bounds", batch.offsets).into(),
                        location: location!(),
                    })
                } else {
                    self.read_batch(batch.batch_id, batch.offsets.as_slice(), projection)
                        .await
                }
            })
            .buffered(self.io_parallelism())
            .try_collect::<Vec<_>>()
            .await?;

        let schema = Arc::new(ArrowSchema::from(projection));

        Ok(tokio::task::spawn_blocking(move || concat_batches(&schema, &batches)).await??)
    }

    /// Get the schema of the statistics page table, for the given data field ids.
    pub fn page_stats_schema(&self, field_ids: &[i32]) -> Option<Schema> {
        self.metadata.stats_metadata.as_ref().map(|meta| {
            let mut stats_field_ids = vec![];
            for stats_field in &meta.schema.fields {
                if let Ok(stats_field_id) = stats_field.name.parse::<i32>() {
                    if field_ids.contains(&stats_field_id) {
                        stats_field_ids.push(stats_field.id);
                        for child in &stats_field.children {
                            stats_field_ids.push(child.id);
                        }
                    }
                }
            }
            meta.schema.project_by_ids(&stats_field_ids, true)
        })
    }

    /// Get the page statistics for the given data field ids.
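    ///
    /// A hedged sketch, assuming an already-opened `reader` (the field id `0`
    /// is illustrative; `None` is returned when the file has no statistics for
    /// the requested fields):
    ///
    /// ```ignore
    /// if let Some(stats) = reader.read_page_stats(&[0]).await? {
    ///     println!("page statistics: {stats:?}");
    /// }
    /// ```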
    pub async fn read_page_stats(&self, field_ids: &[i32]) -> Result<Option<RecordBatch>> {
        if let Some(stats_page_table) = self.stats_page_table.as_ref() {
            let projection = self.page_stats_schema(field_ids).unwrap();
            // It's possible none of the requested fields have stats.
            if projection.fields.is_empty() {
                return Ok(None);
            }
            let arrays = futures::stream::iter(projection.fields.iter().cloned())
                .map(|field| async move {
                    read_array(
                        self,
                        &field,
                        0,
                        stats_page_table,
                        &ReadBatchParams::RangeFull,
                    )
                    .await
                })
                .buffered(self.io_parallelism())
                .try_collect::<Vec<_>>()
                .await?;

            let schema = ArrowSchema::from(&projection);
            let batch = RecordBatch::try_new(Arc::new(schema), arrays)?;
            Ok(Some(batch))
        } else {
            Ok(None)
        }
    }
}

/// Stream desired full batches from the file.
///
/// Parameters:
/// - **reader**: An opened file reader.
/// - **projection**: The schema of the returned [RecordBatch].
/// - **predicate**: A function that takes a batch ID and returns true if the batch should be
///   returned.
///
/// Returns:
/// - A stream of [RecordBatch]s, each one corresponding to one full batch in the file.
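///
/// A hedged sketch, assuming an already-opened `reader` and a `projection`
/// schema; the predicate here is illustrative and keeps only the even-numbered
/// batches:
///
/// ```ignore
/// use futures::TryStreamExt;
///
/// let stream = batches_stream(reader, projection, |batch_id| batch_id % 2 == 0);
/// let batches: Vec<RecordBatch> = stream.try_collect().await?;
/// ```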
pub fn batches_stream(
    reader: FileReader,
    projection: Schema,
    predicate: impl FnMut(&i32) -> bool + Send + Sync + 'static,
) -> impl RecordBatchStream {
    // Make projection an Arc so it can be cloned and passed between threads.
    let projection = Arc::new(projection);
    let arrow_schema = ArrowSchema::from(projection.as_ref());

    let total_batches = reader.num_batches() as i32;
    let batches = (0..total_batches).filter(predicate);
    // Wrap the reader in an Arc so it can be cloned and passed between threads.
    let this = Arc::new(reader);
    let inner = stream::iter(batches)
        .zip(stream::repeat_with(move || {
            (this.clone(), projection.clone())
        }))
        .map(move |(batch_id, (reader, projection))| async move {
            reader
                .read_batch(batch_id, ReadBatchParams::RangeFull, &projection)
                .await
        })
        .buffered(2)
        .boxed();
    RecordBatchStreamAdapter::new(Arc::new(arrow_schema), inner)
}

/// Read a batch.
///
/// `schema` must not be empty: this function returns an error if no fields
/// are requested.
pub async fn read_batch(
    reader: &FileReader,
    params: &ReadBatchParams,
    schema: &Schema,
    batch_id: i32,
) -> Result<RecordBatch> {
    if !schema.fields.is_empty() {
        // We box this because otherwise we get a higher-order lifetime error.
        let arrs = stream::iter(&schema.fields)
            .map(|f| async { read_array(reader, f, batch_id, &reader.page_table, params).await })
            .buffered(reader.io_parallelism())
            .try_collect::<Vec<_>>()
            .boxed();
        let arrs = arrs.await?;
        Ok(RecordBatch::try_new(Arc::new(schema.into()), arrs)?)
    } else {
        Err(Error::invalid_input("no fields requested", location!()))
    }
}

#[async_recursion]
async fn read_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let data_type = field.data_type();

    use DataType::*;

    if data_type.is_fixed_stride() {
        _read_fixed_stride_array(reader, field, batch_id, page_table, params).await
    } else {
        match data_type {
            Null => read_null_array(field, batch_id, page_table, params),
            Utf8 | LargeUtf8 | Binary | LargeBinary => {
                read_binary_array(reader, field, batch_id, page_table, params).await
            }
            Struct(_) => read_struct_array(reader, field, batch_id, page_table, params).await,
            Dictionary(_, _) => {
                read_dictionary_array(reader, field, batch_id, page_table, params).await
            }
            List(_) => {
                read_list_array::<Int32Type>(reader, field, batch_id, page_table, params).await
            }
            LargeList(_) => {
                read_list_array::<Int64Type>(reader, field, batch_id, page_table, params).await
            }
            _ => {
                unimplemented!("{}", format!("No support for {data_type} yet"));
            }
        }
    }
}

fn get_page_info<'a>(
    page_table: &'a PageTable,
    field: &'a Field,
    batch_id: i32,
) -> Result<&'a PageInfo> {
    page_table.get(field.id, batch_id).ok_or_else(|| {
        Error::io(
            format!(
                "No page info found for field: {}, field_id={} batch={}",
                field.name, field.id, batch_id
            ),
            location!(),
        )
    })
}

/// Read primitive array for batch `batch_id`.
async fn _read_fixed_stride_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;
    read_fixed_stride_array(
        reader.object_reader.as_ref(),
        &field.data_type(),
        page_info.position,
        page_info.length,
        params.clone(),
    )
    .await
}

fn read_null_array(
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;

    let length_output = match params {
        ReadBatchParams::Indices(indices) => {
            if indices.is_empty() {
                0
            } else {
                let idx_max = *indices.values().iter().max().unwrap() as u64;
                if idx_max >= page_info.length as u64 {
                    return Err(Error::io(
                        format!(
                            "NullArray Reader: request([{}]) out of range: [0..{}]",
                            idx_max, page_info.length
                        ),
                        location!(),
                    ));
                }
                indices.len()
            }
        }
        _ => {
            let (idx_start, idx_end) = match params {
                ReadBatchParams::Range(r) => (r.start, r.end),
                ReadBatchParams::RangeFull => (0, page_info.length),
                ReadBatchParams::RangeTo(r) => (0, r.end),
                ReadBatchParams::RangeFrom(r) => (r.start, page_info.length),
                _ => unreachable!(),
            };
            if idx_end > page_info.length {
                return Err(Error::io(
                    format!(
                        "NullArray Reader: request([{}..{}]) out of range: [0..{}]",
                        idx_start, idx_end, page_info.length
                    ),
                    location!(),
                ));
            }
            idx_end - idx_start
        }
    };

    Ok(Arc::new(NullArray::new(length_output)))
}

async fn read_binary_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;

    lance_io::utils::read_binary_array(
        reader.object_reader.as_ref(),
        &field.data_type(),
        field.nullable,
        page_info.position,
        page_info.length,
        params,
    )
    .await
}

async fn read_dictionary_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;
    let data_type = field.data_type();
    let decoder = DictionaryDecoder::new(
        reader.object_reader.as_ref(),
        page_info.position,
        page_info.length,
        &data_type,
        field
            .dictionary
            .as_ref()
            .unwrap()
            .values
            .as_ref()
            .unwrap()
            .clone(),
    );
    decoder.get(params.clone()).await
}

async fn read_struct_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    // TODO: use tokio to make the reads in parallel.
    let mut sub_arrays: Vec<(FieldRef, ArrayRef)> = vec![];

    for child in field.children.as_slice() {
        let arr = read_array(reader, child, batch_id, page_table, params).await?;
        sub_arrays.push((Arc::new(child.into()), arr));
    }

    Ok(Arc::new(StructArray::from(sub_arrays)))
}

async fn take_list_array<T: ArrowNumericType>(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    positions: &PrimitiveArray<T>,
    indices: &UInt32Array,
) -> Result<ArrayRef>
where
    T::Native: ArrowNativeTypeOp + OffsetSizeTrait,
{
    let first_idx = indices.value(0);
    // Range of values for each index
    let ranges = indices
        .values()
        .iter()
        .map(|i| (*i - first_idx).as_usize())
        .map(|idx| positions.value(idx).as_usize()..positions.value(idx + 1).as_usize())
        .collect::<Vec<_>>();
    let field = field.clone();
    let mut list_values: Vec<ArrayRef> = vec![];
    // TODO: read them in parallel.
    for range in ranges.iter() {
        list_values.push(
            read_array(
                reader,
                &field.children[0],
                batch_id,
                page_table,
                &(range.clone()).into(),
            )
            .await?,
        );
    }

    let value_refs = list_values
        .iter()
        .map(|arr| arr.as_ref())
        .collect::<Vec<_>>();
    let mut offsets_builder = PrimitiveBuilder::<T>::new();
    offsets_builder.append_value(T::Native::usize_as(0));
    let mut off = 0_usize;
    for range in ranges {
        off += range.len();
        offsets_builder.append_value(T::Native::usize_as(off));
    }
    let all_values = concat::concat(value_refs.as_slice())?;
    let offset_arr = offsets_builder.finish();
    let arr = try_new_generic_list_array(all_values, &offset_arr)?;
    Ok(Arc::new(arr) as ArrayRef)
}

async fn read_list_array<T: ArrowNumericType>(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef>
where
    T::Native: ArrowNativeTypeOp + OffsetSizeTrait,
{
    // Offset the position array by 1 in order to include the upper bound of the last element
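    // For example (an illustrative sketch, not data read from a real file): to read
    // list rows 2..4 we need positions[2..5], i.e. the offsets [p2, p3, p4], where
    // p2..p3 bounds row 2, p3..p4 bounds row 3, and p4 is the exclusive upper bound.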
    let positions_params = match params {
        ReadBatchParams::Range(range) => ReadBatchParams::from(range.start..(range.end + 1)),
        ReadBatchParams::RangeTo(range) => ReadBatchParams::from(..range.end + 1),
        ReadBatchParams::Indices(indices) => {
            (indices.value(0).as_usize()..indices.value(indices.len() - 1).as_usize() + 2).into()
        }
        p => p.clone(),
    };

    let page_info = get_page_info(&reader.page_table, field, batch_id)?;
    let position_arr = read_fixed_stride_array(
        reader.object_reader.as_ref(),
        &T::DATA_TYPE,
        page_info.position,
        page_info.length,
        positions_params,
    )
    .await?;

    let positions: &PrimitiveArray<T> = position_arr.as_primitive();

    // Recompute params so they align with the offset array
    let value_params = match params {
        ReadBatchParams::Range(range) => ReadBatchParams::from(
            positions.value(0).as_usize()..positions.value(range.end - range.start).as_usize(),
        ),
        ReadBatchParams::RangeTo(RangeTo { end }) => {
            ReadBatchParams::from(..positions.value(*end).as_usize())
        }
        ReadBatchParams::RangeFrom(_) => ReadBatchParams::from(positions.value(0).as_usize()..),
        ReadBatchParams::RangeFull => ReadBatchParams::from(
            positions.value(0).as_usize()..positions.value(positions.len() - 1).as_usize(),
        ),
        ReadBatchParams::Indices(indices) => {
            return take_list_array(reader, field, batch_id, page_table, positions, indices).await;
        }
    };

    let start_position = PrimitiveArray::<T>::new_scalar(positions.value(0));
    let offset_arr = sub(positions, &start_position)?;
    let offset_arr_ref = offset_arr.as_primitive::<T>();
    let value_arrs = read_array(
        reader,
        &field.children[0],
        batch_id,
        page_table,
        &value_params,
    )
    .await?;
    let arr = try_new_generic_list_array(value_arrs, offset_arr_ref)?;
    Ok(Arc::new(arr) as ArrayRef)
}

#[cfg(test)]
mod tests {
    use crate::writer::{FileWriter, NotSelfDescribing};

    use super::*;

    use arrow_array::{
        builder::{Int32Builder, LargeListBuilder, ListBuilder, StringBuilder},
        cast::{as_string_array, as_struct_array},
        types::UInt8Type,
        Array, DictionaryArray, Float32Array, Int64Array, LargeListArray, ListArray, StringArray,
        UInt8Array,
    };
    use arrow_array::{BooleanArray, Int32Array};
    use arrow_schema::{Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema};
    use lance_io::object_store::ObjectStoreParams;

    #[tokio::test]
    async fn test_take() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("i", DataType::Int64, true),
            ArrowField::new("f", DataType::Float32, false),
            ArrowField::new("s", DataType::Utf8, false),
            ArrowField::new(
                "d",
                DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
                false,
            ),
        ]);
        let mut schema = Schema::try_from(&arrow_schema).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/take_test");

        // Write 10 batches.
        let values = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);
        let values_ref = Arc::new(values);
        let mut batches = vec![];
        for batch_id in 0..10 {
            let value_range: Range<i64> = batch_id * 10..batch_id * 10 + 10;
            let keys = UInt8Array::from_iter_values(value_range.clone().map(|v| (v % 7) as u8));
            let columns: Vec<ArrayRef> = vec![
                Arc::new(Int64Array::from_iter(
                    value_range.clone().collect::<Vec<_>>(),
                )),
                Arc::new(Float32Array::from_iter(
                    value_range.clone().map(|n| n as f32).collect::<Vec<_>>(),
                )),
                Arc::new(StringArray::from_iter_values(
                    value_range.clone().map(|n| format!("str-{}", n)),
                )),
                Arc::new(DictionaryArray::<UInt8Type>::try_new(keys, values_ref.clone()).unwrap()),
            ];
            batches.push(RecordBatch::try_new(Arc::new(arrow_schema.clone()), columns).unwrap());
        }
        schema.set_dictionary(&batches[0]).unwrap();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        for batch in batches.iter() {
            file_writer.write(&[batch.clone()]).await.unwrap();
        }
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let batch = reader
            .take(&[1, 15, 20, 25, 30, 48, 90], reader.schema())
            .await
            .unwrap();
        let dict_keys = UInt8Array::from_iter_values([1, 1, 6, 4, 2, 6, 6]);
        assert_eq!(
            batch,
            RecordBatch::try_new(
                batch.schema(),
                vec![
                    Arc::new(Int64Array::from_iter_values([1, 15, 20, 25, 30, 48, 90])),
                    Arc::new(Float32Array::from_iter_values([
                        1.0, 15.0, 20.0, 25.0, 30.0, 48.0, 90.0
                    ])),
                    Arc::new(StringArray::from_iter_values([
                        "str-1", "str-15", "str-20", "str-25", "str-30", "str-48", "str-90"
                    ])),
                    Arc::new(DictionaryArray::try_new(dict_keys, values_ref.clone()).unwrap()),
                ]
            )
            .unwrap()
        );
    }

    async fn test_write_null_string_in_struct(field_nullable: bool) {
        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "parent",
            DataType::Struct(ArrowFields::from(vec![ArrowField::new(
                "str",
                DataType::Utf8,
                field_nullable,
            )])),
            true,
        )]));

        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let string_arr = Arc::new(StringArray::from_iter([Some("a"), Some(""), Some("b")]));
        let struct_arr = Arc::new(StructArray::from(vec![(
            Arc::new(ArrowField::new("str", DataType::Utf8, field_nullable)),
            string_arr.clone() as ArrayRef,
        )]));
        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_arr]).unwrap();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch.clone()]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();

        if field_nullable {
            assert_eq!(
                &StringArray::from_iter(vec![Some("a"), None, Some("b")]),
                as_string_array(
                    as_struct_array(actual_batch.column_by_name("parent").unwrap().as_ref())
                        .column_by_name("str")
                        .unwrap()
                        .as_ref()
                )
            );
        } else {
            assert_eq!(actual_batch, batch);
        }
    }

    #[tokio::test]
    async fn read_nullable_string_in_struct() {
        test_write_null_string_in_struct(true).await;
        test_write_null_string_in_struct(false).await;
    }

    #[tokio::test]
    async fn test_read_struct_of_list_arrays() {
        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let arrow_schema = make_schema_of_list_array();
        let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();

        let batches = (0..3)
            .map(|_| {
                let struct_array = make_struct_of_list_array(10, 10);
                RecordBatch::try_new(arrow_schema.clone(), vec![struct_array]).unwrap()
            })
            .collect::<Vec<_>>();
        let batches_ref = batches.iter().collect::<Vec<_>>();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&batches).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
        let expected = concat_batches(&arrow_schema, batches_ref).unwrap();
        assert_eq!(expected, actual_batch);
    }

    #[tokio::test]
    async fn test_scan_struct_of_list_arrays() {
        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let arrow_schema = make_schema_of_list_array();
        let struct_array = make_struct_of_list_array(3, 10);
        let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_array.clone()]).unwrap();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let mut expected_columns: Vec<ArrayRef> = Vec::new();
        for c in struct_array.columns().iter() {
            expected_columns.push(c.slice(1, 1));
        }

        let expected_struct = match arrow_schema.fields[0].data_type() {
            DataType::Struct(subfields) => subfields
                .iter()
                .zip(expected_columns)
                .map(|(f, d)| (f.clone(), d))
                .collect::<Vec<_>>(),
            _ => panic!("unexpected field"),
        };

        let expected_struct_array = StructArray::from(expected_struct);
        let expected_batch = RecordBatch::from(&StructArray::from(vec![(
            Arc::new(arrow_schema.fields[0].as_ref().clone()),
            Arc::new(expected_struct_array) as ArrayRef,
        )]));

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let params = ReadBatchParams::Range(1..2);
        let slice_of_batch = reader.read_batch(0, params, reader.schema()).await.unwrap();
        assert_eq!(expected_batch, slice_of_batch);
    }

    fn make_schema_of_list_array() -> Arc<arrow_schema::Schema> {
        Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "s",
            DataType::Struct(ArrowFields::from(vec![
                ArrowField::new(
                    "li",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    true,
                ),
                ArrowField::new(
                    "ls",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))),
                    true,
                ),
                ArrowField::new(
                    "ll",
                    DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    false,
                ),
            ])),
            true,
        )]))
    }

    fn make_struct_of_list_array(rows: i32, num_items: i32) -> Arc<StructArray> {
        let mut li_builder = ListBuilder::new(Int32Builder::new());
        let mut ls_builder = ListBuilder::new(StringBuilder::new());
        let ll_value_builder = Int32Builder::new();
        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
        for i in 0..rows {
            for j in 0..num_items {
                li_builder.values().append_value(i * 10 + j);
                ls_builder
                    .values()
                    .append_value(format!("str-{}", i * 10 + j));
                large_list_builder.values().append_value(i * 10 + j);
            }
            li_builder.append(true);
            ls_builder.append(true);
            large_list_builder.append(true);
        }
        Arc::new(StructArray::from(vec![
            (
                Arc::new(ArrowField::new(
                    "li",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    true,
                )),
                Arc::new(li_builder.finish()) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new(
                    "ls",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))),
                    true,
                )),
                Arc::new(ls_builder.finish()) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new(
                    "ll",
                    DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    false,
                )),
                Arc::new(large_list_builder.finish()) as ArrayRef,
            ),
        ]))
    }

    #[tokio::test]
    async fn test_read_nullable_arrays() {
        use arrow_array::Array;

        // create a record batch with a null array column
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("i", DataType::Int64, false),
            ArrowField::new("n", DataType::Null, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from_iter_values(0..100)),
            Arc::new(NullArray::new(100)),
        ];
        let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap();

        // write to a lance file
        let store = ObjectStore::memory();
        let path = Path::from("/takes");
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        // read the file back
        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();

        async fn read_array_w_params(
            reader: &FileReader,
            field: &Field,
            params: ReadBatchParams,
        ) -> ArrayRef {
            read_array(reader, field, 0, reader.page_table.as_ref(), &params)
                .await
                .expect("Error reading back the null array from file") as _
        }

        let arr = read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFull).await;
        assert_eq!(100, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::Range(10..25)).await;
        assert_eq!(15, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFrom(60..)).await;
        assert_eq!(40, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeTo(..25)).await;
        assert_eq!(25, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr = read_array_w_params(
            &reader,
            &schema.fields[1],
            ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72])),
        )
        .await;
        assert_eq!(4, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        // raise error if take indices are out of bounds
        let params = ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72, 100]));
        let arr = read_array(
            &reader,
            &schema.fields[1],
            0,
            reader.page_table.as_ref(),
            &params,
        );
        assert!(arr.await.is_err());

        // raise error if range indices are out of bounds
        let params = ReadBatchParams::RangeTo(..107);
        let arr = read_array(
            &reader,
            &schema.fields[1],
            0,
            reader.page_table.as_ref(),
            &params,
        );
        assert!(arr.await.is_err());
    }

    #[tokio::test]
    async fn test_take_lists() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "l",
                DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
            ArrowField::new(
                "ll",
                DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
        ]);

        let value_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(value_builder);
        let ll_value_builder = Int32Builder::new();
        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
        for i in 0..100 {
            list_builder.values().append_value(i);
            large_list_builder.values().append_value(i);
            if (i + 1) % 10 == 0 {
                list_builder.append(true);
                large_list_builder.append(true);
            }
        }
        let list_arr = Arc::new(list_builder.finish());
        let large_list_arr = Arc::new(large_list_builder.finish());

        let batch = RecordBatch::try_new(
            Arc::new(arrow_schema.clone()),
            vec![list_arr as ArrayRef, large_list_arr as ArrayRef],
        )
        .unwrap();

        // write to a lance file
        let store = ObjectStore::memory();
        let path = Path::from("/take_list");
        let schema: Schema = (&arrow_schema).try_into().unwrap();
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        // read the file back
        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();
        let actual = reader.take(&[1, 3, 5, 9], &schema).await.unwrap();

        let value_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(value_builder);
        let ll_value_builder = Int32Builder::new();
        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
        for i in [1, 3, 5, 9] {
            for j in 0..10 {
                list_builder.values().append_value(i * 10 + j);
                large_list_builder.values().append_value(i * 10 + j);
            }
            list_builder.append(true);
            large_list_builder.append(true);
        }
        let expected_list = list_builder.finish();
        let expected_large_list = large_list_builder.finish();

        assert_eq!(actual.column_by_name("l").unwrap().as_ref(), &expected_list);
        assert_eq!(
            actual.column_by_name("ll").unwrap().as_ref(),
            &expected_large_list
        );
    }

    #[tokio::test]
    async fn test_list_array_with_offsets() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "l",
                DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
            ArrowField::new(
                "ll",
                DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
        ]);

        let store = ObjectStore::memory();
        let path = Path::from("/lists");

        let list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2)]),
            Some(vec![Some(3), Some(4)]),
            Some((0..2_000).map(Some).collect::<Vec<_>>()),
        ])
        .slice(1, 1);
        let large_list_array = LargeListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(10), Some(11)]),
            Some(vec![Some(12), Some(13)]),
            Some((0..2_000).map(Some).collect::<Vec<_>>()),
        ])
        .slice(1, 1);

        let batch = RecordBatch::try_new(
            Arc::new(arrow_schema.clone()),
            vec![Arc::new(list_array), Arc::new(large_list_array)],
        )
        .unwrap();

        let schema: Schema = (&arrow_schema).try_into().unwrap();
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch.clone()]).await.unwrap();
        file_writer.finish().await.unwrap();

        // Make sure the big array was not written to the file
        let file_size_bytes = store.size(&path).await.unwrap();
        assert!(file_size_bytes < 1_000);

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
        assert_eq!(batch, actual_batch);
    }

    #[tokio::test]
    async fn test_read_ranges() {
        // create a record batch with a single Int64 column
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int64, false)]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let columns: Vec<ArrayRef> = vec![Arc::new(Int64Array::from_iter_values(0..100))];
        let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap();

        // write to a lance file
        let store = ObjectStore::memory();
        let path = Path::from("/read_range");
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_range(7..25, reader.schema()).await.unwrap();

        assert_eq!(
            actual_batch.column_by_name("i").unwrap().as_ref(),
            &Int64Array::from_iter_values(7..25)
        );
    }

    #[tokio::test]
    async fn test_batches_stream() {
        let store = ObjectStore::memory();
        let path = Path::from("/batch_stream");

        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int32, true)]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let mut writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        for i in 0..10 {
            let batch = RecordBatch::try_new(
                Arc::new(arrow_schema.clone()),
                vec![Arc::new(Int32Array::from_iter_values(i * 10..(i + 1) * 10))],
            )
            .unwrap();
            writer.write(&[batch]).await.unwrap();
        }
        writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();
        let stream = batches_stream(reader, schema, |id| id % 2 == 0);
        let batches = stream.try_collect::<Vec<_>>().await.unwrap();

        assert_eq!(batches.len(), 5);
        for (i, batch) in batches.iter().enumerate() {
            assert_eq!(
                batch,
                &RecordBatch::try_new(
                    Arc::new(arrow_schema.clone()),
                    vec![Arc::new(Int32Array::from_iter_values(
                        i as i32 * 2 * 10..(i as i32 * 2 + 1) * 10
                    ))],
                )
                .unwrap()
            )
        }
    }

    #[tokio::test]
    async fn test_take_boolean_beyond_chunk() {
        let store = ObjectStore::from_uri_and_params(
            Arc::new(Default::default()),
            "memory://",
            &ObjectStoreParams {
                block_size: Some(256),
                ..Default::default()
            },
        )
        .await
        .unwrap()
        .0;
        let path = Path::from("/take_bools");

        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "b",
            DataType::Boolean,
            false,
        )]));
        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();

        let array = BooleanArray::from((0..5000).map(|v| v % 5 == 0).collect::<Vec<_>>());
        let batch =
            RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(array.clone())]).unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();
        let actual = reader.take(&[2, 4, 5, 8, 4555], &schema).await.unwrap();

        assert_eq!(
            actual.column_by_name("b").unwrap().as_ref(),
            &BooleanArray::from(vec![false, false, true, false, true])
        );
    }

    #[tokio::test]
    async fn test_read_projection() {
        // The dataset schema may be very large.  The file reader should support reading
        // a small projection of that schema (this just tests the field_offset / num_fields
        // parameters)
        let store = ObjectStore::memory();
        let path = Path::from("/partial_read");

        // Create a large schema
        let mut fields = vec![];
        for i in 0..100 {
            fields.push(ArrowField::new(format!("f{}", i), DataType::Int32, false));
        }
        let arrow_schema = ArrowSchema::new(fields);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        let partial_schema = schema.project(&["f50"]).unwrap();
        let partial_arrow: ArrowSchema = (&partial_schema).into();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            partial_schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();

        let array = Int32Array::from(vec![0; 15]);
        let batch =
            RecordBatch::try_new(Arc::new(partial_arrow), vec![Arc::new(array.clone())]).unwrap();
        file_writer.write(&[batch.clone()]).await.unwrap();
        file_writer.finish().await.unwrap();

        let field_id = partial_schema.fields.first().unwrap().id;
        let reader = FileReader::try_new_with_fragment_id(
            &store,
            &path,
            schema.clone(),
            0,
            /*min_field_id=*/ field_id,
            /*max_field_id=*/ field_id,
            None,
        )
        .await
        .unwrap();
        let actual = reader
            .read_batch(0, ReadBatchParams::RangeFull, &partial_schema)
            .await
            .unwrap();

        assert_eq!(actual, batch);
    }
}