vortex_array/arrays/chunked/
serde.rs

1use itertools::Itertools;
2use vortex_dtype::{DType, Nullability, PType};
3use vortex_error::{VortexExpect, VortexResult, vortex_bail};
4
5use crate::arrays::{ChunkedArray, ChunkedEncoding, PrimitiveArray};
6use crate::serde::ArrayParts;
7use crate::validity::Validity;
8use crate::vtable::SerdeVTable;
9use crate::{
10    Array, ArrayChildVisitor, ArrayContext, ArrayRef, ArrayVisitorImpl, EmptyMetadata, ToCanonical,
11};
12
13impl ArrayVisitorImpl for ChunkedArray {
14    fn _children(&self, visitor: &mut dyn ArrayChildVisitor) {
15        let chunk_offsets = PrimitiveArray::new(self.chunk_offsets.clone(), Validity::NonNullable);
16        visitor.visit_child("chunk_offsets", &chunk_offsets);
17
18        for (idx, chunk) in self.chunks().iter().enumerate() {
19            visitor.visit_child(format!("chunks[{}]", idx).as_str(), chunk);
20        }
21    }
22
23    fn _metadata(&self) -> EmptyMetadata {
24        EmptyMetadata
25    }
26}
27
28impl SerdeVTable<&ChunkedArray> for ChunkedEncoding {
29    fn decode(
30        &self,
31        parts: &ArrayParts,
32        ctx: &ArrayContext,
33        dtype: DType,
34        // TODO(ngates): should we avoid storing the final chunk offset and push the length instead?
35        _len: usize,
36    ) -> VortexResult<ArrayRef> {
37        if parts.nchildren() == 0 {
38            vortex_bail!("Chunked array needs at least one child");
39        }
40
41        let nchunks = parts.nchildren() - 1;
42
43        // The first child contains the row offsets of the chunks
44        let chunk_offsets = parts
45            .child(0)
46            .decode(
47                ctx,
48                DType::Primitive(PType::U64, Nullability::NonNullable),
49                // 1 extra offset for the end of the last chunk
50                nchunks + 1,
51            )?
52            .to_primitive()?
53            .buffer::<u64>();
54
55        // The remaining children contain the actual data of the chunks
56        let chunks = chunk_offsets
57            .iter()
58            .tuple_windows()
59            .enumerate()
60            .map(|(idx, (start, end))| {
61                let chunk_len =
62                    usize::try_from(end - start).vortex_expect("chunk length exceeds usize");
63                parts.child(idx + 1).decode(ctx, dtype.clone(), chunk_len)
64            })
65            .try_collect()?;
66
67        // Unchecked because we just created each chunk with the same DType.
68        Ok(ChunkedArray::new_unchecked(chunks, dtype).into_array())
69    }
70}