vortex_array/arrays/struct_/
array.rs1use std::borrow::Borrow;
5use std::iter::once;
6use std::sync::Arc;
7
8use itertools::Itertools;
9use vortex_error::VortexExpect;
10use vortex_error::VortexResult;
11use vortex_error::vortex_bail;
12use vortex_error::vortex_err;
13
14use crate::ArrayRef;
15use crate::ArraySlots;
16use crate::IntoArray;
17use crate::array::Array;
18use crate::array::ArrayParts;
19use crate::array::EmptyArrayData;
20use crate::array::TypedArrayRef;
21use crate::array::child_to_validity;
22use crate::array::validity_to_child;
23use crate::arrays::ChunkedArray;
24use crate::arrays::Struct;
25use crate::dtype::DType;
26use crate::dtype::FieldName;
27use crate::dtype::FieldNames;
28use crate::dtype::StructFields;
29use crate::validity::Validity;
30
/// Index of the slot that holds the struct-level validity child.
pub(super) const VALIDITY_SLOT: usize = 0;
/// Index of the first field slot; field `i` is read from slot `FIELDS_OFFSET + i`.
pub(super) const FIELDS_OFFSET: usize = 1;
36
/// The owned pieces of a struct array, as returned by `into_data_parts`.
pub struct StructDataParts {
    /// The struct's field description, cloned from the array's dtype.
    pub struct_fields: StructFields,
    /// The field arrays, taken from the slots that follow the validity slot.
    pub fields: Arc<[ArrayRef]>,
    /// The struct-level validity.
    pub validity: Validity,
}
164
165pub(super) fn make_struct_slots(
166 fields: &[ArrayRef],
167 validity: &Validity,
168 length: usize,
169) -> ArraySlots {
170 once(validity_to_child(validity, length))
171 .chain(fields.iter().cloned().map(Some))
172 .collect()
173}
174
175pub trait StructArrayExt: TypedArrayRef<Struct> {
176 fn nullability(&self) -> crate::dtype::Nullability {
177 match self.as_ref().dtype() {
178 DType::Struct(_, nullability) => *nullability,
179 _ => unreachable!("StructArrayExt requires a struct dtype"),
180 }
181 }
182
183 fn names(&self) -> &FieldNames {
184 self.as_ref().dtype().as_struct_fields().names()
185 }
186
187 fn struct_validity(&self) -> Validity {
188 child_to_validity(
189 self.as_ref().slots()[VALIDITY_SLOT].as_ref(),
190 self.nullability(),
191 )
192 }
193
194 fn iter_unmasked_fields(&self) -> impl Iterator<Item = &ArrayRef> + '_ {
195 self.as_ref().slots()[FIELDS_OFFSET..]
196 .iter()
197 .map(|s| s.as_ref().vortex_expect("StructArray field slot"))
198 }
199
200 fn unmasked_fields(&self) -> Arc<[ArrayRef]> {
201 self.iter_unmasked_fields().cloned().collect()
202 }
203
204 fn unmasked_field(&self, idx: usize) -> &ArrayRef {
205 self.as_ref().slots()[FIELDS_OFFSET + idx]
206 .as_ref()
207 .vortex_expect("StructArray field slot")
208 }
209
210 fn unmasked_field_by_name_opt(&self, name: impl AsRef<str>) -> Option<&ArrayRef> {
211 let name = name.as_ref();
212 self.struct_fields()
213 .find(name)
214 .map(|idx| self.unmasked_field(idx))
215 }
216
217 fn unmasked_field_by_name(&self, name: impl AsRef<str>) -> VortexResult<&ArrayRef> {
218 let name = name.as_ref();
219 self.unmasked_field_by_name_opt(name).ok_or_else(|| {
220 vortex_err!(
221 "Field {name} not found in struct array with names {:?}",
222 self.names()
223 )
224 })
225 }
226
227 fn struct_fields(&self) -> &StructFields {
228 self.as_ref().dtype().as_struct_fields()
229 }
230}
231impl<T: TypedArrayRef<Struct>> StructArrayExt for T {}
232
233impl Array<Struct> {
234 pub fn new(
236 names: FieldNames,
237 fields: impl Into<Arc<[ArrayRef]>>,
238 length: usize,
239 validity: Validity,
240 ) -> Self {
241 Self::try_new(names, fields, length, validity)
242 .vortex_expect("StructArray construction failed")
243 }
244
245 pub fn try_new(
247 names: FieldNames,
248 fields: impl Into<Arc<[ArrayRef]>>,
249 length: usize,
250 validity: Validity,
251 ) -> VortexResult<Self> {
252 let fields = fields.into();
253 let field_dtypes: Vec<_> = fields.iter().map(|d| d.dtype().clone()).collect();
254 let dtype = StructFields::new(names, field_dtypes);
255 let slots = make_struct_slots(&fields, &validity, length);
256 Array::try_from_parts(
257 ArrayParts::new(
258 Struct,
259 DType::Struct(dtype, validity.nullability()),
260 length,
261 EmptyArrayData,
262 )
263 .with_slots(slots),
264 )
265 }
266
267 pub unsafe fn new_unchecked(
273 fields: impl Into<Arc<[ArrayRef]>>,
274 dtype: StructFields,
275 length: usize,
276 validity: Validity,
277 ) -> Self {
278 let fields = fields.into();
279 let outer_dtype = DType::Struct(dtype, validity.nullability());
280 let slots = make_struct_slots(&fields, &validity, length);
281 unsafe {
282 Array::from_parts_unchecked(
283 ArrayParts::new(Struct, outer_dtype, length, EmptyArrayData).with_slots(slots),
284 )
285 }
286 }
287
288 pub fn try_new_with_dtype(
290 fields: impl Into<Arc<[ArrayRef]>>,
291 dtype: StructFields,
292 length: usize,
293 validity: Validity,
294 ) -> VortexResult<Self> {
295 let fields = fields.into();
296 let outer_dtype = DType::Struct(dtype, validity.nullability());
297 let slots = make_struct_slots(&fields, &validity, length);
298 Array::try_from_parts(
299 ArrayParts::new(Struct, outer_dtype, length, EmptyArrayData).with_slots(slots),
300 )
301 }
302
303 pub fn from_fields<N: AsRef<str>>(items: &[(N, ArrayRef)]) -> VortexResult<Self> {
305 Self::try_from_iter(items.iter().map(|(a, b)| (a, b.clone())))
306 }
307
308 pub fn try_from_iter_with_validity<
310 N: AsRef<str>,
311 A: IntoArray,
312 T: IntoIterator<Item = (N, A)>,
313 >(
314 iter: T,
315 validity: Validity,
316 ) -> VortexResult<Self> {
317 let (names, fields): (Vec<FieldName>, Vec<ArrayRef>) = iter
318 .into_iter()
319 .map(|(name, fields)| (FieldName::from(name.as_ref()), fields.into_array()))
320 .unzip();
321 let len = fields
322 .first()
323 .map(|f| f.len())
324 .ok_or_else(|| vortex_err!("StructArray cannot be constructed from an empty slice of arrays because the length is unspecified"))?;
325
326 Self::try_new(FieldNames::from_iter(names), fields, len, validity)
327 }
328
329 pub fn try_from_iter<N: AsRef<str>, A: IntoArray, T: IntoIterator<Item = (N, A)>>(
331 iter: T,
332 ) -> VortexResult<Self> {
333 let (names, fields): (Vec<FieldName>, Vec<ArrayRef>) = iter
334 .into_iter()
335 .map(|(name, field)| (FieldName::from(name.as_ref()), field.into_array()))
336 .unzip();
337 let len = fields
338 .first()
339 .map(ArrayRef::len)
340 .ok_or_else(|| vortex_err!("StructArray cannot be constructed from an empty slice of arrays because the length is unspecified"))?;
341
342 Self::try_new(
343 FieldNames::from_iter(names),
344 fields,
345 len,
346 Validity::NonNullable,
347 )
348 }
349
350 pub fn project(&self, projection: &[FieldName]) -> VortexResult<Self> {
358 let mut children = Vec::with_capacity(projection.len());
359 let mut names = Vec::with_capacity(projection.len());
360
361 for f_name in projection {
362 let idx = self
363 .struct_fields()
364 .find(f_name.as_ref())
365 .ok_or_else(|| vortex_err!("Unknown field {f_name}"))?;
366
367 names.push(self.names()[idx].clone());
368 children.push(self.unmasked_field(idx).clone());
369 }
370
371 Self::try_new(
372 FieldNames::from(names.as_slice()),
373 children,
374 self.len(),
375 self.validity()?,
376 )
377 }
378
379 pub fn new_fieldless_with_len(len: usize) -> Self {
381 let dtype = DType::Struct(
382 StructFields::new(FieldNames::default(), Vec::new()),
383 crate::dtype::Nullability::NonNullable,
384 );
385 let slots = make_struct_slots(&[], &Validity::NonNullable, len);
386 unsafe {
387 Array::from_parts_unchecked(
388 ArrayParts::new(Struct, dtype, len, EmptyArrayData).with_slots(slots),
389 )
390 }
391 }
392
393 pub fn into_data_parts(self) -> StructDataParts {
395 let fields: Arc<[ArrayRef]> = self.slots()[FIELDS_OFFSET..]
396 .iter()
397 .map(|s| s.as_ref().vortex_expect("StructArray field slot").clone())
398 .collect();
399 let validity = self.validity().vortex_expect("StructArray validity");
400 StructDataParts {
401 struct_fields: self.struct_fields().clone(),
402 fields,
403 validity,
404 }
405 }
406
407 pub fn remove_column(&self, name: impl Into<FieldName>) -> Option<(Self, ArrayRef)> {
408 let name = name.into();
409 let struct_dtype = self.struct_fields();
410 let len = self.len();
411
412 let position = struct_dtype.find(name.as_ref())?;
413
414 let slot_position = FIELDS_OFFSET + position;
415 let field = self.slots()[slot_position]
416 .as_ref()
417 .vortex_expect("StructArray field slot")
418 .clone();
419 let new_slots: ArraySlots = self
420 .slots()
421 .iter()
422 .enumerate()
423 .filter(|(i, _)| *i != slot_position)
424 .map(|(_, s)| s.clone())
425 .collect();
426
427 let new_dtype = struct_dtype.without_field(position).ok()?;
428 let new_array = unsafe {
429 Array::from_parts_unchecked(
430 ArrayParts::new(
431 Struct,
432 DType::Struct(new_dtype, self.dtype().nullability()),
433 len,
434 EmptyArrayData,
435 )
436 .with_slots(new_slots),
437 )
438 };
439 Some((new_array, field))
440 }
441
442 pub fn with_column(&self, name: impl Into<FieldName>, array: ArrayRef) -> VortexResult<Self> {
443 let name = name.into();
444 let struct_dtype = self.struct_fields();
445
446 let names = struct_dtype.names().iter().cloned().chain(once(name));
447 let types = struct_dtype.fields().chain(once(array.dtype().clone()));
448 let new_fields = StructFields::new(names.collect(), types.collect());
449
450 let children: Arc<[ArrayRef]> = self.slots()[FIELDS_OFFSET..]
451 .iter()
452 .map(|s| s.as_ref().vortex_expect("StructArray field slot").clone())
453 .chain(once(array))
454 .collect();
455
456 Self::try_new_with_dtype(children, new_fields, self.len(), self.validity()?)
457 }
458
459 pub fn remove_column_owned(&self, name: impl Into<FieldName>) -> Option<(Self, ArrayRef)> {
460 self.remove_column(name)
461 }
462
463 pub fn try_concat<T>(chunks: impl IntoIterator<Item = T>) -> VortexResult<Self>
464 where
465 T: Borrow<Array<Struct>>,
466 {
467 let mut it = chunks.into_iter();
468 let Some(first) = it.next() else {
469 vortex_bail!("cannot concat empty iterator of arrays");
470 };
471 let first_dtype = first.borrow().dtype().clone();
472 let struct_fields = first_dtype.as_struct_fields().clone();
473 let names = struct_fields.names();
474
475 let it = [first].into_iter().chain(it);
476 let (field_arrays_per_chunk, validities) = it
477 .map(|chunk| {
478 let chunk = chunk.borrow();
479 if &first_dtype != chunk.dtype() {
480 vortex_bail!(
481 "cannot concatenate struct arrays with differing dtypes: {}, {}",
482 first_dtype,
483 chunk.dtype(),
484 );
485 }
486
487 let fields = names
488 .iter()
489 .map(|name| {
490 chunk
491 .unmasked_field_by_name(name)
492 .vortex_expect("field exists because it is in dtype")
493 .clone()
494 })
495 .collect::<Vec<_>>();
496 let validity = chunk.validity()?;
497
498 Ok((fields, (validity, chunk.len())))
499 })
500 .process_results(|iter| iter.unzip::<_, _, Vec<_>, Vec<_>>())?;
501
502 let field_arrays = struct_fields
503 .fields()
504 .enumerate()
505 .map(|(i, dtype)| {
506 let chunks = field_arrays_per_chunk
508 .iter()
509 .map(|x| x[i].clone())
510 .collect();
511 unsafe { ChunkedArray::new_unchecked(chunks, dtype) }.into_array()
512 })
513 .collect::<Vec<_>>();
514 let len = validities.iter().map(|(_v, len)| len).sum();
515 let validity = Validity::concat(validities).vortex_expect("verified non-empty above");
516
517 Ok(unsafe { Array::<Struct>::new_unchecked(field_arrays, struct_fields, len, validity) })
527 }
528}