datafusion_ffi/
arrow_wrappers.rs1use std::sync::Arc;
19
20use abi_stable::StableAbi;
21use arrow::array::{ArrayRef, make_array};
22use arrow::datatypes::{Schema, SchemaRef};
23use arrow::error::ArrowError;
24use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema, from_ffi, to_ffi};
25use datafusion_common::{DataFusionError, ScalarValue};
26use log::error;
27
28#[repr(C)]
31#[derive(Debug, StableAbi)]
32pub struct WrappedSchema(#[sabi(unsafe_opaque_field)] pub FFI_ArrowSchema);
33
34impl From<SchemaRef> for WrappedSchema {
35 fn from(value: SchemaRef) -> Self {
36 let ffi_schema = match FFI_ArrowSchema::try_from(value.as_ref()) {
37 Ok(s) => s,
38 Err(e) => {
39 error!(
40 "Unable to convert DataFusion Schema to FFI_ArrowSchema in FFI_PlanProperties. {e}"
41 );
42 FFI_ArrowSchema::empty()
43 }
44 };
45
46 WrappedSchema(ffi_schema)
47 }
48}
49#[cfg(not(tarpaulin_include))]
54fn catch_df_schema_error(e: &ArrowError) -> Schema {
55 error!(
56 "Unable to convert from FFI_ArrowSchema to DataFusion Schema in FFI_PlanProperties. {e}"
57 );
58 Schema::empty()
59}
60
61impl From<WrappedSchema> for SchemaRef {
62 fn from(value: WrappedSchema) -> Self {
63 let schema =
64 Schema::try_from(&value.0).unwrap_or_else(|e| catch_df_schema_error(&e));
65 Arc::new(schema)
66 }
67}
68
69#[repr(C)]
73#[derive(Debug, StableAbi)]
74pub struct WrappedArray {
75 #[sabi(unsafe_opaque_field)]
76 pub array: FFI_ArrowArray,
77
78 pub schema: WrappedSchema,
79}
80
81impl TryFrom<WrappedArray> for ArrayRef {
82 type Error = ArrowError;
83
84 fn try_from(value: WrappedArray) -> Result<Self, Self::Error> {
85 let data = unsafe { from_ffi(value.array, &value.schema.0)? };
86
87 Ok(make_array(data))
88 }
89}
90
91impl TryFrom<&ArrayRef> for WrappedArray {
92 type Error = ArrowError;
93
94 fn try_from(array: &ArrayRef) -> Result<Self, Self::Error> {
95 let (array, schema) = to_ffi(&array.to_data())?;
96 let schema = WrappedSchema(schema);
97
98 Ok(WrappedArray { array, schema })
99 }
100}
101
102impl TryFrom<&ScalarValue> for WrappedArray {
103 type Error = DataFusionError;
104
105 fn try_from(value: &ScalarValue) -> Result<Self, Self::Error> {
106 let array = value.to_array()?;
107 WrappedArray::try_from(&array).map_err(Into::into)
108 }
109}
110
111impl TryFrom<WrappedArray> for ScalarValue {
112 type Error = DataFusionError;
113
114 fn try_from(value: WrappedArray) -> Result<Self, Self::Error> {
115 let array: ArrayRef = value.try_into()?;
116 ScalarValue::try_from_array(array.as_ref(), 0)
117 }
118}