vortex_serde/chunked_reader/
mod.rs1use std::io::Cursor;
2use std::sync::Arc;
3
4use vortex_array::compute::unary::scalar_at;
5use vortex_array::stream::ArrayStream;
6use vortex_array::{Array, Context};
7use vortex_dtype::DType;
8use vortex_error::{vortex_bail, VortexExpect as _, VortexResult};
9
10use crate::io::VortexReadAt;
11use crate::stream_reader::StreamArrayReader;
12
13mod take_rows;
14
15pub struct ChunkedArrayReader<R: VortexReadAt> {
17 read: R,
18 context: Arc<Context>,
19 dtype: Arc<DType>,
20
21 byte_offsets: Array,
23 row_offsets: Array,
24}
25
26impl<R: VortexReadAt> ChunkedArrayReader<R> {
27 pub fn try_new(
28 read: R,
29 context: Arc<Context>,
30 dtype: Arc<DType>,
31 byte_offsets: Array,
32 row_offsets: Array,
33 ) -> VortexResult<Self> {
34 Self::validate(&byte_offsets, &row_offsets)?;
35 Ok(Self {
36 read,
37 context,
38 dtype,
39 byte_offsets,
40 row_offsets,
41 })
42 }
43
44 pub fn nchunks(&self) -> usize {
45 self.byte_offsets.len()
46 }
47
48 fn validate(byte_offsets: &Array, row_offsets: &Array) -> VortexResult<()> {
49 if byte_offsets.len() != row_offsets.len() {
50 vortex_bail!("byte_offsets and row_offsets must have the same length");
51 }
52 Ok(())
53 }
54
55 pub async fn array_stream(&mut self) -> impl ArrayStream + '_ {
56 let mut cursor = Cursor::new(&self.read);
57 let byte_offset = scalar_at(&self.byte_offsets, 0)
58 .and_then(|s| u64::try_from(&s))
59 .vortex_expect("Failed to convert byte_offset to u64");
60
61 cursor.set_position(byte_offset);
62 StreamArrayReader::try_new(cursor, self.context.clone())
63 .await
64 .vortex_expect("Failed to create stream array reader")
65 .with_dtype(self.dtype.clone())
66 .into_array_stream()
67 }
68}