vortex-array 0.72.0

Vortex in-memory columnar data format
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! Plugin layer for moving Arrow extension types in and out of Vortex.
//!
//! Vortex's canonical Arrow conversion (see [`crate::dtype::arrow`] and the executor in
//! [`crate::arrow::executor`]) handles every non-extension Arrow type and the builtin temporal
//! extensions. The plugins registered here cover the remaining case: **Arrow extension types**.
//!
//! * An [`ArrowExportVTable`] is dispatched purely by the **target Arrow extension Id** —
//!   the plugin is selected when the caller asks for an Arrow [`Field`] carrying matching
//!   `ARROW:extension:name` metadata. The Vortex source dtype/encoding is irrelevant to
//!   dispatch.
//! * An [`ArrowImportVTable`] is dispatched by the **source Arrow extension name** carried
//!   on the incoming [`Field`]. The plugin is responsible for both preserving extension
//!   identity and re-encoding storage if needed (e.g. Arrow `FixedSizeBinary[16]` for UUID
//!   becomes Vortex `FixedSizeList<u8; 16>`).
//!
//! Multiple plugins may register against the same key. They are tried in registration order;
//! each may return [`ArrowExport::Unsupported`] / [`ArrowImport::Unsupported`] to defer to
//! the next.

use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;

use arc_swap::ArcSwap;
use arrow_array::Array as _;
use arrow_array::ArrayRef as ArrowArrayRef;
use arrow_array::RecordBatch;
use arrow_array::make_array;
use arrow_schema::DataType;
use arrow_schema::Field;
use arrow_schema::Fields;
use arrow_schema::Schema;
use arrow_schema::extension::EXTENSION_TYPE_NAME_KEY;
use arrow_schema::extension::ExtensionType;
use tracing::debug;
use tracing::trace;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_ensure;
use vortex_session::Ref;
use vortex_session::SessionExt;
use vortex_session::SessionVar;
use vortex_session::registry::Id;
use vortex_utils::aliases::hash_map::HashMap;

use crate::ArrayRef;
use crate::ExecutionCtx;
use crate::IntoArray;
use crate::arrays::StructArray;
use crate::arrow::FromArrowArray;
use crate::arrow::convert::nulls;
use crate::arrow::convert::remove_nulls;
use crate::arrow::executor::execute_arrow_naive;
use crate::dtype::DType;
use crate::dtype::FieldName;
use crate::dtype::FieldNames;
use crate::dtype::Nullability;
use crate::dtype::StructFields;
use crate::dtype::arrow::FromArrowType;
use crate::dtype::arrow::to_data_type_naive;
use crate::dtype::extension::ExtDTypeRef;
use crate::dtype::extension::ExtId;
use crate::extension::datetime::AnyTemporal;
use crate::extension::uuid::Uuid;
use crate::validity::Validity;

/// Outcome of a successful call to [`ArrowExportVTable::execute_arrow`].
///
/// A plugin that declines the supplied array returns [`Unsupported`][Self::Unsupported],
/// handing ownership of the input back so the session can probe the next plugin or fall
/// back to the canonical path. Failures are not modelled by this enum; they are propagated
/// through [`VortexResult`].
pub enum ArrowExport {
    /// The plugin does not handle this input; the untouched Vortex array is returned so
    /// the session may try another plugin.
    Unsupported(ArrayRef),
    /// A successful export, carrying the produced Arrow array.
    Exported(ArrowArrayRef),
}

/// Outcome of a successful call to [`ArrowImportVTable::from_arrow_array`].
///
/// A plugin that declines the supplied array returns [`Unsupported`][Self::Unsupported],
/// handing ownership of the input back so the session can probe the next plugin or fall
/// back to the canonical path. Failures are not modelled by this enum; they are propagated
/// through [`VortexResult`].
pub enum ArrowImport {
    /// The plugin does not handle this input; the untouched Arrow array is returned so
    /// the session may try another plugin.
    Unsupported(ArrowArrayRef),
    /// A successful import, carrying the produced Vortex array.
    Imported(ArrayRef),
}

/// Plugin layer for exporting a Vortex array to an Arrow extension type.
///
/// This is purely an implementation trait; its methods should not be called directly.
/// Instead, use the methods on [`ArrowSession`].
pub trait ArrowExportVTable: 'static + Send + Sync + Debug {
    /// The Arrow extension ID this plugin produces.
    ///
    /// [`ArrowSession::register_exporter`] files the plugin under this key, and
    /// [`ArrowSession::execute_arrow`] looks plugins up by the `ARROW:extension:name`
    /// metadata value of the target [`Field`].
    fn arrow_ext_id(&self) -> Id;

    /// The Vortex extension ID this plugin maps from. Used only for inference by
    /// [`ArrowSession::to_arrow_field`] / [`ArrowSession::to_arrow_schema`]; never as a
    /// dispatch key for [`execute_arrow`][Self::execute_arrow].
    fn vortex_ext_id(&self) -> ExtId;

    /// Build the Arrow [`Field`] this plugin produces for the given Vortex extension
    /// `dtype`. Used during schema inference.
    ///
    /// Returning `Ok(None)` makes [`ArrowSession::to_arrow_field`] move on to the next
    /// plugin registered for the same Vortex extension ID; an `Err` aborts inference.
    fn to_arrow_field(
        &self,
        name: &str,
        dtype: &ExtDTypeRef,
        session: &ArrowSession,
    ) -> VortexResult<Option<Field>>;

    /// Convert a Vortex array into an Arrow array shaped to `target`.
    ///
    /// Returns ownership of `array` via [`ArrowExport::Unsupported`] when the plugin cannot
    /// handle the input. On [`ArrowExport::Exported`], [`ArrowSession::execute_arrow`]
    /// rejects the result unless its length equals the length of the input array.
    fn execute_arrow(
        &self,
        array: ArrayRef,
        target: &Field,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrowExport>;
}

/// Plugin layer for importing an Arrow extension-typed array into a Vortex extension array.
///
/// Plugins are dispatched by `arrow_ext_id`.
///
/// This is purely an implementation trait; its methods should not be called directly.
/// Instead, use the methods on [`ArrowSession`].
pub trait ArrowImportVTable: 'static + Send + Sync + Debug {
    /// The Arrow extension name this plugin handles.
    ///
    /// [`ArrowSession::register_importer`] files the plugin under this key.
    fn arrow_ext_id(&self) -> Id;

    /// Build the Vortex [`DType`] that corresponds to `field` (which carries this plugin's
    /// Arrow extension metadata).
    ///
    /// Returning `Ok(None)` makes [`ArrowSession::from_arrow_field`] try the next plugin
    /// registered for the same Arrow extension name.
    // Clippy expects `from_*` methods to take no receiver; these are plugin entry points
    // and need `&self`, so the lint is silenced.
    #[allow(clippy::wrong_self_convention)]
    fn from_arrow_field(&self, field: &Field) -> VortexResult<Option<DType>>;

    /// Convert an Arrow array into a Vortex extension array of `dtype`.
    ///
    /// Returns ownership of `array` via [`ArrowImport::Unsupported`] when the plugin cannot
    /// handle the input.
    // Same rationale as `from_arrow_field`: the receiver is required for plugin dispatch.
    #[allow(clippy::wrong_self_convention)]
    fn from_arrow_array(
        &self,
        array: ArrowArrayRef,
        dtype: &ExtDTypeRef,
    ) -> VortexResult<ArrowImport>;
}

/// Shared, type-erased handle to an [`ArrowExportVTable`] plugin.
pub type ArrowExportVTableRef = Arc<dyn ArrowExportVTable>;
/// Shared, type-erased handle to an [`ArrowImportVTable`] plugin.
pub type ArrowImportVTableRef = Arc<dyn ArrowImportVTable>;

// Export plugins keyed by Arrow extension Id; each slice is in registration order.
type ExportMap = HashMap<Id, Arc<[ArrowExportVTableRef]>>;
// Import plugins keyed by Arrow extension Id; each slice is in registration order.
type ImportMap = HashMap<Id, Arc<[ArrowImportVTableRef]>>;
// Export plugins keyed by Vortex extension Id; each slice is in registration order.
type ExportDTypeMap = HashMap<ExtId, Arc<[ArrowExportVTableRef]>>;

/// Session-scoped registry of Arrow extension plugins.
///
/// Exporters are stored in two indices: one keyed by Arrow extension Id (used for
/// `execute_arrow` dispatch) and one keyed by Vortex extension Id (used **only** by
/// `to_arrow_field` / `to_arrow_schema` inference, when callers need to translate a Vortex
/// extension `DType` into an Arrow `Field` with no target schema in hand). Importers are
/// keyed by Arrow extension name. The default session pre-registers the builtin UUID
/// plugin; temporal extensions are handled by the canonical Arrow ↔ Vortex path and do not
/// need plugins.
///
/// Each index lives behind an [`ArcSwap`], so registration takes `&self` and replaces the
/// stored map with an updated copy rather than mutating it in place.
#[derive(Debug)]
pub struct ArrowSession {
    // Export plugins by target Arrow extension Id (dispatch index).
    exporters: ArcSwap<ExportMap>,
    // The same export plugins by source Vortex extension Id (schema-inference index).
    exporters_by_vortex: ArcSwap<ExportDTypeMap>,
    // Import plugins by source Arrow extension Id.
    importers: ArcSwap<ImportMap>,
}

impl Default for ArrowSession {
    fn default() -> Self {
        let session = Self {
            exporters: ArcSwap::from_pointee(ExportMap::default()),
            exporters_by_vortex: ArcSwap::from_pointee(ExportDTypeMap::default()),
            importers: ArcSwap::from_pointee(ImportMap::default()),
        };

        session.register_exporter(Arc::new(Uuid));
        session.register_importer(Arc::new(Uuid));

        session
    }
}

impl ArrowSession {
    /// Register an [`ArrowExportVTable`] under its target Arrow extension Id (for dispatch)
    /// and its source Vortex extension Id (for schema inference).
    pub fn register_exporter(&self, exporter: ArrowExportVTableRef) {
        Self::insert(
            &self.exporters,
            exporter.arrow_ext_id(),
            ArrowExportVTableRef::clone(&exporter),
        );
        Self::insert(
            &self.exporters_by_vortex,
            exporter.vortex_ext_id(),
            exporter,
        );
    }

    /// Register an [`ArrowImportVTable`], keyed by the Arrow extension Id it reports via
    /// `arrow_ext_id`.
    pub fn register_importer(&self, importer: ArrowImportVTableRef) {
        let arrow_key = importer.arrow_ext_id();
        Self::insert(&self.importers, arrow_key, importer);
    }

    /// Append `value` to the list stored under `key` in `slot`, creating the list when
    /// the key is absent.
    ///
    /// The update goes through `ArcSwap::rcu`: a copy of the current map is modified and
    /// swapped in. The closure clones `key` and `value` on every invocation because it is
    /// an `FnMut` and must leave its captures intact.
    fn insert<K, T>(slot: &ArcSwap<HashMap<K, Arc<[T]>>>, key: K, value: T)
    where
        K: Clone + Eq + std::hash::Hash,
        T: Clone,
    {
        slot.rcu(move |current| {
            let mut updated = (**current).clone();
            // Existing entries (if any) followed by the new value, preserving
            // registration order.
            let plugins: Vec<T> = updated
                .get(&key)
                .into_iter()
                .flat_map(|existing| existing.iter().cloned())
                .chain(std::iter::once(value.clone()))
                .collect();
            updated.insert(key.clone(), Arc::from(plugins));
            updated
        });
    }

    /// Export plugins registered under the Arrow extension `id`, in registration order;
    /// an empty slice when none are registered.
    fn exporters(&self, id: &Id) -> Arc<[ArrowExportVTableRef]> {
        let registry = self.exporters.load();
        match registry.get(id) {
            Some(plugins) => Arc::clone(plugins),
            None => Arc::from([]),
        }
    }

    /// Export plugins registered under the Vortex extension `id`, in registration order;
    /// an empty slice when none are registered.
    fn exporters_by_vortex(&self, id: &ExtId) -> Arc<[ArrowExportVTableRef]> {
        let registry = self.exporters_by_vortex.load();
        match registry.get(id) {
            Some(plugins) => Arc::clone(plugins),
            None => Arc::from([]),
        }
    }

    /// Import plugins registered under the Arrow extension `id`, in registration order;
    /// an empty slice when none are registered.
    fn importers(&self, id: &Id) -> Arc<[ArrowImportVTableRef]> {
        let registry = self.importers.load();
        match registry.get(id) {
            Some(plugins) => Arc::clone(plugins),
            None => Arc::from([]),
        }
    }

    /// Build the Arrow [`Field`] named `name` for a Vortex [`DType`].
    ///
    /// List, fixed-size-list and struct dtypes recurse into their children so nested
    /// extension types are resolved too. For a non-temporal [`DType::Extension`], the
    /// plugins registered against the extension's `Id` are tried in registration order
    /// and the first one to return `Some(field)` wins. Every other dtype uses
    /// [`to_data_type_naive`].
    ///
    /// # Errors
    ///
    /// Fails when no plugin yields a field for a non-temporal extension dtype, when the
    /// dtype is [`DType::Variant`], when a fixed-size-list width does not fit Arrow's
    /// `i32`, or when a plugin / the naive conversion reports an error.
    pub fn to_arrow_field(&self, name: &str, dtype: &DType) -> VortexResult<Field> {
        match dtype {
            // Structural dtypes first: their children may themselves need plugins.
            DType::List(elem_dtype, nullability) => {
                let item = self.to_arrow_field(Field::LIST_FIELD_DEFAULT_NAME, elem_dtype)?;
                Ok(Field::new_list(name, item, nullability.is_nullable()))
            }
            DType::FixedSizeList(elem_dtype, elem_size, nullability) => {
                let item = self.to_arrow_field(Field::LIST_FIELD_DEFAULT_NAME, elem_dtype)?;
                let width = i32::try_from(*elem_size)?;
                Ok(Field::new_fixed_size_list(
                    name,
                    item,
                    width,
                    nullability.is_nullable(),
                ))
            }
            DType::Struct(fields, nullability) => {
                let mut children = Vec::with_capacity(fields.names().len());
                for (child_name, child_dtype) in fields.names().iter().zip(fields.fields()) {
                    children.push(self.to_arrow_field(child_name.as_ref(), &child_dtype)?);
                }
                Ok(Field::new_struct(
                    name,
                    Fields::from_iter(children),
                    nullability.is_nullable(),
                ))
            }
            DType::Extension(ext) if !ext.is::<AnyTemporal>() => {
                let candidates = self.exporters_by_vortex(&ext.id());
                for exporter in candidates.iter() {
                    match exporter.to_arrow_field(name, ext, self)? {
                        Some(field) => return Ok(field),
                        None => continue,
                    }
                }
                vortex_bail!("extension type cannot be converted to Arrow without a plugin: {ext}");
            }
            DType::Variant(_) => {
                vortex_bail!("Arrow does not have a raw/transparent Variant encoding");
            }
            // Leaf dtypes (and temporal extensions) take the canonical mapping.
            _ => {
                let data_type = to_data_type_naive(dtype)?;
                Ok(Field::new(name, data_type, dtype.is_nullable()))
            }
        }
    }

    /// Build the Arrow [`Schema`] for a Vortex top-level [`DType::Struct`], dispatching
    /// extension fields through registered export plugins for inference. Nested
    /// extensions are preserved via [`Self::to_arrow_field`].
    pub fn to_arrow_schema(&self, dtype: &DType) -> VortexResult<Schema> {
        let DType::Struct(struct_dtype, _) = dtype else {
            vortex_error::vortex_bail!(
                "to_arrow_schema requires a top-level struct dtype, got {dtype}"
            );
        };
        let mut fields = Vec::with_capacity(struct_dtype.names().len());
        for (name, field_dtype) in struct_dtype.names().iter().zip(struct_dtype.fields()) {
            fields.push(self.to_arrow_field(name.as_ref(), &field_dtype)?);
        }
        Ok(Schema::new(fields))
    }

    /// Build the Vortex [`DType`] for an Arrow [`Field`].
    ///
    /// Plugins registered against the field's Arrow extension name are tried in
    /// registration order; the first plugin to return `Some(dtype)` wins. If none
    /// match (or all return `None`), recurses into container types ([`DataType::List`]
    /// family, [`DataType::FixedSizeList`], [`DataType::Struct`]) so extension metadata
    /// on nested element/struct fields is preserved. Leaf types use the canonical
    /// Arrow → Vortex mapping via [`DType::from_arrow`].
    ///
    /// # Errors
    ///
    /// Fails when a plugin reports an error, when converting a nested field fails, or
    /// when a [`DataType::FixedSizeList`] declares a negative list size.
    pub fn from_arrow_field(&self, field: &Field) -> VortexResult<DType> {
        if let Some(name) = field.metadata().get(EXTENSION_TYPE_NAME_KEY) {
            for plugin in self.importers(&Id::new(name)).iter() {
                if let Some(dtype) = plugin.from_arrow_field(field)? {
                    return Ok(dtype);
                }
            }
        }
        let nullability: Nullability = field.is_nullable().into();
        Ok(match field.data_type() {
            // All four Arrow list flavours collapse to the single Vortex list dtype.
            DataType::List(elem)
            | DataType::LargeList(elem)
            | DataType::ListView(elem)
            | DataType::LargeListView(elem) => {
                DType::List(Arc::new(self.from_arrow_field(elem.as_ref())?), nullability)
            }
            DataType::FixedSizeList(elem, size) => DType::FixedSizeList(
                Arc::new(self.from_arrow_field(elem.as_ref())?),
                // Arrow stores the size as `i32`; a checked conversion rejects negative
                // values instead of letting an `as` cast wrap them into a huge `u32`.
                u32::try_from(*size)?,
                nullability,
            ),
            DataType::Struct(fields) => {
                let entries = fields
                    .iter()
                    .map(|f| {
                        self.from_arrow_field(f)
                            .map(|dt| (FieldName::from(f.name().as_str()), dt))
                    })
                    .collect::<VortexResult<Vec<_>>>()?;
                DType::Struct(StructFields::from_iter(entries), nullability)
            }
            _ => DType::from_arrow(field),
        })
    }

    /// Build the Vortex [`DType`] for an Arrow [`Schema`].
    ///
    /// Every schema field is converted with [`Self::from_arrow_field`], so extension
    /// fields go through the registered import plugins. The result is a top-level
    /// non-nullable struct whose fields mirror the schema's fields in order.
    pub fn from_arrow_schema(&self, schema: &Schema) -> VortexResult<DType> {
        let mut entries = Vec::with_capacity(schema.fields().len());
        for arrow_field in schema.fields().iter() {
            let dtype = self.from_arrow_field(arrow_field)?;
            entries.push((FieldName::from(arrow_field.name().as_str()), dtype));
        }
        let struct_fields = StructFields::from_iter(entries);
        Ok(DType::Struct(struct_fields, Nullability::NonNullable))
    }

    /// Decode an Arrow [`RecordBatch`] into a non-nullable Vortex struct array, sending
    /// each column through [`Self::from_arrow_array`] so extension columns reach their
    /// import plugins.
    ///
    /// `schema` is the authoritative Arrow schema used for dispatch — the columns are
    /// consumed positionally. Pass an external schema (rather than relying on
    /// `batch.schema()`) when upstream DataFusion plumbing may have stripped Field-level
    /// extension metadata from the runtime RecordBatch.
    ///
    /// # Errors
    ///
    /// Fails when the batch's column count differs from the schema's field count, or when
    /// decoding a column or assembling the struct array fails.
    pub fn from_arrow_record_batch(
        &self,
        batch: RecordBatch,
        schema: &Schema,
    ) -> VortexResult<ArrayRef> {
        let actual = batch.num_columns();
        let expected = schema.fields().len();
        vortex_ensure!(
            actual == expected,
            "RecordBatch has {} columns but schema has {} fields",
            actual,
            expected
        );

        let row_count = batch.num_rows();
        let names = FieldNames::from_iter(
            schema
                .fields()
                .iter()
                .map(|arrow_field| FieldName::from(arrow_field.name().as_str())),
        );
        // Pair columns with schema fields by position.
        let columns = batch
            .columns()
            .iter()
            .zip(schema.fields().iter())
            .map(|(column, arrow_field)| {
                self.from_arrow_array(ArrowArrayRef::clone(column), arrow_field)
            })
            .collect::<VortexResult<Vec<_>>>()?;

        let decoded = StructArray::try_new(names, columns, row_count, Validity::NonNullable)?;
        Ok(decoded.into_array())
    }

    /// Execute a Vortex array into an Arrow array.
    ///
    /// The target field is `target` when provided. With `target = None`, a field is
    /// inferred from the array's dtype via `to_arrow_field` on the [`ArrowSession`] of
    /// `ctx`'s session (not on `self`); inference errors are returned to the caller.
    ///
    /// If the resolved field carries an `ARROW:extension:name`, the plugins registered for
    /// that name are probed in registration order. The first one to return
    /// [`ArrowExport::Exported`] wins, provided the produced array has the same length as
    /// the input. If every plugin returns [`ArrowExport::Unsupported`] (or none is
    /// registered), the array is executed naively into the resolved field's data type.
    ///
    /// If the resolved field carries no extension name, the array is executed naively
    /// into `target`'s data type, or with no requested type when `target` is `None`.
    ///
    /// # Errors
    ///
    /// Fails when field inference fails, when a plugin reports an error, when an exported
    /// array's length differs from the input's, or when naive execution fails.
    pub fn execute_arrow(
        &self,
        array: ArrayRef,
        target: Option<&Field>,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrowArrayRef> {
        // NOTE(aduffy): this looks strange, but we do this to keep target_field as &Field so
        //  we can avoid cloning target when it is provided. It contains a HashMap internally that
        //  can be expensive to copy.
        // `arrow_field` is declared up front (and only initialized on the `None` path) so
        // the inferred field outlives the borrow stored in `target_field`.
        let arrow_field;
        let target_field = match target {
            Some(field) => field,
            None => {
                let session = ctx.session().clone();
                arrow_field = session.arrow().to_arrow_field("", array.dtype())?;
                &arrow_field
            }
        };

        if let Some(arrow_ext_name) = target_field.metadata().get(EXTENSION_TYPE_NAME_KEY) {
            // There can be multiple plugins that report support for a particular extension type.
            // We try them in order until one of them reports a successful conversion.
            // Capture the length first: ownership of the array moves into each plugin call.
            let len = array.len();
            let mut current = array;

            for plugin in self.exporters(&Id::new(arrow_ext_name)).iter() {
                trace!(
                    plugin = ?plugin,
                    extension_name = arrow_ext_name,
                    "probing plugin for converting Arrow array"
                );

                match plugin.execute_arrow(current, target_field, ctx)? {
                    ArrowExport::Exported(arrow) => {
                        // Guard against a plugin producing an array of the wrong length.
                        vortex_ensure!(
                            arrow.len() == len,
                            "Arrow array length does not match Vortex array length after conversion to {:?}",
                            arrow
                        );
                        return Ok(arrow);
                    }
                    // The plugin declined and handed the array back; offer it to the next.
                    ArrowExport::Unsupported(array) => current = array,
                }
            }

            debug!(
                extension_id = arrow_ext_name,
                data_type = ?target_field.data_type(),
                "unsupported Arrow extension type encountered, falling back to naive execution"
            );

            // No plugin produced a result: execute into the field's data type.
            return execute_arrow_naive(current, Some(target_field.data_type()), ctx);
        }

        // No extension metadata on the resolved field: pass along only what the caller
        // asked for (`None` when no target was supplied).
        execute_arrow_naive(array, target.map(|field| field.data_type()), ctx)
    }

    /// Decode an Arrow array into a Vortex array.
    ///
    /// When `field` carries an Arrow extension name with at least one registered importer
    /// and [`Self::from_arrow_field`] resolves it to a [`DType::Extension`], the importers
    /// are probed in registration order until one returns [`ArrowImport::Imported`]. If
    /// all of them return [`ArrowImport::Unsupported`], the array is converted with
    /// [`ArrayRef::from_arrow`].
    ///
    /// In every other case the array goes through the container-aware path, which recurses
    /// into [`arrow_array::StructArray`], [`arrow_array::GenericListArray`],
    /// [`arrow_array::FixedSizeListArray`] and [`arrow_array::GenericListViewArray`] so
    /// nested extension fields reach their importers; leaf types use the canonical
    /// Arrow → Vortex array conversion.
    pub fn from_arrow_array(&self, array: ArrowArrayRef, field: &Field) -> VortexResult<ArrayRef> {
        // Guard 1: no extension name on the field.
        let Some(extension_name) = field.metadata().get(EXTENSION_TYPE_NAME_KEY) else {
            return self.from_arrow_array_canonical(array, field);
        };

        // Guard 2: an extension name nobody registered for.
        let importers = self.importers(&Id::new(extension_name));
        if importers.is_empty() {
            return self.from_arrow_array_canonical(array, field);
        }

        // Guard 3: the field did not resolve to a Vortex extension dtype.
        let DType::Extension(ext_dtype) = self.from_arrow_field(field)? else {
            return self.from_arrow_array_canonical(array, field);
        };

        // Thread ownership of the Arrow array through each importer in turn.
        let mut pending = array;
        for importer in importers.iter() {
            pending = match importer.from_arrow_array(pending, &ext_dtype)? {
                ArrowImport::Imported(imported) => return Ok(imported),
                ArrowImport::Unsupported(returned) => returned,
            };
        }
        ArrayRef::from_arrow(pending.as_ref(), field.is_nullable())
    }

    /// Recurse into Arrow container arrays so nested fields with extension metadata reach
    /// their importers, falling through to [`ArrayRef::from_arrow`] for leaf types.
    ///
    /// The container kind is chosen from `field.data_type()`; `array` is then downcast
    /// with the matching `AsArray` accessor. Children are decoded with
    /// [`Self::from_arrow_array`], and the container's own validity is derived from its
    /// null buffer together with `field.is_nullable()`.
    ///
    /// # Errors
    ///
    /// Fails when decoding a child fails, when the Vortex container cannot be built from
    /// the decoded parts, or when a [`DataType::FixedSizeList`] declares a negative size.
    // Clippy expects `from_*` methods to take no receiver; this one needs `&self` to
    // reach the plugin registry.
    #[allow(clippy::wrong_self_convention)]
    fn from_arrow_array_canonical(
        &self,
        array: ArrowArrayRef,
        field: &Field,
    ) -> VortexResult<ArrayRef> {
        use arrow_array::cast::AsArray;

        match field.data_type() {
            DataType::Struct(fields) => {
                let arrow_struct = array.as_struct();
                let names = FieldNames::from_iter(
                    fields.iter().map(|f| FieldName::from(f.name().as_str())),
                );
                let columns = arrow_struct
                    .columns()
                    .iter()
                    .zip(fields.iter())
                    .map(|(col, child_field)| {
                        // Arrow pushes nulls into non-nullable fields; strip before recursing
                        // so Vortex's stricter validity invariants are upheld.
                        let inner = if col.null_count() > 0 && !child_field.is_nullable() {
                            make_array(remove_nulls(col.to_data()))
                        } else {
                            ArrowArrayRef::clone(col)
                        };
                        self.from_arrow_array(inner, child_field.as_ref())
                    })
                    .collect::<VortexResult<Vec<_>>>()?;
                let validity = nulls(arrow_struct.nulls(), field.is_nullable());
                Ok(
                    StructArray::try_new(names, columns, arrow_struct.len(), validity)?
                        .into_array(),
                )
            }
            DataType::List(elem_field) => {
                let list = array.as_list::<i32>();
                let elements = self
                    .from_arrow_array(ArrowArrayRef::clone(list.values()), elem_field.as_ref())?;
                let offsets = list.offsets().clone().into_array();
                let validity = nulls(list.nulls(), field.is_nullable());
                Ok(crate::arrays::ListArray::try_new(elements, offsets, validity)?.into_array())
            }
            DataType::LargeList(elem_field) => {
                let list = array.as_list::<i64>();
                let elements = self
                    .from_arrow_array(ArrowArrayRef::clone(list.values()), elem_field.as_ref())?;
                let offsets = list.offsets().clone().into_array();
                let validity = nulls(list.nulls(), field.is_nullable());
                Ok(crate::arrays::ListArray::try_new(elements, offsets, validity)?.into_array())
            }
            DataType::FixedSizeList(elem_field, list_size) => {
                // Arrow stores the size as `i32`; a checked conversion rejects negative
                // values instead of letting an `as` cast wrap them into a huge `u32`.
                let list_size = u32::try_from(*list_size)?;
                let fsl = array.as_fixed_size_list();
                let elements =
                    self.from_arrow_array(ArrowArrayRef::clone(fsl.values()), elem_field.as_ref())?;
                let validity = nulls(fsl.nulls(), field.is_nullable());
                Ok(crate::arrays::FixedSizeListArray::try_new(
                    elements,
                    list_size,
                    validity,
                    fsl.len(),
                )?
                .into_array())
            }
            DataType::ListView(elem_field) => {
                let list = array.as_list_view::<i32>();
                let elements = self
                    .from_arrow_array(ArrowArrayRef::clone(list.values()), elem_field.as_ref())?;
                let offsets = list.offsets().clone().into_array();
                let sizes = list.sizes().clone().into_array();
                let validity = nulls(list.nulls(), field.is_nullable());
                Ok(
                    crate::arrays::ListViewArray::try_new(elements, offsets, sizes, validity)?
                        .into_array(),
                )
            }
            DataType::LargeListView(elem_field) => {
                let list = array.as_list_view::<i64>();
                let elements = self
                    .from_arrow_array(ArrowArrayRef::clone(list.values()), elem_field.as_ref())?;
                let offsets = list.offsets().clone().into_array();
                let sizes = list.sizes().clone().into_array();
                let validity = nulls(list.nulls(), field.is_nullable());
                Ok(
                    crate::arrays::ListViewArray::try_new(elements, offsets, sizes, validity)?
                        .into_array(),
                )
            }
            // Anything that is not a container is a leaf for our purposes.
            _ => ArrayRef::from_arrow(array.as_ref(), field.is_nullable()),
        }
    }
}

// NOTE(aduffy): Polyfill for the `Field::has_valid_extension_type` method that Arrow added
//  in 58.2.0, kept here so that this crate can build with minimal-versions declared. We
//  should remove this once we bump Arrow past that release.
// NOTE(review): the original note named the bump target as "0.59.0"; given Arrow's
//  58.x numbering this presumably means 59.0.0 — confirm.
/// Returns `true` when `field` names the extension type `E` and its data type and
/// metadata are accepted by `E::try_new_from_field_metadata`.
pub(crate) fn has_valid_extension_type<E: ExtensionType>(field: &Field) -> bool {
    let names_extension = field.extension_type_name() == Some(E::NAME);

    // Short-circuit: validate the metadata only when the name already matches.
    names_extension
        && E::try_new_from_field_metadata(field.data_type(), field.metadata()).is_ok()
}

// Makes `ArrowSession` storable as a session variable; both methods simply expose `self`
// as `dyn Any` (presumably so the session can downcast to the concrete type — see
// `SessionVar` for the contract).
impl SessionVar for ArrowSession {
    /// Returns `self` as a shared [`Any`] reference.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns `self` as a mutable [`Any`] reference.
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}

/// Extension trait for accessing the [`ArrowSession`] on a Vortex session.
///
/// The trait requires [`SessionExt`] as a supertrait.
pub trait ArrowSessionExt: SessionExt {
    /// Returns a [`Ref`] to the [`ArrowSession`] of this session.
    fn arrow(&self) -> Ref<'_, ArrowSession>;
}

/// Blanket implementation: every [`SessionExt`] type gets [`ArrowSessionExt::arrow`].
impl<S: SessionExt> ArrowSessionExt for S {
    fn arrow(&self) -> Ref<'_, ArrowSession> {
        // The requested variable type (`ArrowSession`) is inferred from the return type.
        self.get()
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::FixedSizeBinaryArray;
    use arrow_array::cast::AsArray;
    use arrow_schema::DataType;
    use arrow_schema::Field;
    use arrow_schema::extension::Uuid as ArrowUuid;
    use vortex_error::VortexResult;

    use super::*;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::dtype::DType;
    use crate::dtype::FieldName;
    use crate::dtype::Nullability;
    use crate::dtype::PType;
    use crate::dtype::StructFields;
    use crate::dtype::extension::ExtDType;
    use crate::dtype::extension::ExtVTable;
    use crate::extension::uuid::Uuid;
    use crate::extension::uuid::UuidMetadata;

    fn uuid_dtype(nullable: bool) -> DType {
        let storage = DType::FixedSizeList(
            Arc::new(DType::Primitive(PType::U8, Nullability::NonNullable)),
            16,
            nullable.into(),
        );
        DType::Extension(
            ExtDType::try_with_vtable(Uuid, UuidMetadata::default(), storage)
                .expect("uuid ext dtype")
                .erased(),
        )
    }

    /// A top-level UUID dtype converts to an Arrow field that carries the Arrow UUID extension.
    #[test]
    fn to_arrow_field_top_level_uuid_carries_extension_metadata() -> VortexResult<()> {
        let session = ArrowSession::default();
        let dtype = uuid_dtype(false);
        let arrow_field = session.to_arrow_field("id", &dtype)?;
        assert!(has_valid_extension_type::<ArrowUuid>(&arrow_field));
        Ok(())
    }

    /// A UUID nested inside a struct converts to a `FixedSizeBinary(16)` child field that still
    /// carries the Arrow UUID extension.
    #[test]
    fn to_arrow_field_struct_with_nested_uuid_preserves_metadata() -> VortexResult<()> {
        let session = ArrowSession::default();
        let fields = StructFields::from_iter([(FieldName::from("id"), uuid_dtype(false))]);
        let dtype = DType::Struct(fields, Nullability::NonNullable);
        let field = session.to_arrow_field("row", &dtype)?;
        let DataType::Struct(children) = field.data_type() else {
            panic!("expected Struct, got {:?}", field.data_type());
        };
        assert_eq!(children.len(), 1);
        // Only index after the length check so a wrong arity fails on the assertion above.
        let id_field = &children[0];
        assert_eq!(id_field.data_type(), &DataType::FixedSizeBinary(16));
        assert!(has_valid_extension_type::<ArrowUuid>(id_field));
        Ok(())
    }

    /// The element field of a list of UUIDs carries the Arrow UUID extension.
    #[test]
    fn to_arrow_field_list_of_uuid_preserves_metadata() -> VortexResult<()> {
        let session = ArrowSession::default();
        let element = Arc::new(uuid_dtype(true));
        let list_dtype = DType::List(element, Nullability::NonNullable);
        let field = session.to_arrow_field("ids", &list_dtype)?;
        let DataType::List(item_field) = field.data_type() else {
            panic!("expected List, got {:?}", field.data_type());
        };
        assert!(has_valid_extension_type::<ArrowUuid>(item_field));
        Ok(())
    }

    /// A fixed-size list of three UUIDs keeps its size, and its element field carries the Arrow
    /// UUID extension.
    #[test]
    fn to_arrow_field_fixed_size_list_of_uuid_preserves_metadata() -> VortexResult<()> {
        let session = ArrowSession::default();
        let element = Arc::new(uuid_dtype(false));
        let list_dtype = DType::FixedSizeList(element, 3, Nullability::NonNullable);
        let field = session.to_arrow_field("triple", &list_dtype)?;
        let DataType::FixedSizeList(item_field, list_size) = field.data_type() else {
            panic!("expected FixedSizeList, got {:?}", field.data_type());
        };
        assert_eq!(*list_size, 3);
        assert!(has_valid_extension_type::<ArrowUuid>(item_field));
        Ok(())
    }

    /// `to_arrow_schema` keeps the Arrow UUID extension on a field nested inside a struct column.
    #[test]
    fn to_arrow_schema_struct_of_struct_uuid() -> VortexResult<()> {
        let session = ArrowSession::default();

        // Build `{ payload: { id: uuid? } }` from the inside out.
        let inner_fields = StructFields::from_iter([(FieldName::from("id"), uuid_dtype(true))]);
        let inner_dtype = DType::Struct(inner_fields, Nullability::NonNullable);
        let outer_fields = StructFields::from_iter([(FieldName::from("payload"), inner_dtype)]);
        let outer_dtype = DType::Struct(outer_fields, Nullability::NonNullable);

        let schema = session.to_arrow_schema(&outer_dtype)?;
        let payload = schema.field(0);
        let DataType::Struct(payload_children) = payload.data_type() else {
            panic!("expected Struct, got {:?}", payload.data_type());
        };
        let id_field = &payload_children[0];
        assert!(has_valid_extension_type::<ArrowUuid>(id_field));
        Ok(())
    }

    /// An Arrow list whose element field is tagged as UUID converts to a Vortex list whose
    /// element dtype is the UUID extension.
    #[test]
    fn from_arrow_field_recurses_into_nested_uuid() -> VortexResult<()> {
        let session = ArrowSession::default();
        let mut item = Field::new("item", DataType::FixedSizeBinary(16), false);
        item.try_with_extension_type(ArrowUuid)?;
        let list_field = Field::new("ids", DataType::List(Arc::new(item)), false);

        let dtype = session.from_arrow_field(&list_field)?;
        let DType::List(inner_dt, _) = dtype else {
            panic!("expected List dtype, got {dtype}");
        };
        let is_uuid = matches!(inner_dt.as_ref(), DType::Extension(ext) if ext.id() == Uuid.id());
        assert!(is_uuid, "expected Uuid extension element, got {inner_dt}");
        Ok(())
    }

    /// Converting a struct dtype with a top-level UUID and a list of UUIDs to an Arrow schema
    /// and back yields the original dtype.
    #[test]
    fn schema_roundtrip_preserves_nested_uuid() -> VortexResult<()> {
        let session = ArrowSession::default();

        let id_column = (FieldName::from("id"), uuid_dtype(false));
        let ids_column = (
            FieldName::from("ids"),
            DType::List(Arc::new(uuid_dtype(true)), Nullability::NonNullable),
        );
        let original = DType::Struct(
            StructFields::from_iter([id_column, ids_column]),
            Nullability::NonNullable,
        );

        let arrow_schema = session.to_arrow_schema(&original)?;
        let restored = session.from_arrow_schema(&arrow_schema)?;
        assert_eq!(restored, original);
        Ok(())
    }

    /// Imports a UUID-tagged `FixedSizeBinary(16)` Arrow array, checks that the Vortex dtype is
    /// the UUID extension, then exports it with no target type and checks that the exported
    /// array is `FixedSizeBinary(16)` holding the same two values.
    #[test]
    fn execute_arrow_target_none_preserves_top_level_uuid_metadata() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let session = LEGACY_SESSION.arrow();

        // The two 16-byte payloads that must come back unchanged after the roundtrip.
        let first = *b"0123456789abcdef";
        let second = *b"fedcba9876543210";

        let mut uuid_field = Field::new("id", DataType::FixedSizeBinary(16), false);
        uuid_field.try_with_extension_type(ArrowUuid)?;
        let values = FixedSizeBinaryArray::try_from_iter([first, second].into_iter())?;
        let arrow_array: ArrowArrayRef = Arc::new(values);

        let vortex_array = session.from_arrow_array(arrow_array, &uuid_field)?;
        assert!(vortex_array.dtype().as_extension().is::<Uuid>());

        let exported = session.execute_arrow(vortex_array, None, &mut ctx)?;
        assert_eq!(exported.data_type(), &DataType::FixedSizeBinary(16));
        let binary = exported.as_fixed_size_binary();
        assert_eq!(binary.len(), 2);
        assert_eq!(binary.value(0), &first);
        assert_eq!(binary.value(1), &second);
        Ok(())
    }
}