1use std::marker::PhantomData;
8use std::ops::{Range, RangeFrom, RangeFull, RangeTo};
9use std::sync::Arc;
10
11use arrow_arith::numeric::sub;
12use arrow_array::{
13 Array, ArrayRef, GenericByteArray, Int64Array, OffsetSizeTrait, UInt32Array,
14 builder::{ArrayBuilder, PrimitiveBuilder},
15 cast::AsArray,
16 cast::as_primitive_array,
17 new_empty_array,
18 types::{
19 BinaryType, ByteArrayType, Int64Type, LargeBinaryType, LargeUtf8Type, UInt32Type, Utf8Type,
20 },
21};
22use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ScalarBuffer, bit_util};
23use arrow_cast::cast::cast;
24use arrow_data::ArrayDataBuilder;
25use arrow_schema::DataType;
26use async_trait::async_trait;
27use bytes::Bytes;
28use futures::{StreamExt, TryStreamExt};
29use lance_arrow::BufferExt;
30use tokio::io::AsyncWriteExt;
31
32use super::ReadBatchParams;
33use super::{AsyncIndex, Decoder, Encoder, plain::PlainDecoder};
34use crate::traits::{Reader, Writer};
35use lance_core::Result;
36
/// Encoder for variable-length byte arrays (Utf8/Binary/LargeUtf8/LargeBinary).
///
/// Layout written: all value bytes concatenated, followed by an `Int64`
/// offsets array (absolute file positions, `n + 1` entries for `n` values).
pub struct BinaryEncoder<'a> {
    // Destination for both the value bytes and the trailing offsets array.
    writer: &'a mut dyn Writer,
}
41
impl<'a> BinaryEncoder<'a> {
    /// Create an encoder that appends to `writer` at its current position.
    pub fn new(writer: &'a mut dyn Writer) -> Self {
        Self { writer }
    }

    /// Write the concatenated value bytes of `arrs`, then an `Int64` offsets
    /// array (one more entry than the total value count) holding the absolute
    /// file position where each value starts.
    ///
    /// Returns the file position of the offsets array — the `position` the
    /// decoder needs.
    async fn encode_typed_arr<T: ByteArrayType>(&mut self, arrs: &[&dyn Array]) -> Result<usize> {
        let capacity: usize = arrs.iter().map(|a| a.len()).sum();
        let mut pos_builder: PrimitiveBuilder<Int64Type> =
            PrimitiveBuilder::with_capacity(capacity + 1);

        // Absolute file offset where the first value's bytes will land.
        let mut last_offset: usize = self.writer.tell().await?;
        pos_builder.append_value(last_offset as i64);
        for array in arrs.iter() {
            let arr = array
                .as_any()
                .downcast_ref::<GenericByteArray<T>>()
                .unwrap();

            let offsets = arr.value_offsets();

            // Respect any slice offset on the array: only the bytes covered by
            // [offsets[0], offsets[last]) are written.
            let start = offsets[0].as_usize();
            let end = offsets[offsets.len() - 1].as_usize();
            // SAFETY: per the Arrow layout, buffers()[1] of a byte array is the
            // values buffer, and `start..end` lies within it because `offsets`
            // came from the same array. NOTE(review): the `ArrayData` returned
            // by `to_data()` is a temporary dropped at the end of this
            // statement; the pointed-to allocation appears to stay alive via
            // `arr`'s own refcounted buffer — confirm.
            let b = unsafe {
                std::slice::from_raw_parts(
                    arr.to_data().buffers()[1].as_ptr().add(start),
                    end - start,
                )
            };
            self.writer.write_all(b).await?;

            // Rebase this array's relative offsets onto absolute file offsets.
            let start_offset = offsets[0].as_usize();
            offsets
                .iter()
                .skip(1)
                .map(|b| b.as_usize() - start_offset + last_offset)
                .for_each(|o| pos_builder.append_value(o as i64));
            // Next array continues where this one ended.
            last_offset = pos_builder.values_slice()[pos_builder.len() - 1] as usize;
        }

        // The offsets array goes right after the value bytes; its start
        // position is what we hand back to the caller.
        let positions_offset = self.writer.tell().await?;
        let pos_array = pos_builder.finish();
        self.writer
            .write_all(pos_array.to_data().buffers()[0].as_slice())
            .await?;
        Ok(positions_offset)
    }
}
89
90#[async_trait]
91impl Encoder for BinaryEncoder<'_> {
92 async fn encode(&mut self, arrs: &[&dyn Array]) -> Result<usize> {
93 assert!(!arrs.is_empty());
94 let data_type = arrs[0].data_type();
95 match data_type {
96 DataType::Utf8 => self.encode_typed_arr::<Utf8Type>(arrs).await,
97 DataType::Binary => self.encode_typed_arr::<BinaryType>(arrs).await,
98 DataType::LargeUtf8 => self.encode_typed_arr::<LargeUtf8Type>(arrs).await,
99 DataType::LargeBinary => self.encode_typed_arr::<LargeBinaryType>(arrs).await,
100 _ => {
101 return Err(lance_core::Error::invalid_input(format!(
102 "Unsupported data type for binary encoding: {}",
103 data_type
104 )));
105 }
106 }
107 }
108}
109
/// Decoder for variable-length byte arrays written by [`BinaryEncoder`].
pub struct BinaryDecoder<'a, T: ByteArrayType> {
    // Source for both the offsets array and the value bytes.
    reader: &'a dyn Reader,

    // File position of the Int64 offsets array.
    position: usize,

    // Number of values; the offsets array has `length + 1` entries.
    length: usize,

    // If true, zero-length entries are decoded as nulls (see `count_nulls`).
    nullable: bool,

    // Carries the byte-array flavor (Utf8/Binary, 32/64-bit offsets).
    phantom: PhantomData<T>,
}
122
impl<'a, T: ByteArrayType> BinaryDecoder<'a, T> {
    /// Create a decoder.
    ///
    /// - `position`: file offset of the Int64 offsets array the encoder wrote.
    /// - `length`: number of values in the column.
    /// - `nullable`: if true, zero-length values decode as nulls.
    pub fn new(reader: &'a dyn Reader, position: usize, length: usize, nullable: bool) -> Self {
        Self {
            reader,
            position,
            length,
            nullable,
            phantom: PhantomData,
        }
    }

    /// Read the absolute file positions for the values in `index`:
    /// `index.len() + 1` consecutive Int64 offsets starting at `index.start`.
    async fn get_positions(&self, index: Range<usize>) -> Result<Arc<Int64Array>> {
        let position_decoder = PlainDecoder::new(
            self.reader,
            &DataType::Int64,
            self.position,
            self.length + 1,
        )?;
        // One extra offset is needed to delimit the last value.
        let values = position_decoder.get(index.start..index.end + 1).await?;
        Ok(Arc::new(as_primitive_array(&values).clone()))
    }

    /// Derive a validity bitmap from the offsets: an entry whose adjacent
    /// offsets are equal (i.e. a zero-length value) is treated as null.
    /// Returns the null count and the bitmap (`None` when there are no nulls).
    ///
    /// NOTE(review): this encoding cannot distinguish a genuine empty value
    /// from a null — presumably why callers gate this on `nullable`; confirm.
    fn count_nulls<O: OffsetSizeTrait>(offsets: &ScalarBuffer<O>) -> (usize, Option<Buffer>) {
        let mut null_count = 0;
        let mut null_buf = MutableBuffer::new_null(offsets.len() - 1);
        offsets.windows(2).enumerate().for_each(|(idx, w)| {
            if w[0] == w[1] {
                bit_util::unset_bit(null_buf.as_mut(), idx);
                null_count += 1;
            } else {
                bit_util::set_bit(null_buf.as_mut(), idx);
            }
        });
        let null_buf = if null_count > 0 {
            Some(null_buf.into())
        } else {
            None
        };
        (null_count, null_buf)
    }

    /// Materialize values `range` (indices into `positions`) as a
    /// `GenericByteArray<T>`. `positions` must hold at least `range.end + 1`
    /// entries so the final value is delimited.
    async fn get_range(&self, positions: &Int64Array, range: Range<usize>) -> Result<ArrayRef> {
        assert!(positions.len() >= range.end);
        let start = positions.value(range.start);
        let end = positions.value(range.end);

        let start_scalar = Int64Array::new_scalar(start);

        // Rebase absolute file offsets so the first value starts at 0; narrow
        // to Int32 for the non-large offset types.
        let slice = positions.slice(range.start, range.len() + 1);
        let offset_data = if T::Offset::IS_LARGE {
            sub(&slice, &start_scalar)?.into_data()
        } else {
            cast(
                &(Arc::new(sub(&slice, &start_scalar)?) as ArrayRef),
                &DataType::Int32,
            )?
            .into_data()
        };

        // Skip the I/O round trip entirely for an empty byte range.
        let bytes: Bytes = if start >= end {
            Bytes::new()
        } else {
            self.reader.get_range(start as usize..end as usize).await?
        };

        let mut data_builder = ArrayDataBuilder::new(T::DATA_TYPE)
            .len(range.len())
            .null_count(0);

        if self.nullable {
            let (null_count, null_buf) = Self::count_nulls(slice.values());
            data_builder = data_builder
                .null_count(null_count)
                .null_bit_buffer(null_buf);
        }

        // Zero-copy wrap of the fetched bytes as an Arrow buffer.
        let buf = Buffer::from_bytes_bytes(bytes, 1);
        let array_data = data_builder
            .add_buffer(offset_data.buffers()[0].clone())
            .add_buffer(buf)
            .build()?;

        Ok(Arc::new(GenericByteArray::<T>::from(array_data)))
    }
}
236
/// One planned unit of I/O produced by [`plan_take_chunks`].
#[derive(Debug)]
struct TakeChunksPlan {
    // Indices to take, rebased so the first requested index is 0.
    indices: UInt32Array,
    // True when the indices form a consecutive run, enabling a bulk byte copy.
    is_contiguous: bool,
}
242
/// Group sorted `indices` into chunks so that each chunk is fetched with a
/// single read, cutting a new chunk only when the byte gap exceeds
/// `min_io_size` (small gaps are cheaper to over-read than to split).
///
/// `positions` are the value offsets covering the requested span; `indices`
/// are assumed sorted ascending. Returned indices are rebased so the first
/// requested index becomes 0.
fn plan_take_chunks(
    positions: &Int64Array,
    indices: &UInt32Array,
    min_io_size: i64,
) -> Result<Vec<TakeChunksPlan>> {
    let start = indices.value(0);
    // Rebase all indices relative to the first one.
    let indices = sub(indices, &UInt32Array::new_scalar(start))?;
    let indices_ref = indices.as_primitive::<UInt32Type>();

    let mut chunks: Vec<TakeChunksPlan> = vec![];
    let mut start_idx = 0;
    let mut last_idx: i64 = -1;
    let mut is_contiguous = true;
    for i in 0..indices.len() {
        let current = indices_ref.value(i) as usize;
        // NOTE(review): `current == start_idx` compares an index *value* with
        // a *position* in the indices array; it looks intended to mark the
        // first element (both are 0 at i == 0) — confirm it is not `i == start_idx`.
        let curr_contiguous = current == start_idx || current as i64 - last_idx == 1;

        // Cut a new chunk only when there is a gap AND the bytes spanned since
        // the chunk started are large enough to justify a separate read.
        if !curr_contiguous
            && positions.value(current) - positions.value(indices_ref.value(start_idx) as usize)
                > min_io_size
        {
            chunks.push(TakeChunksPlan {
                indices: as_primitive_array(&indices.slice(start_idx, i - start_idx)).clone(),
                is_contiguous,
            });
            start_idx = i;
            is_contiguous = true;
        } else {
            is_contiguous &= curr_contiguous;
        }

        last_idx = current as i64;
    }
    // Flush the final (possibly only) chunk.
    chunks.push(TakeChunksPlan {
        indices: as_primitive_array(&indices.slice(start_idx, indices.len() - start_idx)).clone(),
        is_contiguous,
    });

    Ok(chunks)
}
287
#[async_trait]
impl<T: ByteArrayType> Decoder for BinaryDecoder<'_, T> {
    /// Decode the entire column.
    async fn decode(&self) -> Result<ArrayRef> {
        self.get(..).await
    }

    /// Take the values at `indices` (assumed sorted ascending).
    ///
    /// Reads the offsets once for the whole requested span, plans chunked
    /// reads so nearby indices share one I/O, fetches chunks concurrently,
    /// then assembles the output sequentially in order.
    async fn take(&self, indices: &UInt32Array) -> Result<ArrayRef> {
        if indices.is_empty() {
            return Ok(new_empty_array(&T::DATA_TYPE));
        }

        let start = indices.value(0);
        let end = indices.value(indices.len() - 1);

        // Byte gaps smaller than this are over-read rather than split into
        // separate requests.
        const MIN_IO_SIZE: i64 = 64 * 1024;
        let positions = self
            .get_positions(start as usize..(end + 1) as usize)
            .await?;
        // Total number of value bytes the result will hold.
        let capacity = indices
            .iter()
            .map(|i| {
                let relative_index = (i.unwrap() - start) as usize;
                let start = positions.value(relative_index) as usize;
                let end = positions.value(relative_index + 1) as usize;
                end - start
            })
            .sum();
        let mut buffer = MutableBuffer::with_capacity(capacity);

        let offsets_capacity = std::mem::size_of::<T::Offset>() * (indices.len() + 1);
        let mut offsets = MutableBuffer::with_capacity(offsets_capacity);
        let mut offset = T::Offset::from_usize(0).unwrap();
        // SAFETY: capacity for `indices.len() + 1` offsets was reserved above.
        unsafe {
            offsets.push_unchecked(offset);
        }

        let chunks = plan_take_chunks(&positions, indices, MIN_IO_SIZE)?;

        let positions_ref = positions.as_ref();
        // `buffered` fetches chunks with bounded concurrency but yields them
        // in order, so `try_for_each` appends to the buffers deterministically.
        futures::stream::iter(chunks)
            .map(|chunk| async move {
                let chunk_offset = chunk.indices.value(0);
                let chunk_end = chunk.indices.value(chunk.indices.len() - 1);
                let array = self
                    .get_range(positions_ref, chunk_offset as usize..chunk_end as usize + 1)
                    .await?;
                Result::Ok((chunk, chunk_offset, array))
            })
            .buffered(self.reader.io_parallelism())
            .try_for_each(|(chunk, chunk_offset, array)| {
                let array: &GenericByteArray<T> = array.as_bytes();

                // A contiguous chunk is copied wholesale...
                if chunk.is_contiguous {
                    buffer.extend_from_slice(array.value_data());
                }

                for index in chunk.indices.values() {
                    // ...otherwise each requested value is copied individually.
                    if !chunk.is_contiguous {
                        let value = array.value((index - chunk_offset) as usize);
                        let value_ref: &[u8] = value.as_ref();
                        buffer.extend_from_slice(value_ref);
                    }

                    offset += array.value_length((index - chunk_offset) as usize);
                    // SAFETY: one offset slot per taken index was reserved above.
                    unsafe {
                        offsets.push_unchecked(offset);
                    }
                }
                futures::future::ready(Ok(()))
            })
            .await?;

        let mut data_builder = ArrayDataBuilder::new(T::DATA_TYPE)
            .len(indices.len())
            .null_count(0);

        let offsets: ScalarBuffer<T::Offset> = ScalarBuffer::from(Buffer::from(offsets));

        debug_assert_eq!(buffer.len(), capacity);

        if self.nullable {
            let (null_count, null_buf) = Self::count_nulls(&offsets);
            data_builder = data_builder
                .null_count(null_count)
                .null_bit_buffer(null_buf);
        }

        let array_data = data_builder
            .add_buffer(offsets.into_inner())
            .add_buffer(buffer.into())
            .build()?;

        Ok(Arc::new(GenericByteArray::<T>::from(array_data)))
    }
}
395
396#[async_trait]
397impl<T: ByteArrayType> AsyncIndex<usize> for BinaryDecoder<'_, T> {
398 type Output = Result<ArrayRef>;
399
400 async fn get(&self, index: usize) -> Self::Output {
401 self.get(index..index + 1).await
402 }
403}
404
405#[async_trait]
406impl<T: ByteArrayType> AsyncIndex<RangeFrom<usize>> for BinaryDecoder<'_, T> {
407 type Output = Result<ArrayRef>;
408
409 async fn get(&self, index: RangeFrom<usize>) -> Self::Output {
410 self.get(index.start..self.length).await
411 }
412}
413
414#[async_trait]
415impl<T: ByteArrayType> AsyncIndex<RangeTo<usize>> for BinaryDecoder<'_, T> {
416 type Output = Result<ArrayRef>;
417
418 async fn get(&self, index: RangeTo<usize>) -> Self::Output {
419 self.get(0..index.end).await
420 }
421}
422
423#[async_trait]
424impl<T: ByteArrayType> AsyncIndex<RangeFull> for BinaryDecoder<'_, T> {
425 type Output = Result<ArrayRef>;
426
427 async fn get(&self, _: RangeFull) -> Self::Output {
428 self.get(0..self.length).await
429 }
430}
431
432#[async_trait]
433impl<T: ByteArrayType> AsyncIndex<ReadBatchParams> for BinaryDecoder<'_, T> {
434 type Output = Result<ArrayRef>;
435
436 async fn get(&self, params: ReadBatchParams) -> Self::Output {
437 match params {
438 ReadBatchParams::Range(r) => self.get(r).await,
439 ReadBatchParams::Ranges(_) => unimplemented!(),
441 ReadBatchParams::RangeFull => self.get(..).await,
442 ReadBatchParams::RangeTo(r) => self.get(r).await,
443 ReadBatchParams::RangeFrom(r) => self.get(r).await,
444 ReadBatchParams::Indices(indices) => self.take(&indices).await,
445 }
446 }
447}
448
449#[async_trait]
450impl<T: ByteArrayType> AsyncIndex<Range<usize>> for BinaryDecoder<'_, T> {
451 type Output = Result<ArrayRef>;
452
453 async fn get(&self, index: Range<usize>) -> Self::Output {
454 let position_decoder = PlainDecoder::new(
455 self.reader,
456 &DataType::Int64,
457 self.position,
458 self.length + 1,
459 )?;
460 let positions = position_decoder.get(index.start..index.end + 1).await?;
461 let int64_positions: &Int64Array = as_primitive_array(&positions);
462
463 self.get_range(int64_positions, 0..index.len()).await
464 }
465}
466
#[cfg(test)]
mod tests {
    use super::*;

    use arrow_array::{
        BinaryArray, GenericStringArray, LargeStringArray, StringArray, types::GenericStringType,
    };
    use arrow_select::concat::concat;
    use lance_core::utils::tempfile::TempStdFile;

    use crate::local::LocalObjectReader;

    /// Encode `arr` to `path` with a [`BinaryEncoder`] and return the offsets
    /// position. A 4-byte prefix is written first so the data never starts at
    /// file offset 0.
    async fn write_test_data<O: OffsetSizeTrait>(
        path: impl AsRef<std::path::Path>,
        arr: &[&GenericStringArray<O>],
    ) -> Result<usize> {
        let mut writer = tokio::fs::File::create(path).await?;
        // Arbitrary prefix: exercises non-zero absolute offsets.
        writer.write_all(b"1234").await.unwrap();
        let mut encoder = BinaryEncoder::new(&mut writer);

        let arrs = arr.iter().map(|a| a as &dyn Array).collect::<Vec<_>>();
        let pos = encoder.encode(arrs.as_slice()).await.unwrap();
        AsyncWriteExt::shutdown(&mut writer).await.unwrap();
        Ok(pos)
    }

    /// Encode `arrs`, decode the file back, and assert the result equals the
    /// concatenation of the inputs.
    async fn test_round_trips<O: OffsetSizeTrait>(arrs: &[&GenericStringArray<O>]) {
        let path = TempStdFile::default();

        let pos = write_test_data(&path, arrs).await.unwrap();

        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let read_len = arrs.iter().map(|a| a.len()).sum();
        let decoder =
            BinaryDecoder::<GenericStringType<O>>::new(reader.as_ref(), pos, read_len, true);
        let actual_arr = decoder.decode().await.unwrap();

        let arrs_ref = arrs.iter().map(|a| a as &dyn Array).collect::<Vec<_>>();
        let expected = concat(arrs_ref.as_slice()).unwrap();
        assert_eq!(
            actual_arr
                .as_any()
                .downcast_ref::<GenericStringArray<O>>()
                .unwrap(),
            expected
                .as_any()
                .downcast_ref::<GenericStringArray<O>>()
                .unwrap(),
        );
    }

    // Round trips for Utf8 and LargeUtf8, with and without nulls, single and
    // multiple input arrays.
    #[tokio::test]
    async fn test_write_binary_data() {
        test_round_trips(&[&StringArray::from(vec!["a", "b", "cd", "efg"])]).await;
        test_round_trips(&[&StringArray::from(vec![Some("a"), None, Some("cd"), None])]).await;
        test_round_trips(&[
            &StringArray::from(vec![Some("a"), None, Some("cd"), None]),
            &StringArray::from(vec![Some("f"), None, Some("gh"), None]),
            &StringArray::from(vec![Some("t"), None, Some("uv"), None]),
        ])
        .await;
        test_round_trips(&[&LargeStringArray::from(vec!["a", "b", "cd", "efg"])]).await;
        test_round_trips(&[&LargeStringArray::from(vec![
            Some("a"),
            None,
            Some("cd"),
            None,
        ])])
        .await;
        test_round_trips(&[
            &LargeStringArray::from(vec![Some("a"), Some("b")]),
            &LargeStringArray::from(vec![Some("c")]),
            &LargeStringArray::from(vec![Some("d"), Some("e")]),
        ])
        .await;
    }

    // A sliced array must only encode the bytes its offsets cover.
    #[tokio::test]
    async fn test_write_binary_data_with_offset() {
        let array: StringArray = StringArray::from(vec![Some("d"), Some("e")]).slice(1, 1);
        test_round_trips(&[&array]).await;
    }

    // Exercise every AsyncIndex flavor: full decode, RangeFull, Range,
    // RangeTo, RangeFrom, empty range, and out-of-bounds.
    #[tokio::test]
    async fn test_range_query() {
        let data = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);

        let path = TempStdFile::default();
        let mut object_writer = tokio::fs::File::create(&path).await.unwrap();

        // Prefix so data does not start at offset 0.
        object_writer.write_all(b"1234").await.unwrap();
        let mut encoder = BinaryEncoder::new(&mut object_writer);
        let pos = encoder.encode(&[&data]).await.unwrap();
        AsyncWriteExt::shutdown(&mut object_writer).await.unwrap();

        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);
        assert_eq!(
            decoder.decode().await.unwrap().as_ref(),
            &StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"])
        );

        assert_eq!(
            decoder.get(..).await.unwrap().as_ref(),
            &StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"])
        );

        assert_eq!(
            decoder.get(2..5).await.unwrap().as_ref(),
            &StringArray::from_iter_values(["c", "d", "e"])
        );

        assert_eq!(
            decoder.get(..5).await.unwrap().as_ref(),
            &StringArray::from_iter_values(["a", "b", "c", "d", "e"])
        );

        assert_eq!(
            decoder.get(4..).await.unwrap().as_ref(),
            &StringArray::from_iter_values(["e", "f", "g"])
        );
        assert_eq!(
            decoder.get(2..2).await.unwrap().as_ref(),
            &new_empty_array(&DataType::Utf8)
        );
        assert!(decoder.get(100..100).await.is_err());
    }

    // Basic take with a small set of non-contiguous indices.
    #[tokio::test]
    async fn test_take() {
        let data = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);

        let path = TempStdFile::default();

        let pos = write_test_data(&path, &[&data]).await.unwrap();
        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);

        let actual = decoder
            .take(&UInt32Array::from_iter_values([1, 2, 5]))
            .await
            .unwrap();
        assert_eq!(
            actual.as_ref(),
            &StringArray::from_iter_values(["b", "c", "f"])
        );
    }

    // Indices far apart must be split into separate I/O chunks.
    #[tokio::test]
    async fn test_take_sparse_indices() {
        let data = StringArray::from_iter_values((0..1000000).map(|v| format!("string-{v}")));

        let path = TempStdFile::default();
        let pos = write_test_data(&path, &[&data]).await.unwrap();
        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);

        let positions = decoder.get_positions(1..999998).await.unwrap();
        let indices = UInt32Array::from_iter_values([1, 999998]);
        let chunks = plan_take_chunks(positions.as_ref(), &indices, 64 * 1024).unwrap();
        // The gap between the two indices exceeds min_io_size -> two chunks.
        assert_eq!(chunks.len(), 2);
        assert_eq!(chunks[0].indices, UInt32Array::from_iter_values([0]),);
        assert_eq!(chunks[1].indices, UInt32Array::from_iter_values([999997]),);

        let actual = decoder
            .take(&UInt32Array::from_iter_values([1, 999998]))
            .await
            .unwrap();
        assert_eq!(
            actual.as_ref(),
            &StringArray::from_iter_values(["string-1", "string-999998"])
        );
    }

    // Near-by indices (with repeats and small gaps) share chunks; contiguity
    // per chunk is tracked for the bulk-copy fast path.
    #[tokio::test]
    async fn test_take_dense_indices() {
        let data = StringArray::from_iter_values((0..1000000).map(|v| format!("string-{v}")));

        let path = TempStdFile::default();
        let pos = write_test_data(&path, &[&data]).await.unwrap();

        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);

        let positions = decoder.get_positions(1..999998).await.unwrap();
        let indices = UInt32Array::from_iter_values([
            2, 3, 4, 1001, 1001, 1002, 2001, 2002, 2004, 3004, 3005,
        ]);

        let chunks = plan_take_chunks(positions.as_ref(), &indices, 1024).unwrap();
        assert_eq!(chunks.len(), 4);
        assert_eq!(chunks[0].indices, UInt32Array::from_iter_values(0..3));
        assert!(chunks[0].is_contiguous);
        // Repeated index 999 makes this chunk non-contiguous.
        assert_eq!(
            chunks[1].indices,
            UInt32Array::from_iter_values([999, 999, 1000])
        );
        assert!(!chunks[1].is_contiguous);
        // Gap 2000 -> 2002 makes this chunk non-contiguous.
        assert_eq!(
            chunks[2].indices,
            UInt32Array::from_iter_values([1999, 2000, 2002])
        );
        assert!(!chunks[2].is_contiguous);
        assert_eq!(
            chunks[3].indices,
            UInt32Array::from_iter_values([3002, 3003])
        );
        assert!(chunks[3].is_contiguous);

        let actual = decoder.take(&indices).await.unwrap();
        assert_eq!(
            actual.as_ref(),
            &StringArray::from_iter_values(indices.values().iter().map(|v| format!("string-{v}")))
        );
    }

    // Successive encode() calls on slices must report the expected file
    // positions (values then offsets, per slice).
    #[tokio::test]
    async fn test_write_slice() {
        let path = TempStdFile::default();
        let data = StringArray::from_iter_values((0..100).map(|v| format!("abcdef-{v:#03}")));

        let mut object_writer = tokio::fs::File::create(&path).await.unwrap();
        let mut encoder = BinaryEncoder::new(&mut object_writer);
        for i in 0..10 {
            let pos = encoder.encode(&[&data.slice(i * 10, 10)]).await.unwrap();
            // Each batch: 10 values * 10 bytes, plus 11 * 8-byte offsets.
            assert_eq!(pos, (i * (8 * 11) + (i + 1) * (10 * 10)));
        }
    }

    // Nullable binary: take must decode zero-length entries as nulls.
    #[tokio::test]
    async fn test_write_binary_with_nulls() {
        let data = BinaryArray::from_iter((0..60000).map(|v| {
            if v % 4 != 0 {
                Some::<&[u8]>(b"abcdefgh")
            } else {
                None
            }
        }));
        let path = TempStdFile::default();

        let pos = {
            let mut object_writer = tokio::fs::File::create(&path).await.unwrap();

            // Prefix so data does not start at offset 0.
            object_writer.write_all(b"1234").await.unwrap();
            let mut encoder = BinaryEncoder::new(&mut object_writer);

            let pos = encoder.encode(&[&data]).await.unwrap();
            AsyncWriteExt::shutdown(&mut object_writer).await.unwrap();
            pos
        };

        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let decoder = BinaryDecoder::<BinaryType>::new(reader.as_ref(), pos, data.len(), true);
        let idx = UInt32Array::from(vec![0_u32, 5_u32, 59996_u32]);
        let actual = decoder.take(&idx).await.unwrap();
        let values: Vec<Option<&[u8]>> = vec![None, Some(b"abcdefgh"), None];
        assert_eq!(actual.as_binary::<i32>(), &BinaryArray::from(values));
    }
}