Skip to main content

datafusion_ffi/
table_provider_factory.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{ffi::c_void, sync::Arc};
19
20use abi_stable::{
21    StableAbi,
22    std_types::{RResult, RString, RVec},
23};
24use async_ffi::{FfiFuture, FutureExt};
25use async_trait::async_trait;
26use datafusion_catalog::{Session, TableProvider, TableProviderFactory};
27use datafusion_common::error::{DataFusionError, Result};
28use datafusion_execution::TaskContext;
29use datafusion_expr::{CreateExternalTable, DdlStatement, LogicalPlan};
30use datafusion_proto::logical_plan::{
31    AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
32};
33use datafusion_proto::protobuf::LogicalPlanNode;
34use prost::Message;
35use tokio::runtime::Handle;
36
37use crate::execution::FFI_TaskContextProvider;
38use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
39use crate::session::{FFI_SessionRef, ForeignSession};
40use crate::table_provider::{FFI_TableProvider, ForeignTableProvider};
41use crate::{df_result, rresult_return};
42
43/// A stable struct for sharing [`TableProviderFactory`] across FFI boundaries.
44///
45/// Similar to [`FFI_TableProvider`], this struct uses the FFI-safe pattern where:
46/// - The `FFI_*` struct exposes stable function pointers
47/// - Private data is stored as an opaque pointer
48/// - The `Foreign*` wrapper is used by consumers on the other side of the FFI boundary
49///
50/// [`FFI_TableProvider`]: crate::table_provider::FFI_TableProvider
51#[repr(C)]
52#[derive(Debug, StableAbi)]
53pub struct FFI_TableProviderFactory {
54    /// Create a TableProvider with the given command.
55    ///
56    /// # Arguments
57    ///
58    /// * `factory` - the table provider factory
59    /// * `session_config` - session configuration
60    /// * `cmd_serialized` - a ['CreateExternalTable`] encoded as a [`LogicalPlanNode`] protobuf message serialized into bytes
61    ///   to pass across the FFI boundary.
62    create: unsafe extern "C" fn(
63        factory: &Self,
64        session: FFI_SessionRef,
65        cmd_serialized: RVec<u8>,
66    ) -> FfiFuture<RResult<FFI_TableProvider, RString>>,
67
68    logical_codec: FFI_LogicalExtensionCodec,
69
70    /// Used to create a clone of the factory. This should only need to be called
71    /// by the receiver of the factory.
72    clone: unsafe extern "C" fn(factory: &Self) -> Self,
73
74    /// Release the memory of the private data when it is no longer being used.
75    release: unsafe extern "C" fn(factory: &mut Self),
76
77    /// Return the major DataFusion version number of this factory.
78    version: unsafe extern "C" fn() -> u64,
79
80    /// Internal data. This is only to be accessed by the provider of the factory.
81    /// A [`ForeignTableProviderFactory`] should never attempt to access this data.
82    private_data: *mut c_void,
83
84    /// Utility to identify when FFI objects are accessed locally through
85    /// the foreign interface. See [`crate::get_library_marker_id`] and
86    /// the crate's `README.md` for more information.
87    library_marker_id: extern "C" fn() -> usize,
88}
89
90unsafe impl Send for FFI_TableProviderFactory {}
91unsafe impl Sync for FFI_TableProviderFactory {}
92
93struct FactoryPrivateData {
94    factory: Arc<dyn TableProviderFactory + Send>,
95    runtime: Option<Handle>,
96}
97
98impl FFI_TableProviderFactory {
99    /// Creates a new [`FFI_TableProvider`].
100    pub fn new(
101        factory: Arc<dyn TableProviderFactory + Send>,
102        runtime: Option<Handle>,
103        task_ctx_provider: impl Into<FFI_TaskContextProvider>,
104        logical_codec: Option<Arc<dyn LogicalExtensionCodec>>,
105    ) -> Self {
106        let task_ctx_provider = task_ctx_provider.into();
107        let logical_codec =
108            logical_codec.unwrap_or_else(|| Arc::new(DefaultLogicalExtensionCodec {}));
109        let logical_codec = FFI_LogicalExtensionCodec::new(
110            logical_codec,
111            runtime.clone(),
112            task_ctx_provider.clone(),
113        );
114        Self::new_with_ffi_codec(factory, runtime, logical_codec)
115    }
116
117    pub fn new_with_ffi_codec(
118        factory: Arc<dyn TableProviderFactory + Send>,
119        runtime: Option<Handle>,
120        logical_codec: FFI_LogicalExtensionCodec,
121    ) -> Self {
122        let private_data = Box::new(FactoryPrivateData { factory, runtime });
123
124        Self {
125            create: create_fn_wrapper,
126            logical_codec,
127            clone: clone_fn_wrapper,
128            release: release_fn_wrapper,
129            version: super::version,
130            private_data: Box::into_raw(private_data) as *mut c_void,
131            library_marker_id: crate::get_library_marker_id,
132        }
133    }
134
135    fn inner(&self) -> &Arc<dyn TableProviderFactory + Send> {
136        let private_data = self.private_data as *const FactoryPrivateData;
137        unsafe { &(*private_data).factory }
138    }
139
140    fn runtime(&self) -> &Option<Handle> {
141        let private_data = self.private_data as *const FactoryPrivateData;
142        unsafe { &(*private_data).runtime }
143    }
144
145    fn deserialize_cmd(
146        &self,
147        cmd_serialized: &RVec<u8>,
148    ) -> Result<CreateExternalTable, DataFusionError> {
149        let task_ctx: Arc<TaskContext> =
150            (&self.logical_codec.task_ctx_provider).try_into()?;
151        let logical_codec: Arc<dyn LogicalExtensionCodec> = (&self.logical_codec).into();
152
153        let plan = LogicalPlanNode::decode(cmd_serialized.as_ref())
154            .map_err(|e| DataFusionError::Internal(format!("{e:?}")))?;
155        match plan.try_into_logical_plan(&task_ctx, logical_codec.as_ref())? {
156            LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) => Ok(cmd),
157            _ => Err(DataFusionError::Internal(
158                "Invalid logical plan in FFI_TableProviderFactory.".to_owned(),
159            )),
160        }
161    }
162}
163
164impl Clone for FFI_TableProviderFactory {
165    fn clone(&self) -> Self {
166        unsafe { (self.clone)(self) }
167    }
168}
169
170impl Drop for FFI_TableProviderFactory {
171    fn drop(&mut self) {
172        unsafe { (self.release)(self) }
173    }
174}
175
176impl From<&FFI_TableProviderFactory> for Arc<dyn TableProviderFactory> {
177    fn from(factory: &FFI_TableProviderFactory) -> Self {
178        if (factory.library_marker_id)() == crate::get_library_marker_id() {
179            Arc::clone(factory.inner()) as Arc<dyn TableProviderFactory>
180        } else {
181            Arc::new(ForeignTableProviderFactory(factory.clone()))
182        }
183    }
184}
185
186unsafe extern "C" fn create_fn_wrapper(
187    factory: &FFI_TableProviderFactory,
188    session: FFI_SessionRef,
189    cmd_serialized: RVec<u8>,
190) -> FfiFuture<RResult<FFI_TableProvider, RString>> {
191    let factory = factory.clone();
192
193    async move {
194        let provider = rresult_return!(
195            create_fn_wrapper_impl(factory, session, cmd_serialized).await
196        );
197        RResult::ROk(provider)
198    }
199    .into_ffi()
200}
201
202async fn create_fn_wrapper_impl(
203    factory: FFI_TableProviderFactory,
204    session: FFI_SessionRef,
205    cmd_serialized: RVec<u8>,
206) -> Result<FFI_TableProvider, DataFusionError> {
207    let runtime = factory.runtime().clone();
208    let ffi_logical_codec = factory.logical_codec.clone();
209    let internal_factory = Arc::clone(factory.inner());
210    let cmd = factory.deserialize_cmd(&cmd_serialized)?;
211
212    let mut foreign_session = None;
213    let session = session
214        .as_local()
215        .map(Ok::<&(dyn Session + Send + Sync), DataFusionError>)
216        .unwrap_or_else(|| {
217            foreign_session = Some(ForeignSession::try_from(&session)?);
218            Ok(foreign_session.as_ref().unwrap())
219        })?;
220
221    let provider = internal_factory.create(session, &cmd).await?;
222    Ok(FFI_TableProvider::new_with_ffi_codec(
223        provider,
224        true,
225        runtime.clone(),
226        ffi_logical_codec,
227    ))
228}
229
230unsafe extern "C" fn clone_fn_wrapper(
231    factory: &FFI_TableProviderFactory,
232) -> FFI_TableProviderFactory {
233    let runtime = factory.runtime().clone();
234    let old_factory = Arc::clone(factory.inner());
235
236    let private_data = Box::into_raw(Box::new(FactoryPrivateData {
237        factory: old_factory,
238        runtime,
239    })) as *mut c_void;
240
241    FFI_TableProviderFactory {
242        create: create_fn_wrapper,
243        logical_codec: factory.logical_codec.clone(),
244        clone: clone_fn_wrapper,
245        release: release_fn_wrapper,
246        version: super::version,
247        private_data,
248        library_marker_id: crate::get_library_marker_id,
249    }
250}
251
252unsafe extern "C" fn release_fn_wrapper(factory: &mut FFI_TableProviderFactory) {
253    unsafe {
254        debug_assert!(!factory.private_data.is_null());
255        let private_data = Box::from_raw(factory.private_data as *mut FactoryPrivateData);
256        drop(private_data);
257        factory.private_data = std::ptr::null_mut();
258    }
259}
260
261/// This wrapper struct exists on the receiver side of the FFI interface, so it has
262/// no guarantees about being able to access the data in `private_data`. Any functions
263/// defined on this struct must only use the stable functions provided in
264/// FFI_TableProviderFactory to interact with the foreign table provider factory.
265#[derive(Debug)]
266pub struct ForeignTableProviderFactory(pub FFI_TableProviderFactory);
267
268impl ForeignTableProviderFactory {
269    fn serialize_cmd(
270        &self,
271        cmd: CreateExternalTable,
272    ) -> Result<RVec<u8>, DataFusionError> {
273        let logical_codec: Arc<dyn LogicalExtensionCodec> =
274            (&self.0.logical_codec).into();
275
276        let plan = LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd));
277        let plan: LogicalPlanNode =
278            AsLogicalPlan::try_from_logical_plan(&plan, logical_codec.as_ref())?;
279
280        let mut buf: Vec<u8> = Vec::new();
281        plan.try_encode(&mut buf)?;
282
283        Ok(buf.into())
284    }
285}
286
287unsafe impl Send for ForeignTableProviderFactory {}
288unsafe impl Sync for ForeignTableProviderFactory {}
289
290#[async_trait]
291impl TableProviderFactory for ForeignTableProviderFactory {
292    async fn create(
293        &self,
294        session: &dyn Session,
295        cmd: &CreateExternalTable,
296    ) -> Result<Arc<dyn TableProvider>> {
297        let session = FFI_SessionRef::new(session, None, self.0.logical_codec.clone());
298        let cmd = self.serialize_cmd(cmd.clone())?;
299
300        let provider = unsafe {
301            let maybe_provider = (self.0.create)(&self.0, session, cmd).await;
302
303            let ffi_provider = df_result!(maybe_provider)?;
304            ForeignTableProvider(ffi_provider)
305        };
306
307        Ok(Arc::new(provider))
308    }
309}
310
#[cfg(test)]
mod tests {
    use arrow::datatypes::Schema;
    use datafusion::prelude::SessionContext;
    use datafusion_common::{TableReference, ToDFSchema};
    use datafusion_execution::TaskContextProvider;
    use std::collections::HashMap;

    use super::*;

    /// Factory that ignores its inputs and always returns the same in-memory
    /// table with a single `Float32` column named `a`.
    #[derive(Debug)]
    struct TestTableProviderFactory {}

    #[async_trait]
    impl TableProviderFactory for TestTableProviderFactory {
        async fn create(
            &self,
            _session: &dyn Session,
            _cmd: &CreateExternalTable,
        ) -> Result<Arc<dyn TableProvider>> {
            use arrow::datatypes::Field;
            use datafusion::arrow::array::Float32Array;
            use datafusion::arrow::datatypes::DataType;
            use datafusion::arrow::record_batch::RecordBatch;
            use datafusion::datasource::MemTable;

            let schema =
                Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));

            // Builds a one-column batch over the shared schema.
            let make_batch = |values: Vec<f32>| {
                RecordBatch::try_new(
                    Arc::clone(&schema),
                    vec![Arc::new(Float32Array::from(values))],
                )
            };
            let first_batch = make_batch(vec![2.0, 4.0, 8.0])?;
            let second_batch = make_batch(vec![64.0])?;

            let partitions = vec![vec![first_batch], vec![second_batch]];
            Ok(Arc::new(MemTable::try_new(schema, partitions)?))
        }
    }

    /// Wraps a [`TestTableProviderFactory`] in an FFI struct, using `ctx` as
    /// the task context provider and the default logical codec.
    fn wrap_test_factory(ctx: &Arc<SessionContext>) -> FFI_TableProviderFactory {
        let task_ctx_provider = Arc::clone(ctx) as Arc<dyn TaskContextProvider>;
        let task_ctx_provider = FFI_TaskContextProvider::from(&task_ctx_provider);

        let factory = Arc::new(TestTableProviderFactory {});
        FFI_TableProviderFactory::new(factory, None, task_ctx_provider, None)
    }

    /// Builds a minimal `CREATE EXTERNAL TABLE` command for `table_name`.
    fn external_table_cmd(table_name: &str) -> Result<CreateExternalTable> {
        Ok(CreateExternalTable {
            schema: Schema::empty().to_dfschema_ref()?,
            name: TableReference::bare(table_name),
            location: "test".to_string(),
            file_type: "test".to_string(),
            table_partition_cols: vec![],
            if_not_exists: false,
            or_replace: false,
            temporary: false,
            definition: None,
            order_exprs: vec![],
            unbounded: false,
            options: HashMap::new(),
            constraints: Default::default(),
            column_defaults: HashMap::new(),
        })
    }

    #[tokio::test]
    async fn test_round_trip_ffi_table_provider_factory() -> Result<()> {
        let ctx = Arc::new(SessionContext::new());

        let mut ffi_factory = wrap_test_factory(&ctx);
        // Pretend the struct came from another library so the conversion
        // below takes the foreign path.
        ffi_factory.library_marker_id = crate::mock_foreign_marker_id;

        let factory: Arc<dyn TableProviderFactory> = (&ffi_factory).into();
        let cmd = external_table_cmd("test_table")?;

        let provider = factory.create(&ctx.state(), &cmd).await?;

        assert_eq!(provider.schema().fields().len(), 1);

        Ok(())
    }

    #[tokio::test]
    async fn test_ffi_table_provider_factory_clone() -> Result<()> {
        let ctx = Arc::new(SessionContext::new());
        let ffi_factory = wrap_test_factory(&ctx);

        // Test that we can clone the factory
        let cloned_factory = ffi_factory.clone();
        let factory: Arc<dyn TableProviderFactory> = (&cloned_factory).into();
        let cmd = external_table_cmd("cloned_test")?;

        let provider = factory.create(&ctx.state(), &cmd).await?;
        assert_eq!(provider.schema().fields().len(), 1);

        Ok(())
    }
}