Skip to main content

lance_file/previous/
reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Lance Data File Reader
5
6// Standard
7use std::ops::{Range, RangeTo};
8use std::sync::Arc;
9
10use arrow_arith::numeric::sub;
11use arrow_array::{
12    ArrayRef, ArrowNativeTypeOp, ArrowNumericType, NullArray, OffsetSizeTrait, PrimitiveArray,
13    RecordBatch, StructArray, UInt32Array,
14    builder::PrimitiveBuilder,
15    cast::AsArray,
16    types::{Int32Type, Int64Type},
17};
18use arrow_buffer::ArrowNativeType;
19use arrow_schema::{DataType, FieldRef, Schema as ArrowSchema};
20use arrow_select::concat::{self, concat_batches};
21use async_recursion::async_recursion;
22use deepsize::DeepSizeOf;
23use futures::{Future, FutureExt, StreamExt, TryStreamExt, stream};
24use lance_arrow::*;
25use lance_core::cache::{CacheKey, LanceCache};
26use lance_core::datatypes::{Field, Schema};
27use lance_core::{Error, Result};
28use lance_io::encodings::AsyncIndex;
29use lance_io::encodings::dictionary::DictionaryDecoder;
30use lance_io::stream::{RecordBatchStream, RecordBatchStreamAdapter};
31use lance_io::traits::Reader;
32use lance_io::utils::{
33    read_fixed_stride_array, read_metadata_offset, read_struct, read_struct_from_buf,
34};
35use lance_io::{ReadBatchParams, object_store::ObjectStore};
36use std::borrow::Cow;
37
38use object_store::path::Path;
39use tracing::instrument;
40
41use crate::previous::format::metadata::Metadata;
42use crate::previous::page_table::{PageInfo, PageTable};
43
/// Lance File Reader.
///
/// It reads arrow data from one data file.
#[derive(Clone, DeepSizeOf)]
pub struct FileReader {
    /// Handle used to issue reads against the underlying data file.
    pub object_reader: Arc<dyn Reader>,
    /// File-level metadata, either supplied by the caller or read from the file.
    metadata: Arc<Metadata>,
    /// Page table used to look up the page of a given field and batch.
    page_table: Arc<PageTable>,
    /// Schema handed to the reader when it was opened.
    schema: Schema,

    /// The id of the fragment which this file belongs to.
    /// For simple file access, this can just be zero.
    fragment_id: u64,

    /// Page table for statistics. Holds `None` when the file metadata carries
    /// no statistics metadata.
    stats_page_table: Arc<Option<PageTable>>,
}
61
62impl std::fmt::Debug for FileReader {
63    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64        f.debug_struct("FileReader")
65            .field("fragment", &self.fragment_id)
66            .field("path", &self.object_reader.path())
67            .finish()
68    }
69}
70
/// Generic cache key for string-based keys.
///
/// `T` is the type of the cached value. It is carried at the type level only
/// (through `PhantomData`); no `T` is stored in the key.
struct StringCacheKey<'a, T> {
    /// Borrowed string identifying the cache entry.
    key: &'a str,
    /// Zero-sized marker tying the key to its value type `T`.
    _phantom: std::marker::PhantomData<T>,
}
76
77impl<'a, T> StringCacheKey<'a, T> {
78    fn new(key: &'a str) -> Self {
79        Self {
80            key,
81            _phantom: std::marker::PhantomData,
82        }
83    }
84}
85
86impl<T: 'static> CacheKey for StringCacheKey<'_, T> {
87    type ValueType = T;
88
89    fn key(&self) -> Cow<'_, str> {
90        self.key.into()
91    }
92
93    fn type_name() -> &'static str {
94        // This is a private, crate-internal key that is only instantiated with
95        // a single concrete T within one build, so std::any::type_name is fine
96        // here — there is no cross-crate collision risk.
97        std::any::type_name::<T>()
98    }
99}
100
101impl FileReader {
102    /// Open file reader
103    ///
104    /// Open the file at the given path using the provided object store.
105    ///
106    /// The passed fragment ID determines the first 32-bits of the row IDs.
107    ///
108    /// If a manifest is passed in, it will be used to load the schema and dictionary.
109    /// This is typically done if the file is part of a dataset fragment. If no manifest
110    /// is passed in, then it is read from the file itself.
111    ///
112    /// The session passed in is used to cache metadata about the file. If no session
113    /// is passed in, there will be no caching.
114    #[instrument(level = "debug", skip(object_store, schema, session))]
115    pub async fn try_new_with_fragment_id(
116        object_store: &ObjectStore,
117        path: &Path,
118        schema: Schema,
119        fragment_id: u32,
120        field_id_offset: i32,
121        max_field_id: i32,
122        session: Option<&LanceCache>,
123    ) -> Result<Self> {
124        let object_reader = object_store.open(path).await?;
125
126        let metadata = Self::read_metadata(object_reader.as_ref(), session).await?;
127
128        Self::try_new_from_reader(
129            path,
130            object_reader.into(),
131            Some(metadata),
132            schema,
133            fragment_id,
134            field_id_offset,
135            max_field_id,
136            session,
137        )
138        .await
139    }
140
141    #[allow(clippy::too_many_arguments)]
142    pub async fn try_new_from_reader(
143        path: &Path,
144        object_reader: Arc<dyn Reader>,
145        metadata: Option<Arc<Metadata>>,
146        schema: Schema,
147        fragment_id: u32,
148        field_id_offset: i32,
149        max_field_id: i32,
150        session: Option<&LanceCache>,
151    ) -> Result<Self> {
152        let metadata = match metadata {
153            Some(metadata) => metadata,
154            None => Self::read_metadata(object_reader.as_ref(), session).await?,
155        };
156
157        let page_table = async {
158            Self::load_from_cache(session, path.to_string(), |_| async {
159                PageTable::load(
160                    object_reader.as_ref(),
161                    metadata.page_table_position,
162                    field_id_offset,
163                    max_field_id,
164                    metadata.num_batches() as i32,
165                )
166                .await
167            })
168            .await
169        };
170
171        let stats_page_table = Self::read_stats_page_table(object_reader.as_ref(), session);
172
173        // Can concurrently load page tables
174        let (page_table, stats_page_table) = futures::try_join!(page_table, stats_page_table)?;
175
176        Ok(Self {
177            object_reader,
178            metadata,
179            schema,
180            page_table,
181            fragment_id: fragment_id as u64,
182            stats_page_table,
183        })
184    }
185
186    pub async fn read_metadata(
187        object_reader: &dyn Reader,
188        cache: Option<&LanceCache>,
189    ) -> Result<Arc<Metadata>> {
190        Self::load_from_cache(cache, object_reader.path().to_string(), |_| async {
191            let file_size = object_reader.size().await?;
192            let begin = if file_size < object_reader.block_size() {
193                0
194            } else {
195                file_size - object_reader.block_size()
196            };
197            let tail_bytes = object_reader.get_range(begin..file_size).await?;
198            let metadata_pos = read_metadata_offset(&tail_bytes)?;
199
200            let metadata: Metadata = if metadata_pos < file_size - tail_bytes.len() {
201                // We have not read the metadata bytes yet.
202                read_struct(object_reader, metadata_pos).await?
203            } else {
204                let offset = tail_bytes.len() - (file_size - metadata_pos);
205                read_struct_from_buf(&tail_bytes.slice(offset..))?
206            };
207            Ok(metadata)
208        })
209        .await
210    }
211
212    /// Get the statistics page table. This will read the metadata if it is not cached.
213    ///
214    /// The page table is cached.
215    async fn read_stats_page_table(
216        reader: &dyn Reader,
217        cache: Option<&LanceCache>,
218    ) -> Result<Arc<Option<PageTable>>> {
219        // To prevent collisions, we cache this at a child path
220        Self::load_from_cache(
221            cache,
222            reader.path().clone().join("stats").to_string(),
223            |_| async {
224                let metadata = Self::read_metadata(reader, cache).await?;
225
226                if let Some(stats_meta) = metadata.stats_metadata.as_ref() {
227                    Ok(Some(
228                        PageTable::load(
229                            reader,
230                            stats_meta.page_table_position,
231                            /*min_field_id=*/ 0,
232                            /*max_field_id=*/
233                            *stats_meta.leaf_field_ids.iter().max().unwrap(),
234                            /*num_batches=*/ 1,
235                        )
236                        .await?,
237                    ))
238                } else {
239                    Ok(None)
240                }
241            },
242        )
243        .await
244    }
245
246    /// Load some metadata about the fragment from the cache, if there is one.
247    async fn load_from_cache<T: DeepSizeOf + Send + Sync + 'static, F, Fut>(
248        cache: Option<&LanceCache>,
249        key: String,
250        loader: F,
251    ) -> Result<Arc<T>>
252    where
253        F: Fn(&str) -> Fut + Send + Sync,
254        Fut: Future<Output = Result<T>> + Send,
255    {
256        if let Some(cache) = cache {
257            let cache_key = StringCacheKey::<T>::new(key.as_str());
258            cache
259                .get_or_insert_with_key(cache_key, || loader(key.as_str()))
260                .await
261        } else {
262            Ok(Arc::new(loader(key.as_str()).await?))
263        }
264    }
265
266    /// Open one Lance data file for read.
267    pub async fn try_new(object_store: &ObjectStore, path: &Path, schema: Schema) -> Result<Self> {
268        // If just reading a lance data file we assume the schema is the schema of the data file
269        let max_field_id = schema.max_field_id().unwrap_or_default();
270        Self::try_new_with_fragment_id(object_store, path, schema, 0, 0, max_field_id, None).await
271    }
272
    /// I/O parallelism reported by the underlying object reader.
    ///
    /// Used in this file as the `buffered` limit of the read pipelines.
    fn io_parallelism(&self) -> usize {
        self.object_reader.io_parallelism()
    }
276
    /// Requested projection of the data in this file, excluding the row id column.
    ///
    /// This is the schema that was passed in when the reader was opened.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }
281
    /// Number of batches in this file, as recorded in the file metadata.
    pub fn num_batches(&self) -> usize {
        self.metadata.num_batches()
    }
285
286    /// Get the number of rows in this batch
287    pub fn num_rows_in_batch(&self, batch_id: i32) -> usize {
288        self.metadata.get_batch_length(batch_id).unwrap_or_default() as usize
289    }
290
    /// Count the number of rows in this file, as recorded in the file metadata.
    pub fn len(&self) -> usize {
        self.metadata.len()
    }
295
    /// Whether the file metadata reports this file as empty.
    pub fn is_empty(&self) -> bool {
        self.metadata.is_empty()
    }
299
300    /// Read a batch of data from the file.
301    ///
302    /// The schema of the returned [RecordBatch] is set by [`FileReader::schema()`].
303    #[instrument(level = "debug", skip(self, params, projection))]
304    pub async fn read_batch(
305        &self,
306        batch_id: i32,
307        params: impl Into<ReadBatchParams>,
308        projection: &Schema,
309    ) -> Result<RecordBatch> {
310        read_batch(self, &params.into(), projection, batch_id).await
311    }
312
313    /// Read a range of records into one batch.
314    ///
315    /// Note that it might call concat if the range is crossing multiple batches, which
316    /// makes it less efficient than [`FileReader::read_batch()`].
317    #[instrument(level = "debug", skip(self, projection))]
318    pub async fn read_range(
319        &self,
320        range: Range<usize>,
321        projection: &Schema,
322    ) -> Result<RecordBatch> {
323        if range.is_empty() {
324            return Ok(RecordBatch::new_empty(Arc::new(projection.into())));
325        }
326        let range_in_batches = self.metadata.range_to_batches(range)?;
327        let batches =
328            stream::iter(range_in_batches)
329                .map(|(batch_id, range)| async move {
330                    self.read_batch(batch_id, range, projection).await
331                })
332                .buffered(self.io_parallelism())
333                .try_collect::<Vec<_>>()
334                .await?;
335        if batches.len() == 1 {
336            return Ok(batches[0].clone());
337        }
338        let schema = batches[0].schema();
339        Ok(tokio::task::spawn_blocking(move || concat_batches(&schema, &batches)).await??)
340    }
341
342    /// Take by records by indices within the file.
343    ///
344    /// The indices must be sorted.
345    #[instrument(level = "debug", skip_all)]
346    pub async fn take(&self, indices: &[u32], projection: &Schema) -> Result<RecordBatch> {
347        let num_batches = self.num_batches();
348        let num_rows = self.len() as u32;
349        let indices_in_batches = self.metadata.group_indices_to_batches(indices);
350        let batches = stream::iter(indices_in_batches)
351            .map(|batch| async move {
352                if batch.batch_id >= num_batches as i32 {
353                    Err(Error::invalid_input_source(
354                        format!("batch_id: {} out of bounds", batch.batch_id).into(),
355                    ))
356                } else if *batch.offsets.last().expect("got empty batch") > num_rows {
357                    Err(Error::invalid_input_source(
358                        format!("indices: {:?} out of bounds", batch.offsets).into(),
359                    ))
360                } else {
361                    self.read_batch(batch.batch_id, batch.offsets.as_slice(), projection)
362                        .await
363                }
364            })
365            .buffered(self.io_parallelism())
366            .try_collect::<Vec<_>>()
367            .await?;
368
369        let schema = Arc::new(ArrowSchema::from(projection));
370
371        Ok(tokio::task::spawn_blocking(move || concat_batches(&schema, &batches)).await??)
372    }
373
374    /// Get the schema of the statistics page table, for the given data field ids.
375    pub fn page_stats_schema(&self, field_ids: &[i32]) -> Option<Schema> {
376        self.metadata.stats_metadata.as_ref().map(|meta| {
377            let mut stats_field_ids = vec![];
378            for stats_field in &meta.schema.fields {
379                if let Ok(stats_field_id) = stats_field.name.parse::<i32>()
380                    && field_ids.contains(&stats_field_id)
381                {
382                    stats_field_ids.push(stats_field.id);
383                    for child in &stats_field.children {
384                        stats_field_ids.push(child.id);
385                    }
386                }
387            }
388            meta.schema.project_by_ids(&stats_field_ids, true)
389        })
390    }
391
392    /// Get the page statistics for the given data field ids.
393    pub async fn read_page_stats(&self, field_ids: &[i32]) -> Result<Option<RecordBatch>> {
394        if let Some(stats_page_table) = self.stats_page_table.as_ref() {
395            let projection = self.page_stats_schema(field_ids).unwrap();
396            // It's possible none of the requested fields have stats.
397            if projection.fields.is_empty() {
398                return Ok(None);
399            }
400            let arrays = futures::stream::iter(projection.fields.iter().cloned())
401                .map(|field| async move {
402                    read_array(
403                        self,
404                        &field,
405                        0,
406                        stats_page_table,
407                        &ReadBatchParams::RangeFull,
408                    )
409                    .await
410                })
411                .buffered(self.io_parallelism())
412                .try_collect::<Vec<_>>()
413                .await?;
414
415            let schema = ArrowSchema::from(&projection);
416            let batch = RecordBatch::try_new(Arc::new(schema), arrays)?;
417            Ok(Some(batch))
418        } else {
419            Ok(None)
420        }
421    }
422}
423
424/// Stream desired full batches from the file.
425///
426/// Parameters:
427/// - **reader**: An opened file reader.
428/// - **projection**: The schema of the returning [RecordBatch].
429/// - **predicate**: A function that takes a batch ID and returns true if the batch should be
430///   returned.
431///
432/// Returns:
433/// - A stream of [RecordBatch]s, each one corresponding to one full batch in the file.
434pub fn batches_stream(
435    reader: FileReader,
436    projection: Schema,
437    predicate: impl FnMut(&i32) -> bool + Send + Sync + 'static,
438) -> impl RecordBatchStream {
439    // Make projection an Arc so we can clone it and pass between threads.
440    let projection = Arc::new(projection);
441    let arrow_schema = ArrowSchema::from(projection.as_ref());
442
443    let total_batches = reader.num_batches() as i32;
444    let batches = (0..total_batches).filter(predicate);
445    // Make another copy of self so we can clone it and pass between threads.
446    let this = Arc::new(reader);
447    let inner = stream::iter(batches)
448        .zip(stream::repeat_with(move || {
449            (this.clone(), projection.clone())
450        }))
451        .map(move |(batch_id, (reader, projection))| async move {
452            reader
453                .read_batch(batch_id, ReadBatchParams::RangeFull, &projection)
454                .await
455        })
456        .buffered(2)
457        .boxed();
458    RecordBatchStreamAdapter::new(Arc::new(arrow_schema), inner)
459}
460
461/// Read a batch.
462///
463/// `schema` may only be empty if `with_row_id` is also true. This function
464/// panics otherwise.
465pub async fn read_batch(
466    reader: &FileReader,
467    params: &ReadBatchParams,
468    schema: &Schema,
469    batch_id: i32,
470) -> Result<RecordBatch> {
471    if !schema.fields.is_empty() {
472        // We box this because otherwise we get a higher-order lifetime error.
473        let arrs = stream::iter(&schema.fields)
474            .map(|f| async { read_array(reader, f, batch_id, &reader.page_table, params).await })
475            .buffered(reader.io_parallelism())
476            .try_collect::<Vec<_>>()
477            .boxed();
478        let arrs = arrs.await?;
479        Ok(RecordBatch::try_new(Arc::new(schema.into()), arrs)?)
480    } else {
481        Err(Error::invalid_input("no fields requested"))
482    }
483}
484
485#[async_recursion]
486async fn read_array(
487    reader: &FileReader,
488    field: &Field,
489    batch_id: i32,
490    page_table: &PageTable,
491    params: &ReadBatchParams,
492) -> Result<ArrayRef> {
493    let data_type = field.data_type();
494
495    use DataType::*;
496
497    if data_type.is_fixed_stride() {
498        _read_fixed_stride_array(reader, field, batch_id, page_table, params).await
499    } else {
500        match data_type {
501            Null => read_null_array(field, batch_id, page_table, params),
502            Utf8 | LargeUtf8 | Binary | LargeBinary => {
503                read_binary_array(reader, field, batch_id, page_table, params).await
504            }
505            Struct(_) => read_struct_array(reader, field, batch_id, page_table, params).await,
506            Dictionary(_, _) => {
507                read_dictionary_array(reader, field, batch_id, page_table, params).await
508            }
509            List(_) => {
510                read_list_array::<Int32Type>(reader, field, batch_id, page_table, params).await
511            }
512            LargeList(_) => {
513                read_list_array::<Int64Type>(reader, field, batch_id, page_table, params).await
514            }
515            _ => {
516                unimplemented!("{}", format!("No support for {data_type} yet"));
517            }
518        }
519    }
520}
521
522fn get_page_info<'a>(
523    page_table: &'a PageTable,
524    field: &'a Field,
525    batch_id: i32,
526) -> Result<&'a PageInfo> {
527    page_table.get(field.id, batch_id).ok_or_else(|| {
528        Error::invalid_input(format!(
529            "No page info found for field: {}, field_id={} batch={}",
530            field.name, field.id, batch_id
531        ))
532    })
533}
534
535/// Read primitive array for batch `batch_idx`.
536async fn _read_fixed_stride_array(
537    reader: &FileReader,
538    field: &Field,
539    batch_id: i32,
540    page_table: &PageTable,
541    params: &ReadBatchParams,
542) -> Result<ArrayRef> {
543    let page_info = get_page_info(page_table, field, batch_id)?;
544    read_fixed_stride_array(
545        reader.object_reader.as_ref(),
546        &field.data_type(),
547        page_info.position,
548        page_info.length,
549        params.clone(),
550    )
551    .await
552}
553
554fn read_null_array(
555    field: &Field,
556    batch_id: i32,
557    page_table: &PageTable,
558    params: &ReadBatchParams,
559) -> Result<ArrayRef> {
560    let page_info = get_page_info(page_table, field, batch_id)?;
561
562    let length_output = match params {
563        ReadBatchParams::Indices(indices) => {
564            if indices.is_empty() {
565                0
566            } else {
567                let idx_max = *indices.values().iter().max().unwrap() as u64;
568                if idx_max >= page_info.length as u64 {
569                    return Err(Error::invalid_input(format!(
570                        "NullArray Reader: request([{}]) out of range: [0..{}]",
571                        idx_max, page_info.length
572                    )));
573                }
574                indices.len()
575            }
576        }
577        _ => {
578            let (idx_start, idx_end) = match params {
579                ReadBatchParams::Range(r) => (r.start, r.end),
580                ReadBatchParams::RangeFull => (0, page_info.length),
581                ReadBatchParams::RangeTo(r) => (0, r.end),
582                ReadBatchParams::RangeFrom(r) => (r.start, page_info.length),
583                _ => unreachable!(),
584            };
585            if idx_end > page_info.length {
586                return Err(Error::invalid_input(format!(
587                    "NullArray Reader: request([{}..{}]) out of range: [0..{}]",
588                    // and wrap it in here.
589                    idx_start,
590                    idx_end,
591                    page_info.length
592                )));
593            }
594            idx_end - idx_start
595        }
596    };
597
598    Ok(Arc::new(NullArray::new(length_output)))
599}
600
601async fn read_binary_array(
602    reader: &FileReader,
603    field: &Field,
604    batch_id: i32,
605    page_table: &PageTable,
606    params: &ReadBatchParams,
607) -> Result<ArrayRef> {
608    let page_info = get_page_info(page_table, field, batch_id)?;
609
610    lance_io::utils::read_binary_array(
611        reader.object_reader.as_ref(),
612        &field.data_type(),
613        field.nullable,
614        page_info.position,
615        page_info.length,
616        params,
617    )
618    .await
619}
620
621async fn read_dictionary_array(
622    reader: &FileReader,
623    field: &Field,
624    batch_id: i32,
625    page_table: &PageTable,
626    params: &ReadBatchParams,
627) -> Result<ArrayRef> {
628    let page_info = get_page_info(page_table, field, batch_id)?;
629    let data_type = field.data_type();
630    let decoder = DictionaryDecoder::new(
631        reader.object_reader.as_ref(),
632        page_info.position,
633        page_info.length,
634        &data_type,
635        field
636            .dictionary
637            .as_ref()
638            .unwrap()
639            .values
640            .as_ref()
641            .unwrap()
642            .clone(),
643    );
644    decoder.get(params.clone()).await
645}
646
647async fn read_struct_array(
648    reader: &FileReader,
649    field: &Field,
650    batch_id: i32,
651    page_table: &PageTable,
652    params: &ReadBatchParams,
653) -> Result<ArrayRef> {
654    // TODO: use tokio to make the reads in parallel.
655    let mut sub_arrays: Vec<(FieldRef, ArrayRef)> = vec![];
656
657    for child in field.children.as_slice() {
658        let arr = read_array(reader, child, batch_id, page_table, params).await?;
659        sub_arrays.push((Arc::new(child.into()), arr));
660    }
661
662    Ok(Arc::new(StructArray::from(sub_arrays)))
663}
664
665async fn take_list_array<T: ArrowNumericType>(
666    reader: &FileReader,
667    field: &Field,
668    batch_id: i32,
669    page_table: &PageTable,
670    positions: &PrimitiveArray<T>,
671    indices: &UInt32Array,
672) -> Result<ArrayRef>
673where
674    T::Native: ArrowNativeTypeOp + OffsetSizeTrait,
675{
676    let first_idx = indices.value(0);
677    // Range of values for each index
678    let ranges = indices
679        .values()
680        .iter()
681        .map(|i| (*i - first_idx).as_usize())
682        .map(|idx| positions.value(idx).as_usize()..positions.value(idx + 1).as_usize())
683        .collect::<Vec<_>>();
684    let field = field.clone();
685    let mut list_values: Vec<ArrayRef> = vec![];
686    // TODO: read them in parallel.
687    for range in ranges.iter() {
688        list_values.push(
689            read_array(
690                reader,
691                &field.children[0],
692                batch_id,
693                page_table,
694                &(range.clone()).into(),
695            )
696            .await?,
697        );
698    }
699
700    let value_refs = list_values
701        .iter()
702        .map(|arr| arr.as_ref())
703        .collect::<Vec<_>>();
704    let mut offsets_builder = PrimitiveBuilder::<T>::new();
705    offsets_builder.append_value(T::Native::usize_as(0));
706    let mut off = 0_usize;
707    for range in ranges {
708        off += range.len();
709        offsets_builder.append_value(T::Native::usize_as(off));
710    }
711    let all_values = concat::concat(value_refs.as_slice())?;
712    let offset_arr = offsets_builder.finish();
713    let arr = try_new_generic_list_array(all_values, &offset_arr)?;
714    Ok(Arc::new(arr) as ArrayRef)
715}
716
717async fn read_list_array<T: ArrowNumericType>(
718    reader: &FileReader,
719    field: &Field,
720    batch_id: i32,
721    page_table: &PageTable,
722    params: &ReadBatchParams,
723) -> Result<ArrayRef>
724where
725    T::Native: ArrowNativeTypeOp + OffsetSizeTrait,
726{
727    // Offset the position array by 1 in order to include the upper bound of the last element
728    let positions_params = match params {
729        ReadBatchParams::Range(range) => ReadBatchParams::from(range.start..(range.end + 1)),
730        ReadBatchParams::RangeTo(range) => ReadBatchParams::from(..range.end + 1),
731        ReadBatchParams::Indices(indices) => {
732            (indices.value(0).as_usize()..indices.value(indices.len() - 1).as_usize() + 2).into()
733        }
734        p => p.clone(),
735    };
736
737    let page_info = get_page_info(&reader.page_table, field, batch_id)?;
738    let position_arr = read_fixed_stride_array(
739        reader.object_reader.as_ref(),
740        &T::DATA_TYPE,
741        page_info.position,
742        page_info.length,
743        positions_params,
744    )
745    .await?;
746
747    let positions: &PrimitiveArray<T> = position_arr.as_primitive();
748
749    // Recompute params so they align with the offset array
750    let value_params = match params {
751        ReadBatchParams::Range(range) => ReadBatchParams::from(
752            positions.value(0).as_usize()..positions.value(range.end - range.start).as_usize(),
753        ),
754        ReadBatchParams::Ranges(_) => {
755            return Err(Error::internal(
756                "ReadBatchParams::Ranges should not be used in v1 files".to_string(),
757            ));
758        }
759        ReadBatchParams::RangeTo(RangeTo { end }) => {
760            ReadBatchParams::from(..positions.value(*end).as_usize())
761        }
762        ReadBatchParams::RangeFrom(_) => ReadBatchParams::from(positions.value(0).as_usize()..),
763        ReadBatchParams::RangeFull => ReadBatchParams::from(
764            positions.value(0).as_usize()..positions.value(positions.len() - 1).as_usize(),
765        ),
766        ReadBatchParams::Indices(indices) => {
767            return take_list_array(reader, field, batch_id, page_table, positions, indices).await;
768        }
769    };
770
771    let start_position = PrimitiveArray::<T>::new_scalar(positions.value(0));
772    let offset_arr = sub(positions, &start_position)?;
773    let offset_arr_ref = offset_arr.as_primitive::<T>();
774    let value_arrs = read_array(
775        reader,
776        &field.children[0],
777        batch_id,
778        page_table,
779        &value_params,
780    )
781    .await?;
782    let arr = try_new_generic_list_array(value_arrs, offset_arr_ref)?;
783    Ok(Arc::new(arr) as ArrayRef)
784}
785
786#[cfg(test)]
787mod tests {
788    use crate::previous::writer::{FileWriter as PreviousFileWriter, NotSelfDescribing};
789
790    use super::*;
791
792    use arrow_array::{
793        Array, DictionaryArray, Float32Array, Int64Array, LargeListArray, ListArray, StringArray,
794        UInt8Array,
795        builder::{Int32Builder, LargeListBuilder, ListBuilder, StringBuilder},
796        cast::{as_string_array, as_struct_array},
797        types::UInt8Type,
798    };
799    use arrow_array::{BooleanArray, Int32Array};
800    use arrow_schema::{Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema};
801    use lance_io::object_store::ObjectStoreParams;
802
803    #[tokio::test]
804    async fn test_take() {
805        let arrow_schema = ArrowSchema::new(vec![
806            ArrowField::new("i", DataType::Int64, true),
807            ArrowField::new("f", DataType::Float32, false),
808            ArrowField::new("s", DataType::Utf8, false),
809            ArrowField::new(
810                "d",
811                DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
812                false,
813            ),
814        ]);
815        let mut schema = Schema::try_from(&arrow_schema).unwrap();
816
817        let store = ObjectStore::memory();
818        let path = Path::from("/take_test");
819
820        // Write 10 batches.
821        let values = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);
822        let values_ref = Arc::new(values);
823        let mut batches = vec![];
824        for batch_id in 0..10 {
825            let value_range: Range<i64> = batch_id * 10..batch_id * 10 + 10;
826            let keys = UInt8Array::from_iter_values(value_range.clone().map(|v| (v % 7) as u8));
827            let columns: Vec<ArrayRef> = vec![
828                Arc::new(Int64Array::from_iter(
829                    value_range.clone().collect::<Vec<_>>(),
830                )),
831                Arc::new(Float32Array::from_iter(
832                    value_range.clone().map(|n| n as f32).collect::<Vec<_>>(),
833                )),
834                Arc::new(StringArray::from_iter_values(
835                    value_range.clone().map(|n| format!("str-{}", n)),
836                )),
837                Arc::new(DictionaryArray::<UInt8Type>::try_new(keys, values_ref.clone()).unwrap()),
838            ];
839            batches.push(RecordBatch::try_new(Arc::new(arrow_schema.clone()), columns).unwrap());
840        }
841        schema.set_dictionary(&batches[0]).unwrap();
842
843        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
844            &store,
845            &path,
846            schema.clone(),
847            &Default::default(),
848        )
849        .await
850        .unwrap();
851        for batch in batches.iter() {
852            file_writer
853                .write(std::slice::from_ref(batch))
854                .await
855                .unwrap();
856        }
857        file_writer.finish().await.unwrap();
858
859        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
860        let batch = reader
861            .take(&[1, 15, 20, 25, 30, 48, 90], reader.schema())
862            .await
863            .unwrap();
864        let dict_keys = UInt8Array::from_iter_values([1, 1, 6, 4, 2, 6, 6]);
865        assert_eq!(
866            batch,
867            RecordBatch::try_new(
868                batch.schema(),
869                vec![
870                    Arc::new(Int64Array::from_iter_values([1, 15, 20, 25, 30, 48, 90])),
871                    Arc::new(Float32Array::from_iter_values([
872                        1.0, 15.0, 20.0, 25.0, 30.0, 48.0, 90.0
873                    ])),
874                    Arc::new(StringArray::from_iter_values([
875                        "str-1", "str-15", "str-20", "str-25", "str-30", "str-48", "str-90"
876                    ])),
877                    Arc::new(DictionaryArray::try_new(dict_keys, values_ref.clone()).unwrap()),
878                ]
879            )
880            .unwrap()
881        );
882    }
883
884    async fn test_write_null_string_in_struct(field_nullable: bool) {
885        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
886            "parent",
887            DataType::Struct(ArrowFields::from(vec![ArrowField::new(
888                "str",
889                DataType::Utf8,
890                field_nullable,
891            )])),
892            true,
893        )]));
894
895        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
896
897        let store = ObjectStore::memory();
898        let path = Path::from("/null_strings");
899
900        let string_arr = Arc::new(StringArray::from_iter([Some("a"), Some(""), Some("b")]));
901        let struct_arr = Arc::new(StructArray::from(vec![(
902            Arc::new(ArrowField::new("str", DataType::Utf8, field_nullable)),
903            string_arr.clone() as ArrayRef,
904        )]));
905        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_arr]).unwrap();
906
907        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
908            &store,
909            &path,
910            schema.clone(),
911            &Default::default(),
912        )
913        .await
914        .unwrap();
915        file_writer
916            .write(std::slice::from_ref(&batch))
917            .await
918            .unwrap();
919        file_writer.finish().await.unwrap();
920
921        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
922        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
923
924        if field_nullable {
925            assert_eq!(
926                &StringArray::from_iter(vec![Some("a"), None, Some("b")]),
927                as_string_array(
928                    as_struct_array(actual_batch.column_by_name("parent").unwrap().as_ref())
929                        .column_by_name("str")
930                        .unwrap()
931                        .as_ref()
932                )
933            );
934        } else {
935            assert_eq!(actual_batch, batch);
936        }
937    }
938
939    #[tokio::test]
940    async fn read_nullable_string_in_struct() {
941        test_write_null_string_in_struct(true).await;
942        test_write_null_string_in_struct(false).await;
943    }
944
945    #[tokio::test]
946    async fn test_read_struct_of_list_arrays() {
947        let store = ObjectStore::memory();
948        let path = Path::from("/null_strings");
949
950        let arrow_schema = make_schema_of_list_array();
951        let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
952
953        let batches = (0..3)
954            .map(|_| {
955                let struct_array = make_struct_of_list_array(10, 10);
956                RecordBatch::try_new(arrow_schema.clone(), vec![struct_array]).unwrap()
957            })
958            .collect::<Vec<_>>();
959        let batches_ref = batches.iter().collect::<Vec<_>>();
960
961        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
962            &store,
963            &path,
964            schema.clone(),
965            &Default::default(),
966        )
967        .await
968        .unwrap();
969        file_writer.write(&batches).await.unwrap();
970        file_writer.finish().await.unwrap();
971
972        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
973        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
974        let expected = concat_batches(&arrow_schema, batches_ref).unwrap();
975        assert_eq!(expected, actual_batch);
976    }
977
978    #[tokio::test]
979    async fn test_scan_struct_of_list_arrays() {
980        let store = ObjectStore::memory();
981        let path = Path::from("/null_strings");
982
983        let arrow_schema = make_schema_of_list_array();
984        let struct_array = make_struct_of_list_array(3, 10);
985        let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
986        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_array.clone()]).unwrap();
987
988        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
989            &store,
990            &path,
991            schema.clone(),
992            &Default::default(),
993        )
994        .await
995        .unwrap();
996        file_writer.write(&[batch]).await.unwrap();
997        file_writer.finish().await.unwrap();
998
999        let mut expected_columns: Vec<ArrayRef> = Vec::new();
1000        for c in struct_array.columns().iter() {
1001            expected_columns.push(c.slice(1, 1));
1002        }
1003
1004        let expected_struct = match arrow_schema.fields[0].data_type() {
1005            DataType::Struct(subfields) => subfields
1006                .iter()
1007                .zip(expected_columns)
1008                .map(|(f, d)| (f.clone(), d))
1009                .collect::<Vec<_>>(),
1010            _ => panic!("unexpected field"),
1011        };
1012
1013        let expected_struct_array = StructArray::from(expected_struct);
1014        let expected_batch = RecordBatch::from(&StructArray::from(vec![(
1015            Arc::new(arrow_schema.fields[0].as_ref().clone()),
1016            Arc::new(expected_struct_array) as ArrayRef,
1017        )]));
1018
1019        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
1020        let params = ReadBatchParams::Range(1..2);
1021        let slice_of_batch = reader.read_batch(0, params, reader.schema()).await.unwrap();
1022        assert_eq!(expected_batch, slice_of_batch);
1023    }
1024
1025    fn make_schema_of_list_array() -> Arc<arrow_schema::Schema> {
1026        Arc::new(ArrowSchema::new(vec![ArrowField::new(
1027            "s",
1028            DataType::Struct(ArrowFields::from(vec![
1029                ArrowField::new(
1030                    "li",
1031                    DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1032                    true,
1033                ),
1034                ArrowField::new(
1035                    "ls",
1036                    DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))),
1037                    true,
1038                ),
1039                ArrowField::new(
1040                    "ll",
1041                    DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1042                    false,
1043                ),
1044            ])),
1045            true,
1046        )]))
1047    }
1048
1049    fn make_struct_of_list_array(rows: i32, num_items: i32) -> Arc<StructArray> {
1050        let mut li_builder = ListBuilder::new(Int32Builder::new());
1051        let mut ls_builder = ListBuilder::new(StringBuilder::new());
1052        let ll_value_builder = Int32Builder::new();
1053        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
1054        for i in 0..rows {
1055            for j in 0..num_items {
1056                li_builder.values().append_value(i * 10 + j);
1057                ls_builder
1058                    .values()
1059                    .append_value(format!("str-{}", i * 10 + j));
1060                large_list_builder.values().append_value(i * 10 + j);
1061            }
1062            li_builder.append(true);
1063            ls_builder.append(true);
1064            large_list_builder.append(true);
1065        }
1066        Arc::new(StructArray::from(vec![
1067            (
1068                Arc::new(ArrowField::new(
1069                    "li",
1070                    DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1071                    true,
1072                )),
1073                Arc::new(li_builder.finish()) as ArrayRef,
1074            ),
1075            (
1076                Arc::new(ArrowField::new(
1077                    "ls",
1078                    DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))),
1079                    true,
1080                )),
1081                Arc::new(ls_builder.finish()) as ArrayRef,
1082            ),
1083            (
1084                Arc::new(ArrowField::new(
1085                    "ll",
1086                    DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1087                    false,
1088                )),
1089                Arc::new(large_list_builder.finish()) as ArrayRef,
1090            ),
1091        ]))
1092    }
1093
1094    #[tokio::test]
1095    async fn test_read_nullable_arrays() {
1096        use arrow_array::Array;
1097
1098        // create a record batch with a null array column
1099        let arrow_schema = ArrowSchema::new(vec![
1100            ArrowField::new("i", DataType::Int64, false),
1101            ArrowField::new("n", DataType::Null, true),
1102        ]);
1103        let schema = Schema::try_from(&arrow_schema).unwrap();
1104        let columns: Vec<ArrayRef> = vec![
1105            Arc::new(Int64Array::from_iter_values(0..100)),
1106            Arc::new(NullArray::new(100)),
1107        ];
1108        let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap();
1109
1110        // write to a lance file
1111        let store = ObjectStore::memory();
1112        let path = Path::from("/takes");
1113        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
1114            &store,
1115            &path,
1116            schema.clone(),
1117            &Default::default(),
1118        )
1119        .await
1120        .unwrap();
1121        file_writer.write(&[batch]).await.unwrap();
1122        file_writer.finish().await.unwrap();
1123
1124        // read the file back
1125        let reader = FileReader::try_new(&store, &path, schema.clone())
1126            .await
1127            .unwrap();
1128
1129        async fn read_array_w_params(
1130            reader: &FileReader,
1131            field: &Field,
1132            params: ReadBatchParams,
1133        ) -> ArrayRef {
1134            read_array(reader, field, 0, reader.page_table.as_ref(), &params)
1135                .await
1136                .expect("Error reading back the null array from file") as _
1137        }
1138
1139        let arr = read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFull).await;
1140        assert_eq!(100, arr.len());
1141        assert_eq!(arr.data_type(), &DataType::Null);
1142
1143        let arr =
1144            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::Range(10..25)).await;
1145        assert_eq!(15, arr.len());
1146        assert_eq!(arr.data_type(), &DataType::Null);
1147
1148        let arr =
1149            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFrom(60..)).await;
1150        assert_eq!(40, arr.len());
1151        assert_eq!(arr.data_type(), &DataType::Null);
1152
1153        let arr =
1154            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeTo(..25)).await;
1155        assert_eq!(25, arr.len());
1156        assert_eq!(arr.data_type(), &DataType::Null);
1157
1158        let arr = read_array_w_params(
1159            &reader,
1160            &schema.fields[1],
1161            ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72])),
1162        )
1163        .await;
1164        assert_eq!(4, arr.len());
1165        assert_eq!(arr.data_type(), &DataType::Null);
1166
1167        // raise error if take indices are out of bounds
1168        let params = ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72, 100]));
1169        let arr = read_array(
1170            &reader,
1171            &schema.fields[1],
1172            0,
1173            reader.page_table.as_ref(),
1174            &params,
1175        );
1176        assert!(arr.await.is_err());
1177
1178        // raise error if range indices are out of bounds
1179        let params = ReadBatchParams::RangeTo(..107);
1180        let arr = read_array(
1181            &reader,
1182            &schema.fields[1],
1183            0,
1184            reader.page_table.as_ref(),
1185            &params,
1186        );
1187        assert!(arr.await.is_err());
1188    }
1189
1190    #[tokio::test]
1191    async fn test_take_lists() {
1192        let arrow_schema = ArrowSchema::new(vec![
1193            ArrowField::new(
1194                "l",
1195                DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1196                false,
1197            ),
1198            ArrowField::new(
1199                "ll",
1200                DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1201                false,
1202            ),
1203        ]);
1204
1205        let value_builder = Int32Builder::new();
1206        let mut list_builder = ListBuilder::new(value_builder);
1207        let ll_value_builder = Int32Builder::new();
1208        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
1209        for i in 0..100 {
1210            list_builder.values().append_value(i);
1211            large_list_builder.values().append_value(i);
1212            if (i + 1) % 10 == 0 {
1213                list_builder.append(true);
1214                large_list_builder.append(true);
1215            }
1216        }
1217        let list_arr = Arc::new(list_builder.finish());
1218        let large_list_arr = Arc::new(large_list_builder.finish());
1219
1220        let batch = RecordBatch::try_new(
1221            Arc::new(arrow_schema.clone()),
1222            vec![list_arr as ArrayRef, large_list_arr as ArrayRef],
1223        )
1224        .unwrap();
1225
1226        // write to a lance file
1227        let store = ObjectStore::memory();
1228        let path = Path::from("/take_list");
1229        let schema: Schema = (&arrow_schema).try_into().unwrap();
1230        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
1231            &store,
1232            &path,
1233            schema.clone(),
1234            &Default::default(),
1235        )
1236        .await
1237        .unwrap();
1238        file_writer.write(&[batch]).await.unwrap();
1239        file_writer.finish().await.unwrap();
1240
1241        // read the file back
1242        let reader = FileReader::try_new(&store, &path, schema.clone())
1243            .await
1244            .unwrap();
1245        let actual = reader.take(&[1, 3, 5, 9], &schema).await.unwrap();
1246
1247        let value_builder = Int32Builder::new();
1248        let mut list_builder = ListBuilder::new(value_builder);
1249        let ll_value_builder = Int32Builder::new();
1250        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
1251        for i in [1, 3, 5, 9] {
1252            for j in 0..10 {
1253                list_builder.values().append_value(i * 10 + j);
1254                large_list_builder.values().append_value(i * 10 + j);
1255            }
1256            list_builder.append(true);
1257            large_list_builder.append(true);
1258        }
1259        let expected_list = list_builder.finish();
1260        let expected_large_list = large_list_builder.finish();
1261
1262        assert_eq!(actual.column_by_name("l").unwrap().as_ref(), &expected_list);
1263        assert_eq!(
1264            actual.column_by_name("ll").unwrap().as_ref(),
1265            &expected_large_list
1266        );
1267    }
1268
1269    #[tokio::test]
1270    async fn test_list_array_with_offsets() {
1271        let arrow_schema = ArrowSchema::new(vec![
1272            ArrowField::new(
1273                "l",
1274                DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1275                false,
1276            ),
1277            ArrowField::new(
1278                "ll",
1279                DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
1280                false,
1281            ),
1282        ]);
1283
1284        let store = ObjectStore::memory();
1285        let path = Path::from("/lists");
1286
1287        let list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1288            Some(vec![Some(1), Some(2)]),
1289            Some(vec![Some(3), Some(4)]),
1290            Some((0..2_000).map(Some).collect::<Vec<_>>()),
1291        ])
1292        .slice(1, 1);
1293        let large_list_array = LargeListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1294            Some(vec![Some(10), Some(11)]),
1295            Some(vec![Some(12), Some(13)]),
1296            Some((0..2_000).map(Some).collect::<Vec<_>>()),
1297        ])
1298        .slice(1, 1);
1299
1300        let batch = RecordBatch::try_new(
1301            Arc::new(arrow_schema.clone()),
1302            vec![Arc::new(list_array), Arc::new(large_list_array)],
1303        )
1304        .unwrap();
1305
1306        let schema: Schema = (&arrow_schema).try_into().unwrap();
1307        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
1308            &store,
1309            &path,
1310            schema.clone(),
1311            &Default::default(),
1312        )
1313        .await
1314        .unwrap();
1315        file_writer
1316            .write(std::slice::from_ref(&batch))
1317            .await
1318            .unwrap();
1319        file_writer.finish().await.unwrap();
1320
1321        // Make sure the big array was not written to the file
1322        let file_size_bytes = store.size(&path).await.unwrap();
1323        assert!(file_size_bytes < 1_000);
1324
1325        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
1326        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
1327        assert_eq!(batch, actual_batch);
1328    }
1329
1330    #[tokio::test]
1331    async fn test_read_ranges() {
1332        // create a record batch with a null array column
1333        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int64, false)]);
1334        let schema = Schema::try_from(&arrow_schema).unwrap();
1335        let columns: Vec<ArrayRef> = vec![Arc::new(Int64Array::from_iter_values(0..100))];
1336        let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap();
1337
1338        // write to a lance file
1339        let store = ObjectStore::memory();
1340        let path = Path::from("/read_range");
1341        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
1342            &store,
1343            &path,
1344            schema.clone(),
1345            &Default::default(),
1346        )
1347        .await
1348        .unwrap();
1349        file_writer.write(&[batch]).await.unwrap();
1350        file_writer.finish().await.unwrap();
1351
1352        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
1353        let actual_batch = reader.read_range(7..25, reader.schema()).await.unwrap();
1354
1355        assert_eq!(
1356            actual_batch.column_by_name("i").unwrap().as_ref(),
1357            &Int64Array::from_iter_values(7..25)
1358        );
1359    }
1360
1361    #[tokio::test]
1362    async fn test_batches_stream() {
1363        let store = ObjectStore::memory();
1364        let path = Path::from("/batch_stream");
1365
1366        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int32, true)]);
1367        let schema = Schema::try_from(&arrow_schema).unwrap();
1368        let mut writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
1369            &store,
1370            &path,
1371            schema.clone(),
1372            &Default::default(),
1373        )
1374        .await
1375        .unwrap();
1376        for i in 0..10 {
1377            let batch = RecordBatch::try_new(
1378                Arc::new(arrow_schema.clone()),
1379                vec![Arc::new(Int32Array::from_iter_values(i * 10..(i + 1) * 10))],
1380            )
1381            .unwrap();
1382            writer.write(&[batch]).await.unwrap();
1383        }
1384        writer.finish().await.unwrap();
1385
1386        let reader = FileReader::try_new(&store, &path, schema.clone())
1387            .await
1388            .unwrap();
1389        let stream = batches_stream(reader, schema, |id| id % 2 == 0);
1390        let batches = stream.try_collect::<Vec<_>>().await.unwrap();
1391
1392        assert_eq!(batches.len(), 5);
1393        for (i, batch) in batches.iter().enumerate() {
1394            assert_eq!(
1395                batch,
1396                &RecordBatch::try_new(
1397                    Arc::new(arrow_schema.clone()),
1398                    vec![Arc::new(Int32Array::from_iter_values(
1399                        i as i32 * 2 * 10..(i as i32 * 2 + 1) * 10
1400                    ))],
1401                )
1402                .unwrap()
1403            )
1404        }
1405    }
1406
1407    #[tokio::test]
1408    async fn test_take_boolean_beyond_chunk() {
1409        let store = ObjectStore::from_uri_and_params(
1410            Arc::new(Default::default()),
1411            "memory://",
1412            &ObjectStoreParams {
1413                block_size: Some(256),
1414                ..Default::default()
1415            },
1416        )
1417        .await
1418        .unwrap()
1419        .0;
1420        let path = Path::from("/take_bools");
1421
1422        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
1423            "b",
1424            DataType::Boolean,
1425            false,
1426        )]));
1427        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
1428        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
1429            &store,
1430            &path,
1431            schema.clone(),
1432            &Default::default(),
1433        )
1434        .await
1435        .unwrap();
1436
1437        let array = BooleanArray::from((0..5000).map(|v| v % 5 == 0).collect::<Vec<_>>());
1438        let batch =
1439            RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(array.clone())]).unwrap();
1440        file_writer.write(&[batch]).await.unwrap();
1441        file_writer.finish().await.unwrap();
1442
1443        let reader = FileReader::try_new(&store, &path, schema.clone())
1444            .await
1445            .unwrap();
1446        let actual = reader.take(&[2, 4, 5, 8, 4555], &schema).await.unwrap();
1447
1448        assert_eq!(
1449            actual.column_by_name("b").unwrap().as_ref(),
1450            &BooleanArray::from(vec![false, false, true, false, true])
1451        );
1452    }
1453
1454    #[tokio::test]
1455    async fn test_read_projection() {
1456        // The dataset schema may be very large.  The file reader should support reading
1457        // a small projection of that schema (this just tests the field_offset / num_fields
1458        // parameters)
1459        let store = ObjectStore::memory();
1460        let path = Path::from("/partial_read");
1461
1462        // Create a large schema
1463        let mut fields = vec![];
1464        for i in 0..100 {
1465            fields.push(ArrowField::new(format!("f{}", i), DataType::Int32, false));
1466        }
1467        let arrow_schema = ArrowSchema::new(fields);
1468        let schema = Schema::try_from(&arrow_schema).unwrap();
1469
1470        let partial_schema = schema.project(&["f50"]).unwrap();
1471        let partial_arrow: ArrowSchema = (&partial_schema).into();
1472
1473        let mut file_writer = PreviousFileWriter::<NotSelfDescribing>::try_new(
1474            &store,
1475            &path,
1476            partial_schema.clone(),
1477            &Default::default(),
1478        )
1479        .await
1480        .unwrap();
1481
1482        let array = Int32Array::from(vec![0; 15]);
1483        let batch =
1484            RecordBatch::try_new(Arc::new(partial_arrow), vec![Arc::new(array.clone())]).unwrap();
1485        file_writer
1486            .write(std::slice::from_ref(&batch))
1487            .await
1488            .unwrap();
1489        file_writer.finish().await.unwrap();
1490
1491        let field_id = partial_schema.fields.first().unwrap().id;
1492        let reader = FileReader::try_new_with_fragment_id(
1493            &store,
1494            &path,
1495            schema.clone(),
1496            0,
1497            /*min_field_id=*/ field_id,
1498            /*max_field_id=*/ field_id,
1499            None,
1500        )
1501        .await
1502        .unwrap();
1503        let actual = reader
1504            .read_batch(0, ReadBatchParams::RangeFull, &partial_schema)
1505            .await
1506            .unwrap();
1507
1508        assert_eq!(actual, batch);
1509    }
1510}