Skip to main content

datafusion_ffi/session/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::collections::HashMap;
20use std::ffi::c_void;
21use std::sync::Arc;
22
23use abi_stable::StableAbi;
24use abi_stable::std_types::{RHashMap, RResult, RStr, RString, RVec};
25use arrow_schema::SchemaRef;
26use arrow_schema::ffi::FFI_ArrowSchema;
27use async_ffi::{FfiFuture, FutureExt};
28use async_trait::async_trait;
29use datafusion_common::config::{ConfigFileType, ConfigOptions, TableOptions};
30use datafusion_common::{DFSchema, DataFusionError};
31use datafusion_execution::TaskContext;
32use datafusion_execution::config::SessionConfig;
33use datafusion_execution::runtime_env::RuntimeEnv;
34use datafusion_expr::execution_props::ExecutionProps;
35use datafusion_expr::{
36    AggregateUDF, AggregateUDFImpl, Expr, LogicalPlan, ScalarUDF, ScalarUDFImpl,
37    WindowUDF, WindowUDFImpl,
38};
39use datafusion_physical_expr::PhysicalExpr;
40use datafusion_physical_plan::ExecutionPlan;
41use datafusion_proto::bytes::{logical_plan_from_bytes, logical_plan_to_bytes};
42use datafusion_proto::logical_plan::LogicalExtensionCodec;
43use datafusion_proto::logical_plan::from_proto::parse_expr;
44use datafusion_proto::logical_plan::to_proto::serialize_expr;
45use datafusion_proto::protobuf::LogicalExprNode;
46use datafusion_session::Session;
47use prost::Message;
48use tokio::runtime::Handle;
49
50use crate::arrow_wrappers::WrappedSchema;
51use crate::execution::FFI_TaskContext;
52use crate::execution_plan::FFI_ExecutionPlan;
53use crate::physical_expr::FFI_PhysicalExpr;
54use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
55use crate::session::config::FFI_SessionConfig;
56use crate::udaf::FFI_AggregateUDF;
57use crate::udf::FFI_ScalarUDF;
58use crate::udwf::FFI_WindowUDF;
59use crate::util::FFIResult;
60use crate::{df_result, rresult, rresult_return};
61
62pub mod config;
63
64/// A stable struct for sharing [`Session`] across FFI boundaries.
65///
66/// Care must be taken when using this struct. Unlike most of the structs in
67/// this crate, the private data for [`FFI_SessionRef`] contains borrowed data.
68/// The lifetime of the borrow is lost when hidden within the ``*mut c_void``
69/// of the private data. For this reason, it is the user's responsibility to
70/// ensure the lifetime of the [`Session`] remains valid.
71///
72/// The reason for storing `&dyn Session` is because the primary motivation
73/// for implementing this struct is [`crate::table_provider::FFI_TableProvider`]
74/// which has methods that require `&dyn Session`. For usage within this crate
75/// we know the [`Session`] lifetimes are valid.
76#[repr(C)]
77#[derive(Debug, StableAbi)]
78pub(crate) struct FFI_SessionRef {
79    session_id: unsafe extern "C" fn(&Self) -> RStr,
80
81    config: unsafe extern "C" fn(&Self) -> FFI_SessionConfig,
82
83    create_physical_plan: unsafe extern "C" fn(
84        &Self,
85        logical_plan_serialized: RVec<u8>,
86    )
87        -> FfiFuture<FFIResult<FFI_ExecutionPlan>>,
88
89    create_physical_expr: unsafe extern "C" fn(
90        &Self,
91        expr_serialized: RVec<u8>,
92        schema: WrappedSchema,
93    ) -> FFIResult<FFI_PhysicalExpr>,
94
95    scalar_functions: unsafe extern "C" fn(&Self) -> RHashMap<RString, FFI_ScalarUDF>,
96
97    aggregate_functions:
98        unsafe extern "C" fn(&Self) -> RHashMap<RString, FFI_AggregateUDF>,
99
100    window_functions: unsafe extern "C" fn(&Self) -> RHashMap<RString, FFI_WindowUDF>,
101
102    table_options: unsafe extern "C" fn(&Self) -> RHashMap<RString, RString>,
103
104    default_table_options: unsafe extern "C" fn(&Self) -> RHashMap<RString, RString>,
105
106    task_ctx: unsafe extern "C" fn(&Self) -> FFI_TaskContext,
107
108    logical_codec: FFI_LogicalExtensionCodec,
109
110    /// Used to create a clone on the provider of the registry. This should
111    /// only need to be called by the receiver of the plan.
112    clone: unsafe extern "C" fn(plan: &Self) -> Self,
113
114    /// Release the memory of the private data when it is no longer being used.
115    release: unsafe extern "C" fn(arg: &mut Self),
116
117    /// Return the major DataFusion version number of this registry.
118    pub version: unsafe extern "C" fn() -> u64,
119
120    /// Internal data. This is only to be accessed by the provider of the plan.
121    /// A [`ForeignSession`] should never attempt to access this data.
122    private_data: *mut c_void,
123
124    /// Utility to identify when FFI objects are accessed locally through
125    /// the foreign interface.
126    pub library_marker_id: extern "C" fn() -> usize,
127}
128
129unsafe impl Send for FFI_SessionRef {}
130unsafe impl Sync for FFI_SessionRef {}
131
132struct SessionPrivateData<'a> {
133    session: &'a (dyn Session + Send + Sync),
134    runtime: Option<Handle>,
135}
136
137impl FFI_SessionRef {
138    fn inner(&self) -> &(dyn Session + Send + Sync) {
139        let private_data = self.private_data as *const SessionPrivateData;
140        unsafe { (*private_data).session }
141    }
142
143    unsafe fn runtime(&self) -> &Option<Handle> {
144        unsafe {
145            let private_data = self.private_data as *const SessionPrivateData;
146            &(*private_data).runtime
147        }
148    }
149}
150
151unsafe extern "C" fn session_id_fn_wrapper(session: &FFI_SessionRef) -> RStr<'_> {
152    let session = session.inner();
153    session.session_id().into()
154}
155
156unsafe extern "C" fn config_fn_wrapper(session: &FFI_SessionRef) -> FFI_SessionConfig {
157    let session = session.inner();
158    session.config().into()
159}
160
161unsafe extern "C" fn create_physical_plan_fn_wrapper(
162    session: &FFI_SessionRef,
163    logical_plan_serialized: RVec<u8>,
164) -> FfiFuture<FFIResult<FFI_ExecutionPlan>> {
165    unsafe {
166        let runtime = session.runtime().clone();
167        let session = session.clone();
168        async move {
169            let session = session.inner();
170            let task_ctx = session.task_ctx();
171
172            let logical_plan = rresult_return!(logical_plan_from_bytes(
173                logical_plan_serialized.as_slice(),
174                task_ctx.as_ref(),
175            ));
176
177            let physical_plan = session.create_physical_plan(&logical_plan).await;
178
179            rresult!(physical_plan.map(|plan| FFI_ExecutionPlan::new(plan, runtime)))
180        }
181        .into_ffi()
182    }
183}
184
185unsafe extern "C" fn create_physical_expr_fn_wrapper(
186    session: &FFI_SessionRef,
187    expr_serialized: RVec<u8>,
188    schema: WrappedSchema,
189) -> FFIResult<FFI_PhysicalExpr> {
190    let codec: Arc<dyn LogicalExtensionCodec> = (&session.logical_codec).into();
191    let session = session.inner();
192
193    let logical_expr = LogicalExprNode::decode(expr_serialized.as_slice()).unwrap();
194    let logical_expr =
195        parse_expr(&logical_expr, session.task_ctx().as_ref(), codec.as_ref()).unwrap();
196    let schema: SchemaRef = schema.into();
197    let schema: DFSchema = rresult_return!(schema.try_into());
198
199    let physical_expr =
200        rresult_return!(session.create_physical_expr(logical_expr, &schema));
201
202    RResult::ROk(physical_expr.into())
203}
204
205unsafe extern "C" fn scalar_functions_fn_wrapper(
206    session: &FFI_SessionRef,
207) -> RHashMap<RString, FFI_ScalarUDF> {
208    let session = session.inner();
209    session
210        .scalar_functions()
211        .iter()
212        .map(|(name, udf)| (name.clone().into(), FFI_ScalarUDF::from(Arc::clone(udf))))
213        .collect()
214}
215
216unsafe extern "C" fn aggregate_functions_fn_wrapper(
217    session: &FFI_SessionRef,
218) -> RHashMap<RString, FFI_AggregateUDF> {
219    let session = session.inner();
220    session
221        .aggregate_functions()
222        .iter()
223        .map(|(name, udaf)| {
224            (
225                name.clone().into(),
226                FFI_AggregateUDF::from(Arc::clone(udaf)),
227            )
228        })
229        .collect()
230}
231
232unsafe extern "C" fn window_functions_fn_wrapper(
233    session: &FFI_SessionRef,
234) -> RHashMap<RString, FFI_WindowUDF> {
235    let session = session.inner();
236    session
237        .window_functions()
238        .iter()
239        .map(|(name, udwf)| (name.clone().into(), FFI_WindowUDF::from(Arc::clone(udwf))))
240        .collect()
241}
242
243fn table_options_to_rhash(mut options: TableOptions) -> RHashMap<RString, RString> {
244    // It is important that we mutate options here and set current format
245    // to None so that when we call `entries()` we get ALL format entries.
246    // We will pass current_format as a special case and strip it on the
247    // other side of the boundary.
248    let current_format = options.current_format.take();
249    let mut options: HashMap<RString, RString> = options
250        .entries()
251        .into_iter()
252        .filter_map(|entry| entry.value.map(|v| (entry.key.into(), v.into())))
253        .collect();
254    if let Some(current_format) = current_format {
255        options.insert(
256            "datafusion_ffi.table_current_format".into(),
257            match current_format {
258                ConfigFileType::JSON => "json",
259                ConfigFileType::PARQUET => "parquet",
260                ConfigFileType::CSV => "csv",
261            }
262            .into(),
263        );
264    }
265
266    options.into()
267}
268
269unsafe extern "C" fn table_options_fn_wrapper(
270    session: &FFI_SessionRef,
271) -> RHashMap<RString, RString> {
272    let session = session.inner();
273    let table_options = session.table_options();
274    table_options_to_rhash(table_options.clone())
275}
276
277unsafe extern "C" fn default_table_options_fn_wrapper(
278    session: &FFI_SessionRef,
279) -> RHashMap<RString, RString> {
280    let session = session.inner();
281    let table_options = session.default_table_options();
282
283    table_options_to_rhash(table_options)
284}
285
286unsafe extern "C" fn task_ctx_fn_wrapper(session: &FFI_SessionRef) -> FFI_TaskContext {
287    session.inner().task_ctx().into()
288}
289
290unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_SessionRef) {
291    unsafe {
292        let private_data =
293            Box::from_raw(provider.private_data as *mut SessionPrivateData);
294        drop(private_data);
295    }
296}
297
298unsafe extern "C" fn clone_fn_wrapper(provider: &FFI_SessionRef) -> FFI_SessionRef {
299    unsafe {
300        let old_private_data = provider.private_data as *const SessionPrivateData;
301
302        let private_data = Box::into_raw(Box::new(SessionPrivateData {
303            session: (*old_private_data).session,
304            runtime: (*old_private_data).runtime.clone(),
305        })) as *mut c_void;
306
307        FFI_SessionRef {
308            session_id: session_id_fn_wrapper,
309            config: config_fn_wrapper,
310            create_physical_plan: create_physical_plan_fn_wrapper,
311            create_physical_expr: create_physical_expr_fn_wrapper,
312            scalar_functions: scalar_functions_fn_wrapper,
313            aggregate_functions: aggregate_functions_fn_wrapper,
314            window_functions: window_functions_fn_wrapper,
315            table_options: table_options_fn_wrapper,
316            default_table_options: default_table_options_fn_wrapper,
317            task_ctx: task_ctx_fn_wrapper,
318            logical_codec: provider.logical_codec.clone(),
319
320            clone: clone_fn_wrapper,
321            release: release_fn_wrapper,
322            version: super::version,
323            private_data,
324            library_marker_id: crate::get_library_marker_id,
325        }
326    }
327}
328
329impl Drop for FFI_SessionRef {
330    fn drop(&mut self) {
331        unsafe { (self.release)(self) }
332    }
333}
334
335impl FFI_SessionRef {
336    /// Creates a new [`FFI_SessionRef`].
337    pub fn new(
338        session: &(dyn Session + Send + Sync),
339        runtime: Option<Handle>,
340        logical_codec: FFI_LogicalExtensionCodec,
341    ) -> Self {
342        let private_data = Box::new(SessionPrivateData { session, runtime });
343
344        Self {
345            session_id: session_id_fn_wrapper,
346            config: config_fn_wrapper,
347            create_physical_plan: create_physical_plan_fn_wrapper,
348            create_physical_expr: create_physical_expr_fn_wrapper,
349            scalar_functions: scalar_functions_fn_wrapper,
350            aggregate_functions: aggregate_functions_fn_wrapper,
351            window_functions: window_functions_fn_wrapper,
352            table_options: table_options_fn_wrapper,
353            default_table_options: default_table_options_fn_wrapper,
354            task_ctx: task_ctx_fn_wrapper,
355            logical_codec,
356
357            clone: clone_fn_wrapper,
358            release: release_fn_wrapper,
359            version: super::version,
360            private_data: Box::into_raw(private_data) as *mut c_void,
361            library_marker_id: crate::get_library_marker_id,
362        }
363    }
364}
365
366/// This wrapper struct exists on the receiver side of the FFI interface, so it has
367/// no guarantees about being able to access the data in `private_data`. Any functions
368/// defined on this struct must only use the stable functions provided in
369/// FFI_Session to interact with the foreign table provider.
370#[derive(Debug)]
371pub struct ForeignSession {
372    session: FFI_SessionRef,
373    config: SessionConfig,
374    scalar_functions: HashMap<String, Arc<ScalarUDF>>,
375    aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
376    window_functions: HashMap<String, Arc<WindowUDF>>,
377    table_options: TableOptions,
378    runtime_env: Arc<RuntimeEnv>,
379    props: ExecutionProps,
380}
381
382unsafe impl Send for ForeignSession {}
383unsafe impl Sync for ForeignSession {}
384
385impl FFI_SessionRef {
386    pub fn as_local(&self) -> Option<&(dyn Session + Send + Sync)> {
387        if (self.library_marker_id)() == crate::get_library_marker_id() {
388            return Some(self.inner());
389        }
390        None
391    }
392}
393
394impl TryFrom<&FFI_SessionRef> for ForeignSession {
395    type Error = DataFusionError;
396    fn try_from(session: &FFI_SessionRef) -> Result<Self, Self::Error> {
397        unsafe {
398            let table_options =
399                table_options_from_rhashmap((session.table_options)(session));
400
401            let config = (session.config)(session);
402            let config = SessionConfig::try_from(&config)?;
403
404            let scalar_functions = (session.scalar_functions)(session)
405                .into_iter()
406                .map(|kv_pair| {
407                    let udf = <Arc<dyn ScalarUDFImpl>>::from(&kv_pair.1);
408
409                    (
410                        kv_pair.0.into_string(),
411                        Arc::new(ScalarUDF::new_from_shared_impl(udf)),
412                    )
413                })
414                .collect();
415            let aggregate_functions = (session.aggregate_functions)(session)
416                .into_iter()
417                .map(|kv_pair| {
418                    let udaf = <Arc<dyn AggregateUDFImpl>>::from(&kv_pair.1);
419
420                    (
421                        kv_pair.0.into_string(),
422                        Arc::new(AggregateUDF::new_from_shared_impl(udaf)),
423                    )
424                })
425                .collect();
426            let window_functions = (session.window_functions)(session)
427                .into_iter()
428                .map(|kv_pair| {
429                    let udwf = <Arc<dyn WindowUDFImpl>>::from(&kv_pair.1);
430
431                    (
432                        kv_pair.0.into_string(),
433                        Arc::new(WindowUDF::new_from_shared_impl(udwf)),
434                    )
435                })
436                .collect();
437
438            Ok(Self {
439                session: session.clone(),
440                config,
441                table_options,
442                scalar_functions,
443                aggregate_functions,
444                window_functions,
445                runtime_env: Default::default(),
446                props: Default::default(),
447            })
448        }
449    }
450}
451
452impl Clone for FFI_SessionRef {
453    fn clone(&self) -> Self {
454        unsafe { (self.clone)(self) }
455    }
456}
457
458fn table_options_from_rhashmap(options: RHashMap<RString, RString>) -> TableOptions {
459    let mut options: HashMap<String, String> = options
460        .into_iter()
461        .map(|kv_pair| (kv_pair.0.into_string(), kv_pair.1.into_string()))
462        .collect();
463    let current_format = options.remove("datafusion_ffi.table_current_format");
464
465    let mut table_options = TableOptions::default();
466    let formats = [
467        ConfigFileType::CSV,
468        ConfigFileType::JSON,
469        ConfigFileType::PARQUET,
470    ];
471    for format in formats {
472        // It is imperative that if new enum variants are added below that they be
473        // included in the formats list above and in the extension check below.
474        let format_name = match &format {
475            ConfigFileType::CSV => "csv",
476            ConfigFileType::PARQUET => "parquet",
477            ConfigFileType::JSON => "json",
478        };
479        let format_options: HashMap<String, String> = options
480            .iter()
481            .filter_map(|(k, v)| {
482                let (prefix, key) = k.split_once(".")?;
483                if prefix == format_name {
484                    Some((format!("format.{key}"), v.to_owned()))
485                } else {
486                    None
487                }
488            })
489            .collect();
490        if !format_options.is_empty() {
491            table_options.current_format = Some(format.clone());
492            table_options
493                .alter_with_string_hash_map(&format_options)
494                .unwrap_or_else(|err| log::warn!("Error parsing table options: {err}"));
495        }
496    }
497
498    let extension_options: HashMap<String, String> = options
499        .iter()
500        .filter_map(|(k, v)| {
501            let (prefix, _) = k.split_once(".")?;
502            if !["json", "parquet", "csv"].contains(&prefix) {
503                Some((k.to_owned(), v.to_owned()))
504            } else {
505                None
506            }
507        })
508        .collect();
509    if !extension_options.is_empty() {
510        table_options
511            .alter_with_string_hash_map(&extension_options)
512            .unwrap_or_else(|err| log::warn!("Error parsing table options: {err}"));
513    }
514
515    table_options.current_format =
516        current_format.and_then(|format| match format.as_str() {
517            "csv" => Some(ConfigFileType::CSV),
518            "parquet" => Some(ConfigFileType::PARQUET),
519            "json" => Some(ConfigFileType::JSON),
520            _ => None,
521        });
522    table_options
523}
524
525#[async_trait]
526impl Session for ForeignSession {
527    fn session_id(&self) -> &str {
528        unsafe { (self.session.session_id)(&self.session).as_str() }
529    }
530
531    fn config(&self) -> &SessionConfig {
532        &self.config
533    }
534
535    fn config_options(&self) -> &ConfigOptions {
536        self.config.options()
537    }
538
539    async fn create_physical_plan(
540        &self,
541        logical_plan: &LogicalPlan,
542    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
543        unsafe {
544            let logical_plan = logical_plan_to_bytes(logical_plan)?;
545            let physical_plan = df_result!(
546                (self.session.create_physical_plan)(
547                    &self.session,
548                    logical_plan.as_ref().into()
549                )
550                .await
551            )?;
552            let physical_plan = <Arc<dyn ExecutionPlan>>::try_from(&physical_plan)?;
553
554            Ok(physical_plan)
555        }
556    }
557
558    fn create_physical_expr(
559        &self,
560        expr: Expr,
561        df_schema: &DFSchema,
562    ) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
563        unsafe {
564            let codec: Arc<dyn LogicalExtensionCodec> =
565                (&self.session.logical_codec).into();
566            let logical_expr = serialize_expr(&expr, codec.as_ref())?.encode_to_vec();
567            let schema = WrappedSchema(FFI_ArrowSchema::try_from(df_schema.as_arrow())?);
568
569            let physical_expr = df_result!((self.session.create_physical_expr)(
570                &self.session,
571                logical_expr.into(),
572                schema
573            ))?;
574
575            Ok((&physical_expr).into())
576        }
577    }
578
579    fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
580        &self.scalar_functions
581    }
582
583    fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
584        &self.aggregate_functions
585    }
586
587    fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
588        &self.window_functions
589    }
590
591    fn runtime_env(&self) -> &Arc<RuntimeEnv> {
592        &self.runtime_env
593    }
594
595    fn execution_props(&self) -> &ExecutionProps {
596        &self.props
597    }
598
599    fn as_any(&self) -> &dyn Any {
600        self
601    }
602
603    fn table_options(&self) -> &TableOptions {
604        &self.table_options
605    }
606
607    fn default_table_options(&self) -> TableOptions {
608        unsafe {
609            table_options_from_rhashmap((self.session.default_table_options)(
610                &self.session,
611            ))
612        }
613    }
614
615    fn table_options_mut(&mut self) -> &mut TableOptions {
616        log::warn!(
617            "Mutating table options is not supported via FFI. Changes will not have an effect."
618        );
619        &mut self.table_options
620    }
621
622    fn task_ctx(&self) -> Arc<TaskContext> {
623        unsafe { (self.session.task_ctx)(&self.session).into() }
624    }
625}
626
627#[cfg(test)]
628mod tests {
629    use std::sync::Arc;
630
631    use arrow_schema::{DataType, Field, Schema};
632    use datafusion::execution::SessionStateBuilder;
633    use datafusion_common::DataFusionError;
634    use datafusion_expr::col;
635    use datafusion_expr::registry::FunctionRegistry;
636    use datafusion_proto::logical_plan::DefaultLogicalExtensionCodec;
637
638    use super::*;
639
640    #[tokio::test]
641    async fn test_ffi_session() -> Result<(), DataFusionError> {
642        let (ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
643        let mut table_options = TableOptions::default();
644        table_options.csv.has_header = Some(true);
645        table_options.json.schema_infer_max_rec = Some(10);
646        table_options.parquet.global.coerce_int96 = Some("123456789".into());
647        table_options.current_format = Some(ConfigFileType::JSON);
648
649        let state = SessionStateBuilder::new_from_existing(ctx.state())
650            .with_table_options(table_options)
651            .build();
652
653        let logical_codec = FFI_LogicalExtensionCodec::new(
654            Arc::new(DefaultLogicalExtensionCodec {}),
655            None,
656            task_ctx_provider,
657        );
658
659        let local_session = FFI_SessionRef::new(&state, None, logical_codec);
660        let foreign_session = ForeignSession::try_from(&local_session)?;
661
662        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
663        let df_schema = schema.try_into()?;
664        let physical_expr = foreign_session.create_physical_expr(col("a"), &df_schema)?;
665        assert_eq!(
666            format!("{physical_expr:?}"),
667            "Column { name: \"a\", index: 0 }"
668        );
669
670        assert_eq!(foreign_session.session_id(), state.session_id());
671
672        let logical_plan = LogicalPlan::default();
673        let physical_plan = foreign_session.create_physical_plan(&logical_plan).await?;
674        assert_eq!(
675            format!("{physical_plan:?}"),
676            "EmptyExec { schema: Schema { fields: [], metadata: {} }, partitions: 1, cache: PlanProperties { eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { map: {}, classes: [] }, oeq_class: OrderingEquivalenceClass { orderings: [] }, oeq_cache: OrderingEquivalenceCache { normal_cls: OrderingEquivalenceClass { orderings: [] }, leading_map: {} }, constraints: Constraints { inner: [] }, schema: Schema { fields: [], metadata: {} } }, partitioning: UnknownPartitioning(1), emission_type: Incremental, boundedness: Bounded, evaluation_type: Lazy, scheduling_type: Cooperative, output_ordering: None } }"
677        );
678
679        assert_eq!(
680            format!("{:?}", foreign_session.default_table_options()),
681            format!("{:?}", state.default_table_options())
682        );
683
684        assert_eq!(
685            format!("{:?}", foreign_session.table_options()),
686            format!("{:?}", state.table_options())
687        );
688
689        let local_udfs = state.udfs();
690        for udf in foreign_session.scalar_functions().keys() {
691            assert!(local_udfs.contains(udf));
692        }
693        let local_udafs = state.udafs();
694        for udaf in foreign_session.aggregate_functions().keys() {
695            assert!(local_udafs.contains(udaf));
696        }
697        let local_udwfs = state.udwfs();
698        for udwf in foreign_session.window_functions().keys() {
699            assert!(local_udwfs.contains(udwf));
700        }
701
702        Ok(())
703    }
704}