Skip to main content

datafusion_ffi/
schema_provider.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::ffi::c_void;
20use std::sync::Arc;
21
22use abi_stable::StableAbi;
23use abi_stable::std_types::{ROption, RResult, RString, RVec};
24use async_ffi::{FfiFuture, FutureExt};
25use async_trait::async_trait;
26use datafusion_catalog::{SchemaProvider, TableProvider};
27use datafusion_common::error::{DataFusionError, Result};
28use datafusion_proto::logical_plan::{
29    DefaultLogicalExtensionCodec, LogicalExtensionCodec,
30};
31use tokio::runtime::Handle;
32
33use crate::execution::FFI_TaskContextProvider;
34use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
35use crate::table_provider::{FFI_TableProvider, ForeignTableProvider};
36use crate::util::FFIResult;
37use crate::{df_result, rresult_return};
38
/// A stable struct for sharing [`SchemaProvider`] across FFI boundaries.
///
/// The struct is a table of `extern "C"` function pointers together with an
/// opaque `private_data` pointer. The side that creates the struct fills in
/// the function pointers; the receiving side interacts with the provider
/// only by calling them.
#[repr(C)]
#[derive(Debug, StableAbi)]
pub struct FFI_SchemaProvider {
    /// Owner name reported by the wrapped provider at construction time,
    /// if it reported one.
    pub owner_name: ROption<RString>,

    /// Return the names of the tables known to the provider.
    pub table_names: unsafe extern "C" fn(provider: &Self) -> RVec<RString>,

    /// Asynchronously look up the table called `name`. The future resolves
    /// to an empty option when the provider returns no table for that name,
    /// and to an error when the lookup itself fails.
    pub table: unsafe extern "C" fn(
        provider: &Self,
        name: RString,
    )
        -> FfiFuture<FFIResult<ROption<FFI_TableProvider>>>,

    /// Register `table` under `name`. On success, whatever optional table
    /// the wrapped provider's `register_table` returned is handed back.
    pub register_table: unsafe extern "C" fn(
        provider: &Self,
        name: RString,
        table: FFI_TableProvider,
    )
        -> FFIResult<ROption<FFI_TableProvider>>,

    /// Remove the table called `name`. On success, whatever optional table
    /// the wrapped provider's `deregister_table` returned is handed back.
    pub deregister_table: unsafe extern "C" fn(
        provider: &Self,
        name: RString,
    )
        -> FFIResult<ROption<FFI_TableProvider>>,

    /// Return whether the provider reports that a table called `name` exists.
    pub table_exist: unsafe extern "C" fn(provider: &Self, name: RString) -> bool,

    /// Logical extension codec that is passed along to every
    /// [`FFI_TableProvider`] created on behalf of this schema provider.
    pub logical_codec: FFI_LogicalExtensionCodec,

    /// Used to create a clone of this schema provider. This should only
    /// need to be called by the receiver of the provider.
    pub clone: unsafe extern "C" fn(plan: &Self) -> Self,

    /// Release the memory of the private data when it is no longer being used.
    pub release: unsafe extern "C" fn(arg: &mut Self),

    /// Return the major DataFusion version number of this provider.
    pub version: unsafe extern "C" fn() -> u64,

    /// Internal data. This is only to be accessed by the library that
    /// created this struct.
    /// A [`ForeignSchemaProvider`] should never attempt to access this data.
    pub private_data: *mut c_void,

    /// Utility to identify when FFI objects are accessed locally through
    /// the foreign interface. See [`crate::get_library_marker_id`] and
    /// the crate's `README.md` for more information.
    pub library_marker_id: extern "C" fn() -> usize,
}
89
// `FFI_SchemaProvider` contains a raw `private_data` pointer, so `Send` and
// `Sync` are not derived automatically and are asserted manually here.
// NOTE(review): soundness presumably relies on the wrapped `SchemaProvider`
// (and the other FFI fields) being safe to share across threads — confirm
// against the `SchemaProvider` trait bounds.
unsafe impl Send for FFI_SchemaProvider {}
unsafe impl Sync for FFI_SchemaProvider {}
92
/// Provider-side state stored behind the `private_data` pointer of an
/// [`FFI_SchemaProvider`] created by this library.
struct ProviderPrivateData {
    /// The wrapped schema provider that the `*_fn_wrapper` functions
    /// delegate to.
    provider: Arc<dyn SchemaProvider + Send>,
    /// Optional tokio runtime handle; it is forwarded to the
    /// [`FFI_TableProvider`]s created for tables returned by `provider`.
    runtime: Option<Handle>,
}
97
98impl FFI_SchemaProvider {
99    unsafe fn inner(&self) -> &Arc<dyn SchemaProvider + Send> {
100        unsafe {
101            let private_data = self.private_data as *const ProviderPrivateData;
102            &(*private_data).provider
103        }
104    }
105
106    unsafe fn runtime(&self) -> Option<Handle> {
107        unsafe {
108            let private_data = self.private_data as *const ProviderPrivateData;
109            (*private_data).runtime.clone()
110        }
111    }
112}
113
114unsafe extern "C" fn table_names_fn_wrapper(
115    provider: &FFI_SchemaProvider,
116) -> RVec<RString> {
117    unsafe {
118        let provider = provider.inner();
119
120        let table_names = provider.table_names();
121        table_names.into_iter().map(|s| s.into()).collect()
122    }
123}
124
125unsafe extern "C" fn table_fn_wrapper(
126    provider: &FFI_SchemaProvider,
127    name: RString,
128) -> FfiFuture<FFIResult<ROption<FFI_TableProvider>>> {
129    unsafe {
130        let runtime = provider.runtime();
131        let logical_codec = provider.logical_codec.clone();
132        let provider = Arc::clone(provider.inner());
133
134        async move {
135            let table = rresult_return!(provider.table(name.as_str()).await)
136                .map(|t| {
137                    FFI_TableProvider::new_with_ffi_codec(t, true, runtime, logical_codec)
138                })
139                .into();
140
141            RResult::ROk(table)
142        }
143        .into_ffi()
144    }
145}
146
147unsafe extern "C" fn register_table_fn_wrapper(
148    provider: &FFI_SchemaProvider,
149    name: RString,
150    table: FFI_TableProvider,
151) -> FFIResult<ROption<FFI_TableProvider>> {
152    unsafe {
153        let runtime = provider.runtime();
154        let logical_codec = provider.logical_codec.clone();
155        let provider = provider.inner();
156
157        let table = Arc::new(ForeignTableProvider(table));
158
159        let returned_table = rresult_return!(provider.register_table(name.into(), table))
160            .map(|t| {
161                FFI_TableProvider::new_with_ffi_codec(t, true, runtime, logical_codec)
162            });
163
164        RResult::ROk(returned_table.into())
165    }
166}
167
168unsafe extern "C" fn deregister_table_fn_wrapper(
169    provider: &FFI_SchemaProvider,
170    name: RString,
171) -> FFIResult<ROption<FFI_TableProvider>> {
172    unsafe {
173        let runtime = provider.runtime();
174        let logical_codec = provider.logical_codec.clone();
175        let provider = provider.inner();
176
177        let returned_table = rresult_return!(provider.deregister_table(name.as_str()))
178            .map(|t| {
179                FFI_TableProvider::new_with_ffi_codec(t, true, runtime, logical_codec)
180            });
181
182        RResult::ROk(returned_table.into())
183    }
184}
185
186unsafe extern "C" fn table_exist_fn_wrapper(
187    provider: &FFI_SchemaProvider,
188    name: RString,
189) -> bool {
190    unsafe { provider.inner().table_exist(name.as_str()) }
191}
192
193unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_SchemaProvider) {
194    unsafe {
195        debug_assert!(!provider.private_data.is_null());
196        let private_data =
197            Box::from_raw(provider.private_data as *mut ProviderPrivateData);
198        drop(private_data);
199        provider.private_data = std::ptr::null_mut();
200    }
201}
202
203unsafe extern "C" fn clone_fn_wrapper(
204    provider: &FFI_SchemaProvider,
205) -> FFI_SchemaProvider {
206    unsafe {
207        let old_private_data = provider.private_data as *const ProviderPrivateData;
208        let runtime = (*old_private_data).runtime.clone();
209
210        let private_data = Box::into_raw(Box::new(ProviderPrivateData {
211            provider: Arc::clone(&(*old_private_data).provider),
212            runtime,
213        })) as *mut c_void;
214
215        FFI_SchemaProvider {
216            owner_name: provider.owner_name.clone(),
217            table_names: table_names_fn_wrapper,
218            table: table_fn_wrapper,
219            register_table: register_table_fn_wrapper,
220            deregister_table: deregister_table_fn_wrapper,
221            table_exist: table_exist_fn_wrapper,
222            logical_codec: provider.logical_codec.clone(),
223            clone: clone_fn_wrapper,
224            release: release_fn_wrapper,
225            version: super::version,
226            private_data,
227            library_marker_id: crate::get_library_marker_id,
228        }
229    }
230}
231
232impl Drop for FFI_SchemaProvider {
233    fn drop(&mut self) {
234        unsafe { (self.release)(self) }
235    }
236}
237
238impl FFI_SchemaProvider {
239    /// Creates a new [`FFI_SchemaProvider`].
240    pub fn new(
241        provider: Arc<dyn SchemaProvider + Send>,
242        runtime: Option<Handle>,
243        task_ctx_provider: impl Into<FFI_TaskContextProvider>,
244        logical_codec: Option<Arc<dyn LogicalExtensionCodec>>,
245    ) -> Self {
246        let task_ctx_provider = task_ctx_provider.into();
247        let logical_codec =
248            logical_codec.unwrap_or_else(|| Arc::new(DefaultLogicalExtensionCodec {}));
249        let logical_codec = FFI_LogicalExtensionCodec::new(
250            logical_codec,
251            runtime.clone(),
252            task_ctx_provider.clone(),
253        );
254        Self::new_with_ffi_codec(provider, runtime, logical_codec)
255    }
256
257    pub fn new_with_ffi_codec(
258        provider: Arc<dyn SchemaProvider + Send>,
259        runtime: Option<Handle>,
260        logical_codec: FFI_LogicalExtensionCodec,
261    ) -> Self {
262        if let Some(provider) = provider.as_any().downcast_ref::<ForeignSchemaProvider>()
263        {
264            return provider.0.clone();
265        }
266
267        let owner_name = provider.owner_name().map(|s| s.into()).into();
268        let private_data = Box::new(ProviderPrivateData { provider, runtime });
269
270        Self {
271            owner_name,
272            table_names: table_names_fn_wrapper,
273            table: table_fn_wrapper,
274            register_table: register_table_fn_wrapper,
275            deregister_table: deregister_table_fn_wrapper,
276            table_exist: table_exist_fn_wrapper,
277            logical_codec,
278            clone: clone_fn_wrapper,
279            release: release_fn_wrapper,
280            version: super::version,
281            private_data: Box::into_raw(private_data) as *mut c_void,
282            library_marker_id: crate::get_library_marker_id,
283        }
284    }
285}
286
/// This wrapper struct exists on the receiver side of the FFI interface, so it has
/// no guarantees about being able to access the data in `private_data`. Any functions
/// defined on this struct must only use the stable functions provided in
/// [`FFI_SchemaProvider`] to interact with the foreign schema provider.
#[derive(Debug)]
pub struct ForeignSchemaProvider(pub FFI_SchemaProvider);
293
// `ForeignSchemaProvider` only wraps an `FFI_SchemaProvider`, and these
// impls assert manually that it is `Send` and `Sync`.
// NOTE(review): this presumably relies on the same thread-safety assumptions
// as the `Send`/`Sync` impls on `FFI_SchemaProvider` — confirm.
unsafe impl Send for ForeignSchemaProvider {}
unsafe impl Sync for ForeignSchemaProvider {}
296
297impl From<&FFI_SchemaProvider> for Arc<dyn SchemaProvider + Send> {
298    fn from(provider: &FFI_SchemaProvider) -> Self {
299        if (provider.library_marker_id)() == crate::get_library_marker_id() {
300            return Arc::clone(unsafe { provider.inner() });
301        }
302
303        Arc::new(ForeignSchemaProvider(provider.clone()))
304            as Arc<dyn SchemaProvider + Send>
305    }
306}
307
308impl Clone for FFI_SchemaProvider {
309    fn clone(&self) -> Self {
310        unsafe { (self.clone)(self) }
311    }
312}
313
314#[async_trait]
315impl SchemaProvider for ForeignSchemaProvider {
316    fn as_any(&self) -> &dyn Any {
317        self
318    }
319
320    fn owner_name(&self) -> Option<&str> {
321        let name: Option<&RString> = self.0.owner_name.as_ref().into();
322        name.map(|s| s.as_str())
323    }
324
325    fn table_names(&self) -> Vec<String> {
326        unsafe {
327            (self.0.table_names)(&self.0)
328                .into_iter()
329                .map(|s| s.into())
330                .collect()
331        }
332    }
333
334    async fn table(
335        &self,
336        name: &str,
337    ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
338        unsafe {
339            let table: Option<FFI_TableProvider> =
340                df_result!((self.0.table)(&self.0, name.into()).await)?.into();
341
342            let table = table.as_ref().map(<Arc<dyn TableProvider>>::from);
343
344            Ok(table)
345        }
346    }
347
348    fn register_table(
349        &self,
350        name: String,
351        table: Arc<dyn TableProvider>,
352    ) -> Result<Option<Arc<dyn TableProvider>>> {
353        unsafe {
354            let ffi_table = match table.as_any().downcast_ref::<ForeignTableProvider>() {
355                Some(t) => t.0.clone(),
356                None => FFI_TableProvider::new_with_ffi_codec(
357                    table,
358                    true,
359                    None,
360                    self.0.logical_codec.clone(),
361                ),
362            };
363
364            let returned_provider: Option<FFI_TableProvider> =
365                df_result!((self.0.register_table)(&self.0, name.into(), ffi_table))?
366                    .into();
367
368            Ok(returned_provider
369                .map(|t| Arc::new(ForeignTableProvider(t)) as Arc<dyn TableProvider>))
370        }
371    }
372
373    fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
374        let returned_provider: Option<FFI_TableProvider> = unsafe {
375            df_result!((self.0.deregister_table)(&self.0, name.into()))?.into()
376        };
377
378        Ok(returned_provider
379            .map(|t| Arc::new(ForeignTableProvider(t)) as Arc<dyn TableProvider>))
380    }
381
382    /// Returns true if table exist in the schema provider, false otherwise.
383    fn table_exist(&self, name: &str) -> bool {
384        unsafe { (self.0.table_exist)(&self.0, name.into()) }
385    }
386}
387
#[cfg(test)]
mod tests {
    use arrow::datatypes::Schema;
    use datafusion::catalog::MemorySchemaProvider;
    use datafusion::datasource::empty::EmptyTable;

    use super::*;

    /// Builds a placeholder table provider backed by an empty schema.
    fn empty_table() -> Arc<dyn TableProvider> {
        let schema = Arc::new(Schema::empty());
        Arc::new(EmptyTable::new(schema))
    }

    /// Exercises every `SchemaProvider` method through the foreign wrapper.
    #[tokio::test]
    async fn test_round_trip_ffi_schema_provider() {
        let schema_provider = Arc::new(MemorySchemaProvider::new());
        let previous = schema_provider
            .as_ref()
            .register_table("prior_table".to_string(), empty_table())
            .unwrap();
        assert!(previous.is_none());

        let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();

        // Swap in the mock marker so the conversion below takes the foreign
        // path instead of returning the local provider.
        let mut ffi_schema_provider =
            FFI_SchemaProvider::new(schema_provider, None, task_ctx_provider, None);
        ffi_schema_provider.library_marker_id = crate::mock_foreign_marker_id;

        let foreign: Arc<dyn SchemaProvider + Send> = (&ffi_schema_provider).into();

        let initial_names = foreign.table_names();
        assert_eq!(initial_names.len(), 1);
        assert_eq!(initial_names[0], "prior_table");

        // Registering a second table under an existing name is an error and
        // leaves the set of tables unchanged.
        let duplicate = foreign.register_table("prior_table".to_string(), empty_table());
        assert!(duplicate.is_err());
        assert_eq!(foreign.table_names().len(), 1);

        // Registering under a new name succeeds and returns no previous table.
        let replaced = foreign
            .register_table("second_table".to_string(), empty_table())
            .expect("Unable to register table");
        assert!(replaced.is_none());
        assert_eq!(foreign.table_names().len(), 2);

        // Deregistering hands back the removed table.
        let removed = foreign
            .deregister_table("prior_table")
            .expect("Unable to deregister table");
        assert!(removed.is_some());
        assert_eq!(foreign.table_names().len(), 1);

        // The removed table can no longer be found.
        let missing = foreign
            .table("prior_table")
            .await
            .expect("Unable to query table");
        assert!(missing.is_none());
        assert!(!foreign.table_exist("prior_table"));

        // The remaining table is still retrievable.
        let present = foreign
            .table("second_table")
            .await
            .expect("Unable to query table");
        assert!(present.is_some());
        assert!(foreign.table_exist("second_table"));
    }

    /// Checks that the library marker decides between returning the original
    /// provider and wrapping it in a `ForeignSchemaProvider`.
    #[test]
    fn test_ffi_schema_provider_local_bypass() {
        let schema_provider = Arc::new(MemorySchemaProvider::new());

        let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
        let mut ffi_schema =
            FFI_SchemaProvider::new(schema_provider, None, task_ctx_provider, None);

        // With the real marker the original provider type comes back.
        let local: Arc<dyn SchemaProvider + Send> = (&ffi_schema).into();
        assert!(local.as_any().is::<MemorySchemaProvider>());

        // With a different marker the provider is wrapped as foreign.
        ffi_schema.library_marker_id = crate::mock_foreign_marker_id;
        let foreign: Arc<dyn SchemaProvider + Send> = (&ffi_schema).into();
        assert!(foreign.as_any().is::<ForeignSchemaProvider>());
    }
}