Skip to main content

vortex_array/arrays/chunked/compute/
zip.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use vortex_error::VortexResult;
5
6use crate::ArrayRef;
7use crate::ExecutionCtx;
8use crate::IntoArray;
9use crate::arrays::Chunked;
10use crate::arrays::ChunkedArray;
11use crate::builtins::ArrayBuiltins;
12use crate::scalar_fn::fns::zip::ZipKernel;
13
14// Push down the zip call to the chunks. Without this rule
15// the default implementation canonicalises the chunked array
16// then zips once.
17impl ZipKernel for Chunked {
18    fn zip(
19        if_true: &ChunkedArray,
20        if_false: &ArrayRef,
21        mask: &ArrayRef,
22        _ctx: &mut ExecutionCtx,
23    ) -> VortexResult<Option<ArrayRef>> {
24        let Some(if_false) = if_false.as_opt::<Chunked>() else {
25            return Ok(None);
26        };
27        let dtype = if_true
28            .dtype()
29            .union_nullability(if_false.dtype().nullability());
30        let mut out_chunks = Vec::with_capacity(if_true.nchunks() + if_false.nchunks());
31
32        for pair in if_true.paired_chunks(if_false) {
33            let pair = pair?;
34            let mask_slice = mask.slice(pair.pos)?;
35            out_chunks.push(mask_slice.zip(pair.left, pair.right)?);
36        }
37
38        // SAFETY: chunks originate from zipping slices of inputs that share dtype/nullability.
39        let chunked = unsafe { ChunkedArray::new_unchecked(out_chunks, dtype) };
40        Ok(Some(chunked.into_array()))
41    }
42}
43
#[cfg(test)]
mod tests {
    use vortex_buffer::buffer;
    use vortex_mask::Mask;

    use crate::ArrayRef;
    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::ToCanonical;
    use crate::VortexSessionExecute;
    use crate::arrays::Chunked;
    use crate::arrays::ChunkedArray;
    use crate::builtins::ArrayBuiltins;
    use crate::dtype::DType;
    use crate::dtype::Nullability;
    use crate::dtype::PType;

    /// Wraps the given chunks in a non-nullable `i32` chunked array.
    fn chunked_i32(chunks: Vec<ArrayRef>) -> ChunkedArray {
        let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        ChunkedArray::try_new(chunks, dtype).unwrap()
    }

    /// Zipping two chunked arrays whose chunk boundaries differ must keep the
    /// chunked encoding and pick each value from the correct side.
    #[test]
    fn test_chunked_zip_aligns_across_boundaries() {
        // Chunk lengths 2 / 1 / 2.
        let if_true = chunked_i32(vec![
            buffer![1i32, 2].into_array(),
            buffer![3i32].into_array(),
            buffer![4i32, 5].into_array(),
        ]);

        // Chunk lengths 1 / 2 / 2, so the boundaries differ from `if_true`.
        let if_false = chunked_i32(vec![
            buffer![10i32].into_array(),
            buffer![11i32, 12].into_array(),
            buffer![13i32, 14].into_array(),
        ]);

        let selection = Mask::from_iter([true, false, true, false, true]);

        let lazy_zip = selection
            .into_array()
            .zip(if_true.into_array(), if_false.into_array())
            .unwrap();

        // One step of execution will push down the zip.
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let executed = lazy_zip.clone().execute::<ArrayRef>(&mut ctx).unwrap();
        let chunked = executed
            .as_opt::<Chunked>()
            .expect("zip should keep chunked encoding");

        // The combined boundaries (1, 2, 3, 5) split the five rows into four chunks.
        assert_eq!(chunked.nchunks(), 4);

        let mut collected: Vec<i32> = Vec::new();
        for chunk in chunked.chunks() {
            let primitive = chunk.to_primitive();
            collected.extend(primitive.as_slice::<i32>().iter().copied());
        }
        assert_eq!(collected, vec![1, 11, 3, 13, 5]);
    }
}
106}