lance_io/encodings/
binary.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
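//! Var-length binary encoding.
//!
//! A batch is written as the concatenated value bytes followed by an `Int64`
//! positions array with one more entry than there are values: entry `0` is the
//! absolute file position where the value bytes begin and entry `i + 1` is the
//! absolute position where value `i` ends. When a batch is decoded as nullable,
//! zero-length values are read back as nulls.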
6
7use std::marker::PhantomData;
8use std::ops::{Range, RangeFrom, RangeFull, RangeTo};
9use std::sync::Arc;
10
11use arrow_arith::numeric::sub;
12use arrow_array::{
13    builder::{ArrayBuilder, PrimitiveBuilder},
14    cast::as_primitive_array,
15    cast::AsArray,
16    new_empty_array,
17    types::{
18        BinaryType, ByteArrayType, Int64Type, LargeBinaryType, LargeUtf8Type, UInt32Type, Utf8Type,
19    },
20    Array, ArrayRef, GenericByteArray, Int64Array, OffsetSizeTrait, UInt32Array,
21};
22use arrow_buffer::{bit_util, ArrowNativeType, Buffer, MutableBuffer, ScalarBuffer};
23use arrow_cast::cast::cast;
24use arrow_data::ArrayDataBuilder;
25use arrow_schema::DataType;
26use async_trait::async_trait;
27use bytes::Bytes;
28use futures::{StreamExt, TryStreamExt};
29use lance_arrow::BufferExt;
30use snafu::location;
31use tokio::io::AsyncWriteExt;
32
33use super::ReadBatchParams;
34use super::{plain::PlainDecoder, AsyncIndex, Decoder, Encoder};
35use crate::traits::{Reader, Writer};
36use lance_core::Result;
37
38/// Encoder for Var-binary encoding.
39pub struct BinaryEncoder<'a> {
40    writer: &'a mut dyn Writer,
41}
42
43impl<'a> BinaryEncoder<'a> {
44    pub fn new(writer: &'a mut dyn Writer) -> Self {
45        Self { writer }
46    }
47
48    async fn encode_typed_arr<T: ByteArrayType>(&mut self, arrs: &[&dyn Array]) -> Result<usize> {
49        let capacity: usize = arrs.iter().map(|a| a.len()).sum();
50        let mut pos_builder: PrimitiveBuilder<Int64Type> =
51            PrimitiveBuilder::with_capacity(capacity + 1);
52
53        let mut last_offset: usize = self.writer.tell().await?;
54        pos_builder.append_value(last_offset as i64);
55        for array in arrs.iter() {
56            let arr = array
57                .as_any()
58                .downcast_ref::<GenericByteArray<T>>()
59                .unwrap();
60
61            let offsets = arr.value_offsets();
62
63            let start = offsets[0].as_usize();
64            let end = offsets[offsets.len() - 1].as_usize();
65            let b = unsafe {
66                std::slice::from_raw_parts(
67                    arr.to_data().buffers()[1].as_ptr().add(start),
68                    end - start,
69                )
70            };
71            self.writer.write_all(b).await?;
72
73            let start_offset = offsets[0].as_usize();
74            offsets
75                .iter()
76                .skip(1)
77                .map(|b| b.as_usize() - start_offset + last_offset)
78                .for_each(|o| pos_builder.append_value(o as i64));
79            last_offset = pos_builder.values_slice()[pos_builder.len() - 1] as usize;
80        }
81
82        let positions_offset = self.writer.tell().await?;
83        let pos_array = pos_builder.finish();
84        self.writer
85            .write_all(pos_array.to_data().buffers()[0].as_slice())
86            .await?;
87        Ok(positions_offset)
88    }
89}
90
91#[async_trait]
92impl Encoder for BinaryEncoder<'_> {
93    async fn encode(&mut self, arrs: &[&dyn Array]) -> Result<usize> {
94        assert!(!arrs.is_empty());
95        let data_type = arrs[0].data_type();
96        match data_type {
97            DataType::Utf8 => self.encode_typed_arr::<Utf8Type>(arrs).await,
98            DataType::Binary => self.encode_typed_arr::<BinaryType>(arrs).await,
99            DataType::LargeUtf8 => self.encode_typed_arr::<LargeUtf8Type>(arrs).await,
100            DataType::LargeBinary => self.encode_typed_arr::<LargeBinaryType>(arrs).await,
101            _ => {
102                return Err(lance_core::Error::io(
103                    format!("Binary encoder does not support {}", data_type),
104                    location!(),
105                ));
106            }
107        }
108    }
109}
110
111/// Var-binary encoding decoder.
112pub struct BinaryDecoder<'a, T: ByteArrayType> {
113    reader: &'a dyn Reader,
114
115    position: usize,
116
117    length: usize,
118
119    nullable: bool,
120
121    phantom: PhantomData<T>,
122}
123
124/// Var-length Binary Decoder
125///
126impl<'a, T: ByteArrayType> BinaryDecoder<'a, T> {
127    /// Create a [BinaryEncoder] to decode one batch.
128    ///
129    ///  - `position`, file position where this batch starts.
130    ///  - `length`, the number of records in this batch.
131    ///  - `nullable`, whether this batch contains nullable value.
132    ///
133    /// ## Example
134    ///
135    /// ```rust
136    /// use arrow_array::types::Utf8Type;
137    /// use object_store::path::Path;
138    /// use lance_io::{local::LocalObjectReader, encodings::binary::BinaryDecoder, traits::Reader};
139    ///
140    /// async {
141    ///     let reader = LocalObjectReader::open_local_path("/tmp/foo.lance", 2048, None).await.unwrap();
142    ///     let string_decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), 100, 1024, true);
143    /// };
144    /// ```
145    pub fn new(reader: &'a dyn Reader, position: usize, length: usize, nullable: bool) -> Self {
146        Self {
147            reader,
148            position,
149            length,
150            nullable,
151            phantom: PhantomData,
152        }
153    }
154
155    /// Get the position array for the batch.
156    async fn get_positions(&self, index: Range<usize>) -> Result<Arc<Int64Array>> {
157        let position_decoder = PlainDecoder::new(
158            self.reader,
159            &DataType::Int64,
160            self.position,
161            self.length + 1,
162        )?;
163        let values = position_decoder.get(index.start..index.end + 1).await?;
164        Ok(Arc::new(as_primitive_array(&values).clone()))
165    }
166
167    fn count_nulls<O: OffsetSizeTrait>(offsets: &ScalarBuffer<O>) -> (usize, Option<Buffer>) {
168        let mut null_count = 0;
169        let mut null_buf = MutableBuffer::new_null(offsets.len() - 1);
170        offsets.windows(2).enumerate().for_each(|(idx, w)| {
171            if w[0] == w[1] {
172                bit_util::unset_bit(null_buf.as_mut(), idx);
173                null_count += 1;
174            } else {
175                bit_util::set_bit(null_buf.as_mut(), idx);
176            }
177        });
178        let null_buf = if null_count > 0 {
179            Some(null_buf.into())
180        } else {
181            None
182        };
183        (null_count, null_buf)
184    }
185
186    /// Read the array with batch positions and range.
187    ///
188    /// Parameters
189    ///
190    ///  - *positions*: position array for the batch.
191    ///  - *range*: range of rows to read.
192    async fn get_range(&self, positions: &Int64Array, range: Range<usize>) -> Result<ArrayRef> {
193        assert!(positions.len() >= range.end);
194        let start = positions.value(range.start);
195        let end = positions.value(range.end);
196
197        let start_scalar = Int64Array::new_scalar(start);
198
199        let slice = positions.slice(range.start, range.len() + 1);
200        let offset_data = if T::Offset::IS_LARGE {
201            sub(&slice, &start_scalar)?.into_data()
202        } else {
203            cast(
204                &(Arc::new(sub(&slice, &start_scalar)?) as ArrayRef),
205                &DataType::Int32,
206            )?
207            .into_data()
208        };
209
210        let bytes: Bytes = if start >= end {
211            Bytes::new()
212        } else {
213            self.reader.get_range(start as usize..end as usize).await?
214        };
215
216        let mut data_builder = ArrayDataBuilder::new(T::DATA_TYPE)
217            .len(range.len())
218            .null_count(0);
219
220        // Count nulls
221        if self.nullable {
222            let (null_count, null_buf) = Self::count_nulls(slice.values());
223            data_builder = data_builder
224                .null_count(null_count)
225                .null_bit_buffer(null_buf);
226        }
227
228        let buf = Buffer::from_bytes_bytes(bytes, /*bytes_per_value=*/ 1);
229        let array_data = data_builder
230            .add_buffer(offset_data.buffers()[0].clone())
231            .add_buffer(buf)
232            .build()?;
233
234        Ok(Arc::new(GenericByteArray::<T>::from(array_data)))
235    }
236}
237
238#[derive(Debug)]
239struct TakeChunksPlan {
240    indices: UInt32Array,
241    is_contiguous: bool,
242}
243
244/// Group the indices into chunks, such that either:
245/// 1. the indices are contiguous (and non-repeating)
246/// 2. the values are within `min_io_size` of each other (and thus are worth
247///    grabbing in a single request)
248fn plan_take_chunks(
249    positions: &Int64Array,
250    indices: &UInt32Array,
251    min_io_size: i64,
252) -> Result<Vec<TakeChunksPlan>> {
253    let start = indices.value(0);
254    let indices = sub(indices, &UInt32Array::new_scalar(start))?;
255    let indices_ref = indices.as_primitive::<UInt32Type>();
256
257    let mut chunks: Vec<TakeChunksPlan> = vec![];
258    let mut start_idx = 0;
259    let mut last_idx: i64 = -1;
260    let mut is_contiguous = true;
261    for i in 0..indices.len() {
262        let current = indices_ref.value(i) as usize;
263        let curr_contiguous = current == start_idx || current as i64 - last_idx == 1;
264
265        if !curr_contiguous
266            && positions.value(current) - positions.value(indices_ref.value(start_idx) as usize)
267                > min_io_size
268        {
269            chunks.push(TakeChunksPlan {
270                indices: as_primitive_array(&indices.slice(start_idx, i - start_idx)).clone(),
271                is_contiguous,
272            });
273            start_idx = i;
274            is_contiguous = true;
275        } else {
276            is_contiguous &= curr_contiguous;
277        }
278
279        last_idx = current as i64;
280    }
281    chunks.push(TakeChunksPlan {
282        indices: as_primitive_array(&indices.slice(start_idx, indices.len() - start_idx)).clone(),
283        is_contiguous,
284    });
285
286    Ok(chunks)
287}
288
289#[async_trait]
290impl<T: ByteArrayType> Decoder for BinaryDecoder<'_, T> {
291    async fn decode(&self) -> Result<ArrayRef> {
292        self.get(..).await
293    }
294
295    /// Take the values at the given indices.
296    ///
297    /// This function assumes indices are sorted.
298    async fn take(&self, indices: &UInt32Array) -> Result<ArrayRef> {
299        if indices.is_empty() {
300            return Ok(new_empty_array(&T::DATA_TYPE));
301        }
302
303        let start = indices.value(0);
304        let end = indices.value(indices.len() - 1);
305
306        // TODO: make min batch size configurable.
307        // TODO: make reading positions in chunks too.
308        const MIN_IO_SIZE: i64 = 64 * 1024; // 64KB
309        let positions = self
310            .get_positions(start as usize..(end + 1) as usize)
311            .await?;
312        // Use indices and positions to pre-allocate an exact-size buffer
313        let capacity = indices
314            .iter()
315            .map(|i| {
316                let relative_index = (i.unwrap() - start) as usize;
317                let start = positions.value(relative_index) as usize;
318                let end = positions.value(relative_index + 1) as usize;
319                end - start
320            })
321            .sum();
322        let mut buffer = MutableBuffer::with_capacity(capacity);
323
324        let offsets_capacity = std::mem::size_of::<T::Offset>() * (indices.len() + 1);
325        let mut offsets = MutableBuffer::with_capacity(offsets_capacity);
326        let mut offset = T::Offset::from_usize(0).unwrap();
327        // Safety: We allocated appropriate capacity just above.
328        unsafe {
329            offsets.push_unchecked(offset);
330        }
331
332        let chunks = plan_take_chunks(&positions, indices, MIN_IO_SIZE)?;
333
334        let positions_ref = positions.as_ref();
335        futures::stream::iter(chunks)
336            .map(|chunk| async move {
337                let chunk_offset = chunk.indices.value(0);
338                let chunk_end = chunk.indices.value(chunk.indices.len() - 1);
339                let array = self
340                    .get_range(positions_ref, chunk_offset as usize..chunk_end as usize + 1)
341                    .await?;
342                Result::Ok((chunk, chunk_offset, array))
343            })
344            .buffered(self.reader.io_parallelism())
345            .try_for_each(|(chunk, chunk_offset, array)| {
346                let array: &GenericByteArray<T> = array.as_bytes();
347
348                // Faster to do one large memcpy than O(n) small ones.
349                if chunk.is_contiguous {
350                    buffer.extend_from_slice(array.value_data());
351                }
352
353                // Append each value to the buffer in the correct order
354                for index in chunk.indices.values() {
355                    if !chunk.is_contiguous {
356                        let value = array.value((index - chunk_offset) as usize);
357                        let value_ref: &[u8] = value.as_ref();
358                        buffer.extend_from_slice(value_ref);
359                    }
360
361                    offset += array.value_length((index - chunk_offset) as usize);
362                    // Append next offset
363                    // Safety: We allocated appropriate capacity on initialization
364                    unsafe {
365                        offsets.push_unchecked(offset);
366                    }
367                }
368                futures::future::ready(Ok(()))
369            })
370            .await?;
371
372        let mut data_builder = ArrayDataBuilder::new(T::DATA_TYPE)
373            .len(indices.len())
374            .null_count(0);
375
376        let offsets: ScalarBuffer<T::Offset> = ScalarBuffer::from(Buffer::from(offsets));
377
378        // We should have pre-sized perfectly.
379        debug_assert_eq!(buffer.len(), capacity);
380
381        if self.nullable {
382            let (null_count, null_buf) = Self::count_nulls(&offsets);
383            data_builder = data_builder
384                .null_count(null_count)
385                .null_bit_buffer(null_buf);
386        }
387
388        let array_data = data_builder
389            .add_buffer(offsets.into_inner())
390            .add_buffer(buffer.into())
391            .build()?;
392
393        Ok(Arc::new(GenericByteArray::<T>::from(array_data)))
394    }
395}
396
397#[async_trait]
398impl<T: ByteArrayType> AsyncIndex<usize> for BinaryDecoder<'_, T> {
399    type Output = Result<ArrayRef>;
400
401    async fn get(&self, index: usize) -> Self::Output {
402        self.get(index..index + 1).await
403    }
404}
405
406#[async_trait]
407impl<T: ByteArrayType> AsyncIndex<RangeFrom<usize>> for BinaryDecoder<'_, T> {
408    type Output = Result<ArrayRef>;
409
410    async fn get(&self, index: RangeFrom<usize>) -> Self::Output {
411        self.get(index.start..self.length).await
412    }
413}
414
415#[async_trait]
416impl<T: ByteArrayType> AsyncIndex<RangeTo<usize>> for BinaryDecoder<'_, T> {
417    type Output = Result<ArrayRef>;
418
419    async fn get(&self, index: RangeTo<usize>) -> Self::Output {
420        self.get(0..index.end).await
421    }
422}
423
424#[async_trait]
425impl<T: ByteArrayType> AsyncIndex<RangeFull> for BinaryDecoder<'_, T> {
426    type Output = Result<ArrayRef>;
427
428    async fn get(&self, _: RangeFull) -> Self::Output {
429        self.get(0..self.length).await
430    }
431}
432
433#[async_trait]
434impl<T: ByteArrayType> AsyncIndex<ReadBatchParams> for BinaryDecoder<'_, T> {
435    type Output = Result<ArrayRef>;
436
437    async fn get(&self, params: ReadBatchParams) -> Self::Output {
438        match params {
439            ReadBatchParams::Range(r) => self.get(r).await,
440            // Ranges not supported in v1 files
441            ReadBatchParams::Ranges(_) => unimplemented!(),
442            ReadBatchParams::RangeFull => self.get(..).await,
443            ReadBatchParams::RangeTo(r) => self.get(r).await,
444            ReadBatchParams::RangeFrom(r) => self.get(r).await,
445            ReadBatchParams::Indices(indices) => self.take(&indices).await,
446        }
447    }
448}
449
450#[async_trait]
451impl<T: ByteArrayType> AsyncIndex<Range<usize>> for BinaryDecoder<'_, T> {
452    type Output = Result<ArrayRef>;
453
454    async fn get(&self, index: Range<usize>) -> Self::Output {
455        let position_decoder = PlainDecoder::new(
456            self.reader,
457            &DataType::Int64,
458            self.position,
459            self.length + 1,
460        )?;
461        let positions = position_decoder.get(index.start..index.end + 1).await?;
462        let int64_positions: &Int64Array = as_primitive_array(&positions);
463
464        self.get_range(int64_positions, 0..index.len()).await
465    }
466}
467
#[cfg(test)]
mod tests {
    use super::*;

    use arrow_array::{
        types::GenericStringType, BinaryArray, GenericStringArray, LargeStringArray, StringArray,
    };
    use arrow_select::concat::concat;
    use lance_core::utils::tempfile::TempStdFile;

    use crate::local::LocalObjectReader;

    /// Encode `arr` as one batch into a new file at `path`.
    ///
    /// Returns the file position of the batch's positions array, which is the
    /// `position` argument `BinaryDecoder::new` expects.
    async fn write_test_data<O: OffsetSizeTrait>(
        path: impl AsRef<std::path::Path>,
        arr: &[&GenericStringArray<O>],
    ) -> Result<usize> {
        let mut writer = tokio::fs::File::create(path).await?;
        // Write a 4-byte prefix so the encoded data does not start at file
        // position 0; this exercises the absolute positions written by the encoder.
        writer.write_all(b"1234").await.unwrap();
        let mut encoder = BinaryEncoder::new(&mut writer);

        let arrs = arr.iter().map(|a| a as &dyn Array).collect::<Vec<_>>();
        let pos = encoder.encode(arrs.as_slice()).await.unwrap();
        writer.shutdown().await.unwrap();
        Ok(pos)
    }

    /// Write `arrs` as a single batch, decode it back (nullable), and compare it
    /// with the concatenation of the inputs.
    ///
    /// Inputs avoid empty strings: with `nullable = true`, a zero-length value is
    /// decoded as null.
    async fn test_round_trips<O: OffsetSizeTrait>(arrs: &[&GenericStringArray<O>]) {
        let path = TempStdFile::default();

        let pos = write_test_data(&path, arrs).await.unwrap();

        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let read_len = arrs.iter().map(|a| a.len()).sum();
        let decoder =
            BinaryDecoder::<GenericStringType<O>>::new(reader.as_ref(), pos, read_len, true);
        let actual_arr = decoder.decode().await.unwrap();

        let arrs_ref = arrs.iter().map(|a| a as &dyn Array).collect::<Vec<_>>();
        let expected = concat(arrs_ref.as_slice()).unwrap();
        assert_eq!(
            actual_arr
                .as_any()
                .downcast_ref::<GenericStringArray<O>>()
                .unwrap(),
            expected
                .as_any()
                .downcast_ref::<GenericStringArray<O>>()
                .unwrap(),
        );
    }

    // Round trips for Utf8 and LargeUtf8, with and without nulls, and with
    // several input arrays encoded into one batch.
    #[tokio::test]
    async fn test_write_binary_data() {
        test_round_trips(&[&StringArray::from(vec!["a", "b", "cd", "efg"])]).await;
        test_round_trips(&[&StringArray::from(vec![Some("a"), None, Some("cd"), None])]).await;
        test_round_trips(&[
            &StringArray::from(vec![Some("a"), None, Some("cd"), None]),
            &StringArray::from(vec![Some("f"), None, Some("gh"), None]),
            &StringArray::from(vec![Some("t"), None, Some("uv"), None]),
        ])
        .await;
        test_round_trips(&[&LargeStringArray::from(vec!["a", "b", "cd", "efg"])]).await;
        test_round_trips(&[&LargeStringArray::from(vec![
            Some("a"),
            None,
            Some("cd"),
            None,
        ])])
        .await;
        test_round_trips(&[
            &LargeStringArray::from(vec![Some("a"), Some("b")]),
            &LargeStringArray::from(vec![Some("c")]),
            &LargeStringArray::from(vec![Some("d"), Some("e")]),
        ])
        .await;
    }

    // A sliced input array (non-zero offset) must encode only its own bytes.
    #[tokio::test]
    async fn test_write_binary_data_with_offset() {
        let array: StringArray = StringArray::from(vec![Some("d"), Some("e")]).slice(1, 1);
        test_round_trips(&[&array]).await;
    }

    // Exercise every range flavour of `get`, plus empty and out-of-bounds ranges.
    #[tokio::test]
    async fn test_range_query() {
        let data = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);

        let path = TempStdFile::default();
        let mut object_writer = tokio::fs::File::create(&path).await.unwrap();

        // Write a 4-byte prefix so the encoded data does not start at position 0.
        object_writer.write_all(b"1234").await.unwrap();
        let mut encoder = BinaryEncoder::new(&mut object_writer);
        let pos = encoder.encode(&[&data]).await.unwrap();
        object_writer.shutdown().await.unwrap();

        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);
        assert_eq!(
            decoder.decode().await.unwrap().as_ref(),
            &StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"])
        );

        assert_eq!(
            decoder.get(..).await.unwrap().as_ref(),
            &StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"])
        );

        assert_eq!(
            decoder.get(2..5).await.unwrap().as_ref(),
            &StringArray::from_iter_values(["c", "d", "e"])
        );

        assert_eq!(
            decoder.get(..5).await.unwrap().as_ref(),
            &StringArray::from_iter_values(["a", "b", "c", "d", "e"])
        );

        assert_eq!(
            decoder.get(4..).await.unwrap().as_ref(),
            &StringArray::from_iter_values(["e", "f", "g"])
        );
        assert_eq!(
            decoder.get(2..2).await.unwrap().as_ref(),
            &new_empty_array(&DataType::Utf8)
        );
        // A range past the end of the batch must fail rather than return data.
        assert!(decoder.get(100..100).await.is_err());
    }

    // Basic take with a gap between indices.
    #[tokio::test]
    async fn test_take() {
        let data = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);

        let path = TempStdFile::default();

        let pos = write_test_data(&path, &[&data]).await.unwrap();
        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);

        let actual = decoder
            .take(&UInt32Array::from_iter_values([1, 2, 5]))
            .await
            .unwrap();
        assert_eq!(
            actual.as_ref(),
            &StringArray::from_iter_values(["b", "c", "f"])
        );
    }

    // Two indices far apart must be planned as two separate chunks.
    #[tokio::test]
    async fn test_take_sparse_indices() {
        let data = StringArray::from_iter_values((0..1000000).map(|v| format!("string-{v}")));

        let path = TempStdFile::default();
        let pos = write_test_data(&path, &[&data]).await.unwrap();
        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);

        let positions = decoder.get_positions(1..999998).await.unwrap();
        let indices = UInt32Array::from_iter_values([1, 999998]);
        let chunks = plan_take_chunks(positions.as_ref(), &indices, 64 * 1024).unwrap();
        // Relative offset within the positions.
        assert_eq!(chunks.len(), 2);
        assert_eq!(chunks[0].indices, UInt32Array::from_iter_values([0]),);
        assert_eq!(chunks[1].indices, UInt32Array::from_iter_values([999997]),);

        let actual = decoder
            .take(&UInt32Array::from_iter_values([1, 999998]))
            .await
            .unwrap();
        assert_eq!(
            actual.as_ref(),
            &StringArray::from_iter_values(["string-1", "string-999998"])
        );
    }

    // Chunk planning with contiguous runs, repeats, and small gaps, then a take
    // over the same indices.
    #[tokio::test]
    async fn test_take_dense_indices() {
        let data = StringArray::from_iter_values((0..1000000).map(|v| format!("string-{v}")));

        let path = TempStdFile::default();
        let pos = write_test_data(&path, &[&data]).await.unwrap();

        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);

        // NOTE(review): these positions start at record 1 while the first index
        // below is 2, so the planner's byte distances are shifted by one record.
        // That only nudges the chunking heuristic; the asserted chunks still hold.
        let positions = decoder.get_positions(1..999998).await.unwrap();
        let indices = UInt32Array::from_iter_values([
            2, 3, 4, 1001, 1001, 1002, 2001, 2002, 2004, 3004, 3005,
        ]);

        let chunks = plan_take_chunks(positions.as_ref(), &indices, 1024).unwrap();
        assert_eq!(chunks.len(), 4);
        // A contiguous range.
        assert_eq!(chunks[0].indices, UInt32Array::from_iter_values(0..3));
        assert!(chunks[0].is_contiguous);
        // Not contiguous because of repeats
        assert_eq!(
            chunks[1].indices,
            UInt32Array::from_iter_values([999, 999, 1000])
        );
        assert!(!chunks[1].is_contiguous);
        // Not contiguous because of gaps
        assert_eq!(
            chunks[2].indices,
            UInt32Array::from_iter_values([1999, 2000, 2002])
        );
        assert!(!chunks[2].is_contiguous);
        // Another contiguous range, this time after non-contiguous ones
        assert_eq!(
            chunks[3].indices,
            UInt32Array::from_iter_values([3002, 3003])
        );
        assert!(chunks[3].is_contiguous);

        let actual = decoder.take(&indices).await.unwrap();
        assert_eq!(
            actual.as_ref(),
            &StringArray::from_iter_values(indices.values().iter().map(|v| format!("string-{v}")))
        );
    }

    // Encoding consecutive batches into one writer: each batch is 10 values of
    // 10 bytes (100 bytes) followed by an 11-entry i64 positions array (88 bytes),
    // and `encode` returns where that positions array starts.
    #[tokio::test]
    async fn test_write_slice() {
        let path = TempStdFile::default();
        let data = StringArray::from_iter_values((0..100).map(|v| format!("abcdef-{v:#03}")));

        let mut object_writer = tokio::fs::File::create(&path).await.unwrap();
        let mut encoder = BinaryEncoder::new(&mut object_writer);
        for i in 0..10 {
            let pos = encoder.encode(&[&data.slice(i * 10, 10)]).await.unwrap();
            assert_eq!(pos, (i * (8 * 11) /* offset array */ + (i + 1) * (10 * 10)));
        }
    }

    // Binary data where every 4th value is null; take must report nulls (decoded
    // from zero-length values) at the right indices.
    #[tokio::test]
    async fn test_write_binary_with_nulls() {
        let data = BinaryArray::from_iter((0..60000).map(|v| {
            if v % 4 != 0 {
                Some::<&[u8]>(b"abcdefgh")
            } else {
                None
            }
        }));
        let path = TempStdFile::default();

        let pos = {
            let mut object_writer = tokio::fs::File::create(&path).await.unwrap();

            // Write a 4-byte prefix so the encoded data does not start at position 0.
            object_writer.write_all(b"1234").await.unwrap();
            let mut encoder = BinaryEncoder::new(&mut object_writer);

            let pos = encoder.encode(&[&data]).await.unwrap();
            object_writer.shutdown().await.unwrap();
            pos
        };

        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let decoder = BinaryDecoder::<BinaryType>::new(reader.as_ref(), pos, data.len(), true);
        // 0 and 59996 are multiples of 4 (null); 5 is not.
        let idx = UInt32Array::from(vec![0_u32, 5_u32, 59996_u32]);
        let actual = decoder.take(&idx).await.unwrap();
        let values: Vec<Option<&[u8]>> = vec![None, Some(b"abcdefgh"), None];
        assert_eq!(actual.as_binary::<i32>(), &BinaryArray::from(values));
    }
}
747}