// datafusion_ffi/table_provider.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::any::Any;
19use std::ffi::c_void;
20use std::sync::Arc;
21
22use abi_stable::StableAbi;
23use abi_stable::std_types::{ROption, RResult, RVec};
24use arrow::datatypes::SchemaRef;
25use async_ffi::{FfiFuture, FutureExt};
26use async_trait::async_trait;
27use datafusion_catalog::{Session, TableProvider};
28use datafusion_common::error::{DataFusionError, Result};
29use datafusion_execution::TaskContext;
30use datafusion_expr::dml::InsertOp;
31use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
32use datafusion_physical_plan::ExecutionPlan;
33use datafusion_proto::logical_plan::from_proto::parse_exprs;
34use datafusion_proto::logical_plan::to_proto::serialize_exprs;
35use datafusion_proto::logical_plan::{
36    DefaultLogicalExtensionCodec, LogicalExtensionCodec,
37};
38use datafusion_proto::protobuf::LogicalExprList;
39use prost::Message;
40use tokio::runtime::Handle;
41
42use super::execution_plan::FFI_ExecutionPlan;
43use super::insert_op::FFI_InsertOp;
44use crate::arrow_wrappers::WrappedSchema;
45use crate::execution::FFI_TaskContextProvider;
46use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
47use crate::session::{FFI_SessionRef, ForeignSession};
48use crate::table_source::{FFI_TableProviderFilterPushDown, FFI_TableType};
49use crate::util::FFIResult;
50use crate::{df_result, rresult_return};
51
/// A stable struct for sharing [`TableProvider`] across FFI boundaries.
///
/// # Struct Layout
///
/// The following description applies to all structs provided in this crate.
///
/// Each of the exposed structs in this crate is provided with a variant prefixed
/// with `Foreign`. This variant is designed to be used by the consumer of the
/// foreign code. The `Foreign` structs should _never_ access the `private_data`
/// fields. Instead they should only access the data returned through the function
/// calls defined on the `FFI_` structs. The second purpose of the `Foreign`
/// structs is to contain additional data that may be needed by the traits that
/// are implemented on them. Some of these traits require borrowing data which
/// can be far more convenient to be locally stored.
///
/// For example, we have a struct `FFI_TableProvider` to give access to the
/// `TableProvider` functions like `table_type()` and `scan()`. If we write a
/// library that wishes to expose it's `TableProvider`, then we can access the
/// private data that contains the Arc reference to the `TableProvider` via
/// `FFI_TableProvider`. This data is local to the library.
///
/// If we have a program that accesses a `TableProvider` via FFI, then it
/// will use `ForeignTableProvider`. When using `ForeignTableProvider` we **must**
/// not attempt to access the `private_data` field in `FFI_TableProvider`. If a
/// user is testing locally, you may be able to successfully access this field, but
/// it will only work if you are building against the exact same version of
/// `DataFusion` for both libraries **and** the same compiler. It will not work
/// in general.
///
/// It is worth noting that which library is the `local` and which is `foreign`
/// depends on which interface we are considering. For example, suppose we have a
/// Python library called `my_provider` that exposes a `TableProvider` called
/// `MyProvider` via `FFI_TableProvider`. Within the library `my_provider` we can
/// access the `private_data` via `FFI_TableProvider`. We connect this to
/// `datafusion-python`, where we access it as a `ForeignTableProvider`. Now when
/// we call `scan()` on this interface, we have to pass it a `FFI_SessionConfig`.
/// The `SessionConfig` is local to `datafusion-python` and **not** `my_provider`.
/// It is important to be careful when expanding these functions to be certain which
/// side of the interface each object refers to.
///
/// NOTE: this struct is `#[repr(C)]` and shared across an FFI boundary — the
/// order and types of the fields are part of the ABI and must not change
/// without coordinating both sides of the interface.
#[repr(C)]
#[derive(Debug, StableAbi)]
pub struct FFI_TableProvider {
    /// Return the table schema
    schema: unsafe extern "C" fn(provider: &Self) -> WrappedSchema,

    /// Perform a scan on the table. See [`TableProvider`] for detailed usage information.
    ///
    /// # Arguments
    ///
    /// * `provider` - the table provider
    /// * `session` - session
    /// * `projections` - if specified, only a subset of the columns are returned
    /// * `filters_serialized` - filters to apply to the scan, which are a
    ///   [`LogicalExprList`] protobuf message serialized into bytes to pass
    ///   across the FFI boundary.
    /// * `limit` - if specified, limit the number of rows returned
    scan: unsafe extern "C" fn(
        provider: &Self,
        session: FFI_SessionRef,
        projections: ROption<RVec<usize>>,
        filters_serialized: RVec<u8>,
        limit: ROption<usize>,
    ) -> FfiFuture<FFIResult<FFI_ExecutionPlan>>,

    /// Return the type of table. See [`TableType`] for options.
    table_type: unsafe extern "C" fn(provider: &Self) -> FFI_TableType,

    /// Based upon the input filters, identify which are supported. The filters
    /// are a [`LogicalExprList`] protobuf message serialized into bytes to pass
    /// across the FFI boundary. `None` indicates that the provider does not
    /// support filter pushdown at all.
    supports_filters_pushdown: Option<
        unsafe extern "C" fn(
            provider: &FFI_TableProvider,
            filters_serialized: RVec<u8>,
        ) -> FFIResult<RVec<FFI_TableProviderFilterPushDown>>,
    >,

    /// Insert the output of `input` into the table. See
    /// [`TableProvider::insert_into`] for usage details.
    insert_into: unsafe extern "C" fn(
        provider: &Self,
        session: FFI_SessionRef,
        input: &FFI_ExecutionPlan,
        insert_op: FFI_InsertOp,
    ) -> FfiFuture<FFIResult<FFI_ExecutionPlan>>,

    /// Codec used to serialize and deserialize logical expressions when
    /// crossing the FFI boundary.
    pub logical_codec: FFI_LogicalExtensionCodec,

    /// Used to create a clone on the provider of the execution plan. This should
    /// only need to be called by the receiver of the plan.
    clone: unsafe extern "C" fn(plan: &Self) -> Self,

    /// Release the memory of the private data when it is no longer being used.
    release: unsafe extern "C" fn(arg: &mut Self),

    /// Return the major DataFusion version number of this provider.
    pub version: unsafe extern "C" fn() -> u64,

    /// Internal data. This is only to be accessed by the provider of the plan.
    /// A [`ForeignTableProvider`] should never attempt to access this data.
    private_data: *mut c_void,

    /// Utility to identify when FFI objects are accessed locally through
    /// the foreign interface. See [`crate::get_library_marker_id`] and
    /// the crate's `README.md` for more information.
    pub library_marker_id: extern "C" fn() -> usize,
}
157
// SAFETY: all fields are function pointers or FFI-stable owned data; the
// `private_data` pointer is only dereferenced by the providing library, which
// stores an `Arc<dyn TableProvider + Send>` behind it.
// NOTE(review): soundness of `Sync` relies on the wrapped provider being safe
// to share across threads — confirm against the `TableProvider` trait bounds.
unsafe impl Send for FFI_TableProvider {}
unsafe impl Sync for FFI_TableProvider {}
160
/// Provider-side state stored behind [`FFI_TableProvider::private_data`].
///
/// Only the library that created the [`FFI_TableProvider`] may access this;
/// it is allocated via `Box::into_raw` in the constructors and freed in
/// `release_fn_wrapper`.
struct ProviderPrivateData {
    // The actual table provider being exposed over FFI.
    provider: Arc<dyn TableProvider + Send>,
    // Optional tokio runtime handle forwarded to created execution plans.
    runtime: Option<Handle>,
}
165
impl FFI_TableProvider {
    /// Borrow the wrapped [`TableProvider`].
    ///
    /// Must only be called on the provider side of the FFI boundary, since it
    /// dereferences `private_data`.
    fn inner(&self) -> &Arc<dyn TableProvider + Send> {
        let private_data = self.private_data as *const ProviderPrivateData;
        // SAFETY: `private_data` was set to a valid `ProviderPrivateData`
        // allocation by the constructors and is only freed on release.
        unsafe { &(*private_data).provider }
    }

    /// Borrow the optional runtime handle stored alongside the provider.
    ///
    /// Same access restriction as [`Self::inner`]: provider side only.
    fn runtime(&self) -> &Option<Handle> {
        let private_data = self.private_data as *const ProviderPrivateData;
        // SAFETY: same invariant as in `inner` above.
        unsafe { &(*private_data).runtime }
    }
}
177
/// FFI entry point forwarding to [`TableProvider::schema`] on the wrapped provider.
unsafe extern "C" fn schema_fn_wrapper(provider: &FFI_TableProvider) -> WrappedSchema {
    provider.inner().schema().into()
}
181
/// FFI entry point forwarding to [`TableProvider::table_type`] on the wrapped provider.
unsafe extern "C" fn table_type_fn_wrapper(
    provider: &FFI_TableProvider,
) -> FFI_TableType {
    provider.inner().table_type().into()
}
187
188fn supports_filters_pushdown_internal(
189    provider: &Arc<dyn TableProvider + Send>,
190    filters_serialized: &[u8],
191    task_ctx: &Arc<TaskContext>,
192    codec: &dyn LogicalExtensionCodec,
193) -> Result<RVec<FFI_TableProviderFilterPushDown>> {
194    let filters = match filters_serialized.is_empty() {
195        true => vec![],
196        false => {
197            let proto_filters = LogicalExprList::decode(filters_serialized)
198                .map_err(|e| DataFusionError::Plan(e.to_string()))?;
199
200            parse_exprs(proto_filters.expr.iter(), task_ctx.as_ref(), codec)?
201        }
202    };
203    let filters_borrowed: Vec<&Expr> = filters.iter().collect();
204
205    let results: RVec<_> = provider
206        .supports_filters_pushdown(&filters_borrowed)?
207        .iter()
208        .map(|v| v.into())
209        .collect();
210
211    Ok(results)
212}
213
/// FFI entry point for filter pushdown support checks.
///
/// Reconstructs the codec and task context from the FFI struct, then defers
/// the actual work to [`supports_filters_pushdown_internal`], converting any
/// error into an FFI-stable string result.
unsafe extern "C" fn supports_filters_pushdown_fn_wrapper(
    provider: &FFI_TableProvider,
    filters_serialized: RVec<u8>,
) -> FFIResult<RVec<FFI_TableProviderFilterPushDown>> {
    let logical_codec: Arc<dyn LogicalExtensionCodec> = (&provider.logical_codec).into();
    // `rresult_return!` converts an `Err` into an early `RErr` return.
    let task_ctx = rresult_return!(<Arc<TaskContext>>::try_from(
        &provider.logical_codec.task_ctx_provider
    ));
    supports_filters_pushdown_internal(
        provider.inner(),
        &filters_serialized,
        &task_ctx,
        logical_codec.as_ref(),
    )
    .map_err(|e| e.to_string().into())
    .into()
}
231
232unsafe extern "C" fn scan_fn_wrapper(
233    provider: &FFI_TableProvider,
234    session: FFI_SessionRef,
235    projections: ROption<RVec<usize>>,
236    filters_serialized: RVec<u8>,
237    limit: ROption<usize>,
238) -> FfiFuture<FFIResult<FFI_ExecutionPlan>> {
239    let task_ctx: Result<Arc<TaskContext>, DataFusionError> =
240        (&provider.logical_codec.task_ctx_provider).try_into();
241    let runtime = provider.runtime().clone();
242    let logical_codec: Arc<dyn LogicalExtensionCodec> = (&provider.logical_codec).into();
243    let internal_provider = Arc::clone(provider.inner());
244
245    async move {
246        let mut foreign_session = None;
247        let session = rresult_return!(
248            session
249                .as_local()
250                .map(Ok::<&(dyn Session + Send + Sync), DataFusionError>)
251                .unwrap_or_else(|| {
252                    foreign_session = Some(ForeignSession::try_from(&session)?);
253                    Ok(foreign_session.as_ref().unwrap())
254                })
255        );
256
257        let task_ctx = rresult_return!(task_ctx);
258        let filters = match filters_serialized.is_empty() {
259            true => vec![],
260            false => {
261                let proto_filters =
262                    rresult_return!(LogicalExprList::decode(filters_serialized.as_ref()));
263
264                rresult_return!(parse_exprs(
265                    proto_filters.expr.iter(),
266                    task_ctx.as_ref(),
267                    logical_codec.as_ref(),
268                ))
269            }
270        };
271
272        let projections: Option<Vec<usize>> =
273            projections.into_option().map(|p| p.into_iter().collect());
274
275        let plan = rresult_return!(
276            internal_provider
277                .scan(session, projections.as_ref(), &filters, limit.into())
278                .await
279        );
280
281        RResult::ROk(FFI_ExecutionPlan::new(plan, runtime.clone()))
282    }
283    .into_ffi()
284}
285
286unsafe extern "C" fn insert_into_fn_wrapper(
287    provider: &FFI_TableProvider,
288    session: FFI_SessionRef,
289    input: &FFI_ExecutionPlan,
290    insert_op: FFI_InsertOp,
291) -> FfiFuture<FFIResult<FFI_ExecutionPlan>> {
292    let runtime = provider.runtime().clone();
293    let internal_provider = Arc::clone(provider.inner());
294    let input = input.clone();
295
296    async move {
297        let mut foreign_session = None;
298        let session = rresult_return!(
299            session
300                .as_local()
301                .map(Ok::<&(dyn Session + Send + Sync), DataFusionError>)
302                .unwrap_or_else(|| {
303                    foreign_session = Some(ForeignSession::try_from(&session)?);
304                    Ok(foreign_session.as_ref().unwrap())
305                })
306        );
307
308        let input = rresult_return!(<Arc<dyn ExecutionPlan>>::try_from(&input));
309
310        let insert_op = InsertOp::from(insert_op);
311
312        let plan = rresult_return!(
313            internal_provider
314                .insert_into(session, input, insert_op)
315                .await
316        );
317
318        RResult::ROk(FFI_ExecutionPlan::new(plan, runtime.clone()))
319    }
320    .into_ffi()
321}
322
323unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_TableProvider) {
324    unsafe {
325        debug_assert!(!provider.private_data.is_null());
326        let private_data =
327            Box::from_raw(provider.private_data as *mut ProviderPrivateData);
328        drop(private_data);
329        provider.private_data = std::ptr::null_mut();
330    }
331}
332
/// Create an independent copy of the FFI struct.
///
/// The new struct gets its own `ProviderPrivateData` allocation (sharing the
/// underlying provider via `Arc`), so each copy can be released separately.
unsafe extern "C" fn clone_fn_wrapper(provider: &FFI_TableProvider) -> FFI_TableProvider {
    let runtime = provider.runtime().clone();
    let old_provider = Arc::clone(provider.inner());

    let private_data = Box::into_raw(Box::new(ProviderPrivateData {
        provider: old_provider,
        runtime,
    })) as *mut c_void;

    FFI_TableProvider {
        schema: schema_fn_wrapper,
        scan: scan_fn_wrapper,
        table_type: table_type_fn_wrapper,
        // These two are copied from the source struct: `supports_filters_pushdown`
        // may be `None`, which must be preserved on the clone.
        supports_filters_pushdown: provider.supports_filters_pushdown,
        insert_into: provider.insert_into,
        logical_codec: provider.logical_codec.clone(),
        clone: clone_fn_wrapper,
        release: release_fn_wrapper,
        version: super::version,
        private_data,
        library_marker_id: crate::get_library_marker_id,
    }
}
356
impl Drop for FFI_TableProvider {
    /// Invoke the release callback supplied by the providing library, so the
    /// private data is freed by the same allocator that created it.
    fn drop(&mut self) {
        unsafe { (self.release)(self) }
    }
}
362
363impl FFI_TableProvider {
364    /// Creates a new [`FFI_TableProvider`].
365    pub fn new(
366        provider: Arc<dyn TableProvider + Send>,
367        can_support_pushdown_filters: bool,
368        runtime: Option<Handle>,
369        task_ctx_provider: impl Into<FFI_TaskContextProvider>,
370        logical_codec: Option<Arc<dyn LogicalExtensionCodec>>,
371    ) -> Self {
372        let task_ctx_provider = task_ctx_provider.into();
373        let logical_codec =
374            logical_codec.unwrap_or_else(|| Arc::new(DefaultLogicalExtensionCodec {}));
375        let logical_codec = FFI_LogicalExtensionCodec::new(
376            logical_codec,
377            runtime.clone(),
378            task_ctx_provider.clone(),
379        );
380        Self::new_with_ffi_codec(
381            provider,
382            can_support_pushdown_filters,
383            runtime,
384            logical_codec,
385        )
386    }
387
388    pub fn new_with_ffi_codec(
389        provider: Arc<dyn TableProvider + Send>,
390        can_support_pushdown_filters: bool,
391        runtime: Option<Handle>,
392        logical_codec: FFI_LogicalExtensionCodec,
393    ) -> Self {
394        if let Some(provider) = provider.as_any().downcast_ref::<ForeignTableProvider>() {
395            return provider.0.clone();
396        }
397        let private_data = Box::new(ProviderPrivateData { provider, runtime });
398
399        Self {
400            schema: schema_fn_wrapper,
401            scan: scan_fn_wrapper,
402            table_type: table_type_fn_wrapper,
403            supports_filters_pushdown: match can_support_pushdown_filters {
404                true => Some(supports_filters_pushdown_fn_wrapper),
405                false => None,
406            },
407            insert_into: insert_into_fn_wrapper,
408            logical_codec,
409            clone: clone_fn_wrapper,
410            release: release_fn_wrapper,
411            version: super::version,
412            private_data: Box::into_raw(private_data) as *mut c_void,
413            library_marker_id: crate::get_library_marker_id,
414        }
415    }
416}
417
/// This wrapper struct exists on the receiver side of the FFI interface, so it has
/// no guarantees about being able to access the data in `private_data`. Any functions
/// defined on this struct must only use the stable functions provided in
/// FFI_TableProvider to interact with the foreign table provider.
#[derive(Debug)]
pub struct ForeignTableProvider(pub FFI_TableProvider);
424
// SAFETY: the wrapped `FFI_TableProvider` is itself declared `Send + Sync`
// above; this newtype adds no additional state.
unsafe impl Send for ForeignTableProvider {}
unsafe impl Sync for ForeignTableProvider {}
427
428impl From<&FFI_TableProvider> for Arc<dyn TableProvider> {
429    fn from(provider: &FFI_TableProvider) -> Self {
430        if (provider.library_marker_id)() == crate::get_library_marker_id() {
431            Arc::clone(provider.inner()) as Arc<dyn TableProvider>
432        } else {
433            Arc::new(ForeignTableProvider(provider.clone()))
434        }
435    }
436}
437
impl Clone for FFI_TableProvider {
    /// Delegate to the clone callback supplied by the providing library, so
    /// the new private data is allocated on the correct side of the boundary.
    fn clone(&self) -> Self {
        unsafe { (self.clone)(self) }
    }
}
443
#[async_trait]
impl TableProvider for ForeignTableProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Fetch the schema through the FFI callback and convert it back into an
    /// Arrow [`SchemaRef`].
    fn schema(&self) -> SchemaRef {
        let wrapped_schema = unsafe { (self.0.schema)(&self.0) };
        wrapped_schema.into()
    }

    fn table_type(&self) -> TableType {
        unsafe { (self.0.table_type)(&self.0).into() }
    }

    /// Perform a scan by serializing the filters to protobuf bytes and
    /// invoking the foreign `scan` callback.
    async fn scan(
        &self,
        session: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let session = FFI_SessionRef::new(session, None, self.0.logical_codec.clone());

        // Convert the projection into FFI-stable option/vector types.
        let projections: ROption<RVec<usize>> = projection
            .map(|p| p.iter().map(|v| v.to_owned()).collect())
            .into();

        // Filters cross the boundary as a serialized `LogicalExprList`
        // protobuf message; an empty list encodes to an empty buffer.
        let codec: Arc<dyn LogicalExtensionCodec> = (&self.0.logical_codec).into();
        let filter_list = LogicalExprList {
            expr: serialize_exprs(filters, codec.as_ref())?,
        };
        let filters_serialized = filter_list.encode_to_vec().into();

        let plan = unsafe {
            let maybe_plan = (self.0.scan)(
                &self.0,
                session,
                projections,
                filters_serialized,
                limit.into(),
            )
            .await;

            // Convert the FFI plan back into a native `ExecutionPlan`.
            <Arc<dyn ExecutionPlan>>::try_from(&df_result!(maybe_plan)?)?
        };

        Ok(plan)
    }

    /// Tests whether the table provider can make use of a filter expression
    /// to optimize data retrieval.
    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> Result<Vec<TableProviderFilterPushDown>> {
        unsafe {
            // A `None` callback means the provider was created without
            // pushdown support: report every filter as unsupported.
            let pushdown_fn = match self.0.supports_filters_pushdown {
                Some(func) => func,
                None => {
                    return Ok(vec![
                        TableProviderFilterPushDown::Unsupported;
                        filters.len()
                    ]);
                }
            };

            let codec: Arc<dyn LogicalExtensionCodec> = (&self.0.logical_codec).into();

            // Serialize the filters into a protobuf `LogicalExprList` so they
            // can cross the FFI boundary as plain bytes.
            let expr_list = LogicalExprList {
                expr: serialize_exprs(
                    filters.iter().map(|f| f.to_owned()),
                    codec.as_ref(),
                )?,
            };
            let serialized_filters = expr_list.encode_to_vec();

            let pushdowns = df_result!(pushdown_fn(&self.0, serialized_filters.into()))?;

            Ok(pushdowns.iter().map(|v| v.into()).collect())
        }
    }

    /// Insert the output of `input` into the foreign table via the FFI
    /// `insert_into` callback.
    async fn insert_into(
        &self,
        session: &dyn Session,
        input: Arc<dyn ExecutionPlan>,
        insert_op: InsertOp,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let session = FFI_SessionRef::new(session, None, self.0.logical_codec.clone());

        // Wrap the input plan for the boundary, attaching the current tokio
        // runtime handle when one exists.
        let rc = Handle::try_current().ok();
        let input = FFI_ExecutionPlan::new(input, rc);
        let insert_op: FFI_InsertOp = insert_op.into();

        let plan = unsafe {
            let maybe_plan =
                (self.0.insert_into)(&self.0, session, &input, insert_op).await;

            <Arc<dyn ExecutionPlan>>::try_from(&df_result!(maybe_plan)?)?
        };

        Ok(plan)
    }
}
549
#[cfg(test)]
mod tests {
    use arrow::datatypes::Schema;
    use datafusion::prelude::{SessionContext, col, lit};
    use datafusion_execution::TaskContextProvider;

    use super::*;

    /// Build a single-column (`a: Float32`) `MemTable` with two partitions,
    /// used as the provider under test.
    fn create_test_table_provider() -> Result<Arc<dyn TableProvider>> {
        use arrow::datatypes::Field;
        use datafusion::arrow::array::Float32Array;
        use datafusion::arrow::datatypes::DataType;
        use datafusion::arrow::record_batch::RecordBatch;
        use datafusion::datasource::MemTable;

        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));

        // define data in two partitions
        let batch1 = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Float32Array::from(vec![2.0, 4.0, 8.0]))],
        )?;
        let batch2 = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Float32Array::from(vec![64.0]))],
        )?;

        Ok(Arc::new(MemTable::try_new(
            schema,
            vec![vec![batch1], vec![batch2]],
        )?))
    }

    /// Round-trip: wrap a provider in FFI, force the foreign code path, and
    /// run a projected + filtered scan through it.
    #[tokio::test]
    async fn test_round_trip_ffi_table_provider_scan() -> Result<()> {
        let provider = create_test_table_provider()?;
        let ctx = Arc::new(SessionContext::new());
        let task_ctx_provider = Arc::clone(&ctx) as Arc<dyn TaskContextProvider>;
        let task_ctx_provider = FFI_TaskContextProvider::from(&task_ctx_provider);

        let mut ffi_provider =
            FFI_TableProvider::new(provider, true, None, task_ctx_provider, None);
        // Pretend the struct came from another library so the conversion below
        // produces a `ForeignTableProvider` instead of the local bypass.
        ffi_provider.library_marker_id = crate::mock_foreign_marker_id;

        let foreign_table_provider: Arc<dyn TableProvider> = (&ffi_provider).into();

        ctx.register_table("t", foreign_table_provider)?;

        let df = ctx.table("t").await?;

        df.select(vec![col("a")])?
            .filter(col("a").gt(lit(3.0)))?
            .show()
            .await?;

        Ok(())
    }

    /// Round-trip: insert through the FFI boundary, then read back.
    #[tokio::test]
    async fn test_round_trip_ffi_table_provider_insert_into() -> Result<()> {
        let provider = create_test_table_provider()?;
        let ctx = Arc::new(SessionContext::new());
        let task_ctx_provider = Arc::clone(&ctx) as Arc<dyn TaskContextProvider>;
        let task_ctx_provider = FFI_TaskContextProvider::from(&task_ctx_provider);

        let mut ffi_provider =
            FFI_TableProvider::new(provider, true, None, task_ctx_provider, None);
        // Force the foreign path (see scan test above).
        ffi_provider.library_marker_id = crate::mock_foreign_marker_id;

        let foreign_table_provider: Arc<dyn TableProvider> = (&ffi_provider).into();

        ctx.register_table("t", foreign_table_provider)?;

        let result = ctx
            .sql("INSERT INTO t VALUES (128.0);")
            .await?
            .collect()
            .await?;

        // INSERT reports a single batch containing a single count row.
        assert!(result.len() == 1 && result[0].num_rows() == 1);

        ctx.table("t")
            .await?
            .select(vec![col("a")])?
            .filter(col("a").gt(lit(3.0)))?
            .show()
            .await?;

        Ok(())
    }

    /// Aggregation across the FFI boundary produces correct results.
    #[tokio::test]
    async fn test_aggregation() -> Result<()> {
        use arrow::datatypes::Field;
        use datafusion::arrow::array::Float32Array;
        use datafusion::arrow::datatypes::DataType;
        use datafusion::arrow::record_batch::RecordBatch;
        use datafusion::common::assert_batches_eq;
        use datafusion::datasource::MemTable;

        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));

        // define data in two partitions
        let batch1 = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Float32Array::from(vec![2.0, 4.0, 8.0]))],
        )?;

        let ctx = Arc::new(SessionContext::new());
        let task_ctx_provider = Arc::clone(&ctx) as Arc<dyn TaskContextProvider>;
        let task_ctx_provider = FFI_TaskContextProvider::from(&task_ctx_provider);

        let provider = Arc::new(MemTable::try_new(schema, vec![vec![batch1]])?);

        let mut ffi_provider =
            FFI_TableProvider::new(provider, true, None, task_ctx_provider, None);
        // Force the foreign path (see scan test above).
        ffi_provider.library_marker_id = crate::mock_foreign_marker_id;

        let foreign_table_provider: Arc<dyn TableProvider> = (&ffi_provider).into();

        ctx.register_table("t", foreign_table_provider)?;

        let result = ctx
            .sql("SELECT COUNT(*) as cnt FROM t")
            .await?
            .collect()
            .await?;
        #[rustfmt::skip]
        let expected = [
            "+-----+",
            "| cnt |",
            "+-----+",
            "| 3   |",
            "+-----+"
        ];
        assert_batches_eq!(expected, &result);
        Ok(())
    }

    /// The marker-id check decides between the local bypass (downcasts back to
    /// the original provider) and the foreign wrapper.
    #[test]
    fn test_ffi_table_provider_local_bypass() -> Result<()> {
        let table_provider = create_test_table_provider()?;

        let ctx = Arc::new(SessionContext::new()) as Arc<dyn TaskContextProvider>;
        let task_ctx_provider = FFI_TaskContextProvider::from(&ctx);
        let mut ffi_table =
            FFI_TableProvider::new(table_provider, false, None, task_ctx_provider, None);

        // Verify local libraries can be downcast to their original
        let foreign_table: Arc<dyn TableProvider> = (&ffi_table).into();
        assert!(
            foreign_table
                .as_any()
                .downcast_ref::<datafusion::datasource::MemTable>()
                .is_some()
        );

        // Verify different library markers generate foreign providers
        ffi_table.library_marker_id = crate::mock_foreign_marker_id;
        let foreign_table: Arc<dyn TableProvider> = (&ffi_table).into();
        assert!(
            foreign_table
                .as_any()
                .downcast_ref::<ForeignTableProvider>()
                .is_some()
        );

        Ok(())
    }

    /// `scan(projection=None)` must mean "all columns", not "no columns".
    #[tokio::test]
    async fn test_scan_with_none_projection_returns_all_columns() -> Result<()> {
        use arrow::datatypes::Field;
        use datafusion::arrow::array::Float32Array;
        use datafusion::arrow::datatypes::DataType;
        use datafusion::arrow::record_batch::RecordBatch;
        use datafusion::datasource::MemTable;
        use datafusion::physical_plan::collect;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Float32, false),
            Field::new("b", DataType::Float32, false),
            Field::new("c", DataType::Float32, false),
        ]));

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Float32Array::from(vec![1.0, 2.0])),
                Arc::new(Float32Array::from(vec![3.0, 4.0])),
                Arc::new(Float32Array::from(vec![5.0, 6.0])),
            ],
        )?;

        let provider =
            Arc::new(MemTable::try_new(Arc::clone(&schema), vec![vec![batch]])?);

        let ctx = Arc::new(SessionContext::new());
        let task_ctx_provider = Arc::clone(&ctx) as Arc<dyn TaskContextProvider>;
        let task_ctx_provider = FFI_TaskContextProvider::from(&task_ctx_provider);

        // Wrap in FFI and force the foreign path (not local bypass)
        let mut ffi_provider =
            FFI_TableProvider::new(provider, true, None, task_ctx_provider, None);
        ffi_provider.library_marker_id = crate::mock_foreign_marker_id;

        let foreign_table_provider: Arc<dyn TableProvider> = (&ffi_provider).into();

        // Call scan with projection=None, meaning "return all columns"
        let plan = foreign_table_provider
            .scan(&ctx.state(), None, &[], None)
            .await?;
        assert_eq!(
            plan.schema().fields().len(),
            3,
            "scan(projection=None) should return all columns; got {}",
            plan.schema().fields().len()
        );

        // Also verify we can execute and get correct data
        let batches = collect(plan, ctx.task_ctx()).await?;
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_columns(), 3);
        assert_eq!(batches[0].num_rows(), 2);

        Ok(())
    }
}
779}