lance_io/encodings/
binary.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Var-length binary encoding.
5//!
6
7use std::marker::PhantomData;
8use std::ops::{Range, RangeFrom, RangeFull, RangeTo};
9use std::sync::Arc;
10
11use arrow_arith::numeric::sub;
12use arrow_array::{
13    builder::{ArrayBuilder, PrimitiveBuilder},
14    cast::as_primitive_array,
15    cast::AsArray,
16    new_empty_array,
17    types::{
18        BinaryType, ByteArrayType, Int64Type, LargeBinaryType, LargeUtf8Type, UInt32Type, Utf8Type,
19    },
20    Array, ArrayRef, GenericByteArray, Int64Array, OffsetSizeTrait, UInt32Array,
21};
22use arrow_buffer::{bit_util, ArrowNativeType, Buffer, MutableBuffer, ScalarBuffer};
23use arrow_cast::cast::cast;
24use arrow_data::ArrayDataBuilder;
25use arrow_schema::DataType;
26use async_trait::async_trait;
27use bytes::Bytes;
28use futures::{StreamExt, TryStreamExt};
29use lance_arrow::BufferExt;
30use snafu::location;
31use tokio::io::AsyncWriteExt;
32
33use super::ReadBatchParams;
34use super::{plain::PlainDecoder, AsyncIndex, Decoder, Encoder};
35use crate::traits::{Reader, Writer};
36use lance_core::Result;
37
38/// Encoder for Var-binary encoding.
39pub struct BinaryEncoder<'a> {
40    writer: &'a mut dyn Writer,
41}
42
43impl<'a> BinaryEncoder<'a> {
44    pub fn new(writer: &'a mut dyn Writer) -> Self {
45        Self { writer }
46    }
47
48    async fn encode_typed_arr<T: ByteArrayType>(&mut self, arrs: &[&dyn Array]) -> Result<usize> {
49        let capacity: usize = arrs.iter().map(|a| a.len()).sum();
50        let mut pos_builder: PrimitiveBuilder<Int64Type> =
51            PrimitiveBuilder::with_capacity(capacity + 1);
52
53        let mut last_offset: usize = self.writer.tell().await?;
54        pos_builder.append_value(last_offset as i64);
55        for array in arrs.iter() {
56            let arr = array
57                .as_any()
58                .downcast_ref::<GenericByteArray<T>>()
59                .unwrap();
60
61            let offsets = arr.value_offsets();
62
63            let start = offsets[0].as_usize();
64            let end = offsets[offsets.len() - 1].as_usize();
65            let b = unsafe {
66                std::slice::from_raw_parts(
67                    arr.to_data().buffers()[1].as_ptr().add(start),
68                    end - start,
69                )
70            };
71            self.writer.write_all(b).await?;
72
73            let start_offset = offsets[0].as_usize();
74            offsets
75                .iter()
76                .skip(1)
77                .map(|b| b.as_usize() - start_offset + last_offset)
78                .for_each(|o| pos_builder.append_value(o as i64));
79            last_offset = pos_builder.values_slice()[pos_builder.len() - 1] as usize;
80        }
81
82        let positions_offset = self.writer.tell().await?;
83        let pos_array = pos_builder.finish();
84        self.writer
85            .write_all(pos_array.to_data().buffers()[0].as_slice())
86            .await?;
87        Ok(positions_offset)
88    }
89}
90
91#[async_trait]
92impl Encoder for BinaryEncoder<'_> {
93    async fn encode(&mut self, arrs: &[&dyn Array]) -> Result<usize> {
94        assert!(!arrs.is_empty());
95        let data_type = arrs[0].data_type();
96        match data_type {
97            DataType::Utf8 => self.encode_typed_arr::<Utf8Type>(arrs).await,
98            DataType::Binary => self.encode_typed_arr::<BinaryType>(arrs).await,
99            DataType::LargeUtf8 => self.encode_typed_arr::<LargeUtf8Type>(arrs).await,
100            DataType::LargeBinary => self.encode_typed_arr::<LargeBinaryType>(arrs).await,
101            _ => {
102                return Err(lance_core::Error::io(
103                    format!("Binary encoder does not support {}", data_type),
104                    location!(),
105                ));
106            }
107        }
108    }
109}
110
111/// Var-binary encoding decoder.
112pub struct BinaryDecoder<'a, T: ByteArrayType> {
113    reader: &'a dyn Reader,
114
115    position: usize,
116
117    length: usize,
118
119    nullable: bool,
120
121    phantom: PhantomData<T>,
122}
123
124/// Var-length Binary Decoder
125///
126impl<'a, T: ByteArrayType> BinaryDecoder<'a, T> {
127    /// Create a [BinaryEncoder] to decode one batch.
128    ///
129    ///  - `position`, file position where this batch starts.
130    ///  - `length`, the number of records in this batch.
131    ///  - `nullable`, whether this batch contains nullable value.
132    ///
133    /// ## Example
134    ///
135    /// ```rust
136    /// use arrow_array::types::Utf8Type;
137    /// use object_store::path::Path;
138    /// use lance_io::{local::LocalObjectReader, encodings::binary::BinaryDecoder, traits::Reader};
139    ///
140    /// async {
141    ///     let reader = LocalObjectReader::open_local_path("/tmp/foo.lance", 2048, None).await.unwrap();
142    ///     let string_decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), 100, 1024, true);
143    /// };
144    /// ```
145    pub fn new(reader: &'a dyn Reader, position: usize, length: usize, nullable: bool) -> Self {
146        Self {
147            reader,
148            position,
149            length,
150            nullable,
151            phantom: PhantomData,
152        }
153    }
154
155    /// Get the position array for the batch.
156    async fn get_positions(&self, index: Range<usize>) -> Result<Arc<Int64Array>> {
157        let position_decoder = PlainDecoder::new(
158            self.reader,
159            &DataType::Int64,
160            self.position,
161            self.length + 1,
162        )?;
163        let values = position_decoder.get(index.start..index.end + 1).await?;
164        Ok(Arc::new(as_primitive_array(&values).clone()))
165    }
166
167    fn count_nulls<O: OffsetSizeTrait>(offsets: &ScalarBuffer<O>) -> (usize, Option<Buffer>) {
168        let mut null_count = 0;
169        let mut null_buf = MutableBuffer::new_null(offsets.len() - 1);
170        offsets.windows(2).enumerate().for_each(|(idx, w)| {
171            if w[0] == w[1] {
172                bit_util::unset_bit(null_buf.as_mut(), idx);
173                null_count += 1;
174            } else {
175                bit_util::set_bit(null_buf.as_mut(), idx);
176            }
177        });
178        let null_buf = if null_count > 0 {
179            Some(null_buf.into())
180        } else {
181            None
182        };
183        (null_count, null_buf)
184    }
185
186    /// Read the array with batch positions and range.
187    ///
188    /// Parameters
189    ///
190    ///  - *positions*: position array for the batch.
191    ///  - *range*: range of rows to read.
192    async fn get_range(&self, positions: &Int64Array, range: Range<usize>) -> Result<ArrayRef> {
193        assert!(positions.len() >= range.end);
194        let start = positions.value(range.start);
195        let end = positions.value(range.end);
196
197        let start_scalar = Int64Array::new_scalar(start);
198
199        let slice = positions.slice(range.start, range.len() + 1);
200        let offset_data = if T::Offset::IS_LARGE {
201            sub(&slice, &start_scalar)?.into_data()
202        } else {
203            cast(
204                &(Arc::new(sub(&slice, &start_scalar)?) as ArrayRef),
205                &DataType::Int32,
206            )?
207            .into_data()
208        };
209
210        let bytes: Bytes = if start >= end {
211            Bytes::new()
212        } else {
213            self.reader.get_range(start as usize..end as usize).await?
214        };
215
216        let mut data_builder = ArrayDataBuilder::new(T::DATA_TYPE)
217            .len(range.len())
218            .null_count(0);
219
220        // Count nulls
221        if self.nullable {
222            let (null_count, null_buf) = Self::count_nulls(slice.values());
223            data_builder = data_builder
224                .null_count(null_count)
225                .null_bit_buffer(null_buf);
226        }
227
228        let buf = Buffer::from_bytes_bytes(bytes, /*bytes_per_value=*/ 1);
229        let array_data = data_builder
230            .add_buffer(offset_data.buffers()[0].clone())
231            .add_buffer(buf)
232            .build()?;
233
234        Ok(Arc::new(GenericByteArray::<T>::from(array_data)))
235    }
236}
237
238#[derive(Debug)]
239struct TakeChunksPlan {
240    indices: UInt32Array,
241    is_contiguous: bool,
242}
243
244/// Group the indices into chunks, such that either:
245/// 1. the indices are contiguous (and non-repeating)
246/// 2. the values are within `min_io_size` of each other (and thus are worth
247///    grabbing in a single request)
248fn plan_take_chunks(
249    positions: &Int64Array,
250    indices: &UInt32Array,
251    min_io_size: i64,
252) -> Result<Vec<TakeChunksPlan>> {
253    let start = indices.value(0);
254    let indices = sub(indices, &UInt32Array::new_scalar(start))?;
255    let indices_ref = indices.as_primitive::<UInt32Type>();
256
257    let mut chunks: Vec<TakeChunksPlan> = vec![];
258    let mut start_idx = 0;
259    let mut last_idx: i64 = -1;
260    let mut is_contiguous = true;
261    for i in 0..indices.len() {
262        let current = indices_ref.value(i) as usize;
263        let curr_contiguous = current == start_idx || current as i64 - last_idx == 1;
264
265        if !curr_contiguous
266            && positions.value(current) - positions.value(indices_ref.value(start_idx) as usize)
267                > min_io_size
268        {
269            chunks.push(TakeChunksPlan {
270                indices: as_primitive_array(&indices.slice(start_idx, i - start_idx)).clone(),
271                is_contiguous,
272            });
273            start_idx = i;
274            is_contiguous = true;
275        } else {
276            is_contiguous &= curr_contiguous;
277        }
278
279        last_idx = current as i64;
280    }
281    chunks.push(TakeChunksPlan {
282        indices: as_primitive_array(&indices.slice(start_idx, indices.len() - start_idx)).clone(),
283        is_contiguous,
284    });
285
286    Ok(chunks)
287}
288
289#[async_trait]
290impl<T: ByteArrayType> Decoder for BinaryDecoder<'_, T> {
291    async fn decode(&self) -> Result<ArrayRef> {
292        self.get(..).await
293    }
294
295    /// Take the values at the given indices.
296    ///
297    /// This function assumes indices are sorted.
298    async fn take(&self, indices: &UInt32Array) -> Result<ArrayRef> {
299        if indices.is_empty() {
300            return Ok(new_empty_array(&T::DATA_TYPE));
301        }
302
303        let start = indices.value(0);
304        let end = indices.value(indices.len() - 1);
305
306        // TODO: make min batch size configurable.
307        // TODO: make reading positions in chunks too.
308        const MIN_IO_SIZE: i64 = 64 * 1024; // 64KB
309        let positions = self
310            .get_positions(start as usize..(end + 1) as usize)
311            .await?;
312        // Use indices and positions to pre-allocate an exact-size buffer
313        let capacity = indices
314            .iter()
315            .map(|i| {
316                let relative_index = (i.unwrap() - start) as usize;
317                let start = positions.value(relative_index) as usize;
318                let end = positions.value(relative_index + 1) as usize;
319                end - start
320            })
321            .sum();
322        let mut buffer = MutableBuffer::with_capacity(capacity);
323
324        let offsets_capacity = std::mem::size_of::<T::Offset>() * (indices.len() + 1);
325        let mut offsets = MutableBuffer::with_capacity(offsets_capacity);
326        let mut offset = T::Offset::from_usize(0).unwrap();
327        // Safety: We allocated appropriate capacity just above.
328        unsafe {
329            offsets.push_unchecked(offset);
330        }
331
332        let chunks = plan_take_chunks(&positions, indices, MIN_IO_SIZE)?;
333
334        let positions_ref = positions.as_ref();
335        futures::stream::iter(chunks)
336            .map(|chunk| async move {
337                let chunk_offset = chunk.indices.value(0);
338                let chunk_end = chunk.indices.value(chunk.indices.len() - 1);
339                let array = self
340                    .get_range(positions_ref, chunk_offset as usize..chunk_end as usize + 1)
341                    .await?;
342                Result::Ok((chunk, chunk_offset, array))
343            })
344            .buffered(self.reader.io_parallelism())
345            .try_for_each(|(chunk, chunk_offset, array)| {
346                let array: &GenericByteArray<T> = array.as_bytes();
347
348                // Faster to do one large memcpy than O(n) small ones.
349                if chunk.is_contiguous {
350                    buffer.extend_from_slice(array.value_data());
351                }
352
353                // Append each value to the buffer in the correct order
354                for index in chunk.indices.values() {
355                    if !chunk.is_contiguous {
356                        let value = array.value((index - chunk_offset) as usize);
357                        let value_ref: &[u8] = value.as_ref();
358                        buffer.extend_from_slice(value_ref);
359                    }
360
361                    offset += array.value_length((index - chunk_offset) as usize);
362                    // Append next offset
363                    // Safety: We allocated appropriate capacity on initialization
364                    unsafe {
365                        offsets.push_unchecked(offset);
366                    }
367                }
368                futures::future::ready(Ok(()))
369            })
370            .await?;
371
372        let mut data_builder = ArrayDataBuilder::new(T::DATA_TYPE)
373            .len(indices.len())
374            .null_count(0);
375
376        let offsets: ScalarBuffer<T::Offset> = ScalarBuffer::from(Buffer::from(offsets));
377
378        // We should have pre-sized perfectly.
379        debug_assert_eq!(buffer.len(), capacity);
380
381        if self.nullable {
382            let (null_count, null_buf) = Self::count_nulls(&offsets);
383            data_builder = data_builder
384                .null_count(null_count)
385                .null_bit_buffer(null_buf);
386        }
387
388        let array_data = data_builder
389            .add_buffer(offsets.into_inner())
390            .add_buffer(buffer.into())
391            .build()?;
392
393        Ok(Arc::new(GenericByteArray::<T>::from(array_data)))
394    }
395}
396
397#[async_trait]
398impl<T: ByteArrayType> AsyncIndex<usize> for BinaryDecoder<'_, T> {
399    type Output = Result<ArrayRef>;
400
401    async fn get(&self, index: usize) -> Self::Output {
402        self.get(index..index + 1).await
403    }
404}
405
406#[async_trait]
407impl<T: ByteArrayType> AsyncIndex<RangeFrom<usize>> for BinaryDecoder<'_, T> {
408    type Output = Result<ArrayRef>;
409
410    async fn get(&self, index: RangeFrom<usize>) -> Self::Output {
411        self.get(index.start..self.length).await
412    }
413}
414
415#[async_trait]
416impl<T: ByteArrayType> AsyncIndex<RangeTo<usize>> for BinaryDecoder<'_, T> {
417    type Output = Result<ArrayRef>;
418
419    async fn get(&self, index: RangeTo<usize>) -> Self::Output {
420        self.get(0..index.end).await
421    }
422}
423
424#[async_trait]
425impl<T: ByteArrayType> AsyncIndex<RangeFull> for BinaryDecoder<'_, T> {
426    type Output = Result<ArrayRef>;
427
428    async fn get(&self, _: RangeFull) -> Self::Output {
429        self.get(0..self.length).await
430    }
431}
432
433#[async_trait]
434impl<T: ByteArrayType> AsyncIndex<ReadBatchParams> for BinaryDecoder<'_, T> {
435    type Output = Result<ArrayRef>;
436
437    async fn get(&self, params: ReadBatchParams) -> Self::Output {
438        match params {
439            ReadBatchParams::Range(r) => self.get(r).await,
440            // Ranges not supported in v1 files
441            ReadBatchParams::Ranges(_) => unimplemented!(),
442            ReadBatchParams::RangeFull => self.get(..).await,
443            ReadBatchParams::RangeTo(r) => self.get(r).await,
444            ReadBatchParams::RangeFrom(r) => self.get(r).await,
445            ReadBatchParams::Indices(indices) => self.take(&indices).await,
446        }
447    }
448}
449
450#[async_trait]
451impl<T: ByteArrayType> AsyncIndex<Range<usize>> for BinaryDecoder<'_, T> {
452    type Output = Result<ArrayRef>;
453
454    async fn get(&self, index: Range<usize>) -> Self::Output {
455        let position_decoder = PlainDecoder::new(
456            self.reader,
457            &DataType::Int64,
458            self.position,
459            self.length + 1,
460        )?;
461        let positions = position_decoder.get(index.start..index.end + 1).await?;
462        let int64_positions: &Int64Array = as_primitive_array(&positions);
463
464        self.get_range(int64_positions, 0..index.len()).await
465    }
466}
467
#[cfg(test)]
mod tests {
    use super::*;

    use arrow_array::{
        types::GenericStringType, BinaryArray, GenericStringArray, LargeStringArray, StringArray,
    };
    use arrow_select::concat::concat;

    use crate::local::LocalObjectReader;

    /// Encode `arr` as one batch into a new file at `path`.
    ///
    /// Returns the value reported by the encoder (the position array offset).
    async fn write_test_data<O: OffsetSizeTrait>(
        path: impl AsRef<std::path::Path>,
        arr: &[&GenericStringArray<O>],
    ) -> Result<usize> {
        let mut file = tokio::fs::File::create(path).await?;
        // Prefix the file with a few bytes so the batch does not start at offset 0.
        file.write_all(b"1234").await.unwrap();

        let dyn_arrays: Vec<&dyn Array> = arr.iter().map(|a| a as &dyn Array).collect();
        let positions_offset = BinaryEncoder::new(&mut file)
            .encode(dyn_arrays.as_slice())
            .await
            .unwrap();
        file.shutdown().await.unwrap();
        Ok(positions_offset)
    }

    /// Write `arrs` as one batch, decode it back and compare with their concatenation.
    async fn test_round_trips<O: OffsetSizeTrait>(arrs: &[&GenericStringArray<O>]) {
        let temp_dir = tempfile::tempdir().unwrap();
        let file_path = temp_dir.path().join("foo");
        let positions_offset = write_test_data(&file_path, arrs).await.unwrap();

        let reader = LocalObjectReader::open_local_path(&file_path, 1024, None)
            .await
            .unwrap();
        let total_rows: usize = arrs.iter().map(|a| a.len()).sum();
        let decoder = BinaryDecoder::<GenericStringType<O>>::new(
            reader.as_ref(),
            positions_offset,
            total_rows,
            true,
        );
        let decoded = decoder.decode().await.unwrap();

        let inputs: Vec<&dyn Array> = arrs.iter().map(|a| a as &dyn Array).collect();
        let expected = concat(inputs.as_slice()).unwrap();
        assert_eq!(decoded.as_string::<O>(), expected.as_string::<O>());
    }

    #[tokio::test]
    async fn test_write_binary_data() {
        // 32-bit offsets.
        let plain = StringArray::from(vec!["a", "b", "cd", "efg"]);
        test_round_trips(&[&plain]).await;

        let with_nulls = StringArray::from(vec![Some("a"), None, Some("cd"), None]);
        test_round_trips(&[&with_nulls]).await;

        test_round_trips(&[
            &StringArray::from(vec![Some("a"), None, Some("cd"), None]),
            &StringArray::from(vec![Some("f"), None, Some("gh"), None]),
            &StringArray::from(vec![Some("t"), None, Some("uv"), None]),
        ])
        .await;

        // 64-bit offsets.
        let large_plain = LargeStringArray::from(vec!["a", "b", "cd", "efg"]);
        test_round_trips(&[&large_plain]).await;

        let large_with_nulls = LargeStringArray::from(vec![Some("a"), None, Some("cd"), None]);
        test_round_trips(&[&large_with_nulls]).await;

        test_round_trips(&[
            &LargeStringArray::from(vec![Some("a"), Some("b")]),
            &LargeStringArray::from(vec![Some("c")]),
            &LargeStringArray::from(vec![Some("d"), Some("e")]),
        ])
        .await;
    }

    #[tokio::test]
    async fn test_write_binary_data_with_offset() {
        // A sliced array whose offsets do not start at zero.
        let sliced: StringArray = StringArray::from(vec![Some("d"), Some("e")]).slice(1, 1);
        test_round_trips(&[&sliced]).await;
    }

    #[tokio::test]
    async fn test_range_query() {
        let data = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);

        let temp_dir = tempfile::tempdir().unwrap();
        let file_path = temp_dir.path().join("foo");
        let mut file = tokio::fs::File::create(&file_path).await.unwrap();

        // Prefix the file with a few bytes so the batch does not start at offset 0.
        file.write_all(b"1234").await.unwrap();
        let mut encoder = BinaryEncoder::new(&mut file);
        let positions_offset = encoder.encode(&[&data]).await.unwrap();
        file.shutdown().await.unwrap();

        let reader = LocalObjectReader::open_local_path(&file_path, 1024, None)
            .await
            .unwrap();
        let decoder =
            BinaryDecoder::<Utf8Type>::new(reader.as_ref(), positions_offset, data.len(), false);

        // Whole batch, through both entry points.
        assert_eq!(decoder.decode().await.unwrap().as_ref(), &data);
        assert_eq!(decoder.get(..).await.unwrap().as_ref(), &data);

        // Bounded and half-open ranges.
        assert_eq!(
            decoder.get(2..5).await.unwrap().as_ref(),
            &StringArray::from_iter_values(["c", "d", "e"])
        );
        assert_eq!(
            decoder.get(..5).await.unwrap().as_ref(),
            &StringArray::from_iter_values(["a", "b", "c", "d", "e"])
        );
        assert_eq!(
            decoder.get(4..).await.unwrap().as_ref(),
            &StringArray::from_iter_values(["e", "f", "g"])
        );

        // Empty range inside the batch, then one outside of it.
        assert_eq!(
            decoder.get(2..2).await.unwrap().as_ref(),
            &new_empty_array(&DataType::Utf8)
        );
        assert!(decoder.get(100..100).await.is_err());
    }

    #[tokio::test]
    async fn test_take() {
        let data = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);

        let temp_dir = tempfile::tempdir().unwrap();
        let file_path = temp_dir.path().join("foo");
        let positions_offset = write_test_data(&file_path, &[&data]).await.unwrap();

        let reader = LocalObjectReader::open_local_path(&file_path, 1024, None)
            .await
            .unwrap();
        let decoder =
            BinaryDecoder::<Utf8Type>::new(reader.as_ref(), positions_offset, data.len(), false);

        let wanted = UInt32Array::from_iter_values([1, 2, 5]);
        let taken = decoder.take(&wanted).await.unwrap();
        assert_eq!(
            taken.as_ref(),
            &StringArray::from_iter_values(["b", "c", "f"])
        );
    }

    #[tokio::test]
    async fn test_take_sparse_indices() {
        let data = StringArray::from_iter_values((0..1000000).map(|v| format!("string-{v}")));

        let temp_dir = tempfile::tempdir().unwrap();
        let file_path = temp_dir.path().join("foo");
        let positions_offset = write_test_data(&file_path, &[&data]).await.unwrap();

        let reader = LocalObjectReader::open_local_path(&file_path, 1024, None)
            .await
            .unwrap();
        let decoder =
            BinaryDecoder::<Utf8Type>::new(reader.as_ref(), positions_offset, data.len(), false);

        let positions = decoder.get_positions(1..999998).await.unwrap();
        let indices = UInt32Array::from_iter_values([1, 999998]);
        let chunks = plan_take_chunks(positions.as_ref(), &indices, 64 * 1024).unwrap();
        // Relative offset within the positions.
        assert_eq!(chunks.len(), 2);
        assert_eq!(chunks[0].indices, UInt32Array::from_iter_values([0]));
        assert_eq!(chunks[1].indices, UInt32Array::from_iter_values([999997]));

        let taken = decoder
            .take(&UInt32Array::from_iter_values([1, 999998]))
            .await
            .unwrap();
        assert_eq!(
            taken.as_ref(),
            &StringArray::from_iter_values(["string-1", "string-999998"])
        );
    }

    #[tokio::test]
    async fn test_take_dense_indices() {
        let data = StringArray::from_iter_values((0..1000000).map(|v| format!("string-{v}")));

        let temp_dir = tempfile::tempdir().unwrap();
        let file_path = temp_dir.path().join("foo");
        let positions_offset = write_test_data(&file_path, &[&data]).await.unwrap();

        let reader = LocalObjectReader::open_local_path(&file_path, 1024, None)
            .await
            .unwrap();
        let decoder =
            BinaryDecoder::<Utf8Type>::new(reader.as_ref(), positions_offset, data.len(), false);

        let positions = decoder.get_positions(1..999998).await.unwrap();
        let indices = UInt32Array::from_iter_values([
            2, 3, 4, 1001, 1001, 1002, 2001, 2002, 2004, 3004, 3005,
        ]);

        let chunks = plan_take_chunks(positions.as_ref(), &indices, 1024).unwrap();
        assert_eq!(chunks.len(), 4);

        // A contiguous range.
        assert_eq!(chunks[0].indices, UInt32Array::from_iter_values(0..3));
        assert!(chunks[0].is_contiguous);

        // Not contiguous because of repeats
        assert_eq!(
            chunks[1].indices,
            UInt32Array::from_iter_values([999, 999, 1000])
        );
        assert!(!chunks[1].is_contiguous);

        // Not contiguous because of gaps
        assert_eq!(
            chunks[2].indices,
            UInt32Array::from_iter_values([1999, 2000, 2002])
        );
        assert!(!chunks[2].is_contiguous);

        // Another contiguous range, this time after non-contiguous ones
        assert_eq!(
            chunks[3].indices,
            UInt32Array::from_iter_values([3002, 3003])
        );
        assert!(chunks[3].is_contiguous);

        let taken = decoder.take(&indices).await.unwrap();
        assert_eq!(
            taken.as_ref(),
            &StringArray::from_iter_values(indices.values().iter().map(|v| format!("string-{v}")))
        );
    }

    #[tokio::test]
    async fn test_write_slice() {
        let temp_dir = tempfile::tempdir().unwrap();
        let file_path = temp_dir.path().join("slices");
        let data = StringArray::from_iter_values((0..100).map(|v| format!("abcdef-{v:#03}")));

        let mut file = tokio::fs::File::create(&file_path).await.unwrap();
        let mut encoder = BinaryEncoder::new(&mut file);
        for i in 0..10 {
            let ten_rows = data.slice(i * 10, 10);
            let pos = encoder.encode(&[&ten_rows]).await.unwrap();
            assert_eq!(pos, (i * (8 * 11) /* offset array */ + (i + 1) * (10 * 10)));
        }
    }

    #[tokio::test]
    async fn test_write_binary_with_nulls() {
        // Every fourth value is null.
        let data = BinaryArray::from_iter((0..60000).map(|v| {
            if v % 4 != 0 {
                Some::<&[u8]>(b"abcdefgh")
            } else {
                None
            }
        }));
        let temp_dir = tempfile::tempdir().unwrap();
        let file_path = temp_dir.path().join("nulls");

        let positions_offset = {
            let mut file = tokio::fs::File::create(&file_path).await.unwrap();

            // Prefix the file with a few bytes so the batch does not start at offset 0.
            file.write_all(b"1234").await.unwrap();
            let mut encoder = BinaryEncoder::new(&mut file);

            let pos = encoder.encode(&[&data]).await.unwrap();
            file.shutdown().await.unwrap();
            pos
        };

        let reader = LocalObjectReader::open_local_path(&file_path, 1024, None)
            .await
            .unwrap();
        let decoder =
            BinaryDecoder::<BinaryType>::new(reader.as_ref(), positions_offset, data.len(), true);
        let idx = UInt32Array::from(vec![0_u32, 5_u32, 59996_u32]);
        let taken = decoder.take(&idx).await.unwrap();
        let expected: Vec<Option<&[u8]>> = vec![None, Some(b"abcdefgh"), None];
        assert_eq!(taken.as_binary::<i32>(), &BinaryArray::from(expected));
    }
}