datafusion_ffi/
session_config.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use abi_stable::{
19    std_types::{RHashMap, RString},
20    StableAbi,
21};
22use datafusion::{config::ConfigOptions, error::Result};
23use datafusion::{error::DataFusionError, prelude::SessionConfig};
24use std::sync::Arc;
25use std::{
26    collections::HashMap,
27    ffi::{c_char, c_void, CString},
28};
29
30/// A stable struct for sharing [`SessionConfig`] across FFI boundaries.
31/// Instead of attempting to expose the entire SessionConfig interface, we
32/// convert the config options into a map from a string to string and pass
33/// those values across the FFI boundary. On the receiver side, we
34/// reconstruct a SessionConfig from those values.
35///
36/// It is possible that using different versions of DataFusion across the
37/// FFI boundary could have differing expectations of the config options.
38/// This is a limitation of this approach, but exposing the entire
39/// SessionConfig via a FFI interface would be extensive and provide limited
40/// value over this version.
41#[repr(C)]
42#[derive(Debug, StableAbi)]
43#[allow(non_camel_case_types)]
44pub struct FFI_SessionConfig {
45    /// Return a hash map from key to value of the config options represented
46    /// by string values.
47    pub config_options: unsafe extern "C" fn(config: &Self) -> RHashMap<RString, RString>,
48
49    /// Used to create a clone on the provider of the execution plan. This should
50    /// only need to be called by the receiver of the plan.
51    pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
52
53    /// Release the memory of the private data when it is no longer being used.
54    pub release: unsafe extern "C" fn(arg: &mut Self),
55
56    /// Internal data. This is only to be accessed by the provider of the plan.
57    /// A [`ForeignSessionConfig`] should never attempt to access this data.
58    pub private_data: *mut c_void,
59}
60
61unsafe impl Send for FFI_SessionConfig {}
62unsafe impl Sync for FFI_SessionConfig {}
63
64unsafe extern "C" fn config_options_fn_wrapper(
65    config: &FFI_SessionConfig,
66) -> RHashMap<RString, RString> {
67    let private_data = config.private_data as *mut SessionConfigPrivateData;
68    let config_options = &(*private_data).config;
69
70    let mut options = RHashMap::default();
71    for config_entry in config_options.entries() {
72        if let Some(value) = config_entry.value {
73            options.insert(config_entry.key.into(), value.into());
74        }
75    }
76
77    options
78}
79
80unsafe extern "C" fn release_fn_wrapper(config: &mut FFI_SessionConfig) {
81    let private_data =
82        Box::from_raw(config.private_data as *mut SessionConfigPrivateData);
83    drop(private_data);
84}
85
86unsafe extern "C" fn clone_fn_wrapper(config: &FFI_SessionConfig) -> FFI_SessionConfig {
87    let old_private_data = config.private_data as *mut SessionConfigPrivateData;
88    let old_config = Arc::clone(&(*old_private_data).config);
89
90    let private_data = Box::new(SessionConfigPrivateData { config: old_config });
91
92    FFI_SessionConfig {
93        config_options: config_options_fn_wrapper,
94        private_data: Box::into_raw(private_data) as *mut c_void,
95        clone: clone_fn_wrapper,
96        release: release_fn_wrapper,
97    }
98}
99
100struct SessionConfigPrivateData {
101    pub config: Arc<ConfigOptions>,
102}
103
104impl From<&SessionConfig> for FFI_SessionConfig {
105    fn from(session: &SessionConfig) -> Self {
106        let mut config_keys = Vec::new();
107        let mut config_values = Vec::new();
108        for config_entry in session.options().entries() {
109            if let Some(value) = config_entry.value {
110                let key_cstr = CString::new(config_entry.key).unwrap_or_default();
111                let key_ptr = key_cstr.into_raw() as *const c_char;
112                config_keys.push(key_ptr);
113
114                config_values
115                    .push(CString::new(value).unwrap_or_default().into_raw()
116                        as *const c_char);
117            }
118        }
119
120        let private_data = Box::new(SessionConfigPrivateData {
121            config: Arc::clone(session.options()),
122        });
123
124        Self {
125            config_options: config_options_fn_wrapper,
126            private_data: Box::into_raw(private_data) as *mut c_void,
127            clone: clone_fn_wrapper,
128            release: release_fn_wrapper,
129        }
130    }
131}
132
133impl Clone for FFI_SessionConfig {
134    fn clone(&self) -> Self {
135        unsafe { (self.clone)(self) }
136    }
137}
138
139impl Drop for FFI_SessionConfig {
140    fn drop(&mut self) {
141        unsafe { (self.release)(self) };
142    }
143}
144
145/// A wrapper struct for accessing [`SessionConfig`] across a FFI boundary.
146/// The [`SessionConfig`] will be generated from a hash map of the config
147/// options in the provider and will be reconstructed on this side of the
148/// interface.s
149pub struct ForeignSessionConfig(pub SessionConfig);
150
151impl TryFrom<&FFI_SessionConfig> for ForeignSessionConfig {
152    type Error = DataFusionError;
153
154    fn try_from(config: &FFI_SessionConfig) -> Result<Self, Self::Error> {
155        let config_options = unsafe { (config.config_options)(config) };
156
157        let mut options_map = HashMap::new();
158        config_options.iter().for_each(|kv_pair| {
159            options_map.insert(kv_pair.0.to_string(), kv_pair.1.to_string());
160        });
161
162        Ok(Self(SessionConfig::from_string_hash_map(&options_map)?))
163    }
164}
165
166#[cfg(test)]
167mod tests {
168    use super::*;
169
170    #[test]
171    fn test_round_trip_ffi_session_config() -> Result<()> {
172        let session_config = SessionConfig::new();
173        let original_options = session_config.options().entries();
174
175        let ffi_config: FFI_SessionConfig = (&session_config).into();
176
177        let foreign_config: ForeignSessionConfig = (&ffi_config).try_into()?;
178
179        let returned_options = foreign_config.0.options().entries();
180
181        assert!(original_options.len() == returned_options.len());
182
183        Ok(())
184    }
185}