lance_file/reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Lance Data File Reader
5
6// Standard
7use std::ops::{Range, RangeTo};
8use std::sync::Arc;
9
10use arrow_arith::numeric::sub;
11use arrow_array::{
12    builder::PrimitiveBuilder,
13    cast::AsArray,
14    types::{Int32Type, Int64Type},
15    ArrayRef, ArrowNativeTypeOp, ArrowNumericType, NullArray, OffsetSizeTrait, PrimitiveArray,
16    RecordBatch, StructArray, UInt32Array,
17};
18use arrow_buffer::ArrowNativeType;
19use arrow_schema::{DataType, FieldRef, Schema as ArrowSchema};
20use arrow_select::concat::{self, concat_batches};
21use async_recursion::async_recursion;
22use deepsize::DeepSizeOf;
23use futures::{stream, Future, FutureExt, StreamExt, TryStreamExt};
24use lance_arrow::*;
25use lance_core::cache::LanceCache;
26use lance_core::datatypes::{Field, Schema};
27use lance_core::{Error, Result};
28use lance_io::encodings::dictionary::DictionaryDecoder;
29use lance_io::encodings::AsyncIndex;
30use lance_io::stream::{RecordBatchStream, RecordBatchStreamAdapter};
31use lance_io::traits::Reader;
32use lance_io::utils::{
33    read_fixed_stride_array, read_metadata_offset, read_struct, read_struct_from_buf,
34};
35use lance_io::{object_store::ObjectStore, ReadBatchParams};
36
37use object_store::path::Path;
38use snafu::location;
39use tracing::instrument;
40
41use crate::format::metadata::Metadata;
42use crate::page_table::{PageInfo, PageTable};
43
44/// Lance File Reader.
45///
//! It reads Arrow data from one data file.
47#[derive(Clone, DeepSizeOf)]
48pub struct FileReader {
49    pub object_reader: Arc<dyn Reader>,
50    metadata: Arc<Metadata>,
51    page_table: Arc<PageTable>,
52    schema: Schema,
53
    /// The id of the fragment to which this file belongs.
55    /// For simple file access, this can just be zero.
56    fragment_id: u64,
57
58    /// Page table for statistics
59    stats_page_table: Arc<Option<PageTable>>,
60}
61
62impl std::fmt::Debug for FileReader {
63    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64        f.debug_struct("FileReader")
65            .field("fragment", &self.fragment_id)
66            .field("path", &self.object_reader.path())
67            .finish()
68    }
69}
70
71impl FileReader {
72    /// Open file reader
73    ///
74    /// Open the file at the given path using the provided object store.
75    ///
    /// The passed fragment ID determines the first 32 bits of the row IDs.
77    ///
    /// The schema passed in is used to interpret the file. When the file is part of
    /// a dataset fragment, this schema (and its dictionaries) typically comes from
    /// the dataset manifest.
81    ///
82    /// The session passed in is used to cache metadata about the file. If no session
83    /// is passed in, there will be no caching.
84    #[instrument(level = "debug", skip(object_store, schema, session))]
85    pub async fn try_new_with_fragment_id(
86        object_store: &ObjectStore,
87        path: &Path,
88        schema: Schema,
89        fragment_id: u32,
90        field_id_offset: i32,
91        max_field_id: i32,
92        session: Option<&LanceCache>,
93    ) -> Result<Self> {
94        let object_reader = object_store.open(path).await?;
95
96        let metadata = Self::read_metadata(object_reader.as_ref(), session).await?;
97
98        Self::try_new_from_reader(
99            path,
100            object_reader.into(),
101            Some(metadata),
102            schema,
103            fragment_id,
104            field_id_offset,
105            max_field_id,
106            session,
107        )
108        .await
109    }
110
111    #[allow(clippy::too_many_arguments)]
112    pub async fn try_new_from_reader(
113        path: &Path,
114        object_reader: Arc<dyn Reader>,
115        metadata: Option<Arc<Metadata>>,
116        schema: Schema,
117        fragment_id: u32,
118        field_id_offset: i32,
119        max_field_id: i32,
120        session: Option<&LanceCache>,
121    ) -> Result<Self> {
122        let metadata = match metadata {
123            Some(metadata) => metadata,
124            None => Self::read_metadata(object_reader.as_ref(), session).await?,
125        };
126
127        let page_table = async {
128            Self::load_from_cache(session, path.to_string(), |_| async {
129                PageTable::load(
130                    object_reader.as_ref(),
131                    metadata.page_table_position,
132                    field_id_offset,
133                    max_field_id,
134                    metadata.num_batches() as i32,
135                )
136                .await
137            })
138            .await
139        };
140
141        let stats_page_table = Self::read_stats_page_table(object_reader.as_ref(), session);
142
143        // Can concurrently load page tables
144        let (page_table, stats_page_table) = futures::try_join!(page_table, stats_page_table)?;
145
146        Ok(Self {
147            object_reader,
148            metadata,
149            schema,
150            page_table,
151            fragment_id: fragment_id as u64,
152            stats_page_table,
153        })
154    }
155
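    /// Read the file [Metadata] from the tail of the file.
    ///
    /// The last block of the file is read first; if the metadata does not fit
    /// entirely within that block, a second read fetches it from its recorded
    /// offset. The result is cached when a cache is provided.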
156    pub async fn read_metadata(
157        object_reader: &dyn Reader,
158        cache: Option<&LanceCache>,
159    ) -> Result<Arc<Metadata>> {
160        Self::load_from_cache(cache, object_reader.path().to_string(), |_| async {
161            let file_size = object_reader.size().await?;
162            let begin = if file_size < object_reader.block_size() {
163                0
164            } else {
165                file_size - object_reader.block_size()
166            };
167            let tail_bytes = object_reader.get_range(begin..file_size).await?;
168            let metadata_pos = read_metadata_offset(&tail_bytes)?;
169
170            let metadata: Metadata = if metadata_pos < file_size - tail_bytes.len() {
171                // We have not read the metadata bytes yet.
172                read_struct(object_reader, metadata_pos).await?
173            } else {
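                // The metadata is already contained in the tail bytes we just read,
                // so decode it directly from the buffer.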
174                let offset = tail_bytes.len() - (file_size - metadata_pos);
175                read_struct_from_buf(&tail_bytes.slice(offset..))?
176            };
177            Ok(metadata)
178        })
179        .await
180    }
181
    /// Get the statistics page table, reading the file metadata if it is not
    /// already cached.
    ///
    /// The resulting page table is cached when a cache is provided.
185    async fn read_stats_page_table(
186        reader: &dyn Reader,
187        cache: Option<&LanceCache>,
188    ) -> Result<Arc<Option<PageTable>>> {
189        // To prevent collisions, we cache this at a child path
190        Self::load_from_cache(cache, reader.path().child("stats").to_string(), |_| async {
191            let metadata = Self::read_metadata(reader, cache).await?;
192
193            if let Some(stats_meta) = metadata.stats_metadata.as_ref() {
194                Ok(Some(
195                    PageTable::load(
196                        reader,
197                        stats_meta.page_table_position,
198                        /*min_field_id=*/ 0,
199                        /*max_field_id=*/ *stats_meta.leaf_field_ids.iter().max().unwrap(),
200                        /*num_batches=*/ 1,
201                    )
202                    .await?,
203                ))
204            } else {
205                Ok(None)
206            }
207        })
208        .await
209    }
210
    /// Load some metadata about the fragment from the cache, if a cache is provided.
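    ///
    /// If no cache is provided, the loader is invoked directly and its result is
    /// wrapped in an [`Arc`].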
212    async fn load_from_cache<T: DeepSizeOf + Send + Sync + 'static, F, Fut>(
213        cache: Option<&LanceCache>,
214        key: String,
215        loader: F,
216    ) -> Result<Arc<T>>
217    where
218        F: Fn(&str) -> Fut,
219        Fut: Future<Output = Result<T>>,
220    {
221        if let Some(cache) = cache {
222            cache.get_or_insert(key, loader).await
223        } else {
224            Ok(Arc::new(loader(key.as_str()).await?))
225        }
226    }
227
228    /// Open one Lance data file for read.
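    ///
    /// A minimal usage sketch (hypothetical path; assumes `schema` matches the
    /// data file):
    ///
    /// ```ignore
    /// let store = ObjectStore::memory();
    /// let path = Path::from("/data.lance");
    /// let reader = FileReader::try_new(&store, &path, schema).await?;
    /// let batch = reader
    ///     .read_batch(0, ReadBatchParams::RangeFull, reader.schema())
    ///     .await?;
    /// ```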
229    pub async fn try_new(object_store: &ObjectStore, path: &Path, schema: Schema) -> Result<Self> {
        // If just reading a Lance data file, we assume the passed schema is the schema of the data file
231        let max_field_id = schema.max_field_id().unwrap_or_default();
232        Self::try_new_with_fragment_id(object_store, path, schema, 0, 0, max_field_id, None).await
233    }
234
235    fn io_parallelism(&self) -> usize {
236        self.object_reader.io_parallelism()
237    }
238
239    /// Requested projection of the data in this file, excluding the row id column.
240    pub fn schema(&self) -> &Schema {
241        &self.schema
242    }
243
244    pub fn num_batches(&self) -> usize {
245        self.metadata.num_batches()
246    }
247
    /// Get the number of rows in the given batch
249    pub fn num_rows_in_batch(&self, batch_id: i32) -> usize {
250        self.metadata.get_batch_length(batch_id).unwrap_or_default() as usize
251    }
252
253    /// Count the number of rows in this file.
254    pub fn len(&self) -> usize {
255        self.metadata.len()
256    }
257
258    pub fn is_empty(&self) -> bool {
259        self.metadata.is_empty()
260    }
261
262    /// Read a batch of data from the file.
263    ///
264    /// The schema of the returned [RecordBatch] is set by [`FileReader::schema()`].
265    #[instrument(level = "debug", skip(self, params, projection))]
266    pub async fn read_batch(
267        &self,
268        batch_id: i32,
269        params: impl Into<ReadBatchParams>,
270        projection: &Schema,
271    ) -> Result<RecordBatch> {
272        read_batch(self, &params.into(), projection, batch_id).await
273    }
274
275    /// Read a range of records into one batch.
276    ///
    /// Note that this may concatenate results if the range crosses multiple batches,
    /// which makes it less efficient than [`FileReader::read_batch()`].
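    ///
    /// A small sketch (hypothetical row range):
    ///
    /// ```ignore
    /// // Rows 100..250 may span several batches; they are read concurrently
    /// // and concatenated into a single batch.
    /// let batch = reader.read_range(100..250, reader.schema()).await?;
    /// assert_eq!(batch.num_rows(), 150);
    /// ```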
279    #[instrument(level = "debug", skip(self, projection))]
280    pub async fn read_range(
281        &self,
282        range: Range<usize>,
283        projection: &Schema,
284    ) -> Result<RecordBatch> {
285        if range.is_empty() {
286            return Ok(RecordBatch::new_empty(Arc::new(projection.into())));
287        }
288        let range_in_batches = self.metadata.range_to_batches(range)?;
289        let batches =
290            stream::iter(range_in_batches)
291                .map(|(batch_id, range)| async move {
292                    self.read_batch(batch_id, range, projection).await
293                })
294                .buffered(self.io_parallelism())
295                .try_collect::<Vec<_>>()
296                .await?;
297        if batches.len() == 1 {
298            return Ok(batches[0].clone());
299        }
300        let schema = batches[0].schema();
301        Ok(tokio::task::spawn_blocking(move || concat_batches(&schema, &batches)).await??)
302    }
303
    /// Take records by indices within the file.
305    ///
306    /// The indices must be sorted.
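    ///
    /// For example (hypothetical indices, sorted ascending):
    ///
    /// ```ignore
    /// let batch = reader.take(&[1, 15, 20, 90], reader.schema()).await?;
    /// assert_eq!(batch.num_rows(), 4);
    /// ```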
307    #[instrument(level = "debug", skip_all)]
308    pub async fn take(&self, indices: &[u32], projection: &Schema) -> Result<RecordBatch> {
309        let num_batches = self.num_batches();
310        let num_rows = self.len() as u32;
311        let indices_in_batches = self.metadata.group_indices_to_batches(indices);
312        let batches = stream::iter(indices_in_batches)
313            .map(|batch| async move {
314                if batch.batch_id >= num_batches as i32 {
315                    Err(Error::InvalidInput {
316                        source: format!("batch_id: {} out of bounds", batch.batch_id).into(),
317                        location: location!(),
318                    })
319                } else if *batch.offsets.last().expect("got empty batch") > num_rows {
320                    Err(Error::InvalidInput {
321                        source: format!("indices: {:?} out of bounds", batch.offsets).into(),
322                        location: location!(),
323                    })
324                } else {
325                    self.read_batch(batch.batch_id, batch.offsets.as_slice(), projection)
326                        .await
327                }
328            })
329            .buffered(self.io_parallelism())
330            .try_collect::<Vec<_>>()
331            .await?;
332
333        let schema = Arc::new(ArrowSchema::from(projection));
334
335        Ok(tokio::task::spawn_blocking(move || concat_batches(&schema, &batches)).await??)
336    }
337
338    /// Get the schema of the statistics page table, for the given data field ids.
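    ///
    /// Stats fields are named after the data field id they describe (for example,
    /// a hypothetical stats field named "5" holds statistics for data field 5), so
    /// the returned projection keeps each matching stats field and its children.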
339    pub fn page_stats_schema(&self, field_ids: &[i32]) -> Option<Schema> {
340        self.metadata.stats_metadata.as_ref().map(|meta| {
341            let mut stats_field_ids = vec![];
342            for stats_field in &meta.schema.fields {
343                if let Ok(stats_field_id) = stats_field.name.parse::<i32>() {
344                    if field_ids.contains(&stats_field_id) {
345                        stats_field_ids.push(stats_field.id);
346                        for child in &stats_field.children {
347                            stats_field_ids.push(child.id);
348                        }
349                    }
350                }
351            }
352            meta.schema.project_by_ids(&stats_field_ids, true)
353        })
354    }
355
356    /// Get the page statistics for the given data field ids.
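    ///
    /// A sketch (hypothetical field ids); `None` is returned when the file has no
    /// statistics for the requested fields:
    ///
    /// ```ignore
    /// if let Some(stats) = reader.read_page_stats(&[0, 1]).await? {
    ///     println!("read {} rows of page statistics", stats.num_rows());
    /// }
    /// ```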
357    pub async fn read_page_stats(&self, field_ids: &[i32]) -> Result<Option<RecordBatch>> {
358        if let Some(stats_page_table) = self.stats_page_table.as_ref() {
359            let projection = self.page_stats_schema(field_ids).unwrap();
360            // It's possible none of the requested fields have stats.
361            if projection.fields.is_empty() {
362                return Ok(None);
363            }
364            let arrays = futures::stream::iter(projection.fields.iter().cloned())
365                .map(|field| async move {
366                    read_array(
367                        self,
368                        &field,
369                        0,
370                        stats_page_table,
371                        &ReadBatchParams::RangeFull,
372                    )
373                    .await
374                })
375                .buffered(self.io_parallelism())
376                .try_collect::<Vec<_>>()
377                .await?;
378
379            let schema = ArrowSchema::from(&projection);
380            let batch = RecordBatch::try_new(Arc::new(schema), arrays)?;
381            Ok(Some(batch))
382        } else {
383            Ok(None)
384        }
385    }
386}
387
388/// Stream desired full batches from the file.
389///
390/// Parameters:
391/// - **reader**: An opened file reader.
/// - **projection**: The schema of the returned [RecordBatch].
393/// - **predicate**: A function that takes a batch ID and returns true if the batch should be
394///   returned.
395///
396/// Returns:
397/// - A stream of [RecordBatch]s, each one corresponding to one full batch in the file.
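///
/// A usage sketch (hypothetical predicate keeping even-numbered batches):
///
/// ```ignore
/// use futures::TryStreamExt;
///
/// let stream = batches_stream(reader, projection, |batch_id| batch_id % 2 == 0);
/// let batches: Vec<RecordBatch> = stream.try_collect().await?;
/// ```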
398pub fn batches_stream(
399    reader: FileReader,
400    projection: Schema,
401    predicate: impl FnMut(&i32) -> bool + Send + Sync + 'static,
402) -> impl RecordBatchStream {
    // Wrap the projection in an Arc so we can clone it and pass it between threads.
404    let projection = Arc::new(projection);
405    let arrow_schema = ArrowSchema::from(projection.as_ref());
406
407    let total_batches = reader.num_batches() as i32;
408    let batches = (0..total_batches).filter(predicate);
    // Wrap the reader in an Arc as well, so it can be cloned and passed between threads.
410    let this = Arc::new(reader);
411    let inner = stream::iter(batches)
412        .zip(stream::repeat_with(move || {
413            (this.clone(), projection.clone())
414        }))
415        .map(move |(batch_id, (reader, projection))| async move {
416            reader
417                .read_batch(batch_id, ReadBatchParams::RangeFull, &projection)
418                .await
419        })
420        .buffered(2)
421        .boxed();
422    RecordBatchStreamAdapter::new(Arc::new(arrow_schema), inner)
423}
424
425/// Read a batch.
426///
/// `schema` must contain at least one field; otherwise an invalid-input error is
/// returned.
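///
/// A sketch (hypothetical projection containing at least one field):
///
/// ```ignore
/// let batch = read_batch(&reader, &ReadBatchParams::RangeFull, &projection, 0).await?;
/// ```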
429pub async fn read_batch(
430    reader: &FileReader,
431    params: &ReadBatchParams,
432    schema: &Schema,
433    batch_id: i32,
434) -> Result<RecordBatch> {
435    if !schema.fields.is_empty() {
436        // We box this because otherwise we get a higher-order lifetime error.
437        let arrs = stream::iter(&schema.fields)
438            .map(|f| async { read_array(reader, f, batch_id, &reader.page_table, params).await })
439            .buffered(reader.io_parallelism())
440            .try_collect::<Vec<_>>()
441            .boxed();
442        let arrs = arrs.await?;
443        Ok(RecordBatch::try_new(Arc::new(schema.into()), arrs)?)
444    } else {
445        Err(Error::invalid_input("no fields requested", location!()))
446    }
447}
448
449#[async_recursion]
450async fn read_array(
451    reader: &FileReader,
452    field: &Field,
453    batch_id: i32,
454    page_table: &PageTable,
455    params: &ReadBatchParams,
456) -> Result<ArrayRef> {
457    let data_type = field.data_type();
458
459    use DataType::*;
460
461    if data_type.is_fixed_stride() {
462        _read_fixed_stride_array(reader, field, batch_id, page_table, params).await
463    } else {
464        match data_type {
465            Null => read_null_array(field, batch_id, page_table, params),
466            Utf8 | LargeUtf8 | Binary | LargeBinary => {
467                read_binary_array(reader, field, batch_id, page_table, params).await
468            }
469            Struct(_) => read_struct_array(reader, field, batch_id, page_table, params).await,
470            Dictionary(_, _) => {
471                read_dictionary_array(reader, field, batch_id, page_table, params).await
472            }
473            List(_) => {
474                read_list_array::<Int32Type>(reader, field, batch_id, page_table, params).await
475            }
476            LargeList(_) => {
477                read_list_array::<Int64Type>(reader, field, batch_id, page_table, params).await
478            }
479            _ => {
                unimplemented!("No support for {data_type} yet");
481            }
482        }
483    }
484}
485
486fn get_page_info<'a>(
487    page_table: &'a PageTable,
488    field: &'a Field,
489    batch_id: i32,
490) -> Result<&'a PageInfo> {
491    page_table.get(field.id, batch_id).ok_or_else(|| {
492        Error::io(
493            format!(
494                "No page info found for field: {}, field_id={} batch={}",
495                field.name, field.id, batch_id
496            ),
497            location!(),
498        )
499    })
500}
501
/// Read a fixed-stride array for batch `batch_id`.
503async fn _read_fixed_stride_array(
504    reader: &FileReader,
505    field: &Field,
506    batch_id: i32,
507    page_table: &PageTable,
508    params: &ReadBatchParams,
509) -> Result<ArrayRef> {
510    let page_info = get_page_info(page_table, field, batch_id)?;
511    read_fixed_stride_array(
512        reader.object_reader.as_ref(),
513        &field.data_type(),
514        page_info.position,
515        page_info.length,
516        params.clone(),
517    )
518    .await
519}
520
521fn read_null_array(
522    field: &Field,
523    batch_id: i32,
524    page_table: &PageTable,
525    params: &ReadBatchParams,
526) -> Result<ArrayRef> {
527    let page_info = get_page_info(page_table, field, batch_id)?;
528
529    let length_output = match params {
530        ReadBatchParams::Indices(indices) => {
531            if indices.is_empty() {
532                0
533            } else {
534                let idx_max = *indices.values().iter().max().unwrap() as u64;
535                if idx_max >= page_info.length as u64 {
536                    return Err(Error::io(
537                        format!(
538                            "NullArray Reader: request([{}]) out of range: [0..{}]",
539                            idx_max, page_info.length
540                        ),
541                        location!(),
542                    ));
543                }
544                indices.len()
545            }
546        }
547        _ => {
548            let (idx_start, idx_end) = match params {
549                ReadBatchParams::Range(r) => (r.start, r.end),
550                ReadBatchParams::RangeFull => (0, page_info.length),
551                ReadBatchParams::RangeTo(r) => (0, r.end),
552                ReadBatchParams::RangeFrom(r) => (r.start, page_info.length),
553                _ => unreachable!(),
554            };
555            if idx_end > page_info.length {
556                return Err(Error::io(
                    format!(
                        "NullArray Reader: request([{}..{}]) out of range: [0..{}]",
                        idx_start, idx_end, page_info.length
                    ),
564                    location!(),
565                ));
566            }
567            idx_end - idx_start
568        }
569    };
570
571    Ok(Arc::new(NullArray::new(length_output)))
572}
573
574async fn read_binary_array(
575    reader: &FileReader,
576    field: &Field,
577    batch_id: i32,
578    page_table: &PageTable,
579    params: &ReadBatchParams,
580) -> Result<ArrayRef> {
581    let page_info = get_page_info(page_table, field, batch_id)?;
582
583    lance_io::utils::read_binary_array(
584        reader.object_reader.as_ref(),
585        &field.data_type(),
586        field.nullable,
587        page_info.position,
588        page_info.length,
589        params,
590    )
591    .await
592}
593
594async fn read_dictionary_array(
595    reader: &FileReader,
596    field: &Field,
597    batch_id: i32,
598    page_table: &PageTable,
599    params: &ReadBatchParams,
600) -> Result<ArrayRef> {
601    let page_info = get_page_info(page_table, field, batch_id)?;
602    let data_type = field.data_type();
603    let decoder = DictionaryDecoder::new(
604        reader.object_reader.as_ref(),
605        page_info.position,
606        page_info.length,
607        &data_type,
608        field
609            .dictionary
610            .as_ref()
611            .unwrap()
612            .values
613            .as_ref()
614            .unwrap()
615            .clone(),
616    );
617    decoder.get(params.clone()).await
618}
619
620async fn read_struct_array(
621    reader: &FileReader,
622    field: &Field,
623    batch_id: i32,
624    page_table: &PageTable,
625    params: &ReadBatchParams,
626) -> Result<ArrayRef> {
627    // TODO: use tokio to make the reads in parallel.
628    let mut sub_arrays: Vec<(FieldRef, ArrayRef)> = vec![];
629
630    for child in field.children.as_slice() {
631        let arr = read_array(reader, child, batch_id, page_table, params).await?;
632        sub_arrays.push((Arc::new(child.into()), arr));
633    }
634
635    Ok(Arc::new(StructArray::from(sub_arrays)))
636}
637
638async fn take_list_array<T: ArrowNumericType>(
639    reader: &FileReader,
640    field: &Field,
641    batch_id: i32,
642    page_table: &PageTable,
643    positions: &PrimitiveArray<T>,
644    indices: &UInt32Array,
645) -> Result<ArrayRef>
646where
647    T::Native: ArrowNativeTypeOp + OffsetSizeTrait,
648{
649    let first_idx = indices.value(0);
650    // Range of values for each index
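    // For example (hypothetical): with full offsets [0, 3, 5, 9, 12] and
    // indices [1, 3], `positions` holds [3, 5, 9, 12] (the offsets for rows
    // 1..=4), so the value ranges are 3..5 and 9..12.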
651    let ranges = indices
652        .values()
653        .iter()
654        .map(|i| (*i - first_idx).as_usize())
655        .map(|idx| positions.value(idx).as_usize()..positions.value(idx + 1).as_usize())
656        .collect::<Vec<_>>();
657    let field = field.clone();
658    let mut list_values: Vec<ArrayRef> = vec![];
659    // TODO: read them in parallel.
660    for range in ranges.iter() {
661        list_values.push(
662            read_array(
663                reader,
664                &field.children[0],
665                batch_id,
666                page_table,
667                &(range.clone()).into(),
668            )
669            .await?,
670        );
671    }
672
673    let value_refs = list_values
674        .iter()
675        .map(|arr| arr.as_ref())
676        .collect::<Vec<_>>();
677    let mut offsets_builder = PrimitiveBuilder::<T>::new();
678    offsets_builder.append_value(T::Native::usize_as(0));
679    let mut off = 0_usize;
680    for range in ranges {
681        off += range.len();
682        offsets_builder.append_value(T::Native::usize_as(off));
683    }
684    let all_values = concat::concat(value_refs.as_slice())?;
685    let offset_arr = offsets_builder.finish();
686    let arr = try_new_generic_list_array(all_values, &offset_arr)?;
687    Ok(Arc::new(arr) as ArrayRef)
688}
689
690async fn read_list_array<T: ArrowNumericType>(
691    reader: &FileReader,
692    field: &Field,
693    batch_id: i32,
694    page_table: &PageTable,
695    params: &ReadBatchParams,
696) -> Result<ArrayRef>
697where
698    T::Native: ArrowNativeTypeOp + OffsetSizeTrait,
699{
700    // Offset the position array by 1 in order to include the upper bound of the last element
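    // For example (hypothetical offsets [0, 3, 5, 9, 12]): reading rows 1..3
    // requires positions[1..4] = [3, 5, 9] so that the length of the last
    // requested list element is known.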
701    let positions_params = match params {
702        ReadBatchParams::Range(range) => ReadBatchParams::from(range.start..(range.end + 1)),
703        ReadBatchParams::RangeTo(range) => ReadBatchParams::from(..range.end + 1),
704        ReadBatchParams::Indices(indices) => {
705            (indices.value(0).as_usize()..indices.value(indices.len() - 1).as_usize() + 2).into()
706        }
707        p => p.clone(),
708    };
709
710    let page_info = get_page_info(&reader.page_table, field, batch_id)?;
711    let position_arr = read_fixed_stride_array(
712        reader.object_reader.as_ref(),
713        &T::DATA_TYPE,
714        page_info.position,
715        page_info.length,
716        positions_params,
717    )
718    .await?;
719
720    let positions: &PrimitiveArray<T> = position_arr.as_primitive();
721
722    // Recompute params so they align with the offset array
723    let value_params = match params {
724        ReadBatchParams::Range(range) => ReadBatchParams::from(
725            positions.value(0).as_usize()..positions.value(range.end - range.start).as_usize(),
726        ),
727        ReadBatchParams::Ranges(_) => {
728            return Err(Error::Internal {
729                message: "ReadBatchParams::Ranges should not be used in v1 files".to_string(),
730                location: location!(),
731            })
732        }
733        ReadBatchParams::RangeTo(RangeTo { end }) => {
734            ReadBatchParams::from(..positions.value(*end).as_usize())
735        }
736        ReadBatchParams::RangeFrom(_) => ReadBatchParams::from(positions.value(0).as_usize()..),
737        ReadBatchParams::RangeFull => ReadBatchParams::from(
738            positions.value(0).as_usize()..positions.value(positions.len() - 1).as_usize(),
739        ),
740        ReadBatchParams::Indices(indices) => {
741            return take_list_array(reader, field, batch_id, page_table, positions, indices).await;
742        }
743    };
744
745    let start_position = PrimitiveArray::<T>::new_scalar(positions.value(0));
746    let offset_arr = sub(positions, &start_position)?;
747    let offset_arr_ref = offset_arr.as_primitive::<T>();
748    let value_arrs = read_array(
749        reader,
750        &field.children[0],
751        batch_id,
752        page_table,
753        &value_params,
754    )
755    .await?;
756    let arr = try_new_generic_list_array(value_arrs, offset_arr_ref)?;
757    Ok(Arc::new(arr) as ArrayRef)
758}
759
760#[cfg(test)]
761mod tests {
762    use crate::writer::{FileWriter, NotSelfDescribing};
763
764    use super::*;
765
766    use arrow_array::{
767        builder::{Int32Builder, LargeListBuilder, ListBuilder, StringBuilder},
768        cast::{as_string_array, as_struct_array},
769        types::UInt8Type,
770        Array, DictionaryArray, Float32Array, Int64Array, LargeListArray, ListArray, StringArray,
771        UInt8Array,
772    };
773    use arrow_array::{BooleanArray, Int32Array};
774    use arrow_schema::{Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema};
775    use lance_io::object_store::ObjectStoreParams;
776
777    #[tokio::test]
778    async fn test_take() {
779        let arrow_schema = ArrowSchema::new(vec![
780            ArrowField::new("i", DataType::Int64, true),
781            ArrowField::new("f", DataType::Float32, false),
782            ArrowField::new("s", DataType::Utf8, false),
783            ArrowField::new(
784                "d",
785                DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
786                false,
787            ),
788        ]);
789        let mut schema = Schema::try_from(&arrow_schema).unwrap();
790
791        let store = ObjectStore::memory();
792        let path = Path::from("/take_test");
793
794        // Write 10 batches.
795        let values = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);
796        let values_ref = Arc::new(values);
797        let mut batches = vec![];
798        for batch_id in 0..10 {
799            let value_range: Range<i64> = batch_id * 10..batch_id * 10 + 10;
800            let keys = UInt8Array::from_iter_values(value_range.clone().map(|v| (v % 7) as u8));
801            let columns: Vec<ArrayRef> = vec![
802                Arc::new(Int64Array::from_iter(
803                    value_range.clone().collect::<Vec<_>>(),
804                )),
805                Arc::new(Float32Array::from_iter(
806                    value_range.clone().map(|n| n as f32).collect::<Vec<_>>(),
807                )),
808                Arc::new(StringArray::from_iter_values(
809                    value_range.clone().map(|n| format!("str-{}", n)),
810                )),
811                Arc::new(DictionaryArray::<UInt8Type>::try_new(keys, values_ref.clone()).unwrap()),
812            ];
813            batches.push(RecordBatch::try_new(Arc::new(arrow_schema.clone()), columns).unwrap());
814        }
815        schema.set_dictionary(&batches[0]).unwrap();
816
817        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
818            &store,
819            &path,
820            schema.clone(),
821            &Default::default(),
822        )
823        .await
824        .unwrap();
825        for batch in batches.iter() {
826            file_writer.write(&[batch.clone()]).await.unwrap();
827        }
828        file_writer.finish().await.unwrap();
829
830        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
831        let batch = reader
832            .take(&[1, 15, 20, 25, 30, 48, 90], reader.schema())
833            .await
834            .unwrap();
835        let dict_keys = UInt8Array::from_iter_values([1, 1, 6, 4, 2, 6, 6]);
836        assert_eq!(
837            batch,
838            RecordBatch::try_new(
839                batch.schema(),
840                vec![
841                    Arc::new(Int64Array::from_iter_values([1, 15, 20, 25, 30, 48, 90])),
842                    Arc::new(Float32Array::from_iter_values([
843                        1.0, 15.0, 20.0, 25.0, 30.0, 48.0, 90.0
844                    ])),
845                    Arc::new(StringArray::from_iter_values([
846                        "str-1", "str-15", "str-20", "str-25", "str-30", "str-48", "str-90"
847                    ])),
848                    Arc::new(DictionaryArray::try_new(dict_keys, values_ref.clone()).unwrap()),
849                ]
850            )
851            .unwrap()
852        );
853    }
854
855    async fn test_write_null_string_in_struct(field_nullable: bool) {
856        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
857            "parent",
858            DataType::Struct(ArrowFields::from(vec![ArrowField::new(
859                "str",
860                DataType::Utf8,
861                field_nullable,
862            )])),
863            true,
864        )]));
865
866        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
867
868        let store = ObjectStore::memory();
869        let path = Path::from("/null_strings");
870
871        let string_arr = Arc::new(StringArray::from_iter([Some("a"), Some(""), Some("b")]));
872        let struct_arr = Arc::new(StructArray::from(vec![(
873            Arc::new(ArrowField::new("str", DataType::Utf8, field_nullable)),
874            string_arr.clone() as ArrayRef,
875        )]));
876        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_arr]).unwrap();
877
878        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
879            &store,
880            &path,
881            schema.clone(),
882            &Default::default(),
883        )
884        .await
885        .unwrap();
886        file_writer.write(&[batch.clone()]).await.unwrap();
887        file_writer.finish().await.unwrap();
888
889        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
890        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
891
892        if field_nullable {
893            assert_eq!(
894                &StringArray::from_iter(vec![Some("a"), None, Some("b")]),
895                as_string_array(
896                    as_struct_array(actual_batch.column_by_name("parent").unwrap().as_ref())
897                        .column_by_name("str")
898                        .unwrap()
899                        .as_ref()
900                )
901            );
902        } else {
903            assert_eq!(actual_batch, batch);
904        }
905    }
906
907    #[tokio::test]
908    async fn read_nullable_string_in_struct() {
909        test_write_null_string_in_struct(true).await;
910        test_write_null_string_in_struct(false).await;
911    }
912
913    #[tokio::test]
914    async fn test_read_struct_of_list_arrays() {
915        let store = ObjectStore::memory();
916        let path = Path::from("/null_strings");
917
918        let arrow_schema = make_schema_of_list_array();
919        let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
920
921        let batches = (0..3)
922            .map(|_| {
923                let struct_array = make_struct_of_list_array(10, 10);
924                RecordBatch::try_new(arrow_schema.clone(), vec![struct_array]).unwrap()
925            })
926            .collect::<Vec<_>>();
927        let batches_ref = batches.iter().collect::<Vec<_>>();
928
929        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
930            &store,
931            &path,
932            schema.clone(),
933            &Default::default(),
934        )
935        .await
936        .unwrap();
937        file_writer.write(&batches).await.unwrap();
938        file_writer.finish().await.unwrap();
939
940        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
941        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
942        let expected = concat_batches(&arrow_schema, batches_ref).unwrap();
943        assert_eq!(expected, actual_batch);
944    }
945
946    #[tokio::test]
947    async fn test_scan_struct_of_list_arrays() {
948        let store = ObjectStore::memory();
949        let path = Path::from("/null_strings");
950
951        let arrow_schema = make_schema_of_list_array();
952        let struct_array = make_struct_of_list_array(3, 10);
953        let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
954        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_array.clone()]).unwrap();
955
956        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
957            &store,
958            &path,
959            schema.clone(),
960            &Default::default(),
961        )
962        .await
963        .unwrap();
964        file_writer.write(&[batch]).await.unwrap();
965        file_writer.finish().await.unwrap();
966
967        let mut expected_columns: Vec<ArrayRef> = Vec::new();
968        for c in struct_array.columns().iter() {
969            expected_columns.push(c.slice(1, 1));
970        }
971
972        let expected_struct = match arrow_schema.fields[0].data_type() {
973            DataType::Struct(subfields) => subfields
974                .iter()
975                .zip(expected_columns)
976                .map(|(f, d)| (f.clone(), d))
977                .collect::<Vec<_>>(),
978            _ => panic!("unexpected field"),
979        };
980
981        let expected_struct_array = StructArray::from(expected_struct);
982        let expected_batch = RecordBatch::from(&StructArray::from(vec![(
983            Arc::new(arrow_schema.fields[0].as_ref().clone()),
984            Arc::new(expected_struct_array) as ArrayRef,
985        )]));
986
987        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
988        let params = ReadBatchParams::Range(1..2);
989        let slice_of_batch = reader.read_batch(0, params, reader.schema()).await.unwrap();
990        assert_eq!(expected_batch, slice_of_batch);
991    }
992
993    fn make_schema_of_list_array() -> Arc<arrow_schema::Schema> {
994        Arc::new(ArrowSchema::new(vec![ArrowField::new(
995            "s",
996            DataType::Struct(ArrowFields::from(vec![
997                ArrowField::new(
998                    "li",
999                    DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1000                    true,
1001                ),
1002                ArrowField::new(
1003                    "ls",
1004                    DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))),
1005                    true,
1006                ),
1007                ArrowField::new(
1008                    "ll",
1009                    DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1010                    false,
1011                ),
1012            ])),
1013            true,
1014        )]))
1015    }
1016
1017    fn make_struct_of_list_array(rows: i32, num_items: i32) -> Arc<StructArray> {
1018        let mut li_builder = ListBuilder::new(Int32Builder::new());
1019        let mut ls_builder = ListBuilder::new(StringBuilder::new());
1020        let ll_value_builder = Int32Builder::new();
1021        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
1022        for i in 0..rows {
1023            for j in 0..num_items {
1024                li_builder.values().append_value(i * 10 + j);
1025                ls_builder
1026                    .values()
1027                    .append_value(format!("str-{}", i * 10 + j));
1028                large_list_builder.values().append_value(i * 10 + j);
1029            }
1030            li_builder.append(true);
1031            ls_builder.append(true);
1032            large_list_builder.append(true);
1033        }
1034        Arc::new(StructArray::from(vec![
1035            (
1036                Arc::new(ArrowField::new(
1037                    "li",
1038                    DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1039                    true,
1040                )),
1041                Arc::new(li_builder.finish()) as ArrayRef,
1042            ),
1043            (
1044                Arc::new(ArrowField::new(
1045                    "ls",
1046                    DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))),
1047                    true,
1048                )),
1049                Arc::new(ls_builder.finish()) as ArrayRef,
1050            ),
1051            (
1052                Arc::new(ArrowField::new(
1053                    "ll",
1054                    DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1055                    false,
1056                )),
1057                Arc::new(large_list_builder.finish()) as ArrayRef,
1058            ),
1059        ]))
1060    }
1061
1062    #[tokio::test]
1063    async fn test_read_nullable_arrays() {
1064        use arrow_array::Array;
1065
1066        // create a record batch with a null array column
1067        let arrow_schema = ArrowSchema::new(vec![
1068            ArrowField::new("i", DataType::Int64, false),
1069            ArrowField::new("n", DataType::Null, true),
1070        ]);
1071        let schema = Schema::try_from(&arrow_schema).unwrap();
1072        let columns: Vec<ArrayRef> = vec![
1073            Arc::new(Int64Array::from_iter_values(0..100)),
1074            Arc::new(NullArray::new(100)),
1075        ];
1076        let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap();
1077
1078        // write to a lance file
1079        let store = ObjectStore::memory();
1080        let path = Path::from("/takes");
1081        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
1082            &store,
1083            &path,
1084            schema.clone(),
1085            &Default::default(),
1086        )
1087        .await
1088        .unwrap();
1089        file_writer.write(&[batch]).await.unwrap();
1090        file_writer.finish().await.unwrap();
1091
1092        // read the file back
1093        let reader = FileReader::try_new(&store, &path, schema.clone())
1094            .await
1095            .unwrap();
1096
1097        async fn read_array_w_params(
1098            reader: &FileReader,
1099            field: &Field,
1100            params: ReadBatchParams,
1101        ) -> ArrayRef {
1102            read_array(reader, field, 0, reader.page_table.as_ref(), &params)
1103                .await
1104                .expect("Error reading back the null array from file") as _
1105        }
1106
1107        let arr = read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFull).await;
1108        assert_eq!(100, arr.len());
1109        assert_eq!(arr.data_type(), &DataType::Null);
1110
1111        let arr =
1112            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::Range(10..25)).await;
1113        assert_eq!(15, arr.len());
1114        assert_eq!(arr.data_type(), &DataType::Null);
1115
1116        let arr =
1117            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFrom(60..)).await;
1118        assert_eq!(40, arr.len());
1119        assert_eq!(arr.data_type(), &DataType::Null);
1120
1121        let arr =
1122            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeTo(..25)).await;
1123        assert_eq!(25, arr.len());
1124        assert_eq!(arr.data_type(), &DataType::Null);
1125
1126        let arr = read_array_w_params(
1127            &reader,
1128            &schema.fields[1],
1129            ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72])),
1130        )
1131        .await;
1132        assert_eq!(4, arr.len());
1133        assert_eq!(arr.data_type(), &DataType::Null);
1134
1135        // raise error if take indices are out of bounds
1136        let params = ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72, 100]));
1137        let arr = read_array(
1138            &reader,
1139            &schema.fields[1],
1140            0,
1141            reader.page_table.as_ref(),
1142            &params,
1143        );
1144        assert!(arr.await.is_err());
1145
1146        // raise error if range indices are out of bounds
1147        let params = ReadBatchParams::RangeTo(..107);
1148        let arr = read_array(
1149            &reader,
1150            &schema.fields[1],
1151            0,
1152            reader.page_table.as_ref(),
1153            &params,
1154        );
1155        assert!(arr.await.is_err());
1156    }
1157
1158    #[tokio::test]
1159    async fn test_take_lists() {
1160        let arrow_schema = ArrowSchema::new(vec![
1161            ArrowField::new(
1162                "l",
1163                DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1164                false,
1165            ),
1166            ArrowField::new(
1167                "ll",
1168                DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1169                false,
1170            ),
1171        ]);
1172
1173        let value_builder = Int32Builder::new();
1174        let mut list_builder = ListBuilder::new(value_builder);
1175        let ll_value_builder = Int32Builder::new();
1176        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
1177        for i in 0..100 {
1178            list_builder.values().append_value(i);
1179            large_list_builder.values().append_value(i);
1180            if (i + 1) % 10 == 0 {
1181                list_builder.append(true);
1182                large_list_builder.append(true);
1183            }
1184        }
1185        let list_arr = Arc::new(list_builder.finish());
1186        let large_list_arr = Arc::new(large_list_builder.finish());
1187
1188        let batch = RecordBatch::try_new(
1189            Arc::new(arrow_schema.clone()),
1190            vec![list_arr as ArrayRef, large_list_arr as ArrayRef],
1191        )
1192        .unwrap();
1193
1194        // write to a lance file
1195        let store = ObjectStore::memory();
1196        let path = Path::from("/take_list");
1197        let schema: Schema = (&arrow_schema).try_into().unwrap();
1198        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
1199            &store,
1200            &path,
1201            schema.clone(),
1202            &Default::default(),
1203        )
1204        .await
1205        .unwrap();
1206        file_writer.write(&[batch]).await.unwrap();
1207        file_writer.finish().await.unwrap();
1208
1209        // read the file back
1210        let reader = FileReader::try_new(&store, &path, schema.clone())
1211            .await
1212            .unwrap();
1213        let actual = reader.take(&[1, 3, 5, 9], &schema).await.unwrap();
1214
1215        let value_builder = Int32Builder::new();
1216        let mut list_builder = ListBuilder::new(value_builder);
1217        let ll_value_builder = Int32Builder::new();
1218        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
1219        for i in [1, 3, 5, 9] {
1220            for j in 0..10 {
1221                list_builder.values().append_value(i * 10 + j);
1222                large_list_builder.values().append_value(i * 10 + j);
1223            }
1224            list_builder.append(true);
1225            large_list_builder.append(true);
1226        }
1227        let expected_list = list_builder.finish();
1228        let expected_large_list = large_list_builder.finish();
1229
1230        assert_eq!(actual.column_by_name("l").unwrap().as_ref(), &expected_list);
1231        assert_eq!(
1232            actual.column_by_name("ll").unwrap().as_ref(),
1233            &expected_large_list
1234        );
1235    }
1236
1237    #[tokio::test]
1238    async fn test_list_array_with_offsets() {
1239        let arrow_schema = ArrowSchema::new(vec![
1240            ArrowField::new(
1241                "l",
1242                DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1243                false,
1244            ),
1245            ArrowField::new(
1246                "ll",
1247                DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1248                false,
1249            ),
1250        ]);
1251
1252        let store = ObjectStore::memory();
1253        let path = Path::from("/lists");
1254
1255        let list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1256            Some(vec![Some(1), Some(2)]),
1257            Some(vec![Some(3), Some(4)]),
1258            Some((0..2_000).map(Some).collect::<Vec<_>>()),
1259        ])
1260        .slice(1, 1);
1261        let large_list_array = LargeListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1262            Some(vec![Some(10), Some(11)]),
1263            Some(vec![Some(12), Some(13)]),
1264            Some((0..2_000).map(Some).collect::<Vec<_>>()),
1265        ])
1266        .slice(1, 1);
1267
1268        let batch = RecordBatch::try_new(
1269            Arc::new(arrow_schema.clone()),
1270            vec![Arc::new(list_array), Arc::new(large_list_array)],
1271        )
1272        .unwrap();
1273
1274        let schema: Schema = (&arrow_schema).try_into().unwrap();
1275        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
1276            &store,
1277            &path,
1278            schema.clone(),
1279            &Default::default(),
1280        )
1281        .await
1282        .unwrap();
1283        file_writer.write(&[batch.clone()]).await.unwrap();
1284        file_writer.finish().await.unwrap();
1285
1286        // Make sure the big array was not written to the file
1287        let file_size_bytes = store.size(&path).await.unwrap();
1288        assert!(file_size_bytes < 1_000);
1289
1290        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
1291        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
1292        assert_eq!(batch, actual_batch);
1293    }
1294
1295    #[tokio::test]
1296    async fn test_read_ranges() {
        // create a record batch with a single Int64 column
1298        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int64, false)]);
1299        let schema = Schema::try_from(&arrow_schema).unwrap();
1300        let columns: Vec<ArrayRef> = vec![Arc::new(Int64Array::from_iter_values(0..100))];
1301        let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap();
1302
1303        // write to a lance file
1304        let store = ObjectStore::memory();
1305        let path = Path::from("/read_range");
1306        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
1307            &store,
1308            &path,
1309            schema.clone(),
1310            &Default::default(),
1311        )
1312        .await
1313        .unwrap();
1314        file_writer.write(&[batch]).await.unwrap();
1315        file_writer.finish().await.unwrap();
1316
1317        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
1318        let actual_batch = reader.read_range(7..25, reader.schema()).await.unwrap();
1319
1320        assert_eq!(
1321            actual_batch.column_by_name("i").unwrap().as_ref(),
1322            &Int64Array::from_iter_values(7..25)
1323        );
1324    }
1325
1326    #[tokio::test]
1327    async fn test_batches_stream() {
1328        let store = ObjectStore::memory();
1329        let path = Path::from("/batch_stream");
1330
1331        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int32, true)]);
1332        let schema = Schema::try_from(&arrow_schema).unwrap();
1333        let mut writer = FileWriter::<NotSelfDescribing>::try_new(
1334            &store,
1335            &path,
1336            schema.clone(),
1337            &Default::default(),
1338        )
1339        .await
1340        .unwrap();
1341        for i in 0..10 {
1342            let batch = RecordBatch::try_new(
1343                Arc::new(arrow_schema.clone()),
1344                vec![Arc::new(Int32Array::from_iter_values(i * 10..(i + 1) * 10))],
1345            )
1346            .unwrap();
1347            writer.write(&[batch]).await.unwrap();
1348        }
1349        writer.finish().await.unwrap();
1350
1351        let reader = FileReader::try_new(&store, &path, schema.clone())
1352            .await
1353            .unwrap();
1354        let stream = batches_stream(reader, schema, |id| id % 2 == 0);
1355        let batches = stream.try_collect::<Vec<_>>().await.unwrap();
1356
1357        assert_eq!(batches.len(), 5);
1358        for (i, batch) in batches.iter().enumerate() {
1359            assert_eq!(
1360                batch,
1361                &RecordBatch::try_new(
1362                    Arc::new(arrow_schema.clone()),
1363                    vec![Arc::new(Int32Array::from_iter_values(
1364                        i as i32 * 2 * 10..(i as i32 * 2 + 1) * 10
1365                    ))],
1366                )
1367                .unwrap()
1368            )
1369        }
1370    }
1371
1372    #[tokio::test]
1373    async fn test_take_boolean_beyond_chunk() {
1374        let store = ObjectStore::from_uri_and_params(
1375            Arc::new(Default::default()),
1376            "memory://",
1377            &ObjectStoreParams {
1378                block_size: Some(256),
1379                ..Default::default()
1380            },
1381        )
1382        .await
1383        .unwrap()
1384        .0;
1385        let path = Path::from("/take_bools");
1386
1387        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
1388            "b",
1389            DataType::Boolean,
1390            false,
1391        )]));
1392        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
1393        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
1394            &store,
1395            &path,
1396            schema.clone(),
1397            &Default::default(),
1398        )
1399        .await
1400        .unwrap();
1401
1402        let array = BooleanArray::from((0..5000).map(|v| v % 5 == 0).collect::<Vec<_>>());
1403        let batch =
1404            RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(array.clone())]).unwrap();
1405        file_writer.write(&[batch]).await.unwrap();
1406        file_writer.finish().await.unwrap();
1407
1408        let reader = FileReader::try_new(&store, &path, schema.clone())
1409            .await
1410            .unwrap();
1411        let actual = reader.take(&[2, 4, 5, 8, 4555], &schema).await.unwrap();
1412
1413        assert_eq!(
1414            actual.column_by_name("b").unwrap().as_ref(),
1415            &BooleanArray::from(vec![false, false, true, false, true])
1416        );
1417    }
1418
1419    #[tokio::test]
1420    async fn test_read_projection() {
        // The dataset schema may be very large. The file reader should support reading
        // a small projection of that schema (this just tests the field_id_offset /
        // max_field_id parameters).
1424        let store = ObjectStore::memory();
1425        let path = Path::from("/partial_read");
1426
1427        // Create a large schema
1428        let mut fields = vec![];
1429        for i in 0..100 {
1430            fields.push(ArrowField::new(format!("f{}", i), DataType::Int32, false));
1431        }
1432        let arrow_schema = ArrowSchema::new(fields);
1433        let schema = Schema::try_from(&arrow_schema).unwrap();
1434
1435        let partial_schema = schema.project(&["f50"]).unwrap();
1436        let partial_arrow: ArrowSchema = (&partial_schema).into();
1437
1438        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
1439            &store,
1440            &path,
1441            partial_schema.clone(),
1442            &Default::default(),
1443        )
1444        .await
1445        .unwrap();
1446
1447        let array = Int32Array::from(vec![0; 15]);
1448        let batch =
1449            RecordBatch::try_new(Arc::new(partial_arrow), vec![Arc::new(array.clone())]).unwrap();
1450        file_writer.write(&[batch.clone()]).await.unwrap();
1451        file_writer.finish().await.unwrap();
1452
1453        let field_id = partial_schema.fields.first().unwrap().id;
1454        let reader = FileReader::try_new_with_fragment_id(
1455            &store,
1456            &path,
1457            schema.clone(),
1458            0,
            /*field_id_offset=*/ field_id,
1460            /*max_field_id=*/ field_id,
1461            None,
1462        )
1463        .await
1464        .unwrap();
1465        let actual = reader
1466            .read_batch(0, ReadBatchParams::RangeFull, &partial_schema)
1467            .await
1468            .unwrap();
1469
1470        assert_eq!(actual, batch);
1471    }
1472}