Skip to main content

datafusion_ffi/
table_provider_factory.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::{ffi::c_void, sync::Arc};
19
20use async_ffi::{FfiFuture, FutureExt};
21use async_trait::async_trait;
22use datafusion_catalog::{Session, TableProvider, TableProviderFactory};
23use datafusion_common::error::{DataFusionError, Result};
24use datafusion_execution::TaskContext;
25use datafusion_expr::{CreateExternalTable, DdlStatement, LogicalPlan};
26use datafusion_proto::logical_plan::{
27    AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
28};
29use datafusion_proto::protobuf::LogicalPlanNode;
30use prost::Message;
31
32use stabby::vec::Vec as SVec;
33use tokio::runtime::Handle;
34
35use crate::execution::FFI_TaskContextProvider;
36use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
37use crate::session::{FFI_SessionRef, ForeignSession};
38use crate::table_provider::{FFI_TableProvider, ForeignTableProvider};
39use crate::util::FFI_Result;
40use crate::{df_result, sresult_return};
41
42/// A stable struct for sharing [`TableProviderFactory`] across FFI boundaries.
43///
44/// Similar to [`FFI_TableProvider`], this struct uses the FFI-safe pattern where:
45/// - The `FFI_*` struct exposes stable function pointers
46/// - Private data is stored as an opaque pointer
47/// - The `Foreign*` wrapper is used by consumers on the other side of the FFI boundary
48///
49/// [`FFI_TableProvider`]: crate::table_provider::FFI_TableProvider
50#[repr(C)]
51#[derive(Debug)]
52pub struct FFI_TableProviderFactory {
53    /// Create a TableProvider with the given command.
54    ///
55    /// # Arguments
56    ///
57    /// * `factory` - the table provider factory
58    /// * `session_config` - session configuration
59    /// * `cmd_serialized` - a ['CreateExternalTable`] encoded as a [`LogicalPlanNode`] protobuf message serialized into bytes
60    ///   to pass across the FFI boundary.
61    create: unsafe extern "C" fn(
62        factory: &Self,
63        session: FFI_SessionRef,
64        cmd_serialized: SVec<u8>,
65    ) -> FfiFuture<FFI_Result<FFI_TableProvider>>,
66
67    logical_codec: FFI_LogicalExtensionCodec,
68
69    /// Used to create a clone of the factory. This should only need to be called
70    /// by the receiver of the factory.
71    clone: unsafe extern "C" fn(factory: &Self) -> Self,
72
73    /// Release the memory of the private data when it is no longer being used.
74    release: unsafe extern "C" fn(factory: &mut Self),
75
76    /// Return the major DataFusion version number of this factory.
77    version: unsafe extern "C" fn() -> u64,
78
79    /// Internal data. This is only to be accessed by the provider of the factory.
80    /// A [`ForeignTableProviderFactory`] should never attempt to access this data.
81    private_data: *mut c_void,
82
83    /// Utility to identify when FFI objects are accessed locally through
84    /// the foreign interface. See [`crate::get_library_marker_id`] and
85    /// the crate's `README.md` for more information.
86    library_marker_id: extern "C" fn() -> usize,
87}
88
89unsafe impl Send for FFI_TableProviderFactory {}
90unsafe impl Sync for FFI_TableProviderFactory {}
91
92struct FactoryPrivateData {
93    factory: Arc<dyn TableProviderFactory + Send>,
94    runtime: Option<Handle>,
95}
96
97impl FFI_TableProviderFactory {
98    /// Creates a new [`FFI_TableProvider`].
99    pub fn new(
100        factory: Arc<dyn TableProviderFactory + Send>,
101        runtime: Option<Handle>,
102        task_ctx_provider: impl Into<FFI_TaskContextProvider>,
103        logical_codec: Option<Arc<dyn LogicalExtensionCodec>>,
104    ) -> Self {
105        let task_ctx_provider = task_ctx_provider.into();
106        let logical_codec =
107            logical_codec.unwrap_or_else(|| Arc::new(DefaultLogicalExtensionCodec {}));
108        let logical_codec = FFI_LogicalExtensionCodec::new(
109            logical_codec,
110            runtime.clone(),
111            task_ctx_provider.clone(),
112        );
113        Self::new_with_ffi_codec(factory, runtime, logical_codec)
114    }
115
116    pub fn new_with_ffi_codec(
117        factory: Arc<dyn TableProviderFactory + Send>,
118        runtime: Option<Handle>,
119        logical_codec: FFI_LogicalExtensionCodec,
120    ) -> Self {
121        let private_data = Box::new(FactoryPrivateData { factory, runtime });
122
123        Self {
124            create: create_fn_wrapper,
125            logical_codec,
126            clone: clone_fn_wrapper,
127            release: release_fn_wrapper,
128            version: super::version,
129            private_data: Box::into_raw(private_data) as *mut c_void,
130            library_marker_id: crate::get_library_marker_id,
131        }
132    }
133
134    fn inner(&self) -> &Arc<dyn TableProviderFactory + Send> {
135        let private_data = self.private_data as *const FactoryPrivateData;
136        unsafe { &(*private_data).factory }
137    }
138
139    fn runtime(&self) -> &Option<Handle> {
140        let private_data = self.private_data as *const FactoryPrivateData;
141        unsafe { &(*private_data).runtime }
142    }
143
144    fn deserialize_cmd(
145        &self,
146        cmd_serialized: &SVec<u8>,
147    ) -> Result<CreateExternalTable, DataFusionError> {
148        let task_ctx: Arc<TaskContext> =
149            (&self.logical_codec.task_ctx_provider).try_into()?;
150        let logical_codec: Arc<dyn LogicalExtensionCodec> = (&self.logical_codec).into();
151
152        let plan = LogicalPlanNode::decode(cmd_serialized.as_ref())
153            .map_err(|e| DataFusionError::Internal(format!("{e:?}")))?;
154        match plan.try_into_logical_plan(&task_ctx, logical_codec.as_ref())? {
155            LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) => Ok(cmd),
156            _ => Err(DataFusionError::Internal(
157                "Invalid logical plan in FFI_TableProviderFactory.".to_owned(),
158            )),
159        }
160    }
161}
162
163impl Clone for FFI_TableProviderFactory {
164    fn clone(&self) -> Self {
165        unsafe { (self.clone)(self) }
166    }
167}
168
169impl Drop for FFI_TableProviderFactory {
170    fn drop(&mut self) {
171        unsafe { (self.release)(self) }
172    }
173}
174
175impl From<&FFI_TableProviderFactory> for Arc<dyn TableProviderFactory> {
176    fn from(factory: &FFI_TableProviderFactory) -> Self {
177        if (factory.library_marker_id)() == crate::get_library_marker_id() {
178            Arc::clone(factory.inner()) as Arc<dyn TableProviderFactory>
179        } else {
180            Arc::new(ForeignTableProviderFactory(factory.clone()))
181        }
182    }
183}
184
185unsafe extern "C" fn create_fn_wrapper(
186    factory: &FFI_TableProviderFactory,
187    session: FFI_SessionRef,
188    cmd_serialized: SVec<u8>,
189) -> FfiFuture<FFI_Result<FFI_TableProvider>> {
190    let factory = factory.clone();
191
192    async move {
193        let provider = sresult_return!(
194            create_fn_wrapper_impl(factory, session, cmd_serialized).await
195        );
196        FFI_Result::Ok(provider)
197    }
198    .into_ffi()
199}
200
201async fn create_fn_wrapper_impl(
202    factory: FFI_TableProviderFactory,
203    session: FFI_SessionRef,
204    cmd_serialized: SVec<u8>,
205) -> Result<FFI_TableProvider, DataFusionError> {
206    let runtime = factory.runtime().clone();
207    let ffi_logical_codec = factory.logical_codec.clone();
208    let internal_factory = Arc::clone(factory.inner());
209    let cmd = factory.deserialize_cmd(&cmd_serialized)?;
210
211    let mut foreign_session = None;
212    let session = session
213        .as_local()
214        .map(Ok::<&(dyn Session + Send + Sync), DataFusionError>)
215        .unwrap_or_else(|| {
216            foreign_session = Some(ForeignSession::try_from(&session)?);
217            Ok(foreign_session.as_ref().unwrap())
218        })?;
219
220    let provider = internal_factory.create(session, &cmd).await?;
221    Ok(FFI_TableProvider::new_with_ffi_codec(
222        provider,
223        true,
224        runtime.clone(),
225        ffi_logical_codec,
226    ))
227}
228
229unsafe extern "C" fn clone_fn_wrapper(
230    factory: &FFI_TableProviderFactory,
231) -> FFI_TableProviderFactory {
232    let runtime = factory.runtime().clone();
233    let old_factory = Arc::clone(factory.inner());
234
235    let private_data = Box::into_raw(Box::new(FactoryPrivateData {
236        factory: old_factory,
237        runtime,
238    })) as *mut c_void;
239
240    FFI_TableProviderFactory {
241        create: create_fn_wrapper,
242        logical_codec: factory.logical_codec.clone(),
243        clone: clone_fn_wrapper,
244        release: release_fn_wrapper,
245        version: super::version,
246        private_data,
247        library_marker_id: crate::get_library_marker_id,
248    }
249}
250
251unsafe extern "C" fn release_fn_wrapper(factory: &mut FFI_TableProviderFactory) {
252    unsafe {
253        debug_assert!(!factory.private_data.is_null());
254        let private_data = Box::from_raw(factory.private_data as *mut FactoryPrivateData);
255        drop(private_data);
256        factory.private_data = std::ptr::null_mut();
257    }
258}
259
260/// This wrapper struct exists on the receiver side of the FFI interface, so it has
261/// no guarantees about being able to access the data in `private_data`. Any functions
262/// defined on this struct must only use the stable functions provided in
263/// FFI_TableProviderFactory to interact with the foreign table provider factory.
264#[derive(Debug)]
265pub struct ForeignTableProviderFactory(pub FFI_TableProviderFactory);
266
267impl ForeignTableProviderFactory {
268    fn serialize_cmd(
269        &self,
270        cmd: CreateExternalTable,
271    ) -> Result<SVec<u8>, DataFusionError> {
272        let logical_codec: Arc<dyn LogicalExtensionCodec> =
273            (&self.0.logical_codec).into();
274
275        let plan = LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd));
276        let plan: LogicalPlanNode =
277            AsLogicalPlan::try_from_logical_plan(&plan, logical_codec.as_ref())?;
278
279        let mut buf: Vec<u8> = Vec::new();
280        plan.try_encode(&mut buf)?;
281
282        Ok(buf.into_iter().collect())
283    }
284}
285
286unsafe impl Send for ForeignTableProviderFactory {}
287unsafe impl Sync for ForeignTableProviderFactory {}
288
289#[async_trait]
290impl TableProviderFactory for ForeignTableProviderFactory {
291    async fn create(
292        &self,
293        session: &dyn Session,
294        cmd: &CreateExternalTable,
295    ) -> Result<Arc<dyn TableProvider>> {
296        let session = FFI_SessionRef::new(session, None, self.0.logical_codec.clone());
297        let cmd = self.serialize_cmd(cmd.clone())?;
298
299        let provider = unsafe {
300            let maybe_provider = (self.0.create)(&self.0, session, cmd).await;
301
302            let ffi_provider = df_result!(maybe_provider)?;
303            ForeignTableProvider(ffi_provider)
304        };
305
306        Ok(Arc::new(provider))
307    }
308}
309
#[cfg(test)]
mod tests {
    use arrow::datatypes::{Field, Schema};
    use datafusion::arrow::array::Float32Array;
    use datafusion::arrow::datatypes::DataType;
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::datasource::MemTable;
    use datafusion::prelude::SessionContext;
    use datafusion_common::{TableReference, ToDFSchema};
    use datafusion_execution::TaskContextProvider;
    use std::collections::HashMap;

    use super::*;

    /// Factory that ignores its inputs and always returns the same in-memory
    /// table with a single `Float32` column named `a`.
    #[derive(Debug)]
    struct TestTableProviderFactory {}

    #[async_trait]
    impl TableProviderFactory for TestTableProviderFactory {
        async fn create(
            &self,
            _session: &dyn Session,
            _cmd: &CreateExternalTable,
        ) -> Result<Arc<dyn TableProvider>> {
            let schema =
                Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));

            let first_batch = RecordBatch::try_new(
                Arc::clone(&schema),
                vec![Arc::new(Float32Array::from(vec![2.0, 4.0, 8.0]))],
            )?;
            let second_batch = RecordBatch::try_new(
                Arc::clone(&schema),
                vec![Arc::new(Float32Array::from(vec![64.0]))],
            )?;

            let table =
                MemTable::try_new(schema, vec![vec![first_batch], vec![second_batch]])?;
            Ok(Arc::new(table))
        }
    }

    /// Wraps a `TestTableProviderFactory` in an FFI factory that takes its
    /// task context from `ctx`.
    fn new_ffi_factory(ctx: &Arc<SessionContext>) -> FFI_TableProviderFactory {
        let task_ctx_provider = Arc::clone(ctx) as Arc<dyn TaskContextProvider>;
        let task_ctx_provider = FFI_TaskContextProvider::from(&task_ctx_provider);

        let factory = Arc::new(TestTableProviderFactory {});
        FFI_TableProviderFactory::new(factory, None, task_ctx_provider, None)
    }

    /// Builds a minimal `CREATE EXTERNAL TABLE` command for a table called
    /// `table_name`.
    fn external_table_cmd(table_name: &str) -> Result<CreateExternalTable> {
        Ok(CreateExternalTable {
            schema: Schema::empty().to_dfschema_ref()?,
            name: TableReference::bare(table_name),
            location: "test".to_string(),
            file_type: "test".to_string(),
            table_partition_cols: vec![],
            if_not_exists: false,
            or_replace: false,
            temporary: false,
            definition: None,
            order_exprs: vec![],
            unbounded: false,
            options: HashMap::new(),
            constraints: Default::default(),
            column_defaults: HashMap::new(),
        })
    }

    #[tokio::test]
    async fn test_round_trip_ffi_table_provider_factory() -> Result<()> {
        let ctx = Arc::new(SessionContext::new());

        let mut ffi_factory = new_ffi_factory(&ctx);
        // Make the factory look foreign so the conversion below takes the
        // `ForeignTableProviderFactory` path.
        ffi_factory.library_marker_id = crate::mock_foreign_marker_id;

        let factory: Arc<dyn TableProviderFactory> = (&ffi_factory).into();
        let cmd = external_table_cmd("test_table")?;

        let provider = factory.create(&ctx.state(), &cmd).await?;

        assert_eq!(provider.schema().fields().len(), 1);

        Ok(())
    }

    #[tokio::test]
    async fn test_ffi_table_provider_factory_clone() -> Result<()> {
        let ctx = Arc::new(SessionContext::new());
        let ffi_factory = new_ffi_factory(&ctx);

        // Test that we can clone the factory
        let cloned_factory = ffi_factory.clone();
        let factory: Arc<dyn TableProviderFactory> = (&cloned_factory).into();

        let cmd = external_table_cmd("cloned_test")?;

        let provider = factory.create(&ctx.state(), &cmd).await?;
        assert_eq!(provider.schema().fields().len(), 1);

        Ok(())
    }
}