1use std::{ffi::c_void, sync::Arc};
19
20use async_ffi::{FfiFuture, FutureExt};
21use async_trait::async_trait;
22use datafusion_catalog::{Session, TableProvider, TableProviderFactory};
23use datafusion_common::error::{DataFusionError, Result};
24use datafusion_execution::TaskContext;
25use datafusion_expr::{CreateExternalTable, DdlStatement, LogicalPlan};
26use datafusion_proto::logical_plan::{
27 AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
28};
29use datafusion_proto::protobuf::LogicalPlanNode;
30use prost::Message;
31
32use stabby::vec::Vec as SVec;
33use tokio::runtime::Handle;
34
35use crate::execution::FFI_TaskContextProvider;
36use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
37use crate::session::{FFI_SessionRef, ForeignSession};
38use crate::table_provider::{FFI_TableProvider, ForeignTableProvider};
39use crate::util::FFI_Result;
40use crate::{df_result, sresult_return};
41
42#[repr(C)]
51#[derive(Debug)]
52pub struct FFI_TableProviderFactory {
53 create: unsafe extern "C" fn(
62 factory: &Self,
63 session: FFI_SessionRef,
64 cmd_serialized: SVec<u8>,
65 ) -> FfiFuture<FFI_Result<FFI_TableProvider>>,
66
67 logical_codec: FFI_LogicalExtensionCodec,
68
69 clone: unsafe extern "C" fn(factory: &Self) -> Self,
72
73 release: unsafe extern "C" fn(factory: &mut Self),
75
76 version: unsafe extern "C" fn() -> u64,
78
79 private_data: *mut c_void,
82
83 library_marker_id: extern "C" fn() -> usize,
87}
88
89unsafe impl Send for FFI_TableProviderFactory {}
90unsafe impl Sync for FFI_TableProviderFactory {}
91
92struct FactoryPrivateData {
93 factory: Arc<dyn TableProviderFactory + Send>,
94 runtime: Option<Handle>,
95}
96
97impl FFI_TableProviderFactory {
98 pub fn new(
100 factory: Arc<dyn TableProviderFactory + Send>,
101 runtime: Option<Handle>,
102 task_ctx_provider: impl Into<FFI_TaskContextProvider>,
103 logical_codec: Option<Arc<dyn LogicalExtensionCodec>>,
104 ) -> Self {
105 let task_ctx_provider = task_ctx_provider.into();
106 let logical_codec =
107 logical_codec.unwrap_or_else(|| Arc::new(DefaultLogicalExtensionCodec {}));
108 let logical_codec = FFI_LogicalExtensionCodec::new(
109 logical_codec,
110 runtime.clone(),
111 task_ctx_provider.clone(),
112 );
113 Self::new_with_ffi_codec(factory, runtime, logical_codec)
114 }
115
116 pub fn new_with_ffi_codec(
117 factory: Arc<dyn TableProviderFactory + Send>,
118 runtime: Option<Handle>,
119 logical_codec: FFI_LogicalExtensionCodec,
120 ) -> Self {
121 let private_data = Box::new(FactoryPrivateData { factory, runtime });
122
123 Self {
124 create: create_fn_wrapper,
125 logical_codec,
126 clone: clone_fn_wrapper,
127 release: release_fn_wrapper,
128 version: super::version,
129 private_data: Box::into_raw(private_data) as *mut c_void,
130 library_marker_id: crate::get_library_marker_id,
131 }
132 }
133
134 fn inner(&self) -> &Arc<dyn TableProviderFactory + Send> {
135 let private_data = self.private_data as *const FactoryPrivateData;
136 unsafe { &(*private_data).factory }
137 }
138
139 fn runtime(&self) -> &Option<Handle> {
140 let private_data = self.private_data as *const FactoryPrivateData;
141 unsafe { &(*private_data).runtime }
142 }
143
144 fn deserialize_cmd(
145 &self,
146 cmd_serialized: &SVec<u8>,
147 ) -> Result<CreateExternalTable, DataFusionError> {
148 let task_ctx: Arc<TaskContext> =
149 (&self.logical_codec.task_ctx_provider).try_into()?;
150 let logical_codec: Arc<dyn LogicalExtensionCodec> = (&self.logical_codec).into();
151
152 let plan = LogicalPlanNode::decode(cmd_serialized.as_ref())
153 .map_err(|e| DataFusionError::Internal(format!("{e:?}")))?;
154 match plan.try_into_logical_plan(&task_ctx, logical_codec.as_ref())? {
155 LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) => Ok(cmd),
156 _ => Err(DataFusionError::Internal(
157 "Invalid logical plan in FFI_TableProviderFactory.".to_owned(),
158 )),
159 }
160 }
161}
162
163impl Clone for FFI_TableProviderFactory {
164 fn clone(&self) -> Self {
165 unsafe { (self.clone)(self) }
166 }
167}
168
169impl Drop for FFI_TableProviderFactory {
170 fn drop(&mut self) {
171 unsafe { (self.release)(self) }
172 }
173}
174
175impl From<&FFI_TableProviderFactory> for Arc<dyn TableProviderFactory> {
176 fn from(factory: &FFI_TableProviderFactory) -> Self {
177 if (factory.library_marker_id)() == crate::get_library_marker_id() {
178 Arc::clone(factory.inner()) as Arc<dyn TableProviderFactory>
179 } else {
180 Arc::new(ForeignTableProviderFactory(factory.clone()))
181 }
182 }
183}
184
185unsafe extern "C" fn create_fn_wrapper(
186 factory: &FFI_TableProviderFactory,
187 session: FFI_SessionRef,
188 cmd_serialized: SVec<u8>,
189) -> FfiFuture<FFI_Result<FFI_TableProvider>> {
190 let factory = factory.clone();
191
192 async move {
193 let provider = sresult_return!(
194 create_fn_wrapper_impl(factory, session, cmd_serialized).await
195 );
196 FFI_Result::Ok(provider)
197 }
198 .into_ffi()
199}
200
201async fn create_fn_wrapper_impl(
202 factory: FFI_TableProviderFactory,
203 session: FFI_SessionRef,
204 cmd_serialized: SVec<u8>,
205) -> Result<FFI_TableProvider, DataFusionError> {
206 let runtime = factory.runtime().clone();
207 let ffi_logical_codec = factory.logical_codec.clone();
208 let internal_factory = Arc::clone(factory.inner());
209 let cmd = factory.deserialize_cmd(&cmd_serialized)?;
210
211 let mut foreign_session = None;
212 let session = session
213 .as_local()
214 .map(Ok::<&(dyn Session + Send + Sync), DataFusionError>)
215 .unwrap_or_else(|| {
216 foreign_session = Some(ForeignSession::try_from(&session)?);
217 Ok(foreign_session.as_ref().unwrap())
218 })?;
219
220 let provider = internal_factory.create(session, &cmd).await?;
221 Ok(FFI_TableProvider::new_with_ffi_codec(
222 provider,
223 true,
224 runtime.clone(),
225 ffi_logical_codec,
226 ))
227}
228
229unsafe extern "C" fn clone_fn_wrapper(
230 factory: &FFI_TableProviderFactory,
231) -> FFI_TableProviderFactory {
232 let runtime = factory.runtime().clone();
233 let old_factory = Arc::clone(factory.inner());
234
235 let private_data = Box::into_raw(Box::new(FactoryPrivateData {
236 factory: old_factory,
237 runtime,
238 })) as *mut c_void;
239
240 FFI_TableProviderFactory {
241 create: create_fn_wrapper,
242 logical_codec: factory.logical_codec.clone(),
243 clone: clone_fn_wrapper,
244 release: release_fn_wrapper,
245 version: super::version,
246 private_data,
247 library_marker_id: crate::get_library_marker_id,
248 }
249}
250
251unsafe extern "C" fn release_fn_wrapper(factory: &mut FFI_TableProviderFactory) {
252 unsafe {
253 debug_assert!(!factory.private_data.is_null());
254 let private_data = Box::from_raw(factory.private_data as *mut FactoryPrivateData);
255 drop(private_data);
256 factory.private_data = std::ptr::null_mut();
257 }
258}
259
260#[derive(Debug)]
265pub struct ForeignTableProviderFactory(pub FFI_TableProviderFactory);
266
267impl ForeignTableProviderFactory {
268 fn serialize_cmd(
269 &self,
270 cmd: CreateExternalTable,
271 ) -> Result<SVec<u8>, DataFusionError> {
272 let logical_codec: Arc<dyn LogicalExtensionCodec> =
273 (&self.0.logical_codec).into();
274
275 let plan = LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd));
276 let plan: LogicalPlanNode =
277 AsLogicalPlan::try_from_logical_plan(&plan, logical_codec.as_ref())?;
278
279 let mut buf: Vec<u8> = Vec::new();
280 plan.try_encode(&mut buf)?;
281
282 Ok(buf.into_iter().collect())
283 }
284}
285
286unsafe impl Send for ForeignTableProviderFactory {}
287unsafe impl Sync for ForeignTableProviderFactory {}
288
289#[async_trait]
290impl TableProviderFactory for ForeignTableProviderFactory {
291 async fn create(
292 &self,
293 session: &dyn Session,
294 cmd: &CreateExternalTable,
295 ) -> Result<Arc<dyn TableProvider>> {
296 let session = FFI_SessionRef::new(session, None, self.0.logical_codec.clone());
297 let cmd = self.serialize_cmd(cmd.clone())?;
298
299 let provider = unsafe {
300 let maybe_provider = (self.0.create)(&self.0, session, cmd).await;
301
302 let ffi_provider = df_result!(maybe_provider)?;
303 ForeignTableProvider(ffi_provider)
304 };
305
306 Ok(Arc::new(provider))
307 }
308}
309
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use arrow::datatypes::Schema;
    use datafusion::prelude::SessionContext;
    use datafusion_common::{TableReference, ToDFSchema};
    use datafusion_execution::TaskContextProvider;

    use super::*;

    /// Factory that ignores its inputs and always returns the same in-memory
    /// table: one non-nullable `Float32` column named `a`, two partitions.
    #[derive(Debug)]
    struct TestTableProviderFactory {}

    #[async_trait]
    impl TableProviderFactory for TestTableProviderFactory {
        async fn create(
            &self,
            _session: &dyn Session,
            _cmd: &CreateExternalTable,
        ) -> Result<Arc<dyn TableProvider>> {
            use arrow::datatypes::Field;
            use datafusion::arrow::array::Float32Array;
            use datafusion::arrow::datatypes::DataType;
            use datafusion::arrow::record_batch::RecordBatch;
            use datafusion::datasource::MemTable;

            let schema =
                Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));

            // Builds a single-column batch over the schema above.
            let make_batch = |values: Vec<f32>| {
                RecordBatch::try_new(
                    Arc::clone(&schema),
                    vec![Arc::new(Float32Array::from(values))],
                )
            };
            let first = make_batch(vec![2.0, 4.0, 8.0])?;
            let second = make_batch(vec![64.0])?;

            let table = MemTable::try_new(schema, vec![vec![first], vec![second]])?;
            Ok(Arc::new(table))
        }
    }

    /// Builds an FFI handle around a `TestTableProviderFactory`, using `ctx`
    /// as the task context provider and the default codec.
    fn build_ffi_factory(ctx: &Arc<SessionContext>) -> FFI_TableProviderFactory {
        let task_ctx_provider = Arc::clone(ctx) as Arc<dyn TaskContextProvider>;
        let task_ctx_provider = FFI_TaskContextProvider::from(&task_ctx_provider);

        let factory = Arc::new(TestTableProviderFactory {});
        FFI_TableProviderFactory::new(factory, None, task_ctx_provider, None)
    }

    /// Minimal `CreateExternalTable` command for a table called `name`.
    fn external_table_cmd(name: &'static str) -> Result<CreateExternalTable> {
        Ok(CreateExternalTable {
            schema: Schema::empty().to_dfschema_ref()?,
            name: TableReference::bare(name),
            location: "test".to_string(),
            file_type: "test".to_string(),
            table_partition_cols: vec![],
            if_not_exists: false,
            or_replace: false,
            temporary: false,
            definition: None,
            order_exprs: vec![],
            unbounded: false,
            options: HashMap::new(),
            constraints: Default::default(),
            column_defaults: HashMap::new(),
        })
    }

    #[tokio::test]
    async fn test_round_trip_ffi_table_provider_factory() -> Result<()> {
        let ctx = Arc::new(SessionContext::new());

        let mut ffi_factory = build_ffi_factory(&ctx);
        // Swap in the mock marker so the conversion below takes the
        // `ForeignTableProviderFactory` path instead of unwrapping directly.
        ffi_factory.library_marker_id = crate::mock_foreign_marker_id;

        let factory: Arc<dyn TableProviderFactory> = (&ffi_factory).into();
        let cmd = external_table_cmd("test_table")?;

        let provider = factory.create(&ctx.state(), &cmd).await?;
        assert_eq!(provider.schema().fields().len(), 1);

        Ok(())
    }

    #[tokio::test]
    async fn test_ffi_table_provider_factory_clone() -> Result<()> {
        let ctx = Arc::new(SessionContext::new());

        let ffi_factory = build_ffi_factory(&ctx);
        let cloned_factory = ffi_factory.clone();

        // The clone must be usable in its own right.
        let factory: Arc<dyn TableProviderFactory> = (&cloned_factory).into();
        let cmd = external_table_cmd("cloned_test")?;

        let provider = factory.create(&ctx.state(), &cmd).await?;
        assert_eq!(provider.schema().fields().len(), 1);

        Ok(())
    }
}