datafusion_ffi/
schema_provider.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{any::Any, ffi::c_void, sync::Arc};
19
20use abi_stable::{
21    std_types::{ROption, RResult, RString, RVec},
22    StableAbi,
23};
24use async_ffi::{FfiFuture, FutureExt};
25use async_trait::async_trait;
26use datafusion::{
27    catalog::{SchemaProvider, TableProvider},
28    error::DataFusionError,
29};
30use tokio::runtime::Handle;
31
32use crate::{
33    df_result, rresult_return,
34    table_provider::{FFI_TableProvider, ForeignTableProvider},
35};
36
37use datafusion::error::Result;
38
/// A stable struct for sharing [`SchemaProvider`] across FFI boundaries.
///
/// The struct is `#[repr(C)]`, so the order and types of its fields are part
/// of the binary interface. Every operation is exposed as an
/// `unsafe extern "C"` function pointer that takes the struct itself as its
/// first argument; the implementation state lives behind `private_data`.
#[repr(C)]
#[derive(Debug, StableAbi)]
#[allow(non_camel_case_types)]
pub struct FFI_SchemaProvider {
    /// Owner name reported by the wrapped provider. It is captured once when
    /// the struct is built and copied as-is when the struct is cloned.
    pub owner_name: ROption<RString>,

    /// Return the names of the tables available in this schema.
    pub table_names: unsafe extern "C" fn(provider: &Self) -> RVec<RString>,

    /// Asynchronously look up a table by name. The future resolves to the
    /// table if the provider returned one, to `RNone` if it returned none, or
    /// to the provider's error rendered as a string.
    pub table: unsafe extern "C" fn(
        provider: &Self,
        name: RString,
    ) -> FfiFuture<
        RResult<ROption<FFI_TableProvider>, RString>,
    >,

    /// Register `table` under `name`. On success this yields whatever table
    /// the underlying provider handed back from its own `register_table`,
    /// if any; on failure it yields the error rendered as a string.
    pub register_table:
        unsafe extern "C" fn(
            provider: &Self,
            name: RString,
            table: FFI_TableProvider,
        ) -> RResult<ROption<FFI_TableProvider>, RString>,

    /// Remove the table registered under `name`. On success this yields
    /// whatever table the underlying provider handed back from its own
    /// `deregister_table`, if any; on failure it yields the error rendered as
    /// a string.
    pub deregister_table:
        unsafe extern "C" fn(
            provider: &Self,
            name: RString,
        ) -> RResult<ROption<FFI_TableProvider>, RString>,

    /// Return true if a table called `name` exists in the schema.
    pub table_exist: unsafe extern "C" fn(provider: &Self, name: RString) -> bool,

    /// Used to create a clone of the schema provider. The clone gets its own
    /// `private_data` allocation that shares the same underlying provider.
    pub clone: unsafe extern "C" fn(plan: &Self) -> Self,

    /// Release the memory of the private data when it is no longer being used.
    /// This is invoked from the struct's `Drop` implementation.
    pub release: unsafe extern "C" fn(arg: &mut Self),

    /// Return the major DataFusion version number of this provider.
    pub version: unsafe extern "C" fn() -> u64,

    /// Internal data. This is only to be accessed by the provider of the
    /// schema. A [`ForeignSchemaProvider`] should never attempt to access this
    /// data.
    pub private_data: *mut c_void,
}
84
// The raw `private_data` pointer makes this struct `!Send`/`!Sync` by default,
// so thread safety is asserted manually here.
// NOTE(review): presumably sound because `private_data` is only dereferenced
// through the function pointers stored alongside it and the wrapped provider
// is itself shareable across threads — TODO confirm against the
// `SchemaProvider` trait bounds.
unsafe impl Send for FFI_SchemaProvider {}
unsafe impl Sync for FFI_SchemaProvider {}
87
/// Provider-side state stored behind [`FFI_SchemaProvider::private_data`].
///
/// It is boxed and turned into a raw pointer when an [`FFI_SchemaProvider`] is
/// created or cloned, and turned back into a box and dropped on release.
struct ProviderPrivateData {
    /// The schema provider that the FFI function pointers forward to.
    provider: Arc<dyn SchemaProvider + Send>,
    /// Optional Tokio runtime handle; it is passed along to every
    /// [`FFI_TableProvider`] built from tables this provider returns.
    runtime: Option<Handle>,
}
92
93impl FFI_SchemaProvider {
94    unsafe fn inner(&self) -> &Arc<dyn SchemaProvider + Send> {
95        let private_data = self.private_data as *const ProviderPrivateData;
96        &(*private_data).provider
97    }
98
99    unsafe fn runtime(&self) -> Option<Handle> {
100        let private_data = self.private_data as *const ProviderPrivateData;
101        (*private_data).runtime.clone()
102    }
103}
104
105unsafe extern "C" fn table_names_fn_wrapper(
106    provider: &FFI_SchemaProvider,
107) -> RVec<RString> {
108    let provider = provider.inner();
109
110    let table_names = provider.table_names();
111    table_names.into_iter().map(|s| s.into()).collect()
112}
113
114unsafe extern "C" fn table_fn_wrapper(
115    provider: &FFI_SchemaProvider,
116    name: RString,
117) -> FfiFuture<RResult<ROption<FFI_TableProvider>, RString>> {
118    let runtime = provider.runtime();
119    let provider = Arc::clone(provider.inner());
120
121    async move {
122        let table = rresult_return!(provider.table(name.as_str()).await)
123            .map(|t| FFI_TableProvider::new(t, true, runtime))
124            .into();
125
126        RResult::ROk(table)
127    }
128    .into_ffi()
129}
130
131unsafe extern "C" fn register_table_fn_wrapper(
132    provider: &FFI_SchemaProvider,
133    name: RString,
134    table: FFI_TableProvider,
135) -> RResult<ROption<FFI_TableProvider>, RString> {
136    let runtime = provider.runtime();
137    let provider = provider.inner();
138
139    let table = Arc::new(ForeignTableProvider(table));
140
141    let returned_table = rresult_return!(provider.register_table(name.into(), table))
142        .map(|t| FFI_TableProvider::new(t, true, runtime));
143
144    RResult::ROk(returned_table.into())
145}
146
147unsafe extern "C" fn deregister_table_fn_wrapper(
148    provider: &FFI_SchemaProvider,
149    name: RString,
150) -> RResult<ROption<FFI_TableProvider>, RString> {
151    let runtime = provider.runtime();
152    let provider = provider.inner();
153
154    let returned_table = rresult_return!(provider.deregister_table(name.as_str()))
155        .map(|t| FFI_TableProvider::new(t, true, runtime));
156
157    RResult::ROk(returned_table.into())
158}
159
160unsafe extern "C" fn table_exist_fn_wrapper(
161    provider: &FFI_SchemaProvider,
162    name: RString,
163) -> bool {
164    provider.inner().table_exist(name.as_str())
165}
166
167unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_SchemaProvider) {
168    let private_data = Box::from_raw(provider.private_data as *mut ProviderPrivateData);
169    drop(private_data);
170}
171
172unsafe extern "C" fn clone_fn_wrapper(
173    provider: &FFI_SchemaProvider,
174) -> FFI_SchemaProvider {
175    let old_private_data = provider.private_data as *const ProviderPrivateData;
176    let runtime = (*old_private_data).runtime.clone();
177
178    let private_data = Box::into_raw(Box::new(ProviderPrivateData {
179        provider: Arc::clone(&(*old_private_data).provider),
180        runtime,
181    })) as *mut c_void;
182
183    FFI_SchemaProvider {
184        owner_name: provider.owner_name.clone(),
185        table_names: table_names_fn_wrapper,
186        clone: clone_fn_wrapper,
187        release: release_fn_wrapper,
188        version: super::version,
189        private_data,
190        table: table_fn_wrapper,
191        register_table: register_table_fn_wrapper,
192        deregister_table: deregister_table_fn_wrapper,
193        table_exist: table_exist_fn_wrapper,
194    }
195}
196
197impl Drop for FFI_SchemaProvider {
198    fn drop(&mut self) {
199        unsafe { (self.release)(self) }
200    }
201}
202
203impl FFI_SchemaProvider {
204    /// Creates a new [`FFI_SchemaProvider`].
205    pub fn new(
206        provider: Arc<dyn SchemaProvider + Send>,
207        runtime: Option<Handle>,
208    ) -> Self {
209        let owner_name = provider.owner_name().map(|s| s.into()).into();
210        let private_data = Box::new(ProviderPrivateData { provider, runtime });
211
212        Self {
213            owner_name,
214            table_names: table_names_fn_wrapper,
215            clone: clone_fn_wrapper,
216            release: release_fn_wrapper,
217            version: super::version,
218            private_data: Box::into_raw(private_data) as *mut c_void,
219            table: table_fn_wrapper,
220            register_table: register_table_fn_wrapper,
221            deregister_table: deregister_table_fn_wrapper,
222            table_exist: table_exist_fn_wrapper,
223        }
224    }
225}
226
/// This wrapper struct exists on the receiver side of the FFI interface, so it has
/// no guarantees about being able to access the data in `private_data`. Any functions
/// defined on this struct must only use the stable functions provided in
/// [`FFI_SchemaProvider`] to interact with the foreign schema provider.
#[derive(Debug)]
pub struct ForeignSchemaProvider(pub FFI_SchemaProvider);
233
// Thread safety is asserted manually for the receiver-side wrapper.
// NOTE(review): presumably sound because this type only reaches the foreign
// provider through the function pointers on the wrapped `FFI_SchemaProvider`
// — TODO confirm that every provider implementation is thread-safe.
unsafe impl Send for ForeignSchemaProvider {}
unsafe impl Sync for ForeignSchemaProvider {}
236
237impl From<&FFI_SchemaProvider> for ForeignSchemaProvider {
238    fn from(provider: &FFI_SchemaProvider) -> Self {
239        Self(provider.clone())
240    }
241}
242
243impl Clone for FFI_SchemaProvider {
244    fn clone(&self) -> Self {
245        unsafe { (self.clone)(self) }
246    }
247}
248
249#[async_trait]
250impl SchemaProvider for ForeignSchemaProvider {
251    fn as_any(&self) -> &dyn Any {
252        self
253    }
254
255    fn owner_name(&self) -> Option<&str> {
256        let name: Option<&RString> = self.0.owner_name.as_ref().into();
257        name.map(|s| s.as_str())
258    }
259
260    fn table_names(&self) -> Vec<String> {
261        unsafe {
262            (self.0.table_names)(&self.0)
263                .into_iter()
264                .map(|s| s.into())
265                .collect()
266        }
267    }
268
269    async fn table(
270        &self,
271        name: &str,
272    ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
273        unsafe {
274            let table: Option<FFI_TableProvider> =
275                df_result!((self.0.table)(&self.0, name.into()).await)?.into();
276
277            let table = table.as_ref().map(|t| {
278                Arc::new(ForeignTableProvider::from(t)) as Arc<dyn TableProvider>
279            });
280
281            Ok(table)
282        }
283    }
284
285    fn register_table(
286        &self,
287        name: String,
288        table: Arc<dyn TableProvider>,
289    ) -> Result<Option<Arc<dyn TableProvider>>> {
290        unsafe {
291            let ffi_table = match table.as_any().downcast_ref::<ForeignTableProvider>() {
292                Some(t) => t.0.clone(),
293                None => FFI_TableProvider::new(table, true, None),
294            };
295
296            let returned_provider: Option<FFI_TableProvider> =
297                df_result!((self.0.register_table)(&self.0, name.into(), ffi_table))?
298                    .into();
299
300            Ok(returned_provider
301                .map(|t| Arc::new(ForeignTableProvider(t)) as Arc<dyn TableProvider>))
302        }
303    }
304
305    fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
306        let returned_provider: Option<FFI_TableProvider> = unsafe {
307            df_result!((self.0.deregister_table)(&self.0, name.into()))?.into()
308        };
309
310        Ok(returned_provider
311            .map(|t| Arc::new(ForeignTableProvider(t)) as Arc<dyn TableProvider>))
312    }
313
314    /// Returns true if table exist in the schema provider, false otherwise.
315    fn table_exist(&self, name: &str) -> bool {
316        unsafe { (self.0.table_exist)(&self.0, name.into()) }
317    }
318}
319
#[cfg(test)]
mod tests {
    use arrow::datatypes::Schema;
    use datafusion::{catalog::MemorySchemaProvider, datasource::empty::EmptyTable};

    use super::*;

    /// Builds a table provider with an empty schema to use as a placeholder.
    fn empty_table() -> Arc<dyn TableProvider> {
        let schema = Arc::new(Schema::empty());
        Arc::new(EmptyTable::new(schema))
    }

    /// Drives a `MemorySchemaProvider` through the FFI wrapper and back, and
    /// checks listing, registration, deregistration and lookup.
    #[tokio::test]
    async fn test_round_trip_ffi_schema_provider() {
        // Seed the local provider with one table before it crosses the FFI.
        let local_provider = Arc::new(MemorySchemaProvider::new());
        let seeded = local_provider
            .as_ref()
            .register_table("prior_table".to_string(), empty_table())
            .unwrap();
        assert!(seeded.is_none());

        let ffi_provider = FFI_SchemaProvider::new(local_provider, None);
        let foreign = ForeignSchemaProvider::from(&ffi_provider);

        let initial_names = foreign.table_names();
        assert_eq!(initial_names, vec!["prior_table".to_string()]);

        // Registering a second table under an existing name is an error and
        // leaves the schema unchanged.
        let duplicate = foreign.register_table("prior_table".to_string(), empty_table());
        assert!(duplicate.is_err());
        assert_eq!(foreign.table_names().len(), 1);

        // Registering under a fresh name succeeds and returns no prior table.
        let replaced = foreign
            .register_table("second_table".to_string(), empty_table())
            .expect("Unable to register table");
        assert!(replaced.is_none());
        assert_eq!(foreign.table_names().len(), 2);

        // Deregistering hands back the table that was removed.
        let removed = foreign
            .deregister_table("prior_table")
            .expect("Unable to deregister table");
        assert!(removed.is_some());
        assert_eq!(foreign.table_names().len(), 1);

        // The removed table can no longer be found.
        let missing = foreign
            .table("prior_table")
            .await
            .expect("Unable to query table");
        assert!(missing.is_none());
        assert!(!foreign.table_exist("prior_table"));

        // The remaining table is still reachable.
        let present = foreign
            .table("second_table")
            .await
            .expect("Unable to query table");
        assert!(present.is_some());
        assert!(foreign.table_exist("second_table"));
    }
}