vortex_array/arrays/struct_/
array.rs1use std::borrow::Borrow;
5use std::iter::once;
6use std::sync::Arc;
7
8use itertools::Itertools;
9use vortex_error::VortexExpect;
10use vortex_error::VortexResult;
11use vortex_error::vortex_bail;
12use vortex_error::vortex_err;
13
14use crate::ArrayRef;
15use crate::ArraySlots;
16use crate::IntoArray;
17use crate::array::Array;
18use crate::array::ArrayParts;
19use crate::array::EmptyArrayData;
20use crate::array::TypedArrayRef;
21use crate::array::child_to_validity;
22use crate::array::validity_to_child;
23use crate::arrays::ChunkedArray;
24use crate::arrays::Struct;
25use crate::builtins::ArrayBuiltins;
26use crate::dtype::DType;
27use crate::dtype::FieldName;
28use crate::dtype::FieldNames;
29use crate::dtype::StructFields;
30use crate::validity::Validity;
31
32pub(super) const VALIDITY_SLOT: usize = 0;
35pub(super) const FIELDS_OFFSET: usize = 1;
37
38pub struct StructDataParts {
161 pub struct_fields: StructFields,
162 pub fields: Arc<[ArrayRef]>,
163 pub validity: Validity,
164}
165
166pub(super) fn make_struct_slots(
167 fields: &[ArrayRef],
168 validity: &Validity,
169 length: usize,
170) -> ArraySlots {
171 once(validity_to_child(validity, length))
172 .chain(fields.iter().cloned().map(Some))
173 .collect()
174}
175
176pub trait StructArrayExt: TypedArrayRef<Struct> {
177 fn nullability(&self) -> crate::dtype::Nullability {
178 match self.as_ref().dtype() {
179 DType::Struct(_, nullability) => *nullability,
180 _ => unreachable!("StructArrayExt requires a struct dtype"),
181 }
182 }
183
184 fn names(&self) -> &FieldNames {
185 self.as_ref().dtype().as_struct_fields().names()
186 }
187
188 fn struct_validity(&self) -> Validity {
189 child_to_validity(
190 self.as_ref().slots()[VALIDITY_SLOT].as_ref(),
191 self.nullability(),
192 )
193 }
194
195 fn iter_unmasked_fields(&self) -> impl Iterator<Item = &ArrayRef> + '_ {
196 self.as_ref().slots()[FIELDS_OFFSET..]
197 .iter()
198 .map(|s| s.as_ref().vortex_expect("StructArray field slot"))
199 }
200
201 fn unmasked_fields(&self) -> Arc<[ArrayRef]> {
202 self.iter_unmasked_fields().cloned().collect()
203 }
204
205 fn unmasked_field(&self, idx: usize) -> &ArrayRef {
206 self.as_ref().slots()[FIELDS_OFFSET + idx]
207 .as_ref()
208 .vortex_expect("StructArray field slot")
209 }
210
211 fn unmasked_field_by_name_opt(&self, name: impl AsRef<str>) -> Option<&ArrayRef> {
212 let name = name.as_ref();
213 self.struct_fields()
214 .find(name)
215 .map(|idx| self.unmasked_field(idx))
216 }
217
218 fn unmasked_field_by_name(&self, name: impl AsRef<str>) -> VortexResult<&ArrayRef> {
219 let name = name.as_ref();
220 self.unmasked_field_by_name_opt(name).ok_or_else(|| {
221 vortex_err!(
222 "Field {name} not found in struct array with names {:?}",
223 self.names()
224 )
225 })
226 }
227
228 fn struct_fields(&self) -> &StructFields {
229 self.as_ref().dtype().as_struct_fields()
230 }
231}
232impl<T: TypedArrayRef<Struct>> StructArrayExt for T {}
233
234impl Array<Struct> {
235 pub fn new(
237 names: FieldNames,
238 fields: impl Into<Arc<[ArrayRef]>>,
239 length: usize,
240 validity: Validity,
241 ) -> Self {
242 Self::try_new(names, fields, length, validity)
243 .vortex_expect("StructArray construction failed")
244 }
245
246 pub fn try_new(
248 names: FieldNames,
249 fields: impl Into<Arc<[ArrayRef]>>,
250 length: usize,
251 validity: Validity,
252 ) -> VortexResult<Self> {
253 let fields = fields.into();
254 let field_dtypes: Vec<_> = fields.iter().map(|d| d.dtype().clone()).collect();
255 let dtype = StructFields::new(names, field_dtypes);
256 let slots = make_struct_slots(&fields, &validity, length);
257 Array::try_from_parts(
258 ArrayParts::new(
259 Struct,
260 DType::Struct(dtype, validity.nullability()),
261 length,
262 EmptyArrayData,
263 )
264 .with_slots(slots),
265 )
266 }
267
268 pub unsafe fn new_unchecked(
274 fields: impl Into<Arc<[ArrayRef]>>,
275 dtype: StructFields,
276 length: usize,
277 validity: Validity,
278 ) -> Self {
279 let fields = fields.into();
280 let outer_dtype = DType::Struct(dtype, validity.nullability());
281 let slots = make_struct_slots(&fields, &validity, length);
282 unsafe {
283 Array::from_parts_unchecked(
284 ArrayParts::new(Struct, outer_dtype, length, EmptyArrayData).with_slots(slots),
285 )
286 }
287 }
288
289 pub fn try_new_with_dtype(
291 fields: impl Into<Arc<[ArrayRef]>>,
292 dtype: StructFields,
293 length: usize,
294 validity: Validity,
295 ) -> VortexResult<Self> {
296 let fields = fields.into();
297 let outer_dtype = DType::Struct(dtype, validity.nullability());
298 let slots = make_struct_slots(&fields, &validity, length);
299 Array::try_from_parts(
300 ArrayParts::new(Struct, outer_dtype, length, EmptyArrayData).with_slots(slots),
301 )
302 }
303
304 pub fn from_fields<N: AsRef<str>>(items: &[(N, ArrayRef)]) -> VortexResult<Self> {
306 Self::try_from_iter(items.iter().map(|(a, b)| (a, b.clone())))
307 }
308
309 pub fn try_from_iter_with_validity<
311 N: AsRef<str>,
312 A: IntoArray,
313 T: IntoIterator<Item = (N, A)>,
314 >(
315 iter: T,
316 validity: Validity,
317 ) -> VortexResult<Self> {
318 let (names, fields): (Vec<FieldName>, Vec<ArrayRef>) = iter
319 .into_iter()
320 .map(|(name, fields)| (FieldName::from(name.as_ref()), fields.into_array()))
321 .unzip();
322 let len = fields
323 .first()
324 .map(|f| f.len())
325 .ok_or_else(|| vortex_err!("StructArray cannot be constructed from an empty slice of arrays because the length is unspecified"))?;
326
327 Self::try_new(FieldNames::from_iter(names), fields, len, validity)
328 }
329
330 pub fn try_from_iter<N: AsRef<str>, A: IntoArray, T: IntoIterator<Item = (N, A)>>(
332 iter: T,
333 ) -> VortexResult<Self> {
334 let (names, fields): (Vec<FieldName>, Vec<ArrayRef>) = iter
335 .into_iter()
336 .map(|(name, field)| (FieldName::from(name.as_ref()), field.into_array()))
337 .unzip();
338 let len = fields
339 .first()
340 .map(ArrayRef::len)
341 .ok_or_else(|| vortex_err!("StructArray cannot be constructed from an empty slice of arrays because the length is unspecified"))?;
342
343 Self::try_new(
344 FieldNames::from_iter(names),
345 fields,
346 len,
347 Validity::NonNullable,
348 )
349 }
350
351 pub fn project(&self, projection: &[FieldName]) -> VortexResult<Self> {
359 let mut children = Vec::with_capacity(projection.len());
360 let mut names = Vec::with_capacity(projection.len());
361
362 for f_name in projection {
363 let idx = self
364 .struct_fields()
365 .find(f_name.as_ref())
366 .ok_or_else(|| vortex_err!("Unknown field {f_name}"))?;
367
368 names.push(self.names()[idx].clone());
369 children.push(self.unmasked_field(idx).clone());
370 }
371
372 Self::try_new(
373 FieldNames::from(names.as_slice()),
374 children,
375 self.len(),
376 self.validity()?,
377 )
378 }
379
380 pub fn new_fieldless_with_len(len: usize) -> Self {
382 let dtype = DType::Struct(
383 StructFields::new(FieldNames::default(), Vec::new()),
384 crate::dtype::Nullability::NonNullable,
385 );
386 let slots = make_struct_slots(&[], &Validity::NonNullable, len);
387 unsafe {
388 Array::from_parts_unchecked(
389 ArrayParts::new(Struct, dtype, len, EmptyArrayData).with_slots(slots),
390 )
391 }
392 }
393
394 pub fn into_data_parts(self) -> StructDataParts {
396 let fields: Arc<[ArrayRef]> = self.slots()[FIELDS_OFFSET..]
397 .iter()
398 .map(|s| s.as_ref().vortex_expect("StructArray field slot").clone())
399 .collect();
400 let validity = self.validity().vortex_expect("StructArray validity");
401 StructDataParts {
402 struct_fields: self.struct_fields().clone(),
403 fields,
404 validity,
405 }
406 }
407
408 pub fn remove_column(&self, name: impl Into<FieldName>) -> Option<(Self, ArrayRef)> {
409 let name = name.into();
410 let struct_dtype = self.struct_fields();
411 let len = self.len();
412
413 let position = struct_dtype.find(name.as_ref())?;
414
415 let slot_position = FIELDS_OFFSET + position;
416 let field = self.slots()[slot_position]
417 .as_ref()
418 .vortex_expect("StructArray field slot")
419 .clone();
420 let new_slots: ArraySlots = self
421 .slots()
422 .iter()
423 .enumerate()
424 .filter(|(i, _)| *i != slot_position)
425 .map(|(_, s)| s.clone())
426 .collect();
427
428 let new_dtype = struct_dtype.without_field(position).ok()?;
429 let new_array = unsafe {
430 Array::from_parts_unchecked(
431 ArrayParts::new(
432 Struct,
433 DType::Struct(new_dtype, self.dtype().nullability()),
434 len,
435 EmptyArrayData,
436 )
437 .with_slots(new_slots),
438 )
439 };
440 Some((new_array, field))
441 }
442
443 pub fn with_column(&self, name: impl Into<FieldName>, array: ArrayRef) -> VortexResult<Self> {
444 let name = name.into();
445 let struct_dtype = self.struct_fields();
446
447 let names = struct_dtype.names().iter().cloned().chain(once(name));
448 let types = struct_dtype.fields().chain(once(array.dtype().clone()));
449 let new_fields = StructFields::new(names.collect(), types.collect());
450
451 let children: Arc<[ArrayRef]> = self.slots()[FIELDS_OFFSET..]
452 .iter()
453 .map(|s| s.as_ref().vortex_expect("StructArray field slot").clone())
454 .chain(once(array))
455 .collect();
456
457 Self::try_new_with_dtype(children, new_fields, self.len(), self.validity()?)
458 }
459
460 pub fn remove_column_owned(&self, name: impl Into<FieldName>) -> Option<(Self, ArrayRef)> {
461 self.remove_column(name)
462 }
463
464 pub fn try_concat<T>(chunks: impl IntoIterator<Item = T>) -> VortexResult<Self>
465 where
466 T: Borrow<Array<Struct>>,
467 {
468 let mut it = chunks.into_iter();
469 let Some(first) = it.next() else {
470 vortex_bail!("cannot concat empty iterator of arrays");
471 };
472 let first_dtype = first.borrow().dtype().clone();
473 let struct_fields = first_dtype.as_struct_fields().clone();
474 let names = struct_fields.names();
475
476 let it = [first].into_iter().chain(it);
477 let (field_arrays_per_chunk, validities) = it
478 .map(|chunk| {
479 let chunk = chunk.borrow();
480 if &first_dtype != chunk.dtype() {
481 vortex_bail!(
482 "cannot concatenate struct arrays with differing dtypes: {}, {}",
483 first_dtype,
484 chunk.dtype(),
485 );
486 }
487
488 let fields = names
489 .iter()
490 .map(|name| {
491 chunk
492 .unmasked_field_by_name(name)
493 .vortex_expect("field exists because it is in dtype")
494 .clone()
495 })
496 .collect::<Vec<_>>();
497 let validity = chunk.validity()?;
498
499 Ok((fields, (validity, chunk.len())))
500 })
501 .process_results(|iter| iter.unzip::<_, _, Vec<_>, Vec<_>>())?;
502
503 let field_arrays = struct_fields
504 .fields()
505 .enumerate()
506 .map(|(i, dtype)| {
507 let chunks = field_arrays_per_chunk
509 .iter()
510 .map(|x| x[i].clone())
511 .collect();
512 unsafe { ChunkedArray::new_unchecked(chunks, dtype) }.into_array()
513 })
514 .collect::<Vec<_>>();
515 let len = validities.iter().map(|(_v, len)| len).sum();
516 let validity = Validity::concat(validities).vortex_expect("verified non-empty above");
517
518 Ok(unsafe { Array::<Struct>::new_unchecked(field_arrays, struct_fields, len, validity) })
528 }
529
530 pub fn push_validity_into_children(&self, remove_struct_validity: bool) -> VortexResult<Self> {
536 let struct_validity = self.struct_validity();
537
538 let new_validity = if remove_struct_validity {
539 Validity::NonNullable
540 } else {
541 struct_validity.clone()
542 };
543
544 if struct_validity.definitely_no_nulls() {
546 return Self::try_new(
547 self.names().clone(),
548 self.unmasked_fields(),
549 self.len(),
550 new_validity,
551 );
552 }
553
554 let mask = struct_validity.to_array(self.len());
556 let fields = self
557 .iter_unmasked_fields()
558 .map(|field| field.clone().mask(mask.clone()))
559 .collect::<VortexResult<Vec<_>>>()?;
560
561 Self::try_new(self.names().clone(), fields, self.len(), new_validity)
562 }
563}