1use std::ops::{Range, RangeFrom, RangeFull, RangeTo};
10use std::slice::from_raw_parts;
11use std::sync::Arc;
12
13use crate::{
14 traits::{Reader, Writer},
15 ReadBatchParams,
16};
17use arrow_arith::numeric::sub;
18use arrow_array::{
19 builder::BooleanBuilder, cast::AsArray, make_array, new_empty_array, Array, ArrayRef,
20 BooleanArray, FixedSizeBinaryArray, FixedSizeListArray, UInt32Array, UInt8Array,
21};
22use arrow_buffer::{bit_util, Buffer};
23use arrow_data::{layout, ArrayDataBuilder, BufferSpec};
24use arrow_schema::{DataType, Field};
25use arrow_select::{concat::concat, take::take};
26use async_recursion::async_recursion;
27use async_trait::async_trait;
28use bytes::Bytes;
29use futures::stream::{self, StreamExt, TryStreamExt};
30use lance_arrow::*;
31use lance_core::{Error, Result};
32use snafu::location;
33use tokio::io::AsyncWriteExt;
34
35use crate::encodings::{AsyncIndex, Decoder};
36
37pub struct PlainEncoder<'a> {
40 writer: &'a mut dyn Writer,
41 data_type: &'a DataType,
42}
43
44impl<'a> PlainEncoder<'a> {
45 pub fn new(writer: &'a mut dyn Writer, data_type: &'a DataType) -> Self {
46 PlainEncoder { writer, data_type }
47 }
48
49 pub async fn write(writer: &'a mut dyn Writer, arrays: &[&'a dyn Array]) -> Result<usize> {
51 let pos = writer.tell().await?;
52 if !arrays.is_empty() {
53 let mut encoder = Self::new(writer, arrays[0].data_type());
54 encoder.encode(arrays).await?;
55 }
56 Ok(pos)
57 }
58
59 pub async fn encode(&mut self, arrays: &[&dyn Array]) -> Result<usize> {
62 self.encode_internal(arrays, self.data_type).await
63 }
64
65 #[async_recursion]
66 async fn encode_internal(
67 &mut self,
68 array: &[&dyn Array],
69 data_type: &DataType,
70 ) -> Result<usize> {
71 if let DataType::FixedSizeList(items, _) = data_type {
72 self.encode_fixed_size_list(array, items).await
73 } else {
74 self.encode_primitive(array).await
75 }
76 }
77
78 async fn encode_boolean(&mut self, arrays: &[&BooleanArray]) -> Result<()> {
79 let capacity: usize = arrays.iter().map(|a| a.len()).sum();
80 let mut builder = BooleanBuilder::with_capacity(capacity);
81
82 for array in arrays {
83 for val in array.iter() {
84 builder.append_value(val.unwrap_or_default());
85 }
86 }
87
88 let boolean_array = builder.finish();
89 self.writer
90 .write_all(boolean_array.into_data().buffers()[0].as_slice())
91 .await?;
92 Ok(())
93 }
94
95 async fn encode_primitive(&mut self, arrays: &[&dyn Array]) -> Result<usize> {
97 assert!(!arrays.is_empty());
98 let data_type = arrays[0].data_type();
99 let offset = self.writer.tell().await?;
100
101 if matches!(data_type, DataType::Boolean) {
102 let boolean_arr = arrays
103 .iter()
104 .map(|a| a.as_boolean())
105 .collect::<Vec<&BooleanArray>>();
106 self.encode_boolean(boolean_arr.as_slice()).await?;
107 } else {
108 let byte_width = data_type.byte_width();
109 for a in arrays.iter() {
110 let data = a.to_data();
111 let slice = unsafe {
112 from_raw_parts(
113 data.buffers()[0].as_ptr().add(a.offset() * byte_width),
114 a.len() * byte_width,
115 )
116 };
117 self.writer.write_all(slice).await?;
118 }
119 }
120 Ok(offset)
121 }
122
123 async fn encode_fixed_size_list(
125 &mut self,
126 arrays: &[&dyn Array],
127 items: &Field,
128 ) -> Result<usize> {
129 let mut value_arrs: Vec<ArrayRef> = Vec::new();
130
131 for array in arrays {
132 let list_array = array
133 .as_any()
134 .downcast_ref::<FixedSizeListArray>()
135 .ok_or_else(|| Error::Schema {
136 message: format!("Needed a FixedSizeListArray but got {}", array.data_type()),
137 location: location!(),
138 })?;
139 let offset = list_array.value_offset(0) as usize;
140 let length = list_array.len();
141 let value_length = list_array.value_length() as usize;
142 let value_array = list_array.values().slice(offset, length * value_length);
143 value_arrs.push(value_array);
144 }
145
146 self.encode_internal(
147 value_arrs
148 .iter()
149 .map(|a| a.as_ref())
150 .collect::<Vec<_>>()
151 .as_slice(),
152 items.data_type(),
153 )
154 .await
155 }
156}
157
158pub struct PlainDecoder<'a> {
160 reader: &'a dyn Reader,
161 data_type: &'a DataType,
162 position: usize,
164 length: usize,
166}
167
168#[inline]
170fn get_byte_range(data_type: &DataType, row_range: Range<usize>) -> Range<usize> {
171 match data_type {
172 DataType::Boolean => row_range.start / 8..bit_util::ceil(row_range.end, 8),
173 _ => row_range.start * data_type.byte_width()..row_range.end * data_type.byte_width(),
174 }
175}
176
177pub fn bytes_to_array(
178 data_type: &DataType,
179 bytes: Bytes,
180 len: usize,
181 offset: usize,
182) -> Result<ArrayRef> {
183 let layout = layout(data_type);
184
185 if layout.buffers.len() != 1 {
186 return Err(Error::Internal {
187 message: format!(
188 "Can only convert datatypes that require one buffer, found {:?}",
189 data_type
190 ),
191 location: location!(),
192 });
193 }
194
195 let buf: Buffer = if let BufferSpec::FixedWidth {
196 byte_width,
197 alignment,
198 } = &layout.buffers[0]
199 {
200 let len_plus_offset = len + offset;
203 let min_buffer_size = len_plus_offset.saturating_mul(*byte_width);
204
205 if bytes.len() < min_buffer_size {
207 Buffer::copy_bytes_bytes(bytes, min_buffer_size)
208 } else {
209 Buffer::from_bytes_bytes(bytes, *alignment as u64)
210 }
211 } else {
212 Buffer::from_slice_ref(bytes)
214 };
215
216 let array_data = ArrayDataBuilder::new(data_type.clone())
217 .len(len)
218 .offset(offset)
219 .null_count(0)
220 .add_buffer(buf)
221 .build()?;
222 Ok(make_array(array_data))
223}
224
225impl<'a> PlainDecoder<'a> {
226 pub fn new(
227 reader: &'a dyn Reader,
228 data_type: &'a DataType,
229 position: usize,
230 length: usize,
231 ) -> Result<Self> {
232 Ok(PlainDecoder {
233 reader,
234 data_type,
235 position,
236 length,
237 })
238 }
239
240 async fn decode_primitive(&self, start: usize, end: usize) -> Result<ArrayRef> {
243 if end > self.length {
244 return Err(Error::io(
245 format!(
246 "PlainDecoder: request([{}..{}]) out of range: [0..{}]",
247 start, end, self.length
248 ),
249 location!(),
250 ));
251 }
252 let byte_range = get_byte_range(self.data_type, start..end);
253 let range = Range {
254 start: self.position + byte_range.start,
255 end: self.position + byte_range.end,
256 };
257
258 let data = self.reader.get_range(range).await?;
259 let offset = if self.data_type == &DataType::Boolean {
262 start % 8
263 } else {
264 0
265 };
266 bytes_to_array(self.data_type, data, end - start, offset)
267 }
268
269 async fn decode_fixed_size_list(
270 &self,
271 items: &Field,
272 list_size: i32,
273 start: usize,
274 end: usize,
275 ) -> Result<ArrayRef> {
276 if !items.data_type().is_fixed_stride() {
277 return Err(Error::Schema {
278 message: format!(
279 "Items for fixed size list should be primitives but found {}",
280 items.data_type()
281 ),
282 location: location!(),
283 });
284 };
285 let item_decoder = PlainDecoder::new(
286 self.reader,
287 items.data_type(),
288 self.position,
289 self.length * list_size as usize,
290 )?;
291 let item_array = item_decoder
292 .get(start * list_size as usize..end * list_size as usize)
293 .await?;
294 Ok(Arc::new(FixedSizeListArray::new(
295 Arc::new(items.clone()),
296 list_size,
297 item_array,
298 None,
299 )) as ArrayRef)
300 }
301
302 async fn decode_fixed_size_binary(
303 &self,
304 stride: i32,
305 start: usize,
306 end: usize,
307 ) -> Result<ArrayRef> {
308 let bytes_decoder = PlainDecoder::new(
309 self.reader,
310 &DataType::UInt8,
311 self.position,
312 self.length * stride as usize,
313 )?;
314 let bytes_array = bytes_decoder
315 .get(start * stride as usize..end * stride as usize)
316 .await?;
317 let values = bytes_array
318 .as_any()
319 .downcast_ref::<UInt8Array>()
320 .ok_or_else(|| Error::Schema {
321 message: "Could not cast to UInt8Array for FixedSizeBinary".to_string(),
322 location: location!(),
323 })?;
324 Ok(Arc::new(FixedSizeBinaryArray::try_new_from_values(values, stride)?) as ArrayRef)
325 }
326
327 async fn take_boolean(&self, indices: &UInt32Array) -> Result<ArrayRef> {
328 let block_size = self.reader.block_size() as u32;
329 let boolean_block_size = block_size * 8;
330
331 let mut chunk_ranges = vec![];
332 let mut start: u32 = 0;
333 for j in 0..(indices.len() - 1) as u32 {
334 if (indices.value(j as usize + 1) / boolean_block_size)
335 > (indices.value(start as usize) / boolean_block_size)
336 {
337 let next_start = j + 1;
338 chunk_ranges.push(start..next_start);
339 start = next_start;
340 }
341 }
342 chunk_ranges.push(start..indices.len() as u32);
344
345 let arrays = stream::iter(chunk_ranges)
346 .map(|cr| async move {
347 let request = indices.slice(cr.start as usize, cr.len());
348 let start = request.value(0);
352 let end = request.value(request.len() - 1);
354 let array = self.get(start as usize..end as usize + 1).await?;
355
356 let shifted_indices = sub(&request, &UInt32Array::new_scalar(start))?;
357 Ok::<ArrayRef, Error>(take(&array, &shifted_indices, None)?)
358 })
359 .buffered(self.reader.io_parallelism())
360 .try_collect::<Vec<_>>()
361 .await?;
362 let references = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
363 Ok(concat(&references)?)
364 }
365}
366
/// Groups `indices` into runs that should be fetched with one range read.
///
/// Each returned range covers positions in `indices` (not row ids). A new run
/// starts when the next index is not adjacent to the current one and its byte
/// position lies more than `block_size` bytes past the byte position of the
/// run's first index.
///
/// Returns an empty vector for empty `indices`.
fn make_chunked_requests(
    indices: &[u32],
    byte_width: usize,
    block_size: usize,
) -> Vec<Range<usize>> {
    let mut chunked_ranges = vec![];
    if indices.is_empty() {
        return chunked_ranges;
    }

    let mut start: usize = 0;
    // `windows(2)` yields nothing for a single index, so there is no
    // `len() - 1` arithmetic that could underflow.
    for (i, pair) in indices.windows(2).enumerate() {
        let (current, next) = (pair[0], pair[1]);
        // Adjacent rows always stay in the same run. `checked_add` keeps a
        // `u32::MAX` index from overflowing.
        if current.checked_add(1) == Some(next) {
            continue;
        }
        if next as usize * byte_width > indices[start] as usize * byte_width + block_size {
            chunked_ranges.push(start..i + 1);
            start = i + 1;
        }
    }
    chunked_ranges.push(start..indices.len());
    chunked_ranges
}
394
395#[async_trait]
396impl Decoder for PlainDecoder<'_> {
397 async fn decode(&self) -> Result<ArrayRef> {
398 self.get(0..self.length).await
399 }
400
401 async fn take(&self, indices: &UInt32Array) -> Result<ArrayRef> {
402 if indices.is_empty() {
403 return Ok(new_empty_array(self.data_type));
404 }
405
406 if matches!(self.data_type, DataType::Boolean) {
407 return self.take_boolean(indices).await;
408 }
409
410 let block_size = self.reader.block_size();
411 let byte_width = self.data_type.byte_width();
412
413 let chunked_ranges = make_chunked_requests(indices.values(), byte_width, block_size);
414
415 let arrays = stream::iter(chunked_ranges)
416 .map(|cr| async move {
417 let request = indices.slice(cr.start, cr.len());
418
419 let start = request.value(0);
420 let end = request.value(request.len() - 1);
421 let array = self.get(start as usize..end as usize + 1).await?;
422 let adjusted_offsets = sub(&request, &UInt32Array::new_scalar(start))?;
423 Ok::<ArrayRef, Error>(take(&array, &adjusted_offsets, None)?)
424 })
425 .buffered(self.reader.io_parallelism())
426 .try_collect::<Vec<_>>()
427 .await?;
428 let references = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
429 Ok(concat(&references)?)
430 }
431}
432
433#[async_trait]
434impl AsyncIndex<usize> for PlainDecoder<'_> {
435 type Output = Result<ArrayRef>;
437
438 async fn get(&self, index: usize) -> Self::Output {
439 self.get(index..index + 1).await
440 }
441}
442
443#[async_trait]
444impl AsyncIndex<Range<usize>> for PlainDecoder<'_> {
445 type Output = Result<ArrayRef>;
446
447 async fn get(&self, index: Range<usize>) -> Self::Output {
448 if index.is_empty() {
449 return Ok(new_empty_array(self.data_type));
450 }
451 match self.data_type {
452 DataType::FixedSizeList(items, list_size) => {
453 self.decode_fixed_size_list(items, *list_size, index.start, index.end)
454 .await
455 }
456 DataType::FixedSizeBinary(stride) => {
457 self.decode_fixed_size_binary(*stride, index.start, index.end)
458 .await
459 }
460 _ => self.decode_primitive(index.start, index.end).await,
461 }
462 }
463}
464
465#[async_trait]
466impl AsyncIndex<RangeFrom<usize>> for PlainDecoder<'_> {
467 type Output = Result<ArrayRef>;
468
469 async fn get(&self, index: RangeFrom<usize>) -> Self::Output {
470 self.get(index.start..self.length).await
471 }
472}
473
474#[async_trait]
475impl AsyncIndex<RangeTo<usize>> for PlainDecoder<'_> {
476 type Output = Result<ArrayRef>;
477
478 async fn get(&self, index: RangeTo<usize>) -> Self::Output {
479 self.get(0..index.end).await
480 }
481}
482
483#[async_trait]
484impl AsyncIndex<RangeFull> for PlainDecoder<'_> {
485 type Output = Result<ArrayRef>;
486
487 async fn get(&self, _: RangeFull) -> Self::Output {
488 self.get(0..self.length).await
489 }
490}
491
492#[async_trait]
493impl AsyncIndex<ReadBatchParams> for PlainDecoder<'_> {
494 type Output = Result<ArrayRef>;
495
496 async fn get(&self, params: ReadBatchParams) -> Self::Output {
497 match params {
498 ReadBatchParams::Range(r) => self.get(r).await,
499 ReadBatchParams::Ranges(_) => unimplemented!(),
501 ReadBatchParams::RangeFull => self.get(..).await,
502 ReadBatchParams::RangeTo(r) => self.get(r).await,
503 ReadBatchParams::RangeFrom(r) => self.get(r).await,
504 ReadBatchParams::Indices(indices) => self.take(&indices).await,
505 }
506 }
507}
508
#[cfg(test)]
mod tests {
    use std::ops::Deref;

    use arrow_array::*;
    use lance_core::utils::tempfile::TempStdFile;
    use rand::prelude::*;

    use super::*;
    use crate::local::LocalObjectReader;

    /// Integer element types exercised by the round-trip tests.
    fn integer_types() -> Vec<DataType> {
        vec![
            DataType::Int8,
            DataType::Int16,
            DataType::Int32,
            DataType::Int64,
            DataType::UInt8,
            DataType::UInt16,
            DataType::UInt32,
            DataType::UInt64,
        ]
    }

    /// Builds a 126-element array of `data_type` on top of `buffer`.
    async fn make_array_(data_type: &DataType, buffer: &Buffer) -> ArrayRef {
        let data = ArrayDataBuilder::new(data_type.clone())
            .len(126)
            .add_buffer(buffer.clone())
            .build()
            .unwrap();
        make_array(data)
    }

    /// Encodes `expected` to a temp file, decodes it back, and checks that the
    /// result equals the concatenation of the inputs.
    async fn test_round_trip(expected: &[ArrayRef], data_type: DataType) {
        let path = TempStdFile::default();

        let expected_refs = expected
            .iter()
            .map(|e| e.as_ref())
            .collect::<Vec<&dyn Array>>();
        {
            let mut writer = tokio::fs::File::create(&path).await.unwrap();
            let mut encoder = PlainEncoder::new(&mut writer, &data_type);
            let start_pos = encoder.encode(expected_refs.as_slice()).await.unwrap();
            assert_eq!(start_pos, 0);
            writer.flush().await.unwrap();
        }

        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        assert!(reader.size().await.unwrap() > 0);
        let total_rows: usize = expected.iter().map(|e| e.len()).sum();
        let decoder = PlainDecoder::new(reader.as_ref(), &data_type, 0, total_rows).unwrap();
        let decoded = decoder.decode().await.unwrap();
        let actual = decoded.as_ref();
        let merged = concat(expected_refs.as_slice()).unwrap();
        assert_eq!(merged.deref(), actual);
        assert_eq!(total_rows, actual.len());
    }

    #[tokio::test]
    async fn test_encode_decode_primitive_array() {
        // Integers: the same i64 buffer is reused as backing bytes for every width.
        let int_values: Vec<i64> = (1..127_i64).collect();
        for t in integer_types() {
            let buffer = Buffer::from_slice_ref(int_values.as_slice());
            let mut arrs: Vec<ArrayRef> = Vec::new();
            for _ in 0..10 {
                arrs.push(Arc::new(make_array_(&t, &buffer).await));
            }
            test_round_trip(arrs.as_slice(), t).await;
        }

        // Floats: random f64 values provide the backing bytes.
        let mut rng = rand::rng();
        let float_values: Vec<f64> = (1..127).map(|_| rng.random()).collect();
        for t in [DataType::Float16, DataType::Float32, DataType::Float64] {
            let buffer = Buffer::from_slice_ref(float_values.as_slice());
            let mut arrs: Vec<ArrayRef> = Vec::new();
            for _ in 0..10 {
                arrs.push(Arc::new(make_array_(&t, &buffer).await));
            }
            test_round_trip(arrs.as_slice(), t).await;
        }
    }

    #[tokio::test]
    async fn test_encode_decode_bool_array() {
        let arrs: Vec<ArrayRef> = (0..10)
            .map(|_| Arc::new(BooleanArray::from(vec![true, true, true])) as ArrayRef)
            .collect();
        test_round_trip(arrs.as_slice(), DataType::Boolean).await;
    }

    #[tokio::test]
    async fn test_encode_decode_fixed_size_list_array() {
        let values = Vec::from_iter(1..127_i64);
        for t in integer_types() {
            let buffer = Buffer::from_slice_ref(values.as_slice());
            let list_type =
                DataType::FixedSizeList(Arc::new(Field::new("item", t.clone(), true)), 3);

            let mut arrs: Vec<ArrayRef> = Vec::new();
            for _ in 0..10 {
                let items = make_array_(&t.clone(), &buffer).await;
                let list = FixedSizeListArray::try_new_from_values(items, 3).unwrap();
                arrs.push(Arc::new(list) as ArrayRef);
            }
            test_round_trip(arrs.as_slice(), list_type).await;
        }
    }

    #[tokio::test]
    async fn test_encode_decode_fixed_size_binary_array() {
        let binary_type = DataType::FixedSizeBinary(3);
        let arrs: Vec<ArrayRef> = (0..10)
            .map(|_| {
                let values = UInt8Array::from(Vec::from_iter(1..127_u8));
                let arr = FixedSizeBinaryArray::try_new_from_values(&values, 3).unwrap();
                Arc::new(arr) as ArrayRef
            })
            .collect();
        test_round_trip(arrs.as_slice(), binary_type).await;
    }

    #[tokio::test]
    async fn test_bytes_to_array_padding() {
        // Five bytes cannot hold three u16 values; the last one must be padded.
        let bytes = Bytes::from_static(&[0x01, 0x00, 0x02, 0x00, 0x03]);
        let arr = bytes_to_array(&DataType::UInt16, bytes, 3, 0).unwrap();

        let expected = UInt16Array::from(vec![1, 2, 3]);
        assert_eq!(arr.as_ref(), &expected);

        let data = arr.to_data();
        let buf = &data.buffers()[0];
        let repr = format!("{:?}", buf);
        assert!(
            repr.contains("[1, 0, 2, 0, 3, 0]"),
            "Underlying buffer contains unexpected data: {}",
            repr
        );
    }

    #[tokio::test]
    async fn test_encode_decode_nested_fixed_size_list() {
        // List of list of Int64.
        let inner = DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 2);
        let t = DataType::FixedSizeList(Arc::new(Field::new("item", inner, true)), 2);
        let arrs: Vec<ArrayRef> = (0..10)
            .map(|_| {
                let values = Int64Array::from_iter_values(1..=120_i64);
                let arr = FixedSizeListArray::try_new_from_values(
                    FixedSizeListArray::try_new_from_values(values, 2).unwrap(),
                    2,
                )
                .unwrap();
                Arc::new(arr) as ArrayRef
            })
            .collect();
        test_round_trip(arrs.as_slice(), t).await;

        // List of fixed-size binary.
        let inner = DataType::FixedSizeBinary(2);
        let t = DataType::FixedSizeList(Arc::new(Field::new("item", inner, true)), 2);
        let arrs: Vec<ArrayRef> = (0..10)
            .map(|_| {
                let values = UInt8Array::from_iter_values(1..=120_u8);
                let arr = FixedSizeListArray::try_new_from_values(
                    FixedSizeBinaryArray::try_new_from_values(&values, 2).unwrap(),
                    2,
                )
                .unwrap();
                Arc::new(arr) as ArrayRef
            })
            .collect();
        test_round_trip(arrs.as_slice(), t).await;
    }

    #[tokio::test]
    async fn test_decode_by_range() {
        let path = TempStdFile::default();

        let array = Int32Array::from_iter_values([0, 1, 2, 3, 4, 5]);
        {
            let mut writer = tokio::fs::File::create(&path).await.unwrap();
            let mut encoder = PlainEncoder::new(&mut writer, array.data_type());
            let start_pos = encoder.encode(&[&array]).await.unwrap();
            assert_eq!(start_pos, 0);
            writer.flush().await.unwrap();
        }

        let reader = LocalObjectReader::open_local_path(&path, 2048, None)
            .await
            .unwrap();
        assert!(reader.size().await.unwrap() > 0);
        let decoder =
            PlainDecoder::new(reader.as_ref(), array.data_type(), 0, array.len()).unwrap();

        // Bounded, prefix, and suffix ranges.
        assert_eq!(
            decoder.get(2..4).await.unwrap().as_ref(),
            &Int32Array::from_iter_values([2, 3])
        );
        assert_eq!(
            decoder.get(..4).await.unwrap().as_ref(),
            &Int32Array::from_iter_values([0, 1, 2, 3])
        );
        assert_eq!(
            decoder.get(2..).await.unwrap().as_ref(),
            &Int32Array::from_iter_values([2, 3, 4, 5])
        );

        // Empty ranges decode to empty arrays.
        let empty = new_empty_array(&DataType::Int32);
        assert_eq!(&decoder.get(2..2).await.unwrap(), &empty);
        assert_eq!(&decoder.get(5..5).await.unwrap(), &empty);

        // Reading past the end is an error.
        assert!(decoder.get(3..1000).await.is_err());
    }

    #[tokio::test]
    async fn test_take() {
        let path = TempStdFile::default();

        let array = Int32Array::from_iter_values(0..100);

        {
            let mut writer = tokio::fs::File::create(&path).await.unwrap();
            let mut encoder = PlainEncoder::new(&mut writer, array.data_type());
            let start_pos = encoder.encode(&[&array]).await.unwrap();
            assert_eq!(start_pos, 0);
            writer.shutdown().await.unwrap();
        }

        let reader = LocalObjectReader::open_local_path(&path, 2048, None)
            .await
            .unwrap();
        assert!(reader.size().await.unwrap() > 0);
        let decoder =
            PlainDecoder::new(reader.as_ref(), array.data_type(), 0, array.len()).unwrap();

        let wanted = UInt32Array::from_iter([2, 4, 5, 20, 30, 55, 60].iter().map(|i| *i as u32));
        let results = decoder.take(&wanted).await.unwrap();
        assert_eq!(
            results.as_ref(),
            &Int32Array::from_iter_values([2, 4, 5, 20, 30, 55, 60])
        );
    }

    #[test]
    fn test_make_chunked_request() {
        let byte_width: usize = 4096;
        let prefetch_size: usize = 64 * 1024;
        let u32_overflow: usize = u32::MAX as usize + 10;
        // Row whose byte position no longer fits in a u32.
        let far_row = (u32_overflow / byte_width) as u32;

        let indices: Vec<u32> = vec![1, 10, 20, 100, 120, far_row, far_row + 100];
        let chunks = make_chunked_requests(&indices, byte_width, prefetch_size);
        assert_eq!(chunks.len(), 6, "got chunks: {:?}", chunks);
        assert_eq!(chunks, vec![(0..2), (2..3), (3..4), (4..5), (5..6), (6..7)])
    }
}