vortex_serde/chunked_reader/
mod.rs

1use std::io::Cursor;
2use std::sync::Arc;
3
4use vortex_array::compute::unary::scalar_at;
5use vortex_array::stream::ArrayStream;
6use vortex_array::{Array, Context};
7use vortex_dtype::DType;
8use vortex_error::{vortex_bail, VortexExpect as _, VortexResult};
9
10use crate::io::VortexReadAt;
11use crate::stream_reader::StreamArrayReader;
12
13mod take_rows;
14
15/// A reader for a chunked array.
16pub struct ChunkedArrayReader<R: VortexReadAt> {
17    read: R,
18    context: Arc<Context>,
19    dtype: Arc<DType>,
20
21    // One row per chunk + 1 row for the end of the last chunk.
22    byte_offsets: Array,
23    row_offsets: Array,
24}
25
26impl<R: VortexReadAt> ChunkedArrayReader<R> {
27    pub fn try_new(
28        read: R,
29        context: Arc<Context>,
30        dtype: Arc<DType>,
31        byte_offsets: Array,
32        row_offsets: Array,
33    ) -> VortexResult<Self> {
34        Self::validate(&byte_offsets, &row_offsets)?;
35        Ok(Self {
36            read,
37            context,
38            dtype,
39            byte_offsets,
40            row_offsets,
41        })
42    }
43
44    pub fn nchunks(&self) -> usize {
45        self.byte_offsets.len()
46    }
47
48    fn validate(byte_offsets: &Array, row_offsets: &Array) -> VortexResult<()> {
49        if byte_offsets.len() != row_offsets.len() {
50            vortex_bail!("byte_offsets and row_offsets must have the same length");
51        }
52        Ok(())
53    }
54
55    pub async fn array_stream(&mut self) -> impl ArrayStream + '_ {
56        let mut cursor = Cursor::new(&self.read);
57        let byte_offset = scalar_at(&self.byte_offsets, 0)
58            .and_then(|s| u64::try_from(&s))
59            .vortex_expect("Failed to convert byte_offset to u64");
60
61        cursor.set_position(byte_offset);
62        StreamArrayReader::try_new(cursor, self.context.clone())
63            .await
64            .vortex_expect("Failed to create stream array reader")
65            .with_dtype(self.dtype.clone())
66            .into_array_stream()
67    }
68}