use std::marker::PhantomData;
use std::ops::{Range, RangeFrom, RangeFull, RangeTo};
use std::sync::Arc;

use arrow_arith::numeric::sub;
use arrow_array::{
    builder::{ArrayBuilder, PrimitiveBuilder},
    cast::as_primitive_array,
    cast::AsArray,
    new_empty_array,
    types::{
        BinaryType, ByteArrayType, Int64Type, LargeBinaryType, LargeUtf8Type, UInt32Type, Utf8Type,
    },
    Array, ArrayRef, GenericByteArray, Int64Array, OffsetSizeTrait, UInt32Array,
};
use arrow_buffer::{bit_util, ArrowNativeType, Buffer, MutableBuffer, ScalarBuffer};
use arrow_cast::cast::cast;
use arrow_data::ArrayDataBuilder;
use arrow_schema::DataType;
use async_trait::async_trait;
use bytes::Bytes;
use futures::{StreamExt, TryStreamExt};
use lance_arrow::BufferExt;
use snafu::location;
use tokio::io::AsyncWriteExt;

use super::ReadBatchParams;
use super::{plain::PlainDecoder, AsyncIndex, Decoder, Encoder};
use crate::traits::{Reader, Writer};
use lance_core::Result;
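/// Encoder for var-length binary columns (`Utf8`, `LargeUtf8`, `Binary`, `LargeBinary`).
///
/// The value bytes of all input arrays are written back to back, followed by an
/// `Int64` positions array with one entry per value plus one, giving the absolute
/// file offset of each value boundary. `encode` returns the file offset at which
/// the positions array starts.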
pub struct BinaryEncoder<'a> {
    writer: &'a mut dyn Writer,
}

impl<'a> BinaryEncoder<'a> {
    pub fn new(writer: &'a mut dyn Writer) -> Self {
        Self { writer }
    }

    async fn encode_typed_arr<T: ByteArrayType>(&mut self, arrs: &[&dyn Array]) -> Result<usize> {
        let capacity: usize = arrs.iter().map(|a| a.len()).sum();
        let mut pos_builder: PrimitiveBuilder<Int64Type> =
            PrimitiveBuilder::with_capacity(capacity + 1);

        let mut last_offset: usize = self.writer.tell().await?;
        pos_builder.append_value(last_offset as i64);
        for array in arrs.iter() {
            let arr = array
                .as_any()
                .downcast_ref::<GenericByteArray<T>>()
                .unwrap();

            let offsets = arr.value_offsets();

            // Write only the value bytes covered by this array's offsets, so
            // sliced arrays are handled correctly.
            let start = offsets[0].as_usize();
            let end = offsets[offsets.len() - 1].as_usize();
            let b = unsafe {
                std::slice::from_raw_parts(
                    arr.to_data().buffers()[1].as_ptr().add(start),
                    end - start,
                )
            };
            self.writer.write_all(b).await?;

            // Rebase the array's relative offsets onto the absolute file offset
            // at which its values were just written.
            let start_offset = offsets[0].as_usize();
            offsets
                .iter()
                .skip(1)
                .map(|b| b.as_usize() - start_offset + last_offset)
                .for_each(|o| pos_builder.append_value(o as i64));
            last_offset = pos_builder.values_slice()[pos_builder.len() - 1] as usize;
        }

        // Write the positions array after the values and return its file offset.
        let positions_offset = self.writer.tell().await?;
        let pos_array = pos_builder.finish();
        self.writer
            .write_all(pos_array.to_data().buffers()[0].as_slice())
            .await?;
        Ok(positions_offset)
    }
}

#[async_trait]
impl Encoder for BinaryEncoder<'_> {
    async fn encode(&mut self, arrs: &[&dyn Array]) -> Result<usize> {
        assert!(!arrs.is_empty());
        let data_type = arrs[0].data_type();
        match data_type {
            DataType::Utf8 => self.encode_typed_arr::<Utf8Type>(arrs).await,
            DataType::Binary => self.encode_typed_arr::<BinaryType>(arrs).await,
            DataType::LargeUtf8 => self.encode_typed_arr::<LargeUtf8Type>(arrs).await,
            DataType::LargeBinary => self.encode_typed_arr::<LargeBinaryType>(arrs).await,
            _ => {
                return Err(lance_core::Error::io(
                    format!("Binary encoder does not support {}", data_type),
                    location!(),
                ));
            }
        }
    }
}
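/// Decoder for var-length binary columns written by [`BinaryEncoder`].
///
/// `position` is the file offset of the `Int64` positions array and `length` is
/// the number of values in the batch; value bytes are fetched lazily from the
/// underlying [`Reader`] on `get`/`take`.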
pub struct BinaryDecoder<'a, T: ByteArrayType> {
    reader: &'a dyn Reader,

    position: usize,

    length: usize,

    nullable: bool,

    phantom: PhantomData<T>,
}

impl<'a, T: ByteArrayType> BinaryDecoder<'a, T> {
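    /// Create a [`BinaryDecoder`] for one batch.
    ///
    /// - `reader`: the underlying object/file reader.
    /// - `position`: file offset of the `Int64` positions array.
    /// - `length`: number of values in the batch.
    /// - `nullable`: whether zero-length values should be decoded as nulls.
    ///
    /// A minimal usage sketch (not compiled; it assumes an opened [`Reader`],
    /// such as the `LocalObjectReader` used in the tests below):
    ///
    /// ```rust,ignore
    /// use arrow_array::types::Utf8Type;
    ///
    /// let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, len, true);
    /// let batch = decoder.decode().await?;
    /// ```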
    pub fn new(reader: &'a dyn Reader, position: usize, length: usize, nullable: bool) -> Self {
        Self {
            reader,
            position,
            length,
            nullable,
            phantom: PhantomData,
        }
    }
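    /// Read the positions array for `index`, fetching one extra trailing entry
    /// so the byte length of the last covered value is known.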
    async fn get_positions(&self, index: Range<usize>) -> Result<Arc<Int64Array>> {
        let position_decoder = PlainDecoder::new(
            self.reader,
            &DataType::Int64,
            self.position,
            self.length + 1,
        )?;
        let values = position_decoder.get(index.start..index.end + 1).await?;
        Ok(Arc::new(as_primitive_array(&values).clone()))
    }
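    /// Build a validity bitmap from the offsets, treating zero-length values
    /// (equal adjacent offsets) as nulls. Returns the null count and, if any
    /// nulls were found, the null bitmap buffer.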
    fn count_nulls<O: OffsetSizeTrait>(offsets: &ScalarBuffer<O>) -> (usize, Option<Buffer>) {
        let mut null_count = 0;
        let mut null_buf = MutableBuffer::new_null(offsets.len() - 1);
        offsets.windows(2).enumerate().for_each(|(idx, w)| {
            if w[0] == w[1] {
                bit_util::unset_bit(null_buf.as_mut(), idx);
                null_count += 1;
            } else {
                bit_util::set_bit(null_buf.as_mut(), idx);
            }
        });
        let null_buf = if null_count > 0 {
            Some(null_buf.into())
        } else {
            None
        };
        (null_count, null_buf)
    }
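    /// Decode the rows in `range` from the given `positions` slice, issuing at
    /// most one range read for the covered value bytes and rebasing the offsets
    /// to start at zero.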
    async fn get_range(&self, positions: &Int64Array, range: Range<usize>) -> Result<ArrayRef> {
        assert!(positions.len() >= range.end);
        let start = positions.value(range.start);
        let end = positions.value(range.end);

        let start_scalar = Int64Array::new_scalar(start);

        let slice = positions.slice(range.start, range.len() + 1);
        let offset_data = if T::Offset::IS_LARGE {
            sub(&slice, &start_scalar)?.into_data()
        } else {
            cast(
                &(Arc::new(sub(&slice, &start_scalar)?) as ArrayRef),
                &DataType::Int32,
            )?
            .into_data()
        };

        let bytes: Bytes = if start >= end {
            Bytes::new()
        } else {
            self.reader.get_range(start as usize..end as usize).await?
        };

        let mut data_builder = ArrayDataBuilder::new(T::DATA_TYPE)
            .len(range.len())
            .null_count(0);

        if self.nullable {
            let (null_count, null_buf) = Self::count_nulls(slice.values());
            data_builder = data_builder
                .null_count(null_count)
                .null_bit_buffer(null_buf);
        }

        let buf = Buffer::from_bytes_bytes(bytes, 1);
        let array_data = data_builder
            .add_buffer(offset_data.buffers()[0].clone())
            .add_buffer(buf)
            .build()?;

        Ok(Arc::new(GenericByteArray::<T>::from(array_data)))
    }
}
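/// A group of `take` indices that will be served by a single range read.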
#[derive(Debug)]
struct TakeChunksPlan {
    indices: UInt32Array,
    is_contiguous: bool,
}
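/// Group `indices` into chunks such that, within a chunk, the indices are either
/// contiguous or their value byte ranges lie within `min_io_size` of each other,
/// so each chunk can be fetched with one range read. The returned indices are
/// rebased relative to `indices[0]`.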
fn plan_take_chunks(
    positions: &Int64Array,
    indices: &UInt32Array,
    min_io_size: i64,
) -> Result<Vec<TakeChunksPlan>> {
    let start = indices.value(0);
    let indices = sub(indices, &UInt32Array::new_scalar(start))?;
    let indices_ref = indices.as_primitive::<UInt32Type>();

    let mut chunks: Vec<TakeChunksPlan> = vec![];
    let mut start_idx = 0;
    let mut last_idx: i64 = -1;
    let mut is_contiguous = true;
    for i in 0..indices.len() {
        let current = indices_ref.value(i) as usize;
        let curr_contiguous = current == start_idx || current as i64 - last_idx == 1;

        if !curr_contiguous
            && positions.value(current) - positions.value(indices_ref.value(start_idx) as usize)
                > min_io_size
        {
            chunks.push(TakeChunksPlan {
                indices: as_primitive_array(&indices.slice(start_idx, i - start_idx)).clone(),
                is_contiguous,
            });
            start_idx = i;
            is_contiguous = true;
        } else {
            is_contiguous &= curr_contiguous;
        }

        last_idx = current as i64;
    }
    chunks.push(TakeChunksPlan {
        indices: as_primitive_array(&indices.slice(start_idx, indices.len() - start_idx)).clone(),
        is_contiguous,
    });

    Ok(chunks)
}

#[async_trait]
impl<T: ByteArrayType> Decoder for BinaryDecoder<'_, T> {
    async fn decode(&self) -> Result<ArrayRef> {
        self.get(..).await
    }

    async fn take(&self, indices: &UInt32Array) -> Result<ArrayRef> {
        if indices.is_empty() {
            return Ok(new_empty_array(&T::DATA_TYPE));
        }

        let start = indices.value(0);
        let end = indices.value(indices.len() - 1);
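        // Coalesce nearby reads: values whose byte ranges lie within 64 KiB of
        // each other are fetched with a single range request.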
        const MIN_IO_SIZE: i64 = 64 * 1024;

        let positions = self
            .get_positions(start as usize..(end + 1) as usize)
            .await?;
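        // Pre-compute the total number of value bytes that will be copied.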
        let capacity = indices
            .iter()
            .map(|i| {
                let relative_index = (i.unwrap() - start) as usize;
                let start = positions.value(relative_index) as usize;
                let end = positions.value(relative_index + 1) as usize;
                end - start
            })
            .sum();
        let mut buffer = MutableBuffer::with_capacity(capacity);

        let offsets_capacity = std::mem::size_of::<T::Offset>() * (indices.len() + 1);
        let mut offsets = MutableBuffer::with_capacity(offsets_capacity);
        let mut offset = T::Offset::from_usize(0).unwrap();
        unsafe {
            offsets.push_unchecked(offset);
        }

        let chunks = plan_take_chunks(&positions, indices, MIN_IO_SIZE)?;

        let positions_ref = positions.as_ref();
        futures::stream::iter(chunks)
            .map(|chunk| async move {
                let chunk_offset = chunk.indices.value(0);
                let chunk_end = chunk.indices.value(chunk.indices.len() - 1);
                let array = self
                    .get_range(positions_ref, chunk_offset as usize..chunk_end as usize + 1)
                    .await?;
                Result::Ok((chunk, chunk_offset, array))
            })
            .buffered(self.reader.io_parallelism())
            .try_for_each(|(chunk, chunk_offset, array)| {
                let array: &GenericByteArray<T> = array.as_bytes();

                // A contiguous chunk can be copied into the output wholesale.
                if chunk.is_contiguous {
                    buffer.extend_from_slice(array.value_data());
                }

                for index in chunk.indices.values() {
                    // Non-contiguous chunks copy each requested value individually.
                    if !chunk.is_contiguous {
                        let value = array.value((index - chunk_offset) as usize);
                        let value_ref: &[u8] = value.as_ref();
                        buffer.extend_from_slice(value_ref);
                    }

                    offset += array.value_length((index - chunk_offset) as usize);
                    unsafe {
                        offsets.push_unchecked(offset);
                    }
                }
                futures::future::ready(Ok(()))
            })
            .await?;

        let mut data_builder = ArrayDataBuilder::new(T::DATA_TYPE)
            .len(indices.len())
            .null_count(0);

        let offsets: ScalarBuffer<T::Offset> = ScalarBuffer::from(Buffer::from(offsets));

        debug_assert_eq!(buffer.len(), capacity);

        if self.nullable {
            let (null_count, null_buf) = Self::count_nulls(&offsets);
            data_builder = data_builder
                .null_count(null_count)
                .null_bit_buffer(null_buf);
        }

        let array_data = data_builder
            .add_buffer(offsets.into_inner())
            .add_buffer(buffer.into())
            .build()?;

        Ok(Arc::new(GenericByteArray::<T>::from(array_data)))
    }
}
#[async_trait]
impl<T: ByteArrayType> AsyncIndex<usize> for BinaryDecoder<'_, T> {
    type Output = Result<ArrayRef>;

    async fn get(&self, index: usize) -> Self::Output {
        self.get(index..index + 1).await
    }
}

#[async_trait]
impl<T: ByteArrayType> AsyncIndex<RangeFrom<usize>> for BinaryDecoder<'_, T> {
    type Output = Result<ArrayRef>;

    async fn get(&self, index: RangeFrom<usize>) -> Self::Output {
        self.get(index.start..self.length).await
    }
}

#[async_trait]
impl<T: ByteArrayType> AsyncIndex<RangeTo<usize>> for BinaryDecoder<'_, T> {
    type Output = Result<ArrayRef>;

    async fn get(&self, index: RangeTo<usize>) -> Self::Output {
        self.get(0..index.end).await
    }
}

#[async_trait]
impl<T: ByteArrayType> AsyncIndex<RangeFull> for BinaryDecoder<'_, T> {
    type Output = Result<ArrayRef>;

    async fn get(&self, _: RangeFull) -> Self::Output {
        self.get(0..self.length).await
    }
}

#[async_trait]
impl<T: ByteArrayType> AsyncIndex<ReadBatchParams> for BinaryDecoder<'_, T> {
    type Output = Result<ArrayRef>;

    async fn get(&self, params: ReadBatchParams) -> Self::Output {
        match params {
            ReadBatchParams::Range(r) => self.get(r).await,
            ReadBatchParams::Ranges(_) => unimplemented!(),
            ReadBatchParams::RangeFull => self.get(..).await,
            ReadBatchParams::RangeTo(r) => self.get(r).await,
            ReadBatchParams::RangeFrom(r) => self.get(r).await,
            ReadBatchParams::Indices(indices) => self.take(&indices).await,
        }
    }
}

#[async_trait]
impl<T: ByteArrayType> AsyncIndex<Range<usize>> for BinaryDecoder<'_, T> {
    type Output = Result<ArrayRef>;

    async fn get(&self, index: Range<usize>) -> Self::Output {
        let position_decoder = PlainDecoder::new(
            self.reader,
            &DataType::Int64,
            self.position,
            self.length + 1,
        )?;
        let positions = position_decoder.get(index.start..index.end + 1).await?;
        let int64_positions: &Int64Array = as_primitive_array(&positions);

        self.get_range(int64_positions, 0..index.len()).await
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use arrow_array::{
        types::GenericStringType, BinaryArray, GenericStringArray, LargeStringArray, StringArray,
    };
    use arrow_select::concat::concat;
    use lance_core::utils::tempfile::TempStdFile;

    use crate::local::LocalObjectReader;

    async fn write_test_data<O: OffsetSizeTrait>(
        path: impl AsRef<std::path::Path>,
        arr: &[&GenericStringArray<O>],
    ) -> Result<usize> {
        let mut writer = tokio::fs::File::create(path).await?;
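        // Write a few filler bytes first so the encoded values do not start at
        // file offset 0.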
        writer.write_all(b"1234").await.unwrap();
        let mut encoder = BinaryEncoder::new(&mut writer);

        let arrs = arr.iter().map(|a| a as &dyn Array).collect::<Vec<_>>();
        let pos = encoder.encode(arrs.as_slice()).await.unwrap();
        writer.shutdown().await.unwrap();
        Ok(pos)
    }

    async fn test_round_trips<O: OffsetSizeTrait>(arrs: &[&GenericStringArray<O>]) {
        let path = TempStdFile::default();

        let pos = write_test_data(&path, arrs).await.unwrap();

        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let read_len = arrs.iter().map(|a| a.len()).sum();
        let decoder =
            BinaryDecoder::<GenericStringType<O>>::new(reader.as_ref(), pos, read_len, true);
        let actual_arr = decoder.decode().await.unwrap();

        let arrs_ref = arrs.iter().map(|a| a as &dyn Array).collect::<Vec<_>>();
        let expected = concat(arrs_ref.as_slice()).unwrap();
        assert_eq!(
            actual_arr
                .as_any()
                .downcast_ref::<GenericStringArray<O>>()
                .unwrap(),
            expected
                .as_any()
                .downcast_ref::<GenericStringArray<O>>()
                .unwrap(),
        );
    }

    #[tokio::test]
    async fn test_write_binary_data() {
        test_round_trips(&[&StringArray::from(vec!["a", "b", "cd", "efg"])]).await;
        test_round_trips(&[&StringArray::from(vec![Some("a"), None, Some("cd"), None])]).await;
        test_round_trips(&[
            &StringArray::from(vec![Some("a"), None, Some("cd"), None]),
            &StringArray::from(vec![Some("f"), None, Some("gh"), None]),
            &StringArray::from(vec![Some("t"), None, Some("uv"), None]),
        ])
        .await;
        test_round_trips(&[&LargeStringArray::from(vec!["a", "b", "cd", "efg"])]).await;
        test_round_trips(&[&LargeStringArray::from(vec![
            Some("a"),
            None,
            Some("cd"),
            None,
        ])])
        .await;
        test_round_trips(&[
            &LargeStringArray::from(vec![Some("a"), Some("b")]),
            &LargeStringArray::from(vec![Some("c")]),
            &LargeStringArray::from(vec![Some("d"), Some("e")]),
        ])
        .await;
    }

    #[tokio::test]
    async fn test_write_binary_data_with_offset() {
        let array: StringArray = StringArray::from(vec![Some("d"), Some("e")]).slice(1, 1);
        test_round_trips(&[&array]).await;
    }

    #[tokio::test]
    async fn test_range_query() {
        let data = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);

        let path = TempStdFile::default();
        let mut object_writer = tokio::fs::File::create(&path).await.unwrap();
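        // Write a few filler bytes first so the encoded values do not start at
        // file offset 0.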
        object_writer.write_all(b"1234").await.unwrap();
        let mut encoder = BinaryEncoder::new(&mut object_writer);
        let pos = encoder.encode(&[&data]).await.unwrap();
        object_writer.shutdown().await.unwrap();

        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);
        assert_eq!(
            decoder.decode().await.unwrap().as_ref(),
            &StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"])
        );

        assert_eq!(
            decoder.get(..).await.unwrap().as_ref(),
            &StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"])
        );

        assert_eq!(
            decoder.get(2..5).await.unwrap().as_ref(),
            &StringArray::from_iter_values(["c", "d", "e"])
        );

        assert_eq!(
            decoder.get(..5).await.unwrap().as_ref(),
            &StringArray::from_iter_values(["a", "b", "c", "d", "e"])
        );

        assert_eq!(
            decoder.get(4..).await.unwrap().as_ref(),
            &StringArray::from_iter_values(["e", "f", "g"])
        );
        assert_eq!(
            decoder.get(2..2).await.unwrap().as_ref(),
            &new_empty_array(&DataType::Utf8)
        );
        assert!(decoder.get(100..100).await.is_err());
    }

    #[tokio::test]
    async fn test_take() {
        let data = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);

        let path = TempStdFile::default();

        let pos = write_test_data(&path, &[&data]).await.unwrap();
        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);

        let actual = decoder
            .take(&UInt32Array::from_iter_values([1, 2, 5]))
            .await
            .unwrap();
        assert_eq!(
            actual.as_ref(),
            &StringArray::from_iter_values(["b", "c", "f"])
        );
    }

    #[tokio::test]
    async fn test_take_sparse_indices() {
        let data = StringArray::from_iter_values((0..1000000).map(|v| format!("string-{v}")));

        let path = TempStdFile::default();
        let pos = write_test_data(&path, &[&data]).await.unwrap();
        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);

        let positions = decoder.get_positions(1..999998).await.unwrap();
        let indices = UInt32Array::from_iter_values([1, 999998]);
        let chunks = plan_take_chunks(positions.as_ref(), &indices, 64 * 1024).unwrap();
        assert_eq!(chunks.len(), 2);
        assert_eq!(chunks[0].indices, UInt32Array::from_iter_values([0]));
        assert_eq!(chunks[1].indices, UInt32Array::from_iter_values([999997]));

        let actual = decoder
            .take(&UInt32Array::from_iter_values([1, 999998]))
            .await
            .unwrap();
        assert_eq!(
            actual.as_ref(),
            &StringArray::from_iter_values(["string-1", "string-999998"])
        );
    }

    #[tokio::test]
    async fn test_take_dense_indices() {
        let data = StringArray::from_iter_values((0..1000000).map(|v| format!("string-{v}")));

        let path = TempStdFile::default();
        let pos = write_test_data(&path, &[&data]).await.unwrap();

        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);

        let positions = decoder.get_positions(1..999998).await.unwrap();
        let indices = UInt32Array::from_iter_values([
            2, 3, 4, 1001, 1001, 1002, 2001, 2002, 2004, 3004, 3005,
        ]);

        let chunks = plan_take_chunks(positions.as_ref(), &indices, 1024).unwrap();
        assert_eq!(chunks.len(), 4);
        assert_eq!(chunks[0].indices, UInt32Array::from_iter_values(0..3));
        assert!(chunks[0].is_contiguous);
        assert_eq!(
            chunks[1].indices,
            UInt32Array::from_iter_values([999, 999, 1000])
        );
        assert!(!chunks[1].is_contiguous);
        assert_eq!(
            chunks[2].indices,
            UInt32Array::from_iter_values([1999, 2000, 2002])
        );
        assert!(!chunks[2].is_contiguous);
        assert_eq!(
            chunks[3].indices,
            UInt32Array::from_iter_values([3002, 3003])
        );
        assert!(chunks[3].is_contiguous);

        let actual = decoder.take(&indices).await.unwrap();
        assert_eq!(
            actual.as_ref(),
            &StringArray::from_iter_values(indices.values().iter().map(|v| format!("string-{v}")))
        );
    }

    #[tokio::test]
    async fn test_write_slice() {
        let path = TempStdFile::default();
        let data = StringArray::from_iter_values((0..100).map(|v| format!("abcdef-{v:#03}")));

        let mut object_writer = tokio::fs::File::create(&path).await.unwrap();
        let mut encoder = BinaryEncoder::new(&mut object_writer);
        for i in 0..10 {
            let pos = encoder.encode(&[&data.slice(i * 10, 10)]).await.unwrap();
            assert_eq!(pos, (i * (8 * 11) + (i + 1) * (10 * 10)));
        }
    }

    #[tokio::test]
    async fn test_write_binary_with_nulls() {
        let data = BinaryArray::from_iter((0..60000).map(|v| {
            if v % 4 != 0 {
                Some::<&[u8]>(b"abcdefgh")
            } else {
                None
            }
        }));
        let path = TempStdFile::default();

        let pos = {
            let mut object_writer = tokio::fs::File::create(&path).await.unwrap();
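            // Write a few filler bytes first so the encoded values do not start
            // at file offset 0.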
            object_writer.write_all(b"1234").await.unwrap();
            let mut encoder = BinaryEncoder::new(&mut object_writer);

            let pos = encoder.encode(&[&data]).await.unwrap();
            object_writer.shutdown().await.unwrap();
            pos
        };

        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let decoder = BinaryDecoder::<BinaryType>::new(reader.as_ref(), pos, data.len(), true);
        let idx = UInt32Array::from(vec![0_u32, 5_u32, 59996_u32]);
        let actual = decoder.take(&idx).await.unwrap();
        let values: Vec<Option<&[u8]>> = vec![None, Some(b"abcdefgh"), None];
        assert_eq!(actual.as_binary::<i32>(), &BinaryArray::from(values));
    }
}