1use std::any::Any;
19use std::ffi::c_void;
20use std::sync::Arc;
21
22use abi_stable::StableAbi;
23use abi_stable::std_types::{ROption, RResult, RVec};
24use arrow::datatypes::SchemaRef;
25use async_ffi::{FfiFuture, FutureExt};
26use async_trait::async_trait;
27use datafusion_catalog::{Session, TableProvider};
28use datafusion_common::error::{DataFusionError, Result};
29use datafusion_execution::TaskContext;
30use datafusion_expr::dml::InsertOp;
31use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
32use datafusion_physical_plan::ExecutionPlan;
33use datafusion_proto::logical_plan::from_proto::parse_exprs;
34use datafusion_proto::logical_plan::to_proto::serialize_exprs;
35use datafusion_proto::logical_plan::{
36 DefaultLogicalExtensionCodec, LogicalExtensionCodec,
37};
38use datafusion_proto::protobuf::LogicalExprList;
39use prost::Message;
40use tokio::runtime::Handle;
41
42use super::execution_plan::FFI_ExecutionPlan;
43use super::insert_op::FFI_InsertOp;
44use crate::arrow_wrappers::WrappedSchema;
45use crate::execution::FFI_TaskContextProvider;
46use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
47use crate::session::{FFI_SessionRef, ForeignSession};
48use crate::table_source::{FFI_TableProviderFilterPushDown, FFI_TableType};
49use crate::util::FFIResult;
50use crate::{df_result, rresult_return};
51
/// A stable-ABI (FFI-safe) wrapper around a [`TableProvider`].
///
/// All interaction with the wrapped provider happens through the
/// `extern "C"` function pointers below, so the struct can be passed across
/// a shared-library boundary. `private_data` points to a
/// `ProviderPrivateData` allocated by the library that created this struct
/// and must only be dereferenced by that library's entry points.
#[repr(C)]
#[derive(Debug, StableAbi)]
pub struct FFI_TableProvider {
    /// Returns the Arrow schema of the wrapped provider.
    schema: unsafe extern "C" fn(provider: &Self) -> WrappedSchema,

    /// Begins a scan and yields an execution plan. Filters cross the
    /// boundary as a protobuf-serialized `LogicalExprList`; an empty buffer
    /// means "no filters".
    scan: unsafe extern "C" fn(
        provider: &Self,
        session: FFI_SessionRef,
        projections: ROption<RVec<usize>>,
        filters_serialized: RVec<u8>,
        limit: ROption<usize>,
    ) -> FfiFuture<FFIResult<FFI_ExecutionPlan>>,

    /// Returns the provider's [`TableType`].
    table_type: unsafe extern "C" fn(provider: &Self) -> FFI_TableType,

    /// Optional filter-pushdown entry point. When `None`, callers report
    /// every filter as unsupported (see `ForeignTableProvider`).
    supports_filters_pushdown: Option<
        unsafe extern "C" fn(
            provider: &FFI_TableProvider,
            filters_serialized: RVec<u8>,
        ) -> FFIResult<RVec<FFI_TableProviderFilterPushDown>>,
    >,

    /// Executes an INSERT of `input` into the underlying table.
    insert_into: unsafe extern "C" fn(
        provider: &Self,
        session: FFI_SessionRef,
        input: &FFI_ExecutionPlan,
        insert_op: FFI_InsertOp,
    ) -> FfiFuture<FFIResult<FFI_ExecutionPlan>>,

    /// Codec used to (de)serialize logical expressions across the boundary.
    pub logical_codec: FFI_LogicalExtensionCodec,

    /// Produces a copy whose `private_data` is allocated (and later freed)
    /// by the defining library; the underlying provider `Arc` is shared.
    clone: unsafe extern "C" fn(plan: &Self) -> Self,

    /// Frees `private_data`; invoked from this struct's `Drop` impl.
    release: unsafe extern "C" fn(arg: &mut Self),

    /// Reports the FFI interface version of the defining library.
    pub version: unsafe extern "C" fn() -> u64,

    /// Opaque pointer to `ProviderPrivateData`; owned by the creating
    /// library, never dereferenced on the foreign side.
    private_data: *mut c_void,

    /// Identifies the library that created this struct so that providers
    /// from the same library can bypass the FFI layer entirely
    /// (see `From<&FFI_TableProvider> for Arc<dyn TableProvider>`).
    pub library_marker_id: extern "C" fn() -> usize,
}
157
// SAFETY: the wrapped provider is `Arc<dyn TableProvider + Send>` and all
// access flows through the `extern "C"` entry points.
// NOTE(review): `Sync` additionally assumes those entry points tolerate
// concurrent calls from multiple threads — confirm against the FFI
// contract of the producing library.
unsafe impl Send for FFI_TableProvider {}
unsafe impl Sync for FFI_TableProvider {}
160
/// Library-private state stored behind `FFI_TableProvider::private_data`.
struct ProviderPrivateData {
    // The concrete provider being exposed over FFI.
    provider: Arc<dyn TableProvider + Send>,
    // Optional tokio runtime handle, forwarded to each `FFI_ExecutionPlan`
    // this provider creates.
    runtime: Option<Handle>,
}
165
impl FFI_TableProvider {
    /// Borrows the wrapped provider out of `private_data`.
    fn inner(&self) -> &Arc<dyn TableProvider + Send> {
        let private_data = self.private_data as *const ProviderPrivateData;
        // SAFETY: `private_data` is set by the constructors /
        // `clone_fn_wrapper` to a live `ProviderPrivateData` and is only
        // freed in `release_fn_wrapper`.
        unsafe { &(*private_data).provider }
    }

    /// Borrows the optional runtime handle out of `private_data`.
    fn runtime(&self) -> &Option<Handle> {
        let private_data = self.private_data as *const ProviderPrivateData;
        // SAFETY: same invariant as `inner` above.
        unsafe { &(*private_data).runtime }
    }
}
177
/// FFI entry point backing `FFI_TableProvider::schema`.
unsafe extern "C" fn schema_fn_wrapper(provider: &FFI_TableProvider) -> WrappedSchema {
    provider.inner().schema().into()
}
181
/// FFI entry point backing `FFI_TableProvider::table_type`.
unsafe extern "C" fn table_type_fn_wrapper(
    provider: &FFI_TableProvider,
) -> FFI_TableType {
    provider.inner().table_type().into()
}
187
188fn supports_filters_pushdown_internal(
189 provider: &Arc<dyn TableProvider + Send>,
190 filters_serialized: &[u8],
191 task_ctx: &Arc<TaskContext>,
192 codec: &dyn LogicalExtensionCodec,
193) -> Result<RVec<FFI_TableProviderFilterPushDown>> {
194 let filters = match filters_serialized.is_empty() {
195 true => vec![],
196 false => {
197 let proto_filters = LogicalExprList::decode(filters_serialized)
198 .map_err(|e| DataFusionError::Plan(e.to_string()))?;
199
200 parse_exprs(proto_filters.expr.iter(), task_ctx.as_ref(), codec)?
201 }
202 };
203 let filters_borrowed: Vec<&Expr> = filters.iter().collect();
204
205 let results: RVec<_> = provider
206 .supports_filters_pushdown(&filters_borrowed)?
207 .iter()
208 .map(|v| v.into())
209 .collect();
210
211 Ok(results)
212}
213
/// FFI entry point backing `FFI_TableProvider::supports_filters_pushdown`.
///
/// Builds the codec and task context needed to deserialize the filters,
/// then delegates to `supports_filters_pushdown_internal`, mapping any
/// error into the FFI result's string form.
unsafe extern "C" fn supports_filters_pushdown_fn_wrapper(
    provider: &FFI_TableProvider,
    filters_serialized: RVec<u8>,
) -> FFIResult<RVec<FFI_TableProviderFilterPushDown>> {
    let logical_codec: Arc<dyn LogicalExtensionCodec> = (&provider.logical_codec).into();
    // A task context is required to parse expressions back from protobuf.
    let task_ctx = rresult_return!(<Arc<TaskContext>>::try_from(
        &provider.logical_codec.task_ctx_provider
    ));
    supports_filters_pushdown_internal(
        provider.inner(),
        &filters_serialized,
        &task_ctx,
        logical_codec.as_ref(),
    )
    .map_err(|e| e.to_string().into())
    .into()
}
231
/// FFI entry point backing `FFI_TableProvider::scan`.
///
/// Everything needed from `provider` is captured up front so the returned
/// future is self-contained and does not borrow the provider.
unsafe extern "C" fn scan_fn_wrapper(
    provider: &FFI_TableProvider,
    session: FFI_SessionRef,
    projections: ROption<RVec<usize>>,
    filters_serialized: RVec<u8>,
    limit: ROption<usize>,
) -> FfiFuture<FFIResult<FFI_ExecutionPlan>> {
    let task_ctx: Result<Arc<TaskContext>, DataFusionError> =
        (&provider.logical_codec.task_ctx_provider).try_into();
    let runtime = provider.runtime().clone();
    let logical_codec: Arc<dyn LogicalExtensionCodec> = (&provider.logical_codec).into();
    let internal_provider = Arc::clone(provider.inner());

    async move {
        // Use the session directly when it originates from this library;
        // otherwise wrap it in a `ForeignSession` that forwards through the
        // FFI vtable. `foreign_session` keeps that wrapper alive so the
        // borrow handed to `scan` below stays valid.
        let mut foreign_session = None;
        let session = rresult_return!(
            session
                .as_local()
                .map(Ok::<&(dyn Session + Send + Sync), DataFusionError>)
                .unwrap_or_else(|| {
                    foreign_session = Some(ForeignSession::try_from(&session)?);
                    Ok(foreign_session.as_ref().unwrap())
                })
        );

        let task_ctx = rresult_return!(task_ctx);
        // An empty byte buffer encodes "no filters".
        let filters = match filters_serialized.is_empty() {
            true => vec![],
            false => {
                let proto_filters =
                    rresult_return!(LogicalExprList::decode(filters_serialized.as_ref()));

                rresult_return!(parse_exprs(
                    proto_filters.expr.iter(),
                    task_ctx.as_ref(),
                    logical_codec.as_ref(),
                ))
            }
        };

        // Convert the FFI projection list back into the native form.
        let projections: Option<Vec<usize>> =
            projections.into_option().map(|p| p.into_iter().collect());

        let plan = rresult_return!(
            internal_provider
                .scan(session, projections.as_ref(), &filters, limit.into())
                .await
        );

        RResult::ROk(FFI_ExecutionPlan::new(plan, runtime.clone()))
    }
    .into_ffi()
}
285
/// FFI entry point backing `FFI_TableProvider::insert_into`.
///
/// Captures what it needs from `provider` up front so the returned future
/// is self-contained.
unsafe extern "C" fn insert_into_fn_wrapper(
    provider: &FFI_TableProvider,
    session: FFI_SessionRef,
    input: &FFI_ExecutionPlan,
    insert_op: FFI_InsertOp,
) -> FfiFuture<FFIResult<FFI_ExecutionPlan>> {
    let runtime = provider.runtime().clone();
    let internal_provider = Arc::clone(provider.inner());
    let input = input.clone();

    async move {
        // Use the session directly when it originates from this library;
        // otherwise wrap it in a `ForeignSession` that forwards through the
        // FFI vtable. `foreign_session` keeps that wrapper alive for the
        // borrow passed to `insert_into` below.
        let mut foreign_session = None;
        let session = rresult_return!(
            session
                .as_local()
                .map(Ok::<&(dyn Session + Send + Sync), DataFusionError>)
                .unwrap_or_else(|| {
                    foreign_session = Some(ForeignSession::try_from(&session)?);
                    Ok(foreign_session.as_ref().unwrap())
                })
        );

        // Convert the FFI plan handle back into a native execution plan.
        let input = rresult_return!(<Arc<dyn ExecutionPlan>>::try_from(&input));

        let insert_op = InsertOp::from(insert_op);

        let plan = rresult_return!(
            internal_provider
                .insert_into(session, input, insert_op)
                .await
        );

        RResult::ROk(FFI_ExecutionPlan::new(plan, runtime.clone()))
    }
    .into_ffi()
}
322
/// FFI entry point backing `FFI_TableProvider::release`.
///
/// Reclaims and drops the `ProviderPrivateData`, then nulls the pointer so
/// a stale struct cannot free it twice.
unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_TableProvider) {
    unsafe {
        debug_assert!(!provider.private_data.is_null());
        // SAFETY: `private_data` was produced by `Box::into_raw` in the
        // constructors (or in `clone_fn_wrapper`) and has not been freed
        // yet — `release` runs at most once per struct, from `Drop`.
        let private_data =
            Box::from_raw(provider.private_data as *mut ProviderPrivateData);
        drop(private_data);
        provider.private_data = std::ptr::null_mut();
    }
}
332
/// FFI entry point backing `FFI_TableProvider::clone`.
///
/// Allocates a fresh `ProviderPrivateData` that shares the same provider
/// `Arc`. This function always executes inside the library that defined the
/// source struct, so installing this library's wrapper functions and marker
/// is equivalent to copying the originals.
unsafe extern "C" fn clone_fn_wrapper(provider: &FFI_TableProvider) -> FFI_TableProvider {
    let runtime = provider.runtime().clone();
    let old_provider = Arc::clone(provider.inner());

    let private_data = Box::into_raw(Box::new(ProviderPrivateData {
        provider: old_provider,
        runtime,
    })) as *mut c_void;

    FFI_TableProvider {
        schema: schema_fn_wrapper,
        scan: scan_fn_wrapper,
        table_type: table_type_fn_wrapper,
        // The optional pushdown entry point is copied so the clone keeps
        // the same enabled/disabled state.
        supports_filters_pushdown: provider.supports_filters_pushdown,
        insert_into: provider.insert_into,
        logical_codec: provider.logical_codec.clone(),
        clone: clone_fn_wrapper,
        release: release_fn_wrapper,
        version: super::version,
        private_data,
        library_marker_id: crate::get_library_marker_id,
    }
}
356
impl Drop for FFI_TableProvider {
    fn drop(&mut self) {
        // Free `private_data` via the defining library's release function
        // so allocation and deallocation happen in the same library.
        unsafe { (self.release)(self) }
    }
}
362
impl FFI_TableProvider {
    /// Creates a new FFI wrapper around `provider`.
    ///
    /// `can_support_pushdown_filters` controls whether the optional
    /// `supports_filters_pushdown` entry point is exposed. When
    /// `logical_codec` is `None`, a `DefaultLogicalExtensionCodec` is used.
    pub fn new(
        provider: Arc<dyn TableProvider + Send>,
        can_support_pushdown_filters: bool,
        runtime: Option<Handle>,
        task_ctx_provider: impl Into<FFI_TaskContextProvider>,
        logical_codec: Option<Arc<dyn LogicalExtensionCodec>>,
    ) -> Self {
        let task_ctx_provider = task_ctx_provider.into();
        let logical_codec =
            logical_codec.unwrap_or_else(|| Arc::new(DefaultLogicalExtensionCodec {}));
        let logical_codec = FFI_LogicalExtensionCodec::new(
            logical_codec,
            runtime.clone(),
            task_ctx_provider.clone(),
        );
        Self::new_with_ffi_codec(
            provider,
            can_support_pushdown_filters,
            runtime,
            logical_codec,
        )
    }

    /// Like [`FFI_TableProvider::new`], but takes an already-constructed
    /// FFI codec.
    ///
    /// If `provider` is itself a `ForeignTableProvider`, its inner
    /// `FFI_TableProvider` is cloned and returned directly instead of being
    /// wrapped a second time; note that the remaining arguments are ignored
    /// in that case.
    pub fn new_with_ffi_codec(
        provider: Arc<dyn TableProvider + Send>,
        can_support_pushdown_filters: bool,
        runtime: Option<Handle>,
        logical_codec: FFI_LogicalExtensionCodec,
    ) -> Self {
        if let Some(provider) = provider.as_any().downcast_ref::<ForeignTableProvider>() {
            return provider.0.clone();
        }
        let private_data = Box::new(ProviderPrivateData { provider, runtime });

        Self {
            schema: schema_fn_wrapper,
            scan: scan_fn_wrapper,
            table_type: table_type_fn_wrapper,
            supports_filters_pushdown: match can_support_pushdown_filters {
                true => Some(supports_filters_pushdown_fn_wrapper),
                false => None,
            },
            insert_into: insert_into_fn_wrapper,
            logical_codec,
            clone: clone_fn_wrapper,
            release: release_fn_wrapper,
            version: super::version,
            private_data: Box::into_raw(private_data) as *mut c_void,
            library_marker_id: crate::get_library_marker_id,
        }
    }
}
417
/// A [`TableProvider`] that forwards every call through the FFI vtable of
/// the wrapped [`FFI_TableProvider`]. Used when the provider originates
/// from a different library (see `From<&FFI_TableProvider>`).
#[derive(Debug)]
pub struct ForeignTableProvider(pub FFI_TableProvider);

// SAFETY: mirrors the `Send`/`Sync` claims made for `FFI_TableProvider`,
// whose entry points perform all the actual work.
unsafe impl Send for ForeignTableProvider {}
unsafe impl Sync for ForeignTableProvider {}
427
428impl From<&FFI_TableProvider> for Arc<dyn TableProvider> {
429 fn from(provider: &FFI_TableProvider) -> Self {
430 if (provider.library_marker_id)() == crate::get_library_marker_id() {
431 Arc::clone(provider.inner()) as Arc<dyn TableProvider>
432 } else {
433 Arc::new(ForeignTableProvider(provider.clone()))
434 }
435 }
436}
437
impl Clone for FFI_TableProvider {
    fn clone(&self) -> Self {
        // Delegate to the defining library's clone function so the new
        // `private_data` is allocated (and later freed) by that library.
        unsafe { (self.clone)(self) }
    }
}
443
#[async_trait]
impl TableProvider for ForeignTableProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Fetches the schema through the FFI vtable.
    fn schema(&self) -> SchemaRef {
        let wrapped_schema = unsafe { (self.0.schema)(&self.0) };
        wrapped_schema.into()
    }

    fn table_type(&self) -> TableType {
        unsafe { (self.0.table_type)(&self.0).into() }
    }

    /// Serializes the request (filters as a protobuf `LogicalExprList`) and
    /// forwards the scan across the FFI boundary, converting the result
    /// back into a native `ExecutionPlan`.
    async fn scan(
        &self,
        session: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let session = FFI_SessionRef::new(session, None, self.0.logical_codec.clone());

        let projections: ROption<RVec<usize>> = projection
            .map(|p| p.iter().map(|v| v.to_owned()).collect())
            .into();

        // Native `Expr`s cannot cross the FFI boundary, so they are
        // protobuf-encoded with the provider's codec.
        let codec: Arc<dyn LogicalExtensionCodec> = (&self.0.logical_codec).into();
        let filter_list = LogicalExprList {
            expr: serialize_exprs(filters, codec.as_ref())?,
        };
        let filters_serialized = filter_list.encode_to_vec().into();

        let plan = unsafe {
            let maybe_plan = (self.0.scan)(
                &self.0,
                session,
                projections,
                filters_serialized,
                limit.into(),
            )
            .await;

            <Arc<dyn ExecutionPlan>>::try_from(&df_result!(maybe_plan)?)?
        };

        Ok(plan)
    }

    /// Asks the foreign provider which filters it can push down. If the
    /// foreign side did not expose the optional entry point, every filter
    /// is reported as `Unsupported`.
    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> Result<Vec<TableProviderFilterPushDown>> {
        unsafe {
            let pushdown_fn = match self.0.supports_filters_pushdown {
                Some(func) => func,
                None => {
                    return Ok(vec![
                        TableProviderFilterPushDown::Unsupported;
                        filters.len()
                    ]);
                }
            };

            let codec: Arc<dyn LogicalExtensionCodec> = (&self.0.logical_codec).into();

            // Same protobuf round trip as in `scan`.
            let expr_list = LogicalExprList {
                expr: serialize_exprs(
                    filters.iter().map(|f| f.to_owned()),
                    codec.as_ref(),
                )?,
            };
            let serialized_filters = expr_list.encode_to_vec();

            let pushdowns = df_result!(pushdown_fn(&self.0, serialized_filters.into()))?;

            Ok(pushdowns.iter().map(|v| v.into()).collect())
        }
    }

    /// Forwards an INSERT across the FFI boundary.
    async fn insert_into(
        &self,
        session: &dyn Session,
        input: Arc<dyn ExecutionPlan>,
        insert_op: InsertOp,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let session = FFI_SessionRef::new(session, None, self.0.logical_codec.clone());

        // Wrap the native input plan so it can be handed to the foreign
        // library; reuse the current tokio runtime handle if one is active.
        let rc = Handle::try_current().ok();
        let input = FFI_ExecutionPlan::new(input, rc);
        let insert_op: FFI_InsertOp = insert_op.into();

        let plan = unsafe {
            let maybe_plan =
                (self.0.insert_into)(&self.0, session, &input, insert_op).await;

            <Arc<dyn ExecutionPlan>>::try_from(&df_result!(maybe_plan)?)?
        };

        Ok(plan)
    }
}
549
#[cfg(test)]
mod tests {
    use arrow::datatypes::Schema;
    use datafusion::prelude::{SessionContext, col, lit};
    use datafusion_execution::TaskContextProvider;

    use super::*;

    /// Builds a single-column (`a: Float32`) MemTable with two partitions,
    /// used as the provider under test.
    fn create_test_table_provider() -> Result<Arc<dyn TableProvider>> {
        use arrow::datatypes::Field;
        use datafusion::arrow::array::Float32Array;
        use datafusion::arrow::datatypes::DataType;
        use datafusion::arrow::record_batch::RecordBatch;
        use datafusion::datasource::MemTable;

        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));

        let batch1 = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Float32Array::from(vec![2.0, 4.0, 8.0]))],
        )?;
        let batch2 = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Float32Array::from(vec![64.0]))],
        )?;

        Ok(Arc::new(MemTable::try_new(
            schema,
            vec![vec![batch1], vec![batch2]],
        )?))
    }

    /// Round-trips a provider through the FFI layer (forced onto the
    /// foreign path via a mock marker id) and runs a filtered select.
    #[tokio::test]
    async fn test_round_trip_ffi_table_provider_scan() -> Result<()> {
        let provider = create_test_table_provider()?;
        let ctx = Arc::new(SessionContext::new());
        let task_ctx_provider = Arc::clone(&ctx) as Arc<dyn TaskContextProvider>;
        let task_ctx_provider = FFI_TaskContextProvider::from(&task_ctx_provider);

        let mut ffi_provider =
            FFI_TableProvider::new(provider, true, None, task_ctx_provider, None);
        // Pretend the provider came from another library so the conversion
        // below produces a ForeignTableProvider.
        ffi_provider.library_marker_id = crate::mock_foreign_marker_id;

        let foreign_table_provider: Arc<dyn TableProvider> = (&ffi_provider).into();

        ctx.register_table("t", foreign_table_provider)?;

        let df = ctx.table("t").await?;

        df.select(vec![col("a")])?
            .filter(col("a").gt(lit(3.0)))?
            .show()
            .await?;

        Ok(())
    }

    /// Exercises `insert_into` across the FFI boundary and verifies the
    /// reported row count.
    #[tokio::test]
    async fn test_round_trip_ffi_table_provider_insert_into() -> Result<()> {
        let provider = create_test_table_provider()?;
        let ctx = Arc::new(SessionContext::new());
        let task_ctx_provider = Arc::clone(&ctx) as Arc<dyn TaskContextProvider>;
        let task_ctx_provider = FFI_TaskContextProvider::from(&task_ctx_provider);

        let mut ffi_provider =
            FFI_TableProvider::new(provider, true, None, task_ctx_provider, None);
        // Force the foreign path, as in the scan round-trip test.
        ffi_provider.library_marker_id = crate::mock_foreign_marker_id;

        let foreign_table_provider: Arc<dyn TableProvider> = (&ffi_provider).into();

        ctx.register_table("t", foreign_table_provider)?;

        let result = ctx
            .sql("INSERT INTO t VALUES (128.0);")
            .await?
            .collect()
            .await?;

        // INSERT reports a single batch containing the affected-row count.
        assert!(result.len() == 1 && result[0].num_rows() == 1);

        ctx.table("t")
            .await?
            .select(vec![col("a")])?
            .filter(col("a").gt(lit(3.0)))?
            .show()
            .await?;

        Ok(())
    }

    /// Runs an aggregate (COUNT) through the FFI provider and checks the
    /// exact output batch.
    #[tokio::test]
    async fn test_aggregation() -> Result<()> {
        use arrow::datatypes::Field;
        use datafusion::arrow::array::Float32Array;
        use datafusion::arrow::datatypes::DataType;
        use datafusion::arrow::record_batch::RecordBatch;
        use datafusion::common::assert_batches_eq;
        use datafusion::datasource::MemTable;

        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));

        let batch1 = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Float32Array::from(vec![2.0, 4.0, 8.0]))],
        )?;

        let ctx = Arc::new(SessionContext::new());
        let task_ctx_provider = Arc::clone(&ctx) as Arc<dyn TaskContextProvider>;
        let task_ctx_provider = FFI_TaskContextProvider::from(&task_ctx_provider);

        let provider = Arc::new(MemTable::try_new(schema, vec![vec![batch1]])?);

        let mut ffi_provider =
            FFI_TableProvider::new(provider, true, None, task_ctx_provider, None);
        // Force the foreign path.
        ffi_provider.library_marker_id = crate::mock_foreign_marker_id;

        let foreign_table_provider: Arc<dyn TableProvider> = (&ffi_provider).into();

        ctx.register_table("t", foreign_table_provider)?;

        let result = ctx
            .sql("SELECT COUNT(*) as cnt FROM t")
            .await?
            .collect()
            .await?;
        #[rustfmt::skip]
        let expected = [
            "+-----+",
            "| cnt |",
            "+-----+",
            "| 3   |",
            "+-----+"
        ];
        assert_batches_eq!(expected, &result);
        Ok(())
    }

    /// Verifies the marker-id fast path: a same-library provider unwraps to
    /// the original MemTable, while a foreign marker yields the FFI-backed
    /// ForeignTableProvider.
    #[test]
    fn test_ffi_table_provider_local_bypass() -> Result<()> {
        let table_provider = create_test_table_provider()?;

        let ctx = Arc::new(SessionContext::new()) as Arc<dyn TaskContextProvider>;
        let task_ctx_provider = FFI_TaskContextProvider::from(&ctx);
        let mut ffi_table =
            FFI_TableProvider::new(table_provider, false, None, task_ctx_provider, None);

        let foreign_table: Arc<dyn TableProvider> = (&ffi_table).into();
        assert!(
            foreign_table
                .as_any()
                .downcast_ref::<datafusion::datasource::MemTable>()
                .is_some()
        );

        ffi_table.library_marker_id = crate::mock_foreign_marker_id;
        let foreign_table: Arc<dyn TableProvider> = (&ffi_table).into();
        assert!(
            foreign_table
                .as_any()
                .downcast_ref::<ForeignTableProvider>()
                .is_some()
        );

        Ok(())
    }

    /// A `None` projection must mean "all columns", not "no columns", after
    /// crossing the FFI boundary.
    #[tokio::test]
    async fn test_scan_with_none_projection_returns_all_columns() -> Result<()> {
        use arrow::datatypes::Field;
        use datafusion::arrow::array::Float32Array;
        use datafusion::arrow::datatypes::DataType;
        use datafusion::arrow::record_batch::RecordBatch;
        use datafusion::datasource::MemTable;
        use datafusion::physical_plan::collect;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Float32, false),
            Field::new("b", DataType::Float32, false),
            Field::new("c", DataType::Float32, false),
        ]));

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Float32Array::from(vec![1.0, 2.0])),
                Arc::new(Float32Array::from(vec![3.0, 4.0])),
                Arc::new(Float32Array::from(vec![5.0, 6.0])),
            ],
        )?;

        let provider =
            Arc::new(MemTable::try_new(Arc::clone(&schema), vec![vec![batch]])?);

        let ctx = Arc::new(SessionContext::new());
        let task_ctx_provider = Arc::clone(&ctx) as Arc<dyn TaskContextProvider>;
        let task_ctx_provider = FFI_TaskContextProvider::from(&task_ctx_provider);

        let mut ffi_provider =
            FFI_TableProvider::new(provider, true, None, task_ctx_provider, None);
        // Force the foreign path.
        ffi_provider.library_marker_id = crate::mock_foreign_marker_id;

        let foreign_table_provider: Arc<dyn TableProvider> = (&ffi_provider).into();

        let plan = foreign_table_provider
            .scan(&ctx.state(), None, &[], None)
            .await?;
        assert_eq!(
            plan.schema().fields().len(),
            3,
            "scan(projection=None) should return all columns; got {}",
            plan.schema().fields().len()
        );

        let batches = collect(plan, ctx.task_ctx()).await?;
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_columns(), 3);
        assert_eq!(batches[0].num_rows(), 2);

        Ok(())
    }
}