vortex_array/arrays/struct_/
array.rs1use std::borrow::Borrow;
5use std::iter::once;
6use std::sync::Arc;
7
8use itertools::Itertools;
9use vortex_error::VortexExpect;
10use vortex_error::VortexResult;
11use vortex_error::vortex_bail;
12use vortex_error::vortex_err;
13
14use crate::ArrayRef;
15use crate::IntoArray;
16use crate::array::Array;
17use crate::array::ArrayParts;
18use crate::array::EmptyArrayData;
19use crate::array::TypedArrayRef;
20use crate::array::child_to_validity;
21use crate::array::validity_to_child;
22use crate::arrays::ChunkedArray;
23use crate::arrays::Struct;
24use crate::dtype::DType;
25use crate::dtype::FieldName;
26use crate::dtype::FieldNames;
27use crate::dtype::StructFields;
28use crate::validity::Validity;
29
30pub(super) const VALIDITY_SLOT: usize = 0;
33pub(super) const FIELDS_OFFSET: usize = 1;
35
36pub struct StructDataParts {
159 pub struct_fields: StructFields,
160 pub fields: Arc<[ArrayRef]>,
161 pub validity: Validity,
162}
163
164pub(super) fn make_struct_slots(
165 fields: &[ArrayRef],
166 validity: &Validity,
167 length: usize,
168) -> Vec<Option<ArrayRef>> {
169 once(validity_to_child(validity, length))
170 .chain(fields.iter().cloned().map(Some))
171 .collect()
172}
173
174pub trait StructArrayExt: TypedArrayRef<Struct> {
175 fn nullability(&self) -> crate::dtype::Nullability {
176 match self.as_ref().dtype() {
177 DType::Struct(_, nullability) => *nullability,
178 _ => unreachable!("StructArrayExt requires a struct dtype"),
179 }
180 }
181
182 fn names(&self) -> &FieldNames {
183 self.as_ref().dtype().as_struct_fields().names()
184 }
185
186 fn struct_validity(&self) -> Validity {
187 child_to_validity(
188 self.as_ref().slots()[VALIDITY_SLOT].as_ref(),
189 self.nullability(),
190 )
191 }
192
193 fn iter_unmasked_fields(&self) -> impl Iterator<Item = &ArrayRef> + '_ {
194 self.as_ref().slots()[FIELDS_OFFSET..]
195 .iter()
196 .map(|s| s.as_ref().vortex_expect("StructArray field slot"))
197 }
198
199 fn unmasked_fields(&self) -> Arc<[ArrayRef]> {
200 self.iter_unmasked_fields().cloned().collect()
201 }
202
203 fn unmasked_field(&self, idx: usize) -> &ArrayRef {
204 self.as_ref().slots()[FIELDS_OFFSET + idx]
205 .as_ref()
206 .vortex_expect("StructArray field slot")
207 }
208
209 fn unmasked_field_by_name_opt(&self, name: impl AsRef<str>) -> Option<&ArrayRef> {
210 let name = name.as_ref();
211 self.struct_fields()
212 .find(name)
213 .map(|idx| self.unmasked_field(idx))
214 }
215
216 fn unmasked_field_by_name(&self, name: impl AsRef<str>) -> VortexResult<&ArrayRef> {
217 let name = name.as_ref();
218 self.unmasked_field_by_name_opt(name).ok_or_else(|| {
219 vortex_err!(
220 "Field {name} not found in struct array with names {:?}",
221 self.names()
222 )
223 })
224 }
225
226 fn struct_fields(&self) -> &StructFields {
227 self.as_ref().dtype().as_struct_fields()
228 }
229}
230impl<T: TypedArrayRef<Struct>> StructArrayExt for T {}
231
232impl Array<Struct> {
233 pub fn new(
235 names: FieldNames,
236 fields: impl Into<Arc<[ArrayRef]>>,
237 length: usize,
238 validity: Validity,
239 ) -> Self {
240 Self::try_new(names, fields, length, validity)
241 .vortex_expect("StructArray construction failed")
242 }
243
244 pub fn try_new(
246 names: FieldNames,
247 fields: impl Into<Arc<[ArrayRef]>>,
248 length: usize,
249 validity: Validity,
250 ) -> VortexResult<Self> {
251 let fields = fields.into();
252 let field_dtypes: Vec<_> = fields.iter().map(|d| d.dtype().clone()).collect();
253 let dtype = StructFields::new(names, field_dtypes);
254 let slots = make_struct_slots(&fields, &validity, length);
255 Array::try_from_parts(
256 ArrayParts::new(
257 Struct,
258 DType::Struct(dtype, validity.nullability()),
259 length,
260 EmptyArrayData,
261 )
262 .with_slots(slots),
263 )
264 }
265
266 pub unsafe fn new_unchecked(
272 fields: impl Into<Arc<[ArrayRef]>>,
273 dtype: StructFields,
274 length: usize,
275 validity: Validity,
276 ) -> Self {
277 let fields = fields.into();
278 let outer_dtype = DType::Struct(dtype, validity.nullability());
279 let slots = make_struct_slots(&fields, &validity, length);
280 unsafe {
281 Array::from_parts_unchecked(
282 ArrayParts::new(Struct, outer_dtype, length, EmptyArrayData).with_slots(slots),
283 )
284 }
285 }
286
287 pub fn try_new_with_dtype(
289 fields: impl Into<Arc<[ArrayRef]>>,
290 dtype: StructFields,
291 length: usize,
292 validity: Validity,
293 ) -> VortexResult<Self> {
294 let fields = fields.into();
295 let outer_dtype = DType::Struct(dtype, validity.nullability());
296 let slots = make_struct_slots(&fields, &validity, length);
297 Array::try_from_parts(
298 ArrayParts::new(Struct, outer_dtype, length, EmptyArrayData).with_slots(slots),
299 )
300 }
301
302 pub fn from_fields<N: AsRef<str>>(items: &[(N, ArrayRef)]) -> VortexResult<Self> {
304 Self::try_from_iter(items.iter().map(|(a, b)| (a, b.clone())))
305 }
306
307 pub fn try_from_iter_with_validity<
309 N: AsRef<str>,
310 A: IntoArray,
311 T: IntoIterator<Item = (N, A)>,
312 >(
313 iter: T,
314 validity: Validity,
315 ) -> VortexResult<Self> {
316 let (names, fields): (Vec<FieldName>, Vec<ArrayRef>) = iter
317 .into_iter()
318 .map(|(name, fields)| (FieldName::from(name.as_ref()), fields.into_array()))
319 .unzip();
320 let len = fields
321 .first()
322 .map(|f| f.len())
323 .ok_or_else(|| vortex_err!("StructArray cannot be constructed from an empty slice of arrays because the length is unspecified"))?;
324
325 Self::try_new(FieldNames::from_iter(names), fields, len, validity)
326 }
327
328 pub fn try_from_iter<N: AsRef<str>, A: IntoArray, T: IntoIterator<Item = (N, A)>>(
330 iter: T,
331 ) -> VortexResult<Self> {
332 let (names, fields): (Vec<FieldName>, Vec<ArrayRef>) = iter
333 .into_iter()
334 .map(|(name, field)| (FieldName::from(name.as_ref()), field.into_array()))
335 .unzip();
336 let len = fields
337 .first()
338 .map(ArrayRef::len)
339 .ok_or_else(|| vortex_err!("StructArray cannot be constructed from an empty slice of arrays because the length is unspecified"))?;
340
341 Self::try_new(
342 FieldNames::from_iter(names),
343 fields,
344 len,
345 Validity::NonNullable,
346 )
347 }
348
349 pub fn project(&self, projection: &[FieldName]) -> VortexResult<Self> {
357 let mut children = Vec::with_capacity(projection.len());
358 let mut names = Vec::with_capacity(projection.len());
359
360 for f_name in projection {
361 let idx = self
362 .struct_fields()
363 .find(f_name.as_ref())
364 .ok_or_else(|| vortex_err!("Unknown field {f_name}"))?;
365
366 names.push(self.names()[idx].clone());
367 children.push(self.unmasked_field(idx).clone());
368 }
369
370 Self::try_new(
371 FieldNames::from(names.as_slice()),
372 children,
373 self.len(),
374 self.validity()?,
375 )
376 }
377
378 pub fn new_fieldless_with_len(len: usize) -> Self {
380 let dtype = DType::Struct(
381 StructFields::new(FieldNames::default(), Vec::new()),
382 crate::dtype::Nullability::NonNullable,
383 );
384 let slots = make_struct_slots(&[], &Validity::NonNullable, len);
385 unsafe {
386 Array::from_parts_unchecked(
387 ArrayParts::new(Struct, dtype, len, EmptyArrayData).with_slots(slots),
388 )
389 }
390 }
391
392 pub fn into_data_parts(self) -> StructDataParts {
394 let fields: Arc<[ArrayRef]> = self.slots()[FIELDS_OFFSET..]
395 .iter()
396 .map(|s| s.as_ref().vortex_expect("StructArray field slot").clone())
397 .collect();
398 let validity = self.validity().vortex_expect("StructArray validity");
399 StructDataParts {
400 struct_fields: self.struct_fields().clone(),
401 fields,
402 validity,
403 }
404 }
405
406 pub fn remove_column(&self, name: impl Into<FieldName>) -> Option<(Self, ArrayRef)> {
407 let name = name.into();
408 let struct_dtype = self.struct_fields();
409 let len = self.len();
410
411 let position = struct_dtype.find(name.as_ref())?;
412
413 let slot_position = FIELDS_OFFSET + position;
414 let field = self.slots()[slot_position]
415 .as_ref()
416 .vortex_expect("StructArray field slot")
417 .clone();
418 let new_slots: Vec<Option<ArrayRef>> = self
419 .slots()
420 .iter()
421 .enumerate()
422 .filter(|(i, _)| *i != slot_position)
423 .map(|(_, s)| s.clone())
424 .collect();
425
426 let new_dtype = struct_dtype.without_field(position).ok()?;
427 let new_array = unsafe {
428 Array::from_parts_unchecked(
429 ArrayParts::new(
430 Struct,
431 DType::Struct(new_dtype, self.dtype().nullability()),
432 len,
433 EmptyArrayData,
434 )
435 .with_slots(new_slots),
436 )
437 };
438 Some((new_array, field))
439 }
440
441 pub fn with_column(&self, name: impl Into<FieldName>, array: ArrayRef) -> VortexResult<Self> {
442 let name = name.into();
443 let struct_dtype = self.struct_fields();
444
445 let names = struct_dtype.names().iter().cloned().chain(once(name));
446 let types = struct_dtype.fields().chain(once(array.dtype().clone()));
447 let new_fields = StructFields::new(names.collect(), types.collect());
448
449 let children: Arc<[ArrayRef]> = self.slots()[FIELDS_OFFSET..]
450 .iter()
451 .map(|s| s.as_ref().vortex_expect("StructArray field slot").clone())
452 .chain(once(array))
453 .collect();
454
455 Self::try_new_with_dtype(children, new_fields, self.len(), self.validity()?)
456 }
457
458 pub fn remove_column_owned(&self, name: impl Into<FieldName>) -> Option<(Self, ArrayRef)> {
459 self.remove_column(name)
460 }
461
462 pub fn try_concat<T>(chunks: impl IntoIterator<Item = T>) -> VortexResult<Self>
463 where
464 T: Borrow<Array<Struct>>,
465 {
466 let mut it = chunks.into_iter();
467 let Some(first) = it.next() else {
468 vortex_bail!("cannot concat empty iterator of arrays");
469 };
470 let first_dtype = first.borrow().dtype().clone();
471 let struct_fields = first_dtype.as_struct_fields().clone();
472 let names = struct_fields.names();
473
474 let it = [first].into_iter().chain(it);
475 let (field_arrays_per_chunk, validities) = it
476 .map(|chunk| {
477 let chunk = chunk.borrow();
478 if &first_dtype != chunk.dtype() {
479 vortex_bail!(
480 "cannot concatenate struct arrays with differing dtypes: {}, {}",
481 first_dtype,
482 chunk.dtype(),
483 );
484 }
485
486 let fields = names
487 .iter()
488 .map(|name| {
489 chunk
490 .unmasked_field_by_name(name)
491 .vortex_expect("field exists because it is in dtype")
492 .clone()
493 })
494 .collect::<Vec<_>>();
495 let validity = chunk.validity()?;
496
497 Ok((fields, (validity, chunk.len())))
498 })
499 .process_results(|iter| iter.unzip::<_, _, Vec<_>, Vec<_>>())?;
500
501 let field_arrays = struct_fields
502 .fields()
503 .enumerate()
504 .map(|(i, dtype)| {
505 let chunks = field_arrays_per_chunk
507 .iter()
508 .map(|x| x[i].clone())
509 .collect();
510 unsafe { ChunkedArray::new_unchecked(chunks, dtype) }.into_array()
511 })
512 .collect::<Vec<_>>();
513 let len = validities.iter().map(|(_v, len)| len).sum();
514 let validity = Validity::concat(validities).vortex_expect("verified non-empty above");
515
516 Ok(unsafe { Array::<Struct>::new_unchecked(field_arrays, struct_fields, len, validity) })
526 }
527}