1use std::ops::{Range, RangeFrom, RangeFull, RangeTo};
10use std::slice::from_raw_parts;
11use std::sync::Arc;
12
13use crate::{
14 ReadBatchParams,
15 traits::{Reader, Writer},
16};
17use arrow_arith::numeric::sub;
18use arrow_array::{
19 Array, ArrayRef, BooleanArray, FixedSizeBinaryArray, FixedSizeListArray, UInt8Array,
20 UInt32Array, builder::BooleanBuilder, cast::AsArray, make_array, new_empty_array,
21};
22use arrow_buffer::{Buffer, bit_util};
23use arrow_data::{ArrayDataBuilder, BufferSpec, layout};
24use arrow_schema::{DataType, Field};
25use arrow_select::{concat::concat, take::take};
26use async_recursion::async_recursion;
27use async_trait::async_trait;
28use bytes::Bytes;
29use futures::stream::{self, StreamExt, TryStreamExt};
30use lance_arrow::*;
31use lance_core::{Error, Result};
32use tokio::io::AsyncWriteExt;
33
34use crate::encodings::{AsyncIndex, Decoder};
35
/// Writes Arrow arrays in the "plain" layout: the raw fixed-width value bytes of each
/// array (bit-packed for booleans), back to back. No validity or offset information is
/// written.
pub struct PlainEncoder<'a> {
    /// Destination of the encoded bytes.
    writer: &'a mut dyn Writer,
    /// Data type the encoder was created for; `encode` starts dispatching from it.
    data_type: &'a DataType,
}
42
43impl<'a> PlainEncoder<'a> {
44 pub fn new(writer: &'a mut dyn Writer, data_type: &'a DataType) -> Self {
45 PlainEncoder { writer, data_type }
46 }
47
48 pub async fn write(writer: &'a mut dyn Writer, arrays: &[&'a dyn Array]) -> Result<usize> {
50 let pos = writer.tell().await?;
51 if !arrays.is_empty() {
52 let mut encoder = Self::new(writer, arrays[0].data_type());
53 encoder.encode(arrays).await?;
54 }
55 Ok(pos)
56 }
57
58 pub async fn encode(&mut self, arrays: &[&dyn Array]) -> Result<usize> {
61 self.encode_internal(arrays, self.data_type).await
62 }
63
64 #[async_recursion]
65 async fn encode_internal(
66 &mut self,
67 array: &[&dyn Array],
68 data_type: &DataType,
69 ) -> Result<usize> {
70 if let DataType::FixedSizeList(items, _) = data_type {
71 self.encode_fixed_size_list(array, items).await
72 } else {
73 self.encode_primitive(array).await
74 }
75 }
76
77 async fn encode_boolean(&mut self, arrays: &[&BooleanArray]) -> Result<()> {
78 let capacity: usize = arrays.iter().map(|a| a.len()).sum();
79 let mut builder = BooleanBuilder::with_capacity(capacity);
80
81 for array in arrays {
82 for val in array.iter() {
83 builder.append_value(val.unwrap_or_default());
84 }
85 }
86
87 let boolean_array = builder.finish();
88 self.writer
89 .write_all(boolean_array.into_data().buffers()[0].as_slice())
90 .await?;
91 Ok(())
92 }
93
94 async fn encode_primitive(&mut self, arrays: &[&dyn Array]) -> Result<usize> {
96 assert!(!arrays.is_empty());
97 let data_type = arrays[0].data_type();
98 let offset = self.writer.tell().await?;
99
100 if matches!(data_type, DataType::Boolean) {
101 let boolean_arr = arrays
102 .iter()
103 .map(|a| a.as_boolean())
104 .collect::<Vec<&BooleanArray>>();
105 self.encode_boolean(boolean_arr.as_slice()).await?;
106 } else {
107 let byte_width = data_type.byte_width();
108 for a in arrays.iter() {
109 let data = a.to_data();
110 let slice = unsafe {
111 from_raw_parts(
112 data.buffers()[0].as_ptr().add(a.offset() * byte_width),
113 a.len() * byte_width,
114 )
115 };
116 self.writer.write_all(slice).await?;
117 }
118 }
119 Ok(offset)
120 }
121
122 async fn encode_fixed_size_list(
124 &mut self,
125 arrays: &[&dyn Array],
126 items: &Field,
127 ) -> Result<usize> {
128 let mut value_arrs: Vec<ArrayRef> = Vec::new();
129
130 for array in arrays {
131 let list_array = array
132 .as_any()
133 .downcast_ref::<FixedSizeListArray>()
134 .ok_or_else(|| {
135 Error::schema(format!(
136 "Needed a FixedSizeListArray but got {}",
137 array.data_type()
138 ))
139 })?;
140 let offset = list_array.value_offset(0) as usize;
141 let length = list_array.len();
142 let value_length = list_array.value_length() as usize;
143 let value_array = list_array.values().slice(offset, length * value_length);
144 value_arrs.push(value_array);
145 }
146
147 self.encode_internal(
148 value_arrs
149 .iter()
150 .map(|a| a.as_ref())
151 .collect::<Vec<_>>()
152 .as_slice(),
153 items.data_type(),
154 )
155 .await
156 }
157}
158
/// Decodes values stored in the plain layout (as written by `PlainEncoder`) from a
/// [`Reader`].
pub struct PlainDecoder<'a> {
    /// Source of the encoded bytes.
    reader: &'a dyn Reader,
    /// Data type of the stored values.
    data_type: &'a DataType,
    /// Byte offset in `reader` at which the encoded values begin.
    position: usize,
    /// Number of stored values (rows); requests ending past it are rejected.
    length: usize,
}
168
169#[inline]
171fn get_byte_range(data_type: &DataType, row_range: Range<usize>) -> Range<usize> {
172 match data_type {
173 DataType::Boolean => row_range.start / 8..bit_util::ceil(row_range.end, 8),
174 _ => row_range.start * data_type.byte_width()..row_range.end * data_type.byte_width(),
175 }
176}
177
178pub fn bytes_to_array(
179 data_type: &DataType,
180 bytes: Bytes,
181 len: usize,
182 offset: usize,
183) -> Result<ArrayRef> {
184 let layout = layout(data_type);
185
186 if layout.buffers.len() != 1 {
187 return Err(Error::internal(format!(
188 "Can only convert datatypes that require one buffer, found {:?}",
189 data_type
190 )));
191 }
192
193 let buf: Buffer = if let BufferSpec::FixedWidth {
194 byte_width,
195 alignment,
196 } = &layout.buffers[0]
197 {
198 let len_plus_offset = len + offset;
201 let min_buffer_size = len_plus_offset.saturating_mul(*byte_width);
202
203 if bytes.len() < min_buffer_size {
205 Buffer::copy_bytes_bytes(bytes, min_buffer_size)
206 } else {
207 Buffer::from_bytes_bytes(bytes, *alignment as u64)
208 }
209 } else {
210 Buffer::from_slice_ref(bytes)
212 };
213
214 let array_data = ArrayDataBuilder::new(data_type.clone())
215 .len(len)
216 .offset(offset)
217 .null_count(0)
218 .add_buffer(buf)
219 .build()?;
220 Ok(make_array(array_data))
221}
222
223impl<'a> PlainDecoder<'a> {
224 pub fn new(
225 reader: &'a dyn Reader,
226 data_type: &'a DataType,
227 position: usize,
228 length: usize,
229 ) -> Result<Self> {
230 Ok(PlainDecoder {
231 reader,
232 data_type,
233 position,
234 length,
235 })
236 }
237
238 async fn decode_primitive(&self, start: usize, end: usize) -> Result<ArrayRef> {
241 if end > self.length {
242 return Err(Error::invalid_input(format!(
243 "PlainDecoder: request([{}..{}]) out of range: [0..{}]",
244 start, end, self.length
245 )));
246 }
247 let byte_range = get_byte_range(self.data_type, start..end);
248 let range = Range {
249 start: self.position + byte_range.start,
250 end: self.position + byte_range.end,
251 };
252
253 let data = self.reader.get_range(range).await?;
254 let offset = if self.data_type == &DataType::Boolean {
257 start % 8
258 } else {
259 0
260 };
261 bytes_to_array(self.data_type, data, end - start, offset)
262 }
263
264 async fn decode_fixed_size_list(
265 &self,
266 items: &Field,
267 list_size: i32,
268 start: usize,
269 end: usize,
270 ) -> Result<ArrayRef> {
271 if !items.data_type().is_fixed_stride() {
272 return Err(Error::schema(format!(
273 "Items for fixed size list should be primitives but found {}",
274 items.data_type()
275 )));
276 };
277 let item_decoder = PlainDecoder::new(
278 self.reader,
279 items.data_type(),
280 self.position,
281 self.length * list_size as usize,
282 )?;
283 let item_array = item_decoder
284 .get(start * list_size as usize..end * list_size as usize)
285 .await?;
286 Ok(Arc::new(FixedSizeListArray::new(
287 Arc::new(items.clone()),
288 list_size,
289 item_array,
290 None,
291 )) as ArrayRef)
292 }
293
294 async fn decode_fixed_size_binary(
295 &self,
296 stride: i32,
297 start: usize,
298 end: usize,
299 ) -> Result<ArrayRef> {
300 let bytes_decoder = PlainDecoder::new(
301 self.reader,
302 &DataType::UInt8,
303 self.position,
304 self.length * stride as usize,
305 )?;
306 let bytes_array = bytes_decoder
307 .get(start * stride as usize..end * stride as usize)
308 .await?;
309 let values = bytes_array
310 .as_any()
311 .downcast_ref::<UInt8Array>()
312 .ok_or_else(|| {
313 Error::schema("Could not cast to UInt8Array for FixedSizeBinary".to_string())
314 })?;
315 Ok(Arc::new(FixedSizeBinaryArray::try_new_from_values(values, stride)?) as ArrayRef)
316 }
317
318 async fn take_boolean(&self, indices: &UInt32Array) -> Result<ArrayRef> {
319 let block_size = self.reader.block_size() as u32;
320 let boolean_block_size = block_size * 8;
321
322 let mut chunk_ranges = vec![];
323 let mut start: u32 = 0;
324 for j in 0..(indices.len() - 1) as u32 {
325 if (indices.value(j as usize + 1) / boolean_block_size)
326 > (indices.value(start as usize) / boolean_block_size)
327 {
328 let next_start = j + 1;
329 chunk_ranges.push(start..next_start);
330 start = next_start;
331 }
332 }
333 chunk_ranges.push(start..indices.len() as u32);
335
336 let arrays = stream::iter(chunk_ranges)
337 .map(|cr| async move {
338 let request = indices.slice(cr.start as usize, cr.len());
339 let start = request.value(0);
343 let end = request.value(request.len() - 1);
345 let array = self.get(start as usize..end as usize + 1).await?;
346
347 let shifted_indices = sub(&request, &UInt32Array::new_scalar(start))?;
348 Ok::<ArrayRef, Error>(take(&array, &shifted_indices, None)?)
349 })
350 .buffered(self.reader.io_parallelism())
351 .try_collect::<Vec<_>>()
352 .await?;
353 let references = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
354 Ok(concat(&references)?)
355 }
356}
357
/// Splits the positions of `indices` into chunks that can each be served by one
/// contiguous range read.
///
/// The returned ranges index into `indices`; they are not row or byte ranges. The next
/// index joins the current chunk in two cases:
/// - it is exactly one more than the previous index (runs of adjacent rows are never
///   split), or
/// - its byte offset (`index * byte_width`) is at most `block_size` bytes past the byte
///   offset of the chunk's first index.
///
/// Otherwise a new chunk starts at that index.
///
/// `indices` is expected to be sorted ascending, because callers read each chunk from its
/// first to its last index. Returns an empty vector when `indices` is empty.
fn make_chunked_requests(
    indices: &[u32],
    byte_width: usize,
    block_size: usize,
) -> Vec<Range<usize>> {
    if indices.is_empty() {
        return Vec::new();
    }

    let mut chunked_ranges = Vec::new();
    let mut start: usize = 0;
    for (i, pair) in indices.windows(2).enumerate() {
        let (prev, next) = (pair[0], pair[1]);
        // Adjacent rows are contiguous on disk, so always coalesce them. `checked_add`
        // keeps an index of `u32::MAX` from overflowing.
        if prev.checked_add(1) == Some(next) {
            continue;
        }
        // Saturating arithmetic instead of overflowing (which panics in debug builds).
        let limit = (indices[start] as usize)
            .saturating_mul(byte_width)
            .saturating_add(block_size);
        if (next as usize).saturating_mul(byte_width) > limit {
            chunked_ranges.push(start..i + 1);
            start = i + 1;
        }
    }
    chunked_ranges.push(start..indices.len());
    chunked_ranges
}
385
386#[async_trait]
387impl Decoder for PlainDecoder<'_> {
388 async fn decode(&self) -> Result<ArrayRef> {
389 self.get(0..self.length).await
390 }
391
392 async fn take(&self, indices: &UInt32Array) -> Result<ArrayRef> {
393 if indices.is_empty() {
394 return Ok(new_empty_array(self.data_type));
395 }
396
397 if matches!(self.data_type, DataType::Boolean) {
398 return self.take_boolean(indices).await;
399 }
400
401 let block_size = self.reader.block_size();
402 let byte_width = self.data_type.byte_width();
403
404 let chunked_ranges = make_chunked_requests(indices.values(), byte_width, block_size);
405
406 let arrays = stream::iter(chunked_ranges)
407 .map(|cr| async move {
408 let request = indices.slice(cr.start, cr.len());
409
410 let start = request.value(0);
411 let end = request.value(request.len() - 1);
412 let array = self.get(start as usize..end as usize + 1).await?;
413 let adjusted_offsets = sub(&request, &UInt32Array::new_scalar(start))?;
414 Ok::<ArrayRef, Error>(take(&array, &adjusted_offsets, None)?)
415 })
416 .buffered(self.reader.io_parallelism())
417 .try_collect::<Vec<_>>()
418 .await?;
419 let references = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
420 Ok(concat(&references)?)
421 }
422}
423
424#[async_trait]
425impl AsyncIndex<usize> for PlainDecoder<'_> {
426 type Output = Result<ArrayRef>;
428
429 async fn get(&self, index: usize) -> Self::Output {
430 self.get(index..index + 1).await
431 }
432}
433
434#[async_trait]
435impl AsyncIndex<Range<usize>> for PlainDecoder<'_> {
436 type Output = Result<ArrayRef>;
437
438 async fn get(&self, index: Range<usize>) -> Self::Output {
439 if index.is_empty() {
440 return Ok(new_empty_array(self.data_type));
441 }
442 match self.data_type {
443 DataType::FixedSizeList(items, list_size) => {
444 self.decode_fixed_size_list(items, *list_size, index.start, index.end)
445 .await
446 }
447 DataType::FixedSizeBinary(stride) => {
448 self.decode_fixed_size_binary(*stride, index.start, index.end)
449 .await
450 }
451 _ => self.decode_primitive(index.start, index.end).await,
452 }
453 }
454}
455
456#[async_trait]
457impl AsyncIndex<RangeFrom<usize>> for PlainDecoder<'_> {
458 type Output = Result<ArrayRef>;
459
460 async fn get(&self, index: RangeFrom<usize>) -> Self::Output {
461 self.get(index.start..self.length).await
462 }
463}
464
465#[async_trait]
466impl AsyncIndex<RangeTo<usize>> for PlainDecoder<'_> {
467 type Output = Result<ArrayRef>;
468
469 async fn get(&self, index: RangeTo<usize>) -> Self::Output {
470 self.get(0..index.end).await
471 }
472}
473
474#[async_trait]
475impl AsyncIndex<RangeFull> for PlainDecoder<'_> {
476 type Output = Result<ArrayRef>;
477
478 async fn get(&self, _: RangeFull) -> Self::Output {
479 self.get(0..self.length).await
480 }
481}
482
483#[async_trait]
484impl AsyncIndex<ReadBatchParams> for PlainDecoder<'_> {
485 type Output = Result<ArrayRef>;
486
487 async fn get(&self, params: ReadBatchParams) -> Self::Output {
488 match params {
489 ReadBatchParams::Range(r) => self.get(r).await,
490 ReadBatchParams::Ranges(_) => unimplemented!(),
492 ReadBatchParams::RangeFull => self.get(..).await,
493 ReadBatchParams::RangeTo(r) => self.get(r).await,
494 ReadBatchParams::RangeFrom(r) => self.get(r).await,
495 ReadBatchParams::Indices(indices) => self.take(&indices).await,
496 }
497 }
498}
499
// Round-trip and unit tests for the plain encoder/decoder. Encoded data is written to a
// temp file and read back through `LocalObjectReader`.
#[cfg(test)]
mod tests {
    use std::ops::Deref;

    use arrow_array::*;
    use lance_core::utils::tempfile::TempStdFile;
    use rand::prelude::*;

    use super::*;
    use crate::local::LocalObjectReader;

    // Round-trips ten 126-value arrays for every integer and float type. Each array is
    // built over a buffer of `i64`/`f64` values and reinterprets that buffer's leading
    // bytes as the target type (see `make_array_`).
    #[tokio::test]
    async fn test_encode_decode_primitive_array() {
        let int_types = vec![
            DataType::Int8,
            DataType::Int16,
            DataType::Int32,
            DataType::Int64,
            DataType::UInt8,
            DataType::UInt16,
            DataType::UInt32,
            DataType::UInt64,
        ];
        let input: Vec<i64> = Vec::from_iter(1..127_i64);
        for t in int_types {
            let buffer = Buffer::from_slice_ref(input.as_slice());
            let mut arrs: Vec<ArrayRef> = Vec::new();
            for _ in 0..10 {
                arrs.push(Arc::new(make_array_(&t, &buffer).await));
            }
            test_round_trip(arrs.as_slice(), t).await;
        }

        let float_types = vec![DataType::Float16, DataType::Float32, DataType::Float64];
        let mut rng = rand::rng();
        let input: Vec<f64> = (1..127).map(|_| rng.random()).collect();
        for t in float_types {
            let buffer = Buffer::from_slice_ref(input.as_slice());
            let mut arrs: Vec<ArrayRef> = Vec::new();

            for _ in 0..10 {
                arrs.push(Arc::new(make_array_(&t, &buffer).await));
            }
            test_round_trip(arrs.as_slice(), t).await;
        }
    }

    // Encodes `expected` into a freshly created file, decodes the whole file back, and
    // checks the result equals `expected` concatenated into a single array.
    async fn test_round_trip(expected: &[ArrayRef], data_type: DataType) {
        let path = TempStdFile::default();

        let expected_as_array = expected
            .iter()
            .map(|e| e.as_ref())
            .collect::<Vec<&dyn Array>>();
        {
            let mut writer = tokio::fs::File::create(&path).await.unwrap();
            let mut encoder = PlainEncoder::new(&mut writer, &data_type);
            // Nothing was written before, so the encoded data must start at offset 0.
            assert_eq!(
                encoder.encode(expected_as_array.as_slice()).await.unwrap(),
                0
            );
            writer.flush().await.unwrap();
        }

        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        assert!(reader.size().await.unwrap() > 0);
        // Total number of values written across all input arrays.
        let expected_size = expected.iter().map(|e| e.len()).sum();
        let decoder = PlainDecoder::new(reader.as_ref(), &data_type, 0, expected_size).unwrap();
        let arr = decoder.decode().await.unwrap();
        let actual = arr.as_ref();
        let expected_merged = concat(expected_as_array.as_slice()).unwrap();
        assert_eq!(expected_merged.deref(), actual);
        assert_eq!(expected_size, actual.len());
    }

    // Ten 3-value boolean arrays. The encoder packs all 30 bits into one bitmap, so
    // later arrays' bits do not start on byte boundaries.
    #[tokio::test]
    async fn test_encode_decode_bool_array() {
        let mut arrs: Vec<ArrayRef> = Vec::new();

        for _ in 0..10 {
            arrs.push(Arc::new(BooleanArray::from(vec![true, true, true])) as ArrayRef);
        }
        test_round_trip(arrs.as_slice(), DataType::Boolean).await;
    }

    // Fixed-size lists of size 3 over each integer type (126 values -> 42 lists per array).
    #[tokio::test]
    async fn test_encode_decode_fixed_size_list_array() {
        let int_types = vec![
            DataType::Int8,
            DataType::Int16,
            DataType::Int32,
            DataType::Int64,
            DataType::UInt8,
            DataType::UInt16,
            DataType::UInt32,
            DataType::UInt64,
        ];
        let input = Vec::from_iter(1..127_i64);
        for t in int_types {
            let buffer = Buffer::from_slice_ref(input.as_slice());
            let list_type =
                DataType::FixedSizeList(Arc::new(Field::new("item", t.clone(), true)), 3);
            let mut arrs: Vec<ArrayRef> = Vec::new();

            for _ in 0..10 {
                let items = make_array_(&t.clone(), &buffer).await;
                let arr = FixedSizeListArray::try_new_from_values(items, 3).unwrap();
                arrs.push(Arc::new(arr) as ArrayRef);
            }
            test_round_trip(arrs.as_slice(), list_type).await;
        }
    }

    // FixedSizeBinary(3) built from 126 bytes -> 42 values per array.
    #[tokio::test]
    async fn test_encode_decode_fixed_size_binary_array() {
        let t = DataType::FixedSizeBinary(3);
        let mut arrs: Vec<ArrayRef> = Vec::new();

        for _ in 0..10 {
            let values = UInt8Array::from(Vec::from_iter(1..127_u8));
            let arr = FixedSizeBinaryArray::try_new_from_values(&values, 3).unwrap();
            arrs.push(Arc::new(arr) as ArrayRef);
        }
        test_round_trip(arrs.as_slice(), t).await;
    }

    // Three UInt16 values need 6 bytes but only 5 are supplied. `bytes_to_array` must
    // copy them into a buffer of the required size, with a zero pad byte at the end.
    #[tokio::test]
    async fn test_bytes_to_array_padding() {
        let bytes = Bytes::from_static(&[0x01, 0x00, 0x02, 0x00, 0x03]);
        let arr = bytes_to_array(&DataType::UInt16, bytes, 3, 0).unwrap();

        let expected = UInt16Array::from(vec![1, 2, 3]);
        assert_eq!(arr.as_ref(), &expected);

        let data = arr.to_data();
        let buf = &data.buffers()[0];
        let repr = format!("{:?}", buf);
        assert!(
            repr.contains("[1, 0, 2, 0, 3, 0]"),
            "Underlying buffer contains unexpected data: {}",
            repr
        );
    }

    // Lists of lists, and lists of fixed-size binaries. Both flatten to fixed-stride
    // children when encoding and are re-nested when decoding.
    #[tokio::test]
    async fn test_encode_decode_nested_fixed_size_list() {
        // FixedSizeList<FixedSizeList<Int64, 2>, 2>
        let inner = DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 2);
        let t = DataType::FixedSizeList(Arc::new(Field::new("item", inner, true)), 2);
        let mut arrs: Vec<ArrayRef> = Vec::new();

        for _ in 0..10 {
            let values = Int64Array::from_iter_values(1..=120_i64);
            let arr = FixedSizeListArray::try_new_from_values(
                FixedSizeListArray::try_new_from_values(values, 2).unwrap(),
                2,
            )
            .unwrap();
            arrs.push(Arc::new(arr) as ArrayRef);
        }
        test_round_trip(arrs.as_slice(), t).await;

        // FixedSizeList<FixedSizeBinary(2), 2>
        let inner = DataType::FixedSizeBinary(2);
        let t = DataType::FixedSizeList(Arc::new(Field::new("item", inner, true)), 2);
        let mut arrs: Vec<ArrayRef> = Vec::new();

        for _ in 0..10 {
            let values = UInt8Array::from_iter_values(1..=120_u8);
            let arr = FixedSizeListArray::try_new_from_values(
                FixedSizeBinaryArray::try_new_from_values(&values, 2).unwrap(),
                2,
            )
            .unwrap();
            arrs.push(Arc::new(arr) as ArrayRef);
        }
        test_round_trip(arrs.as_slice(), t).await;
    }

    // Builds a 126-value array of `data_type` directly over `buffer`. Only the leading
    // `126 * width` bytes of `buffer` are used.
    async fn make_array_(data_type: &DataType, buffer: &Buffer) -> ArrayRef {
        make_array(
            ArrayDataBuilder::new(data_type.clone())
                .len(126)
                .add_buffer(buffer.clone())
                .build()
                .unwrap(),
        )
    }

    // Exercises `get` with Range, RangeTo, RangeFrom, empty ranges, and an out-of-bounds
    // range (`3..1000` on 6 values), which must return an error.
    #[tokio::test]
    async fn test_decode_by_range() {
        let path = TempStdFile::default();

        let array = Int32Array::from_iter_values([0, 1, 2, 3, 4, 5]);
        {
            let mut writer = tokio::fs::File::create(&path).await.unwrap();
            let mut encoder = PlainEncoder::new(&mut writer, array.data_type());
            assert_eq!(encoder.encode(&[&array]).await.unwrap(), 0);
            writer.flush().await.unwrap();
        }

        let reader = LocalObjectReader::open_local_path(&path, 2048, None)
            .await
            .unwrap();
        assert!(reader.size().await.unwrap() > 0);
        let decoder =
            PlainDecoder::new(reader.as_ref(), array.data_type(), 0, array.len()).unwrap();
        assert_eq!(
            decoder.get(2..4).await.unwrap().as_ref(),
            &Int32Array::from_iter_values([2, 3])
        );

        assert_eq!(
            decoder.get(..4).await.unwrap().as_ref(),
            &Int32Array::from_iter_values([0, 1, 2, 3])
        );

        assert_eq!(
            decoder.get(2..).await.unwrap().as_ref(),
            &Int32Array::from_iter_values([2, 3, 4, 5])
        );

        assert_eq!(
            &decoder.get(2..2).await.unwrap(),
            &new_empty_array(&DataType::Int32)
        );

        assert_eq!(
            &decoder.get(5..5).await.unwrap(),
            &new_empty_array(&DataType::Int32)
        );

        assert!(decoder.get(3..1000).await.is_err());
    }

    // Gathers a sorted set of scattered indices from a 100-value Int32 column.
    #[tokio::test]
    async fn test_take() {
        let path = TempStdFile::default();

        let array = Int32Array::from_iter_values(0..100);

        {
            let mut writer = tokio::fs::File::create(&path).await.unwrap();
            let mut encoder = PlainEncoder::new(&mut writer, array.data_type());
            assert_eq!(encoder.encode(&[&array]).await.unwrap(), 0);
            AsyncWriteExt::shutdown(&mut writer).await.unwrap();
        }

        let reader = LocalObjectReader::open_local_path(&path, 2048, None)
            .await
            .unwrap();
        assert!(reader.size().await.unwrap() > 0);
        let decoder =
            PlainDecoder::new(reader.as_ref(), array.data_type(), 0, array.len()).unwrap();

        let results = decoder
            .take(&UInt32Array::from_iter(
                [2, 4, 5, 20, 30, 55, 60].iter().map(|i| *i as u32),
            ))
            .await
            .unwrap();
        assert_eq!(
            results.as_ref(),
            &Int32Array::from_iter_values([2, 4, 5, 20, 30, 55, 60])
        );
    }

    // With 4 KiB values and a 64 KiB block, an index joins the current chunk only if it
    // is at most 16 rows past the chunk's first index. For example, 10 joins 1, but 20
    // starts a new chunk. The last two indices place byte offsets beyond `u32::MAX`,
    // which exercises the `usize` arithmetic.
    #[test]
    fn test_make_chunked_request() {
        let byte_width: usize = 4096;
        let prefetch_size: usize = 64 * 1024;
        let u32_overflow: usize = u32::MAX as usize + 10;

        let indices: Vec<u32> = vec![
            1,
            10,
            20,
            100,
            120,
            (u32_overflow / byte_width) as u32,
            (u32_overflow / byte_width) as u32 + 100,
        ];
        let chunks = make_chunked_requests(&indices, byte_width, prefetch_size);
        assert_eq!(chunks.len(), 6, "got chunks: {:?}", chunks);
        assert_eq!(chunks, vec![(0..2), (2..3), (3..4), (4..5), (5..6), (6..7)])
    }
}