Skip to main content

vortex_array/arrow/
session.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Plugin layer for moving Arrow extension types in and out of Vortex.
5//!
6//! Vortex's canonical Arrow conversion (see [`crate::dtype::arrow`] and the executor in
7//! [`crate::arrow::executor`]) handles every non-extension Arrow type and the builtin temporal
8//! extensions. The plugins registered here cover the remaining case: **Arrow extension types**.
9//!
10//! * An [`ArrowExportVTable`] is dispatched purely by the **target Arrow extension Id** —
11//!   the plugin is selected when the caller asks for an Arrow [`Field`] carrying matching
12//!   `ARROW:extension:name` metadata. The Vortex source dtype/encoding is irrelevant to
13//!   dispatch.
14//! * An [`ArrowImportVTable`] is dispatched by the **source Arrow extension name** carried
15//!   on the incoming [`Field`]. The plugin is responsible for both preserving extension
16//!   identity and re-encoding storage if needed (e.g. Arrow `FixedSizeBinary[16]` for UUID
17//!   becomes Vortex `FixedSizeList<u8; 16>`).
18//!
19//! Multiple plugins may register against the same key. They are tried in registration order;
20//! each may return [`ArrowExport::Unsupported`] / [`ArrowImport::Unsupported`] to defer to
21//! the next.
22
23use std::any::Any;
24use std::fmt::Debug;
25use std::sync::Arc;
26
27use arc_swap::ArcSwap;
28use arrow_array::Array as _;
29use arrow_array::ArrayRef as ArrowArrayRef;
30use arrow_array::RecordBatch;
31use arrow_array::make_array;
32use arrow_schema::DataType;
33use arrow_schema::Field;
34use arrow_schema::Fields;
35use arrow_schema::Schema;
36use arrow_schema::extension::EXTENSION_TYPE_NAME_KEY;
37use arrow_schema::extension::ExtensionType;
38use tracing::debug;
39use tracing::trace;
40use vortex_error::VortexResult;
41use vortex_error::vortex_bail;
42use vortex_error::vortex_ensure;
43use vortex_session::Ref;
44use vortex_session::SessionExt;
45use vortex_session::SessionVar;
46use vortex_session::registry::Id;
47use vortex_utils::aliases::hash_map::HashMap;
48
49use crate::ArrayRef;
50use crate::ExecutionCtx;
51use crate::IntoArray;
52use crate::arrays::StructArray;
53use crate::arrow::FromArrowArray;
54use crate::arrow::convert::nulls;
55use crate::arrow::convert::remove_nulls;
56use crate::arrow::executor::execute_arrow_naive;
57use crate::dtype::DType;
58use crate::dtype::FieldName;
59use crate::dtype::FieldNames;
60use crate::dtype::Nullability;
61use crate::dtype::StructFields;
62use crate::dtype::arrow::FromArrowType;
63use crate::dtype::arrow::to_data_type_naive;
64use crate::dtype::extension::ExtDTypeRef;
65use crate::dtype::extension::ExtId;
66use crate::extension::datetime::AnyTemporal;
67use crate::extension::uuid::Uuid;
68use crate::validity::Validity;
69
70/// Outcome of a successful call to [`ArrowExportVTable::execute_arrow`].
71///
72/// Plugins that don't handle the supplied array return [`Unsupported`][Self::Unsupported]
73/// with ownership of the input so the session can probe the next plugin or fall back to the
74/// canonical path. Errors are propagated through [`VortexResult`].
75pub enum ArrowExport {
76    /// The plugin does not handle this input; the session may try another plugin.
77    Unsupported(ArrayRef),
78    /// A successful export.
79    Exported(ArrowArrayRef),
80}
81
82/// Outcome of a successful call to [`ArrowImportVTable::from_arrow_array`].
83///
84/// Plugins that don't handle the supplied array return [`Unsupported`][Self::Unsupported]
85/// with ownership of the input so the session can probe the next plugin or fall back to the
86/// canonical path. Errors are propagated through [`VortexResult`].
87pub enum ArrowImport {
88    /// The plugin does not handle this input; the session may try another plugin.
89    Unsupported(ArrowArrayRef),
90    /// A successful import.
91    Imported(ArrayRef),
92}
93
94/// Plugin layer for exporting a Vortex array to an Arrow extension type.
95///
96/// This is purely an implementation trait, its methods should not be called directly. Instead,
97/// use the methods on [`ArrowSession`].
98pub trait ArrowExportVTable: 'static + Send + Sync + Debug {
99    /// The Arrow extension ID this plugin produces.
100    fn arrow_ext_id(&self) -> Id;
101
102    /// The Vortex extension ID this plugin maps from. Used only for inference by
103    /// [`ArrowSession::to_arrow_field`] / [`ArrowSession::to_arrow_schema`]; never as a
104    /// dispatch key for [`execute_arrow`][Self::execute_arrow].
105    fn vortex_ext_id(&self) -> ExtId;
106
107    /// Build the Arrow [`Field`] this plugin produces for the given Vortex extension
108    /// `dtype`. Used during schema inference.
109    fn to_arrow_field(
110        &self,
111        name: &str,
112        dtype: &ExtDTypeRef,
113        session: &ArrowSession,
114    ) -> VortexResult<Option<Field>>;
115
116    /// Convert a Vortex array into an Arrow array shaped to `target`.
117    ///
118    /// Returns ownership of `array` via [`ArrowExport::Unsupported`] when the plugin cannot
119    /// handle the input.
120    fn execute_arrow(
121        &self,
122        array: ArrayRef,
123        target: &Field,
124        ctx: &mut ExecutionCtx,
125    ) -> VortexResult<ArrowExport>;
126}
127
128/// Plugin layer for importing an Arrow extension-typed array into a Vortex extension array.
129///
130/// Plugins are dispatched by `arrow_ext_id`.
131///
132/// This is purely an implementation trait, its methods should not be called directly. Instead,
133/// use the methods on [`ArrowSession`].
134pub trait ArrowImportVTable: 'static + Send + Sync + Debug {
135    /// The Arrow extension name this plugin handles.
136    fn arrow_ext_id(&self) -> Id;
137
138    /// Build the Vortex [`DType`] that corresponds to `field` (which carries this plugin's
139    /// Arrow extension metadata).
140    #[allow(clippy::wrong_self_convention)]
141    fn from_arrow_field(&self, field: &Field) -> VortexResult<Option<DType>>;
142
143    /// Convert an Arrow array into a Vortex extension array of `dtype`.
144    ///
145    /// Returns ownership of `array` via [`ArrowImport::Unsupported`] when the plugin cannot
146    /// handle the input.
147    #[allow(clippy::wrong_self_convention)]
148    fn from_arrow_array(
149        &self,
150        array: ArrowArrayRef,
151        dtype: &ExtDTypeRef,
152    ) -> VortexResult<ArrowImport>;
153}
154
155pub type ArrowExportVTableRef = Arc<dyn ArrowExportVTable>;
156pub type ArrowImportVTableRef = Arc<dyn ArrowImportVTable>;
157
158type ExportMap = HashMap<Id, Arc<[ArrowExportVTableRef]>>;
159type ImportMap = HashMap<Id, Arc<[ArrowImportVTableRef]>>;
160type ExportDTypeMap = HashMap<ExtId, Arc<[ArrowExportVTableRef]>>;
161
162/// Session-scoped registry of Arrow extension plugins.
163///
164/// Exporters are stored in two indices: one keyed by Arrow extension Id (used for
165/// `execute_arrow` dispatch) and one keyed by Vortex extension Id (used **only** by
166/// `to_arrow_field` / `to_arrow_schema` inference, when callers need to translate a Vortex
167/// extension `DType` into an Arrow `Field` with no target schema in hand). Importers are
168/// keyed by Arrow extension name. The default session pre-registers the builtin UUID
169/// plugin; temporal extensions are handled by the canonical Arrow ↔ Vortex path and do not
170/// need plugins.
171#[derive(Debug)]
172pub struct ArrowSession {
173    exporters: ArcSwap<ExportMap>,
174    exporters_by_vortex: ArcSwap<ExportDTypeMap>,
175    importers: ArcSwap<ImportMap>,
176}
177
178impl Default for ArrowSession {
179    fn default() -> Self {
180        let session = Self {
181            exporters: ArcSwap::from_pointee(ExportMap::default()),
182            exporters_by_vortex: ArcSwap::from_pointee(ExportDTypeMap::default()),
183            importers: ArcSwap::from_pointee(ImportMap::default()),
184        };
185
186        session.register_exporter(Arc::new(Uuid));
187        session.register_importer(Arc::new(Uuid));
188
189        session
190    }
191}
192
193impl ArrowSession {
194    /// Register an [`ArrowExportVTable`] under its target Arrow extension Id (for dispatch)
195    /// and its source Vortex extension Id (for schema inference).
196    pub fn register_exporter(&self, exporter: ArrowExportVTableRef) {
197        Self::insert(
198            &self.exporters,
199            exporter.arrow_ext_id(),
200            ArrowExportVTableRef::clone(&exporter),
201        );
202        Self::insert(
203            &self.exporters_by_vortex,
204            exporter.vortex_ext_id(),
205            exporter,
206        );
207    }
208
209    /// Register an [`ArrowImportVTable`] under its source Arrow extension name.
210    pub fn register_importer(&self, importer: ArrowImportVTableRef) {
211        Self::insert(&self.importers, importer.arrow_ext_id(), importer);
212    }
213
214    fn insert<K, T>(slot: &ArcSwap<HashMap<K, Arc<[T]>>>, key: K, value: T)
215    where
216        K: Clone + Eq + std::hash::Hash,
217        T: Clone,
218    {
219        slot.rcu(move |map| {
220            let mut next = (**map).clone();
221            let entry = next.entry(key.clone()).or_insert_with(|| Arc::from([]));
222            let mut extended: Vec<T> = entry.iter().cloned().collect();
223            extended.push(value.clone());
224            *entry = Arc::from(extended);
225            next
226        });
227    }
228
229    fn exporters(&self, id: &Id) -> Arc<[ArrowExportVTableRef]> {
230        self.exporters
231            .load()
232            .get(id)
233            .cloned()
234            .unwrap_or_else(|| Arc::from([]))
235    }
236
237    fn exporters_by_vortex(&self, id: &ExtId) -> Arc<[ArrowExportVTableRef]> {
238        self.exporters_by_vortex
239            .load()
240            .get(id)
241            .cloned()
242            .unwrap_or_else(|| Arc::from([]))
243    }
244
245    fn importers(&self, id: &Id) -> Arc<[ArrowImportVTableRef]> {
246        self.importers
247            .load()
248            .get(id)
249            .cloned()
250            .unwrap_or_else(|| Arc::from([]))
251    }
252
253    /// Build the Arrow [`Field`] for a Vortex [`DType`].
254    ///
255    /// For [`DType::Extension`]s, plugins registered against the extension's `Id`
256    /// are tried in registration order; the first plugin to return `Some(field)` wins.
257    pub fn to_arrow_field(&self, name: &str, dtype: &DType) -> VortexResult<Field> {
258        // Handle the structural encodings, which may have recursive types
259        match dtype {
260            DType::List(elem_dtype, nullability) => {
261                let elem_field = self.to_arrow_field(Field::LIST_FIELD_DEFAULT_NAME, elem_dtype)?;
262                Ok(Field::new_list(name, elem_field, nullability.is_nullable()))
263            }
264            DType::FixedSizeList(elem_dtype, elem_size, nullability) => {
265                let elem_field = self.to_arrow_field(Field::LIST_FIELD_DEFAULT_NAME, elem_dtype)?;
266                Ok(Field::new_fixed_size_list(
267                    name,
268                    elem_field,
269                    (*elem_size).try_into()?,
270                    nullability.is_nullable(),
271                ))
272            }
273            DType::Struct(fields, nullability) => {
274                let arrow_fields = Fields::from_iter(
275                    fields
276                        .fields()
277                        .zip(fields.names().iter())
278                        .map(|(field, name)| self.to_arrow_field(name.as_ref(), &field))
279                        .collect::<VortexResult<Vec<_>>>()?,
280                );
281                Ok(Field::new_struct(
282                    name,
283                    arrow_fields,
284                    nullability.is_nullable(),
285                ))
286            }
287            DType::Extension(ext) if !ext.is::<AnyTemporal>() => {
288                for plugin in self.exporters_by_vortex(&ext.id()).iter() {
289                    if let Some(field) = plugin.to_arrow_field(name, ext, self)? {
290                        return Ok(field);
291                    }
292                }
293                vortex_bail!("extension type cannot be converted to Arrow without a plugin: {ext}");
294            }
295            DType::Variant(_) => {
296                vortex_bail!("Arrow does not have a raw/transparent Variant encoding");
297            }
298            _ => Ok(Field::new(
299                name,
300                to_data_type_naive(dtype)?,
301                dtype.is_nullable(),
302            )),
303        }
304    }
305
306    /// Build the Arrow [`Schema`] for a Vortex top-level [`DType::Struct`], dispatching
307    /// extension fields through registered export plugins for inference. Nested
308    /// extensions are preserved via [`Self::to_arrow_field`].
309    pub fn to_arrow_schema(&self, dtype: &DType) -> VortexResult<Schema> {
310        let DType::Struct(struct_dtype, _) = dtype else {
311            vortex_error::vortex_bail!(
312                "to_arrow_schema requires a top-level struct dtype, got {dtype}"
313            );
314        };
315        let mut fields = Vec::with_capacity(struct_dtype.names().len());
316        for (name, field_dtype) in struct_dtype.names().iter().zip(struct_dtype.fields()) {
317            fields.push(self.to_arrow_field(name.as_ref(), &field_dtype)?);
318        }
319        Ok(Schema::new(fields))
320    }
321
322    /// Build the Vortex [`DType`] for an Arrow [`Field`].
323    ///
324    /// Plugins registered against the field's Arrow extension name are tried in
325    /// registration order; the first plugin to return `Some(dtype)` wins. If none
326    /// match (or all return `None`), recurses into container types ([`DataType::List`]
327    /// family, [`DataType::FixedSizeList`], [`DataType::Struct`]) so extension metadata
328    /// on nested element/struct fields is preserved. Leaf types use the canonical
329    /// Arrow → Vortex mapping via [`DType::from_arrow`].
330    pub fn from_arrow_field(&self, field: &Field) -> VortexResult<DType> {
331        if let Some(name) = field.metadata().get(EXTENSION_TYPE_NAME_KEY) {
332            for plugin in self.importers(&Id::new(name)).iter() {
333                if let Some(dtype) = plugin.from_arrow_field(field)? {
334                    return Ok(dtype);
335                }
336            }
337        }
338        let nullability: Nullability = field.is_nullable().into();
339        Ok(match field.data_type() {
340            DataType::List(elem)
341            | DataType::LargeList(elem)
342            | DataType::ListView(elem)
343            | DataType::LargeListView(elem) => {
344                DType::List(Arc::new(self.from_arrow_field(elem.as_ref())?), nullability)
345            }
346            DataType::FixedSizeList(elem, size) => DType::FixedSizeList(
347                Arc::new(self.from_arrow_field(elem.as_ref())?),
348                *size as u32,
349                nullability,
350            ),
351            DataType::Struct(fields) => {
352                let entries = fields
353                    .iter()
354                    .map(|f| {
355                        self.from_arrow_field(f)
356                            .map(|dt| (FieldName::from(f.name().as_str()), dt))
357                    })
358                    .collect::<VortexResult<Vec<_>>>()?;
359                DType::Struct(StructFields::from_iter(entries), nullability)
360            }
361            _ => DType::from_arrow(field),
362        })
363    }
364
365    /// Build the Vortex [`DType`] for an Arrow [`Schema`], dispatching extension fields
366    /// through registered import plugins. The result is a top-level non-nullable struct
367    /// matching the schema's fields.
368    pub fn from_arrow_schema(&self, schema: &Schema) -> VortexResult<DType> {
369        let entries = schema
370            .fields()
371            .iter()
372            .map(|f| {
373                self.from_arrow_field(f)
374                    .map(|dt| (FieldName::from(f.name().as_str()), dt))
375            })
376            .collect::<VortexResult<Vec<_>>>()?;
377        Ok(DType::Struct(
378            StructFields::from_iter(entries),
379            Nullability::NonNullable,
380        ))
381    }
382
383    /// Decode an Arrow [`RecordBatch`] into a Vortex struct array, dispatching each
384    /// extension column through its registered import plugin.
385    ///
386    /// `schema` is the authoritative Arrow schema used for dispatch — the columns are
387    /// consumed positionally. Pass an external schema (rather than relying on
388    /// `batch.schema()`) when upstream DataFusion plumbing may have stripped Field-level
389    /// extension metadata from the runtime RecordBatch.
390    pub fn from_arrow_record_batch(
391        &self,
392        batch: RecordBatch,
393        schema: &Schema,
394    ) -> VortexResult<ArrayRef> {
395        vortex_ensure!(
396            batch.num_columns() == schema.fields().len(),
397            "RecordBatch has {} columns but schema has {} fields",
398            batch.num_columns(),
399            schema.fields().len()
400        );
401        let length = batch.num_rows();
402        let names = FieldNames::from_iter(
403            schema
404                .fields()
405                .iter()
406                .map(|f| FieldName::from(f.name().as_str())),
407        );
408        let mut columns = Vec::with_capacity(schema.fields().len());
409        for (col, field) in batch.columns().iter().zip(schema.fields().iter()) {
410            columns.push(self.from_arrow_array(ArrowArrayRef::clone(col), field)?);
411        }
412        Ok(StructArray::try_new(names, columns, length, Validity::NonNullable)?.into_array())
413    }
414
415    /// Execute a Vortex array into an Arrow array.
416    ///
417    /// If `target` carries an `ARROW:extension:name`, the plugin registry is probed for one that
418    /// can support executing to the target extension type.
419    ///
420    /// With `target = None` the fallback path picks the array's preferred Arrow physical type
421    /// and executes directly into that, ignoring extension types.
422    pub fn execute_arrow(
423        &self,
424        array: ArrayRef,
425        target: Option<&Field>,
426        ctx: &mut ExecutionCtx,
427    ) -> VortexResult<ArrowArrayRef> {
428        // NOTE(aduffy): this looks strange, but we do this to keep target_field as &Field so
429        //  we can avoid cloning target when it is provided. It contains a HashMap internally that
430        //  can be expensive to copy.
431        let arrow_field;
432        let target_field = match target {
433            Some(field) => field,
434            None => {
435                let session = ctx.session().clone();
436                arrow_field = session.arrow().to_arrow_field("", array.dtype())?;
437                &arrow_field
438            }
439        };
440
441        if let Some(arrow_ext_name) = target_field.metadata().get(EXTENSION_TYPE_NAME_KEY) {
442            // There can be multiple plugins that report support for a particular extension type.
443            // We try them in order until one of them reports a successful conversion.
444            let len = array.len();
445            let mut current = array;
446
447            for plugin in self.exporters(&Id::new(arrow_ext_name)).iter() {
448                trace!(
449                    plugin = ?plugin,
450                    extension_name = arrow_ext_name,
451                    "probing plugin for converting Arrow array"
452                );
453
454                match plugin.execute_arrow(current, target_field, ctx)? {
455                    ArrowExport::Exported(arrow) => {
456                        vortex_ensure!(
457                            arrow.len() == len,
458                            "Arrow array length does not match Vortex array length after conversion to {:?}",
459                            arrow
460                        );
461                        return Ok(arrow);
462                    }
463                    ArrowExport::Unsupported(array) => current = array,
464                }
465            }
466
467            debug!(
468                extension_id = arrow_ext_name,
469                data_type = ?target_field.data_type(),
470                "unsupported Arrow extension type encountered, falling back to naive execution"
471            );
472
473            return execute_arrow_naive(current, Some(target_field.data_type()), ctx);
474        }
475
476        execute_arrow_naive(array, target.map(|field| field.data_type()), ctx)
477    }
478
479    /// Decode an Arrow array into a Vortex array.
480    ///
481    /// Routes through the registered import plugin if `field` carries an Arrow extension
482    /// name we recognize, probing each plugin in registration order until one handles the
483    /// input or all return [`ArrowImport::Unsupported`]. Otherwise recurses into container
484    /// arrays ([`arrow_array::StructArray`], [`arrow_array::GenericListArray`],
485    /// [`arrow_array::FixedSizeListArray`], [`arrow_array::GenericListViewArray`]) so
486    /// extension fields nested inside containers reach their importers; leaf types fall
487    /// through to the canonical Arrow → Vortex array conversion.
488    pub fn from_arrow_array(&self, array: ArrowArrayRef, field: &Field) -> VortexResult<ArrayRef> {
489        if let Some(extension_name) = field.metadata().get(EXTENSION_TYPE_NAME_KEY) {
490            let importers = self.importers(&Id::new(extension_name));
491            if !importers.is_empty() {
492                let dtype = self.from_arrow_field(field)?;
493                if let DType::Extension(ext_dtype) = dtype {
494                    let mut current = array;
495                    for plugin in importers.iter() {
496                        match plugin.from_arrow_array(current, &ext_dtype)? {
497                            ArrowImport::Imported(arr) => return Ok(arr),
498                            ArrowImport::Unsupported(arr) => current = arr,
499                        }
500                    }
501                    return ArrayRef::from_arrow(current.as_ref(), field.is_nullable());
502                }
503            }
504        }
505        self.from_arrow_array_canonical(array, field)
506    }
507
508    /// Recurse into Arrow container arrays so nested fields with extension metadata reach
509    /// their importers, falling through to [`ArrayRef::from_arrow`] for leaf types.
510    #[allow(clippy::wrong_self_convention)]
511    fn from_arrow_array_canonical(
512        &self,
513        array: ArrowArrayRef,
514        field: &Field,
515    ) -> VortexResult<ArrayRef> {
516        use arrow_array::cast::AsArray;
517
518        match field.data_type() {
519            DataType::Struct(fields) => {
520                let arrow_struct = array.as_struct();
521                let names = FieldNames::from_iter(
522                    fields.iter().map(|f| FieldName::from(f.name().as_str())),
523                );
524                let columns = arrow_struct
525                    .columns()
526                    .iter()
527                    .zip(fields.iter())
528                    .map(|(col, child_field)| {
529                        // Arrow pushes nulls into non-nullable fields; strip before recursing
530                        // so Vortex's stricter validity invariants are upheld.
531                        let inner = if col.null_count() > 0 && !child_field.is_nullable() {
532                            make_array(remove_nulls(col.to_data()))
533                        } else {
534                            ArrowArrayRef::clone(col)
535                        };
536                        self.from_arrow_array(inner, child_field.as_ref())
537                    })
538                    .collect::<VortexResult<Vec<_>>>()?;
539                let validity = nulls(arrow_struct.nulls(), field.is_nullable());
540                Ok(
541                    StructArray::try_new(names, columns, arrow_struct.len(), validity)?
542                        .into_array(),
543                )
544            }
545            DataType::List(elem_field) => {
546                let list = array.as_list::<i32>();
547                let elements = self
548                    .from_arrow_array(ArrowArrayRef::clone(list.values()), elem_field.as_ref())?;
549                let offsets = list.offsets().clone().into_array();
550                let validity = nulls(list.nulls(), field.is_nullable());
551                Ok(crate::arrays::ListArray::try_new(elements, offsets, validity)?.into_array())
552            }
553            DataType::LargeList(elem_field) => {
554                let list = array.as_list::<i64>();
555                let elements = self
556                    .from_arrow_array(ArrowArrayRef::clone(list.values()), elem_field.as_ref())?;
557                let offsets = list.offsets().clone().into_array();
558                let validity = nulls(list.nulls(), field.is_nullable());
559                Ok(crate::arrays::ListArray::try_new(elements, offsets, validity)?.into_array())
560            }
561            DataType::FixedSizeList(elem_field, list_size) => {
562                let fsl = array.as_fixed_size_list();
563                let elements =
564                    self.from_arrow_array(ArrowArrayRef::clone(fsl.values()), elem_field.as_ref())?;
565                let validity = nulls(fsl.nulls(), field.is_nullable());
566                Ok(crate::arrays::FixedSizeListArray::try_new(
567                    elements,
568                    *list_size as u32,
569                    validity,
570                    fsl.len(),
571                )?
572                .into_array())
573            }
574            DataType::ListView(elem_field) => {
575                let list = array.as_list_view::<i32>();
576                let elements = self
577                    .from_arrow_array(ArrowArrayRef::clone(list.values()), elem_field.as_ref())?;
578                let offsets = list.offsets().clone().into_array();
579                let sizes = list.sizes().clone().into_array();
580                let validity = nulls(list.nulls(), field.is_nullable());
581                Ok(
582                    crate::arrays::ListViewArray::try_new(elements, offsets, sizes, validity)?
583                        .into_array(),
584                )
585            }
586            DataType::LargeListView(elem_field) => {
587                let list = array.as_list_view::<i64>();
588                let elements = self
589                    .from_arrow_array(ArrowArrayRef::clone(list.values()), elem_field.as_ref())?;
590                let offsets = list.offsets().clone().into_array();
591                let sizes = list.sizes().clone().into_array();
592                let validity = nulls(list.nulls(), field.is_nullable());
593                Ok(
594                    crate::arrays::ListViewArray::try_new(elements, offsets, sizes, validity)?
595                        .into_array(),
596                )
597            }
598            _ => ArrayRef::from_arrow(array.as_ref(), field.is_nullable()),
599        }
600    }
601}
602
603// NOTE(aduffy): We should remove this once we bump Arrow to 0.59.0. This is replicating the
604//  `Field::has_valid_extension_type` method on Arrow added in 58.2.0, we polyfill it here so that
605//  this crate can build with minimal-versions declared.
606pub(crate) fn has_valid_extension_type<E: ExtensionType>(field: &Field) -> bool {
607    if field.extension_type_name() != Some(E::NAME) {
608        return false;
609    }
610
611    E::try_new_from_field_metadata(field.data_type(), field.metadata()).is_ok()
612}
613
614impl SessionVar for ArrowSession {
615    fn as_any(&self) -> &dyn Any {
616        self
617    }
618
619    fn as_any_mut(&mut self) -> &mut dyn Any {
620        self
621    }
622}
623
624/// Extension trait for accessing the [`ArrowSession`] on a Vortex session.
625pub trait ArrowSessionExt: SessionExt {
626    /// Get the Arrow session.
627    fn arrow(&self) -> Ref<'_, ArrowSession>;
628}
629
630impl<S: SessionExt> ArrowSessionExt for S {
631    fn arrow(&self) -> Ref<'_, ArrowSession> {
632        self.get::<ArrowSession>()
633    }
634}
635
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::FixedSizeBinaryArray;
    use arrow_array::cast::AsArray;
    use arrow_schema::DataType;
    use arrow_schema::Field;
    use arrow_schema::extension::Uuid as ArrowUuid;
    use vortex_error::VortexResult;

    use super::*;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::dtype::DType;
    use crate::dtype::FieldName;
    use crate::dtype::Nullability;
    use crate::dtype::PType;
    use crate::dtype::StructFields;
    use crate::dtype::extension::ExtDType;
    use crate::dtype::extension::ExtVTable;
    use crate::extension::uuid::Uuid;
    use crate::extension::uuid::UuidMetadata;

    /// Vortex UUID extension dtype over `FixedSizeList<u8; 16>` storage whose storage
    /// nullability is `nullable`.
    fn uuid_dtype(nullable: bool) -> DType {
        let byte = DType::Primitive(PType::U8, Nullability::NonNullable);
        let storage = DType::FixedSizeList(Arc::new(byte), 16, nullable.into());
        let ext = ExtDType::try_with_vtable(Uuid, UuidMetadata::default(), storage)
            .expect("uuid ext dtype");
        DType::Extension(ext.erased())
    }

    /// Non-nullable struct dtype with a single field called `name`.
    fn single_field_struct(name: &str, dtype: DType) -> DType {
        DType::Struct(
            StructFields::from_iter([(FieldName::from(name), dtype)]),
            Nullability::NonNullable,
        )
    }

    #[test]
    fn to_arrow_field_top_level_uuid_carries_extension_metadata() -> VortexResult<()> {
        let arrow_session = ArrowSession::default();
        let id_field = arrow_session.to_arrow_field("id", &uuid_dtype(false))?;
        assert!(has_valid_extension_type::<ArrowUuid>(&id_field));
        Ok(())
    }

    #[test]
    fn to_arrow_field_struct_with_nested_uuid_preserves_metadata() -> VortexResult<()> {
        let arrow_session = ArrowSession::default();
        let row_dtype = single_field_struct("id", uuid_dtype(false));
        let row_field = arrow_session.to_arrow_field("row", &row_dtype)?;
        let children = match row_field.data_type() {
            DataType::Struct(children) => children,
            other => panic!("expected Struct, got {:?}", other),
        };
        assert_eq!(children.len(), 1);
        let id_field = &children[0];
        assert_eq!(id_field.data_type(), &DataType::FixedSizeBinary(16));
        assert!(has_valid_extension_type::<ArrowUuid>(id_field));
        Ok(())
    }

    #[test]
    fn to_arrow_field_list_of_uuid_preserves_metadata() -> VortexResult<()> {
        let arrow_session = ArrowSession::default();
        let list_dtype = DType::List(Arc::new(uuid_dtype(true)), Nullability::NonNullable);
        let list_field = arrow_session.to_arrow_field("ids", &list_dtype)?;
        let item = match list_field.data_type() {
            DataType::List(item) => item,
            other => panic!("expected List, got {:?}", other),
        };
        assert!(has_valid_extension_type::<ArrowUuid>(item));
        Ok(())
    }

    #[test]
    fn to_arrow_field_fixed_size_list_of_uuid_preserves_metadata() -> VortexResult<()> {
        let arrow_session = ArrowSession::default();
        let triple_dtype =
            DType::FixedSizeList(Arc::new(uuid_dtype(false)), 3, Nullability::NonNullable);
        let triple_field = arrow_session.to_arrow_field("triple", &triple_dtype)?;
        let (item, width) = match triple_field.data_type() {
            DataType::FixedSizeList(item, width) => (item, width),
            other => panic!("expected FixedSizeList, got {:?}", other),
        };
        assert_eq!(*width, 3);
        assert!(has_valid_extension_type::<ArrowUuid>(item));
        Ok(())
    }

    #[test]
    fn to_arrow_schema_struct_of_struct_uuid() -> VortexResult<()> {
        let arrow_session = ArrowSession::default();
        let payload_dtype = single_field_struct("id", uuid_dtype(true));
        let root_dtype = single_field_struct("payload", payload_dtype);
        let schema = arrow_session.to_arrow_schema(&root_dtype)?;
        let payload_field = schema.field(0);
        let children = match payload_field.data_type() {
            DataType::Struct(children) => children,
            other => panic!("expected Struct, got {:?}", other),
        };
        assert!(has_valid_extension_type::<ArrowUuid>(&children[0]));
        Ok(())
    }

    #[test]
    fn from_arrow_field_recurses_into_nested_uuid() -> VortexResult<()> {
        let arrow_session = ArrowSession::default();
        let mut item = Field::new("item", DataType::FixedSizeBinary(16), false);
        item.try_with_extension_type(ArrowUuid)?;
        let list_field = Field::new("ids", DataType::List(Arc::new(item)), false);

        let dtype = arrow_session.from_arrow_field(&list_field)?;
        let inner_dt = match &dtype {
            DType::List(inner_dt, _) => inner_dt,
            _ => panic!("expected List dtype, got {dtype}"),
        };
        assert!(
            matches!(inner_dt.as_ref(), DType::Extension(ext) if ext.id() == Uuid.id()),
            "expected Uuid extension element, got {inner_dt}",
        );
        Ok(())
    }

    #[test]
    fn schema_roundtrip_preserves_nested_uuid() -> VortexResult<()> {
        let arrow_session = ArrowSession::default();
        let id_entry = (FieldName::from("id"), uuid_dtype(false));
        let ids_entry = (
            FieldName::from("ids"),
            DType::List(Arc::new(uuid_dtype(true)), Nullability::NonNullable),
        );
        let dtype = DType::Struct(
            StructFields::from_iter([id_entry, ids_entry]),
            Nullability::NonNullable,
        );

        let schema = arrow_session.to_arrow_schema(&dtype)?;
        let roundtripped = arrow_session.from_arrow_schema(&schema)?;
        assert_eq!(roundtripped, dtype);
        Ok(())
    }

    #[test]
    fn execute_arrow_target_none_preserves_top_level_uuid_metadata() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let session = LEGACY_SESSION.arrow();

        // Import two UUID values through the extension-aware path.
        let mut uuid_field = Field::new("id", DataType::FixedSizeBinary(16), false);
        uuid_field.try_with_extension_type(ArrowUuid)?;
        let raw_values = [*b"0123456789abcdef", *b"fedcba9876543210"];
        let arrow_array: ArrowArrayRef =
            Arc::new(FixedSizeBinaryArray::try_from_iter(raw_values.into_iter())?);
        let vortex_array = session.from_arrow_array(arrow_array, &uuid_field)?;

        let vortex_ext = vortex_array.dtype().as_extension();
        assert!(vortex_ext.is::<Uuid>());

        // Export without a target field and check the physical layout and values survive.
        let exported = session.execute_arrow(vortex_array, None, &mut ctx)?;
        assert_eq!(exported.data_type(), &DataType::FixedSizeBinary(16));
        let fsb = exported.as_fixed_size_binary();
        assert_eq!(fsb.len(), 2);
        assert_eq!(fsb.value(0), b"0123456789abcdef");
        assert_eq!(fsb.value(1), b"fedcba9876543210");
        Ok(())
    }
}