datafusion_ffi/
arrow_wrappers.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use abi_stable::StableAbi;
21use arrow::array::{ArrayRef, make_array};
22use arrow::datatypes::{Schema, SchemaRef};
23use arrow::error::ArrowError;
24use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema, from_ffi, to_ffi};
25use datafusion_common::{DataFusionError, ScalarValue};
26use log::error;
27
28/// This is a wrapper struct around FFI_ArrowSchema simply to indicate
29/// to the StableAbi macros that the underlying struct is FFI safe.
30#[repr(C)]
31#[derive(Debug, StableAbi)]
32pub struct WrappedSchema(#[sabi(unsafe_opaque_field)] pub FFI_ArrowSchema);
33
34impl From<SchemaRef> for WrappedSchema {
35    fn from(value: SchemaRef) -> Self {
36        let ffi_schema = match FFI_ArrowSchema::try_from(value.as_ref()) {
37            Ok(s) => s,
38            Err(e) => {
39                error!(
40                    "Unable to convert DataFusion Schema to FFI_ArrowSchema in FFI_PlanProperties. {e}"
41                );
42                FFI_ArrowSchema::empty()
43            }
44        };
45
46        WrappedSchema(ffi_schema)
47    }
48}
49/// Some functions are expected to always succeed, like getting the schema from a TableProvider.
50/// Since going through the FFI always has the potential to fail, we need to catch these errors,
51/// give the user a warning, and return some kind of result. In this case we default to an
52/// empty schema.
53#[cfg(not(tarpaulin_include))]
54fn catch_df_schema_error(e: &ArrowError) -> Schema {
55    error!(
56        "Unable to convert from FFI_ArrowSchema to DataFusion Schema in FFI_PlanProperties. {e}"
57    );
58    Schema::empty()
59}
60
61impl From<WrappedSchema> for SchemaRef {
62    fn from(value: WrappedSchema) -> Self {
63        let schema =
64            Schema::try_from(&value.0).unwrap_or_else(|e| catch_df_schema_error(&e));
65        Arc::new(schema)
66    }
67}
68
69/// This is a wrapper struct for FFI_ArrowArray to indicate to StableAbi
70/// that the struct is FFI Safe. For convenience, we also include the
71/// schema needed to create a record batch from the array.
72#[repr(C)]
73#[derive(Debug, StableAbi)]
74pub struct WrappedArray {
75    #[sabi(unsafe_opaque_field)]
76    pub array: FFI_ArrowArray,
77
78    pub schema: WrappedSchema,
79}
80
81impl TryFrom<WrappedArray> for ArrayRef {
82    type Error = ArrowError;
83
84    fn try_from(value: WrappedArray) -> Result<Self, Self::Error> {
85        let data = unsafe { from_ffi(value.array, &value.schema.0)? };
86
87        Ok(make_array(data))
88    }
89}
90
91impl TryFrom<&ArrayRef> for WrappedArray {
92    type Error = ArrowError;
93
94    fn try_from(array: &ArrayRef) -> Result<Self, Self::Error> {
95        let (array, schema) = to_ffi(&array.to_data())?;
96        let schema = WrappedSchema(schema);
97
98        Ok(WrappedArray { array, schema })
99    }
100}
101
102impl TryFrom<&ScalarValue> for WrappedArray {
103    type Error = DataFusionError;
104
105    fn try_from(value: &ScalarValue) -> Result<Self, Self::Error> {
106        let array = value.to_array()?;
107        WrappedArray::try_from(&array).map_err(Into::into)
108    }
109}
110
111impl TryFrom<WrappedArray> for ScalarValue {
112    type Error = DataFusionError;
113
114    fn try_from(value: WrappedArray) -> Result<Self, Self::Error> {
115        let array: ArrayRef = value.try_into()?;
116        ScalarValue::try_from_array(array.as_ref(), 0)
117    }
118}