// lance_io/encodings/plain.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Plain encoding
5//!
6//! Plain encoding works with fixed stride types, i.e., `boolean`, `i8...i64`, `f16...f64`,
7//! it stores the array directly in the file. It offers O(1) read access.
8
9use std::ops::{Range, RangeFrom, RangeFull, RangeTo};
10use std::slice::from_raw_parts;
11use std::sync::Arc;
12
13use crate::{
14    ReadBatchParams,
15    traits::{Reader, Writer},
16};
17use arrow_arith::numeric::sub;
18use arrow_array::{
19    Array, ArrayRef, BooleanArray, FixedSizeBinaryArray, FixedSizeListArray, UInt8Array,
20    UInt32Array, builder::BooleanBuilder, cast::AsArray, make_array, new_empty_array,
21};
22use arrow_buffer::{Buffer, bit_util};
23use arrow_data::{ArrayDataBuilder, BufferSpec, layout};
24use arrow_schema::{DataType, Field};
25use arrow_select::{concat::concat, take::take};
26use async_recursion::async_recursion;
27use async_trait::async_trait;
28use bytes::Bytes;
29use futures::stream::{self, StreamExt, TryStreamExt};
30use lance_arrow::*;
31use lance_core::{Error, Result};
32use tokio::io::AsyncWriteExt;
33
34use crate::encodings::{AsyncIndex, Decoder};
35
/// Encoder for plain encoding.
///
/// Writes fixed-stride values back-to-back with no extra framing, so a
/// value's byte position in the file is a direct function of its row id.
pub struct PlainEncoder<'a> {
    // Destination for the encoded bytes.
    writer: &'a mut dyn Writer,
    // Arrow type of the values being encoded.
    data_type: &'a DataType,
}
42
impl<'a> PlainEncoder<'a> {
    /// Create an encoder writing values of `data_type` through `writer`.
    pub fn new(writer: &'a mut dyn Writer, data_type: &'a DataType) -> Self {
        PlainEncoder { writer, data_type }
    }

    /// Write a continuous plain-encoded array to the writer.
    ///
    /// Returns the file position at which the encoded values begin.
    /// The data type is taken from the first array (all arrays are assumed
    /// to share it); if `arrays` is empty nothing is written.
    pub async fn write(writer: &'a mut dyn Writer, arrays: &[&'a dyn Array]) -> Result<usize> {
        let pos = writer.tell().await?;
        if !arrays.is_empty() {
            let mut encoder = Self::new(writer, arrays[0].data_type());
            encoder.encode(arrays).await?;
        }
        Ok(pos)
    }

    /// Encode a slice of arrays of a batch.
    /// Returns the offset of the metadata
    pub async fn encode(&mut self, arrays: &[&dyn Array]) -> Result<usize> {
        self.encode_internal(arrays, self.data_type).await
    }

    // Recursive so that FixedSizeList columns can be encoded by descending
    // into their (possibly nested) item type.
    #[async_recursion]
    async fn encode_internal(
        &mut self,
        array: &[&dyn Array],
        data_type: &DataType,
    ) -> Result<usize> {
        if let DataType::FixedSizeList(items, _) = data_type {
            self.encode_fixed_size_list(array, items).await
        } else {
            self.encode_primitive(array).await
        }
    }

    /// Bit-pack and write boolean arrays.
    ///
    /// All arrays are re-packed through a single builder so that arrays whose
    /// lengths are not multiples of 8 merge without padding gaps between them.
    /// Note: null values are written as `false` — validity is not preserved.
    async fn encode_boolean(&mut self, arrays: &[&BooleanArray]) -> Result<()> {
        let capacity: usize = arrays.iter().map(|a| a.len()).sum();
        let mut builder = BooleanBuilder::with_capacity(capacity);

        for array in arrays {
            for val in array.iter() {
                builder.append_value(val.unwrap_or_default());
            }
        }

        let boolean_array = builder.finish();
        // The packed bits live in the array's first (and only) data buffer.
        self.writer
            .write_all(boolean_array.into_data().buffers()[0].as_slice())
            .await?;
        Ok(())
    }

    /// Encode array of primitive values.
    ///
    /// Returns the file offset at which the values begin.
    async fn encode_primitive(&mut self, arrays: &[&dyn Array]) -> Result<usize> {
        assert!(!arrays.is_empty());
        let data_type = arrays[0].data_type();
        let offset = self.writer.tell().await?;

        if matches!(data_type, DataType::Boolean) {
            let boolean_arr = arrays
                .iter()
                .map(|a| a.as_boolean())
                .collect::<Vec<&BooleanArray>>();
            self.encode_boolean(boolean_arr.as_slice()).await?;
        } else {
            let byte_width = data_type.byte_width();
            for a in arrays.iter() {
                let data = a.to_data();
                // SAFETY: for fixed-stride types the first buffer holds at
                // least `(a.offset() + a.len()) * byte_width` bytes, so this
                // view over the array's own values stays within the buffer.
                let slice = unsafe {
                    from_raw_parts(
                        data.buffers()[0].as_ptr().add(a.offset() * byte_width),
                        a.len() * byte_width,
                    )
                };
                self.writer.write_all(slice).await?;
            }
        }
        Ok(offset)
    }

    /// Encode fixed size list.
    ///
    /// Each list array is flattened to the slice of its child values that
    /// backs it (honoring the array's element offset), then the flattened
    /// values are encoded recursively as the item type.
    async fn encode_fixed_size_list(
        &mut self,
        arrays: &[&dyn Array],
        items: &Field,
    ) -> Result<usize> {
        let mut value_arrs: Vec<ArrayRef> = Vec::new();

        for array in arrays {
            let list_array = array
                .as_any()
                .downcast_ref::<FixedSizeListArray>()
                .ok_or_else(|| {
                    Error::schema(format!(
                        "Needed a FixedSizeListArray but got {}",
                        array.data_type()
                    ))
                })?;
            // Element offset of the first list's values within the child array.
            let offset = list_array.value_offset(0) as usize;
            let length = list_array.len();
            let value_length = list_array.value_length() as usize;
            let value_array = list_array.values().slice(offset, length * value_length);
            value_arrs.push(value_array);
        }

        self.encode_internal(
            value_arrs
                .iter()
                .map(|a| a.as_ref())
                .collect::<Vec<_>>()
                .as_slice(),
            items.data_type(),
        )
        .await
    }
}
158
/// Decoder for plain encoding.
pub struct PlainDecoder<'a> {
    // Source of the encoded bytes.
    reader: &'a dyn Reader,
    // Arrow type of the values being decoded.
    data_type: &'a DataType,
    /// The start position of the batch in the file.
    position: usize,
    /// Number of the rows in this batch.
    length: usize,
}
168
169/// Get byte range from the row offset range.
170#[inline]
171fn get_byte_range(data_type: &DataType, row_range: Range<usize>) -> Range<usize> {
172    match data_type {
173        DataType::Boolean => row_range.start / 8..bit_util::ceil(row_range.end, 8),
174        _ => row_range.start * data_type.byte_width()..row_range.end * data_type.byte_width(),
175    }
176}
177
/// Reinterpret raw `bytes` read from the file as an Arrow array of `data_type`.
///
/// `len` is the number of values and `offset` the element offset into the
/// buffer at which the values start (used for bit-packed booleans).
/// Only types whose layout requires exactly one data buffer are supported;
/// the produced array is declared null-free.
pub fn bytes_to_array(
    data_type: &DataType,
    bytes: Bytes,
    len: usize,
    offset: usize,
) -> Result<ArrayRef> {
    let layout = layout(data_type);

    if layout.buffers.len() != 1 {
        return Err(Error::internal(format!(
            "Can only convert datatypes that require one buffer, found {:?}",
            data_type
        )));
    }

    let buf: Buffer = if let BufferSpec::FixedWidth {
        byte_width,
        alignment,
    } = &layout.buffers[0]
    {
        // this code is taken from
        // https://github.com/apache/arrow-rs/blob/master/arrow-data/src/data.rs#L748-L768
        let len_plus_offset = len + offset;
        // Minimum buffer size Arrow will accept for `len` values at `offset`.
        let min_buffer_size = len_plus_offset.saturating_mul(*byte_width);

        // alignment or size isn't right -- just make a copy
        if bytes.len() < min_buffer_size {
            Buffer::copy_bytes_bytes(bytes, min_buffer_size)
        } else {
            // Presumably zero-copy when `bytes` already satisfies the
            // required alignment — TODO confirm against lance_arrow helpers.
            Buffer::from_bytes_bytes(bytes, *alignment as u64)
        }
    } else {
        // cases we don't handle, just copy
        Buffer::from_slice_ref(bytes)
    };

    let array_data = ArrayDataBuilder::new(data_type.clone())
        .len(len)
        .offset(offset)
        .null_count(0)
        .add_buffer(buf)
        .build()?;
    Ok(make_array(array_data))
}
222
impl<'a> PlainDecoder<'a> {
    /// Create a decoder for `length` rows of `data_type` whose encoded bytes
    /// start at byte `position` of `reader`.
    pub fn new(
        reader: &'a dyn Reader,
        data_type: &'a DataType,
        position: usize,
        length: usize,
    ) -> Result<Self> {
        Ok(PlainDecoder {
            reader,
            data_type,
            position,
            length,
        })
    }

    /// Decode primitive values, from "offset" to "offset + length".
    ///
    /// `start..end` is a row range relative to this batch; it is validated
    /// against `self.length` before any I/O happens.
    async fn decode_primitive(&self, start: usize, end: usize) -> Result<ArrayRef> {
        if end > self.length {
            return Err(Error::invalid_input(format!(
                "PlainDecoder: request([{}..{}]) out of range: [0..{}]",
                start, end, self.length
            )));
        }
        // Map rows to bytes, then shift into the batch's file position.
        let byte_range = get_byte_range(self.data_type, start..end);
        let range = Range {
            start: self.position + byte_range.start,
            end: self.position + byte_range.end,
        };

        let data = self.reader.get_range(range).await?;
        // booleans are bitpacked, so we need an offset to provide the exact
        // requested range.
        let offset = if self.data_type == &DataType::Boolean {
            start % 8
        } else {
            0
        };
        bytes_to_array(self.data_type, data, end - start, offset)
    }

    /// Decode rows `start..end` of a FixedSizeList column by decoding the
    /// corresponding slice of the flattened item values and re-wrapping it.
    async fn decode_fixed_size_list(
        &self,
        items: &Field,
        list_size: i32,
        start: usize,
        end: usize,
    ) -> Result<ArrayRef> {
        if !items.data_type().is_fixed_stride() {
            return Err(Error::schema(format!(
                "Items for fixed size list should be primitives but found {}",
                items.data_type()
            )));
        };
        // Child decoder over the flattened values: same file position,
        // `list_size` items per row.
        let item_decoder = PlainDecoder::new(
            self.reader,
            items.data_type(),
            self.position,
            self.length * list_size as usize,
        )?;
        let item_array = item_decoder
            .get(start * list_size as usize..end * list_size as usize)
            .await?;
        // No validity bitmap: plain encoding does not store nulls.
        Ok(Arc::new(FixedSizeListArray::new(
            Arc::new(items.clone()),
            list_size,
            item_array,
            None,
        )) as ArrayRef)
    }

    /// Decode rows `start..end` of a FixedSizeBinary column (`stride` bytes
    /// per value) by decoding the raw bytes as UInt8 and re-wrapping them.
    async fn decode_fixed_size_binary(
        &self,
        stride: i32,
        start: usize,
        end: usize,
    ) -> Result<ArrayRef> {
        let bytes_decoder = PlainDecoder::new(
            self.reader,
            &DataType::UInt8,
            self.position,
            self.length * stride as usize,
        )?;
        let bytes_array = bytes_decoder
            .get(start * stride as usize..end * stride as usize)
            .await?;
        let values = bytes_array
            .as_any()
            .downcast_ref::<UInt8Array>()
            .ok_or_else(|| {
                Error::schema("Could not cast to UInt8Array for FixedSizeBinary".to_string())
            })?;
        Ok(Arc::new(FixedSizeBinaryArray::try_new_from_values(values, stride)?) as ArrayRef)
    }

    /// Take rows at `indices` from a bit-packed boolean column.
    ///
    /// Indices are grouped into chunks whose packed bits fall within one
    /// block-sized read; each chunk fetches a single covering row range and
    /// then `take`s the requested rows out of it.
    /// NOTE(review): assumes `indices` is non-empty and ascending — the
    /// caller (`take`) checks emptiness; confirm sortedness upstream.
    async fn take_boolean(&self, indices: &UInt32Array) -> Result<ArrayRef> {
        let block_size = self.reader.block_size() as u32;
        // One block of bytes covers `block_size * 8` packed boolean rows.
        let boolean_block_size = block_size * 8;

        let mut chunk_ranges = vec![];
        let mut start: u32 = 0;
        // Start a new chunk whenever the next index lands in a later block
        // than the current chunk's first index.
        for j in 0..(indices.len() - 1) as u32 {
            if (indices.value(j as usize + 1) / boolean_block_size)
                > (indices.value(start as usize) / boolean_block_size)
            {
                let next_start = j + 1;
                chunk_ranges.push(start..next_start);
                start = next_start;
            }
        }
        // Remaining
        chunk_ranges.push(start..indices.len() as u32);

        let arrays = stream::iter(chunk_ranges)
            .map(|cr| async move {
                let request = indices.slice(cr.start as usize, cr.len());
                // request contains the array indices we are retrieving in this chunk.

                // Get the starting index
                let start = request.value(0);
                // Final index is the last value
                let end = request.value(request.len() - 1);
                let array = self.get(start as usize..end as usize + 1).await?;

                // Rebase the requested indices onto the fetched range.
                let shifted_indices = sub(&request, &UInt32Array::new_scalar(start))?;
                Ok::<ArrayRef, Error>(take(&array, &shifted_indices, None)?)
            })
            .buffered(self.reader.io_parallelism())
            .try_collect::<Vec<_>>()
            .await?;
        let references = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
        Ok(concat(&references)?)
    }
}
357
/// Partition `indices` into ranges of positions that can each be served by
/// a single I/O request of roughly `block_size` bytes.
///
/// The returned ranges index into `indices` itself (positions, not row ids).
/// Contiguous row ids always stay in the same chunk; otherwise a new chunk
/// starts once the next index lies more than `block_size` bytes past the
/// current chunk's first index. Returns an empty vec for empty `indices`.
fn make_chunked_requests(
    indices: &[u32],
    byte_width: usize,
    block_size: usize,
) -> Vec<Range<usize>> {
    let mut chunked_ranges = vec![];
    // Guard: the loop bound `indices.len() - 1` would underflow (and the
    // trailing push would emit a bogus 0..0 range) on an empty slice.
    if indices.is_empty() {
        return chunked_ranges;
    }
    let mut start: usize = 0;
    // Note: limit the I/O size to the block size.
    //
    // Another option could be checking whether `indices[i]` and `indices[i+1]` are not
    // farther way than the block size:
    //    indices[i] * byte_width + block_size < indices[i+1] * byte_width
    // It might allow slightly larger sequential reads.
    for i in 0..indices.len() - 1 {
        // If contiguous, continue
        if indices[i + 1] == indices[i] + 1 {
            continue;
        }
        // Arithmetic in usize so large u32 indices don't overflow.
        if indices[i + 1] as usize * byte_width > indices[start] as usize * byte_width + block_size
        {
            chunked_ranges.push(start..i + 1);
            start = i + 1;
        }
    }
    chunked_ranges.push(start..indices.len());
    chunked_ranges
}
385
#[async_trait]
impl Decoder for PlainDecoder<'_> {
    /// Decode every row in the batch.
    async fn decode(&self) -> Result<ArrayRef> {
        self.get(0..self.length).await
    }

    /// Read the rows at `indices`.
    ///
    /// Nearby indices are coalesced into block-sized range reads; each range
    /// is fetched once and the requested rows are `take`n out of it.
    /// NOTE(review): the chunking assumes `indices` is sorted ascending —
    /// confirm against callers.
    async fn take(&self, indices: &UInt32Array) -> Result<ArrayRef> {
        if indices.is_empty() {
            return Ok(new_empty_array(self.data_type));
        }

        // Booleans are bit-packed and need their own chunking logic.
        if matches!(self.data_type, DataType::Boolean) {
            return self.take_boolean(indices).await;
        }

        let block_size = self.reader.block_size();
        let byte_width = self.data_type.byte_width();

        let chunked_ranges = make_chunked_requests(indices.values(), byte_width, block_size);

        let arrays = stream::iter(chunked_ranges)
            .map(|cr| async move {
                // The indices served by this chunk.
                let request = indices.slice(cr.start, cr.len());

                // Fetch one contiguous row range covering the chunk ...
                let start = request.value(0);
                let end = request.value(request.len() - 1);
                let array = self.get(start as usize..end as usize + 1).await?;
                // ... then select the requested rows, rebased to the start.
                let adjusted_offsets = sub(&request, &UInt32Array::new_scalar(start))?;
                Ok::<ArrayRef, Error>(take(&array, &adjusted_offsets, None)?)
            })
            .buffered(self.reader.io_parallelism())
            .try_collect::<Vec<_>>()
            .await?;
        let references = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
        Ok(concat(&references)?)
    }
}
423
#[async_trait]
impl AsyncIndex<usize> for PlainDecoder<'_> {
    // TODO: should this return a Scalar value?
    type Output = Result<ArrayRef>;

    /// Decode the single row at `index` as a one-element array.
    async fn get(&self, index: usize) -> Self::Output {
        self.get(index..index + 1).await
    }
}
433
#[async_trait]
impl AsyncIndex<Range<usize>> for PlainDecoder<'_> {
    type Output = Result<ArrayRef>;

    /// Decode the rows in `index`, dispatching on the column's data type.
    /// An empty range yields an empty array without touching the reader.
    async fn get(&self, index: Range<usize>) -> Self::Output {
        if index.is_empty() {
            return Ok(new_empty_array(self.data_type));
        }
        match self.data_type {
            DataType::FixedSizeList(items, list_size) => {
                self.decode_fixed_size_list(items, *list_size, index.start, index.end)
                    .await
            }
            DataType::FixedSizeBinary(stride) => {
                self.decode_fixed_size_binary(*stride, index.start, index.end)
                    .await
            }
            // Everything else is treated as a fixed-stride primitive.
            _ => self.decode_primitive(index.start, index.end).await,
        }
    }
}
455
#[async_trait]
impl AsyncIndex<RangeFrom<usize>> for PlainDecoder<'_> {
    type Output = Result<ArrayRef>;

    /// Decode from `index.start` through the end of the batch.
    async fn get(&self, index: RangeFrom<usize>) -> Self::Output {
        self.get(index.start..self.length).await
    }
}
464
#[async_trait]
impl AsyncIndex<RangeTo<usize>> for PlainDecoder<'_> {
    type Output = Result<ArrayRef>;

    /// Decode from the start of the batch up to (excluding) `index.end`.
    async fn get(&self, index: RangeTo<usize>) -> Self::Output {
        self.get(0..index.end).await
    }
}
473
#[async_trait]
impl AsyncIndex<RangeFull> for PlainDecoder<'_> {
    type Output = Result<ArrayRef>;

    /// Decode the whole batch (`..`).
    async fn get(&self, _: RangeFull) -> Self::Output {
        self.get(0..self.length).await
    }
}
482
#[async_trait]
impl AsyncIndex<ReadBatchParams> for PlainDecoder<'_> {
    type Output = Result<ArrayRef>;

    /// Dispatch a generic batch-read request onto the concrete range/take
    /// implementations above.
    async fn get(&self, params: ReadBatchParams) -> Self::Output {
        match params {
            ReadBatchParams::Range(r) => self.get(r).await,
            // Ranges not supported in v1 files
            ReadBatchParams::Ranges(_) => unimplemented!(),
            ReadBatchParams::RangeFull => self.get(..).await,
            ReadBatchParams::RangeTo(r) => self.get(r).await,
            ReadBatchParams::RangeFrom(r) => self.get(r).await,
            ReadBatchParams::Indices(indices) => self.take(&indices).await,
        }
    }
}
499
500#[cfg(test)]
501mod tests {
502    use std::ops::Deref;
503
504    use arrow_array::*;
505    use lance_core::utils::tempfile::TempStdFile;
506    use rand::prelude::*;
507
508    use super::*;
509    use crate::local::LocalObjectReader;
510
    /// Round-trip all fixed-width integer and float types: the same byte
    /// buffer is viewed as 126 values of each target type, written ten
    /// times, and decoded back.
    #[tokio::test]
    async fn test_encode_decode_primitive_array() {
        let int_types = vec![
            DataType::Int8,
            DataType::Int16,
            DataType::Int32,
            DataType::Int64,
            DataType::UInt8,
            DataType::UInt16,
            DataType::UInt32,
            DataType::UInt64,
        ];
        let input: Vec<i64> = Vec::from_iter(1..127_i64);
        for t in int_types {
            let buffer = Buffer::from_slice_ref(input.as_slice());
            let mut arrs: Vec<ArrayRef> = Vec::new();
            for _ in 0..10 {
                arrs.push(Arc::new(make_array_(&t, &buffer).await));
            }
            test_round_trip(arrs.as_slice(), t).await;
        }

        // Same procedure over random float data.
        let float_types = vec![DataType::Float16, DataType::Float32, DataType::Float64];
        let mut rng = rand::rng();
        let input: Vec<f64> = (1..127).map(|_| rng.random()).collect();
        for t in float_types {
            let buffer = Buffer::from_slice_ref(input.as_slice());
            let mut arrs: Vec<ArrayRef> = Vec::new();

            for _ in 0..10 {
                arrs.push(Arc::new(make_array_(&t, &buffer).await));
            }
            test_round_trip(arrs.as_slice(), t).await;
        }
    }
546
    /// Write `expected` with `PlainEncoder`, read it back with
    /// `PlainDecoder`, and check the decoded result equals the
    /// concatenation of the input arrays.
    async fn test_round_trip(expected: &[ArrayRef], data_type: DataType) {
        let path = TempStdFile::default();

        let expected_as_array = expected
            .iter()
            .map(|e| e.as_ref())
            .collect::<Vec<&dyn Array>>();
        {
            let mut writer = tokio::fs::File::create(&path).await.unwrap();
            let mut encoder = PlainEncoder::new(&mut writer, &data_type);
            // Encoding starts at offset 0 of the fresh file.
            assert_eq!(
                encoder.encode(expected_as_array.as_slice()).await.unwrap(),
                0
            );
            writer.flush().await.unwrap();
        }

        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        assert!(reader.size().await.unwrap() > 0);
        // Expected size is the total of all arrays
        let expected_size = expected.iter().map(|e| e.len()).sum();
        let decoder = PlainDecoder::new(reader.as_ref(), &data_type, 0, expected_size).unwrap();
        let arr = decoder.decode().await.unwrap();
        let actual = arr.as_ref();
        let expected_merged = concat(expected_as_array.as_slice()).unwrap();
        assert_eq!(expected_merged.deref(), actual);
        assert_eq!(expected_size, actual.len());
    }
577
    /// Round-trip boolean arrays whose lengths are not byte-aligned, to
    /// exercise the bit-repacking merge in `encode_boolean`.
    #[tokio::test]
    async fn test_encode_decode_bool_array() {
        let mut arrs: Vec<ArrayRef> = Vec::new();

        for _ in 0..10 {
            // It is important that the boolean array length is < 8 so we can test if the Arrays are merged correctly
            arrs.push(Arc::new(BooleanArray::from(vec![true, true, true])) as ArrayRef);
        }
        test_round_trip(arrs.as_slice(), DataType::Boolean).await;
    }
588
    /// Round-trip FixedSizeList columns (list size 3) over all fixed-width
    /// integer item types.
    #[tokio::test]
    async fn test_encode_decode_fixed_size_list_array() {
        let int_types = vec![
            DataType::Int8,
            DataType::Int16,
            DataType::Int32,
            DataType::Int64,
            DataType::UInt8,
            DataType::UInt16,
            DataType::UInt32,
            DataType::UInt64,
        ];
        let input = Vec::from_iter(1..127_i64);
        for t in int_types {
            let buffer = Buffer::from_slice_ref(input.as_slice());
            let list_type =
                DataType::FixedSizeList(Arc::new(Field::new("item", t.clone(), true)), 3);
            let mut arrs: Vec<ArrayRef> = Vec::new();

            for _ in 0..10 {
                // 126 values per array fold into 42 lists of 3.
                let items = make_array_(&t.clone(), &buffer).await;
                let arr = FixedSizeListArray::try_new_from_values(items, 3).unwrap();
                arrs.push(Arc::new(arr) as ArrayRef);
            }
            test_round_trip(arrs.as_slice(), list_type).await;
        }
    }
616
    /// Round-trip a FixedSizeBinary column with 3-byte values.
    #[tokio::test]
    async fn test_encode_decode_fixed_size_binary_array() {
        let t = DataType::FixedSizeBinary(3);
        let mut arrs: Vec<ArrayRef> = Vec::new();

        for _ in 0..10 {
            let values = UInt8Array::from(Vec::from_iter(1..127_u8));
            let arr = FixedSizeBinaryArray::try_new_from_values(&values, 3).unwrap();
            arrs.push(Arc::new(arr) as ArrayRef);
        }
        test_round_trip(arrs.as_slice(), t).await;
    }
629
    /// `bytes_to_array` must tolerate an input buffer that is shorter than
    /// the minimum size Arrow expects (here: 5 bytes for 3 u16 values) by
    /// copying into a properly sized buffer.
    #[tokio::test]
    async fn test_bytes_to_array_padding() {
        let bytes = Bytes::from_static(&[0x01, 0x00, 0x02, 0x00, 0x03]);
        let arr = bytes_to_array(&DataType::UInt16, bytes, 3, 0).unwrap();

        let expected = UInt16Array::from(vec![1, 2, 3]);
        assert_eq!(arr.as_ref(), &expected);

        // Underlying data is padded to the nearest multiple of two bytes (for u16).
        let data = arr.to_data();
        let buf = &data.buffers()[0];
        let repr = format!("{:?}", buf);
        assert!(
            repr.contains("[1, 0, 2, 0, 3, 0]"),
            "Underlying buffer contains unexpected data: {}",
            repr
        );
    }
648
    /// Round-trip nested fixed-size types, exercising the recursive branch
    /// of `encode_internal`.
    #[tokio::test]
    async fn test_encode_decode_nested_fixed_size_list() {
        // FixedSizeList of FixedSizeList
        let inner = DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 2);
        let t = DataType::FixedSizeList(Arc::new(Field::new("item", inner, true)), 2);
        let mut arrs: Vec<ArrayRef> = Vec::new();

        for _ in 0..10 {
            let values = Int64Array::from_iter_values(1..=120_i64);
            let arr = FixedSizeListArray::try_new_from_values(
                FixedSizeListArray::try_new_from_values(values, 2).unwrap(),
                2,
            )
            .unwrap();
            arrs.push(Arc::new(arr) as ArrayRef);
        }
        test_round_trip(arrs.as_slice(), t).await;

        // FixedSizeList of FixedSizeBinary
        let inner = DataType::FixedSizeBinary(2);
        let t = DataType::FixedSizeList(Arc::new(Field::new("item", inner, true)), 2);
        let mut arrs: Vec<ArrayRef> = Vec::new();

        for _ in 0..10 {
            let values = UInt8Array::from_iter_values(1..=120_u8);
            let arr = FixedSizeListArray::try_new_from_values(
                FixedSizeBinaryArray::try_new_from_values(&values, 2).unwrap(),
                2,
            )
            .unwrap();
            arrs.push(Arc::new(arr) as ArrayRef);
        }
        test_round_trip(arrs.as_slice(), t).await;
    }
683
    /// Build an array of `data_type` with 126 values backed by `buffer`.
    ///
    /// NOTE(review): this fn is `async` but contains no `.await`; it is kept
    /// async because existing callers `.await` it.
    async fn make_array_(data_type: &DataType, buffer: &Buffer) -> ArrayRef {
        make_array(
            ArrayDataBuilder::new(data_type.clone())
                .len(126)
                .add_buffer(buffer.clone())
                .build()
                .unwrap(),
        )
    }
693
    /// Exercise the `AsyncIndex` range impls: bounded, `..end`, `start..`,
    /// empty ranges, and an out-of-bounds request that must error.
    #[tokio::test]
    async fn test_decode_by_range() {
        let path = TempStdFile::default();

        let array = Int32Array::from_iter_values([0, 1, 2, 3, 4, 5]);
        {
            let mut writer = tokio::fs::File::create(&path).await.unwrap();
            let mut encoder = PlainEncoder::new(&mut writer, array.data_type());
            assert_eq!(encoder.encode(&[&array]).await.unwrap(), 0);
            writer.flush().await.unwrap();
        }

        let reader = LocalObjectReader::open_local_path(&path, 2048, None)
            .await
            .unwrap();
        assert!(reader.size().await.unwrap() > 0);
        let decoder =
            PlainDecoder::new(reader.as_ref(), array.data_type(), 0, array.len()).unwrap();
        assert_eq!(
            decoder.get(2..4).await.unwrap().as_ref(),
            &Int32Array::from_iter_values([2, 3])
        );

        assert_eq!(
            decoder.get(..4).await.unwrap().as_ref(),
            &Int32Array::from_iter_values([0, 1, 2, 3])
        );

        assert_eq!(
            decoder.get(2..).await.unwrap().as_ref(),
            &Int32Array::from_iter_values([2, 3, 4, 5])
        );

        // Empty ranges decode to empty arrays without touching the reader.
        assert_eq!(
            &decoder.get(2..2).await.unwrap(),
            &new_empty_array(&DataType::Int32)
        );

        assert_eq!(
            &decoder.get(5..5).await.unwrap(),
            &new_empty_array(&DataType::Int32)
        );

        // Requests past the batch length are rejected.
        assert!(decoder.get(3..1000).await.is_err());
    }
739
    /// `take` with sorted, scattered indices must return exactly the rows at
    /// those indices, across multiple chunked range reads.
    #[tokio::test]
    async fn test_take() {
        let path = TempStdFile::default();

        let array = Int32Array::from_iter_values(0..100);

        {
            let mut writer = tokio::fs::File::create(&path).await.unwrap();
            let mut encoder = PlainEncoder::new(&mut writer, array.data_type());
            assert_eq!(encoder.encode(&[&array]).await.unwrap(), 0);
            AsyncWriteExt::shutdown(&mut writer).await.unwrap();
        }

        let reader = LocalObjectReader::open_local_path(&path, 2048, None)
            .await
            .unwrap();
        assert!(reader.size().await.unwrap() > 0);
        let decoder =
            PlainDecoder::new(reader.as_ref(), array.data_type(), 0, array.len()).unwrap();

        let results = decoder
            .take(&UInt32Array::from_iter(
                [2, 4, 5, 20, 30, 55, 60].iter().map(|i| *i as u32),
            ))
            .await
            .unwrap();
        assert_eq!(
            results.as_ref(),
            &Int32Array::from_iter_values([2, 4, 5, 20, 30, 55, 60])
        );
    }
771
772    // Re-enable the following tests once the Lance FileReader / FileWrite is migrated.
773
774    // #[tokio::test]
775    // async fn test_boolean_slice() {
776    //     let store = ObjectStore::memory();
777    //     let path = Path::from("/bool_slice");
778    //
779    //     let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
780    //         "b",
781    //         DataType::Boolean,
782    //         true,
783    //     )]));
784    //     let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
785    //     let mut file_writer =
786    //         FileWriter::try_new(&store, &path, schema.clone(), &Default::default())
787    //             .await
788    //             .unwrap();
789    //
790    //     let array = BooleanArray::from((0..120).map(|v| v % 5 == 0).collect::<Vec<_>>());
791    //     for i in 0..10 {
792    //         let data = array.slice(i * 12, 12); // one and half byte
793    //         file_writer
794    //             .write(&[RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(data)]).unwrap()])
795    //             .await
796    //             .unwrap();
797    //     }
798    //     file_writer.finish().await.unwrap();
799    //
800    //     let batch = read_file_as_one_batch(&store, &path).await;
801    //     assert_eq!(batch.column_by_name("b").unwrap().as_ref(), &array);
802    //
803    //     let array = BooleanArray::from(vec![Some(true), Some(false), None]);
804    //     let mut file_writer = FileWriter::try_new(&store, &path, schema, &Default::default())
805    //         .await
806    //         .unwrap();
807    //     file_writer
808    //         .write(&[RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(array)]).unwrap()])
809    //         .await
810    //         .unwrap();
811    //     file_writer.finish().await.unwrap();
812    //     let batch = read_file_as_one_batch(&store, &path).await;
813    //
814    //     // None default to Some(false), since we don't support null values yet.
815    //     let expected = BooleanArray::from(vec![Some(true), Some(false), Some(false)]);
816    //     assert_eq!(batch.column_by_name("b").unwrap().as_ref(), &expected);
817    // }
818    //
819    // #[tokio::test]
820    // async fn test_encode_fixed_size_list_slice() {
821    //     let store = ObjectStore::memory();
822    //     let path = Path::from("/shared_slice");
823    //
824    //     let array = Int32Array::from_iter_values(0..1600);
825    //     let fixed_size_list = FixedSizeListArray::try_new_from_values(array, 16).unwrap();
826    //     let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
827    //         "fl",
828    //         fixed_size_list.data_type().clone(),
829    //         false,
830    //     )]));
831    //     let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
832    //     let mut file_writer = FileWriter::try_new(&store, &path, schema, &Default::default())
833    //         .await
834    //         .unwrap();
835    //
836    //     for i in (0..100).step_by(4) {
837    //         let slice: FixedSizeListArray = fixed_size_list.slice(i, 4);
838    //         file_writer
839    //             .write(&[
840    //                 RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(slice)]).unwrap(),
841    //             ])
842    //             .await
843    //             .unwrap();
844    //     }
845    //     file_writer.finish().await.unwrap();
846    //
847    //     let batch = read_file_as_one_batch(&store, &path).await;
848    //     assert_eq!(
849    //         batch.column_by_name("fl").unwrap().as_ref(),
850    //         &fixed_size_list
851    //     );
852    // }
853    //
854    // #[tokio::test]
855    // async fn test_take_boolean() {
856    //     let temp_dir = tempfile::tempdir().unwrap();
857    //     let path = temp_dir.join("/bool_take");
858    //
859    //     let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
860    //         "b",
861    //         DataType::Boolean,
862    //         false,
863    //     )]));
864    //     let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
865    //     let mut file_writer =
866    //         FileWriter::try_new(&store, &path, schema.clone(), &Default::default())
867    //             .await
868    //             .unwrap();
869    //
870    //     let array = BooleanArray::from((0..120).map(|v| v % 5 == 0).collect::<Vec<_>>());
871    //     let batch =
872    //         RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(array.clone())]).unwrap();
873    //     file_writer.write(&[batch]).await.unwrap();
874    //     file_writer.finish().await.unwrap();
875    //
876    //     let reader = FileReader::try_new(&store, &path).await.unwrap();
877    //     let actual = reader
878    //         .take(&[2, 4, 5, 8, 63, 64, 65], &schema)
879    //         .await
880    //         .unwrap();
881    //
882    //     assert_eq!(
883    //         actual.column_by_name("b").unwrap().as_ref(),
884    //         &BooleanArray::from(vec![false, false, true, false, false, false, true])
885    //     );
886    // }
887
    /// Chunking must split on block-size boundaries and stay correct for
    /// indices whose byte offsets exceed `u32::MAX` (usize arithmetic).
    #[test]
    fn test_make_chunked_request() {
        let byte_width: usize = 4096; // 4K
        let prefetch_size: usize = 64 * 1024; // 64KB.
        let u32_overflow: usize = u32::MAX as usize + 10;

        let indices: Vec<u32> = vec![
            1,
            10,
            20,
            100,
            120,
            (u32_overflow / byte_width) as u32, // Two overflow offsets
            (u32_overflow / byte_width) as u32 + 100,
        ];
        let chunks = make_chunked_requests(&indices, byte_width, prefetch_size);
        assert_eq!(chunks.len(), 6, "got chunks: {:?}", chunks);
        assert_eq!(chunks, vec![(0..2), (2..3), (3..4), (4..5), (5..6), (6..7)])
    }
907}