// lance_io/encodings/binary.rs
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Var-length binary encoding.
//!
7use std::marker::PhantomData;
8use std::ops::{Range, RangeFrom, RangeFull, RangeTo};
9use std::sync::Arc;
10
11use arrow_arith::numeric::sub;
12use arrow_array::{
13    Array, ArrayRef, GenericByteArray, Int64Array, OffsetSizeTrait, UInt32Array,
14    builder::{ArrayBuilder, PrimitiveBuilder},
15    cast::AsArray,
16    cast::as_primitive_array,
17    new_empty_array,
18    types::{
19        BinaryType, ByteArrayType, Int64Type, LargeBinaryType, LargeUtf8Type, UInt32Type, Utf8Type,
20    },
21};
22use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ScalarBuffer, bit_util};
23use arrow_cast::cast::cast;
24use arrow_data::ArrayDataBuilder;
25use arrow_schema::DataType;
26use async_trait::async_trait;
27use bytes::Bytes;
28use futures::{StreamExt, TryStreamExt};
29use lance_arrow::BufferExt;
30use tokio::io::AsyncWriteExt;
31
32use super::ReadBatchParams;
33use super::{AsyncIndex, Decoder, Encoder, plain::PlainDecoder};
34use crate::traits::{Reader, Writer};
35use lance_core::Result;
36
/// Encoder for Var-binary encoding.
pub struct BinaryEncoder<'a> {
    // Destination writer; value bytes are appended at the current position,
    // followed by an Int64 position (offset) array.
    writer: &'a mut dyn Writer,
}
41
42impl<'a> BinaryEncoder<'a> {
43    pub fn new(writer: &'a mut dyn Writer) -> Self {
44        Self { writer }
45    }
46
47    async fn encode_typed_arr<T: ByteArrayType>(&mut self, arrs: &[&dyn Array]) -> Result<usize> {
48        let capacity: usize = arrs.iter().map(|a| a.len()).sum();
49        let mut pos_builder: PrimitiveBuilder<Int64Type> =
50            PrimitiveBuilder::with_capacity(capacity + 1);
51
52        let mut last_offset: usize = self.writer.tell().await?;
53        pos_builder.append_value(last_offset as i64);
54        for array in arrs.iter() {
55            let arr = array
56                .as_any()
57                .downcast_ref::<GenericByteArray<T>>()
58                .unwrap();
59
60            let offsets = arr.value_offsets();
61
62            let start = offsets[0].as_usize();
63            let end = offsets[offsets.len() - 1].as_usize();
64            let b = unsafe {
65                std::slice::from_raw_parts(
66                    arr.to_data().buffers()[1].as_ptr().add(start),
67                    end - start,
68                )
69            };
70            self.writer.write_all(b).await?;
71
72            let start_offset = offsets[0].as_usize();
73            offsets
74                .iter()
75                .skip(1)
76                .map(|b| b.as_usize() - start_offset + last_offset)
77                .for_each(|o| pos_builder.append_value(o as i64));
78            last_offset = pos_builder.values_slice()[pos_builder.len() - 1] as usize;
79        }
80
81        let positions_offset = self.writer.tell().await?;
82        let pos_array = pos_builder.finish();
83        self.writer
84            .write_all(pos_array.to_data().buffers()[0].as_slice())
85            .await?;
86        Ok(positions_offset)
87    }
88}
89
90#[async_trait]
91impl Encoder for BinaryEncoder<'_> {
92    async fn encode(&mut self, arrs: &[&dyn Array]) -> Result<usize> {
93        assert!(!arrs.is_empty());
94        let data_type = arrs[0].data_type();
95        match data_type {
96            DataType::Utf8 => self.encode_typed_arr::<Utf8Type>(arrs).await,
97            DataType::Binary => self.encode_typed_arr::<BinaryType>(arrs).await,
98            DataType::LargeUtf8 => self.encode_typed_arr::<LargeUtf8Type>(arrs).await,
99            DataType::LargeBinary => self.encode_typed_arr::<LargeBinaryType>(arrs).await,
100            _ => {
101                return Err(lance_core::Error::invalid_input(format!(
102                    "Unsupported data type for binary encoding: {}",
103                    data_type
104                )));
105            }
106        }
107    }
108}
109
/// Var-binary encoding decoder.
pub struct BinaryDecoder<'a, T: ByteArrayType> {
    // Underlying file reader.
    reader: &'a dyn Reader,

    // File position where the Int64 position (offset) array starts.
    position: usize,

    // Number of records in this batch.
    length: usize,

    // Whether the batch may contain nulls; nulls are encoded as
    // zero-length entries (equal adjacent offsets).
    nullable: bool,

    // Ties the decoder to one concrete byte-array type without storing it.
    phantom: PhantomData<T>,
}
122
/// Var-length Binary Decoder
///
impl<'a, T: ByteArrayType> BinaryDecoder<'a, T> {
    /// Create a [BinaryDecoder] to decode one batch.
    ///
    ///  - `position`, file position where this batch starts.
    ///  - `length`, the number of records in this batch.
    ///  - `nullable`, whether this batch contains nullable value.
    ///
    /// ## Example
    ///
    /// ```rust
    /// use arrow_array::types::Utf8Type;
    /// use object_store::path::Path;
    /// use lance_io::{local::LocalObjectReader, encodings::binary::BinaryDecoder, traits::Reader};
    ///
    /// async {
    ///     let reader = LocalObjectReader::open_local_path("/tmp/foo.lance", 2048, None).await.unwrap();
    ///     let string_decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), 100, 1024, true);
    /// };
    /// ```
    pub fn new(reader: &'a dyn Reader, position: usize, length: usize, nullable: bool) -> Self {
        Self {
            reader,
            position,
            length,
            nullable,
            phantom: PhantomData,
        }
    }

    /// Get the position array for the batch.
    ///
    /// Reads `index.len() + 1` Int64 positions starting at row
    /// `index.start` — one extra so the last value's end offset is known.
    async fn get_positions(&self, index: Range<usize>) -> Result<Arc<Int64Array>> {
        let position_decoder = PlainDecoder::new(
            self.reader,
            &DataType::Int64,
            self.position,
            self.length + 1,
        )?;
        let values = position_decoder.get(index.start..index.end + 1).await?;
        Ok(Arc::new(as_primitive_array(&values).clone()))
    }

    /// Derive the null count and validity bitmap from an offsets buffer:
    /// a row whose start and end offsets are equal (zero-length value) is
    /// treated as null.
    fn count_nulls<O: OffsetSizeTrait>(offsets: &ScalarBuffer<O>) -> (usize, Option<Buffer>) {
        let mut null_count = 0;
        // One validity bit per row; `offsets` has rows + 1 entries.
        let mut null_buf = MutableBuffer::new_null(offsets.len() - 1);
        offsets.windows(2).enumerate().for_each(|(idx, w)| {
            if w[0] == w[1] {
                bit_util::unset_bit(null_buf.as_mut(), idx);
                null_count += 1;
            } else {
                bit_util::set_bit(null_buf.as_mut(), idx);
            }
        });
        // Omit the bitmap entirely when every row is valid.
        let null_buf = if null_count > 0 {
            Some(null_buf.into())
        } else {
            None
        };
        (null_count, null_buf)
    }

    /// Read the array with batch positions and range.
    ///
    /// Parameters
    ///
    ///  - *positions*: position array for the batch.
    ///  - *range*: range of rows to read.
    async fn get_range(&self, positions: &Int64Array, range: Range<usize>) -> Result<ArrayRef> {
        assert!(positions.len() >= range.end);
        let start = positions.value(range.start);
        let end = positions.value(range.end);

        let start_scalar = Int64Array::new_scalar(start);

        // Rebase the absolute file offsets to be relative to `start`, then
        // narrow to Int32 for small-offset (non-Large) array types.
        let slice = positions.slice(range.start, range.len() + 1);
        let offset_data = if T::Offset::IS_LARGE {
            sub(&slice, &start_scalar)?.into_data()
        } else {
            cast(
                &(Arc::new(sub(&slice, &start_scalar)?) as ArrayRef),
                &DataType::Int32,
            )?
            .into_data()
        };

        // Skip the I/O entirely for an empty (or all-null) range.
        let bytes: Bytes = if start >= end {
            Bytes::new()
        } else {
            self.reader.get_range(start as usize..end as usize).await?
        };

        let mut data_builder = ArrayDataBuilder::new(T::DATA_TYPE)
            .len(range.len())
            .null_count(0);

        // Count nulls
        if self.nullable {
            let (null_count, null_buf) = Self::count_nulls(slice.values());
            data_builder = data_builder
                .null_count(null_count)
                .null_bit_buffer(null_buf);
        }

        let buf = Buffer::from_bytes_bytes(bytes, /*bytes_per_value=*/ 1);
        let array_data = data_builder
            .add_buffer(offset_data.buffers()[0].clone())
            .add_buffer(buf)
            .build()?;

        Ok(Arc::new(GenericByteArray::<T>::from(array_data)))
    }
}
236
/// One chunk of a `take` plan: a group of row indices fetched in a single
/// read request. Produced by [plan_take_chunks].
#[derive(Debug)]
struct TakeChunksPlan {
    // Indices rebased so the first requested index of the whole take is 0.
    indices: UInt32Array,
    // True when the chunk's indices form a contiguous, non-repeating run,
    // enabling a single bulk copy of the fetched bytes.
    is_contiguous: bool,
}
242
/// Group the indices into chunks, such that either:
/// 1. the indices are contiguous (and non-repeating)
/// 2. the values are within `min_io_size` of each other (and thus are worth
///    grabbing in a single request)
fn plan_take_chunks(
    positions: &Int64Array,
    indices: &UInt32Array,
    min_io_size: i64,
) -> Result<Vec<TakeChunksPlan>> {
    // Rebase indices so the first requested index becomes 0; chunk indices
    // stay relative to that base.
    let start = indices.value(0);
    let indices = sub(indices, &UInt32Array::new_scalar(start))?;
    let indices_ref = indices.as_primitive::<UInt32Type>();

    let mut chunks: Vec<TakeChunksPlan> = vec![];
    let mut start_idx = 0;
    let mut last_idx: i64 = -1;
    let mut is_contiguous = true;
    for i in 0..indices.len() {
        let current = indices_ref.value(i) as usize;
        // NOTE(review): `current == start_idx` compares an index *value* to a
        // *position* within `indices` — presumably meant to treat a chunk's
        // first element as trivially contiguous; behavior is pinned by the
        // tests below, so verify before changing.
        let curr_contiguous = current == start_idx || current as i64 - last_idx == 1;

        // Split only when the run breaks AND the byte gap exceeds
        // `min_io_size`; small gaps are cheaper to over-read in one request.
        if !curr_contiguous
            && positions.value(current) - positions.value(indices_ref.value(start_idx) as usize)
                > min_io_size
        {
            chunks.push(TakeChunksPlan {
                indices: as_primitive_array(&indices.slice(start_idx, i - start_idx)).clone(),
                is_contiguous,
            });
            start_idx = i;
            is_contiguous = true;
        } else {
            is_contiguous &= curr_contiguous;
        }

        last_idx = current as i64;
    }
    // Flush the trailing chunk.
    chunks.push(TakeChunksPlan {
        indices: as_primitive_array(&indices.slice(start_idx, indices.len() - start_idx)).clone(),
        is_contiguous,
    });

    Ok(chunks)
}
287
#[async_trait]
impl<T: ByteArrayType> Decoder for BinaryDecoder<'_, T> {
    /// Decode the entire batch.
    async fn decode(&self) -> Result<ArrayRef> {
        self.get(..).await
    }

    /// Take the values at the given indices.
    ///
    /// This function assumes indices are sorted.
    async fn take(&self, indices: &UInt32Array) -> Result<ArrayRef> {
        if indices.is_empty() {
            return Ok(new_empty_array(&T::DATA_TYPE));
        }

        let start = indices.value(0);
        let end = indices.value(indices.len() - 1);

        // TODO: make min batch size configurable.
        // TODO: make reading positions in chunks too.
        const MIN_IO_SIZE: i64 = 64 * 1024; // 64KB
        // Positions for rows start..=end; chunk indices produced below are
        // relative to `start`.
        let positions = self
            .get_positions(start as usize..(end + 1) as usize)
            .await?;
        // Use indices and positions to pre-allocate an exact-size buffer
        let capacity = indices
            .iter()
            .map(|i| {
                let relative_index = (i.unwrap() - start) as usize;
                let start = positions.value(relative_index) as usize;
                let end = positions.value(relative_index + 1) as usize;
                end - start
            })
            .sum();
        let mut buffer = MutableBuffer::with_capacity(capacity);

        let offsets_capacity = std::mem::size_of::<T::Offset>() * (indices.len() + 1);
        let mut offsets = MutableBuffer::with_capacity(offsets_capacity);
        let mut offset = T::Offset::from_usize(0).unwrap();
        // Safety: We allocated appropriate capacity just above.
        unsafe {
            offsets.push_unchecked(offset);
        }

        let chunks = plan_take_chunks(&positions, indices, MIN_IO_SIZE)?;

        let positions_ref = positions.as_ref();
        // Fetch chunks concurrently (bounded by the reader's I/O
        // parallelism) but append to the output buffers strictly in chunk
        // order via try_for_each.
        futures::stream::iter(chunks)
            .map(|chunk| async move {
                let chunk_offset = chunk.indices.value(0);
                let chunk_end = chunk.indices.value(chunk.indices.len() - 1);
                let array = self
                    .get_range(positions_ref, chunk_offset as usize..chunk_end as usize + 1)
                    .await?;
                Result::Ok((chunk, chunk_offset, array))
            })
            .buffered(self.reader.io_parallelism())
            .try_for_each(|(chunk, chunk_offset, array)| {
                let array: &GenericByteArray<T> = array.as_bytes();

                // Faster to do one large memcpy than O(n) small ones.
                if chunk.is_contiguous {
                    buffer.extend_from_slice(array.value_data());
                }

                // Append each value to the buffer in the correct order
                for index in chunk.indices.values() {
                    if !chunk.is_contiguous {
                        let value = array.value((index - chunk_offset) as usize);
                        let value_ref: &[u8] = value.as_ref();
                        buffer.extend_from_slice(value_ref);
                    }

                    offset += array.value_length((index - chunk_offset) as usize);
                    // Append next offset
                    // Safety: We allocated appropriate capacity on initialization
                    unsafe {
                        offsets.push_unchecked(offset);
                    }
                }
                futures::future::ready(Ok(()))
            })
            .await?;

        let mut data_builder = ArrayDataBuilder::new(T::DATA_TYPE)
            .len(indices.len())
            .null_count(0);

        let offsets: ScalarBuffer<T::Offset> = ScalarBuffer::from(Buffer::from(offsets));

        // We should have pre-sized perfectly.
        debug_assert_eq!(buffer.len(), capacity);

        if self.nullable {
            let (null_count, null_buf) = Self::count_nulls(&offsets);
            data_builder = data_builder
                .null_count(null_count)
                .null_bit_buffer(null_buf);
        }

        let array_data = data_builder
            .add_buffer(offsets.into_inner())
            .add_buffer(buffer.into())
            .build()?;

        Ok(Arc::new(GenericByteArray::<T>::from(array_data)))
    }
}
395
396#[async_trait]
397impl<T: ByteArrayType> AsyncIndex<usize> for BinaryDecoder<'_, T> {
398    type Output = Result<ArrayRef>;
399
400    async fn get(&self, index: usize) -> Self::Output {
401        self.get(index..index + 1).await
402    }
403}
404
405#[async_trait]
406impl<T: ByteArrayType> AsyncIndex<RangeFrom<usize>> for BinaryDecoder<'_, T> {
407    type Output = Result<ArrayRef>;
408
409    async fn get(&self, index: RangeFrom<usize>) -> Self::Output {
410        self.get(index.start..self.length).await
411    }
412}
413
414#[async_trait]
415impl<T: ByteArrayType> AsyncIndex<RangeTo<usize>> for BinaryDecoder<'_, T> {
416    type Output = Result<ArrayRef>;
417
418    async fn get(&self, index: RangeTo<usize>) -> Self::Output {
419        self.get(0..index.end).await
420    }
421}
422
423#[async_trait]
424impl<T: ByteArrayType> AsyncIndex<RangeFull> for BinaryDecoder<'_, T> {
425    type Output = Result<ArrayRef>;
426
427    async fn get(&self, _: RangeFull) -> Self::Output {
428        self.get(0..self.length).await
429    }
430}
431
432#[async_trait]
433impl<T: ByteArrayType> AsyncIndex<ReadBatchParams> for BinaryDecoder<'_, T> {
434    type Output = Result<ArrayRef>;
435
436    async fn get(&self, params: ReadBatchParams) -> Self::Output {
437        match params {
438            ReadBatchParams::Range(r) => self.get(r).await,
439            // Ranges not supported in v1 files
440            ReadBatchParams::Ranges(_) => unimplemented!(),
441            ReadBatchParams::RangeFull => self.get(..).await,
442            ReadBatchParams::RangeTo(r) => self.get(r).await,
443            ReadBatchParams::RangeFrom(r) => self.get(r).await,
444            ReadBatchParams::Indices(indices) => self.take(&indices).await,
445        }
446    }
447}
448
449#[async_trait]
450impl<T: ByteArrayType> AsyncIndex<Range<usize>> for BinaryDecoder<'_, T> {
451    type Output = Result<ArrayRef>;
452
453    async fn get(&self, index: Range<usize>) -> Self::Output {
454        let position_decoder = PlainDecoder::new(
455            self.reader,
456            &DataType::Int64,
457            self.position,
458            self.length + 1,
459        )?;
460        let positions = position_decoder.get(index.start..index.end + 1).await?;
461        let int64_positions: &Int64Array = as_primitive_array(&positions);
462
463        self.get_range(int64_positions, 0..index.len()).await
464    }
465}
466
#[cfg(test)]
mod tests {
    use super::*;

    use arrow_array::{
        BinaryArray, GenericStringArray, LargeStringArray, StringArray, types::GenericStringType,
    };
    use arrow_select::concat::concat;
    use lance_core::utils::tempfile::TempStdFile;

    use crate::local::LocalObjectReader;

    /// Encode `arr` into the file at `path` and return the file position of
    /// the position (offset) array — the value [BinaryDecoder::new] needs.
    async fn write_test_data<O: OffsetSizeTrait>(
        path: impl AsRef<std::path::Path>,
        arr: &[&GenericStringArray<O>],
    ) -> Result<usize> {
        let mut writer = tokio::fs::File::create(path).await?;
        // Write some garbage to reset "tell()".
        writer.write_all(b"1234").await.unwrap();
        let mut encoder = BinaryEncoder::new(&mut writer);

        let arrs = arr.iter().map(|a| a as &dyn Array).collect::<Vec<_>>();
        let pos = encoder.encode(arrs.as_slice()).await.unwrap();
        AsyncWriteExt::shutdown(&mut writer).await.unwrap();
        Ok(pos)
    }

    /// Encode `arrs`, decode the whole batch back, and assert the result
    /// equals the concatenation of the inputs.
    async fn test_round_trips<O: OffsetSizeTrait>(arrs: &[&GenericStringArray<O>]) {
        let path = TempStdFile::default();

        let pos = write_test_data(&path, arrs).await.unwrap();

        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let read_len = arrs.iter().map(|a| a.len()).sum();
        let decoder =
            BinaryDecoder::<GenericStringType<O>>::new(reader.as_ref(), pos, read_len, true);
        let actual_arr = decoder.decode().await.unwrap();

        let arrs_ref = arrs.iter().map(|a| a as &dyn Array).collect::<Vec<_>>();
        let expected = concat(arrs_ref.as_slice()).unwrap();
        assert_eq!(
            actual_arr
                .as_any()
                .downcast_ref::<GenericStringArray<O>>()
                .unwrap(),
            expected
                .as_any()
                .downcast_ref::<GenericStringArray<O>>()
                .unwrap(),
        );
    }

    // Round-trips for Utf8 and LargeUtf8, with and without nulls, and with
    // multiple arrays encoded in a single call.
    #[tokio::test]
    async fn test_write_binary_data() {
        test_round_trips(&[&StringArray::from(vec!["a", "b", "cd", "efg"])]).await;
        test_round_trips(&[&StringArray::from(vec![Some("a"), None, Some("cd"), None])]).await;
        test_round_trips(&[
            &StringArray::from(vec![Some("a"), None, Some("cd"), None]),
            &StringArray::from(vec![Some("f"), None, Some("gh"), None]),
            &StringArray::from(vec![Some("t"), None, Some("uv"), None]),
        ])
        .await;
        test_round_trips(&[&LargeStringArray::from(vec!["a", "b", "cd", "efg"])]).await;
        test_round_trips(&[&LargeStringArray::from(vec![
            Some("a"),
            None,
            Some("cd"),
            None,
        ])])
        .await;
        test_round_trips(&[
            &LargeStringArray::from(vec![Some("a"), Some("b")]),
            &LargeStringArray::from(vec![Some("c")]),
            &LargeStringArray::from(vec![Some("d"), Some("e")]),
        ])
        .await;
    }

    // A sliced input exercises the non-zero value_offsets() start case in
    // the encoder.
    #[tokio::test]
    async fn test_write_binary_data_with_offset() {
        let array: StringArray = StringArray::from(vec![Some("d"), Some("e")]).slice(1, 1);
        test_round_trips(&[&array]).await;
    }

    // Exercises every AsyncIndex range form, plus the empty-range and
    // out-of-bounds cases.
    #[tokio::test]
    async fn test_range_query() {
        let data = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);

        let path = TempStdFile::default();
        let mut object_writer = tokio::fs::File::create(&path).await.unwrap();

        // Write some garbage to reset "tell()".
        object_writer.write_all(b"1234").await.unwrap();
        let mut encoder = BinaryEncoder::new(&mut object_writer);
        let pos = encoder.encode(&[&data]).await.unwrap();
        AsyncWriteExt::shutdown(&mut object_writer).await.unwrap();

        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);
        assert_eq!(
            decoder.decode().await.unwrap().as_ref(),
            &StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"])
        );

        assert_eq!(
            decoder.get(..).await.unwrap().as_ref(),
            &StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"])
        );

        assert_eq!(
            decoder.get(2..5).await.unwrap().as_ref(),
            &StringArray::from_iter_values(["c", "d", "e"])
        );

        assert_eq!(
            decoder.get(..5).await.unwrap().as_ref(),
            &StringArray::from_iter_values(["a", "b", "c", "d", "e"])
        );

        assert_eq!(
            decoder.get(4..).await.unwrap().as_ref(),
            &StringArray::from_iter_values(["e", "f", "g"])
        );
        assert_eq!(
            decoder.get(2..2).await.unwrap().as_ref(),
            &new_empty_array(&DataType::Utf8)
        );
        assert!(decoder.get(100..100).await.is_err());
    }

    // Basic sorted take over a small batch.
    #[tokio::test]
    async fn test_take() {
        let data = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);

        let path = TempStdFile::default();

        let pos = write_test_data(&path, &[&data]).await.unwrap();
        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);

        let actual = decoder
            .take(&UInt32Array::from_iter_values([1, 2, 5]))
            .await
            .unwrap();
        assert_eq!(
            actual.as_ref(),
            &StringArray::from_iter_values(["b", "c", "f"])
        );
    }

    // Far-apart indices must be split into separate I/O chunks by
    // plan_take_chunks.
    #[tokio::test]
    async fn test_take_sparse_indices() {
        let data = StringArray::from_iter_values((0..1000000).map(|v| format!("string-{v}")));

        let path = TempStdFile::default();
        let pos = write_test_data(&path, &[&data]).await.unwrap();
        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);

        let positions = decoder.get_positions(1..999998).await.unwrap();
        let indices = UInt32Array::from_iter_values([1, 999998]);
        let chunks = plan_take_chunks(positions.as_ref(), &indices, 64 * 1024).unwrap();
        // Relative offset within the positions.
        assert_eq!(chunks.len(), 2);
        assert_eq!(chunks[0].indices, UInt32Array::from_iter_values([0]),);
        assert_eq!(chunks[1].indices, UInt32Array::from_iter_values([999997]),);

        let actual = decoder
            .take(&UInt32Array::from_iter_values([1, 999998]))
            .await
            .unwrap();
        assert_eq!(
            actual.as_ref(),
            &StringArray::from_iter_values(["string-1", "string-999998"])
        );
    }

    // Verifies chunking and the is_contiguous flag across contiguous runs,
    // repeated indices, and gapped indices.
    #[tokio::test]
    async fn test_take_dense_indices() {
        let data = StringArray::from_iter_values((0..1000000).map(|v| format!("string-{v}")));

        let path = TempStdFile::default();
        let pos = write_test_data(&path, &[&data]).await.unwrap();

        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);

        let positions = decoder.get_positions(1..999998).await.unwrap();
        let indices = UInt32Array::from_iter_values([
            2, 3, 4, 1001, 1001, 1002, 2001, 2002, 2004, 3004, 3005,
        ]);

        let chunks = plan_take_chunks(positions.as_ref(), &indices, 1024).unwrap();
        assert_eq!(chunks.len(), 4);
        // A contiguous range.
        assert_eq!(chunks[0].indices, UInt32Array::from_iter_values(0..3));
        assert!(chunks[0].is_contiguous);
        // Not contiguous because of repeats
        assert_eq!(
            chunks[1].indices,
            UInt32Array::from_iter_values([999, 999, 1000])
        );
        assert!(!chunks[1].is_contiguous);
        // Not contiguous because of gaps
        assert_eq!(
            chunks[2].indices,
            UInt32Array::from_iter_values([1999, 2000, 2002])
        );
        assert!(!chunks[2].is_contiguous);
        // Another contiguous range, this time after non-contiguous ones
        assert_eq!(
            chunks[3].indices,
            UInt32Array::from_iter_values([3002, 3003])
        );
        assert!(chunks[3].is_contiguous);

        let actual = decoder.take(&indices).await.unwrap();
        assert_eq!(
            actual.as_ref(),
            &StringArray::from_iter_values(indices.values().iter().map(|v| format!("string-{v}")))
        );
    }

    // Encoding slices of one array must produce predictable file positions:
    // each call appends 100 value bytes (10 values x 10 bytes) plus an
    // 11-entry Int64 offset array.
    #[tokio::test]
    async fn test_write_slice() {
        let path = TempStdFile::default();
        let data = StringArray::from_iter_values((0..100).map(|v| format!("abcdef-{v:#03}")));

        let mut object_writer = tokio::fs::File::create(&path).await.unwrap();
        let mut encoder = BinaryEncoder::new(&mut object_writer);
        for i in 0..10 {
            let pos = encoder.encode(&[&data.slice(i * 10, 10)]).await.unwrap();
            assert_eq!(pos, (i * (8 * 11) /* offset array */ + (i + 1) * (10 * 10)));
        }
    }

    // Nulls round-trip through take(); nulls are stored as zero-length
    // entries and recovered from equal adjacent offsets.
    #[tokio::test]
    async fn test_write_binary_with_nulls() {
        let data = BinaryArray::from_iter((0..60000).map(|v| {
            if v % 4 != 0 {
                Some::<&[u8]>(b"abcdefgh")
            } else {
                None
            }
        }));
        let path = TempStdFile::default();

        let pos = {
            let mut object_writer = tokio::fs::File::create(&path).await.unwrap();

            // Write some garbage to reset "tell()".
            object_writer.write_all(b"1234").await.unwrap();
            let mut encoder = BinaryEncoder::new(&mut object_writer);

            // let arrs = arr.iter().map(|a| a as &dyn Array).collect::<Vec<_>>();
            let pos = encoder.encode(&[&data]).await.unwrap();
            AsyncWriteExt::shutdown(&mut object_writer).await.unwrap();
            pos
        };

        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let decoder = BinaryDecoder::<BinaryType>::new(reader.as_ref(), pos, data.len(), true);
        let idx = UInt32Array::from(vec![0_u32, 5_u32, 59996_u32]);
        let actual = decoder.take(&idx).await.unwrap();
        let values: Vec<Option<&[u8]>> = vec![None, Some(b"abcdefgh"), None];
        assert_eq!(actual.as_binary::<i32>(), &BinaryArray::from(values));
    }
}