
lance_file/previous/reader.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Lance Data File Reader

// Standard
use std::borrow::Cow;
use std::ops::{Range, RangeTo};
use std::sync::Arc;
use arrow_arith::numeric::sub;
use arrow_array::{
    ArrayRef, ArrowNativeTypeOp, ArrowNumericType, NullArray, OffsetSizeTrait, PrimitiveArray,
    RecordBatch, StructArray, UInt32Array,
    builder::PrimitiveBuilder,
    cast::AsArray,
    types::{Int32Type, Int64Type},
};
use arrow_buffer::ArrowNativeType;
use arrow_schema::{DataType, FieldRef, Schema as ArrowSchema};
use arrow_select::concat::{self, concat_batches};
use async_recursion::async_recursion;
use deepsize::DeepSizeOf;
use futures::{Future, FutureExt, StreamExt, TryStreamExt, stream};
use lance_arrow::*;
use lance_core::cache::{CacheKey, LanceCache};
use lance_core::datatypes::{Field, Schema};
use lance_core::{Error, Result};
use lance_io::encodings::AsyncIndex;
use lance_io::encodings::dictionary::DictionaryDecoder;
use lance_io::stream::{RecordBatchStream, RecordBatchStreamAdapter};
use lance_io::traits::Reader;
use lance_io::utils::{
    read_fixed_stride_array, read_metadata_offset, read_struct, read_struct_from_buf,
};
use lance_io::{ReadBatchParams, object_store::ObjectStore};

use object_store::path::Path;
use tracing::instrument;

use crate::previous::format::metadata::Metadata;
use crate::previous::page_table::{PageInfo, PageTable};

/// Lance File Reader.
///
/// It reads arrow data from one data file.
#[derive(Clone, DeepSizeOf)]
pub struct FileReader {
    pub object_reader: Arc<dyn Reader>,
    metadata: Arc<Metadata>,
    page_table: Arc<PageTable>,
    schema: Schema,

    /// The id of the fragment this file belongs to.
    /// For simple file access, this can just be zero.
    fragment_id: u64,

    /// Page table for statistics
    stats_page_table: Arc<Option<PageTable>>,
}

impl std::fmt::Debug for FileReader {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("FileReader")
            .field("fragment", &self.fragment_id)
            .field("path", &self.object_reader.path())
            .finish()
    }
}

// Generic cache key for string-based keys
struct StringCacheKey<'a, T> {
    key: &'a str,
    _phantom: std::marker::PhantomData<T>,
}

impl<'a, T> StringCacheKey<'a, T> {
    fn new(key: &'a str) -> Self {
        Self {
            key,
            _phantom: std::marker::PhantomData,
        }
    }
}

impl<T> CacheKey for StringCacheKey<'_, T> {
    type ValueType = T;

    fn key(&self) -> Cow<'_, str> {
        self.key.into()
    }
}

impl FileReader {
    /// Open file reader
    ///
    /// Open the file at the given path using the provided object store.
    ///
    /// The passed fragment ID determines the first 32 bits of the row IDs.
    ///
    /// If a manifest is passed in, it will be used to load the schema and dictionary.
    /// This is typically done if the file is part of a dataset fragment. If no manifest
    /// is passed in, then it is read from the file itself.
    ///
    /// The session passed in is used to cache metadata about the file. If no session
    /// is passed in, there will be no caching.
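    ///
    /// # Example
    ///
    /// A minimal sketch (assumes `store`, `path`, and `schema` already point at
    /// an existing Lance v1 data file):
    ///
    /// ```ignore
    /// let max_field_id = schema.max_field_id().unwrap_or_default();
    /// let reader = FileReader::try_new_with_fragment_id(
    ///     &store,
    ///     &path,
    ///     schema,
    ///     /*fragment_id=*/ 42,
    ///     /*field_id_offset=*/ 0,
    ///     max_field_id,
    ///     /*session=*/ None,
    /// )
    /// .await?;
    /// ```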
    #[instrument(level = "debug", skip(object_store, schema, session))]
    pub async fn try_new_with_fragment_id(
        object_store: &ObjectStore,
        path: &Path,
        schema: Schema,
        fragment_id: u32,
        field_id_offset: i32,
        max_field_id: i32,
        session: Option<&LanceCache>,
    ) -> Result<Self> {
        let object_reader = object_store.open(path).await?;

        let metadata = Self::read_metadata(object_reader.as_ref(), session).await?;

        Self::try_new_from_reader(
            path,
            object_reader.into(),
            Some(metadata),
            schema,
            fragment_id,
            field_id_offset,
            max_field_id,
            session,
        )
        .await
    }

    #[allow(clippy::too_many_arguments)]
    pub async fn try_new_from_reader(
        path: &Path,
        object_reader: Arc<dyn Reader>,
        metadata: Option<Arc<Metadata>>,
        schema: Schema,
        fragment_id: u32,
        field_id_offset: i32,
        max_field_id: i32,
        session: Option<&LanceCache>,
    ) -> Result<Self> {
        let metadata = match metadata {
            Some(metadata) => metadata,
            None => Self::read_metadata(object_reader.as_ref(), session).await?,
        };

        let page_table = async {
            Self::load_from_cache(session, path.to_string(), |_| async {
                PageTable::load(
                    object_reader.as_ref(),
                    metadata.page_table_position,
                    field_id_offset,
                    max_field_id,
                    metadata.num_batches() as i32,
                )
                .await
            })
            .await
        };

        let stats_page_table = Self::read_stats_page_table(object_reader.as_ref(), session);

        // Can concurrently load page tables
        let (page_table, stats_page_table) = futures::try_join!(page_table, stats_page_table)?;

        Ok(Self {
            object_reader,
            metadata,
            schema,
            page_table,
            fragment_id: fragment_id as u64,
            stats_page_table,
        })
    }

    pub async fn read_metadata(
        object_reader: &dyn Reader,
        cache: Option<&LanceCache>,
    ) -> Result<Arc<Metadata>> {
        Self::load_from_cache(cache, object_reader.path().to_string(), |_| async {
            let file_size = object_reader.size().await?;
            let begin = if file_size < object_reader.block_size() {
                0
            } else {
                file_size - object_reader.block_size()
            };
            let tail_bytes = object_reader.get_range(begin..file_size).await?;
            let metadata_pos = read_metadata_offset(&tail_bytes)?;

            let metadata: Metadata = if metadata_pos < file_size - tail_bytes.len() {
                // We have not read the metadata bytes yet.
                read_struct(object_reader, metadata_pos).await?
            } else {
                let offset = tail_bytes.len() - (file_size - metadata_pos);
                read_struct_from_buf(&tail_bytes.slice(offset..))?
            };
            Ok(metadata)
        })
        .await
    }

    /// Get the statistics page table. This will read the metadata if it is not cached.
    ///
    /// The page table is cached.
    async fn read_stats_page_table(
        reader: &dyn Reader,
        cache: Option<&LanceCache>,
    ) -> Result<Arc<Option<PageTable>>> {
        // To prevent collisions, we cache this at a child path
        Self::load_from_cache(cache, reader.path().child("stats").to_string(), |_| async {
            let metadata = Self::read_metadata(reader, cache).await?;

            if let Some(stats_meta) = metadata.stats_metadata.as_ref() {
                Ok(Some(
                    PageTable::load(
                        reader,
                        stats_meta.page_table_position,
                        /*min_field_id=*/ 0,
                        /*max_field_id=*/ *stats_meta.leaf_field_ids.iter().max().unwrap(),
                        /*num_batches=*/ 1,
                    )
                    .await?,
                ))
            } else {
                Ok(None)
            }
        })
        .await
    }

    /// Load some metadata about the fragment from the cache, if there is one.
    async fn load_from_cache<T: DeepSizeOf + Send + Sync + 'static, F, Fut>(
        cache: Option<&LanceCache>,
        key: String,
        loader: F,
    ) -> Result<Arc<T>>
    where
        F: Fn(&str) -> Fut,
        Fut: Future<Output = Result<T>> + Send,
    {
        if let Some(cache) = cache {
            let cache_key = StringCacheKey::<T>::new(key.as_str());
            cache
                .get_or_insert_with_key(cache_key, || loader(key.as_str()))
                .await
        } else {
            Ok(Arc::new(loader(key.as_str()).await?))
        }
    }

    /// Open one Lance data file for read.
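    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a Lance v1 file was previously written to
    /// `path` with this same `schema`:
    ///
    /// ```ignore
    /// let store = ObjectStore::memory();
    /// let path = Path::from("/data.lance");
    /// let reader = FileReader::try_new(&store, &path, schema).await?;
    /// println!("{} rows in {} batches", reader.len(), reader.num_batches());
    /// ```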
    pub async fn try_new(object_store: &ObjectStore, path: &Path, schema: Schema) -> Result<Self> {
        // If just reading a lance data file we assume the schema is the schema of the data file
        let max_field_id = schema.max_field_id().unwrap_or_default();
        Self::try_new_with_fragment_id(object_store, path, schema, 0, 0, max_field_id, None).await
    }

    fn io_parallelism(&self) -> usize {
        self.object_reader.io_parallelism()
    }

    /// Requested projection of the data in this file, excluding the row id column.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn num_batches(&self) -> usize {
        self.metadata.num_batches()
    }

    /// Get the number of rows in the given batch.
    pub fn num_rows_in_batch(&self, batch_id: i32) -> usize {
        self.metadata.get_batch_length(batch_id).unwrap_or_default() as usize
    }

    /// Count the number of rows in this file.
    pub fn len(&self) -> usize {
        self.metadata.len()
    }

    pub fn is_empty(&self) -> bool {
        self.metadata.is_empty()
    }

    /// Read a batch of data from the file.
    ///
    /// The schema of the returned [RecordBatch] is defined by the given `projection`.
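    ///
    /// # Example
    ///
    /// A sketch reading every row of the first batch (`reader` is an open
    /// [`FileReader`]):
    ///
    /// ```ignore
    /// let batch = reader.read_batch(0, .., reader.schema()).await?;
    /// assert_eq!(batch.num_rows(), reader.num_rows_in_batch(0));
    /// ```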
    #[instrument(level = "debug", skip(self, params, projection))]
    pub async fn read_batch(
        &self,
        batch_id: i32,
        params: impl Into<ReadBatchParams>,
        projection: &Schema,
    ) -> Result<RecordBatch> {
        read_batch(self, &params.into(), projection, batch_id).await
    }

    /// Read a range of records into one batch.
    ///
    /// Note that this may call concat if the range crosses batch boundaries, which
    /// makes it less efficient than [`FileReader::read_batch()`].
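    ///
    /// # Example
    ///
    /// A sketch reading rows 7..25, which may span multiple batches:
    ///
    /// ```ignore
    /// let batch = reader.read_range(7..25, reader.schema()).await?;
    /// assert_eq!(batch.num_rows(), 18);
    /// ```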
    #[instrument(level = "debug", skip(self, projection))]
    pub async fn read_range(
        &self,
        range: Range<usize>,
        projection: &Schema,
    ) -> Result<RecordBatch> {
        if range.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(projection.into())));
        }
        let range_in_batches = self.metadata.range_to_batches(range)?;
        let batches =
            stream::iter(range_in_batches)
                .map(|(batch_id, range)| async move {
                    self.read_batch(batch_id, range, projection).await
                })
                .buffered(self.io_parallelism())
                .try_collect::<Vec<_>>()
                .await?;
        if batches.len() == 1 {
            return Ok(batches[0].clone());
        }
        let schema = batches[0].schema();
        Ok(tokio::task::spawn_blocking(move || concat_batches(&schema, &batches)).await??)
    }

    /// Take records by indices within the file.
    ///
    /// The indices must be sorted.
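    ///
    /// # Example
    ///
    /// A sketch taking three sorted row indices from across the file:
    ///
    /// ```ignore
    /// let batch = reader.take(&[1, 15, 90], reader.schema()).await?;
    /// assert_eq!(batch.num_rows(), 3);
    /// ```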
    #[instrument(level = "debug", skip_all)]
    pub async fn take(&self, indices: &[u32], projection: &Schema) -> Result<RecordBatch> {
        let num_batches = self.num_batches();
        let num_rows = self.len() as u32;
        let indices_in_batches = self.metadata.group_indices_to_batches(indices);
        let batches = stream::iter(indices_in_batches)
            .map(|batch| async move {
                if batch.batch_id >= num_batches as i32 {
                    Err(Error::invalid_input_source(
                        format!("batch_id: {} out of bounds", batch.batch_id).into(),
                    ))
                } else if *batch.offsets.last().expect("got empty batch") > num_rows {
                    Err(Error::invalid_input_source(
                        format!("indices: {:?} out of bounds", batch.offsets).into(),
                    ))
                } else {
                    self.read_batch(batch.batch_id, batch.offsets.as_slice(), projection)
                        .await
                }
            })
            .buffered(self.io_parallelism())
            .try_collect::<Vec<_>>()
            .await?;

        let schema = Arc::new(ArrowSchema::from(projection));

        Ok(tokio::task::spawn_blocking(move || concat_batches(&schema, &batches)).await??)
    }

    /// Get the schema of the statistics page table, for the given data field ids.
    pub fn page_stats_schema(&self, field_ids: &[i32]) -> Option<Schema> {
        self.metadata.stats_metadata.as_ref().map(|meta| {
            let mut stats_field_ids = vec![];
            for stats_field in &meta.schema.fields {
                if let Ok(stats_field_id) = stats_field.name.parse::<i32>()
                    && field_ids.contains(&stats_field_id)
                {
                    stats_field_ids.push(stats_field.id);
                    for child in &stats_field.children {
                        stats_field_ids.push(child.id);
                    }
                }
            }
            meta.schema.project_by_ids(&stats_field_ids, true)
        })
    }

    /// Get the page statistics for the given data field ids.
    pub async fn read_page_stats(&self, field_ids: &[i32]) -> Result<Option<RecordBatch>> {
        if let Some(stats_page_table) = self.stats_page_table.as_ref() {
            let projection = self.page_stats_schema(field_ids).unwrap();
            // It's possible none of the requested fields have stats.
            if projection.fields.is_empty() {
                return Ok(None);
            }
            let arrays = futures::stream::iter(projection.fields.iter().cloned())
                .map(|field| async move {
                    read_array(
                        self,
                        &field,
                        0,
                        stats_page_table,
                        &ReadBatchParams::RangeFull,
                    )
                    .await
                })
                .buffered(self.io_parallelism())
                .try_collect::<Vec<_>>()
                .await?;

            let schema = ArrowSchema::from(&projection);
            let batch = RecordBatch::try_new(Arc::new(schema), arrays)?;
            Ok(Some(batch))
        } else {
            Ok(None)
        }
    }
}

/// Stream desired full batches from the file.
///
/// Parameters:
/// - **reader**: An opened file reader.
/// - **projection**: The schema of the returned [RecordBatch].
/// - **predicate**: A function that takes a batch ID and returns true if the batch should be
///   returned.
///
/// Returns:
/// - A stream of [RecordBatch]s, each one corresponding to one full batch in the file.
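///
/// # Example
///
/// A sketch that streams only the even-numbered batches (assumes `reader` and
/// `projection` are already set up):
///
/// ```ignore
/// use futures::TryStreamExt;
///
/// let stream = batches_stream(reader, projection, |batch_id| batch_id % 2 == 0);
/// let batches: Vec<RecordBatch> = stream.try_collect().await?;
/// ```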
pub fn batches_stream(
    reader: FileReader,
    projection: Schema,
    predicate: impl FnMut(&i32) -> bool + Send + Sync + 'static,
) -> impl RecordBatchStream {
    // Make projection an Arc so we can clone it and pass between threads.
    let projection = Arc::new(projection);
    let arrow_schema = ArrowSchema::from(projection.as_ref());

    let total_batches = reader.num_batches() as i32;
    let batches = (0..total_batches).filter(predicate);
    // Wrap the reader in an Arc so we can clone it and pass it between threads.
    let this = Arc::new(reader);
    let inner = stream::iter(batches)
        .zip(stream::repeat_with(move || {
            (this.clone(), projection.clone())
        }))
        .map(move |(batch_id, (reader, projection))| async move {
            reader
                .read_batch(batch_id, ReadBatchParams::RangeFull, &projection)
                .await
        })
        .buffered(2)
        .boxed();
    RecordBatchStreamAdapter::new(Arc::new(arrow_schema), inner)
}

/// Read a batch.
///
/// `schema` must not be empty; requesting zero fields returns an
/// invalid-input error.
pub async fn read_batch(
    reader: &FileReader,
    params: &ReadBatchParams,
    schema: &Schema,
    batch_id: i32,
) -> Result<RecordBatch> {
    if !schema.fields.is_empty() {
        // We box this because otherwise we get a higher-order lifetime error.
        let arrs = stream::iter(&schema.fields)
            .map(|f| async { read_array(reader, f, batch_id, &reader.page_table, params).await })
            .buffered(reader.io_parallelism())
            .try_collect::<Vec<_>>()
            .boxed();
        let arrs = arrs.await?;
        Ok(RecordBatch::try_new(Arc::new(schema.into()), arrs)?)
    } else {
        Err(Error::invalid_input("no fields requested"))
    }
}

#[async_recursion]
async fn read_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let data_type = field.data_type();

    use DataType::*;

    if data_type.is_fixed_stride() {
        _read_fixed_stride_array(reader, field, batch_id, page_table, params).await
    } else {
        match data_type {
            Null => read_null_array(field, batch_id, page_table, params),
            Utf8 | LargeUtf8 | Binary | LargeBinary => {
                read_binary_array(reader, field, batch_id, page_table, params).await
            }
            Struct(_) => read_struct_array(reader, field, batch_id, page_table, params).await,
            Dictionary(_, _) => {
                read_dictionary_array(reader, field, batch_id, page_table, params).await
            }
            List(_) => {
                read_list_array::<Int32Type>(reader, field, batch_id, page_table, params).await
            }
            LargeList(_) => {
                read_list_array::<Int64Type>(reader, field, batch_id, page_table, params).await
            }
            _ => {
                unimplemented!("No support for {data_type} yet");
            }
        }
    }
}

fn get_page_info<'a>(
    page_table: &'a PageTable,
    field: &'a Field,
    batch_id: i32,
) -> Result<&'a PageInfo> {
    page_table.get(field.id, batch_id).ok_or_else(|| {
        Error::invalid_input(format!(
            "No page info found for field: {}, field_id={} batch={}",
            field.name, field.id, batch_id
        ))
    })
}

/// Read a fixed-stride (primitive) array for batch `batch_id`.
async fn _read_fixed_stride_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;
    read_fixed_stride_array(
        reader.object_reader.as_ref(),
        &field.data_type(),
        page_info.position,
        page_info.length,
        params.clone(),
    )
    .await
}

fn read_null_array(
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;

    let length_output = match params {
        ReadBatchParams::Indices(indices) => {
            if indices.is_empty() {
                0
            } else {
                let idx_max = *indices.values().iter().max().unwrap() as u64;
                if idx_max >= page_info.length as u64 {
                    return Err(Error::invalid_input(format!(
                        "NullArray Reader: request([{}]) out of range: [0..{}]",
                        idx_max, page_info.length
                    )));
                }
                indices.len()
            }
        }
        _ => {
            let (idx_start, idx_end) = match params {
                ReadBatchParams::Range(r) => (r.start, r.end),
                ReadBatchParams::RangeFull => (0, page_info.length),
                ReadBatchParams::RangeTo(r) => (0, r.end),
                ReadBatchParams::RangeFrom(r) => (r.start, page_info.length),
                _ => unreachable!(),
            };
            if idx_end > page_info.length {
                return Err(Error::invalid_input(format!(
                    "NullArray Reader: request([{}..{}]) out of range: [0..{}]",
                    idx_start, idx_end, page_info.length
                )));
            }
            idx_end - idx_start
        }
    };

    Ok(Arc::new(NullArray::new(length_output)))
}

async fn read_binary_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;

    lance_io::utils::read_binary_array(
        reader.object_reader.as_ref(),
        &field.data_type(),
        field.nullable,
        page_info.position,
        page_info.length,
        params,
    )
    .await
}

async fn read_dictionary_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;
    let data_type = field.data_type();
    let decoder = DictionaryDecoder::new(
        reader.object_reader.as_ref(),
        page_info.position,
        page_info.length,
        &data_type,
        field
            .dictionary
            .as_ref()
            .unwrap()
            .values
            .as_ref()
            .unwrap()
            .clone(),
    );
    decoder.get(params.clone()).await
}

async fn read_struct_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    // TODO: use tokio to make the reads in parallel.
    let mut sub_arrays: Vec<(FieldRef, ArrayRef)> = vec![];

    for child in field.children.as_slice() {
        let arr = read_array(reader, child, batch_id, page_table, params).await?;
        sub_arrays.push((Arc::new(child.into()), arr));
    }

    Ok(Arc::new(StructArray::from(sub_arrays)))
}

async fn take_list_array<T: ArrowNumericType>(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    positions: &PrimitiveArray<T>,
    indices: &UInt32Array,
) -> Result<ArrayRef>
where
    T::Native: ArrowNativeTypeOp + OffsetSizeTrait,
{
    let first_idx = indices.value(0);
    // Range of values for each index
    let ranges = indices
        .values()
        .iter()
        .map(|i| (*i - first_idx).as_usize())
        .map(|idx| positions.value(idx).as_usize()..positions.value(idx + 1).as_usize())
        .collect::<Vec<_>>();
    let field = field.clone();
    let mut list_values: Vec<ArrayRef> = vec![];
    // TODO: read them in parallel.
    for range in ranges.iter() {
        list_values.push(
            read_array(
                reader,
                &field.children[0],
                batch_id,
                page_table,
                &(range.clone()).into(),
            )
            .await?,
        );
    }

    let value_refs = list_values
        .iter()
        .map(|arr| arr.as_ref())
        .collect::<Vec<_>>();
    let mut offsets_builder = PrimitiveBuilder::<T>::new();
    offsets_builder.append_value(T::Native::usize_as(0));
    let mut off = 0_usize;
    for range in ranges {
        off += range.len();
        offsets_builder.append_value(T::Native::usize_as(off));
    }
    let all_values = concat::concat(value_refs.as_slice())?;
    let offset_arr = offsets_builder.finish();
    let arr = try_new_generic_list_array(all_values, &offset_arr)?;
    Ok(Arc::new(arr) as ArrayRef)
}

async fn read_list_array<T: ArrowNumericType>(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef>
where
    T::Native: ArrowNativeTypeOp + OffsetSizeTrait,
{
    // Offset the position array by 1 in order to include the upper bound of the last element
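    // For example (illustrative), with stored offsets [0, 2, 5, 9] (three lists)
    // and params Range(1..3), we read positions[1..4] = [2, 5, 9] so that the
    // end offset of the last requested list is included.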
    let positions_params = match params {
        ReadBatchParams::Range(range) => ReadBatchParams::from(range.start..(range.end + 1)),
        ReadBatchParams::RangeTo(range) => ReadBatchParams::from(..range.end + 1),
        ReadBatchParams::Indices(indices) => {
            (indices.value(0).as_usize()..indices.value(indices.len() - 1).as_usize() + 2).into()
        }
        p => p.clone(),
    };

    let page_info = get_page_info(&reader.page_table, field, batch_id)?;
    let position_arr = read_fixed_stride_array(
        reader.object_reader.as_ref(),
        &T::DATA_TYPE,
        page_info.position,
        page_info.length,
        positions_params,
    )
    .await?;

    let positions: &PrimitiveArray<T> = position_arr.as_primitive();

    // Recompute params so they align with the offset array
    let value_params = match params {
        ReadBatchParams::Range(range) => ReadBatchParams::from(
            positions.value(0).as_usize()..positions.value(range.end - range.start).as_usize(),
        ),
        ReadBatchParams::Ranges(_) => {
            return Err(Error::internal(
                "ReadBatchParams::Ranges should not be used in v1 files".to_string(),
            ));
        }
        ReadBatchParams::RangeTo(RangeTo { end }) => {
            ReadBatchParams::from(..positions.value(*end).as_usize())
        }
        ReadBatchParams::RangeFrom(_) => ReadBatchParams::from(positions.value(0).as_usize()..),
        ReadBatchParams::RangeFull => ReadBatchParams::from(
            positions.value(0).as_usize()..positions.value(positions.len() - 1).as_usize(),
        ),
        ReadBatchParams::Indices(indices) => {
            return take_list_array(reader, field, batch_id, page_table, positions, indices).await;
        }
    };

    let start_position = PrimitiveArray::<T>::new_scalar(positions.value(0));
    let offset_arr = sub(positions, &start_position)?;
    let offset_arr_ref = offset_arr.as_primitive::<T>();
    let value_arrs = read_array(
        reader,
        &field.children[0],
        batch_id,
        page_table,
        &value_params,
    )
    .await?;
    let arr = try_new_generic_list_array(value_arrs, offset_arr_ref)?;
    Ok(Arc::new(arr) as ArrayRef)
}

#[cfg(test)]
mod tests {
    use crate::previous::writer::{FileWriter as PreviousFileWriter, NotSelfDescribing};

    use super::*;

    use arrow_array::{
        Array, DictionaryArray, Float32Array, Int64Array, LargeListArray, ListArray, StringArray,
        UInt8Array,
        builder::{Int32Builder, LargeListBuilder, ListBuilder, StringBuilder},
        cast::{as_string_array, as_struct_array},
        types::UInt8Type,
    };
    use arrow_array::{BooleanArray, Int32Array};
    use arrow_schema::{Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema};
    use lance_io::object_store::ObjectStoreParams;

    #[tokio::test]
    async fn test_take() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("i", DataType::Int64, true),
            ArrowField::new("f", DataType::Float32, false),
            ArrowField::new("s", DataType::Utf8, false),
            ArrowField::new(
                "d",
                DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
                false,
            ),
        ]);
        let mut schema = Schema::try_from(&arrow_schema).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/take_test");

        // Write 10 batches.
        let values = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);
        let values_ref = Arc::new(values);
        let mut batches = vec![];
        for batch_id in 0..10 {
            let value_range: Range<i64> = batch_id * 10..batch_id * 10 + 10;
            let keys = UInt8Array::from_iter_values(value_range.clone().map(|v| (v % 7) as u8));
            let columns: Vec<ArrayRef> = vec![
                Arc::new(Int64Array::from_iter(
                    value_range.clone().collect::<Vec<_>>(),
                )),
                Arc::new(Float32Array::from_iter(
                    value_range.clone().map(|n| n as f32).collect::<Vec<_>>(),
                )),
                Arc::new(StringArray::from_iter_values(
                    value_range.clone().map(|n| format!("str-{}", n)),
                )),
                Arc::new(DictionaryArray::<UInt8Type>::try_new(keys, values_ref.clone()).unwrap()),
            ];
            batches.push(RecordBatch::try_new(Arc::new(arrow_schema.clone()), columns).unwrap());
        }
        schema.set_dictionary(&batches[0]).unwrap();

        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        for batch in batches.iter() {
            file_writer
                .write(std::slice::from_ref(batch))
                .await
                .unwrap();
        }
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let batch = reader
            .take(&[1, 15, 20, 25, 30, 48, 90], reader.schema())
            .await
            .unwrap();
        let dict_keys = UInt8Array::from_iter_values([1, 1, 6, 4, 2, 6, 6]);
        assert_eq!(
            batch,
            RecordBatch::try_new(
                batch.schema(),
                vec![
                    Arc::new(Int64Array::from_iter_values([1, 15, 20, 25, 30, 48, 90])),
                    Arc::new(Float32Array::from_iter_values([
                        1.0, 15.0, 20.0, 25.0, 30.0, 48.0, 90.0
                    ])),
                    Arc::new(StringArray::from_iter_values([
                        "str-1", "str-15", "str-20", "str-25", "str-30", "str-48", "str-90"
                    ])),
                    Arc::new(DictionaryArray::try_new(dict_keys, values_ref.clone()).unwrap()),
                ]
            )
            .unwrap()
        );
    }

    async fn test_write_null_string_in_struct(field_nullable: bool) {
        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "parent",
            DataType::Struct(ArrowFields::from(vec![ArrowField::new(
                "str",
                DataType::Utf8,
                field_nullable,
            )])),
            true,
        )]));

        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let string_arr = Arc::new(StringArray::from_iter([Some("a"), Some(""), Some("b")]));
        let struct_arr = Arc::new(StructArray::from(vec![(
            Arc::new(ArrowField::new("str", DataType::Utf8, field_nullable)),
            string_arr.clone() as ArrayRef,
        )]));
        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_arr]).unwrap();

        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer
            .write(std::slice::from_ref(&batch))
            .await
            .unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();

        if field_nullable {
            assert_eq!(
                &StringArray::from_iter(vec![Some("a"), None, Some("b")]),
                as_string_array(
                    as_struct_array(actual_batch.column_by_name("parent").unwrap().as_ref())
                        .column_by_name("str")
                        .unwrap()
                        .as_ref()
                )
            );
        } else {
            assert_eq!(actual_batch, batch);
        }
    }

    #[tokio::test]
    async fn read_nullable_string_in_struct() {
        test_write_null_string_in_struct(true).await;
        test_write_null_string_in_struct(false).await;
    }

    #[tokio::test]
    async fn test_read_struct_of_list_arrays() {
        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let arrow_schema = make_schema_of_list_array();
        let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();

        let batches = (0..3)
            .map(|_| {
                let struct_array = make_struct_of_list_array(10, 10);
                RecordBatch::try_new(arrow_schema.clone(), vec![struct_array]).unwrap()
            })
            .collect::<Vec<_>>();
        let batches_ref = batches.iter().collect::<Vec<_>>();

        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&batches).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
        let expected = concat_batches(&arrow_schema, batches_ref).unwrap();
        assert_eq!(expected, actual_batch);
    }

    #[tokio::test]
    async fn test_scan_struct_of_list_arrays() {
        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let arrow_schema = make_schema_of_list_array();
        let struct_array = make_struct_of_list_array(3, 10);
        let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_array.clone()]).unwrap();

        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let mut expected_columns: Vec<ArrayRef> = Vec::new();
        for c in struct_array.columns().iter() {
            expected_columns.push(c.slice(1, 1));
        }

        let expected_struct = match arrow_schema.fields[0].data_type() {
            DataType::Struct(subfields) => subfields
                .iter()
                .zip(expected_columns)
                .map(|(f, d)| (f.clone(), d))
                .collect::<Vec<_>>(),
            _ => panic!("unexpected field"),
        };

        let expected_struct_array = StructArray::from(expected_struct);
        let expected_batch = RecordBatch::from(&StructArray::from(vec![(
            Arc::new(arrow_schema.fields[0].as_ref().clone()),
            Arc::new(expected_struct_array) as ArrayRef,
        )]));

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let params = ReadBatchParams::Range(1..2);
        let slice_of_batch = reader.read_batch(0, params, reader.schema()).await.unwrap();
        assert_eq!(expected_batch, slice_of_batch);
    }

    fn make_schema_of_list_array() -> Arc<arrow_schema::Schema> {
        Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "s",
            DataType::Struct(ArrowFields::from(vec![
                ArrowField::new(
                    "li",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    true,
                ),
                ArrowField::new(
                    "ls",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))),
                    true,
                ),
                ArrowField::new(
                    "ll",
                    DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    false,
                ),
            ])),
            true,
        )]))
    }

    fn make_struct_of_list_array(rows: i32, num_items: i32) -> Arc<StructArray> {
        let mut li_builder = ListBuilder::new(Int32Builder::new());
        let mut ls_builder = ListBuilder::new(StringBuilder::new());
        let ll_value_builder = Int32Builder::new();
        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
        for i in 0..rows {
            for j in 0..num_items {
                li_builder.values().append_value(i * 10 + j);
                ls_builder
                    .values()
                    .append_value(format!("str-{}", i * 10 + j));
                large_list_builder.values().append_value(i * 10 + j);
            }
            li_builder.append(true);
            ls_builder.append(true);
            large_list_builder.append(true);
        }
        Arc::new(StructArray::from(vec![
            (
                Arc::new(ArrowField::new(
                    "li",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    true,
                )),
                Arc::new(li_builder.finish()) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new(
                    "ls",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))),
                    true,
                )),
                Arc::new(ls_builder.finish()) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new(
                    "ll",
                    DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    false,
                )),
                Arc::new(large_list_builder.finish()) as ArrayRef,
            ),
        ]))
    }

    #[tokio::test]
    async fn test_read_nullable_arrays() {
        use arrow_array::Array;

        // create a record batch with a null array column
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("i", DataType::Int64, false),
            ArrowField::new("n", DataType::Null, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from_iter_values(0..100)),
            Arc::new(NullArray::new(100)),
        ];
        let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap();

        // write to a lance file
        let store = ObjectStore::memory();
        let path = Path::from("/takes");
        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        // read the file back
        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();

        async fn read_array_w_params(
            reader: &FileReader,
            field: &Field,
            params: ReadBatchParams,
        ) -> ArrayRef {
            read_array(reader, field, 0, reader.page_table.as_ref(), &params)
                .await
                .expect("Error reading back the null array from file") as _
        }

        let arr = read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFull).await;
        assert_eq!(100, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::Range(10..25)).await;
        assert_eq!(15, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFrom(60..)).await;
        assert_eq!(40, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeTo(..25)).await;
        assert_eq!(25, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr = read_array_w_params(
            &reader,
            &schema.fields[1],
            ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72])),
        )
        .await;
        assert_eq!(4, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        // raise error if take indices are out of bounds
        let params = ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72, 100]));
        let arr = read_array(
            &reader,
            &schema.fields[1],
            0,
            reader.page_table.as_ref(),
            &params,
        );
        assert!(arr.await.is_err());

        // raise error if range indices are out of bounds
        let params = ReadBatchParams::RangeTo(..107);
        let arr = read_array(
            &reader,
            &schema.fields[1],
            0,
            reader.page_table.as_ref(),
            &params,
        );
        assert!(arr.await.is_err());
    }

    #[tokio::test]
    async fn test_take_lists() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "l",
                DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
            ArrowField::new(
                "ll",
                DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
        ]);

        let value_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(value_builder);
        let ll_value_builder = Int32Builder::new();
        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
        for i in 0..100 {
            list_builder.values().append_value(i);
            large_list_builder.values().append_value(i);
            if (i + 1) % 10 == 0 {
                list_builder.append(true);
                large_list_builder.append(true);
            }
        }
        let list_arr = Arc::new(list_builder.finish());
        let large_list_arr = Arc::new(large_list_builder.finish());

        let batch = RecordBatch::try_new(
            Arc::new(arrow_schema.clone()),
            vec![list_arr as ArrayRef, large_list_arr as ArrayRef],
        )
        .unwrap();

        // write to a lance file
        let store = ObjectStore::memory();
        let path = Path::from("/take_list");
        let schema: Schema = (&arrow_schema).try_into().unwrap();
        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        // read the file back
        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();
        let actual = reader.take(&[1, 3, 5, 9], &schema).await.unwrap();

        let value_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(value_builder);
        let ll_value_builder = Int32Builder::new();
        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
        for i in [1, 3, 5, 9] {
            for j in 0..10 {
                list_builder.values().append_value(i * 10 + j);
                large_list_builder.values().append_value(i * 10 + j);
            }
            list_builder.append(true);
            large_list_builder.append(true);
        }
        let expected_list = list_builder.finish();
        let expected_large_list = large_list_builder.finish();

        assert_eq!(actual.column_by_name("l").unwrap().as_ref(), &expected_list);
        assert_eq!(
            actual.column_by_name("ll").unwrap().as_ref(),
            &expected_large_list
        );
    }

    #[tokio::test]
    async fn test_list_array_with_offsets() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "l",
                DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
            ArrowField::new(
                "ll",
                DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
        ]);

        let store = ObjectStore::memory();
        let path = Path::from("/lists");

        let list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2)]),
            Some(vec![Some(3), Some(4)]),
            Some((0..2_000).map(Some).collect::<Vec<_>>()),
        ])
        .slice(1, 1);
        let large_list_array = LargeListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(10), Some(11)]),
            Some(vec![Some(12), Some(13)]),
            Some((0..2_000).map(Some).collect::<Vec<_>>()),
        ])
        .slice(1, 1);

        let batch = RecordBatch::try_new(
            Arc::new(arrow_schema.clone()),
            vec![Arc::new(list_array), Arc::new(large_list_array)],
        )
        .unwrap();

        let schema: Schema = (&arrow_schema).try_into().unwrap();
        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer
            .write(std::slice::from_ref(&batch))
            .await
            .unwrap();
        file_writer.finish().await.unwrap();

        // Make sure the big array was not written to the file
        let file_size_bytes = store.size(&path).await.unwrap();
        assert!(file_size_bytes < 1_000);

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
        assert_eq!(batch, actual_batch);
    }

    #[tokio::test]
    async fn test_read_ranges() {
        // create a record batch with a single Int64 column
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int64, false)]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let columns: Vec<ArrayRef> = vec![Arc::new(Int64Array::from_iter_values(0..100))];
        let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap();

        // write to a lance file
        let store = ObjectStore::memory();
        let path = Path::from("/read_range");
        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_range(7..25, reader.schema()).await.unwrap();

        assert_eq!(
            actual_batch.column_by_name("i").unwrap().as_ref(),
            &Int64Array::from_iter_values(7..25)
        );
    }

    #[tokio::test]
    async fn test_batches_stream() {
        let store = ObjectStore::memory();
        let path = Path::from("/batch_stream");

        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int32, true)]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let mut writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        for i in 0..10 {
            let batch = RecordBatch::try_new(
                Arc::new(arrow_schema.clone()),
                vec![Arc::new(Int32Array::from_iter_values(i * 10..(i + 1) * 10))],
            )
            .unwrap();
            writer.write(&[batch]).await.unwrap();
        }
        writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();
        let stream = batches_stream(reader, schema, |id| id % 2 == 0);
        let batches = stream.try_collect::<Vec<_>>().await.unwrap();

        assert_eq!(batches.len(), 5);
        for (i, batch) in batches.iter().enumerate() {
            assert_eq!(
                batch,
                &RecordBatch::try_new(
                    Arc::new(arrow_schema.clone()),
                    vec![Arc::new(Int32Array::from_iter_values(
                        i as i32 * 2 * 10..(i as i32 * 2 + 1) * 10
                    ))],
                )
                .unwrap()
            )
        }
    }

    #[tokio::test]
    async fn test_take_boolean_beyond_chunk() {
        let store = ObjectStore::from_uri_and_params(
            Arc::new(Default::default()),
            "memory://",
            &ObjectStoreParams {
                block_size: Some(256),
                ..Default::default()
            },
        )
        .await
        .unwrap()
        .0;
        let path = Path::from("/take_bools");

        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "b",
            DataType::Boolean,
            false,
        )]));
        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();

        let array = BooleanArray::from((0..5000).map(|v| v % 5 == 0).collect::<Vec<_>>());
        let batch =
            RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(array.clone())]).unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();
        let actual = reader.take(&[2, 4, 5, 8, 4555], &schema).await.unwrap();

        assert_eq!(
            actual.column_by_name("b").unwrap().as_ref(),
            &BooleanArray::from(vec![false, false, true, false, true])
        );
    }

    #[tokio::test]
    async fn test_read_projection() {
        // The dataset schema may be very large.  The file reader should support reading
        // a small projection of that schema (this just tests the field_offset / num_fields
        // parameters)
        let store = ObjectStore::memory();
        let path = Path::from("/partial_read");

        // Create a large schema
        let mut fields = vec![];
        for i in 0..100 {
            fields.push(ArrowField::new(format!("f{}", i), DataType::Int32, false));
        }
        let arrow_schema = ArrowSchema::new(fields);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        let partial_schema = schema.project(&["f50"]).unwrap();
        let partial_arrow: ArrowSchema = (&partial_schema).into();

        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            partial_schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();

        let array = Int32Array::from(vec![0; 15]);
        let batch =
            RecordBatch::try_new(Arc::new(partial_arrow), vec![Arc::new(array.clone())]).unwrap();
        file_writer
            .write(std::slice::from_ref(&batch))
            .await
            .unwrap();
        file_writer.finish().await.unwrap();

        let field_id = partial_schema.fields.first().unwrap().id;
        let reader = FileReader::try_new_with_fragment_id(
            &store,
            &path,
            schema.clone(),
            0,
            /*field_id_offset=*/ field_id,
            /*max_field_id=*/ field_id,
            None,
        )
        .await
        .unwrap();
        let actual = reader
            .read_batch(0, ReadBatchParams::RangeFull, &partial_schema)
            .await
            .unwrap();

        assert_eq!(actual, batch);
    }
}