1use std::ffi::c_void;
19use std::sync::Arc;
20
21use arrow::datatypes::SchemaRef;
22use async_ffi::{FfiFuture, FutureExt};
23use async_trait::async_trait;
24use datafusion_catalog::{Session, TableProvider};
25use datafusion_common::Statistics;
26use datafusion_common::error::{DataFusionError, Result};
27use datafusion_execution::TaskContext;
28use datafusion_expr::dml::InsertOp;
29use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
30use datafusion_physical_plan::ExecutionPlan;
31use datafusion_proto::logical_plan::from_proto::parse_exprs;
32use datafusion_proto::logical_plan::to_proto::serialize_exprs;
33use datafusion_proto::logical_plan::{
34 DefaultLogicalExtensionCodec, LogicalExtensionCodec,
35};
36use datafusion_proto::protobuf::LogicalExprList;
37use prost::Message;
38
39use stabby::vec::Vec as SVec;
40use tokio::runtime::Handle;
41
42use super::execution_plan::FFI_ExecutionPlan;
43use super::insert_op::FFI_InsertOp;
44use crate::arrow_wrappers::WrappedSchema;
45use crate::execution::FFI_TaskContextProvider;
46use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
47use crate::session::{FFI_SessionRef, ForeignSession};
48use crate::statistics::{deserialize_statistics, serialize_statistics};
49use crate::table_source::{FFI_TableProviderFilterPushDown, FFI_TableType};
50use crate::util::{FFI_Option, FFI_Result};
51use crate::{df_result, sresult_return};
52
/// A `#[repr(C)]` table of function pointers plus opaque state that exposes a
/// [`TableProvider`] to code on the other side of an FFI boundary.
///
/// Every callback receives the struct itself as its first argument. Field
/// order is part of the binary layout and must not be changed casually.
#[repr(C)]
#[derive(Debug)]
pub struct FFI_TableProvider {
    /// Returns the table schema as a [`WrappedSchema`].
    schema: unsafe extern "C" fn(provider: &Self) -> WrappedSchema,

    /// Starts a scan of the table and resolves to an [`FFI_ExecutionPlan`].
    ///
    /// # Arguments
    ///
    /// * `provider` - the table provider
    /// * `session` - the session driving the scan
    /// * `projections` - optional list of column indices to return
    /// * `filters_serialized` - a [`LogicalExprList`] protobuf message encoded
    ///   to bytes; an empty byte vector means "no filters"
    /// * `limit` - optional maximum number of rows
    scan: unsafe extern "C" fn(
        provider: &Self,
        session: FFI_SessionRef,
        projections: FFI_Option<SVec<usize>>,
        filters_serialized: SVec<u8>,
        limit: FFI_Option<usize>,
    ) -> FfiFuture<FFI_Result<FFI_ExecutionPlan>>,

    /// Returns the kind of table as an [`FFI_TableType`].
    table_type: unsafe extern "C" fn(provider: &Self) -> FFI_TableType,

    /// Reports, per filter, how the provider can handle it. The filters are a
    /// [`LogicalExprList`] protobuf message encoded to bytes.
    ///
    /// When this is `None`, [`ForeignTableProvider`] answers `Unsupported`
    /// for every filter without calling across the boundary.
    supports_filters_pushdown: Option<
        unsafe extern "C" fn(
            provider: &FFI_TableProvider,
            filters_serialized: SVec<u8>,
        )
            -> FFI_Result<SVec<FFI_TableProviderFilterPushDown>>,
    >,

    /// Builds a plan that inserts the rows produced by `input` into the table.
    insert_into: unsafe extern "C" fn(
        provider: &Self,
        session: FFI_SessionRef,
        input: &FFI_ExecutionPlan,
        insert_op: FFI_InsertOp,
    ) -> FfiFuture<FFI_Result<FFI_ExecutionPlan>>,

    /// Returns the provider's statistics as serialized bytes, or none when the
    /// provider has no statistics. The bytes are produced by
    /// `serialize_statistics` and read back with `deserialize_statistics`.
    pub statistics: unsafe extern "C" fn(provider: &Self) -> FFI_Option<SVec<u8>>,

    /// Codec used to encode and decode filter expressions that cross the
    /// boundary; it also carries the task context provider used when decoding.
    pub logical_codec: FFI_LogicalExtensionCodec,

    /// Produces an independent copy of this struct. Called by the [`Clone`]
    /// implementation.
    clone: unsafe extern "C" fn(plan: &Self) -> Self,

    /// Frees `private_data`. Called by the [`Drop`] implementation.
    release: unsafe extern "C" fn(arg: &mut Self),

    /// Version callback; providers created in this module set it to
    /// `super::version`.
    // NOTE(review): presumably the DataFusion/library version used for
    // compatibility checks — confirm against `super::version`.
    pub version: unsafe extern "C" fn() -> u64,

    /// Opaque pointer to the provider-side state. For providers created in
    /// this module it points at a boxed `ProviderPrivateData`; consumers on
    /// the other side of the boundary should not dereference it.
    private_data: *mut c_void,

    /// Identifies the library that created this struct. It is compared with
    /// `crate::get_library_marker_id()` to decide whether the wrapped provider
    /// can be used directly instead of through the FFI callbacks.
    pub library_marker_id: extern "C" fn() -> usize,
}
164
// The raw `private_data` pointer prevents `Send`/`Sync` from being derived
// automatically, so they are asserted by hand.
// NOTE(review): soundness presumably relies on the state behind
// `private_data` being thread-safe — confirm for foreign-created providers.
unsafe impl Send for FFI_TableProvider {}
unsafe impl Sync for FFI_TableProvider {}
167
/// Provider-side state stored behind `FFI_TableProvider::private_data`.
struct ProviderPrivateData {
    /// The wrapped table provider that the FFI callbacks delegate to.
    provider: Arc<dyn TableProvider>,
    /// Optional tokio runtime handle, forwarded to every `FFI_ExecutionPlan`
    /// this provider creates.
    runtime: Option<Handle>,
}
172
173impl FFI_TableProvider {
174 fn inner(&self) -> &Arc<dyn TableProvider> {
175 let private_data = self.private_data as *const ProviderPrivateData;
176 unsafe { &(*private_data).provider }
177 }
178
179 fn runtime(&self) -> &Option<Handle> {
180 let private_data = self.private_data as *const ProviderPrivateData;
181 unsafe { &(*private_data).runtime }
182 }
183}
184
185unsafe extern "C" fn schema_fn_wrapper(provider: &FFI_TableProvider) -> WrappedSchema {
186 provider.inner().schema().into()
187}
188
189unsafe extern "C" fn statistics_fn_wrapper(
190 provider: &FFI_TableProvider,
191) -> FFI_Option<SVec<u8>> {
192 let serialized: Option<SVec<u8>> = provider
193 .inner()
194 .statistics()
195 .map(|s| SVec::from(&*serialize_statistics(&s)));
196 serialized.into()
197}
198
199unsafe extern "C" fn table_type_fn_wrapper(
200 provider: &FFI_TableProvider,
201) -> FFI_TableType {
202 provider.inner().table_type().into()
203}
204
205fn supports_filters_pushdown_internal(
206 provider: &Arc<dyn TableProvider>,
207 filters_serialized: &[u8],
208 task_ctx: &Arc<TaskContext>,
209 codec: &dyn LogicalExtensionCodec,
210) -> Result<SVec<FFI_TableProviderFilterPushDown>> {
211 let filters = match filters_serialized.is_empty() {
212 true => vec![],
213 false => {
214 let proto_filters = LogicalExprList::decode(filters_serialized)
215 .map_err(|e| DataFusionError::Plan(e.to_string()))?;
216
217 parse_exprs(proto_filters.expr.iter(), task_ctx.as_ref(), codec)?
218 }
219 };
220 let filters_borrowed: Vec<&Expr> = filters.iter().collect();
221
222 let results: SVec<_> = provider
223 .supports_filters_pushdown(&filters_borrowed)?
224 .iter()
225 .map(|v| v.into())
226 .collect();
227
228 Ok(results)
229}
230
231unsafe extern "C" fn supports_filters_pushdown_fn_wrapper(
232 provider: &FFI_TableProvider,
233 filters_serialized: SVec<u8>,
234) -> FFI_Result<SVec<FFI_TableProviderFilterPushDown>> {
235 let logical_codec: Arc<dyn LogicalExtensionCodec> = (&provider.logical_codec).into();
236 let task_ctx = sresult_return!(<Arc<TaskContext>>::try_from(
237 &provider.logical_codec.task_ctx_provider
238 ));
239 supports_filters_pushdown_internal(
240 provider.inner(),
241 &filters_serialized,
242 &task_ctx,
243 logical_codec.as_ref(),
244 )
245 .into()
246}
247
/// FFI callback: scans the wrapped provider and returns the resulting plan.
///
/// Everything needed from `provider` is captured before the `async` block so
/// the returned future does not borrow from it.
///
/// * `session` - used directly when it is local, otherwise wrapped in a
///   [`ForeignSession`]
/// * `projections` - optional column indices
/// * `filters_serialized` - encoded [`LogicalExprList`]; empty means no filters
/// * `limit` - optional row limit
///
/// Any failure (task context lookup, session conversion, filter decoding or
/// the scan itself) is returned as an FFI error result.
unsafe extern "C" fn scan_fn_wrapper(
    provider: &FFI_TableProvider,
    session: FFI_SessionRef,
    projections: FFI_Option<SVec<usize>>,
    filters_serialized: SVec<u8>,
    limit: FFI_Option<usize>,
) -> FfiFuture<FFI_Result<FFI_ExecutionPlan>> {
    // Resolved eagerly, but only reported inside the future so the error
    // travels through the normal result channel.
    let task_ctx: Result<Arc<TaskContext>, DataFusionError> =
        (&provider.logical_codec.task_ctx_provider).try_into();
    let runtime = provider.runtime().clone();
    let logical_codec: Arc<dyn LogicalExtensionCodec> = (&provider.logical_codec).into();
    let internal_provider = Arc::clone(provider.inner());

    async move {
        // Owns the converted session when `session` is not local, so the
        // reference produced below stays valid for the rest of the future.
        let mut foreign_session = None;
        let session = sresult_return!(
            session
                .as_local()
                .map(Ok::<&(dyn Session + Send + Sync), DataFusionError>)
                .unwrap_or_else(|| {
                    foreign_session = Some(ForeignSession::try_from(&session)?);
                    Ok(foreign_session.as_ref().unwrap())
                })
        );

        let task_ctx = sresult_return!(task_ctx);
        // An empty payload means "no filters"; skip decoding entirely.
        let filters = match filters_serialized.is_empty() {
            true => vec![],
            false => {
                let proto_filters =
                    sresult_return!(LogicalExprList::decode(filters_serialized.as_ref()));

                sresult_return!(parse_exprs(
                    proto_filters.expr.iter(),
                    task_ctx.as_ref(),
                    logical_codec.as_ref(),
                ))
            }
        };

        // Convert the FFI vector into the `Vec<usize>` the provider API expects.
        let projections: Option<Vec<usize>> =
            projections.into_option().map(|p| p.into_iter().collect());

        let plan = sresult_return!(
            internal_provider
                .scan(session, projections.as_ref(), &filters, limit.into())
                .await
        );

        // Wrap the plan for the trip back, passing along the runtime handle.
        FFI_Result::Ok(FFI_ExecutionPlan::new(plan, runtime.clone()))
    }
    .into_ffi()
}
301
/// FFI callback: asks the wrapped provider for a plan that inserts the rows
/// produced by `input`.
///
/// The provider, runtime handle and a clone of `input` are captured before
/// the `async` block so the returned future does not borrow from the caller.
/// Any failure is returned as an FFI error result.
unsafe extern "C" fn insert_into_fn_wrapper(
    provider: &FFI_TableProvider,
    session: FFI_SessionRef,
    input: &FFI_ExecutionPlan,
    insert_op: FFI_InsertOp,
) -> FfiFuture<FFI_Result<FFI_ExecutionPlan>> {
    let runtime = provider.runtime().clone();
    let internal_provider = Arc::clone(provider.inner());
    let input = input.clone();

    async move {
        // Owns the converted session when `session` is not local, so the
        // reference produced below stays valid for the rest of the future.
        let mut foreign_session = None;
        let session = sresult_return!(
            session
                .as_local()
                .map(Ok::<&(dyn Session + Send + Sync), DataFusionError>)
                .unwrap_or_else(|| {
                    foreign_session = Some(ForeignSession::try_from(&session)?);
                    Ok(foreign_session.as_ref().unwrap())
                })
        );

        // Turn the FFI plan back into a regular execution plan.
        let input = sresult_return!(<Arc<dyn ExecutionPlan>>::try_from(&input));

        let insert_op = InsertOp::from(insert_op);

        let plan = sresult_return!(
            internal_provider
                .insert_into(session, input, insert_op)
                .await
        );

        // Wrap the plan for the trip back, passing along the runtime handle.
        FFI_Result::Ok(FFI_ExecutionPlan::new(plan, runtime.clone()))
    }
    .into_ffi()
}
338
339unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_TableProvider) {
340 unsafe {
341 debug_assert!(!provider.private_data.is_null());
342 let private_data =
343 Box::from_raw(provider.private_data as *mut ProviderPrivateData);
344 drop(private_data);
345 provider.private_data = std::ptr::null_mut();
346 }
347}
348
349unsafe extern "C" fn clone_fn_wrapper(provider: &FFI_TableProvider) -> FFI_TableProvider {
350 let runtime = provider.runtime().clone();
351 let old_provider = Arc::clone(provider.inner());
352
353 let private_data = Box::into_raw(Box::new(ProviderPrivateData {
354 provider: old_provider,
355 runtime,
356 })) as *mut c_void;
357
358 FFI_TableProvider {
359 schema: schema_fn_wrapper,
360 scan: scan_fn_wrapper,
361 table_type: table_type_fn_wrapper,
362 supports_filters_pushdown: provider.supports_filters_pushdown,
363 insert_into: provider.insert_into,
364 statistics: statistics_fn_wrapper,
365 logical_codec: provider.logical_codec.clone(),
366 clone: clone_fn_wrapper,
367 release: release_fn_wrapper,
368 version: super::version,
369 private_data,
370 library_marker_id: crate::get_library_marker_id,
371 }
372}
373
374impl Drop for FFI_TableProvider {
375 fn drop(&mut self) {
376 unsafe { (self.release)(self) }
377 }
378}
379
380impl FFI_TableProvider {
381 pub fn new(
383 provider: Arc<dyn TableProvider>,
384 can_support_pushdown_filters: bool,
385 runtime: Option<Handle>,
386 task_ctx_provider: impl Into<FFI_TaskContextProvider>,
387 logical_codec: Option<Arc<dyn LogicalExtensionCodec>>,
388 ) -> Self {
389 let task_ctx_provider = task_ctx_provider.into();
390 let logical_codec =
391 logical_codec.unwrap_or_else(|| Arc::new(DefaultLogicalExtensionCodec {}));
392 let logical_codec = FFI_LogicalExtensionCodec::new(
393 logical_codec,
394 runtime.clone(),
395 task_ctx_provider.clone(),
396 );
397 Self::new_with_ffi_codec(
398 provider,
399 can_support_pushdown_filters,
400 runtime,
401 logical_codec,
402 )
403 }
404
405 pub fn new_with_ffi_codec(
406 provider: Arc<dyn TableProvider>,
407 can_support_pushdown_filters: bool,
408 runtime: Option<Handle>,
409 logical_codec: FFI_LogicalExtensionCodec,
410 ) -> Self {
411 if let Some(provider) = provider.downcast_ref::<ForeignTableProvider>() {
412 return provider.0.clone();
413 }
414 let private_data = Box::new(ProviderPrivateData { provider, runtime });
415
416 Self {
417 schema: schema_fn_wrapper,
418 scan: scan_fn_wrapper,
419 table_type: table_type_fn_wrapper,
420 supports_filters_pushdown: match can_support_pushdown_filters {
421 true => Some(supports_filters_pushdown_fn_wrapper),
422 false => None,
423 },
424 insert_into: insert_into_fn_wrapper,
425 statistics: statistics_fn_wrapper,
426 logical_codec,
427 clone: clone_fn_wrapper,
428 release: release_fn_wrapper,
429 version: super::version,
430 private_data: Box::into_raw(private_data) as *mut c_void,
431 library_marker_id: crate::get_library_marker_id,
432 }
433 }
434}
435
/// Consumer-side wrapper around an [`FFI_TableProvider`].
///
/// It implements [`TableProvider`] by calling the function pointers stored in
/// the wrapped struct, so a provider received over FFI can be used like any
/// other table provider.
#[derive(Debug)]
pub struct ForeignTableProvider(pub FFI_TableProvider);
442
// NOTE(review): `FFI_TableProvider` is already declared `Send + Sync` in this
// file, so these explicit impls look redundant for a newtype over it —
// confirm whether they are kept deliberately.
unsafe impl Send for ForeignTableProvider {}
unsafe impl Sync for ForeignTableProvider {}
445
446impl From<&FFI_TableProvider> for Arc<dyn TableProvider> {
447 fn from(provider: &FFI_TableProvider) -> Self {
448 if (provider.library_marker_id)() == crate::get_library_marker_id() {
449 Arc::clone(provider.inner()) as Arc<dyn TableProvider>
450 } else {
451 Arc::new(ForeignTableProvider(provider.clone()))
452 }
453 }
454}
455
456impl Clone for FFI_TableProvider {
457 fn clone(&self) -> Self {
458 unsafe { (self.clone)(self) }
459 }
460}
461
462#[async_trait]
463impl TableProvider for ForeignTableProvider {
464 fn schema(&self) -> SchemaRef {
465 let wrapped_schema = unsafe { (self.0.schema)(&self.0) };
466 wrapped_schema.into()
467 }
468
469 fn table_type(&self) -> TableType {
470 unsafe { (self.0.table_type)(&self.0).into() }
471 }
472
473 fn statistics(&self) -> Option<Statistics> {
474 let ffi_opt = unsafe { (self.0.statistics)(&self.0) };
475 let bytes: Option<SVec<u8>> = ffi_opt.into();
476 let bytes = bytes?;
477 match deserialize_statistics(bytes.as_slice()) {
478 Ok(stats) => Some(stats),
479 Err(e) => {
480 log::warn!("Failed to deserialize FFI statistics: {e}");
481 debug_assert!(false, "Failed to deserialize FFI statistics: {e}");
483 None
484 }
485 }
486 }
487
488 async fn scan(
489 &self,
490 session: &dyn Session,
491 projection: Option<&Vec<usize>>,
492 filters: &[Expr],
493 limit: Option<usize>,
494 ) -> Result<Arc<dyn ExecutionPlan>> {
495 let session = FFI_SessionRef::new(session, None, self.0.logical_codec.clone());
496
497 let projections: FFI_Option<SVec<usize>> = projection
498 .map(|p| p.iter().map(|v| v.to_owned()).collect())
499 .into();
500
501 let codec: Arc<dyn LogicalExtensionCodec> = (&self.0.logical_codec).into();
502 let filter_list = LogicalExprList {
503 expr: serialize_exprs(filters, codec.as_ref())?,
504 };
505 let filters_serialized = filter_list.encode_to_vec().into_iter().collect();
506
507 let plan = unsafe {
508 let maybe_plan = (self.0.scan)(
509 &self.0,
510 session,
511 projections,
512 filters_serialized,
513 limit.into(),
514 )
515 .await;
516
517 <Arc<dyn ExecutionPlan>>::try_from(&df_result!(maybe_plan)?)?
518 };
519
520 Ok(plan)
521 }
522
523 fn supports_filters_pushdown(
526 &self,
527 filters: &[&Expr],
528 ) -> Result<Vec<TableProviderFilterPushDown>> {
529 unsafe {
530 let pushdown_fn = match self.0.supports_filters_pushdown {
531 Some(func) => func,
532 None => {
533 return Ok(vec![
534 TableProviderFilterPushDown::Unsupported;
535 filters.len()
536 ]);
537 }
538 };
539
540 let codec: Arc<dyn LogicalExtensionCodec> = (&self.0.logical_codec).into();
541
542 let expr_list = LogicalExprList {
543 expr: serialize_exprs(
544 filters.iter().map(|f| f.to_owned()),
545 codec.as_ref(),
546 )?,
547 };
548 let serialized_filters = expr_list.encode_to_vec();
549
550 let pushdowns = df_result!(pushdown_fn(
551 &self.0,
552 serialized_filters.into_iter().collect()
553 ))?;
554
555 Ok(pushdowns.iter().map(|v| v.into()).collect())
556 }
557 }
558
559 async fn insert_into(
560 &self,
561 session: &dyn Session,
562 input: Arc<dyn ExecutionPlan>,
563 insert_op: InsertOp,
564 ) -> Result<Arc<dyn ExecutionPlan>> {
565 let session = FFI_SessionRef::new(session, None, self.0.logical_codec.clone());
566
567 let rc = Handle::try_current().ok();
568 let input = FFI_ExecutionPlan::new(input, rc);
569 let insert_op: FFI_InsertOp = insert_op.into();
570
571 let plan = unsafe {
572 let maybe_plan =
573 (self.0.insert_into)(&self.0, session, &input, insert_op).await;
574
575 <Arc<dyn ExecutionPlan>>::try_from(&df_result!(maybe_plan)?)?
576 };
577
578 Ok(plan)
579 }
580}
581
#[cfg(test)]
mod tests {
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::array::{Float32Array, Int32Array};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::datasource::MemTable;
    use datafusion::prelude::{SessionContext, col, lit};
    use datafusion_execution::TaskContextProvider;

    use super::*;

    /// Builds an FFI task-context provider backed by `ctx`.
    fn ffi_task_ctx_provider(ctx: &Arc<SessionContext>) -> FFI_TaskContextProvider {
        let dyn_ctx = Arc::clone(ctx) as Arc<dyn TaskContextProvider>;
        FFI_TaskContextProvider::from(&dyn_ctx)
    }

    /// Wraps `provider` in an [`FFI_TableProvider`] (with filter push-down
    /// enabled) that pretends to come from another library, and converts it
    /// back so the foreign code path is exercised.
    ///
    /// The FFI struct is returned as well so callers can keep it alive for
    /// the duration of the test.
    fn wrap_as_foreign(
        provider: Arc<dyn TableProvider>,
        task_ctx_provider: FFI_TaskContextProvider,
    ) -> (FFI_TableProvider, Arc<dyn TableProvider>) {
        let mut ffi_provider =
            FFI_TableProvider::new(provider, true, None, task_ctx_provider, None);
        ffi_provider.library_marker_id = crate::mock_foreign_marker_id;

        let foreign: Arc<dyn TableProvider> = (&ffi_provider).into();
        (ffi_provider, foreign)
    }

    /// Two-partition in-memory table with one non-nullable Float32 column `a`.
    fn create_test_table_provider() -> Result<Arc<dyn TableProvider>> {
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));

        let batch1 = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Float32Array::from(vec![2.0, 4.0, 8.0]))],
        )?;
        let batch2 = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Float32Array::from(vec![64.0]))],
        )?;

        let table = MemTable::try_new(schema, vec![vec![batch1], vec![batch2]])?;
        Ok(Arc::new(table))
    }

    #[tokio::test]
    async fn test_round_trip_ffi_table_provider_scan() -> Result<()> {
        let provider = create_test_table_provider()?;
        let ctx = Arc::new(SessionContext::new());

        let (_ffi_provider, foreign_table_provider) =
            wrap_as_foreign(provider, ffi_task_ctx_provider(&ctx));

        ctx.register_table("t", foreign_table_provider)?;

        let df = ctx.table("t").await?;
        let filtered = df
            .select(vec![col("a")])?
            .filter(col("a").gt(lit(3.0)))?;
        filtered.show().await?;

        Ok(())
    }

    #[tokio::test]
    async fn test_round_trip_ffi_table_provider_insert_into() -> Result<()> {
        let provider = create_test_table_provider()?;
        let ctx = Arc::new(SessionContext::new());

        let (_ffi_provider, foreign_table_provider) =
            wrap_as_foreign(provider, ffi_task_ctx_provider(&ctx));

        ctx.register_table("t", foreign_table_provider)?;

        let inserted = ctx.sql("INSERT INTO t VALUES (128.0);").await?;
        let result = inserted.collect().await?;

        assert!(result.len() == 1 && result[0].num_rows() == 1);

        let df = ctx.table("t").await?;
        let filtered = df
            .select(vec![col("a")])?
            .filter(col("a").gt(lit(3.0)))?;
        filtered.show().await?;

        Ok(())
    }

    #[tokio::test]
    async fn test_aggregation() -> Result<()> {
        use datafusion::common::assert_batches_eq;

        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));

        let batch1 = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Float32Array::from(vec![2.0, 4.0, 8.0]))],
        )?;

        let ctx = Arc::new(SessionContext::new());
        let task_ctx_provider = ffi_task_ctx_provider(&ctx);

        let provider = Arc::new(MemTable::try_new(schema, vec![vec![batch1]])?);

        let (_ffi_provider, foreign_table_provider) =
            wrap_as_foreign(provider, task_ctx_provider);

        ctx.register_table("t", foreign_table_provider)?;

        let counted = ctx.sql("SELECT COUNT(*) as cnt FROM t").await?;
        let result = counted.collect().await?;
        #[rustfmt::skip]
        let expected = [
            "+-----+",
            "| cnt |",
            "+-----+",
            "| 3   |",
            "+-----+"
        ];
        assert_batches_eq!(expected, &result);
        Ok(())
    }

    #[test]
    fn test_ffi_table_provider_local_bypass() -> Result<()> {
        let table_provider = create_test_table_provider()?;

        let ctx = Arc::new(SessionContext::new()) as Arc<dyn TaskContextProvider>;
        let task_ctx_provider = FFI_TaskContextProvider::from(&ctx);
        let mut ffi_table =
            FFI_TableProvider::new(table_provider, false, None, task_ctx_provider, None);

        // With the real library marker the original provider comes straight back.
        let local_view: Arc<dyn TableProvider> = (&ffi_table).into();
        assert!(local_view.downcast_ref::<MemTable>().is_some());

        // With a mocked marker the struct is treated as foreign and wrapped.
        ffi_table.library_marker_id = crate::mock_foreign_marker_id;
        let foreign_view: Arc<dyn TableProvider> = (&ffi_table).into();
        assert!(
            foreign_view
                .downcast_ref::<ForeignTableProvider>()
                .is_some()
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_scan_with_none_projection_returns_all_columns() -> Result<()> {
        use datafusion::physical_plan::collect;

        let fields = ["a", "b", "c"]
            .into_iter()
            .map(|name| Field::new(name, DataType::Float32, false))
            .collect::<Vec<_>>();
        let schema = Arc::new(Schema::new(fields));

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Float32Array::from(vec![1.0, 2.0])),
                Arc::new(Float32Array::from(vec![3.0, 4.0])),
                Arc::new(Float32Array::from(vec![5.0, 6.0])),
            ],
        )?;

        let provider =
            Arc::new(MemTable::try_new(Arc::clone(&schema), vec![vec![batch]])?);

        let ctx = Arc::new(SessionContext::new());
        let task_ctx_provider = ffi_task_ctx_provider(&ctx);

        let (_ffi_provider, foreign_table_provider) =
            wrap_as_foreign(provider, task_ctx_provider);

        // No projection: every column must come back.
        let plan = foreign_table_provider
            .scan(&ctx.state(), None, &[], None)
            .await?;
        assert_eq!(
            plan.schema().fields().len(),
            3,
            "scan(projection=None) should return all columns; got {}",
            plan.schema().fields().len()
        );

        let batches = collect(plan, ctx.task_ctx()).await?;
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_columns(), 3);
        assert_eq!(batches[0].num_rows(), 2);

        Ok(())
    }

    #[test]
    fn test_ffi_table_provider_statistics_round_trip() -> Result<()> {
        use datafusion_common::stats::Precision;
        use datafusion_common::{ColumnStatistics, ScalarValue};

        /// Delegates to `inner` but reports whatever statistics it was given.
        #[derive(Debug)]
        struct TableWithStats {
            inner: Arc<dyn TableProvider>,
            stats: Option<Statistics>,
        }

        #[async_trait]
        impl TableProvider for TableWithStats {
            fn schema(&self) -> SchemaRef {
                self.inner.schema()
            }
            fn table_type(&self) -> TableType {
                self.inner.table_type()
            }
            fn statistics(&self) -> Option<Statistics> {
                self.stats.clone()
            }
            async fn scan(
                &self,
                session: &dyn Session,
                projection: Option<&Vec<usize>>,
                filters: &[Expr],
                limit: Option<usize>,
            ) -> Result<Arc<dyn ExecutionPlan>> {
                self.inner.scan(session, projection, filters, limit).await
            }
        }

        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )?;

        let ctx = Arc::new(SessionContext::new());
        let task_ctx_provider = ffi_task_ctx_provider(&ctx);

        // A provider without statistics must come back as `None`.
        let no_stats_inner = Arc::new(MemTable::try_new(
            Arc::clone(&schema),
            vec![vec![batch.clone()]],
        )?);
        let no_stats_provider = Arc::new(TableWithStats {
            inner: no_stats_inner,
            stats: None,
        });
        let (_ffi_provider, foreign) =
            wrap_as_foreign(no_stats_provider, task_ctx_provider.clone());
        assert!(foreign.statistics().is_none());

        // A provider with statistics must round-trip them unchanged.
        let original_stats = Statistics {
            num_rows: Precision::Exact(3),
            total_byte_size: Precision::Inexact(12),
            column_statistics: vec![ColumnStatistics {
                null_count: Precision::Exact(0),
                max_value: Precision::Exact(ScalarValue::Int32(Some(3))),
                min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
                sum_value: Precision::Exact(ScalarValue::Int64(Some(6))),
                distinct_count: Precision::Exact(3),
                byte_size: Precision::Exact(12),
            }],
        };
        let stats_inner =
            Arc::new(MemTable::try_new(Arc::clone(&schema), vec![vec![batch]])?);
        let stats_provider = Arc::new(TableWithStats {
            inner: stats_inner,
            stats: Some(original_stats.clone()),
        });
        let (_ffi_provider, foreign) =
            wrap_as_foreign(stats_provider, task_ctx_provider);
        assert_eq!(foreign.statistics().as_ref(), Some(&original_stats));

        Ok(())
    }
}