datafusion_ffi/session/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::collections::HashMap;
20use std::ffi::c_void;
21use std::sync::Arc;
22
23use abi_stable::StableAbi;
24use abi_stable::std_types::{RHashMap, RResult, RStr, RString, RVec};
25use arrow_schema::SchemaRef;
26use arrow_schema::ffi::FFI_ArrowSchema;
27use async_ffi::{FfiFuture, FutureExt};
28use async_trait::async_trait;
29use datafusion_common::config::{ConfigOptions, TableOptions};
30use datafusion_common::{DFSchema, DataFusionError};
31use datafusion_execution::TaskContext;
32use datafusion_execution::config::SessionConfig;
33use datafusion_execution::runtime_env::RuntimeEnv;
34use datafusion_expr::execution_props::ExecutionProps;
35use datafusion_expr::{
36    AggregateUDF, AggregateUDFImpl, Expr, LogicalPlan, ScalarUDF, ScalarUDFImpl,
37    WindowUDF, WindowUDFImpl,
38};
39use datafusion_physical_expr::PhysicalExpr;
40use datafusion_physical_plan::ExecutionPlan;
41use datafusion_proto::bytes::{logical_plan_from_bytes, logical_plan_to_bytes};
42use datafusion_proto::logical_plan::LogicalExtensionCodec;
43use datafusion_proto::logical_plan::from_proto::parse_expr;
44use datafusion_proto::logical_plan::to_proto::serialize_expr;
45use datafusion_proto::protobuf::LogicalExprNode;
46use datafusion_session::Session;
47use prost::Message;
48use tokio::runtime::Handle;
49
50use crate::arrow_wrappers::WrappedSchema;
51use crate::execution::FFI_TaskContext;
52use crate::execution_plan::FFI_ExecutionPlan;
53use crate::physical_expr::FFI_PhysicalExpr;
54use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
55use crate::session::config::FFI_SessionConfig;
56use crate::udaf::FFI_AggregateUDF;
57use crate::udf::FFI_ScalarUDF;
58use crate::udwf::FFI_WindowUDF;
59use crate::util::FFIResult;
60use crate::{df_result, rresult, rresult_return};
61
62pub mod config;
63
64/// A stable struct for sharing [`Session`] across FFI boundaries.
65///
66/// Care must be taken when using this struct. Unlike most of the structs in
67/// this crate, the private data for [`FFI_SessionRef`] contains borrowed data.
68/// The lifetime of the borrow is lost when hidden within the ``*mut c_void``
69/// of the private data. For this reason, it is the user's responsibility to
70/// ensure the lifetime of the [`Session`] remains valid.
71///
72/// The reason for storing `&dyn Session` is because the primary motivation
73/// for implementing this struct is [`crate::table_provider::FFI_TableProvider`]
74/// which has methods that require `&dyn Session`. For usage within this crate
75/// we know the [`Session`] lifetimes are valid.
76#[repr(C)]
77#[derive(Debug, StableAbi)]
78pub(crate) struct FFI_SessionRef {
79    session_id: unsafe extern "C" fn(&Self) -> RStr,
80
81    config: unsafe extern "C" fn(&Self) -> FFI_SessionConfig,
82
83    create_physical_plan: unsafe extern "C" fn(
84        &Self,
85        logical_plan_serialized: RVec<u8>,
86    )
87        -> FfiFuture<FFIResult<FFI_ExecutionPlan>>,
88
89    create_physical_expr: unsafe extern "C" fn(
90        &Self,
91        expr_serialized: RVec<u8>,
92        schema: WrappedSchema,
93    ) -> FFIResult<FFI_PhysicalExpr>,
94
95    scalar_functions: unsafe extern "C" fn(&Self) -> RHashMap<RString, FFI_ScalarUDF>,
96
97    aggregate_functions:
98        unsafe extern "C" fn(&Self) -> RHashMap<RString, FFI_AggregateUDF>,
99
100    window_functions: unsafe extern "C" fn(&Self) -> RHashMap<RString, FFI_WindowUDF>,
101
102    table_options: unsafe extern "C" fn(&Self) -> RHashMap<RString, RString>,
103
104    default_table_options: unsafe extern "C" fn(&Self) -> RHashMap<RString, RString>,
105
106    task_ctx: unsafe extern "C" fn(&Self) -> FFI_TaskContext,
107
108    logical_codec: FFI_LogicalExtensionCodec,
109
110    /// Used to create a clone on the provider of the registry. This should
111    /// only need to be called by the receiver of the plan.
112    clone: unsafe extern "C" fn(plan: &Self) -> Self,
113
114    /// Release the memory of the private data when it is no longer being used.
115    release: unsafe extern "C" fn(arg: &mut Self),
116
117    /// Return the major DataFusion version number of this registry.
118    pub version: unsafe extern "C" fn() -> u64,
119
120    /// Internal data. This is only to be accessed by the provider of the plan.
121    /// A [`ForeignSession`] should never attempt to access this data.
122    private_data: *mut c_void,
123
124    /// Utility to identify when FFI objects are accessed locally through
125    /// the foreign interface.
126    pub library_marker_id: extern "C" fn() -> usize,
127}
128
129unsafe impl Send for FFI_SessionRef {}
130unsafe impl Sync for FFI_SessionRef {}
131
132struct SessionPrivateData<'a> {
133    session: &'a (dyn Session + Send + Sync),
134    runtime: Option<Handle>,
135}
136
137impl FFI_SessionRef {
138    fn inner(&self) -> &(dyn Session + Send + Sync) {
139        let private_data = self.private_data as *const SessionPrivateData;
140        unsafe { (*private_data).session }
141    }
142
143    unsafe fn runtime(&self) -> &Option<Handle> {
144        unsafe {
145            let private_data = self.private_data as *const SessionPrivateData;
146            &(*private_data).runtime
147        }
148    }
149}
150
151unsafe extern "C" fn session_id_fn_wrapper(session: &FFI_SessionRef) -> RStr<'_> {
152    let session = session.inner();
153    session.session_id().into()
154}
155
156unsafe extern "C" fn config_fn_wrapper(session: &FFI_SessionRef) -> FFI_SessionConfig {
157    let session = session.inner();
158    session.config().into()
159}
160
161unsafe extern "C" fn create_physical_plan_fn_wrapper(
162    session: &FFI_SessionRef,
163    logical_plan_serialized: RVec<u8>,
164) -> FfiFuture<FFIResult<FFI_ExecutionPlan>> {
165    unsafe {
166        let runtime = session.runtime().clone();
167        let session = session.clone();
168        async move {
169            let session = session.inner();
170            let task_ctx = session.task_ctx();
171
172            let logical_plan = rresult_return!(logical_plan_from_bytes(
173                logical_plan_serialized.as_slice(),
174                task_ctx.as_ref(),
175            ));
176
177            let physical_plan = session.create_physical_plan(&logical_plan).await;
178
179            rresult!(physical_plan.map(|plan| FFI_ExecutionPlan::new(plan, runtime)))
180        }
181        .into_ffi()
182    }
183}
184
185unsafe extern "C" fn create_physical_expr_fn_wrapper(
186    session: &FFI_SessionRef,
187    expr_serialized: RVec<u8>,
188    schema: WrappedSchema,
189) -> FFIResult<FFI_PhysicalExpr> {
190    let codec: Arc<dyn LogicalExtensionCodec> = (&session.logical_codec).into();
191    let session = session.inner();
192
193    let logical_expr = LogicalExprNode::decode(expr_serialized.as_slice()).unwrap();
194    let logical_expr =
195        parse_expr(&logical_expr, session.task_ctx().as_ref(), codec.as_ref()).unwrap();
196    let schema: SchemaRef = schema.into();
197    let schema: DFSchema = rresult_return!(schema.try_into());
198
199    let physical_expr =
200        rresult_return!(session.create_physical_expr(logical_expr, &schema));
201
202    RResult::ROk(physical_expr.into())
203}
204
205unsafe extern "C" fn scalar_functions_fn_wrapper(
206    session: &FFI_SessionRef,
207) -> RHashMap<RString, FFI_ScalarUDF> {
208    let session = session.inner();
209    session
210        .scalar_functions()
211        .iter()
212        .map(|(name, udf)| (name.clone().into(), FFI_ScalarUDF::from(Arc::clone(udf))))
213        .collect()
214}
215
216unsafe extern "C" fn aggregate_functions_fn_wrapper(
217    session: &FFI_SessionRef,
218) -> RHashMap<RString, FFI_AggregateUDF> {
219    let session = session.inner();
220    session
221        .aggregate_functions()
222        .iter()
223        .map(|(name, udaf)| {
224            (
225                name.clone().into(),
226                FFI_AggregateUDF::from(Arc::clone(udaf)),
227            )
228        })
229        .collect()
230}
231
232unsafe extern "C" fn window_functions_fn_wrapper(
233    session: &FFI_SessionRef,
234) -> RHashMap<RString, FFI_WindowUDF> {
235    let session = session.inner();
236    session
237        .window_functions()
238        .iter()
239        .map(|(name, udwf)| (name.clone().into(), FFI_WindowUDF::from(Arc::clone(udwf))))
240        .collect()
241}
242
243fn table_options_to_rhash(options: &TableOptions) -> RHashMap<RString, RString> {
244    options
245        .entries()
246        .into_iter()
247        .filter_map(|entry| entry.value.map(|v| (entry.key.into(), v.into())))
248        .collect()
249}
250
251unsafe extern "C" fn table_options_fn_wrapper(
252    session: &FFI_SessionRef,
253) -> RHashMap<RString, RString> {
254    let session = session.inner();
255    let table_options = session.table_options();
256    table_options_to_rhash(table_options)
257}
258
259unsafe extern "C" fn default_table_options_fn_wrapper(
260    session: &FFI_SessionRef,
261) -> RHashMap<RString, RString> {
262    let session = session.inner();
263    let table_options = session.default_table_options();
264
265    table_options_to_rhash(&table_options)
266}
267
268unsafe extern "C" fn task_ctx_fn_wrapper(session: &FFI_SessionRef) -> FFI_TaskContext {
269    session.inner().task_ctx().into()
270}
271
272unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_SessionRef) {
273    unsafe {
274        let private_data =
275            Box::from_raw(provider.private_data as *mut SessionPrivateData);
276        drop(private_data);
277    }
278}
279
280unsafe extern "C" fn clone_fn_wrapper(provider: &FFI_SessionRef) -> FFI_SessionRef {
281    unsafe {
282        let old_private_data = provider.private_data as *const SessionPrivateData;
283
284        let private_data = Box::into_raw(Box::new(SessionPrivateData {
285            session: (*old_private_data).session,
286            runtime: (*old_private_data).runtime.clone(),
287        })) as *mut c_void;
288
289        FFI_SessionRef {
290            session_id: session_id_fn_wrapper,
291            config: config_fn_wrapper,
292            create_physical_plan: create_physical_plan_fn_wrapper,
293            create_physical_expr: create_physical_expr_fn_wrapper,
294            scalar_functions: scalar_functions_fn_wrapper,
295            aggregate_functions: aggregate_functions_fn_wrapper,
296            window_functions: window_functions_fn_wrapper,
297            table_options: table_options_fn_wrapper,
298            default_table_options: default_table_options_fn_wrapper,
299            task_ctx: task_ctx_fn_wrapper,
300            logical_codec: provider.logical_codec.clone(),
301
302            clone: clone_fn_wrapper,
303            release: release_fn_wrapper,
304            version: super::version,
305            private_data,
306            library_marker_id: crate::get_library_marker_id,
307        }
308    }
309}
310
311impl Drop for FFI_SessionRef {
312    fn drop(&mut self) {
313        unsafe { (self.release)(self) }
314    }
315}
316
317impl FFI_SessionRef {
318    /// Creates a new [`FFI_SessionRef`].
319    pub fn new(
320        session: &(dyn Session + Send + Sync),
321        runtime: Option<Handle>,
322        logical_codec: FFI_LogicalExtensionCodec,
323    ) -> Self {
324        let private_data = Box::new(SessionPrivateData { session, runtime });
325
326        Self {
327            session_id: session_id_fn_wrapper,
328            config: config_fn_wrapper,
329            create_physical_plan: create_physical_plan_fn_wrapper,
330            create_physical_expr: create_physical_expr_fn_wrapper,
331            scalar_functions: scalar_functions_fn_wrapper,
332            aggregate_functions: aggregate_functions_fn_wrapper,
333            window_functions: window_functions_fn_wrapper,
334            table_options: table_options_fn_wrapper,
335            default_table_options: default_table_options_fn_wrapper,
336            task_ctx: task_ctx_fn_wrapper,
337            logical_codec,
338
339            clone: clone_fn_wrapper,
340            release: release_fn_wrapper,
341            version: super::version,
342            private_data: Box::into_raw(private_data) as *mut c_void,
343            library_marker_id: crate::get_library_marker_id,
344        }
345    }
346}
347
348/// This wrapper struct exists on the receiver side of the FFI interface, so it has
349/// no guarantees about being able to access the data in `private_data`. Any functions
350/// defined on this struct must only use the stable functions provided in
351/// FFI_Session to interact with the foreign table provider.
352#[derive(Debug)]
353pub struct ForeignSession {
354    session: FFI_SessionRef,
355    config: SessionConfig,
356    scalar_functions: HashMap<String, Arc<ScalarUDF>>,
357    aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
358    window_functions: HashMap<String, Arc<WindowUDF>>,
359    table_options: TableOptions,
360    runtime_env: Arc<RuntimeEnv>,
361    props: ExecutionProps,
362}
363
364unsafe impl Send for ForeignSession {}
365unsafe impl Sync for ForeignSession {}
366
367impl FFI_SessionRef {
368    pub fn as_local(&self) -> Option<&(dyn Session + Send + Sync)> {
369        if (self.library_marker_id)() == crate::get_library_marker_id() {
370            return Some(self.inner());
371        }
372        None
373    }
374}
375
376impl TryFrom<&FFI_SessionRef> for ForeignSession {
377    type Error = DataFusionError;
378    fn try_from(session: &FFI_SessionRef) -> Result<Self, Self::Error> {
379        unsafe {
380            let table_options =
381                table_options_from_rhashmap((session.table_options)(session));
382
383            let config = (session.config)(session);
384            let config = SessionConfig::try_from(&config)?;
385
386            let scalar_functions = (session.scalar_functions)(session)
387                .into_iter()
388                .map(|kv_pair| {
389                    let udf = <Arc<dyn ScalarUDFImpl>>::from(&kv_pair.1);
390
391                    (
392                        kv_pair.0.into_string(),
393                        Arc::new(ScalarUDF::new_from_shared_impl(udf)),
394                    )
395                })
396                .collect();
397            let aggregate_functions = (session.aggregate_functions)(session)
398                .into_iter()
399                .map(|kv_pair| {
400                    let udaf = <Arc<dyn AggregateUDFImpl>>::from(&kv_pair.1);
401
402                    (
403                        kv_pair.0.into_string(),
404                        Arc::new(AggregateUDF::new_from_shared_impl(udaf)),
405                    )
406                })
407                .collect();
408            let window_functions = (session.window_functions)(session)
409                .into_iter()
410                .map(|kv_pair| {
411                    let udwf = <Arc<dyn WindowUDFImpl>>::from(&kv_pair.1);
412
413                    (
414                        kv_pair.0.into_string(),
415                        Arc::new(WindowUDF::new_from_shared_impl(udwf)),
416                    )
417                })
418                .collect();
419
420            Ok(Self {
421                session: session.clone(),
422                config,
423                table_options,
424                scalar_functions,
425                aggregate_functions,
426                window_functions,
427                runtime_env: Default::default(),
428                props: Default::default(),
429            })
430        }
431    }
432}
433
434impl Clone for FFI_SessionRef {
435    fn clone(&self) -> Self {
436        unsafe { (self.clone)(self) }
437    }
438}
439
440fn table_options_from_rhashmap(options: RHashMap<RString, RString>) -> TableOptions {
441    let options = options
442        .into_iter()
443        .map(|kv_pair| (kv_pair.0.into_string(), kv_pair.1.into_string()))
444        .collect();
445
446    TableOptions::from_string_hash_map(&options).unwrap_or_else(|err| {
447        log::warn!("Error parsing default table options: {err}");
448        TableOptions::default()
449    })
450}
451
452#[async_trait]
453impl Session for ForeignSession {
454    fn session_id(&self) -> &str {
455        unsafe { (self.session.session_id)(&self.session).as_str() }
456    }
457
458    fn config(&self) -> &SessionConfig {
459        &self.config
460    }
461
462    fn config_options(&self) -> &ConfigOptions {
463        self.config.options()
464    }
465
466    async fn create_physical_plan(
467        &self,
468        logical_plan: &LogicalPlan,
469    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
470        unsafe {
471            let logical_plan = logical_plan_to_bytes(logical_plan)?;
472            let physical_plan = df_result!(
473                (self.session.create_physical_plan)(
474                    &self.session,
475                    logical_plan.as_ref().into()
476                )
477                .await
478            )?;
479            let physical_plan = <Arc<dyn ExecutionPlan>>::try_from(&physical_plan)?;
480
481            Ok(physical_plan)
482        }
483    }
484
485    fn create_physical_expr(
486        &self,
487        expr: Expr,
488        df_schema: &DFSchema,
489    ) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
490        unsafe {
491            let codec: Arc<dyn LogicalExtensionCodec> =
492                (&self.session.logical_codec).into();
493            let logical_expr = serialize_expr(&expr, codec.as_ref())?.encode_to_vec();
494            let schema = WrappedSchema(FFI_ArrowSchema::try_from(df_schema.as_arrow())?);
495
496            let physical_expr = df_result!((self.session.create_physical_expr)(
497                &self.session,
498                logical_expr.into(),
499                schema
500            ))?;
501
502            Ok((&physical_expr).into())
503        }
504    }
505
506    fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
507        &self.scalar_functions
508    }
509
510    fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
511        &self.aggregate_functions
512    }
513
514    fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
515        &self.window_functions
516    }
517
518    fn runtime_env(&self) -> &Arc<RuntimeEnv> {
519        &self.runtime_env
520    }
521
522    fn execution_props(&self) -> &ExecutionProps {
523        &self.props
524    }
525
526    fn as_any(&self) -> &dyn Any {
527        self
528    }
529
530    fn table_options(&self) -> &TableOptions {
531        &self.table_options
532    }
533
534    fn default_table_options(&self) -> TableOptions {
535        unsafe {
536            table_options_from_rhashmap((self.session.default_table_options)(
537                &self.session,
538            ))
539        }
540    }
541
542    fn table_options_mut(&mut self) -> &mut TableOptions {
543        log::warn!(
544            "Mutating table options is not supported via FFI. Changes will not have an effect."
545        );
546        &mut self.table_options
547    }
548
549    fn task_ctx(&self) -> Arc<TaskContext> {
550        unsafe { (self.session.task_ctx)(&self.session).into() }
551    }
552}
553
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Schema};
    use datafusion_common::DataFusionError;
    use datafusion_expr::col;
    use datafusion_expr::registry::FunctionRegistry;
    use datafusion_proto::logical_plan::DefaultLogicalExtensionCodec;

    use super::*;

    /// Round-trips a session state through [`FFI_SessionRef`] and
    /// [`ForeignSession`] and checks that the foreign view matches the local
    /// state.
    #[tokio::test]
    async fn test_ffi_session() -> Result<(), DataFusionError> {
        let (ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
        let state = ctx.state();

        let codec = Arc::new(DefaultLogicalExtensionCodec {});
        let logical_codec = FFI_LogicalExtensionCodec::new(codec, None, task_ctx_provider);

        let local_session = FFI_SessionRef::new(&state, None, logical_codec);
        let foreign_session = ForeignSession::try_from(&local_session)?;

        // Physical expression planning.
        let fields = vec![Field::new("a", DataType::Int32, false)];
        let schema = Arc::new(Schema::new(fields));
        let df_schema = schema.try_into()?;
        let physical_expr = foreign_session.create_physical_expr(col("a"), &df_schema)?;
        assert_eq!(
            format!("{physical_expr:?}"),
            "Column { name: \"a\", index: 0 }"
        );

        // Session id.
        assert_eq!(foreign_session.session_id(), state.session_id());

        // Physical plan creation.
        let empty_plan = LogicalPlan::default();
        let physical_plan = foreign_session.create_physical_plan(&empty_plan).await?;
        assert_eq!(
            format!("{physical_plan:?}"),
            "EmptyExec { schema: Schema { fields: [], metadata: {} }, partitions: 1, cache: PlanProperties { eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { map: {}, classes: [] }, oeq_class: OrderingEquivalenceClass { orderings: [] }, oeq_cache: OrderingEquivalenceCache { normal_cls: OrderingEquivalenceClass { orderings: [] }, leading_map: {} }, constraints: Constraints { inner: [] }, schema: Schema { fields: [], metadata: {} } }, partitioning: UnknownPartitioning(1), emission_type: Incremental, boundedness: Bounded, evaluation_type: Lazy, scheduling_type: Cooperative, output_ordering: None } }"
        );

        // Table options, compared through their debug representation.
        let foreign_defaults = format!("{:?}", foreign_session.default_table_options());
        let local_defaults = format!("{:?}", state.default_table_options());
        assert_eq!(foreign_defaults, local_defaults);

        let foreign_options = format!("{:?}", foreign_session.table_options());
        let local_options = format!("{:?}", state.table_options());
        assert_eq!(foreign_options, local_options);

        // Every function visible through the foreign session must be known
        // to the local state.
        let local_udfs = state.udfs();
        assert!(
            foreign_session
                .scalar_functions()
                .keys()
                .all(|name| local_udfs.contains(name))
        );

        let local_udafs = state.udafs();
        assert!(
            foreign_session
                .aggregate_functions()
                .keys()
                .all(|name| local_udafs.contains(name))
        );

        let local_udwfs = state.udwfs();
        assert!(
            foreign_session
                .window_functions()
                .keys()
                .all(|name| local_udwfs.contains(name))
        );

        Ok(())
    }
}