1use std::{ffi::c_void, sync::Arc};
19
20use abi_stable::{
21 StableAbi,
22 std_types::{RResult, RString, RVec},
23};
24use async_ffi::{FfiFuture, FutureExt};
25use async_trait::async_trait;
26use datafusion_catalog::{Session, TableProvider, TableProviderFactory};
27use datafusion_common::error::{DataFusionError, Result};
28use datafusion_execution::TaskContext;
29use datafusion_expr::{CreateExternalTable, DdlStatement, LogicalPlan};
30use datafusion_proto::logical_plan::{
31 AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
32};
33use datafusion_proto::protobuf::LogicalPlanNode;
34use prost::Message;
35use tokio::runtime::Handle;
36
37use crate::execution::FFI_TaskContextProvider;
38use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
39use crate::session::{FFI_SessionRef, ForeignSession};
40use crate::table_provider::{FFI_TableProvider, ForeignTableProvider};
41use crate::{df_result, rresult_return};
42
/// A stable-ABI wrapper around a [`TableProviderFactory`] that can be passed
/// across a foreign-function boundary.
///
/// The consumer calls `create` with a session reference and a serialized
/// `CreateExternalTable` logical plan; the producing library decodes the plan,
/// invokes its wrapped factory, and hands the result back as an
/// [`FFI_TableProvider`].
#[repr(C)]
#[derive(Debug, StableAbi)]
pub struct FFI_TableProviderFactory {
    /// Creates a table provider. `cmd_serialized` is a protobuf-encoded
    /// [`LogicalPlanNode`] that the producing side expects to decode to
    /// `LogicalPlan::Ddl(DdlStatement::CreateExternalTable(_))`.
    create: unsafe extern "C" fn(
        factory: &Self,
        session: FFI_SessionRef,
        cmd_serialized: RVec<u8>,
    ) -> FfiFuture<RResult<FFI_TableProvider, RString>>,

    /// Codec used to encode/decode the command plan; it is also handed on to
    /// the session references and table providers created through this factory.
    logical_codec: FFI_LogicalExtensionCodec,

    /// Produces an independent copy of this struct with its own `private_data`
    /// allocation.
    clone: unsafe extern "C" fn(factory: &Self) -> Self,

    /// Frees `private_data`; invoked from this struct's `Drop` implementation.
    release: unsafe extern "C" fn(factory: &mut Self),

    /// Version reported by the library that constructed this struct
    /// (populated with `super::version`).
    version: unsafe extern "C" fn() -> u64,

    /// Opaque pointer to the producing library's `FactoryPrivateData`. It must
    /// only be dereferenced by the library that created it.
    private_data: *mut c_void,

    /// Identifies the library that created this struct. Compared against
    /// `crate::get_library_marker_id()` to decide whether the wrapped factory
    /// can be used directly instead of going through the FFI function pointers.
    library_marker_id: extern "C" fn() -> usize,
}
89
// SAFETY: raw pointers are never `Send`/`Sync` automatically, so these impls
// are required for the `private_data` field. That pointer refers to a
// `FactoryPrivateData` holding an `Arc<dyn TableProviderFactory + Send>` and an
// `Option<Handle>`; the other fields are function pointers and the FFI codec.
// NOTE(review): the `Sync` impl relies on the wrapped factory also being safe
// to share across threads (presumably via a `Sync` supertrait on
// `TableProviderFactory`) — confirm.
unsafe impl Send for FFI_TableProviderFactory {}
unsafe impl Sync for FFI_TableProviderFactory {}
92
/// State owned by the library that created an [`FFI_TableProviderFactory`],
/// stored behind its `private_data` pointer.
struct FactoryPrivateData {
    /// The wrapped, library-local factory.
    factory: Arc<dyn TableProviderFactory + Send>,
    /// Optional Tokio runtime handle, passed on to the table providers this
    /// factory creates.
    runtime: Option<Handle>,
}
97
98impl FFI_TableProviderFactory {
99 pub fn new(
101 factory: Arc<dyn TableProviderFactory + Send>,
102 runtime: Option<Handle>,
103 task_ctx_provider: impl Into<FFI_TaskContextProvider>,
104 logical_codec: Option<Arc<dyn LogicalExtensionCodec>>,
105 ) -> Self {
106 let task_ctx_provider = task_ctx_provider.into();
107 let logical_codec =
108 logical_codec.unwrap_or_else(|| Arc::new(DefaultLogicalExtensionCodec {}));
109 let logical_codec = FFI_LogicalExtensionCodec::new(
110 logical_codec,
111 runtime.clone(),
112 task_ctx_provider.clone(),
113 );
114 Self::new_with_ffi_codec(factory, runtime, logical_codec)
115 }
116
117 pub fn new_with_ffi_codec(
118 factory: Arc<dyn TableProviderFactory + Send>,
119 runtime: Option<Handle>,
120 logical_codec: FFI_LogicalExtensionCodec,
121 ) -> Self {
122 let private_data = Box::new(FactoryPrivateData { factory, runtime });
123
124 Self {
125 create: create_fn_wrapper,
126 logical_codec,
127 clone: clone_fn_wrapper,
128 release: release_fn_wrapper,
129 version: super::version,
130 private_data: Box::into_raw(private_data) as *mut c_void,
131 library_marker_id: crate::get_library_marker_id,
132 }
133 }
134
135 fn inner(&self) -> &Arc<dyn TableProviderFactory + Send> {
136 let private_data = self.private_data as *const FactoryPrivateData;
137 unsafe { &(*private_data).factory }
138 }
139
140 fn runtime(&self) -> &Option<Handle> {
141 let private_data = self.private_data as *const FactoryPrivateData;
142 unsafe { &(*private_data).runtime }
143 }
144
145 fn deserialize_cmd(
146 &self,
147 cmd_serialized: &RVec<u8>,
148 ) -> Result<CreateExternalTable, DataFusionError> {
149 let task_ctx: Arc<TaskContext> =
150 (&self.logical_codec.task_ctx_provider).try_into()?;
151 let logical_codec: Arc<dyn LogicalExtensionCodec> = (&self.logical_codec).into();
152
153 let plan = LogicalPlanNode::decode(cmd_serialized.as_ref())
154 .map_err(|e| DataFusionError::Internal(format!("{e:?}")))?;
155 match plan.try_into_logical_plan(&task_ctx, logical_codec.as_ref())? {
156 LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) => Ok(cmd),
157 _ => Err(DataFusionError::Internal(
158 "Invalid logical plan in FFI_TableProviderFactory.".to_owned(),
159 )),
160 }
161 }
162}
163
164impl Clone for FFI_TableProviderFactory {
165 fn clone(&self) -> Self {
166 unsafe { (self.clone)(self) }
167 }
168}
169
170impl Drop for FFI_TableProviderFactory {
171 fn drop(&mut self) {
172 unsafe { (self.release)(self) }
173 }
174}
175
176impl From<&FFI_TableProviderFactory> for Arc<dyn TableProviderFactory> {
177 fn from(factory: &FFI_TableProviderFactory) -> Self {
178 if (factory.library_marker_id)() == crate::get_library_marker_id() {
179 Arc::clone(factory.inner()) as Arc<dyn TableProviderFactory>
180 } else {
181 Arc::new(ForeignTableProviderFactory(factory.clone()))
182 }
183 }
184}
185
186unsafe extern "C" fn create_fn_wrapper(
187 factory: &FFI_TableProviderFactory,
188 session: FFI_SessionRef,
189 cmd_serialized: RVec<u8>,
190) -> FfiFuture<RResult<FFI_TableProvider, RString>> {
191 let factory = factory.clone();
192
193 async move {
194 let provider = rresult_return!(
195 create_fn_wrapper_impl(factory, session, cmd_serialized).await
196 );
197 RResult::ROk(provider)
198 }
199 .into_ffi()
200}
201
202async fn create_fn_wrapper_impl(
203 factory: FFI_TableProviderFactory,
204 session: FFI_SessionRef,
205 cmd_serialized: RVec<u8>,
206) -> Result<FFI_TableProvider, DataFusionError> {
207 let runtime = factory.runtime().clone();
208 let ffi_logical_codec = factory.logical_codec.clone();
209 let internal_factory = Arc::clone(factory.inner());
210 let cmd = factory.deserialize_cmd(&cmd_serialized)?;
211
212 let mut foreign_session = None;
213 let session = session
214 .as_local()
215 .map(Ok::<&(dyn Session + Send + Sync), DataFusionError>)
216 .unwrap_or_else(|| {
217 foreign_session = Some(ForeignSession::try_from(&session)?);
218 Ok(foreign_session.as_ref().unwrap())
219 })?;
220
221 let provider = internal_factory.create(session, &cmd).await?;
222 Ok(FFI_TableProvider::new_with_ffi_codec(
223 provider,
224 true,
225 runtime.clone(),
226 ffi_logical_codec,
227 ))
228}
229
230unsafe extern "C" fn clone_fn_wrapper(
231 factory: &FFI_TableProviderFactory,
232) -> FFI_TableProviderFactory {
233 let runtime = factory.runtime().clone();
234 let old_factory = Arc::clone(factory.inner());
235
236 let private_data = Box::into_raw(Box::new(FactoryPrivateData {
237 factory: old_factory,
238 runtime,
239 })) as *mut c_void;
240
241 FFI_TableProviderFactory {
242 create: create_fn_wrapper,
243 logical_codec: factory.logical_codec.clone(),
244 clone: clone_fn_wrapper,
245 release: release_fn_wrapper,
246 version: super::version,
247 private_data,
248 library_marker_id: crate::get_library_marker_id,
249 }
250}
251
252unsafe extern "C" fn release_fn_wrapper(factory: &mut FFI_TableProviderFactory) {
253 unsafe {
254 debug_assert!(!factory.private_data.is_null());
255 let private_data = Box::from_raw(factory.private_data as *mut FactoryPrivateData);
256 drop(private_data);
257 factory.private_data = std::ptr::null_mut();
258 }
259}
260
/// A [`TableProviderFactory`] backed by an [`FFI_TableProviderFactory`] that
/// was created by another library.
///
/// Every call goes through the wrapped struct's FFI function pointers.
#[derive(Debug)]
pub struct ForeignTableProviderFactory(pub FFI_TableProviderFactory);
267
268impl ForeignTableProviderFactory {
269 fn serialize_cmd(
270 &self,
271 cmd: CreateExternalTable,
272 ) -> Result<RVec<u8>, DataFusionError> {
273 let logical_codec: Arc<dyn LogicalExtensionCodec> =
274 (&self.0.logical_codec).into();
275
276 let plan = LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd));
277 let plan: LogicalPlanNode =
278 AsLogicalPlan::try_from_logical_plan(&plan, logical_codec.as_ref())?;
279
280 let mut buf: Vec<u8> = Vec::new();
281 plan.try_encode(&mut buf)?;
282
283 Ok(buf.into())
284 }
285}
286
// SAFETY: the only field is an `FFI_TableProviderFactory`, for which this file
// also declares `Send` and `Sync`.
// NOTE(review): for that reason these impls appear redundant with the auto-trait
// impls and could likely be dropped — confirm before removing.
unsafe impl Send for ForeignTableProviderFactory {}
unsafe impl Sync for ForeignTableProviderFactory {}
289
290#[async_trait]
291impl TableProviderFactory for ForeignTableProviderFactory {
292 async fn create(
293 &self,
294 session: &dyn Session,
295 cmd: &CreateExternalTable,
296 ) -> Result<Arc<dyn TableProvider>> {
297 let session = FFI_SessionRef::new(session, None, self.0.logical_codec.clone());
298 let cmd = self.serialize_cmd(cmd.clone())?;
299
300 let provider = unsafe {
301 let maybe_provider = (self.0.create)(&self.0, session, cmd).await;
302
303 let ffi_provider = df_result!(maybe_provider)?;
304 ForeignTableProvider(ffi_provider)
305 };
306
307 Ok(Arc::new(provider))
308 }
309}
310
#[cfg(test)]
mod tests {
    use arrow::datatypes::Schema;
    use datafusion::prelude::SessionContext;
    use datafusion_common::{TableReference, ToDFSchema};
    use datafusion_execution::TaskContextProvider;
    use std::collections::HashMap;

    use super::*;

    /// Factory that ignores its inputs and always yields a two-partition
    /// in-memory table with one non-nullable `Float32` column named `a`.
    #[derive(Debug)]
    struct TestTableProviderFactory {}

    #[async_trait]
    impl TableProviderFactory for TestTableProviderFactory {
        async fn create(
            &self,
            _session: &dyn Session,
            _cmd: &CreateExternalTable,
        ) -> Result<Arc<dyn TableProvider>> {
            use arrow::datatypes::Field;
            use datafusion::arrow::array::Float32Array;
            use datafusion::arrow::datatypes::DataType;
            use datafusion::arrow::record_batch::RecordBatch;
            use datafusion::datasource::MemTable;

            let schema =
                Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
            let batch_of = |values: Vec<f32>| {
                RecordBatch::try_new(
                    Arc::clone(&schema),
                    vec![Arc::new(Float32Array::from(values))],
                )
            };

            let first = batch_of(vec![2.0, 4.0, 8.0])?;
            let second = batch_of(vec![64.0])?;

            Ok(Arc::new(MemTable::try_new(
                schema,
                vec![vec![first], vec![second]],
            )?))
        }
    }

    /// Builds a minimal `CREATE EXTERNAL TABLE` command for a table called `name`.
    fn external_table_cmd(name: &str) -> Result<CreateExternalTable> {
        Ok(CreateExternalTable {
            schema: Schema::empty().to_dfschema_ref()?,
            name: TableReference::bare(name),
            location: "test".to_string(),
            file_type: "test".to_string(),
            table_partition_cols: vec![],
            if_not_exists: false,
            or_replace: false,
            temporary: false,
            definition: None,
            order_exprs: vec![],
            unbounded: false,
            options: HashMap::new(),
            constraints: Default::default(),
            column_defaults: HashMap::new(),
        })
    }

    #[tokio::test]
    async fn test_round_trip_ffi_table_provider_factory() -> Result<()> {
        let ctx = Arc::new(SessionContext::new());
        let task_ctx_provider = Arc::clone(&ctx) as Arc<dyn TaskContextProvider>;
        let task_ctx_provider = FFI_TaskContextProvider::from(&task_ctx_provider);

        let mut ffi_factory = FFI_TableProviderFactory::new(
            Arc::new(TestTableProviderFactory {}),
            None,
            task_ctx_provider,
            None,
        );
        // Pretend the struct came from another library, so that the conversion
        // below goes through `ForeignTableProviderFactory` and the FFI pointers.
        ffi_factory.library_marker_id = crate::mock_foreign_marker_id;

        let factory: Arc<dyn TableProviderFactory> = (&ffi_factory).into();

        let cmd = external_table_cmd("test_table")?;
        let provider = factory.create(&ctx.state(), &cmd).await?;

        assert_eq!(provider.schema().fields().len(), 1);

        Ok(())
    }

    #[tokio::test]
    async fn test_ffi_table_provider_factory_clone() -> Result<()> {
        let ctx = Arc::new(SessionContext::new());
        let task_ctx_provider = Arc::clone(&ctx) as Arc<dyn TaskContextProvider>;
        let task_ctx_provider = FFI_TaskContextProvider::from(&task_ctx_provider);

        let ffi_factory = FFI_TableProviderFactory::new(
            Arc::new(TestTableProviderFactory {}),
            None,
            task_ctx_provider,
            None,
        );

        // Copy through the FFI `clone` function pointer, then use only the copy.
        let cloned_factory = ffi_factory.clone();
        let factory: Arc<dyn TableProviderFactory> = (&cloned_factory).into();

        let cmd = external_table_cmd("cloned_test")?;
        let provider = factory.create(&ctx.state(), &cmd).await?;
        assert_eq!(provider.schema().fields().len(), 1);

        Ok(())
    }
}
429}