lance_file/previous/reader.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Lance Data File Reader

// Standard
use std::borrow::Cow;
use std::ops::{Range, RangeTo};
use std::sync::Arc;

use arrow_arith::numeric::sub;
use arrow_array::{
    ArrayRef, ArrowNativeTypeOp, ArrowNumericType, NullArray, OffsetSizeTrait, PrimitiveArray,
    RecordBatch, StructArray, UInt32Array,
    builder::PrimitiveBuilder,
    cast::AsArray,
    types::{Int32Type, Int64Type},
};
use arrow_buffer::ArrowNativeType;
use arrow_schema::{DataType, FieldRef, Schema as ArrowSchema};
use arrow_select::concat::{self, concat_batches};
use async_recursion::async_recursion;
use deepsize::DeepSizeOf;
use futures::{Future, FutureExt, StreamExt, TryStreamExt, stream};
use lance_arrow::*;
use lance_core::cache::{CacheKey, LanceCache};
use lance_core::datatypes::{Field, Schema};
use lance_core::{Error, Result};
use lance_io::encodings::AsyncIndex;
use lance_io::encodings::dictionary::DictionaryDecoder;
use lance_io::stream::{RecordBatchStream, RecordBatchStreamAdapter};
use lance_io::traits::Reader;
use lance_io::utils::{
    read_fixed_stride_array, read_metadata_offset, read_struct, read_struct_from_buf,
};
use lance_io::{ReadBatchParams, object_store::ObjectStore};

use object_store::path::Path;
use tracing::instrument;

use crate::previous::format::metadata::Metadata;
use crate::previous::page_table::{PageInfo, PageTable};

/// Lance File Reader.
///
/// It reads Arrow data from one data file.
#[derive(Clone, DeepSizeOf)]
pub struct FileReader {
    pub object_reader: Arc<dyn Reader>,
    metadata: Arc<Metadata>,
    page_table: Arc<PageTable>,
    schema: Schema,

    /// The id of the fragment which this file belongs to.
    /// For simple file access, this can just be zero.
    fragment_id: u64,

    /// Page table for statistics
    stats_page_table: Arc<Option<PageTable>>,
}

impl std::fmt::Debug for FileReader {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("FileReader")
            .field("fragment", &self.fragment_id)
            .field("path", &self.object_reader.path())
            .finish()
    }
}

// Generic cache key for string-based keys
struct StringCacheKey<'a, T> {
    key: &'a str,
    _phantom: std::marker::PhantomData<T>,
}

impl<'a, T> StringCacheKey<'a, T> {
    fn new(key: &'a str) -> Self {
        Self {
            key,
            _phantom: std::marker::PhantomData,
        }
    }
}

impl<T: 'static> CacheKey for StringCacheKey<'_, T> {
    type ValueType = T;

    fn key(&self) -> Cow<'_, str> {
        self.key.into()
    }

    fn type_name() -> &'static str {
        // This is a private, crate-internal key that is only instantiated with
        // a single concrete T within one build, so std::any::type_name is fine
        // here — there is no cross-crate collision risk.
        std::any::type_name::<T>()
    }
}

impl FileReader {
    /// Open file reader
    ///
    /// Open the file at the given path using the provided object store.
    ///
    /// The passed fragment ID determines the first 32 bits of the row IDs.
    ///
    /// If a manifest is passed in, it will be used to load the schema and dictionary.
    /// This is typically done if the file is part of a dataset fragment. If no manifest
    /// is passed in, then it is read from the file itself.
    ///
    /// The session passed in is used to cache metadata about the file. If no session
    /// is passed in, there will be no caching.
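    ///
    /// # Example
    ///
    /// A minimal sketch, loosely following `test_read_projection` below (the
    /// store, path, schema, and field-id bounds are illustrative):
    ///
    /// ```ignore
    /// let reader = FileReader::try_new_with_fragment_id(
    ///     &store,
    ///     &path,
    ///     schema,
    ///     /*fragment_id=*/ 0,
    ///     /*field_id_offset=*/ 0,
    ///     /*max_field_id=*/ max_field_id,
    ///     /*session=*/ None,
    /// )
    /// .await?;
    /// ```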
    #[instrument(level = "debug", skip(object_store, schema, session))]
    pub async fn try_new_with_fragment_id(
        object_store: &ObjectStore,
        path: &Path,
        schema: Schema,
        fragment_id: u32,
        field_id_offset: i32,
        max_field_id: i32,
        session: Option<&LanceCache>,
    ) -> Result<Self> {
        let object_reader = object_store.open(path).await?;

        let metadata = Self::read_metadata(object_reader.as_ref(), session).await?;

        Self::try_new_from_reader(
            path,
            object_reader.into(),
            Some(metadata),
            schema,
            fragment_id,
            field_id_offset,
            max_field_id,
            session,
        )
        .await
    }

    #[allow(clippy::too_many_arguments)]
    pub async fn try_new_from_reader(
        path: &Path,
        object_reader: Arc<dyn Reader>,
        metadata: Option<Arc<Metadata>>,
        schema: Schema,
        fragment_id: u32,
        field_id_offset: i32,
        max_field_id: i32,
        session: Option<&LanceCache>,
    ) -> Result<Self> {
        let metadata = match metadata {
            Some(metadata) => metadata,
            None => Self::read_metadata(object_reader.as_ref(), session).await?,
        };

        let page_table = async {
            Self::load_from_cache(session, path.to_string(), |_| async {
                PageTable::load(
                    object_reader.as_ref(),
                    metadata.page_table_position,
                    field_id_offset,
                    max_field_id,
                    metadata.num_batches() as i32,
                )
                .await
            })
            .await
        };

        let stats_page_table = Self::read_stats_page_table(object_reader.as_ref(), session);

        // The two page tables can be loaded concurrently.
        let (page_table, stats_page_table) = futures::try_join!(page_table, stats_page_table)?;

        Ok(Self {
            object_reader,
            metadata,
            schema,
            page_table,
            fragment_id: fragment_id as u64,
            stats_page_table,
        })
    }

    pub async fn read_metadata(
        object_reader: &dyn Reader,
        cache: Option<&LanceCache>,
    ) -> Result<Arc<Metadata>> {
        Self::load_from_cache(cache, object_reader.path().to_string(), |_| async {
            let file_size = object_reader.size().await?;
            let begin = if file_size < object_reader.block_size() {
                0
            } else {
                file_size - object_reader.block_size()
            };
            let tail_bytes = object_reader.get_range(begin..file_size).await?;
            let metadata_pos = read_metadata_offset(&tail_bytes)?;

            let metadata: Metadata = if metadata_pos < file_size - tail_bytes.len() {
                // We have not read the metadata bytes yet.
                read_struct(object_reader, metadata_pos).await?
            } else {
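                // The metadata starts inside the tail we already fetched;
                // compute its offset relative to the start of `tail_bytes`.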
                let offset = tail_bytes.len() - (file_size - metadata_pos);
                read_struct_from_buf(&tail_bytes.slice(offset..))?
            };
            Ok(metadata)
        })
        .await
    }

    /// Get the statistics page table. This will read the metadata if it is not cached.
    ///
    /// The page table is cached.
    async fn read_stats_page_table(
        reader: &dyn Reader,
        cache: Option<&LanceCache>,
    ) -> Result<Arc<Option<PageTable>>> {
        // To prevent collisions, we cache this at a child path
        Self::load_from_cache(cache, reader.path().child("stats").to_string(), |_| async {
            let metadata = Self::read_metadata(reader, cache).await?;

            if let Some(stats_meta) = metadata.stats_metadata.as_ref() {
                Ok(Some(
                    PageTable::load(
                        reader,
                        stats_meta.page_table_position,
                        /*min_field_id=*/ 0,
                        /*max_field_id=*/ *stats_meta.leaf_field_ids.iter().max().unwrap(),
                        /*num_batches=*/ 1,
                    )
                    .await?,
                ))
            } else {
                Ok(None)
            }
        })
        .await
    }

    /// Load some metadata about the fragment from the cache, if there is one.
    async fn load_from_cache<T: DeepSizeOf + Send + Sync + 'static, F, Fut>(
        cache: Option<&LanceCache>,
        key: String,
        loader: F,
    ) -> Result<Arc<T>>
    where
        F: Fn(&str) -> Fut + Send + Sync,
        Fut: Future<Output = Result<T>> + Send,
    {
        if let Some(cache) = cache {
            let cache_key = StringCacheKey::<T>::new(key.as_str());
            cache
                .get_or_insert_with_key(cache_key, || loader(key.as_str()))
                .await
        } else {
            Ok(Arc::new(loader(key.as_str()).await?))
        }
    }

    /// Open one Lance data file for read.
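    ///
    /// # Example
    ///
    /// A minimal sketch mirroring the unit tests below (the in-memory store,
    /// path, and schema are illustrative):
    ///
    /// ```ignore
    /// let store = ObjectStore::memory();
    /// let path = Path::from("/data_file");
    /// // `schema` must match the schema the file was written with.
    /// let reader = FileReader::try_new(&store, &path, schema).await?;
    /// println!("rows: {}", reader.len());
    /// ```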
    pub async fn try_new(object_store: &ObjectStore, path: &Path, schema: Schema) -> Result<Self> {
        // If just reading a lance data file we assume the schema is the schema of the data file
        let max_field_id = schema.max_field_id().unwrap_or_default();
        Self::try_new_with_fragment_id(object_store, path, schema, 0, 0, max_field_id, None).await
    }

    fn io_parallelism(&self) -> usize {
        self.object_reader.io_parallelism()
    }

    /// Requested projection of the data in this file, excluding the row id column.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn num_batches(&self) -> usize {
        self.metadata.num_batches()
    }

    /// Get the number of rows in the given batch
    pub fn num_rows_in_batch(&self, batch_id: i32) -> usize {
        self.metadata.get_batch_length(batch_id).unwrap_or_default() as usize
    }

    /// Count the number of rows in this file.
    pub fn len(&self) -> usize {
        self.metadata.len()
    }

    pub fn is_empty(&self) -> bool {
        self.metadata.is_empty()
    }

    /// Read a batch of data from the file.
    ///
    /// The schema of the returned [RecordBatch] matches the given `projection`.
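    ///
    /// # Example
    ///
    /// A sketch of reading the full first batch, as done in the tests below
    /// (assumes an already opened `reader`):
    ///
    /// ```ignore
    /// let batch = reader.read_batch(0, .., reader.schema()).await?;
    /// assert_eq!(batch.num_rows(), reader.num_rows_in_batch(0));
    /// ```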
    #[instrument(level = "debug", skip(self, params, projection))]
    pub async fn read_batch(
        &self,
        batch_id: i32,
        params: impl Into<ReadBatchParams>,
        projection: &Schema,
    ) -> Result<RecordBatch> {
        read_batch(self, &params.into(), projection, batch_id).await
    }

    /// Read a range of records into one batch.
    ///
    /// Note that it might call concat if the range crosses multiple batches, which
    /// makes it less efficient than [`FileReader::read_batch()`].
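    ///
    /// # Example
    ///
    /// A sketch following `test_read_ranges` below (assumes an open `reader`
    /// over a file with at least 25 rows):
    ///
    /// ```ignore
    /// let batch = reader.read_range(7..25, reader.schema()).await?;
    /// assert_eq!(batch.num_rows(), 18);
    /// ```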
    #[instrument(level = "debug", skip(self, projection))]
    pub async fn read_range(
        &self,
        range: Range<usize>,
        projection: &Schema,
    ) -> Result<RecordBatch> {
        if range.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(projection.into())));
        }
        let range_in_batches = self.metadata.range_to_batches(range)?;
        let batches =
            stream::iter(range_in_batches)
                .map(|(batch_id, range)| async move {
                    self.read_batch(batch_id, range, projection).await
                })
                .buffered(self.io_parallelism())
                .try_collect::<Vec<_>>()
                .await?;
        if batches.len() == 1 {
            return Ok(batches[0].clone());
        }
        let schema = batches[0].schema();
        Ok(tokio::task::spawn_blocking(move || concat_batches(&schema, &batches)).await??)
    }

    /// Take records by indices within the file.
    ///
    /// The indices must be sorted.
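    ///
    /// # Example
    ///
    /// A sketch following `test_take` below; the indices are file-relative
    /// row offsets:
    ///
    /// ```ignore
    /// let batch = reader.take(&[1, 15, 20], reader.schema()).await?;
    /// assert_eq!(batch.num_rows(), 3);
    /// ```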
    #[instrument(level = "debug", skip_all)]
    pub async fn take(&self, indices: &[u32], projection: &Schema) -> Result<RecordBatch> {
        let num_batches = self.num_batches();
        let num_rows = self.len() as u32;
        let indices_in_batches = self.metadata.group_indices_to_batches(indices);
        let batches = stream::iter(indices_in_batches)
            .map(|batch| async move {
                if batch.batch_id >= num_batches as i32 {
                    Err(Error::invalid_input_source(
                        format!("batch_id: {} out of bounds", batch.batch_id).into(),
                    ))
                } else if *batch.offsets.last().expect("got empty batch") > num_rows {
                    Err(Error::invalid_input_source(
                        format!("indices: {:?} out of bounds", batch.offsets).into(),
                    ))
                } else {
                    self.read_batch(batch.batch_id, batch.offsets.as_slice(), projection)
                        .await
                }
            })
            .buffered(self.io_parallelism())
            .try_collect::<Vec<_>>()
            .await?;

        let schema = Arc::new(ArrowSchema::from(projection));

        Ok(tokio::task::spawn_blocking(move || concat_batches(&schema, &batches)).await??)
    }

    /// Get the schema of the statistics page table, for the given data field ids.
    pub fn page_stats_schema(&self, field_ids: &[i32]) -> Option<Schema> {
        self.metadata.stats_metadata.as_ref().map(|meta| {
            let mut stats_field_ids = vec![];
            for stats_field in &meta.schema.fields {
                if let Ok(stats_field_id) = stats_field.name.parse::<i32>()
                    && field_ids.contains(&stats_field_id)
                {
                    stats_field_ids.push(stats_field.id);
                    for child in &stats_field.children {
                        stats_field_ids.push(child.id);
                    }
                }
            }
            meta.schema.project_by_ids(&stats_field_ids, true)
        })
    }

    /// Get the page statistics for the given data field ids.
    pub async fn read_page_stats(&self, field_ids: &[i32]) -> Result<Option<RecordBatch>> {
        if let Some(stats_page_table) = self.stats_page_table.as_ref() {
            let projection = self.page_stats_schema(field_ids).unwrap();
            // It's possible none of the requested fields have stats.
            if projection.fields.is_empty() {
                return Ok(None);
            }
            let arrays = futures::stream::iter(projection.fields.iter().cloned())
                .map(|field| async move {
                    read_array(
                        self,
                        &field,
                        0,
                        stats_page_table,
                        &ReadBatchParams::RangeFull,
                    )
                    .await
                })
                .buffered(self.io_parallelism())
                .try_collect::<Vec<_>>()
                .await?;

            let schema = ArrowSchema::from(&projection);
            let batch = RecordBatch::try_new(Arc::new(schema), arrays)?;
            Ok(Some(batch))
        } else {
            Ok(None)
        }
    }
}

/// Stream desired full batches from the file.
///
/// Parameters:
/// - **reader**: An opened file reader.
/// - **projection**: The schema of the returned [RecordBatch].
/// - **predicate**: A function that takes a batch ID and returns true if the batch should be
///   returned.
///
/// Returns:
/// - A stream of [RecordBatch]s, each one corresponding to one full batch in the file.
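///
/// # Example
///
/// A sketch following `test_batches_stream` below, reading only the
/// even-numbered batches (assumes an open `reader` and a `projection` schema):
///
/// ```ignore
/// use futures::TryStreamExt;
///
/// let stream = batches_stream(reader, projection, |batch_id| batch_id % 2 == 0);
/// let batches = stream.try_collect::<Vec<_>>().await?;
/// ```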
pub fn batches_stream(
    reader: FileReader,
    projection: Schema,
    predicate: impl FnMut(&i32) -> bool + Send + Sync + 'static,
) -> impl RecordBatchStream {
    // Make the projection an Arc so it can be cloned and passed between threads.
    let projection = Arc::new(projection);
    let arrow_schema = ArrowSchema::from(projection.as_ref());

    let total_batches = reader.num_batches() as i32;
    let batches = (0..total_batches).filter(predicate);
    // Wrap the reader in an Arc as well, so it can be cloned and passed between threads.
    let this = Arc::new(reader);
    let inner = stream::iter(batches)
        .zip(stream::repeat_with(move || {
            (this.clone(), projection.clone())
        }))
        .map(move |(batch_id, (reader, projection))| async move {
            reader
                .read_batch(batch_id, ReadBatchParams::RangeFull, &projection)
                .await
        })
        .buffered(2)
        .boxed();
    RecordBatchStreamAdapter::new(Arc::new(arrow_schema), inner)
}

/// Read a batch.
///
/// `schema` must contain at least one field; this function returns an error
/// for an empty schema.
pub async fn read_batch(
    reader: &FileReader,
    params: &ReadBatchParams,
    schema: &Schema,
    batch_id: i32,
) -> Result<RecordBatch> {
    if !schema.fields.is_empty() {
        // We box this because otherwise we get a higher-order lifetime error.
        let arrs = stream::iter(&schema.fields)
            .map(|f| async { read_array(reader, f, batch_id, &reader.page_table, params).await })
            .buffered(reader.io_parallelism())
            .try_collect::<Vec<_>>()
            .boxed();
        let arrs = arrs.await?;
        Ok(RecordBatch::try_new(Arc::new(schema.into()), arrs)?)
    } else {
        Err(Error::invalid_input("no fields requested"))
    }
}

#[async_recursion]
async fn read_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let data_type = field.data_type();

    use DataType::*;

    if data_type.is_fixed_stride() {
        _read_fixed_stride_array(reader, field, batch_id, page_table, params).await
    } else {
        match data_type {
            Null => read_null_array(field, batch_id, page_table, params),
            Utf8 | LargeUtf8 | Binary | LargeBinary => {
                read_binary_array(reader, field, batch_id, page_table, params).await
            }
            Struct(_) => read_struct_array(reader, field, batch_id, page_table, params).await,
            Dictionary(_, _) => {
                read_dictionary_array(reader, field, batch_id, page_table, params).await
            }
            List(_) => {
                read_list_array::<Int32Type>(reader, field, batch_id, page_table, params).await
            }
            LargeList(_) => {
                read_list_array::<Int64Type>(reader, field, batch_id, page_table, params).await
            }
            _ => {
                unimplemented!("No support for {data_type} yet");
            }
        }
    }
}

fn get_page_info<'a>(
    page_table: &'a PageTable,
    field: &'a Field,
    batch_id: i32,
) -> Result<&'a PageInfo> {
    page_table.get(field.id, batch_id).ok_or_else(|| {
        Error::invalid_input(format!(
            "No page info found for field: {}, field_id={} batch={}",
            field.name, field.id, batch_id
        ))
    })
}

/// Read a fixed-stride array for batch `batch_id`.
async fn _read_fixed_stride_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;
    read_fixed_stride_array(
        reader.object_reader.as_ref(),
        &field.data_type(),
        page_info.position,
        page_info.length,
        params.clone(),
    )
    .await
}

fn read_null_array(
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;

    let length_output = match params {
        ReadBatchParams::Indices(indices) => {
            if indices.is_empty() {
                0
            } else {
                let idx_max = *indices.values().iter().max().unwrap() as u64;
                if idx_max >= page_info.length as u64 {
                    return Err(Error::invalid_input(format!(
                        "NullArray Reader: request([{}]) out of range: [0..{}]",
                        idx_max, page_info.length
                    )));
                }
                indices.len()
            }
        }
        _ => {
            let (idx_start, idx_end) = match params {
                ReadBatchParams::Range(r) => (r.start, r.end),
                ReadBatchParams::RangeFull => (0, page_info.length),
                ReadBatchParams::RangeTo(r) => (0, r.end),
                ReadBatchParams::RangeFrom(r) => (r.start, page_info.length),
                _ => unreachable!(),
            };
            if idx_end > page_info.length {
                return Err(Error::invalid_input(format!(
                    "NullArray Reader: request([{}..{}]) out of range: [0..{}]",
                    idx_start, idx_end, page_info.length
                )));
            }
            idx_end - idx_start
        }
    };

    Ok(Arc::new(NullArray::new(length_output)))
}

async fn read_binary_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;

    lance_io::utils::read_binary_array(
        reader.object_reader.as_ref(),
        &field.data_type(),
        field.nullable,
        page_info.position,
        page_info.length,
        params,
    )
    .await
}

async fn read_dictionary_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;
    let data_type = field.data_type();
    let decoder = DictionaryDecoder::new(
        reader.object_reader.as_ref(),
        page_info.position,
        page_info.length,
        &data_type,
        field
            .dictionary
            .as_ref()
            .unwrap()
            .values
            .as_ref()
            .unwrap()
            .clone(),
    );
    decoder.get(params.clone()).await
}

async fn read_struct_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    // TODO: use tokio to make the reads in parallel.
    let mut sub_arrays: Vec<(FieldRef, ArrayRef)> = vec![];

    for child in field.children.as_slice() {
        let arr = read_array(reader, child, batch_id, page_table, params).await?;
        sub_arrays.push((Arc::new(child.into()), arr));
    }

    Ok(Arc::new(StructArray::from(sub_arrays)))
}

async fn take_list_array<T: ArrowNumericType>(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    positions: &PrimitiveArray<T>,
    indices: &UInt32Array,
) -> Result<ArrayRef>
where
    T::Native: ArrowNativeTypeOp + OffsetSizeTrait,
{
    let first_idx = indices.value(0);
    // Range of values for each index
    let ranges = indices
        .values()
        .iter()
        .map(|i| (*i - first_idx).as_usize())
        .map(|idx| positions.value(idx).as_usize()..positions.value(idx + 1).as_usize())
        .collect::<Vec<_>>();
    let field = field.clone();
    let mut list_values: Vec<ArrayRef> = vec![];
    // TODO: read them in parallel.
    for range in ranges.iter() {
        list_values.push(
            read_array(
                reader,
                &field.children[0],
                batch_id,
                page_table,
                &(range.clone()).into(),
            )
            .await?,
        );
    }

    let value_refs = list_values
        .iter()
        .map(|arr| arr.as_ref())
        .collect::<Vec<_>>();
    let mut offsets_builder = PrimitiveBuilder::<T>::new();
    offsets_builder.append_value(T::Native::usize_as(0));
    let mut off = 0_usize;
    for range in ranges {
        off += range.len();
        offsets_builder.append_value(T::Native::usize_as(off));
    }
    let all_values = concat::concat(value_refs.as_slice())?;
    let offset_arr = offsets_builder.finish();
    let arr = try_new_generic_list_array(all_values, &offset_arr)?;
    Ok(Arc::new(arr) as ArrayRef)
}

async fn read_list_array<T: ArrowNumericType>(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef>
where
    T::Native: ArrowNativeTypeOp + OffsetSizeTrait,
{
    // Offset the position array by 1 in order to include the upper bound of the last element
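    // For example, with list offsets [0, 2, 5, 9], reading rows 1..3 needs
    // positions [2, 5, 9], i.e. the position slice 1..4 (one past the row range).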
    let positions_params = match params {
        ReadBatchParams::Range(range) => ReadBatchParams::from(range.start..(range.end + 1)),
        ReadBatchParams::RangeTo(range) => ReadBatchParams::from(..range.end + 1),
        ReadBatchParams::Indices(indices) => {
            (indices.value(0).as_usize()..indices.value(indices.len() - 1).as_usize() + 2).into()
        }
        p => p.clone(),
    };

    let page_info = get_page_info(&reader.page_table, field, batch_id)?;
    let position_arr = read_fixed_stride_array(
        reader.object_reader.as_ref(),
        &T::DATA_TYPE,
        page_info.position,
        page_info.length,
        positions_params,
    )
    .await?;

    let positions: &PrimitiveArray<T> = position_arr.as_primitive();

    // Recompute params so they align with the offset array
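    // Continuing the example above: the fetched positions [2, 5, 9] mean the
    // child values live at 2..9, and subtracting the first position rebases
    // the offsets of the rebuilt list to [0, 3, 7].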
    let value_params = match params {
        ReadBatchParams::Range(range) => ReadBatchParams::from(
            positions.value(0).as_usize()..positions.value(range.end - range.start).as_usize(),
        ),
        ReadBatchParams::Ranges(_) => {
            return Err(Error::internal(
                "ReadBatchParams::Ranges should not be used in v1 files".to_string(),
            ));
        }
        ReadBatchParams::RangeTo(RangeTo { end }) => {
            ReadBatchParams::from(..positions.value(*end).as_usize())
        }
        ReadBatchParams::RangeFrom(_) => ReadBatchParams::from(positions.value(0).as_usize()..),
        ReadBatchParams::RangeFull => ReadBatchParams::from(
            positions.value(0).as_usize()..positions.value(positions.len() - 1).as_usize(),
        ),
        ReadBatchParams::Indices(indices) => {
            return take_list_array(reader, field, batch_id, page_table, positions, indices).await;
        }
    };

    let start_position = PrimitiveArray::<T>::new_scalar(positions.value(0));
    let offset_arr = sub(positions, &start_position)?;
    let offset_arr_ref = offset_arr.as_primitive::<T>();
    let value_arrs = read_array(
        reader,
        &field.children[0],
        batch_id,
        page_table,
        &value_params,
    )
    .await?;
    let arr = try_new_generic_list_array(value_arrs, offset_arr_ref)?;
    Ok(Arc::new(arr) as ArrayRef)
}

#[cfg(test)]
mod tests {
    use crate::previous::writer::{FileWriter as PreviousFileWriter, NotSelfDescribing};

    use super::*;

    use arrow_array::{
        Array, DictionaryArray, Float32Array, Int64Array, LargeListArray, ListArray, StringArray,
        UInt8Array,
        builder::{Int32Builder, LargeListBuilder, ListBuilder, StringBuilder},
        cast::{as_string_array, as_struct_array},
        types::UInt8Type,
    };
    use arrow_array::{BooleanArray, Int32Array};
    use arrow_schema::{Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema};
    use lance_io::object_store::ObjectStoreParams;

    #[tokio::test]
    async fn test_take() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("i", DataType::Int64, true),
            ArrowField::new("f", DataType::Float32, false),
            ArrowField::new("s", DataType::Utf8, false),
            ArrowField::new(
                "d",
                DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
                false,
            ),
        ]);
        let mut schema = Schema::try_from(&arrow_schema).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/take_test");

        // Write 10 batches.
        let values = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);
        let values_ref = Arc::new(values);
        let mut batches = vec![];
        for batch_id in 0..10 {
            let value_range: Range<i64> = batch_id * 10..batch_id * 10 + 10;
            let keys = UInt8Array::from_iter_values(value_range.clone().map(|v| (v % 7) as u8));
            let columns: Vec<ArrayRef> = vec![
                Arc::new(Int64Array::from_iter(
                    value_range.clone().collect::<Vec<_>>(),
                )),
                Arc::new(Float32Array::from_iter(
                    value_range.clone().map(|n| n as f32).collect::<Vec<_>>(),
                )),
                Arc::new(StringArray::from_iter_values(
                    value_range.clone().map(|n| format!("str-{}", n)),
                )),
                Arc::new(DictionaryArray::<UInt8Type>::try_new(keys, values_ref.clone()).unwrap()),
            ];
            batches.push(RecordBatch::try_new(Arc::new(arrow_schema.clone()), columns).unwrap());
        }
        schema.set_dictionary(&batches[0]).unwrap();

        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        for batch in batches.iter() {
            file_writer
                .write(std::slice::from_ref(batch))
                .await
                .unwrap();
        }
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let batch = reader
            .take(&[1, 15, 20, 25, 30, 48, 90], reader.schema())
            .await
            .unwrap();
        let dict_keys = UInt8Array::from_iter_values([1, 1, 6, 4, 2, 6, 6]);
        assert_eq!(
            batch,
            RecordBatch::try_new(
                batch.schema(),
                vec![
                    Arc::new(Int64Array::from_iter_values([1, 15, 20, 25, 30, 48, 90])),
                    Arc::new(Float32Array::from_iter_values([
                        1.0, 15.0, 20.0, 25.0, 30.0, 48.0, 90.0
                    ])),
                    Arc::new(StringArray::from_iter_values([
                        "str-1", "str-15", "str-20", "str-25", "str-30", "str-48", "str-90"
                    ])),
                    Arc::new(DictionaryArray::try_new(dict_keys, values_ref.clone()).unwrap()),
                ]
            )
            .unwrap()
        );
    }

    async fn test_write_null_string_in_struct(field_nullable: bool) {
        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "parent",
            DataType::Struct(ArrowFields::from(vec![ArrowField::new(
                "str",
                DataType::Utf8,
                field_nullable,
            )])),
            true,
        )]));

        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let string_arr = Arc::new(StringArray::from_iter([Some("a"), Some(""), Some("b")]));
        let struct_arr = Arc::new(StructArray::from(vec![(
            Arc::new(ArrowField::new("str", DataType::Utf8, field_nullable)),
            string_arr.clone() as ArrayRef,
        )]));
        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_arr]).unwrap();

        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer
            .write(std::slice::from_ref(&batch))
            .await
            .unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();

        if field_nullable {
            assert_eq!(
                &StringArray::from_iter(vec![Some("a"), None, Some("b")]),
                as_string_array(
                    as_struct_array(actual_batch.column_by_name("parent").unwrap().as_ref())
                        .column_by_name("str")
                        .unwrap()
                        .as_ref()
                )
            );
        } else {
            assert_eq!(actual_batch, batch);
        }
    }

    #[tokio::test]
    async fn read_nullable_string_in_struct() {
        test_write_null_string_in_struct(true).await;
        test_write_null_string_in_struct(false).await;
    }

    #[tokio::test]
    async fn test_read_struct_of_list_arrays() {
        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let arrow_schema = make_schema_of_list_array();
        let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();

        let batches = (0..3)
            .map(|_| {
                let struct_array = make_struct_of_list_array(10, 10);
                RecordBatch::try_new(arrow_schema.clone(), vec![struct_array]).unwrap()
            })
            .collect::<Vec<_>>();
        let batches_ref = batches.iter().collect::<Vec<_>>();

        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&batches).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
        let expected = concat_batches(&arrow_schema, batches_ref).unwrap();
        assert_eq!(expected, actual_batch);
    }

    #[tokio::test]
    async fn test_scan_struct_of_list_arrays() {
        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let arrow_schema = make_schema_of_list_array();
        let struct_array = make_struct_of_list_array(3, 10);
        let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_array.clone()]).unwrap();

        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let mut expected_columns: Vec<ArrayRef> = Vec::new();
        for c in struct_array.columns().iter() {
            expected_columns.push(c.slice(1, 1));
        }

        let expected_struct = match arrow_schema.fields[0].data_type() {
            DataType::Struct(subfields) => subfields
                .iter()
                .zip(expected_columns)
                .map(|(f, d)| (f.clone(), d))
                .collect::<Vec<_>>(),
            _ => panic!("unexpected field"),
        };

        let expected_struct_array = StructArray::from(expected_struct);
        let expected_batch = RecordBatch::from(&StructArray::from(vec![(
            Arc::new(arrow_schema.fields[0].as_ref().clone()),
            Arc::new(expected_struct_array) as ArrayRef,
        )]));

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let params = ReadBatchParams::Range(1..2);
        let slice_of_batch = reader.read_batch(0, params, reader.schema()).await.unwrap();
        assert_eq!(expected_batch, slice_of_batch);
    }

    fn make_schema_of_list_array() -> Arc<arrow_schema::Schema> {
        Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "s",
            DataType::Struct(ArrowFields::from(vec![
                ArrowField::new(
                    "li",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    true,
                ),
                ArrowField::new(
                    "ls",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))),
                    true,
                ),
                ArrowField::new(
                    "ll",
                    DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    false,
                ),
            ])),
            true,
        )]))
    }

    fn make_struct_of_list_array(rows: i32, num_items: i32) -> Arc<StructArray> {
        let mut li_builder = ListBuilder::new(Int32Builder::new());
        let mut ls_builder = ListBuilder::new(StringBuilder::new());
        let ll_value_builder = Int32Builder::new();
        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
        for i in 0..rows {
            for j in 0..num_items {
                li_builder.values().append_value(i * 10 + j);
                ls_builder
                    .values()
                    .append_value(format!("str-{}", i * 10 + j));
                large_list_builder.values().append_value(i * 10 + j);
            }
            li_builder.append(true);
            ls_builder.append(true);
            large_list_builder.append(true);
        }
        Arc::new(StructArray::from(vec![
            (
                Arc::new(ArrowField::new(
                    "li",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    true,
                )),
                Arc::new(li_builder.finish()) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new(
                    "ls",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))),
                    true,
                )),
                Arc::new(ls_builder.finish()) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new(
                    "ll",
                    DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    false,
                )),
                Arc::new(large_list_builder.finish()) as ArrayRef,
            ),
        ]))
    }

    #[tokio::test]
    async fn test_read_nullable_arrays() {
        use arrow_array::Array;

        // create a record batch with a null array column
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("i", DataType::Int64, false),
            ArrowField::new("n", DataType::Null, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from_iter_values(0..100)),
            Arc::new(NullArray::new(100)),
        ];
        let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap();

        // write to a lance file
        let store = ObjectStore::memory();
        let path = Path::from("/takes");
        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        // read the file back
        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();

        async fn read_array_w_params(
            reader: &FileReader,
            field: &Field,
            params: ReadBatchParams,
        ) -> ArrayRef {
            read_array(reader, field, 0, reader.page_table.as_ref(), &params)
                .await
                .expect("Error reading back the null array from file") as _
        }

        let arr = read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFull).await;
        assert_eq!(100, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::Range(10..25)).await;
        assert_eq!(15, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFrom(60..)).await;
        assert_eq!(40, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeTo(..25)).await;
        assert_eq!(25, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr = read_array_w_params(
            &reader,
            &schema.fields[1],
            ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72])),
        )
        .await;
        assert_eq!(4, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        // raise error if take indices are out of bounds
        let params = ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72, 100]));
        let arr = read_array(
            &reader,
            &schema.fields[1],
            0,
            reader.page_table.as_ref(),
            &params,
        );
        assert!(arr.await.is_err());

        // raise error if range indices are out of bounds
        let params = ReadBatchParams::RangeTo(..107);
        let arr = read_array(
            &reader,
            &schema.fields[1],
            0,
            reader.page_table.as_ref(),
            &params,
        );
        assert!(arr.await.is_err());
    }

    #[tokio::test]
    async fn test_take_lists() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "l",
                DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
            ArrowField::new(
                "ll",
                DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
        ]);

        let value_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(value_builder);
        let ll_value_builder = Int32Builder::new();
        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
        for i in 0..100 {
            list_builder.values().append_value(i);
            large_list_builder.values().append_value(i);
            if (i + 1) % 10 == 0 {
                list_builder.append(true);
                large_list_builder.append(true);
            }
        }
        let list_arr = Arc::new(list_builder.finish());
        let large_list_arr = Arc::new(large_list_builder.finish());

        let batch = RecordBatch::try_new(
            Arc::new(arrow_schema.clone()),
            vec![list_arr as ArrayRef, large_list_arr as ArrayRef],
        )
        .unwrap();

        // write to a lance file
        let store = ObjectStore::memory();
        let path = Path::from("/take_list");
        let schema: Schema = (&arrow_schema).try_into().unwrap();
        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        // read the file back
        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();
        let actual = reader.take(&[1, 3, 5, 9], &schema).await.unwrap();

        let value_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(value_builder);
        let ll_value_builder = Int32Builder::new();
        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
        for i in [1, 3, 5, 9] {
            for j in 0..10 {
                list_builder.values().append_value(i * 10 + j);
                large_list_builder.values().append_value(i * 10 + j);
            }
            list_builder.append(true);
            large_list_builder.append(true);
        }
        let expected_list = list_builder.finish();
        let expected_large_list = large_list_builder.finish();

        assert_eq!(actual.column_by_name("l").unwrap().as_ref(), &expected_list);
        assert_eq!(
            actual.column_by_name("ll").unwrap().as_ref(),
            &expected_large_list
        );
    }

    #[tokio::test]
    async fn test_list_array_with_offsets() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "l",
                DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
            ArrowField::new(
                "ll",
                DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
        ]);

        let store = ObjectStore::memory();
        let path = Path::from("/lists");

        let list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2)]),
            Some(vec![Some(3), Some(4)]),
            Some((0..2_000).map(Some).collect::<Vec<_>>()),
        ])
        .slice(1, 1);
        let large_list_array = LargeListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(10), Some(11)]),
            Some(vec![Some(12), Some(13)]),
            Some((0..2_000).map(Some).collect::<Vec<_>>()),
        ])
        .slice(1, 1);

        let batch = RecordBatch::try_new(
            Arc::new(arrow_schema.clone()),
            vec![Arc::new(list_array), Arc::new(large_list_array)],
        )
        .unwrap();

        let schema: Schema = (&arrow_schema).try_into().unwrap();
        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer
            .write(std::slice::from_ref(&batch))
            .await
            .unwrap();
        file_writer.finish().await.unwrap();

        // Make sure the big array was not written to the file
        let file_size_bytes = store.size(&path).await.unwrap();
        assert!(file_size_bytes < 1_000);

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
        assert_eq!(batch, actual_batch);
    }

    #[tokio::test]
    async fn test_read_ranges() {
        // create a record batch with a single Int64 column
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int64, false)]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let columns: Vec<ArrayRef> = vec![Arc::new(Int64Array::from_iter_values(0..100))];
        let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap();

        // write to a lance file
        let store = ObjectStore::memory();
        let path = Path::from("/read_range");
        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_range(7..25, reader.schema()).await.unwrap();

        assert_eq!(
            actual_batch.column_by_name("i").unwrap().as_ref(),
            &Int64Array::from_iter_values(7..25)
        );
    }

    #[tokio::test]
    async fn test_batches_stream() {
        let store = ObjectStore::memory();
        let path = Path::from("/batch_stream");

        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int32, true)]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let mut writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        for i in 0..10 {
            let batch = RecordBatch::try_new(
                Arc::new(arrow_schema.clone()),
                vec![Arc::new(Int32Array::from_iter_values(i * 10..(i + 1) * 10))],
            )
            .unwrap();
            writer.write(&[batch]).await.unwrap();
        }
        writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();
        let stream = batches_stream(reader, schema, |id| id % 2 == 0);
        let batches = stream.try_collect::<Vec<_>>().await.unwrap();

        assert_eq!(batches.len(), 5);
        for (i, batch) in batches.iter().enumerate() {
            assert_eq!(
                batch,
                &RecordBatch::try_new(
                    Arc::new(arrow_schema.clone()),
                    vec![Arc::new(Int32Array::from_iter_values(
                        i as i32 * 2 * 10..(i as i32 * 2 + 1) * 10
                    ))],
                )
                .unwrap()
            )
        }
    }

    #[tokio::test]
    async fn test_take_boolean_beyond_chunk() {
        let store = ObjectStore::from_uri_and_params(
            Arc::new(Default::default()),
            "memory://",
            &ObjectStoreParams {
                block_size: Some(256),
                ..Default::default()
            },
        )
        .await
        .unwrap()
        .0;
        let path = Path::from("/take_bools");

        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "b",
            DataType::Boolean,
            false,
        )]));
        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();

        let array = BooleanArray::from((0..5000).map(|v| v % 5 == 0).collect::<Vec<_>>());
        let batch =
            RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(array.clone())]).unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();
        let actual = reader.take(&[2, 4, 5, 8, 4555], &schema).await.unwrap();

        assert_eq!(
            actual.column_by_name("b").unwrap().as_ref(),
            &BooleanArray::from(vec![false, false, true, false, true])
        );
    }

    #[tokio::test]
    async fn test_read_projection() {
        // The dataset schema may be very large.  The file reader should support reading
        // a small projection of that schema (this just tests the field_id_offset /
        // max_field_id parameters)
        let store = ObjectStore::memory();
        let path = Path::from("/partial_read");

        // Create a large schema
        let mut fields = vec![];
        for i in 0..100 {
            fields.push(ArrowField::new(format!("f{}", i), DataType::Int32, false));
        }
        let arrow_schema = ArrowSchema::new(fields);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        let partial_schema = schema.project(&["f50"]).unwrap();
        let partial_arrow: ArrowSchema = (&partial_schema).into();

        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            partial_schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();

        let array = Int32Array::from(vec![0; 15]);
        let batch =
            RecordBatch::try_new(Arc::new(partial_arrow), vec![Arc::new(array.clone())]).unwrap();
        file_writer
            .write(std::slice::from_ref(&batch))
            .await
            .unwrap();
        file_writer.finish().await.unwrap();

        let field_id = partial_schema.fields.first().unwrap().id;
        let reader = FileReader::try_new_with_fragment_id(
            &store,
            &path,
            schema.clone(),
            0,
            /*field_id_offset=*/ field_id,
            /*max_field_id=*/ field_id,
            None,
        )
        .await
        .unwrap();
        let actual = reader
            .read_batch(0, ReadBatchParams::RangeFull, &partial_schema)
            .await
            .unwrap();

        assert_eq!(actual, batch);
    }
}