Skip to main content

datafusion_ffi/
table_provider.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::ffi::c_void;
19use std::sync::Arc;
20
21use arrow::datatypes::SchemaRef;
22use async_ffi::{FfiFuture, FutureExt};
23use async_trait::async_trait;
24use datafusion_catalog::{Session, TableProvider};
25use datafusion_common::Statistics;
26use datafusion_common::error::{DataFusionError, Result};
27use datafusion_execution::TaskContext;
28use datafusion_expr::dml::InsertOp;
29use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
30use datafusion_physical_plan::ExecutionPlan;
31use datafusion_proto::logical_plan::from_proto::parse_exprs;
32use datafusion_proto::logical_plan::to_proto::serialize_exprs;
33use datafusion_proto::logical_plan::{
34    DefaultLogicalExtensionCodec, LogicalExtensionCodec,
35};
36use datafusion_proto::protobuf::LogicalExprList;
37use prost::Message;
38
39use stabby::vec::Vec as SVec;
40use tokio::runtime::Handle;
41
42use super::execution_plan::FFI_ExecutionPlan;
43use super::insert_op::FFI_InsertOp;
44use crate::arrow_wrappers::WrappedSchema;
45use crate::execution::FFI_TaskContextProvider;
46use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
47use crate::session::{FFI_SessionRef, ForeignSession};
48use crate::statistics::{deserialize_statistics, serialize_statistics};
49use crate::table_source::{FFI_TableProviderFilterPushDown, FFI_TableType};
50use crate::util::{FFI_Option, FFI_Result};
51use crate::{df_result, sresult_return};
52
53/// A stable struct for sharing [`TableProvider`] across FFI boundaries.
54///
55/// # Struct Layout
56///
57/// The following description applies to all structs provided in this crate.
58///
59/// Each of the exposed structs in this crate is provided with a variant prefixed
60/// with `Foreign`. This variant is designed to be used by the consumer of the
61/// foreign code. The `Foreign` structs should _never_ access the `private_data`
62/// fields. Instead they should only access the data returned through the function
63/// calls defined on the `FFI_` structs. The second purpose of the `Foreign`
64/// structs is to contain additional data that may be needed by the traits that
65/// are implemented on them. Some of these traits require borrowing data which
66/// can be far more convenient to be locally stored.
67///
68/// For example, we have a struct `FFI_TableProvider` to give access to the
69/// `TableProvider` functions like `table_type()` and `scan()`. If we write a
70/// library that wishes to expose it's `TableProvider`, then we can access the
71/// private data that contains the Arc reference to the `TableProvider` via
72/// `FFI_TableProvider`. This data is local to the library.
73///
74/// If we have a program that accesses a `TableProvider` via FFI, then it
75/// will use `ForeignTableProvider`. When using `ForeignTableProvider` we **must**
76/// not attempt to access the `private_data` field in `FFI_TableProvider`. If a
77/// user is testing locally, you may be able to successfully access this field, but
78/// it will only work if you are building against the exact same version of
79/// `DataFusion` for both libraries **and** the same compiler. It will not work
80/// in general.
81///
82/// It is worth noting that which library is the `local` and which is `foreign`
83/// depends on which interface we are considering. For example, suppose we have a
84/// Python library called `my_provider` that exposes a `TableProvider` called
85/// `MyProvider` via `FFI_TableProvider`. Within the library `my_provider` we can
86/// access the `private_data` via `FFI_TableProvider`. We connect this to
87/// `datafusion-python`, where we access it as a `ForeignTableProvider`. Now when
88/// we call `scan()` on this interface, we have to pass it a `FFI_SessionConfig`.
89/// The `SessionConfig` is local to `datafusion-python` and **not** `my_provider`.
90/// It is important to be careful when expanding these functions to be certain which
91/// side of the interface each object refers to.
92#[repr(C)]
93#[derive(Debug)]
94pub struct FFI_TableProvider {
95    /// Return the table schema
96    schema: unsafe extern "C" fn(provider: &Self) -> WrappedSchema,
97
98    /// Perform a scan on the table. See [`TableProvider`] for detailed usage information.
99    ///
100    /// # Arguments
101    ///
102    /// * `provider` - the table provider
103    /// * `session` - session
104    /// * `projections` - if specified, only a subset of the columns are returned
105    /// * `filters_serialized` - filters to apply to the scan, which are a
106    ///   [`LogicalExprList`] protobuf message serialized into bytes to pass
107    ///   across the FFI boundary.
108    /// * `limit` - if specified, limit the number of rows returned
109    scan: unsafe extern "C" fn(
110        provider: &Self,
111        session: FFI_SessionRef,
112        projections: FFI_Option<SVec<usize>>,
113        filters_serialized: SVec<u8>,
114        limit: FFI_Option<usize>,
115    ) -> FfiFuture<FFI_Result<FFI_ExecutionPlan>>,
116
117    /// Return the type of table. See [`TableType`] for options.
118    table_type: unsafe extern "C" fn(provider: &Self) -> FFI_TableType,
119
120    /// Based upon the input filters, identify which are supported. The filters
121    /// are a [`LogicalExprList`] protobuf message serialized into bytes to pass
122    /// across the FFI boundary.
123    supports_filters_pushdown: Option<
124        unsafe extern "C" fn(
125            provider: &FFI_TableProvider,
126            filters_serialized: SVec<u8>,
127        )
128            -> FFI_Result<SVec<FFI_TableProviderFilterPushDown>>,
129    >,
130
131    insert_into: unsafe extern "C" fn(
132        provider: &Self,
133        session: FFI_SessionRef,
134        input: &FFI_ExecutionPlan,
135        insert_op: FFI_InsertOp,
136    ) -> FfiFuture<FFI_Result<FFI_ExecutionPlan>>,
137
138    /// Snapshot the provider's table-level statistics. [`FFI_Option::None`]
139    /// corresponds to [`TableProvider::statistics`] returning `None`;
140    /// `Some(bytes)` is a prost-encoded `datafusion_proto_common::Statistics`.
141    pub statistics: unsafe extern "C" fn(provider: &Self) -> FFI_Option<SVec<u8>>,
142
143    pub logical_codec: FFI_LogicalExtensionCodec,
144
145    /// Used to create a clone on the provider of the execution plan. This should
146    /// only need to be called by the receiver of the plan.
147    clone: unsafe extern "C" fn(plan: &Self) -> Self,
148
149    /// Release the memory of the private data when it is no longer being used.
150    release: unsafe extern "C" fn(arg: &mut Self),
151
152    /// Return the major DataFusion version number of this provider.
153    pub version: unsafe extern "C" fn() -> u64,
154
155    /// Internal data. This is only to be accessed by the provider of the plan.
156    /// A [`ForeignTableProvider`] should never attempt to access this data.
157    private_data: *mut c_void,
158
159    /// Utility to identify when FFI objects are accessed locally through
160    /// the foreign interface. See [`crate::get_library_marker_id`] and
161    /// the crate's `README.md` for more information.
162    pub library_marker_id: extern "C" fn() -> usize,
163}
164
165unsafe impl Send for FFI_TableProvider {}
166unsafe impl Sync for FFI_TableProvider {}
167
168struct ProviderPrivateData {
169    provider: Arc<dyn TableProvider>,
170    runtime: Option<Handle>,
171}
172
173impl FFI_TableProvider {
174    fn inner(&self) -> &Arc<dyn TableProvider> {
175        let private_data = self.private_data as *const ProviderPrivateData;
176        unsafe { &(*private_data).provider }
177    }
178
179    fn runtime(&self) -> &Option<Handle> {
180        let private_data = self.private_data as *const ProviderPrivateData;
181        unsafe { &(*private_data).runtime }
182    }
183}
184
185unsafe extern "C" fn schema_fn_wrapper(provider: &FFI_TableProvider) -> WrappedSchema {
186    provider.inner().schema().into()
187}
188
189unsafe extern "C" fn statistics_fn_wrapper(
190    provider: &FFI_TableProvider,
191) -> FFI_Option<SVec<u8>> {
192    let serialized: Option<SVec<u8>> = provider
193        .inner()
194        .statistics()
195        .map(|s| SVec::from(&*serialize_statistics(&s)));
196    serialized.into()
197}
198
199unsafe extern "C" fn table_type_fn_wrapper(
200    provider: &FFI_TableProvider,
201) -> FFI_TableType {
202    provider.inner().table_type().into()
203}
204
205fn supports_filters_pushdown_internal(
206    provider: &Arc<dyn TableProvider>,
207    filters_serialized: &[u8],
208    task_ctx: &Arc<TaskContext>,
209    codec: &dyn LogicalExtensionCodec,
210) -> Result<SVec<FFI_TableProviderFilterPushDown>> {
211    let filters = match filters_serialized.is_empty() {
212        true => vec![],
213        false => {
214            let proto_filters = LogicalExprList::decode(filters_serialized)
215                .map_err(|e| DataFusionError::Plan(e.to_string()))?;
216
217            parse_exprs(proto_filters.expr.iter(), task_ctx.as_ref(), codec)?
218        }
219    };
220    let filters_borrowed: Vec<&Expr> = filters.iter().collect();
221
222    let results: SVec<_> = provider
223        .supports_filters_pushdown(&filters_borrowed)?
224        .iter()
225        .map(|v| v.into())
226        .collect();
227
228    Ok(results)
229}
230
231unsafe extern "C" fn supports_filters_pushdown_fn_wrapper(
232    provider: &FFI_TableProvider,
233    filters_serialized: SVec<u8>,
234) -> FFI_Result<SVec<FFI_TableProviderFilterPushDown>> {
235    let logical_codec: Arc<dyn LogicalExtensionCodec> = (&provider.logical_codec).into();
236    let task_ctx = sresult_return!(<Arc<TaskContext>>::try_from(
237        &provider.logical_codec.task_ctx_provider
238    ));
239    supports_filters_pushdown_internal(
240        provider.inner(),
241        &filters_serialized,
242        &task_ctx,
243        logical_codec.as_ref(),
244    )
245    .into()
246}
247
248unsafe extern "C" fn scan_fn_wrapper(
249    provider: &FFI_TableProvider,
250    session: FFI_SessionRef,
251    projections: FFI_Option<SVec<usize>>,
252    filters_serialized: SVec<u8>,
253    limit: FFI_Option<usize>,
254) -> FfiFuture<FFI_Result<FFI_ExecutionPlan>> {
255    let task_ctx: Result<Arc<TaskContext>, DataFusionError> =
256        (&provider.logical_codec.task_ctx_provider).try_into();
257    let runtime = provider.runtime().clone();
258    let logical_codec: Arc<dyn LogicalExtensionCodec> = (&provider.logical_codec).into();
259    let internal_provider = Arc::clone(provider.inner());
260
261    async move {
262        let mut foreign_session = None;
263        let session = sresult_return!(
264            session
265                .as_local()
266                .map(Ok::<&(dyn Session + Send + Sync), DataFusionError>)
267                .unwrap_or_else(|| {
268                    foreign_session = Some(ForeignSession::try_from(&session)?);
269                    Ok(foreign_session.as_ref().unwrap())
270                })
271        );
272
273        let task_ctx = sresult_return!(task_ctx);
274        let filters = match filters_serialized.is_empty() {
275            true => vec![],
276            false => {
277                let proto_filters =
278                    sresult_return!(LogicalExprList::decode(filters_serialized.as_ref()));
279
280                sresult_return!(parse_exprs(
281                    proto_filters.expr.iter(),
282                    task_ctx.as_ref(),
283                    logical_codec.as_ref(),
284                ))
285            }
286        };
287
288        let projections: Option<Vec<usize>> =
289            projections.into_option().map(|p| p.into_iter().collect());
290
291        let plan = sresult_return!(
292            internal_provider
293                .scan(session, projections.as_ref(), &filters, limit.into())
294                .await
295        );
296
297        FFI_Result::Ok(FFI_ExecutionPlan::new(plan, runtime.clone()))
298    }
299    .into_ffi()
300}
301
302unsafe extern "C" fn insert_into_fn_wrapper(
303    provider: &FFI_TableProvider,
304    session: FFI_SessionRef,
305    input: &FFI_ExecutionPlan,
306    insert_op: FFI_InsertOp,
307) -> FfiFuture<FFI_Result<FFI_ExecutionPlan>> {
308    let runtime = provider.runtime().clone();
309    let internal_provider = Arc::clone(provider.inner());
310    let input = input.clone();
311
312    async move {
313        let mut foreign_session = None;
314        let session = sresult_return!(
315            session
316                .as_local()
317                .map(Ok::<&(dyn Session + Send + Sync), DataFusionError>)
318                .unwrap_or_else(|| {
319                    foreign_session = Some(ForeignSession::try_from(&session)?);
320                    Ok(foreign_session.as_ref().unwrap())
321                })
322        );
323
324        let input = sresult_return!(<Arc<dyn ExecutionPlan>>::try_from(&input));
325
326        let insert_op = InsertOp::from(insert_op);
327
328        let plan = sresult_return!(
329            internal_provider
330                .insert_into(session, input, insert_op)
331                .await
332        );
333
334        FFI_Result::Ok(FFI_ExecutionPlan::new(plan, runtime.clone()))
335    }
336    .into_ffi()
337}
338
339unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_TableProvider) {
340    unsafe {
341        debug_assert!(!provider.private_data.is_null());
342        let private_data =
343            Box::from_raw(provider.private_data as *mut ProviderPrivateData);
344        drop(private_data);
345        provider.private_data = std::ptr::null_mut();
346    }
347}
348
349unsafe extern "C" fn clone_fn_wrapper(provider: &FFI_TableProvider) -> FFI_TableProvider {
350    let runtime = provider.runtime().clone();
351    let old_provider = Arc::clone(provider.inner());
352
353    let private_data = Box::into_raw(Box::new(ProviderPrivateData {
354        provider: old_provider,
355        runtime,
356    })) as *mut c_void;
357
358    FFI_TableProvider {
359        schema: schema_fn_wrapper,
360        scan: scan_fn_wrapper,
361        table_type: table_type_fn_wrapper,
362        supports_filters_pushdown: provider.supports_filters_pushdown,
363        insert_into: provider.insert_into,
364        statistics: statistics_fn_wrapper,
365        logical_codec: provider.logical_codec.clone(),
366        clone: clone_fn_wrapper,
367        release: release_fn_wrapper,
368        version: super::version,
369        private_data,
370        library_marker_id: crate::get_library_marker_id,
371    }
372}
373
374impl Drop for FFI_TableProvider {
375    fn drop(&mut self) {
376        unsafe { (self.release)(self) }
377    }
378}
379
380impl FFI_TableProvider {
381    /// Creates a new [`FFI_TableProvider`].
382    pub fn new(
383        provider: Arc<dyn TableProvider>,
384        can_support_pushdown_filters: bool,
385        runtime: Option<Handle>,
386        task_ctx_provider: impl Into<FFI_TaskContextProvider>,
387        logical_codec: Option<Arc<dyn LogicalExtensionCodec>>,
388    ) -> Self {
389        let task_ctx_provider = task_ctx_provider.into();
390        let logical_codec =
391            logical_codec.unwrap_or_else(|| Arc::new(DefaultLogicalExtensionCodec {}));
392        let logical_codec = FFI_LogicalExtensionCodec::new(
393            logical_codec,
394            runtime.clone(),
395            task_ctx_provider.clone(),
396        );
397        Self::new_with_ffi_codec(
398            provider,
399            can_support_pushdown_filters,
400            runtime,
401            logical_codec,
402        )
403    }
404
405    pub fn new_with_ffi_codec(
406        provider: Arc<dyn TableProvider>,
407        can_support_pushdown_filters: bool,
408        runtime: Option<Handle>,
409        logical_codec: FFI_LogicalExtensionCodec,
410    ) -> Self {
411        if let Some(provider) = provider.downcast_ref::<ForeignTableProvider>() {
412            return provider.0.clone();
413        }
414        let private_data = Box::new(ProviderPrivateData { provider, runtime });
415
416        Self {
417            schema: schema_fn_wrapper,
418            scan: scan_fn_wrapper,
419            table_type: table_type_fn_wrapper,
420            supports_filters_pushdown: match can_support_pushdown_filters {
421                true => Some(supports_filters_pushdown_fn_wrapper),
422                false => None,
423            },
424            insert_into: insert_into_fn_wrapper,
425            statistics: statistics_fn_wrapper,
426            logical_codec,
427            clone: clone_fn_wrapper,
428            release: release_fn_wrapper,
429            version: super::version,
430            private_data: Box::into_raw(private_data) as *mut c_void,
431            library_marker_id: crate::get_library_marker_id,
432        }
433    }
434}
435
436/// This wrapper struct exists on the receiver side of the FFI interface, so it has
437/// no guarantees about being able to access the data in `private_data`. Any functions
438/// defined on this struct must only use the stable functions provided in
439/// FFI_TableProvider to interact with the foreign table provider.
440#[derive(Debug)]
441pub struct ForeignTableProvider(pub FFI_TableProvider);
442
443unsafe impl Send for ForeignTableProvider {}
444unsafe impl Sync for ForeignTableProvider {}
445
446impl From<&FFI_TableProvider> for Arc<dyn TableProvider> {
447    fn from(provider: &FFI_TableProvider) -> Self {
448        if (provider.library_marker_id)() == crate::get_library_marker_id() {
449            Arc::clone(provider.inner()) as Arc<dyn TableProvider>
450        } else {
451            Arc::new(ForeignTableProvider(provider.clone()))
452        }
453    }
454}
455
456impl Clone for FFI_TableProvider {
457    fn clone(&self) -> Self {
458        unsafe { (self.clone)(self) }
459    }
460}
461
462#[async_trait]
463impl TableProvider for ForeignTableProvider {
464    fn schema(&self) -> SchemaRef {
465        let wrapped_schema = unsafe { (self.0.schema)(&self.0) };
466        wrapped_schema.into()
467    }
468
469    fn table_type(&self) -> TableType {
470        unsafe { (self.0.table_type)(&self.0).into() }
471    }
472
473    fn statistics(&self) -> Option<Statistics> {
474        let ffi_opt = unsafe { (self.0.statistics)(&self.0) };
475        let bytes: Option<SVec<u8>> = ffi_opt.into();
476        let bytes = bytes?;
477        match deserialize_statistics(bytes.as_slice()) {
478            Ok(stats) => Some(stats),
479            Err(e) => {
480                log::warn!("Failed to deserialize FFI statistics: {e}");
481                // Fires in debug builds to surface encoding bugs early; callers see None.
482                debug_assert!(false, "Failed to deserialize FFI statistics: {e}");
483                None
484            }
485        }
486    }
487
488    async fn scan(
489        &self,
490        session: &dyn Session,
491        projection: Option<&Vec<usize>>,
492        filters: &[Expr],
493        limit: Option<usize>,
494    ) -> Result<Arc<dyn ExecutionPlan>> {
495        let session = FFI_SessionRef::new(session, None, self.0.logical_codec.clone());
496
497        let projections: FFI_Option<SVec<usize>> = projection
498            .map(|p| p.iter().map(|v| v.to_owned()).collect())
499            .into();
500
501        let codec: Arc<dyn LogicalExtensionCodec> = (&self.0.logical_codec).into();
502        let filter_list = LogicalExprList {
503            expr: serialize_exprs(filters, codec.as_ref())?,
504        };
505        let filters_serialized = filter_list.encode_to_vec().into_iter().collect();
506
507        let plan = unsafe {
508            let maybe_plan = (self.0.scan)(
509                &self.0,
510                session,
511                projections,
512                filters_serialized,
513                limit.into(),
514            )
515            .await;
516
517            <Arc<dyn ExecutionPlan>>::try_from(&df_result!(maybe_plan)?)?
518        };
519
520        Ok(plan)
521    }
522
523    /// Tests whether the table provider can make use of a filter expression
524    /// to optimize data retrieval.
525    fn supports_filters_pushdown(
526        &self,
527        filters: &[&Expr],
528    ) -> Result<Vec<TableProviderFilterPushDown>> {
529        unsafe {
530            let pushdown_fn = match self.0.supports_filters_pushdown {
531                Some(func) => func,
532                None => {
533                    return Ok(vec![
534                        TableProviderFilterPushDown::Unsupported;
535                        filters.len()
536                    ]);
537                }
538            };
539
540            let codec: Arc<dyn LogicalExtensionCodec> = (&self.0.logical_codec).into();
541
542            let expr_list = LogicalExprList {
543                expr: serialize_exprs(
544                    filters.iter().map(|f| f.to_owned()),
545                    codec.as_ref(),
546                )?,
547            };
548            let serialized_filters = expr_list.encode_to_vec();
549
550            let pushdowns = df_result!(pushdown_fn(
551                &self.0,
552                serialized_filters.into_iter().collect()
553            ))?;
554
555            Ok(pushdowns.iter().map(|v| v.into()).collect())
556        }
557    }
558
559    async fn insert_into(
560        &self,
561        session: &dyn Session,
562        input: Arc<dyn ExecutionPlan>,
563        insert_op: InsertOp,
564    ) -> Result<Arc<dyn ExecutionPlan>> {
565        let session = FFI_SessionRef::new(session, None, self.0.logical_codec.clone());
566
567        let rc = Handle::try_current().ok();
568        let input = FFI_ExecutionPlan::new(input, rc);
569        let insert_op: FFI_InsertOp = insert_op.into();
570
571        let plan = unsafe {
572            let maybe_plan =
573                (self.0.insert_into)(&self.0, session, &input, insert_op).await;
574
575            <Arc<dyn ExecutionPlan>>::try_from(&df_result!(maybe_plan)?)?
576        };
577
578        Ok(plan)
579    }
580}
581
#[cfg(test)]
mod tests {
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::array::Float32Array;
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::datasource::MemTable;
    use datafusion::prelude::{SessionContext, col, lit};
    use datafusion_execution::TaskContextProvider;

    use super::*;

    /// Builds a one-column `Float32` batch for `schema` from `values`.
    fn float_batch(schema: &SchemaRef, values: Vec<f32>) -> Result<RecordBatch> {
        Ok(RecordBatch::try_new(
            Arc::clone(schema),
            vec![Arc::new(Float32Array::from(values))],
        )?)
    }

    /// In-memory table `a: Float32` split into two partitions: `[2, 4, 8]` and `[64]`.
    fn create_test_table_provider() -> Result<Arc<dyn TableProvider>> {
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
        let first = float_batch(&schema, vec![2.0, 4.0, 8.0])?;
        let second = float_batch(&schema, vec![64.0])?;
        let table = MemTable::try_new(schema, vec![vec![first], vec![second]])?;
        Ok(Arc::new(table))
    }

    /// Exports `provider` over FFI (with pushdown enabled) and re-imports it
    /// with a mocked library marker, forcing the `ForeignTableProvider` path.
    fn as_foreign(
        provider: Arc<dyn TableProvider>,
        ctx: &Arc<SessionContext>,
    ) -> Arc<dyn TableProvider> {
        let task_ctx_provider = Arc::clone(ctx) as Arc<dyn TaskContextProvider>;
        let task_ctx_provider = FFI_TaskContextProvider::from(&task_ctx_provider);
        let mut ffi_provider =
            FFI_TableProvider::new(provider, true, None, task_ctx_provider, None);
        ffi_provider.library_marker_id = crate::mock_foreign_marker_id;
        (&ffi_provider).into()
    }

    #[tokio::test]
    async fn test_round_trip_ffi_table_provider_scan() -> Result<()> {
        let ctx = Arc::new(SessionContext::new());
        ctx.register_table("t", as_foreign(create_test_table_provider()?, &ctx))?;

        ctx.table("t")
            .await?
            .select(vec![col("a")])?
            .filter(col("a").gt(lit(3.0)))?
            .show()
            .await?;

        Ok(())
    }

    #[tokio::test]
    async fn test_round_trip_ffi_table_provider_insert_into() -> Result<()> {
        let ctx = Arc::new(SessionContext::new());
        ctx.register_table("t", as_foreign(create_test_table_provider()?, &ctx))?;

        let inserted = ctx
            .sql("INSERT INTO t VALUES (128.0);")
            .await?
            .collect()
            .await?;
        assert!(inserted.len() == 1 && inserted[0].num_rows() == 1);

        ctx.table("t")
            .await?
            .select(vec![col("a")])?
            .filter(col("a").gt(lit(3.0)))?
            .show()
            .await?;

        Ok(())
    }

    #[tokio::test]
    async fn test_aggregation() -> Result<()> {
        use datafusion::common::assert_batches_eq;

        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
        let batch = float_batch(&schema, vec![2.0, 4.0, 8.0])?;
        let provider = Arc::new(MemTable::try_new(schema, vec![vec![batch]])?);

        let ctx = Arc::new(SessionContext::new());
        ctx.register_table("t", as_foreign(provider, &ctx))?;

        let result = ctx
            .sql("SELECT COUNT(*) as cnt FROM t")
            .await?
            .collect()
            .await?;
        #[rustfmt::skip]
        let expected = [
            "+-----+",
            "| cnt |",
            "+-----+",
            "| 3   |",
            "+-----+"
        ];
        assert_batches_eq!(expected, &result);
        Ok(())
    }

    #[test]
    fn test_ffi_table_provider_local_bypass() -> Result<()> {
        let ctx = Arc::new(SessionContext::new()) as Arc<dyn TaskContextProvider>;
        let mut ffi_table = FFI_TableProvider::new(
            create_test_table_provider()?,
            false,
            None,
            FFI_TaskContextProvider::from(&ctx),
            None,
        );

        // Same library marker: the original MemTable comes back unwrapped.
        let round_tripped: Arc<dyn TableProvider> = (&ffi_table).into();
        assert!(round_tripped.downcast_ref::<MemTable>().is_some());

        // Different library marker: the provider is wrapped as foreign.
        ffi_table.library_marker_id = crate::mock_foreign_marker_id;
        let round_tripped: Arc<dyn TableProvider> = (&ffi_table).into();
        assert!(
            round_tripped
                .downcast_ref::<ForeignTableProvider>()
                .is_some()
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_scan_with_none_projection_returns_all_columns() -> Result<()> {
        use datafusion::physical_plan::collect;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Float32, false),
            Field::new("b", DataType::Float32, false),
            Field::new("c", DataType::Float32, false),
        ]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Float32Array::from(vec![1.0, 2.0])),
                Arc::new(Float32Array::from(vec![3.0, 4.0])),
                Arc::new(Float32Array::from(vec![5.0, 6.0])),
            ],
        )?;
        let provider =
            Arc::new(MemTable::try_new(Arc::clone(&schema), vec![vec![batch]])?);

        let ctx = Arc::new(SessionContext::new());
        // Goes through the foreign path, not the local bypass.
        let foreign_table_provider = as_foreign(provider, &ctx);

        // projection = None means "return every column".
        let plan = foreign_table_provider
            .scan(&ctx.state(), None, &[], None)
            .await?;
        let column_count = plan.schema().fields().len();
        assert_eq!(
            column_count,
            3,
            "scan(projection=None) should return all columns; got {}",
            column_count
        );

        // The plan must also execute and yield the original data shape.
        let batches = collect(plan, ctx.task_ctx()).await?;
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_columns(), 3);
        assert_eq!(batches[0].num_rows(), 2);

        Ok(())
    }

    #[test]
    fn test_ffi_table_provider_statistics_round_trip() -> Result<()> {
        use datafusion::arrow::array::Int32Array;
        use datafusion_common::stats::Precision;
        use datafusion_common::{ColumnStatistics, ScalarValue};

        /// Decorates any provider with a fixed `statistics()` answer.
        #[derive(Debug)]
        struct TableWithStats {
            inner: Arc<dyn TableProvider>,
            stats: Option<Statistics>,
        }

        #[async_trait]
        impl TableProvider for TableWithStats {
            fn schema(&self) -> SchemaRef {
                self.inner.schema()
            }
            fn table_type(&self) -> TableType {
                self.inner.table_type()
            }
            fn statistics(&self) -> Option<Statistics> {
                self.stats.clone()
            }
            async fn scan(
                &self,
                session: &dyn Session,
                projection: Option<&Vec<usize>>,
                filters: &[Expr],
                limit: Option<usize>,
            ) -> Result<Arc<dyn ExecutionPlan>> {
                self.inner.scan(session, projection, filters, limit).await
            }
        }

        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )?;
        let ctx = Arc::new(SessionContext::new());

        // No statistics on the producer side must arrive as `None`.
        let without_stats = Arc::new(TableWithStats {
            inner: Arc::new(MemTable::try_new(
                Arc::clone(&schema),
                vec![vec![batch.clone()]],
            )?),
            stats: None,
        });
        let foreign = as_foreign(without_stats, &ctx);
        assert!(foreign.statistics().is_none());

        // Concrete statistics must survive the round trip unchanged.
        let original_stats = Statistics {
            num_rows: Precision::Exact(3),
            total_byte_size: Precision::Inexact(12),
            column_statistics: vec![ColumnStatistics {
                null_count: Precision::Exact(0),
                max_value: Precision::Exact(ScalarValue::Int32(Some(3))),
                min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
                sum_value: Precision::Exact(ScalarValue::Int64(Some(6))),
                distinct_count: Precision::Exact(3),
                byte_size: Precision::Exact(12),
            }],
        };
        let with_stats = Arc::new(TableWithStats {
            inner: Arc::new(MemTable::try_new(Arc::clone(&schema), vec![vec![batch]])?),
            stats: Some(original_stats.clone()),
        });
        let foreign = as_foreign(with_stats, &ctx);
        assert_eq!(foreign.statistics().as_ref(), Some(&original_stats));

        Ok(())
    }
}