lance_io/encodings/
plain.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Plain encoding
5//!
//! Plain encoding works with fixed-stride types, such as `boolean`, `i8...i64`, and `f16...f64`.
//! It stores the array directly in the file and offers O(1) read access.
8
9use std::ops::{Range, RangeFrom, RangeFull, RangeTo};
10use std::slice::from_raw_parts;
11use std::sync::Arc;
12
13use crate::{
14    traits::{Reader, Writer},
15    ReadBatchParams,
16};
17use arrow_arith::numeric::sub;
18use arrow_array::{
19    builder::BooleanBuilder, cast::AsArray, make_array, new_empty_array, Array, ArrayRef,
20    BooleanArray, FixedSizeBinaryArray, FixedSizeListArray, UInt32Array, UInt8Array,
21};
22use arrow_buffer::{bit_util, Buffer};
23use arrow_data::{layout, ArrayDataBuilder, BufferSpec};
24use arrow_schema::{DataType, Field};
25use arrow_select::{concat::concat, take::take};
26use async_recursion::async_recursion;
27use async_trait::async_trait;
28use bytes::Bytes;
29use futures::stream::{self, StreamExt, TryStreamExt};
30use lance_arrow::*;
31use lance_core::{Error, Result};
32use snafu::location;
33use tokio::io::AsyncWriteExt;
34
35use crate::encodings::{AsyncIndex, Decoder};
36
37/// Encoder for plain encoding.
38///
39pub struct PlainEncoder<'a> {
40    writer: &'a mut dyn Writer,
41    data_type: &'a DataType,
42}
43
44impl<'a> PlainEncoder<'a> {
45    pub fn new(writer: &'a mut dyn Writer, data_type: &'a DataType) -> Self {
46        PlainEncoder { writer, data_type }
47    }
48
49    /// Write an continuous plain-encoded array to the writer.
50    pub async fn write(writer: &'a mut dyn Writer, arrays: &[&'a dyn Array]) -> Result<usize> {
51        let pos = writer.tell().await?;
52        if !arrays.is_empty() {
53            let mut encoder = Self::new(writer, arrays[0].data_type());
54            encoder.encode(arrays).await?;
55        }
56        Ok(pos)
57    }
58
59    /// Encode an slice of an Array of a batch.
60    /// Returns the offset of the metadata
61    pub async fn encode(&mut self, arrays: &[&dyn Array]) -> Result<usize> {
62        self.encode_internal(arrays, self.data_type).await
63    }
64
65    #[async_recursion]
66    async fn encode_internal(
67        &mut self,
68        array: &[&dyn Array],
69        data_type: &DataType,
70    ) -> Result<usize> {
71        if let DataType::FixedSizeList(items, _) = data_type {
72            self.encode_fixed_size_list(array, items).await
73        } else {
74            self.encode_primitive(array).await
75        }
76    }
77
78    async fn encode_boolean(&mut self, arrays: &[&BooleanArray]) -> Result<()> {
79        let capacity: usize = arrays.iter().map(|a| a.len()).sum();
80        let mut builder = BooleanBuilder::with_capacity(capacity);
81
82        for array in arrays {
83            for val in array.iter() {
84                builder.append_value(val.unwrap_or_default());
85            }
86        }
87
88        let boolean_array = builder.finish();
89        self.writer
90            .write_all(boolean_array.into_data().buffers()[0].as_slice())
91            .await?;
92        Ok(())
93    }
94
95    /// Encode array of primitive values.
96    async fn encode_primitive(&mut self, arrays: &[&dyn Array]) -> Result<usize> {
97        assert!(!arrays.is_empty());
98        let data_type = arrays[0].data_type();
99        let offset = self.writer.tell().await?;
100
101        if matches!(data_type, DataType::Boolean) {
102            let boolean_arr = arrays
103                .iter()
104                .map(|a| a.as_boolean())
105                .collect::<Vec<&BooleanArray>>();
106            self.encode_boolean(boolean_arr.as_slice()).await?;
107        } else {
108            let byte_width = data_type.byte_width();
109            for a in arrays.iter() {
110                let data = a.to_data();
111                let slice = unsafe {
112                    from_raw_parts(
113                        data.buffers()[0].as_ptr().add(a.offset() * byte_width),
114                        a.len() * byte_width,
115                    )
116                };
117                self.writer.write_all(slice).await?;
118            }
119        }
120        Ok(offset)
121    }
122
123    /// Encode fixed size list.
124    async fn encode_fixed_size_list(
125        &mut self,
126        arrays: &[&dyn Array],
127        items: &Field,
128    ) -> Result<usize> {
129        let mut value_arrs: Vec<ArrayRef> = Vec::new();
130
131        for array in arrays {
132            let list_array = array
133                .as_any()
134                .downcast_ref::<FixedSizeListArray>()
135                .ok_or_else(|| Error::Schema {
136                    message: format!("Needed a FixedSizeListArray but got {}", array.data_type()),
137                    location: location!(),
138                })?;
139            let offset = list_array.value_offset(0) as usize;
140            let length = list_array.len();
141            let value_length = list_array.value_length() as usize;
142            let value_array = list_array.values().slice(offset, length * value_length);
143            value_arrs.push(value_array);
144        }
145
146        self.encode_internal(
147            value_arrs
148                .iter()
149                .map(|a| a.as_ref())
150                .collect::<Vec<_>>()
151                .as_slice(),
152            items.data_type(),
153        )
154        .await
155    }
156}
157
158/// Decoder for plain encoding.
159pub struct PlainDecoder<'a> {
160    reader: &'a dyn Reader,
161    data_type: &'a DataType,
162    /// The start position of the batch in the file.
163    position: usize,
164    /// Number of the rows in this batch.
165    length: usize,
166}
167
168/// Get byte range from the row offset range.
169#[inline]
170fn get_byte_range(data_type: &DataType, row_range: Range<usize>) -> Range<usize> {
171    match data_type {
172        DataType::Boolean => row_range.start / 8..bit_util::ceil(row_range.end, 8),
173        _ => row_range.start * data_type.byte_width()..row_range.end * data_type.byte_width(),
174    }
175}
176
177pub fn bytes_to_array(
178    data_type: &DataType,
179    bytes: Bytes,
180    len: usize,
181    offset: usize,
182) -> Result<ArrayRef> {
183    let layout = layout(data_type);
184
185    if layout.buffers.len() != 1 {
186        return Err(Error::Internal {
187            message: format!(
188                "Can only convert datatypes that require one buffer, found {:?}",
189                data_type
190            ),
191            location: location!(),
192        });
193    }
194
195    let buf: Buffer = if let BufferSpec::FixedWidth {
196        byte_width,
197        alignment,
198    } = &layout.buffers[0]
199    {
200        // this code is taken from
201        // https://github.com/apache/arrow-rs/blob/master/arrow-data/src/data.rs#L748-L768
202        let len_plus_offset = len + offset;
203        let min_buffer_size = len_plus_offset.saturating_mul(*byte_width);
204
205        // alignment or size isn't right -- just make a copy
206        if bytes.len() < min_buffer_size {
207            Buffer::copy_bytes_bytes(bytes, min_buffer_size)
208        } else {
209            Buffer::from_bytes_bytes(bytes, *alignment as u64)
210        }
211    } else {
212        // cases we don't handle, just copy
213        Buffer::from_slice_ref(bytes)
214    };
215
216    let array_data = ArrayDataBuilder::new(data_type.clone())
217        .len(len)
218        .offset(offset)
219        .null_count(0)
220        .add_buffer(buf)
221        .build()?;
222    Ok(make_array(array_data))
223}
224
225impl<'a> PlainDecoder<'a> {
226    pub fn new(
227        reader: &'a dyn Reader,
228        data_type: &'a DataType,
229        position: usize,
230        length: usize,
231    ) -> Result<Self> {
232        Ok(PlainDecoder {
233            reader,
234            data_type,
235            position,
236            length,
237        })
238    }
239
240    /// Decode primitive values, from "offset" to "offset + length".
241    ///
242    async fn decode_primitive(&self, start: usize, end: usize) -> Result<ArrayRef> {
243        if end > self.length {
244            return Err(Error::io(
245                format!(
246                    "PlainDecoder: request([{}..{}]) out of range: [0..{}]",
247                    start, end, self.length
248                ),
249                location!(),
250            ));
251        }
252        let byte_range = get_byte_range(self.data_type, start..end);
253        let range = Range {
254            start: self.position + byte_range.start,
255            end: self.position + byte_range.end,
256        };
257
258        let data = self.reader.get_range(range).await?;
259        // booleans are bitpacked, so we need an offset to provide the exact
260        // requested range.
261        let offset = if self.data_type == &DataType::Boolean {
262            start % 8
263        } else {
264            0
265        };
266        bytes_to_array(self.data_type, data, end - start, offset)
267    }
268
269    async fn decode_fixed_size_list(
270        &self,
271        items: &Field,
272        list_size: i32,
273        start: usize,
274        end: usize,
275    ) -> Result<ArrayRef> {
276        if !items.data_type().is_fixed_stride() {
277            return Err(Error::Schema {
278                message: format!(
279                    "Items for fixed size list should be primitives but found {}",
280                    items.data_type()
281                ),
282                location: location!(),
283            });
284        };
285        let item_decoder = PlainDecoder::new(
286            self.reader,
287            items.data_type(),
288            self.position,
289            self.length * list_size as usize,
290        )?;
291        let item_array = item_decoder
292            .get(start * list_size as usize..end * list_size as usize)
293            .await?;
294        Ok(Arc::new(FixedSizeListArray::new(
295            Arc::new(items.clone()),
296            list_size,
297            item_array,
298            None,
299        )) as ArrayRef)
300    }
301
302    async fn decode_fixed_size_binary(
303        &self,
304        stride: i32,
305        start: usize,
306        end: usize,
307    ) -> Result<ArrayRef> {
308        let bytes_decoder = PlainDecoder::new(
309            self.reader,
310            &DataType::UInt8,
311            self.position,
312            self.length * stride as usize,
313        )?;
314        let bytes_array = bytes_decoder
315            .get(start * stride as usize..end * stride as usize)
316            .await?;
317        let values = bytes_array
318            .as_any()
319            .downcast_ref::<UInt8Array>()
320            .ok_or_else(|| Error::Schema {
321                message: "Could not cast to UInt8Array for FixedSizeBinary".to_string(),
322                location: location!(),
323            })?;
324        Ok(Arc::new(FixedSizeBinaryArray::try_new_from_values(values, stride)?) as ArrayRef)
325    }
326
327    async fn take_boolean(&self, indices: &UInt32Array) -> Result<ArrayRef> {
328        let block_size = self.reader.block_size() as u32;
329        let boolean_block_size = block_size * 8;
330
331        let mut chunk_ranges = vec![];
332        let mut start: u32 = 0;
333        for j in 0..(indices.len() - 1) as u32 {
334            if (indices.value(j as usize + 1) / boolean_block_size)
335                > (indices.value(start as usize) / boolean_block_size)
336            {
337                let next_start = j + 1;
338                chunk_ranges.push(start..next_start);
339                start = next_start;
340            }
341        }
342        // Remaining
343        chunk_ranges.push(start..indices.len() as u32);
344
345        let arrays = stream::iter(chunk_ranges)
346            .map(|cr| async move {
347                let request = indices.slice(cr.start as usize, cr.len());
348                // request contains the array indices we are retrieving in this chunk.
349
350                // Get the starting index
351                let start = request.value(0);
352                // Final index is the last value
353                let end = request.value(request.len() - 1);
354                let array = self.get(start as usize..end as usize + 1).await?;
355
356                let shifted_indices = sub(&request, &UInt32Array::new_scalar(start))?;
357                Ok::<ArrayRef, Error>(take(&array, &shifted_indices, None)?)
358            })
359            .buffered(self.reader.io_parallelism())
360            .try_collect::<Vec<_>>()
361            .await?;
362        let references = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
363        Ok(concat(&references)?)
364    }
365}
366
/// Group positions of `indices` into chunks that are each served by one read.
///
/// Every returned range is a range of positions into `indices` (not of row
/// ids). Consecutive row ids always stay in the same chunk. Otherwise a new
/// chunk starts once the byte offset of the next row lies more than
/// `block_size` bytes past the byte offset of the chunk's first row.
///
/// * `indices` - row ids to read.
/// * `byte_width` - size of one row in bytes.
/// * `block_size` - I/O size limit in bytes.
///
/// Returns no ranges when `indices` is empty.
fn make_chunked_requests(
    indices: &[u32],
    byte_width: usize,
    block_size: usize,
) -> Vec<Range<usize>> {
    if indices.is_empty() {
        return Vec::new();
    }

    // Saturate instead of overflowing for very large rows or byte widths.
    let byte_offset = |row: u32| (row as usize).saturating_mul(byte_width);

    let mut chunked_ranges = vec![];
    let mut start: usize = 0;
    // Note: limit the I/O size to the block size.
    //
    // Another option could be checking whether `indices[i]` and `indices[i+1]` are not
    // farther away than the block size:
    //    indices[i] * byte_width + block_size < indices[i+1] * byte_width
    // It might allow slightly larger sequential reads.
    for (i, pair) in indices.windows(2).enumerate() {
        let (current, next) = (pair[0], pair[1]);
        // If contiguous, continue. `checked_add` keeps `u32::MAX` from overflowing.
        if current.checked_add(1) == Some(next) {
            continue;
        }
        if byte_offset(next) > byte_offset(indices[start]).saturating_add(block_size) {
            chunked_ranges.push(start..i + 1);
            start = i + 1;
        }
    }
    chunked_ranges.push(start..indices.len());
    chunked_ranges
}
394
395#[async_trait]
396impl Decoder for PlainDecoder<'_> {
397    async fn decode(&self) -> Result<ArrayRef> {
398        self.get(0..self.length).await
399    }
400
401    async fn take(&self, indices: &UInt32Array) -> Result<ArrayRef> {
402        if indices.is_empty() {
403            return Ok(new_empty_array(self.data_type));
404        }
405
406        if matches!(self.data_type, DataType::Boolean) {
407            return self.take_boolean(indices).await;
408        }
409
410        let block_size = self.reader.block_size();
411        let byte_width = self.data_type.byte_width();
412
413        let chunked_ranges = make_chunked_requests(indices.values(), byte_width, block_size);
414
415        let arrays = stream::iter(chunked_ranges)
416            .map(|cr| async move {
417                let request = indices.slice(cr.start, cr.len());
418
419                let start = request.value(0);
420                let end = request.value(request.len() - 1);
421                let array = self.get(start as usize..end as usize + 1).await?;
422                let adjusted_offsets = sub(&request, &UInt32Array::new_scalar(start))?;
423                Ok::<ArrayRef, Error>(take(&array, &adjusted_offsets, None)?)
424            })
425            .buffered(self.reader.io_parallelism())
426            .try_collect::<Vec<_>>()
427            .await?;
428        let references = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
429        Ok(concat(&references)?)
430    }
431}
432
433#[async_trait]
434impl AsyncIndex<usize> for PlainDecoder<'_> {
435    // TODO: should this return a Scalar value?
436    type Output = Result<ArrayRef>;
437
438    async fn get(&self, index: usize) -> Self::Output {
439        self.get(index..index + 1).await
440    }
441}
442
443#[async_trait]
444impl AsyncIndex<Range<usize>> for PlainDecoder<'_> {
445    type Output = Result<ArrayRef>;
446
447    async fn get(&self, index: Range<usize>) -> Self::Output {
448        if index.is_empty() {
449            return Ok(new_empty_array(self.data_type));
450        }
451        match self.data_type {
452            DataType::FixedSizeList(items, list_size) => {
453                self.decode_fixed_size_list(items, *list_size, index.start, index.end)
454                    .await
455            }
456            DataType::FixedSizeBinary(stride) => {
457                self.decode_fixed_size_binary(*stride, index.start, index.end)
458                    .await
459            }
460            _ => self.decode_primitive(index.start, index.end).await,
461        }
462    }
463}
464
465#[async_trait]
466impl AsyncIndex<RangeFrom<usize>> for PlainDecoder<'_> {
467    type Output = Result<ArrayRef>;
468
469    async fn get(&self, index: RangeFrom<usize>) -> Self::Output {
470        self.get(index.start..self.length).await
471    }
472}
473
474#[async_trait]
475impl AsyncIndex<RangeTo<usize>> for PlainDecoder<'_> {
476    type Output = Result<ArrayRef>;
477
478    async fn get(&self, index: RangeTo<usize>) -> Self::Output {
479        self.get(0..index.end).await
480    }
481}
482
483#[async_trait]
484impl AsyncIndex<RangeFull> for PlainDecoder<'_> {
485    type Output = Result<ArrayRef>;
486
487    async fn get(&self, _: RangeFull) -> Self::Output {
488        self.get(0..self.length).await
489    }
490}
491
492#[async_trait]
493impl AsyncIndex<ReadBatchParams> for PlainDecoder<'_> {
494    type Output = Result<ArrayRef>;
495
496    async fn get(&self, params: ReadBatchParams) -> Self::Output {
497        match params {
498            ReadBatchParams::Range(r) => self.get(r).await,
499            // Ranges not supported in v1 files
500            ReadBatchParams::Ranges(_) => unimplemented!(),
501            ReadBatchParams::RangeFull => self.get(..).await,
502            ReadBatchParams::RangeTo(r) => self.get(r).await,
503            ReadBatchParams::RangeFrom(r) => self.get(r).await,
504            ReadBatchParams::Indices(indices) => self.take(&indices).await,
505        }
506    }
507}
508
509#[cfg(test)]
510mod tests {
511    use std::ops::Deref;
512
513    use arrow_array::*;
514    use lance_core::utils::tempfile::TempStdFile;
515    use rand::prelude::*;
516
517    use super::*;
518    use crate::local::LocalObjectReader;
519
520    #[tokio::test]
521    async fn test_encode_decode_primitive_array() {
522        let int_types = vec![
523            DataType::Int8,
524            DataType::Int16,
525            DataType::Int32,
526            DataType::Int64,
527            DataType::UInt8,
528            DataType::UInt16,
529            DataType::UInt32,
530            DataType::UInt64,
531        ];
532        let input: Vec<i64> = Vec::from_iter(1..127_i64);
533        for t in int_types {
534            let buffer = Buffer::from_slice_ref(input.as_slice());
535            let mut arrs: Vec<ArrayRef> = Vec::new();
536            for _ in 0..10 {
537                arrs.push(Arc::new(make_array_(&t, &buffer).await));
538            }
539            test_round_trip(arrs.as_slice(), t).await;
540        }
541
542        let float_types = vec![DataType::Float16, DataType::Float32, DataType::Float64];
543        let mut rng = rand::rng();
544        let input: Vec<f64> = (1..127).map(|_| rng.random()).collect();
545        for t in float_types {
546            let buffer = Buffer::from_slice_ref(input.as_slice());
547            let mut arrs: Vec<ArrayRef> = Vec::new();
548
549            for _ in 0..10 {
550                arrs.push(Arc::new(make_array_(&t, &buffer).await));
551            }
552            test_round_trip(arrs.as_slice(), t).await;
553        }
554    }
555
556    async fn test_round_trip(expected: &[ArrayRef], data_type: DataType) {
557        let path = TempStdFile::default();
558
559        let expected_as_array = expected
560            .iter()
561            .map(|e| e.as_ref())
562            .collect::<Vec<&dyn Array>>();
563        {
564            let mut writer = tokio::fs::File::create(&path).await.unwrap();
565            let mut encoder = PlainEncoder::new(&mut writer, &data_type);
566            assert_eq!(
567                encoder.encode(expected_as_array.as_slice()).await.unwrap(),
568                0
569            );
570            writer.flush().await.unwrap();
571        }
572
573        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
574            .await
575            .unwrap();
576        assert!(reader.size().await.unwrap() > 0);
577        // Expected size is the total of all arrays
578        let expected_size = expected.iter().map(|e| e.len()).sum();
579        let decoder = PlainDecoder::new(reader.as_ref(), &data_type, 0, expected_size).unwrap();
580        let arr = decoder.decode().await.unwrap();
581        let actual = arr.as_ref();
582        let expected_merged = concat(expected_as_array.as_slice()).unwrap();
583        assert_eq!(expected_merged.deref(), actual);
584        assert_eq!(expected_size, actual.len());
585    }
586
587    #[tokio::test]
588    async fn test_encode_decode_bool_array() {
589        let mut arrs: Vec<ArrayRef> = Vec::new();
590
591        for _ in 0..10 {
592            // It is important that the boolean array length is < 8 so we can test if the Arrays are merged correctly
593            arrs.push(Arc::new(BooleanArray::from(vec![true, true, true])) as ArrayRef);
594        }
595        test_round_trip(arrs.as_slice(), DataType::Boolean).await;
596    }
597
598    #[tokio::test]
599    async fn test_encode_decode_fixed_size_list_array() {
600        let int_types = vec![
601            DataType::Int8,
602            DataType::Int16,
603            DataType::Int32,
604            DataType::Int64,
605            DataType::UInt8,
606            DataType::UInt16,
607            DataType::UInt32,
608            DataType::UInt64,
609        ];
610        let input = Vec::from_iter(1..127_i64);
611        for t in int_types {
612            let buffer = Buffer::from_slice_ref(input.as_slice());
613            let list_type =
614                DataType::FixedSizeList(Arc::new(Field::new("item", t.clone(), true)), 3);
615            let mut arrs: Vec<ArrayRef> = Vec::new();
616
617            for _ in 0..10 {
618                let items = make_array_(&t.clone(), &buffer).await;
619                let arr = FixedSizeListArray::try_new_from_values(items, 3).unwrap();
620                arrs.push(Arc::new(arr) as ArrayRef);
621            }
622            test_round_trip(arrs.as_slice(), list_type).await;
623        }
624    }
625
626    #[tokio::test]
627    async fn test_encode_decode_fixed_size_binary_array() {
628        let t = DataType::FixedSizeBinary(3);
629        let mut arrs: Vec<ArrayRef> = Vec::new();
630
631        for _ in 0..10 {
632            let values = UInt8Array::from(Vec::from_iter(1..127_u8));
633            let arr = FixedSizeBinaryArray::try_new_from_values(&values, 3).unwrap();
634            arrs.push(Arc::new(arr) as ArrayRef);
635        }
636        test_round_trip(arrs.as_slice(), t).await;
637    }
638
639    #[tokio::test]
640    async fn test_bytes_to_array_padding() {
641        let bytes = Bytes::from_static(&[0x01, 0x00, 0x02, 0x00, 0x03]);
642        let arr = bytes_to_array(&DataType::UInt16, bytes, 3, 0).unwrap();
643
644        let expected = UInt16Array::from(vec![1, 2, 3]);
645        assert_eq!(arr.as_ref(), &expected);
646
647        // Underlying data is padded to the nearest multiple of two bytes (for u16).
648        let data = arr.to_data();
649        let buf = &data.buffers()[0];
650        let repr = format!("{:?}", buf);
651        assert!(
652            repr.contains("[1, 0, 2, 0, 3, 0]"),
653            "Underlying buffer contains unexpected data: {}",
654            repr
655        );
656    }
657
658    #[tokio::test]
659    async fn test_encode_decode_nested_fixed_size_list() {
660        // FixedSizeList of FixedSizeList
661        let inner = DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 2);
662        let t = DataType::FixedSizeList(Arc::new(Field::new("item", inner, true)), 2);
663        let mut arrs: Vec<ArrayRef> = Vec::new();
664
665        for _ in 0..10 {
666            let values = Int64Array::from_iter_values(1..=120_i64);
667            let arr = FixedSizeListArray::try_new_from_values(
668                FixedSizeListArray::try_new_from_values(values, 2).unwrap(),
669                2,
670            )
671            .unwrap();
672            arrs.push(Arc::new(arr) as ArrayRef);
673        }
674        test_round_trip(arrs.as_slice(), t).await;
675
676        // FixedSizeList of FixedSizeBinary
677        let inner = DataType::FixedSizeBinary(2);
678        let t = DataType::FixedSizeList(Arc::new(Field::new("item", inner, true)), 2);
679        let mut arrs: Vec<ArrayRef> = Vec::new();
680
681        for _ in 0..10 {
682            let values = UInt8Array::from_iter_values(1..=120_u8);
683            let arr = FixedSizeListArray::try_new_from_values(
684                FixedSizeBinaryArray::try_new_from_values(&values, 2).unwrap(),
685                2,
686            )
687            .unwrap();
688            arrs.push(Arc::new(arr) as ArrayRef);
689        }
690        test_round_trip(arrs.as_slice(), t).await;
691    }
692
693    async fn make_array_(data_type: &DataType, buffer: &Buffer) -> ArrayRef {
694        make_array(
695            ArrayDataBuilder::new(data_type.clone())
696                .len(126)
697                .add_buffer(buffer.clone())
698                .build()
699                .unwrap(),
700        )
701    }
702
703    #[tokio::test]
704    async fn test_decode_by_range() {
705        let path = TempStdFile::default();
706
707        let array = Int32Array::from_iter_values([0, 1, 2, 3, 4, 5]);
708        {
709            let mut writer = tokio::fs::File::create(&path).await.unwrap();
710            let mut encoder = PlainEncoder::new(&mut writer, array.data_type());
711            assert_eq!(encoder.encode(&[&array]).await.unwrap(), 0);
712            writer.flush().await.unwrap();
713        }
714
715        let reader = LocalObjectReader::open_local_path(&path, 2048, None)
716            .await
717            .unwrap();
718        assert!(reader.size().await.unwrap() > 0);
719        let decoder =
720            PlainDecoder::new(reader.as_ref(), array.data_type(), 0, array.len()).unwrap();
721        assert_eq!(
722            decoder.get(2..4).await.unwrap().as_ref(),
723            &Int32Array::from_iter_values([2, 3])
724        );
725
726        assert_eq!(
727            decoder.get(..4).await.unwrap().as_ref(),
728            &Int32Array::from_iter_values([0, 1, 2, 3])
729        );
730
731        assert_eq!(
732            decoder.get(2..).await.unwrap().as_ref(),
733            &Int32Array::from_iter_values([2, 3, 4, 5])
734        );
735
736        assert_eq!(
737            &decoder.get(2..2).await.unwrap(),
738            &new_empty_array(&DataType::Int32)
739        );
740
741        assert_eq!(
742            &decoder.get(5..5).await.unwrap(),
743            &new_empty_array(&DataType::Int32)
744        );
745
746        assert!(decoder.get(3..1000).await.is_err());
747    }
748
749    #[tokio::test]
750    async fn test_take() {
751        let path = TempStdFile::default();
752
753        let array = Int32Array::from_iter_values(0..100);
754
755        {
756            let mut writer = tokio::fs::File::create(&path).await.unwrap();
757            let mut encoder = PlainEncoder::new(&mut writer, array.data_type());
758            assert_eq!(encoder.encode(&[&array]).await.unwrap(), 0);
759            writer.shutdown().await.unwrap();
760        }
761
762        let reader = LocalObjectReader::open_local_path(&path, 2048, None)
763            .await
764            .unwrap();
765        assert!(reader.size().await.unwrap() > 0);
766        let decoder =
767            PlainDecoder::new(reader.as_ref(), array.data_type(), 0, array.len()).unwrap();
768
769        let results = decoder
770            .take(&UInt32Array::from_iter(
771                [2, 4, 5, 20, 30, 55, 60].iter().map(|i| *i as u32),
772            ))
773            .await
774            .unwrap();
775        assert_eq!(
776            results.as_ref(),
777            &Int32Array::from_iter_values([2, 4, 5, 20, 30, 55, 60])
778        );
779    }
780
781    // Re-enable the following tests once the Lance FileReader / FileWrite is migrated.
782
783    // #[tokio::test]
784    // async fn test_boolean_slice() {
785    //     let store = ObjectStore::memory();
786    //     let path = Path::from("/bool_slice");
787    //
788    //     let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
789    //         "b",
790    //         DataType::Boolean,
791    //         true,
792    //     )]));
793    //     let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
794    //     let mut file_writer =
795    //         FileWriter::try_new(&store, &path, schema.clone(), &Default::default())
796    //             .await
797    //             .unwrap();
798    //
799    //     let array = BooleanArray::from((0..120).map(|v| v % 5 == 0).collect::<Vec<_>>());
800    //     for i in 0..10 {
801    //         let data = array.slice(i * 12, 12); // one and half byte
802    //         file_writer
803    //             .write(&[RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(data)]).unwrap()])
804    //             .await
805    //             .unwrap();
806    //     }
807    //     file_writer.finish().await.unwrap();
808    //
809    //     let batch = read_file_as_one_batch(&store, &path).await;
810    //     assert_eq!(batch.column_by_name("b").unwrap().as_ref(), &array);
811    //
812    //     let array = BooleanArray::from(vec![Some(true), Some(false), None]);
813    //     let mut file_writer = FileWriter::try_new(&store, &path, schema, &Default::default())
814    //         .await
815    //         .unwrap();
816    //     file_writer
817    //         .write(&[RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(array)]).unwrap()])
818    //         .await
819    //         .unwrap();
820    //     file_writer.finish().await.unwrap();
821    //     let batch = read_file_as_one_batch(&store, &path).await;
822    //
823    //     // None default to Some(false), since we don't support null values yet.
824    //     let expected = BooleanArray::from(vec![Some(true), Some(false), Some(false)]);
825    //     assert_eq!(batch.column_by_name("b").unwrap().as_ref(), &expected);
826    // }
827    //
828    // #[tokio::test]
829    // async fn test_encode_fixed_size_list_slice() {
830    //     let store = ObjectStore::memory();
831    //     let path = Path::from("/shared_slice");
832    //
833    //     let array = Int32Array::from_iter_values(0..1600);
834    //     let fixed_size_list = FixedSizeListArray::try_new_from_values(array, 16).unwrap();
835    //     let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
836    //         "fl",
837    //         fixed_size_list.data_type().clone(),
838    //         false,
839    //     )]));
840    //     let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
841    //     let mut file_writer = FileWriter::try_new(&store, &path, schema, &Default::default())
842    //         .await
843    //         .unwrap();
844    //
845    //     for i in (0..100).step_by(4) {
846    //         let slice: FixedSizeListArray = fixed_size_list.slice(i, 4);
847    //         file_writer
848    //             .write(&[
849    //                 RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(slice)]).unwrap(),
850    //             ])
851    //             .await
852    //             .unwrap();
853    //     }
854    //     file_writer.finish().await.unwrap();
855    //
856    //     let batch = read_file_as_one_batch(&store, &path).await;
857    //     assert_eq!(
858    //         batch.column_by_name("fl").unwrap().as_ref(),
859    //         &fixed_size_list
860    //     );
861    // }
862    //
863    // #[tokio::test]
864    // async fn test_take_boolean() {
865    //     let temp_dir = tempfile::tempdir().unwrap();
866    //     let path = temp_dir.join("/bool_take");
867    //
868    //     let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
869    //         "b",
870    //         DataType::Boolean,
871    //         false,
872    //     )]));
873    //     let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
874    //     let mut file_writer =
875    //         FileWriter::try_new(&store, &path, schema.clone(), &Default::default())
876    //             .await
877    //             .unwrap();
878    //
879    //     let array = BooleanArray::from((0..120).map(|v| v % 5 == 0).collect::<Vec<_>>());
880    //     let batch =
881    //         RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(array.clone())]).unwrap();
882    //     file_writer.write(&[batch]).await.unwrap();
883    //     file_writer.finish().await.unwrap();
884    //
885    //     let reader = FileReader::try_new(&store, &path).await.unwrap();
886    //     let actual = reader
887    //         .take(&[2, 4, 5, 8, 63, 64, 65], &schema)
888    //         .await
889    //         .unwrap();
890    //
891    //     assert_eq!(
892    //         actual.column_by_name("b").unwrap().as_ref(),
893    //         &BooleanArray::from(vec![false, false, true, false, false, false, true])
894    //     );
895    // }
896
897    #[test]
898    fn test_make_chunked_request() {
899        let byte_width: usize = 4096; // 4K
900        let prefetch_size: usize = 64 * 1024; // 64KB.
901        let u32_overflow: usize = u32::MAX as usize + 10;
902
903        let indices: Vec<u32> = vec![
904            1,
905            10,
906            20,
907            100,
908            120,
909            (u32_overflow / byte_width) as u32, // Two overflow offsets
910            (u32_overflow / byte_width) as u32 + 100,
911        ];
912        let chunks = make_chunked_requests(&indices, byte_width, prefetch_size);
913        assert_eq!(chunks.len(), 6, "got chunks: {:?}", chunks);
914        assert_eq!(chunks, vec![(0..2), (2..3), (3..4), (4..5), (5..6), (6..7)])
915    }
916}