Skip to main content

vortex_array/arrow/
session.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Plugin layer for moving Arrow extension types in and out of Vortex.
5//!
6//! Vortex's canonical Arrow conversion (see [`crate::dtype::arrow`] and the executor in
7//! [`crate::arrow::executor`]) handles every non-extension Arrow type and the builtin temporal
8//! extensions. The plugins registered here cover the remaining case: **Arrow extension types**.
9//!
10//! * An [`ArrowExportVTable`] is dispatched purely by the **target Arrow extension Id** —
11//!   the plugin is selected when the caller asks for an Arrow [`Field`] carrying matching
12//!   `ARROW:extension:name` metadata. The Vortex source dtype/encoding is irrelevant to
13//!   dispatch.
14//! * An [`ArrowImportVTable`] is dispatched by the **source Arrow extension name** carried
15//!   on the incoming [`Field`]. The plugin is responsible for both preserving extension
16//!   identity and re-encoding storage if needed (e.g. Arrow `FixedSizeBinary[16]` for UUID
17//!   becomes Vortex `FixedSizeList<u8; 16>`).
18//!
19//! Multiple plugins may register against the same key. They are tried in registration order;
20//! each may return [`ArrowExport::Unsupported`] / [`ArrowImport::Unsupported`] to defer to
21//! the next.
22
23use std::any::Any;
24use std::fmt::Debug;
25use std::sync::Arc;
26
27use arc_swap::ArcSwap;
28use arrow_array::Array as _;
29use arrow_array::ArrayRef as ArrowArrayRef;
30use arrow_array::RecordBatch;
31use arrow_array::make_array;
32use arrow_schema::DataType;
33use arrow_schema::Field;
34use arrow_schema::Fields;
35use arrow_schema::Schema;
36use arrow_schema::extension::EXTENSION_TYPE_NAME_KEY;
37use arrow_schema::extension::ExtensionType;
38use tracing::debug;
39use tracing::trace;
40use vortex_error::VortexResult;
41use vortex_error::vortex_bail;
42use vortex_error::vortex_ensure;
43use vortex_session::Ref;
44use vortex_session::SessionExt;
45use vortex_session::SessionVar;
46use vortex_session::registry::Id;
47use vortex_utils::aliases::hash_map::HashMap;
48
49use crate::ArrayRef;
50use crate::ExecutionCtx;
51use crate::IntoArray;
52use crate::arrays::StructArray;
53use crate::arrow::FromArrowArray;
54use crate::arrow::convert::nulls;
55use crate::arrow::convert::remove_nulls;
56use crate::arrow::executor::execute_arrow_naive;
57use crate::dtype::DType;
58use crate::dtype::FieldName;
59use crate::dtype::FieldNames;
60use crate::dtype::Nullability;
61use crate::dtype::StructFields;
62use crate::dtype::arrow::FromArrowType;
63use crate::dtype::arrow::to_data_type_naive;
64use crate::extension::datetime::AnyTemporal;
65use crate::extension::uuid::Uuid;
66use crate::validity::Validity;
67
68/// Outcome of a successful call to [`ArrowExportVTable::execute_arrow`].
69///
70/// Plugins that don't handle the supplied array return [`Unsupported`][Self::Unsupported]
71/// with ownership of the input so the session can probe the next plugin or fall back to the
72/// canonical path. Errors are propagated through [`VortexResult`].
73pub enum ArrowExport {
74    /// The plugin does not handle this input; the session may try another plugin.
75    Unsupported(ArrayRef),
76    /// A successful export.
77    Exported(ArrowArrayRef),
78}
79
80/// Outcome of a successful call to [`ArrowImportVTable::from_arrow_array`].
81///
82/// Plugins that don't handle the supplied array return [`Unsupported`][Self::Unsupported]
83/// with ownership of the input so the session can probe the next plugin or fall back to the
84/// canonical path. Errors are propagated through [`VortexResult`].
85pub enum ArrowImport {
86    /// The plugin does not handle this input; the session may try another plugin.
87    Unsupported(ArrowArrayRef),
88    /// A successful import.
89    Imported(ArrayRef),
90}
91
92/// Plugin layer for exporting a Vortex array to an Arrow extension type.
93///
94/// This is purely an implementation trait, its methods should not be called directly. Instead,
95/// use the methods on [`ArrowSession`].
96pub trait ArrowExportVTable: 'static + Send + Sync + Debug {
97    /// The Arrow extension ID this plugin produces.
98    fn arrow_ext_id(&self) -> Id;
99
100    /// The Vortex array or extension ID this plugin maps from. Used only for inference by
101    /// [`ArrowSession::to_arrow_field`] / [`ArrowSession::to_arrow_schema`]; never as a
102    /// dispatch key for [`execute_arrow`][Self::execute_arrow].
103    fn vortex_id(&self) -> Id;
104
105    /// Build the Arrow [`Field`] this plugin produces for the given Vortex extension
106    /// `dtype`. Used during schema inference.
107    fn to_arrow_field(
108        &self,
109        name: &str,
110        dtype: &DType,
111        session: &ArrowSession,
112    ) -> VortexResult<Option<Field>>;
113
114    /// Convert a Vortex array into an Arrow array shaped to `target`.
115    ///
116    /// Returns ownership of `array` via [`ArrowExport::Unsupported`] when the plugin cannot
117    /// handle the input.
118    fn execute_arrow(
119        &self,
120        array: ArrayRef,
121        target: &Field,
122        ctx: &mut ExecutionCtx,
123    ) -> VortexResult<ArrowExport>;
124}
125
126/// Plugin layer for importing an Arrow extension-typed array into a Vortex array.
127///
128/// Plugins are dispatched by `arrow_ext_id`.
129///
130/// This is purely an implementation trait, its methods should not be called directly. Instead,
131/// use the methods on [`ArrowSession`].
132pub trait ArrowImportVTable: 'static + Send + Sync + Debug {
133    /// The Arrow extension name this plugin handles.
134    fn arrow_ext_id(&self) -> Id;
135
136    /// Build the Vortex [`DType`] that corresponds to `field` (which carries this plugin's
137    /// Arrow extension metadata).
138    #[allow(clippy::wrong_self_convention)]
139    fn from_arrow_field(&self, field: &Field) -> VortexResult<Option<DType>>;
140
141    /// Convert an Arrow array into a Vortex array of `dtype`.
142    ///
143    /// Returns ownership of `array` via [`ArrowImport::Unsupported`] when the plugin cannot
144    /// handle the input.
145    #[allow(clippy::wrong_self_convention)]
146    fn from_arrow_array(
147        &self,
148        array: ArrowArrayRef,
149        field: &Field,
150        dtype: &DType,
151    ) -> VortexResult<ArrowImport>;
152}
153
154pub type ArrowExportVTableRef = Arc<dyn ArrowExportVTable>;
155pub type ArrowImportVTableRef = Arc<dyn ArrowImportVTable>;
156
157type ExportMap = HashMap<Id, Arc<[ArrowExportVTableRef]>>;
158type ImportMap = HashMap<Id, Arc<[ArrowImportVTableRef]>>;
159type ExportDTypeMap = HashMap<Id, Arc<[ArrowExportVTableRef]>>;
160
161/// Session-scoped registry of Arrow extension plugins.
162///
163/// Exporters are stored in two indices: one keyed by Arrow extension Id (used for
164/// `execute_arrow` dispatch) and one keyed by Vortex extension Id (used **only** by
165/// `to_arrow_field` / `to_arrow_schema` inference, when callers need to translate a Vortex
166/// extension `DType` into an Arrow `Field` with no target schema in hand). Importers are
167/// keyed by Arrow extension name. The default session pre-registers the builtin UUID
168/// plugin; temporal extensions are handled by the canonical Arrow ↔ Vortex path and do not
169/// need plugins.
170#[derive(Debug)]
171pub struct ArrowSession {
172    exporters: ArcSwap<ExportMap>,
173    exporters_by_vortex: ArcSwap<ExportDTypeMap>,
174    importers: ArcSwap<ImportMap>,
175}
176
177impl Default for ArrowSession {
178    fn default() -> Self {
179        let session = Self {
180            exporters: ArcSwap::from_pointee(ExportMap::default()),
181            exporters_by_vortex: ArcSwap::from_pointee(ExportDTypeMap::default()),
182            importers: ArcSwap::from_pointee(ImportMap::default()),
183        };
184
185        session.register_exporter(Arc::new(Uuid));
186        session.register_importer(Arc::new(Uuid));
187
188        session
189    }
190}
191
192impl ArrowSession {
193    /// Register an [`ArrowExportVTable`] under its target Arrow extension Id (for dispatch)
194    /// and its source Vortex extension Id (for schema inference).
195    pub fn register_exporter(&self, exporter: ArrowExportVTableRef) {
196        Self::insert(
197            &self.exporters,
198            exporter.arrow_ext_id(),
199            ArrowExportVTableRef::clone(&exporter),
200        );
201        Self::insert(&self.exporters_by_vortex, exporter.vortex_id(), exporter);
202    }
203
204    /// Register an [`ArrowImportVTable`] under its source Arrow extension name.
205    pub fn register_importer(&self, importer: ArrowImportVTableRef) {
206        Self::insert(&self.importers, importer.arrow_ext_id(), importer);
207    }
208
209    fn insert<K, T>(slot: &ArcSwap<HashMap<K, Arc<[T]>>>, key: K, value: T)
210    where
211        K: Clone + Eq + std::hash::Hash,
212        T: Clone,
213    {
214        slot.rcu(move |map| {
215            let mut next = (**map).clone();
216            let entry = next.entry(key.clone()).or_insert_with(|| Arc::from([]));
217            let mut extended: Vec<T> = entry.iter().cloned().collect();
218            extended.push(value.clone());
219            *entry = Arc::from(extended);
220            next
221        });
222    }
223
224    fn exporters(&self, id: &Id) -> Arc<[ArrowExportVTableRef]> {
225        self.exporters
226            .load()
227            .get(id)
228            .cloned()
229            .unwrap_or_else(|| Arc::from([]))
230    }
231
232    fn exporters_by_vortex(&self, id: &Id) -> Arc<[ArrowExportVTableRef]> {
233        self.exporters_by_vortex
234            .load()
235            .get(id)
236            .cloned()
237            .unwrap_or_else(|| Arc::from([]))
238    }
239
240    fn importers(&self, id: &Id) -> Arc<[ArrowImportVTableRef]> {
241        self.importers
242            .load()
243            .get(id)
244            .cloned()
245            .unwrap_or_else(|| Arc::from([]))
246    }
247
248    /// Build the Arrow [`Field`] for a Vortex [`DType`].
249    ///
250    /// For [`DType::Extension`]s, plugins registered against the extension's `Id`
251    /// are tried in registration order; the first plugin to return `Some(field)` wins.
252    pub fn to_arrow_field(&self, name: &str, dtype: &DType) -> VortexResult<Field> {
253        // Handle the structural encodings, which may have recursive types
254        match dtype {
255            DType::List(elem_dtype, nullability) => {
256                let elem_field = self.to_arrow_field(Field::LIST_FIELD_DEFAULT_NAME, elem_dtype)?;
257                Ok(Field::new_list(name, elem_field, nullability.is_nullable()))
258            }
259            DType::FixedSizeList(elem_dtype, elem_size, nullability) => {
260                let elem_field = self.to_arrow_field(Field::LIST_FIELD_DEFAULT_NAME, elem_dtype)?;
261                Ok(Field::new_fixed_size_list(
262                    name,
263                    elem_field,
264                    (*elem_size).try_into()?,
265                    nullability.is_nullable(),
266                ))
267            }
268            DType::Struct(fields, nullability) => {
269                let arrow_fields = Fields::from_iter(
270                    fields
271                        .fields()
272                        .zip(fields.names().iter())
273                        .map(|(field, name)| self.to_arrow_field(name.as_ref(), &field))
274                        .collect::<VortexResult<Vec<_>>>()?,
275                );
276                Ok(Field::new_struct(
277                    name,
278                    arrow_fields,
279                    nullability.is_nullable(),
280                ))
281            }
282            DType::Extension(ext) if !ext.is::<AnyTemporal>() => {
283                for plugin in self.exporters_by_vortex(&ext.id()).iter() {
284                    if let Some(field) =
285                        plugin.to_arrow_field(name, &DType::Extension(ext.clone()), self)?
286                    {
287                        return Ok(field);
288                    }
289                }
290                vortex_bail!("extension type cannot be converted to Arrow without a plugin: {ext}");
291            }
292            DType::Variant(_) => {
293                // TODO(Adam): This currently encodes information about parquet-variant
294                // at this level. Variant's complexity with being an essentially logical type
295                // with multiple physical layout complicates handling this correctly.
296                Ok(Field::new(
297                    name,
298                    DataType::Struct(
299                        vec![
300                            Field::new("metadata", DataType::BinaryView, dtype.is_nullable()),
301                            Field::new("value", DataType::BinaryView, dtype.is_nullable()),
302                        ]
303                        .into(),
304                    ),
305                    dtype.is_nullable(),
306                )
307                .with_metadata(
308                    [(
309                        EXTENSION_TYPE_NAME_KEY.to_string(),
310                        "arrow.parquet.variant".to_string(),
311                    )]
312                    .into(),
313                ))
314            }
315            _ => Ok(Field::new(
316                name,
317                to_data_type_naive(dtype)?,
318                dtype.is_nullable(),
319            )),
320        }
321    }
322
323    /// Build the Arrow [`Schema`] for a Vortex top-level [`DType::Struct`], dispatching
324    /// extension fields through registered export plugins for inference. Nested
325    /// extensions are preserved via [`Self::to_arrow_field`].
326    pub fn to_arrow_schema(&self, dtype: &DType) -> VortexResult<Schema> {
327        let DType::Struct(struct_dtype, _) = dtype else {
328            vortex_error::vortex_bail!(
329                "to_arrow_schema requires a top-level struct dtype, got {dtype}"
330            );
331        };
332        let mut fields = Vec::with_capacity(struct_dtype.names().len());
333        for (name, field_dtype) in struct_dtype.names().iter().zip(struct_dtype.fields()) {
334            fields.push(self.to_arrow_field(name.as_ref(), &field_dtype)?);
335        }
336        Ok(Schema::new(fields))
337    }
338
339    /// Build the Vortex [`DType`] for an Arrow [`Field`].
340    ///
341    /// Plugins registered against the field's Arrow extension name are tried in
342    /// registration order; the first plugin to return `Some(dtype)` wins. If none
343    /// match (or all return `None`), recurses into container types ([`DataType::List`]
344    /// family, [`DataType::FixedSizeList`], [`DataType::Struct`]) so extension metadata
345    /// on nested element/struct fields is preserved. Leaf types use the canonical
346    /// Arrow → Vortex mapping via [`DType::from_arrow`].
347    pub fn from_arrow_field(&self, field: &Field) -> VortexResult<DType> {
348        if let Some(name) = field.metadata().get(EXTENSION_TYPE_NAME_KEY) {
349            for plugin in self.importers(&Id::new(name)).iter() {
350                if let Some(dtype) = plugin.from_arrow_field(field)? {
351                    return Ok(dtype);
352                }
353            }
354        }
355        let nullability: Nullability = field.is_nullable().into();
356        Ok(match field.data_type() {
357            DataType::List(elem)
358            | DataType::LargeList(elem)
359            | DataType::ListView(elem)
360            | DataType::LargeListView(elem) => {
361                DType::List(Arc::new(self.from_arrow_field(elem.as_ref())?), nullability)
362            }
363            DataType::FixedSizeList(elem, size) => DType::FixedSizeList(
364                Arc::new(self.from_arrow_field(elem.as_ref())?),
365                *size as u32,
366                nullability,
367            ),
368            DataType::Struct(fields) => {
369                let entries = fields
370                    .iter()
371                    .map(|f| {
372                        self.from_arrow_field(f)
373                            .map(|dt| (FieldName::from(f.name().as_str()), dt))
374                    })
375                    .collect::<VortexResult<Vec<_>>>()?;
376                DType::Struct(StructFields::from_iter(entries), nullability)
377            }
378            _ => DType::from_arrow(field),
379        })
380    }
381
382    /// Build the Vortex [`DType`] for an Arrow [`Schema`], dispatching extension fields
383    /// through registered import plugins. The result is a top-level non-nullable struct
384    /// matching the schema's fields.
385    pub fn from_arrow_schema(&self, schema: &Schema) -> VortexResult<DType> {
386        let entries = schema
387            .fields()
388            .iter()
389            .map(|f| {
390                self.from_arrow_field(f)
391                    .map(|dt| (FieldName::from(f.name().as_str()), dt))
392            })
393            .collect::<VortexResult<Vec<_>>>()?;
394        Ok(DType::Struct(
395            StructFields::from_iter(entries),
396            Nullability::NonNullable,
397        ))
398    }
399
400    /// Decode an Arrow [`RecordBatch`] into a Vortex struct array, dispatching each
401    /// extension column through its registered import plugin.
402    ///
403    /// `schema` is the authoritative Arrow schema used for dispatch — the columns are
404    /// consumed positionally. Pass an external schema (rather than relying on
405    /// `batch.schema()`) when upstream DataFusion plumbing may have stripped Field-level
406    /// extension metadata from the runtime RecordBatch.
407    pub fn from_arrow_record_batch(
408        &self,
409        batch: RecordBatch,
410        schema: &Schema,
411    ) -> VortexResult<ArrayRef> {
412        vortex_ensure!(
413            batch.num_columns() == schema.fields().len(),
414            "RecordBatch has {} columns but schema has {} fields",
415            batch.num_columns(),
416            schema.fields().len()
417        );
418        let length = batch.num_rows();
419        let names = FieldNames::from_iter(
420            schema
421                .fields()
422                .iter()
423                .map(|f| FieldName::from(f.name().as_str())),
424        );
425        let mut columns = Vec::with_capacity(schema.fields().len());
426        for (col, field) in batch.columns().iter().zip(schema.fields().iter()) {
427            columns.push(self.from_arrow_array(ArrowArrayRef::clone(col), field)?);
428        }
429        Ok(StructArray::try_new(names, columns, length, Validity::NonNullable)?.into_array())
430    }
431
432    /// Execute a Vortex array into an Arrow array.
433    ///
434    /// If `target` carries an `ARROW:extension:name`, the plugin registry is probed for one that
435    /// can support executing to the target extension type.
436    ///
437    /// With `target = None` the fallback path picks the array's preferred Arrow physical type
438    /// and executes directly into that, ignoring extension types.
439    pub fn execute_arrow(
440        &self,
441        array: ArrayRef,
442        target: Option<&Field>,
443        ctx: &mut ExecutionCtx,
444    ) -> VortexResult<ArrowArrayRef> {
445        // NOTE(aduffy): this looks strange, but we do this to keep target_field as &Field so
446        //  we can avoid cloning target when it is provided. It contains a HashMap internally that
447        //  can be expensive to copy.
448        let arrow_field;
449        let target_field = match target {
450            Some(field) => field,
451            None => {
452                let session = ctx.session().clone();
453                arrow_field = session.arrow().to_arrow_field("", array.dtype())?;
454                &arrow_field
455            }
456        };
457
458        if let Some(arrow_ext_name) = target_field.metadata().get(EXTENSION_TYPE_NAME_KEY) {
459            // There can be multiple plugins that report support for a particular extension type.
460            // We try them in order until one of them reports a successful conversion.
461            let len = array.len();
462            let mut current = array;
463
464            for plugin in self.exporters(&Id::new(arrow_ext_name)).iter() {
465                trace!(
466                    plugin = ?plugin,
467                    extension_name = arrow_ext_name,
468                    "probing plugin for converting Arrow array"
469                );
470
471                match plugin.execute_arrow(current, target_field, ctx)? {
472                    ArrowExport::Exported(arrow) => {
473                        vortex_ensure!(
474                            arrow.len() == len,
475                            "Arrow array length does not match Vortex array length after conversion to {:?}",
476                            arrow
477                        );
478                        return Ok(arrow);
479                    }
480                    ArrowExport::Unsupported(array) => current = array,
481                }
482            }
483
484            debug!(
485                extension_id = arrow_ext_name,
486                data_type = ?target_field.data_type(),
487                "unsupported Arrow extension type encountered, falling back to naive execution"
488            );
489
490            return execute_arrow_naive(current, Some(target_field.data_type()), ctx);
491        }
492
493        execute_arrow_naive(array, target.map(|field| field.data_type()), ctx)
494    }
495
496    /// Decode an Arrow array into a Vortex array.
497    ///
498    /// Routes through the registered import plugin if `field` carries an Arrow extension
499    /// name we recognize, probing each plugin in registration order until one handles the
500    /// input or all return [`ArrowImport::Unsupported`]. Otherwise recurses into container
501    /// arrays ([`arrow_array::StructArray`], [`arrow_array::GenericListArray`],
502    /// [`arrow_array::FixedSizeListArray`], [`arrow_array::GenericListViewArray`]) so
503    /// extension fields nested inside containers reach their importers; leaf types fall
504    /// through to the canonical Arrow → Vortex array conversion.
505    pub fn from_arrow_array(&self, array: ArrowArrayRef, field: &Field) -> VortexResult<ArrayRef> {
506        if let Some(extension_name) = field.metadata().get(EXTENSION_TYPE_NAME_KEY) {
507            let importers = self.importers(&Id::new(extension_name));
508            if !importers.is_empty() {
509                let dtype = self.from_arrow_field(field)?;
510                let mut current = array;
511                for plugin in importers.iter() {
512                    match plugin.from_arrow_array(current, field, &dtype)? {
513                        ArrowImport::Imported(arr) => return Ok(arr),
514                        ArrowImport::Unsupported(arr) => current = arr,
515                    }
516                }
517                return ArrayRef::from_arrow(current.as_ref(), field.is_nullable());
518            }
519        }
520        self.from_arrow_array_canonical(array, field)
521    }
522
523    /// Recurse into Arrow container arrays so nested fields with extension metadata reach
524    /// their importers, falling through to [`ArrayRef::from_arrow`] for leaf types.
525    #[allow(clippy::wrong_self_convention)]
526    fn from_arrow_array_canonical(
527        &self,
528        array: ArrowArrayRef,
529        field: &Field,
530    ) -> VortexResult<ArrayRef> {
531        use arrow_array::cast::AsArray;
532
533        match field.data_type() {
534            DataType::Struct(fields) => {
535                let arrow_struct = array.as_struct();
536                let names = FieldNames::from_iter(
537                    fields.iter().map(|f| FieldName::from(f.name().as_str())),
538                );
539                let columns = arrow_struct
540                    .columns()
541                    .iter()
542                    .zip(fields.iter())
543                    .map(|(col, child_field)| {
544                        // Arrow pushes nulls into non-nullable fields; strip before recursing
545                        // so Vortex's stricter validity invariants are upheld.
546                        let inner = if col.null_count() > 0 && !child_field.is_nullable() {
547                            make_array(remove_nulls(col.to_data()))
548                        } else {
549                            ArrowArrayRef::clone(col)
550                        };
551                        self.from_arrow_array(inner, child_field.as_ref())
552                    })
553                    .collect::<VortexResult<Vec<_>>>()?;
554                let validity = nulls(arrow_struct.nulls(), field.is_nullable());
555                Ok(
556                    StructArray::try_new(names, columns, arrow_struct.len(), validity)?
557                        .into_array(),
558                )
559            }
560            DataType::List(elem_field) => {
561                let list = array.as_list::<i32>();
562                let elements = self
563                    .from_arrow_array(ArrowArrayRef::clone(list.values()), elem_field.as_ref())?;
564                let offsets = list.offsets().clone().into_array();
565                let validity = nulls(list.nulls(), field.is_nullable());
566                Ok(crate::arrays::ListArray::try_new(elements, offsets, validity)?.into_array())
567            }
568            DataType::LargeList(elem_field) => {
569                let list = array.as_list::<i64>();
570                let elements = self
571                    .from_arrow_array(ArrowArrayRef::clone(list.values()), elem_field.as_ref())?;
572                let offsets = list.offsets().clone().into_array();
573                let validity = nulls(list.nulls(), field.is_nullable());
574                Ok(crate::arrays::ListArray::try_new(elements, offsets, validity)?.into_array())
575            }
576            DataType::FixedSizeList(elem_field, list_size) => {
577                let fsl = array.as_fixed_size_list();
578                let elements =
579                    self.from_arrow_array(ArrowArrayRef::clone(fsl.values()), elem_field.as_ref())?;
580                let validity = nulls(fsl.nulls(), field.is_nullable());
581                Ok(crate::arrays::FixedSizeListArray::try_new(
582                    elements,
583                    *list_size as u32,
584                    validity,
585                    fsl.len(),
586                )?
587                .into_array())
588            }
589            DataType::ListView(elem_field) => {
590                let list = array.as_list_view::<i32>();
591                let elements = self
592                    .from_arrow_array(ArrowArrayRef::clone(list.values()), elem_field.as_ref())?;
593                let offsets = list.offsets().clone().into_array();
594                let sizes = list.sizes().clone().into_array();
595                let validity = nulls(list.nulls(), field.is_nullable());
596                Ok(
597                    crate::arrays::ListViewArray::try_new(elements, offsets, sizes, validity)?
598                        .into_array(),
599                )
600            }
601            DataType::LargeListView(elem_field) => {
602                let list = array.as_list_view::<i64>();
603                let elements = self
604                    .from_arrow_array(ArrowArrayRef::clone(list.values()), elem_field.as_ref())?;
605                let offsets = list.offsets().clone().into_array();
606                let sizes = list.sizes().clone().into_array();
607                let validity = nulls(list.nulls(), field.is_nullable());
608                Ok(
609                    crate::arrays::ListViewArray::try_new(elements, offsets, sizes, validity)?
610                        .into_array(),
611                )
612            }
613            _ => ArrayRef::from_arrow(array.as_ref(), field.is_nullable()),
614        }
615    }
616}
617
618// NOTE(aduffy): We should remove this once we bump Arrow to 0.59.0. This is replicating the
619//  `Field::has_valid_extension_type` method on Arrow added in 58.2.0, we polyfill it here so that
620//  this crate can build with minimal-versions declared.
621pub(crate) fn has_valid_extension_type<E: ExtensionType>(field: &Field) -> bool {
622    if field.extension_type_name() != Some(E::NAME) {
623        return false;
624    }
625
626    E::try_new_from_field_metadata(field.data_type(), field.metadata()).is_ok()
627}
628
629impl SessionVar for ArrowSession {
630    fn as_any(&self) -> &dyn Any {
631        self
632    }
633
634    fn as_any_mut(&mut self) -> &mut dyn Any {
635        self
636    }
637}
638
639/// Extension trait for accessing the [`ArrowSession`] on a Vortex session.
640pub trait ArrowSessionExt: SessionExt {
641    /// Get the Arrow session.
642    fn arrow(&self) -> Ref<'_, ArrowSession>;
643}
644
645impl<S: SessionExt> ArrowSessionExt for S {
646    fn arrow(&self) -> Ref<'_, ArrowSession> {
647        self.get::<ArrowSession>()
648    }
649}
650
651#[cfg(test)]
652mod tests {
653    use std::sync::Arc;
654
655    use arrow_array::FixedSizeBinaryArray;
656    use arrow_array::cast::AsArray;
657    use arrow_schema::DataType;
658    use arrow_schema::Field;
659    use arrow_schema::extension::Uuid as ArrowUuid;
660    use vortex_error::VortexResult;
661
662    use super::*;
663    use crate::LEGACY_SESSION;
664    use crate::VortexSessionExecute;
665    use crate::dtype::DType;
666    use crate::dtype::FieldName;
667    use crate::dtype::Nullability;
668    use crate::dtype::PType;
669    use crate::dtype::StructFields;
670    use crate::dtype::extension::ExtDType;
671    use crate::dtype::extension::ExtVTable;
672    use crate::extension::uuid::Uuid;
673    use crate::extension::uuid::UuidMetadata;
674
675    fn uuid_dtype(nullable: bool) -> DType {
676        let storage = DType::FixedSizeList(
677            Arc::new(DType::Primitive(PType::U8, Nullability::NonNullable)),
678            16,
679            nullable.into(),
680        );
681        DType::Extension(
682            ExtDType::try_with_vtable(Uuid, UuidMetadata::default(), storage)
683                .expect("uuid ext dtype")
684                .erased(),
685        )
686    }
687
688    #[test]
689    fn to_arrow_field_top_level_uuid_carries_extension_metadata() -> VortexResult<()> {
690        let session = ArrowSession::default();
691        let field = session.to_arrow_field("id", &uuid_dtype(false))?;
692        assert!(has_valid_extension_type::<ArrowUuid>(&field));
693        Ok(())
694    }
695
696    #[test]
697    fn to_arrow_field_struct_with_nested_uuid_preserves_metadata() -> VortexResult<()> {
698        let session = ArrowSession::default();
699        let dtype = DType::Struct(
700            StructFields::from_iter([(FieldName::from("id"), uuid_dtype(false))]),
701            Nullability::NonNullable,
702        );
703        let field = session.to_arrow_field("row", &dtype)?;
704        let DataType::Struct(inner) = field.data_type() else {
705            panic!("expected Struct, got {:?}", field.data_type());
706        };
707        assert_eq!(inner.len(), 1);
708        assert_eq!(inner[0].data_type(), &DataType::FixedSizeBinary(16));
709        assert!(has_valid_extension_type::<ArrowUuid>(&inner[0]));
710        Ok(())
711    }
712
713    #[test]
714    fn to_arrow_field_list_of_uuid_preserves_metadata() -> VortexResult<()> {
715        let session = ArrowSession::default();
716        let dtype = DType::List(Arc::new(uuid_dtype(true)), Nullability::NonNullable);
717        let field = session.to_arrow_field("ids", &dtype)?;
718        let DataType::List(elem) = field.data_type() else {
719            panic!("expected List, got {:?}", field.data_type());
720        };
721        assert!(has_valid_extension_type::<ArrowUuid>(elem));
722        Ok(())
723    }
724
725    #[test]
726    fn to_arrow_field_fixed_size_list_of_uuid_preserves_metadata() -> VortexResult<()> {
727        let session = ArrowSession::default();
728        let dtype = DType::FixedSizeList(Arc::new(uuid_dtype(false)), 3, Nullability::NonNullable);
729        let field = session.to_arrow_field("triple", &dtype)?;
730        let DataType::FixedSizeList(elem, size) = field.data_type() else {
731            panic!("expected FixedSizeList, got {:?}", field.data_type());
732        };
733        assert_eq!(*size, 3);
734        assert!(has_valid_extension_type::<ArrowUuid>(elem));
735        Ok(())
736    }
737
738    #[test]
739    fn to_arrow_schema_struct_of_struct_uuid() -> VortexResult<()> {
740        let session = ArrowSession::default();
741        let inner = DType::Struct(
742            StructFields::from_iter([(FieldName::from("id"), uuid_dtype(true))]),
743            Nullability::NonNullable,
744        );
745        let outer = DType::Struct(
746            StructFields::from_iter([(FieldName::from("payload"), inner)]),
747            Nullability::NonNullable,
748        );
749        let schema = session.to_arrow_schema(&outer)?;
750        let payload = schema.field(0);
751        let DataType::Struct(inner_fields) = payload.data_type() else {
752            panic!("expected Struct, got {:?}", payload.data_type());
753        };
754        assert!(has_valid_extension_type::<ArrowUuid>(&inner_fields[0]));
755        Ok(())
756    }
757
758    #[test]
759    fn from_arrow_field_recurses_into_nested_uuid() -> VortexResult<()> {
760        let session = ArrowSession::default();
761        let mut elem = Field::new("item", DataType::FixedSizeBinary(16), false);
762        elem.try_with_extension_type(ArrowUuid)?;
763        let outer = Field::new("ids", DataType::List(Arc::new(elem)), false);
764
765        let dtype = session.from_arrow_field(&outer)?;
766        let DType::List(inner_dt, _) = dtype else {
767            panic!("expected List dtype, got {dtype}");
768        };
769        assert!(
770            matches!(inner_dt.as_ref(), DType::Extension(ext) if ext.id() == Uuid.id()),
771            "expected Uuid extension element, got {inner_dt}",
772        );
773        Ok(())
774    }
775
776    #[test]
777    fn schema_roundtrip_preserves_nested_uuid() -> VortexResult<()> {
778        let session = ArrowSession::default();
779        let dtype = DType::Struct(
780            StructFields::from_iter([
781                (FieldName::from("id"), uuid_dtype(false)),
782                (
783                    FieldName::from("ids"),
784                    DType::List(Arc::new(uuid_dtype(true)), Nullability::NonNullable),
785                ),
786            ]),
787            Nullability::NonNullable,
788        );
789        let schema = session.to_arrow_schema(&dtype)?;
790        let roundtripped = session.from_arrow_schema(&schema)?;
791        assert_eq!(roundtripped, dtype);
792        Ok(())
793    }
794
795    #[test]
796    fn execute_arrow_target_none_preserves_top_level_uuid_metadata() -> VortexResult<()> {
797        let mut ctx = LEGACY_SESSION.create_execution_ctx();
798        let session = LEGACY_SESSION.arrow();
799
800        let mut field = Field::new("id", DataType::FixedSizeBinary(16), false);
801        field.try_with_extension_type(ArrowUuid)?;
802        let arrow_array: ArrowArrayRef = Arc::new(FixedSizeBinaryArray::try_from_iter(
803            [*b"0123456789abcdef", *b"fedcba9876543210"].into_iter(),
804        )?);
805
806        let vortex_array = session.from_arrow_array(arrow_array, &field)?;
807
808        let vortex_ext = vortex_array.dtype().as_extension();
809        assert!(vortex_ext.is::<Uuid>());
810
811        let exported = session.execute_arrow(vortex_array, None, &mut ctx)?;
812        assert_eq!(exported.data_type(), &DataType::FixedSizeBinary(16));
813        let fsb = exported.as_fixed_size_binary();
814        assert_eq!(fsb.len(), 2);
815        assert_eq!(fsb.value(0), b"0123456789abcdef");
816        assert_eq!(fsb.value(1), b"fedcba9876543210");
817        Ok(())
818    }
819}