Skip to main content

datafusion_ffi/session/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::collections::HashMap;
20use std::ffi::c_void;
21use std::sync::Arc;
22
23use arrow_schema::SchemaRef;
24use arrow_schema::ffi::FFI_ArrowSchema;
25use async_ffi::{FfiFuture, FutureExt};
26use async_trait::async_trait;
27use datafusion_common::config::{ConfigFileType, ConfigOptions, TableOptions};
28use datafusion_common::{DFSchema, DataFusionError};
29use datafusion_execution::TaskContext;
30use datafusion_execution::config::SessionConfig;
31use datafusion_execution::runtime_env::RuntimeEnv;
32use datafusion_expr::execution_props::ExecutionProps;
33use datafusion_expr::registry::{ExtensionTypeRegistryRef, MemoryExtensionTypeRegistry};
34use datafusion_expr::{
35    AggregateUDF, AggregateUDFImpl, Expr, HigherOrderUDF, LogicalPlan, ScalarUDF,
36    ScalarUDFImpl, WindowUDF, WindowUDFImpl,
37};
38use datafusion_physical_expr::PhysicalExpr;
39use datafusion_physical_plan::ExecutionPlan;
40use datafusion_proto::bytes::{logical_plan_from_bytes, logical_plan_to_bytes};
41use datafusion_proto::logical_plan::LogicalExtensionCodec;
42use datafusion_proto::logical_plan::from_proto::parse_expr;
43use datafusion_proto::logical_plan::to_proto::serialize_expr;
44use datafusion_proto::protobuf::LogicalExprNode;
45use datafusion_session::Session;
46use prost::Message;
47
48use stabby::str::Str as SStr;
49use stabby::string::String as SString;
50use stabby::vec::Vec as SVec;
51use tokio::runtime::Handle;
52
53use crate::arrow_wrappers::WrappedSchema;
54use crate::execution::FFI_TaskContext;
55use crate::execution_plan::FFI_ExecutionPlan;
56use crate::physical_expr::FFI_PhysicalExpr;
57use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
58use crate::session::config::FFI_SessionConfig;
59use crate::udaf::FFI_AggregateUDF;
60use crate::udf::FFI_ScalarUDF;
61use crate::udwf::FFI_WindowUDF;
62use crate::util::FFI_Result;
63use crate::{df_result, sresult, sresult_return};
64
65pub mod config;
66
/// A stable struct for sharing [`Session`] across FFI boundaries.
///
/// Care must be taken when using this struct. Unlike most of the structs in
/// this crate, the private data for [`FFI_SessionRef`] contains borrowed data.
/// The lifetime of the borrow is lost when hidden within the ``*mut c_void``
/// of the private data. For this reason, it is the user's responsibility to
/// ensure the lifetime of the [`Session`] remains valid.
///
/// The reason for storing `&dyn Session` is because the primary motivation
/// for implementing this struct is [`crate::table_provider::FFI_TableProvider`]
/// which has methods that require `&dyn Session`. For usage within this crate
/// we know the [`Session`] lifetimes are valid.
#[repr(C)]
#[derive(Debug)]
pub(crate) struct FFI_SessionRef {
    /// Return the identifier of the wrapped session.
    session_id: unsafe extern "C" fn(&Self) -> SStr,

    /// Return the configuration of the wrapped session.
    config: unsafe extern "C" fn(&Self) -> FFI_SessionConfig,

    /// Create a physical plan from a logical plan that has been serialized
    /// to bytes. The result is delivered asynchronously.
    create_physical_plan:
        unsafe extern "C" fn(
            &Self,
            logical_plan_serialized: SVec<u8>,
        ) -> FfiFuture<FFI_Result<FFI_ExecutionPlan>>,

    /// Create a physical expression from a logical expression that has been
    /// serialized to bytes, resolved against the provided schema.
    create_physical_expr: unsafe extern "C" fn(
        &Self,
        expr_serialized: SVec<u8>,
        schema: WrappedSchema,
    ) -> FFI_Result<FFI_PhysicalExpr>,

    /// Return the scalar functions of the session as `(name, function)` pairs.
    scalar_functions: unsafe extern "C" fn(&Self) -> SVec<(SString, FFI_ScalarUDF)>,

    /// Return the aggregate functions of the session as `(name, function)` pairs.
    aggregate_functions: unsafe extern "C" fn(&Self) -> SVec<(SString, FFI_AggregateUDF)>,

    /// Return the window functions of the session as `(name, function)` pairs.
    window_functions: unsafe extern "C" fn(&Self) -> SVec<(SString, FFI_WindowUDF)>,

    /// Return the table options of the session flattened into `(key, value)`
    /// string pairs. The currently selected format, if any, is carried under
    /// the reserved key `datafusion_ffi.table_current_format`.
    table_options: unsafe extern "C" fn(&Self) -> SVec<(SString, SString)>,

    /// Return the default table options of the session, using the same
    /// flattened `(key, value)` encoding as `table_options`.
    default_table_options: unsafe extern "C" fn(&Self) -> SVec<(SString, SString)>,

    /// Return the task context of the wrapped session.
    task_ctx: unsafe extern "C" fn(&Self) -> FFI_TaskContext,

    /// Codec used on both sides of the boundary to serialize and parse
    /// logical expressions.
    logical_codec: FFI_LogicalExtensionCodec,

    /// Used to create a clone on the provider of the session. This should
    /// only need to be called by the receiver of the session.
    clone: unsafe extern "C" fn(plan: &Self) -> Self,

    /// Release the memory of the private data when it is no longer being used.
    release: unsafe extern "C" fn(arg: &mut Self),

    /// Return the major DataFusion version number of the provider of this
    /// session.
    pub version: unsafe extern "C" fn() -> u64,

    /// Internal data. This is only to be accessed by the provider of the
    /// session. A [`ForeignSession`] should never attempt to access this data.
    private_data: *mut c_void,

    /// Utility to identify when FFI objects are accessed locally through
    /// the foreign interface.
    pub library_marker_id: extern "C" fn() -> usize,
}
130
// The raw `private_data` pointer prevents `Send`/`Sync` from being derived
// automatically, so they are asserted manually here.
// NOTE(review): the private data holds a `&(dyn Session + Send + Sync)` and an
// `Option<Handle>`; soundness additionally relies on the borrowed session
// outliving every copy of this struct — confirm at call sites.
unsafe impl Send for FFI_SessionRef {}
unsafe impl Sync for FFI_SessionRef {}
133
/// Provider-side state stored behind `FFI_SessionRef::private_data`.
///
/// The lifetime parameter is erased when the boxed value is cast to
/// `*mut c_void`, so nothing prevents the pointer from outliving the borrow.
struct SessionPrivateData<'a> {
    /// The borrowed session that all of the FFI wrapper functions delegate to.
    session: &'a (dyn Session + Send + Sync),
    /// Optional tokio runtime handle, forwarded to the execution plans
    /// produced by `create_physical_plan`.
    runtime: Option<Handle>,
}
138
139impl FFI_SessionRef {
140    fn inner(&self) -> &(dyn Session + Send + Sync) {
141        let private_data = self.private_data as *const SessionPrivateData;
142        unsafe { (*private_data).session }
143    }
144
145    unsafe fn runtime(&self) -> &Option<Handle> {
146        unsafe {
147            let private_data = self.private_data as *const SessionPrivateData;
148            &(*private_data).runtime
149        }
150    }
151}
152
153unsafe extern "C" fn session_id_fn_wrapper(session: &FFI_SessionRef) -> SStr<'_> {
154    let session = session.inner();
155    session.session_id().into()
156}
157
158unsafe extern "C" fn config_fn_wrapper(session: &FFI_SessionRef) -> FFI_SessionConfig {
159    let session = session.inner();
160    session.config().into()
161}
162
163unsafe extern "C" fn create_physical_plan_fn_wrapper(
164    session: &FFI_SessionRef,
165    logical_plan_serialized: SVec<u8>,
166) -> FfiFuture<FFI_Result<FFI_ExecutionPlan>> {
167    unsafe {
168        let runtime = session.runtime().clone();
169        let session = session.clone();
170        async move {
171            let session = session.inner();
172            let task_ctx = session.task_ctx();
173
174            let logical_plan = sresult_return!(logical_plan_from_bytes(
175                logical_plan_serialized.as_slice(),
176                task_ctx.as_ref(),
177            ));
178
179            let physical_plan = session.create_physical_plan(&logical_plan).await;
180
181            sresult!(physical_plan.map(|plan| FFI_ExecutionPlan::new(plan, runtime)))
182        }
183        .into_ffi()
184    }
185}
186
187unsafe extern "C" fn create_physical_expr_fn_wrapper(
188    session: &FFI_SessionRef,
189    expr_serialized: SVec<u8>,
190    schema: WrappedSchema,
191) -> FFI_Result<FFI_PhysicalExpr> {
192    let codec: Arc<dyn LogicalExtensionCodec> = (&session.logical_codec).into();
193    let session = session.inner();
194
195    let logical_expr = LogicalExprNode::decode(expr_serialized.as_slice()).unwrap();
196    let logical_expr =
197        parse_expr(&logical_expr, session.task_ctx().as_ref(), codec.as_ref()).unwrap();
198    let schema: SchemaRef = schema.into();
199    let schema: DFSchema = sresult_return!(schema.try_into());
200
201    let physical_expr =
202        sresult_return!(session.create_physical_expr(logical_expr, &schema));
203
204    FFI_Result::Ok(physical_expr.into())
205}
206
207unsafe extern "C" fn scalar_functions_fn_wrapper(
208    session: &FFI_SessionRef,
209) -> SVec<(SString, FFI_ScalarUDF)> {
210    let session = session.inner();
211    session
212        .scalar_functions()
213        .iter()
214        .map(|(name, udf)| (name.clone().into(), FFI_ScalarUDF::from(Arc::clone(udf))))
215        .collect()
216}
217
218unsafe extern "C" fn aggregate_functions_fn_wrapper(
219    session: &FFI_SessionRef,
220) -> SVec<(SString, FFI_AggregateUDF)> {
221    let session = session.inner();
222    session
223        .aggregate_functions()
224        .iter()
225        .map(|(name, udaf)| {
226            (
227                name.clone().into(),
228                FFI_AggregateUDF::from(Arc::clone(udaf)),
229            )
230        })
231        .collect()
232}
233
234unsafe extern "C" fn window_functions_fn_wrapper(
235    session: &FFI_SessionRef,
236) -> SVec<(SString, FFI_WindowUDF)> {
237    let session = session.inner();
238    session
239        .window_functions()
240        .iter()
241        .map(|(name, udwf)| (name.clone().into(), FFI_WindowUDF::from(Arc::clone(udwf))))
242        .collect()
243}
244
245fn table_options_to_rhash(mut options: TableOptions) -> SVec<(SString, SString)> {
246    // It is important that we mutate options here and set current format
247    // to None so that when we call `entries()` we get ALL format entries.
248    // We will pass current_format as a special case and strip it on the
249    // other side of the boundary.
250    let current_format = options.current_format.take();
251    let mut options: HashMap<SString, SString> = options
252        .entries()
253        .into_iter()
254        .filter_map(|entry| entry.value.map(|v| (entry.key.into(), v.into())))
255        .collect();
256    if let Some(current_format) = current_format {
257        options.insert(
258            "datafusion_ffi.table_current_format".into(),
259            match current_format {
260                ConfigFileType::JSON => "json",
261                ConfigFileType::PARQUET => "parquet",
262                ConfigFileType::CSV => "csv",
263            }
264            .into(),
265        );
266    }
267
268    options.into_iter().collect()
269}
270
271unsafe extern "C" fn table_options_fn_wrapper(
272    session: &FFI_SessionRef,
273) -> SVec<(SString, SString)> {
274    let session = session.inner();
275    let table_options = session.table_options();
276    table_options_to_rhash(table_options.clone())
277}
278
279unsafe extern "C" fn default_table_options_fn_wrapper(
280    session: &FFI_SessionRef,
281) -> SVec<(SString, SString)> {
282    let session = session.inner();
283    let table_options = session.default_table_options();
284
285    table_options_to_rhash(table_options)
286}
287
288unsafe extern "C" fn task_ctx_fn_wrapper(session: &FFI_SessionRef) -> FFI_TaskContext {
289    session.inner().task_ctx().into()
290}
291
292unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_SessionRef) {
293    unsafe {
294        let private_data =
295            Box::from_raw(provider.private_data as *mut SessionPrivateData);
296        drop(private_data);
297    }
298}
299
300unsafe extern "C" fn clone_fn_wrapper(provider: &FFI_SessionRef) -> FFI_SessionRef {
301    unsafe {
302        let old_private_data = provider.private_data as *const SessionPrivateData;
303
304        let private_data = Box::into_raw(Box::new(SessionPrivateData {
305            session: (*old_private_data).session,
306            runtime: (*old_private_data).runtime.clone(),
307        })) as *mut c_void;
308
309        FFI_SessionRef {
310            session_id: session_id_fn_wrapper,
311            config: config_fn_wrapper,
312            create_physical_plan: create_physical_plan_fn_wrapper,
313            create_physical_expr: create_physical_expr_fn_wrapper,
314            scalar_functions: scalar_functions_fn_wrapper,
315            aggregate_functions: aggregate_functions_fn_wrapper,
316            window_functions: window_functions_fn_wrapper,
317            table_options: table_options_fn_wrapper,
318            default_table_options: default_table_options_fn_wrapper,
319            task_ctx: task_ctx_fn_wrapper,
320            logical_codec: provider.logical_codec.clone(),
321
322            clone: clone_fn_wrapper,
323            release: release_fn_wrapper,
324            version: super::version,
325            private_data,
326            library_marker_id: crate::get_library_marker_id,
327        }
328    }
329}
330
331impl Drop for FFI_SessionRef {
332    fn drop(&mut self) {
333        unsafe { (self.release)(self) }
334    }
335}
336
337impl FFI_SessionRef {
338    /// Creates a new [`FFI_SessionRef`].
339    pub fn new(
340        session: &(dyn Session + Send + Sync),
341        runtime: Option<Handle>,
342        logical_codec: FFI_LogicalExtensionCodec,
343    ) -> Self {
344        if let Some(session) = session.as_any().downcast_ref::<ForeignSession>() {
345            return session.session.clone();
346        }
347
348        let private_data = Box::new(SessionPrivateData { session, runtime });
349
350        Self {
351            session_id: session_id_fn_wrapper,
352            config: config_fn_wrapper,
353            create_physical_plan: create_physical_plan_fn_wrapper,
354            create_physical_expr: create_physical_expr_fn_wrapper,
355            scalar_functions: scalar_functions_fn_wrapper,
356            aggregate_functions: aggregate_functions_fn_wrapper,
357            window_functions: window_functions_fn_wrapper,
358            table_options: table_options_fn_wrapper,
359            default_table_options: default_table_options_fn_wrapper,
360            task_ctx: task_ctx_fn_wrapper,
361            logical_codec,
362
363            clone: clone_fn_wrapper,
364            release: release_fn_wrapper,
365            version: super::version,
366            private_data: Box::into_raw(private_data) as *mut c_void,
367            library_marker_id: crate::get_library_marker_id,
368        }
369    }
370}
371
/// This wrapper struct exists on the receiver side of the FFI interface, so it has
/// no guarantees about being able to access the data in `private_data`. Any functions
/// defined on this struct must only use the stable functions provided in
/// [`FFI_SessionRef`] to interact with the foreign session.
#[derive(Debug)]
pub struct ForeignSession {
    /// Handle to the foreign session; every call that is not answered from
    /// the locally cached fields below goes through its function pointers.
    session: FFI_SessionRef,
    /// Session configuration, converted from the foreign side at construction.
    config: SessionConfig,
    /// Scalar functions fetched from the foreign session at construction.
    scalar_functions: HashMap<String, Arc<ScalarUDF>>,
    /// Always empty: higher order functions are not transferred over FFI.
    higher_order_functions: HashMap<String, Arc<HigherOrderUDF>>,
    /// Aggregate functions fetched from the foreign session at construction.
    aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
    /// Window functions fetched from the foreign session at construction.
    window_functions: HashMap<String, Arc<WindowUDF>>,
    /// A default, locally created extension type registry; it is not
    /// populated from the foreign session.
    extension_types: ExtensionTypeRegistryRef,
    /// Table options reconstructed from the foreign session at construction.
    table_options: TableOptions,
    /// A default, locally created runtime environment; it is not shared with
    /// the foreign session.
    runtime_env: Arc<RuntimeEnv>,
    /// Default, locally created execution properties.
    props: ExecutionProps,
}
389
// `Send`/`Sync` are asserted manually for the receiver-side wrapper.
// NOTE(review): this presumably mirrors the manual impls on `FFI_SessionRef`,
// which this struct embeds — confirm the remaining fields do not need it.
unsafe impl Send for ForeignSession {}
unsafe impl Sync for ForeignSession {}
392
393impl FFI_SessionRef {
394    pub fn as_local(&self) -> Option<&(dyn Session + Send + Sync)> {
395        if (self.library_marker_id)() == crate::get_library_marker_id() {
396            return Some(self.inner());
397        }
398        None
399    }
400}
401
402impl TryFrom<&FFI_SessionRef> for ForeignSession {
403    type Error = DataFusionError;
404    fn try_from(session: &FFI_SessionRef) -> Result<Self, Self::Error> {
405        unsafe {
406            let table_options =
407                table_options_from_rhashmap((session.table_options)(session));
408
409            let config = (session.config)(session);
410            let config = SessionConfig::try_from(&config)?;
411
412            let scalar_functions = (session.scalar_functions)(session)
413                .into_iter()
414                .map(|kv_pair| {
415                    let udf = <Arc<dyn ScalarUDFImpl>>::from(&kv_pair.1);
416
417                    (
418                        kv_pair.0.to_string(),
419                        Arc::new(ScalarUDF::new_from_shared_impl(udf)),
420                    )
421                })
422                .collect();
423            let aggregate_functions = (session.aggregate_functions)(session)
424                .into_iter()
425                .map(|kv_pair| {
426                    let udaf = <Arc<dyn AggregateUDFImpl>>::from(&kv_pair.1);
427
428                    (
429                        kv_pair.0.to_string(),
430                        Arc::new(AggregateUDF::new_from_shared_impl(udaf)),
431                    )
432                })
433                .collect();
434            let window_functions = (session.window_functions)(session)
435                .into_iter()
436                .map(|kv_pair| {
437                    let udwf = <Arc<dyn WindowUDFImpl>>::from(&kv_pair.1);
438
439                    (
440                        kv_pair.0.to_string(),
441                        Arc::new(WindowUDF::new_from_shared_impl(udwf)),
442                    )
443                })
444                .collect();
445
446            Ok(Self {
447                session: session.clone(),
448                config,
449                table_options,
450                scalar_functions,
451                higher_order_functions: HashMap::new(),
452                aggregate_functions,
453                window_functions,
454                extension_types: Arc::new(MemoryExtensionTypeRegistry::default()),
455                runtime_env: Default::default(),
456                props: Default::default(),
457            })
458        }
459    }
460}
461
462impl Clone for FFI_SessionRef {
463    fn clone(&self) -> Self {
464        unsafe { (self.clone)(self) }
465    }
466}
467
468fn table_options_from_rhashmap(options: SVec<(SString, SString)>) -> TableOptions {
469    let mut options: HashMap<String, String> = options
470        .into_iter()
471        .map(|kv_pair| (kv_pair.0.to_string(), kv_pair.1.to_string()))
472        .collect();
473    let current_format = options.remove("datafusion_ffi.table_current_format");
474
475    let mut table_options = TableOptions::default();
476    let formats = [
477        ConfigFileType::CSV,
478        ConfigFileType::JSON,
479        ConfigFileType::PARQUET,
480    ];
481    for format in formats {
482        // It is imperative that if new enum variants are added below that they be
483        // included in the formats list above and in the extension check below.
484        let format_name = match &format {
485            ConfigFileType::CSV => "csv",
486            ConfigFileType::PARQUET => "parquet",
487            ConfigFileType::JSON => "json",
488        };
489        let format_options: HashMap<String, String> = options
490            .iter()
491            .filter_map(|(k, v)| {
492                let (prefix, key) = k.split_once(".")?;
493                if prefix == format_name {
494                    Some((format!("format.{key}"), v.to_owned()))
495                } else {
496                    None
497                }
498            })
499            .collect();
500        if !format_options.is_empty() {
501            table_options.current_format = Some(format.clone());
502            table_options
503                .alter_with_string_hash_map(&format_options)
504                .unwrap_or_else(|err| log::warn!("Error parsing table options: {err}"));
505        }
506    }
507
508    let extension_options: HashMap<String, String> = options
509        .iter()
510        .filter_map(|(k, v)| {
511            let (prefix, _) = k.split_once(".")?;
512            if !["json", "parquet", "csv"].contains(&prefix) {
513                Some((k.to_owned(), v.to_owned()))
514            } else {
515                None
516            }
517        })
518        .collect();
519    if !extension_options.is_empty() {
520        table_options
521            .alter_with_string_hash_map(&extension_options)
522            .unwrap_or_else(|err| log::warn!("Error parsing table options: {err}"));
523    }
524
525    table_options.current_format =
526        current_format.and_then(|format| match format.as_str() {
527            "csv" => Some(ConfigFileType::CSV),
528            "parquet" => Some(ConfigFileType::PARQUET),
529            "json" => Some(ConfigFileType::JSON),
530            _ => None,
531        });
532    table_options
533}
534
535#[async_trait]
536impl Session for ForeignSession {
537    fn session_id(&self) -> &str {
538        unsafe { (self.session.session_id)(&self.session).as_str() }
539    }
540
541    fn config(&self) -> &SessionConfig {
542        &self.config
543    }
544
545    fn config_options(&self) -> &ConfigOptions {
546        self.config.options()
547    }
548
549    async fn create_physical_plan(
550        &self,
551        logical_plan: &LogicalPlan,
552    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
553        unsafe {
554            let logical_plan = logical_plan_to_bytes(logical_plan)?;
555            let physical_plan = df_result!(
556                (self.session.create_physical_plan)(
557                    &self.session,
558                    logical_plan.as_ref().into()
559                )
560                .await
561            )?;
562            let physical_plan = <Arc<dyn ExecutionPlan>>::try_from(&physical_plan)?;
563
564            Ok(physical_plan)
565        }
566    }
567
568    fn create_physical_expr(
569        &self,
570        expr: Expr,
571        df_schema: &DFSchema,
572    ) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
573        unsafe {
574            let codec: Arc<dyn LogicalExtensionCodec> =
575                (&self.session.logical_codec).into();
576            let logical_expr = serialize_expr(&expr, codec.as_ref())?.encode_to_vec();
577            let schema = WrappedSchema(FFI_ArrowSchema::try_from(df_schema.as_arrow())?);
578
579            let physical_expr = df_result!((self.session.create_physical_expr)(
580                &self.session,
581                logical_expr.into_iter().collect(),
582                schema
583            ))?;
584
585            Ok((&physical_expr).into())
586        }
587    }
588
589    fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
590        &self.scalar_functions
591    }
592
593    fn higher_order_functions(&self) -> &HashMap<String, Arc<HigherOrderUDF>> {
594        &self.higher_order_functions
595    }
596
597    fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
598        &self.aggregate_functions
599    }
600
601    fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
602        &self.window_functions
603    }
604
605    fn extension_type_registry(&self) -> &ExtensionTypeRegistryRef {
606        &self.extension_types
607    }
608
609    fn runtime_env(&self) -> &Arc<RuntimeEnv> {
610        &self.runtime_env
611    }
612
613    fn execution_props(&self) -> &ExecutionProps {
614        &self.props
615    }
616
617    fn as_any(&self) -> &dyn Any {
618        self
619    }
620
621    fn table_options(&self) -> &TableOptions {
622        &self.table_options
623    }
624
625    fn default_table_options(&self) -> TableOptions {
626        unsafe {
627            table_options_from_rhashmap((self.session.default_table_options)(
628                &self.session,
629            ))
630        }
631    }
632
633    fn table_options_mut(&mut self) -> &mut TableOptions {
634        log::warn!(
635            "Mutating table options is not supported via FFI. Changes will not have an effect."
636        );
637        &mut self.table_options
638    }
639
640    fn task_ctx(&self) -> Arc<TaskContext> {
641        unsafe { (self.session.task_ctx)(&self.session).into() }
642    }
643}
644
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Schema};
    use datafusion::execution::SessionStateBuilder;
    use datafusion_common::DataFusionError;
    use datafusion_expr::col;
    use datafusion_expr::registry::FunctionRegistry;
    use datafusion_proto::logical_plan::DefaultLogicalExtensionCodec;

    use super::*;

    /// Round-trips a session state through [`FFI_SessionRef`] and
    /// [`ForeignSession`] and checks that expressions, plans, table options
    /// and registered functions seen through the foreign wrapper match the
    /// local state.
    #[tokio::test]
    async fn test_ffi_session() -> Result<(), DataFusionError> {
        let (ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
        // Set one option per built-in format plus a current format so that
        // every branch of the table option conversion is exercised.
        let mut table_options = TableOptions::default();
        table_options.csv.has_header = Some(true);
        table_options.json.schema_infer_max_rec = Some(10);
        table_options.parquet.global.coerce_int96 = Some("123456789".into());
        table_options.current_format = Some(ConfigFileType::JSON);

        let state = SessionStateBuilder::new_from_existing(ctx.state())
            .with_table_options(table_options)
            .build();

        let logical_codec = FFI_LogicalExtensionCodec::new(
            Arc::new(DefaultLogicalExtensionCodec {}),
            None,
            task_ctx_provider,
        );

        // Build the wrapper directly via `try_from` so that all calls go
        // through the FFI function pointers.
        let local_session = FFI_SessionRef::new(&state, None, logical_codec);
        let foreign_session = ForeignSession::try_from(&local_session)?;

        // Physical expression creation.
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let df_schema = schema.try_into()?;
        let physical_expr = foreign_session.create_physical_expr(col("a"), &df_schema)?;
        assert_eq!(
            format!("{physical_expr:?}"),
            "Column { name: \"a\", index: 0 }"
        );

        assert_eq!(foreign_session.session_id(), state.session_id());

        // Physical plan creation for the default (empty) logical plan.
        let logical_plan = LogicalPlan::default();
        let physical_plan = foreign_session.create_physical_plan(&logical_plan).await?;
        assert_eq!(
            format!("{physical_plan:?}"),
            "EmptyExec { schema: Schema { fields: [], metadata: {} }, partitions: 1, cache: PlanProperties { eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { map: {}, classes: [] }, oeq_class: OrderingEquivalenceClass { orderings: [] }, oeq_cache: OrderingEquivalenceCache { normal_cls: OrderingEquivalenceClass { orderings: [] }, leading_map: {} }, constraints: Constraints { inner: [] }, schema: Schema { fields: [], metadata: {} } }, partitioning: UnknownPartitioning(1), emission_type: Incremental, boundedness: Bounded, evaluation_type: Lazy, scheduling_type: Cooperative, output_ordering: None } }"
        );

        // Table options are compared through their debug representation.
        assert_eq!(
            format!("{:?}", foreign_session.default_table_options()),
            format!("{:?}", state.default_table_options())
        );

        assert_eq!(
            format!("{:?}", foreign_session.table_options()),
            format!("{:?}", state.table_options())
        );

        // Every function visible through the foreign session must be
        // registered in the local state.
        let local_udfs = state.udfs();
        for udf in foreign_session.scalar_functions().keys() {
            assert!(local_udfs.contains(udf));
        }
        let local_udafs = state.udafs();
        for udaf in foreign_session.aggregate_functions().keys() {
            assert!(local_udafs.contains(udaf));
        }
        let local_udwfs = state.udwfs();
        for udwf in foreign_session.window_functions().keys() {
            assert!(local_udwfs.contains(udwf));
        }

        Ok(())
    }
}