vortex_array/arrays/masked/vtable/
mod.rs1mod canonical;
4mod operations;
5mod validity;
6
7use std::hash::Hasher;
8
9use smallvec::smallvec;
10use vortex_error::VortexExpect;
11use vortex_error::VortexResult;
12use vortex_error::vortex_bail;
13use vortex_error::vortex_ensure;
14use vortex_error::vortex_panic;
15use vortex_session::VortexSession;
16use vortex_session::registry::CachedId;
17
18use crate::AnyCanonical;
19use crate::ArrayEq;
20use crate::ArrayHash;
21use crate::ArrayRef;
22use crate::Canonical;
23use crate::IntoArray;
24use crate::LEGACY_SESSION;
25use crate::Precision;
26use crate::VortexSessionExecute;
27use crate::array::Array;
28use crate::array::ArrayId;
29use crate::array::ArrayView;
30use crate::array::VTable;
31use crate::array::validity_to_child;
32use crate::arrays::ConstantArray;
33use crate::arrays::masked::MaskedArrayExt;
34use crate::arrays::masked::MaskedArraySlotsExt;
35use crate::arrays::masked::MaskedData;
36use crate::arrays::masked::array::MaskedSlots;
37use crate::arrays::masked::compute::rules::PARENT_RULES;
38use crate::arrays::masked::mask_validity_canonical;
39use crate::buffer::BufferHandle;
40use crate::dtype::DType;
41use crate::executor::ExecutionCtx;
42use crate::executor::ExecutionResult;
43use crate::require_child;
44use crate::scalar::Scalar;
45use crate::serde::ArrayChildren;
46use crate::validity::Validity;
/// Convenience alias for an [`Array`] whose encoding is [`Masked`].
pub type MaskedArray = Array<Masked>;
49
/// Field-less marker type identifying the masked array encoding.
///
/// It carries no state of its own; the [`VTable`] implementation in this module
/// attaches the encoding's behavior to it.
#[derive(Clone, Debug)]
pub struct Masked;
52
53impl ArrayHash for MaskedData {
54 fn array_hash<H: Hasher>(&self, _state: &mut H, _precision: Precision) {}
55}
56
57impl ArrayEq for MaskedData {
58 fn array_eq(&self, _other: &Self, _precision: Precision) -> bool {
59 true
60 }
61}
62
63impl VTable for Masked {
64 type TypedArrayData = MaskedData;
65
66 type OperationsVTable = Self;
67 type ValidityVTable = Self;
68
69 fn id(&self) -> ArrayId {
70 static ID: CachedId = CachedId::new("vortex.masked");
71 *ID
72 }
73
74 fn validate(
75 &self,
76 _data: &MaskedData,
77 dtype: &DType,
78 len: usize,
79 slots: &[Option<ArrayRef>],
80 ) -> VortexResult<()> {
81 vortex_ensure!(
82 slots[MaskedSlots::CHILD].is_some(),
83 "MaskedArray child slot must be present"
84 );
85 let child = slots[MaskedSlots::CHILD]
86 .as_ref()
87 .vortex_expect("validated child slot");
88 vortex_ensure!(child.len() == len, "MaskedArray child length mismatch");
89 vortex_ensure!(
90 child.dtype().as_nullable() == *dtype,
91 "MaskedArray dtype does not match child and validity"
92 );
93 Ok(())
94 }
95
96 fn nbuffers(_array: ArrayView<'_, Self>) -> usize {
97 0
98 }
99
100 fn buffer(_array: ArrayView<'_, Self>, _idx: usize) -> BufferHandle {
101 vortex_panic!("MaskedArray has no buffers")
102 }
103
104 fn buffer_name(_array: ArrayView<'_, Self>, _idx: usize) -> Option<String> {
105 None
106 }
107
108 fn serialize(
109 _array: ArrayView<'_, Self>,
110 _session: &VortexSession,
111 ) -> VortexResult<Option<Vec<u8>>> {
112 Ok(Some(vec![]))
113 }
114
115 fn deserialize(
116 &self,
117 dtype: &DType,
118 len: usize,
119 metadata: &[u8],
120
121 buffers: &[BufferHandle],
122 children: &dyn ArrayChildren,
123 _session: &VortexSession,
124 ) -> VortexResult<crate::array::ArrayParts<Self>> {
125 if !metadata.is_empty() {
126 vortex_bail!(
127 "MaskedArray expects empty metadata, got {} bytes",
128 metadata.len()
129 );
130 }
131 if !buffers.is_empty() {
132 vortex_bail!("Expected 0 buffer, got {}", buffers.len());
133 }
134
135 vortex_ensure!(
136 children.len() == 1 || children.len() == 2,
137 "`MaskedArray::build` expects 1 or 2 children, got {}",
138 children.len()
139 );
140
141 let child = children.get(0, &dtype.as_nonnullable(), len)?;
142
143 let validity = if children.len() == 2 {
144 let validity = children.get(1, &Validity::DTYPE, len)?;
145 Validity::Array(validity)
146 } else {
147 Validity::from(dtype.nullability())
148 };
149
150 let validity_slot = validity_to_child(&validity, len);
151 let data = MaskedData::try_new(
152 len,
153 child.all_valid(&mut LEGACY_SESSION.create_execution_ctx())?,
154 validity,
155 )?;
156 Ok(
157 crate::array::ArrayParts::new(self.clone(), dtype.clone(), len, data)
158 .with_slots(smallvec![Some(child), validity_slot]),
159 )
160 }
161
162 fn execute(array: Array<Self>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
163 let array = require_child!(array, array.child(), MaskedSlots::CHILD => AnyCanonical);
164
165 let validity = array.masked_validity();
166
167 if matches!(validity, Validity::AllInvalid) {
169 return Ok(ExecutionResult::done(
170 ConstantArray::new(Scalar::null(array.dtype().as_nullable()), array.len())
171 .into_array(),
172 ));
173 }
174
175 let child = Canonical::from(array.child().as_::<AnyCanonical>());
182 Ok(ExecutionResult::done(
183 mask_validity_canonical(child, validity, ctx)?.into_array(),
184 ))
185 }
186
187 fn reduce_parent(
188 array: ArrayView<'_, Self>,
189 parent: &ArrayRef,
190 child_idx: usize,
191 ) -> VortexResult<Option<ArrayRef>> {
192 PARENT_RULES.evaluate(array, parent, child_idx)
193 }
194 fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String {
195 MaskedSlots::NAMES[idx].to_string()
196 }
197}
198
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_buffer::ByteBufferMut;
    use vortex_error::VortexError;
    use vortex_session::registry::ReadContext;

    use crate::ArrayContext;
    use crate::Canonical;
    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::arrays::Masked;
    use crate::arrays::MaskedArray;
    use crate::arrays::PrimitiveArray;
    use crate::dtype::Nullability;
    use crate::serde::SerializeOptions;
    use crate::serde::SerializedArray;
    use crate::validity::Validity;

    /// Serializes a `MaskedArray`, decodes the concatenated bytes, and checks that the
    /// decoded array is still `Masked` and displays the same values.
    #[rstest]
    #[case(
        MaskedArray::try_new(
            PrimitiveArray::from_iter([1i32, 2, 3]).into_array(),
            Validity::AllValid
        ).unwrap()
    )]
    #[case(
        MaskedArray::try_new(
            PrimitiveArray::from_iter([1i32, 2, 3, 4, 5]).into_array(),
            Validity::from_iter([true, true, false, true, false])
        ).unwrap()
    )]
    #[case(
        MaskedArray::try_new(
            PrimitiveArray::from_iter(0..100).into_array(),
            Validity::from_iter((0..100).map(|i| i % 3 != 0))
        ).unwrap()
    )]
    fn test_serde_roundtrip(#[case] array: MaskedArray) {
        let expected_dtype = array.dtype().clone();
        let expected_len = array.len();

        let ctx = ArrayContext::empty();
        let segments = array
            .clone()
            .into_array()
            .serialize(&ctx, &LEGACY_SESSION, &SerializeOptions::default())
            .unwrap();

        // Stitch the serialized segments into one contiguous buffer.
        let mut bytes = ByteBufferMut::empty();
        for segment in segments {
            bytes.extend_from_slice(segment.as_ref());
        }
        let bytes = bytes.freeze();

        let serialized = SerializedArray::try_from(bytes).unwrap();
        // Built after `serialize` so the ids are read from the context as it stands then.
        let read_ctx = ReadContext::new(ctx.to_ids());
        let decoded = serialized
            .decode(&expected_dtype, expected_len, &read_ctx, &LEGACY_SESSION)
            .unwrap();

        assert!(decoded.is::<Masked>());
        assert_eq!(
            array.as_ref().display_values().to_string(),
            decoded.display_values().to_string()
        );
    }

    /// Executing a `MaskedArray` built from a non-nullable child with
    /// `Validity::AllValid` must still yield a nullable canonical result.
    #[test]
    fn test_execute_with_all_valid_preserves_nullable_dtype() -> Result<(), VortexError> {
        // The child starts out non-nullable.
        let child = PrimitiveArray::from_iter([1i32, 2, 3]).into_array();
        assert_eq!(child.dtype().nullability(), Nullability::NonNullable);

        // Wrapping it in a MaskedArray makes the dtype nullable.
        let masked = MaskedArray::try_new(child, Validity::AllValid)?;
        assert_eq!(masked.dtype().nullability(), Nullability::Nullable);

        let mut exec_ctx = LEGACY_SESSION.create_execution_ctx();
        let canonical: Canonical = masked.into_array().execute(&mut exec_ctx)?;

        assert_eq!(
            canonical.dtype().nullability(),
            Nullability::Nullable,
            "MaskedArray execute should produce Nullable dtype"
        );

        Ok(())
    }
}