1use std::any::Any;
19use std::collections::HashMap;
20use std::ffi::c_void;
21use std::sync::Arc;
22
23use abi_stable::StableAbi;
24use abi_stable::std_types::{RHashMap, RResult, RStr, RString, RVec};
25use arrow_schema::SchemaRef;
26use arrow_schema::ffi::FFI_ArrowSchema;
27use async_ffi::{FfiFuture, FutureExt};
28use async_trait::async_trait;
29use datafusion_common::config::{ConfigOptions, TableOptions};
30use datafusion_common::{DFSchema, DataFusionError};
31use datafusion_execution::TaskContext;
32use datafusion_execution::config::SessionConfig;
33use datafusion_execution::runtime_env::RuntimeEnv;
34use datafusion_expr::execution_props::ExecutionProps;
35use datafusion_expr::{
36 AggregateUDF, AggregateUDFImpl, Expr, LogicalPlan, ScalarUDF, ScalarUDFImpl,
37 WindowUDF, WindowUDFImpl,
38};
39use datafusion_physical_expr::PhysicalExpr;
40use datafusion_physical_plan::ExecutionPlan;
41use datafusion_proto::bytes::{logical_plan_from_bytes, logical_plan_to_bytes};
42use datafusion_proto::logical_plan::LogicalExtensionCodec;
43use datafusion_proto::logical_plan::from_proto::parse_expr;
44use datafusion_proto::logical_plan::to_proto::serialize_expr;
45use datafusion_proto::protobuf::LogicalExprNode;
46use datafusion_session::Session;
47use prost::Message;
48use tokio::runtime::Handle;
49
50use crate::arrow_wrappers::WrappedSchema;
51use crate::execution::FFI_TaskContext;
52use crate::execution_plan::FFI_ExecutionPlan;
53use crate::physical_expr::FFI_PhysicalExpr;
54use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
55use crate::session::config::FFI_SessionConfig;
56use crate::udaf::FFI_AggregateUDF;
57use crate::udf::FFI_ScalarUDF;
58use crate::udwf::FFI_WindowUDF;
59use crate::util::FFIResult;
60use crate::{df_result, rresult, rresult_return};
61
62pub mod config;
63
/// FFI-safe handle to a [`Session`] owned by the library that created this struct.
///
/// Every field except `logical_codec` and `private_data` is a function pointer that
/// the creating library fills in (see `FFI_SessionRef::new`). A consumer in another
/// library calls through these pointers instead of touching the session directly;
/// `ForeignSession` is the consumer-side wrapper that implements [`Session`] on top
/// of this struct.
///
/// The session is held by reference and its lifetime is erased once it is stored
/// behind `private_data`. The creating library must therefore keep the session alive
/// for as long as this struct, or any clone of it, is in use.
#[repr(C)]
#[derive(Debug, StableAbi)]
pub(crate) struct FFI_SessionRef {
    /// Returns the wrapped session's id.
    session_id: unsafe extern "C" fn(&Self) -> RStr,

    /// Returns the wrapped session's configuration as an [`FFI_SessionConfig`].
    config: unsafe extern "C" fn(&Self) -> FFI_SessionConfig,

    /// Plans a logical plan into an execution plan. The plan travels as bytes from
    /// `logical_plan_to_bytes` and is decoded with `logical_plan_from_bytes` against
    /// the wrapped session's task context.
    create_physical_plan: unsafe extern "C" fn(
        &Self,
        logical_plan_serialized: RVec<u8>,
    ) -> FfiFuture<FFIResult<FFI_ExecutionPlan>>,

    /// Converts a protobuf-encoded [`LogicalExprNode`] into a physical expression
    /// against `schema`. Extension data is encoded and decoded with `logical_codec`.
    create_physical_expr: unsafe extern "C" fn(
        &Self,
        expr_serialized: RVec<u8>,
        schema: WrappedSchema,
    ) -> FFIResult<FFI_PhysicalExpr>,

    /// Returns the wrapped session's scalar UDFs, keyed by registered name.
    scalar_functions: unsafe extern "C" fn(&Self) -> RHashMap<RString, FFI_ScalarUDF>,

    /// Returns the wrapped session's aggregate UDFs, keyed by registered name.
    aggregate_functions:
        unsafe extern "C" fn(&Self) -> RHashMap<RString, FFI_AggregateUDF>,

    /// Returns the wrapped session's window UDFs, keyed by registered name.
    window_functions: unsafe extern "C" fn(&Self) -> RHashMap<RString, FFI_WindowUDF>,

    /// Returns the session's table options as string key/value pairs. Entries
    /// without a value are omitted.
    table_options: unsafe extern "C" fn(&Self) -> RHashMap<RString, RString>,

    /// Returns the session's default table options, in the same form as
    /// `table_options`.
    default_table_options: unsafe extern "C" fn(&Self) -> RHashMap<RString, RString>,

    /// Returns the wrapped session's task context.
    task_ctx: unsafe extern "C" fn(&Self) -> FFI_TaskContext,

    /// Extension codec used to (de)serialize logical expressions for
    /// `create_physical_expr`.
    logical_codec: FFI_LogicalExtensionCodec,

    /// Creates a new handle to the same session, with its own `private_data`
    /// allocation.
    clone: unsafe extern "C" fn(plan: &Self) -> Self,

    /// Frees `private_data`. Invoked from `Drop`.
    release: unsafe extern "C" fn(arg: &mut Self),

    /// Version reported by the library that created this struct (`super::version`).
    pub version: unsafe extern "C" fn() -> u64,

    /// Type-erased pointer to a boxed `SessionPrivateData`. It is only meaningful
    /// inside the library that created it.
    private_data: *mut c_void,

    /// Identifies the creating library. `as_local` compares it with the current
    /// library's marker to decide whether `private_data` may be read directly.
    pub library_marker_id: extern "C" fn() -> usize,
}
128
// SAFETY: the raw `private_data` pointer prevents these from being derived
// automatically. That pointer targets a `SessionPrivateData`, whose session reference
// is typed `dyn Session + Send + Sync`. The data behind it is only read through
// `*const` casts (`inner`, `runtime`, `clone_fn_wrapper`). The one exception is
// `release_fn_wrapper`, which frees it and requires `&mut FFI_SessionRef`.
unsafe impl Send for FFI_SessionRef {}
unsafe impl Sync for FFI_SessionRef {}
131
/// State stored behind an [`FFI_SessionRef`]'s `private_data` pointer.
///
/// Allocated with `Box::into_raw` in `FFI_SessionRef::new` and `clone_fn_wrapper`,
/// and freed in `release_fn_wrapper`. It is read only by code in the creating
/// library: the `*_fn_wrapper` functions, and `as_local` once the library markers match.
struct SessionPrivateData<'a> {
    // The wrapped session. Its lifetime `'a` is erased once this struct is stored
    // as `*mut c_void`, so the caller must keep the session alive.
    session: &'a (dyn Session + Send + Sync),
    // Runtime handle passed on to every `FFI_ExecutionPlan` created by
    // `create_physical_plan_fn_wrapper`.
    runtime: Option<Handle>,
}
136
137impl FFI_SessionRef {
138 fn inner(&self) -> &(dyn Session + Send + Sync) {
139 let private_data = self.private_data as *const SessionPrivateData;
140 unsafe { (*private_data).session }
141 }
142
143 unsafe fn runtime(&self) -> &Option<Handle> {
144 unsafe {
145 let private_data = self.private_data as *const SessionPrivateData;
146 &(*private_data).runtime
147 }
148 }
149}
150
151unsafe extern "C" fn session_id_fn_wrapper(session: &FFI_SessionRef) -> RStr<'_> {
152 let session = session.inner();
153 session.session_id().into()
154}
155
156unsafe extern "C" fn config_fn_wrapper(session: &FFI_SessionRef) -> FFI_SessionConfig {
157 let session = session.inner();
158 session.config().into()
159}
160
161unsafe extern "C" fn create_physical_plan_fn_wrapper(
162 session: &FFI_SessionRef,
163 logical_plan_serialized: RVec<u8>,
164) -> FfiFuture<FFIResult<FFI_ExecutionPlan>> {
165 unsafe {
166 let runtime = session.runtime().clone();
167 let session = session.clone();
168 async move {
169 let session = session.inner();
170 let task_ctx = session.task_ctx();
171
172 let logical_plan = rresult_return!(logical_plan_from_bytes(
173 logical_plan_serialized.as_slice(),
174 task_ctx.as_ref(),
175 ));
176
177 let physical_plan = session.create_physical_plan(&logical_plan).await;
178
179 rresult!(physical_plan.map(|plan| FFI_ExecutionPlan::new(plan, runtime)))
180 }
181 .into_ffi()
182 }
183}
184
185unsafe extern "C" fn create_physical_expr_fn_wrapper(
186 session: &FFI_SessionRef,
187 expr_serialized: RVec<u8>,
188 schema: WrappedSchema,
189) -> FFIResult<FFI_PhysicalExpr> {
190 let codec: Arc<dyn LogicalExtensionCodec> = (&session.logical_codec).into();
191 let session = session.inner();
192
193 let logical_expr = LogicalExprNode::decode(expr_serialized.as_slice()).unwrap();
194 let logical_expr =
195 parse_expr(&logical_expr, session.task_ctx().as_ref(), codec.as_ref()).unwrap();
196 let schema: SchemaRef = schema.into();
197 let schema: DFSchema = rresult_return!(schema.try_into());
198
199 let physical_expr =
200 rresult_return!(session.create_physical_expr(logical_expr, &schema));
201
202 RResult::ROk(physical_expr.into())
203}
204
205unsafe extern "C" fn scalar_functions_fn_wrapper(
206 session: &FFI_SessionRef,
207) -> RHashMap<RString, FFI_ScalarUDF> {
208 let session = session.inner();
209 session
210 .scalar_functions()
211 .iter()
212 .map(|(name, udf)| (name.clone().into(), FFI_ScalarUDF::from(Arc::clone(udf))))
213 .collect()
214}
215
216unsafe extern "C" fn aggregate_functions_fn_wrapper(
217 session: &FFI_SessionRef,
218) -> RHashMap<RString, FFI_AggregateUDF> {
219 let session = session.inner();
220 session
221 .aggregate_functions()
222 .iter()
223 .map(|(name, udaf)| {
224 (
225 name.clone().into(),
226 FFI_AggregateUDF::from(Arc::clone(udaf)),
227 )
228 })
229 .collect()
230}
231
232unsafe extern "C" fn window_functions_fn_wrapper(
233 session: &FFI_SessionRef,
234) -> RHashMap<RString, FFI_WindowUDF> {
235 let session = session.inner();
236 session
237 .window_functions()
238 .iter()
239 .map(|(name, udwf)| (name.clone().into(), FFI_WindowUDF::from(Arc::clone(udwf))))
240 .collect()
241}
242
243fn table_options_to_rhash(options: &TableOptions) -> RHashMap<RString, RString> {
244 options
245 .entries()
246 .into_iter()
247 .filter_map(|entry| entry.value.map(|v| (entry.key.into(), v.into())))
248 .collect()
249}
250
251unsafe extern "C" fn table_options_fn_wrapper(
252 session: &FFI_SessionRef,
253) -> RHashMap<RString, RString> {
254 let session = session.inner();
255 let table_options = session.table_options();
256 table_options_to_rhash(table_options)
257}
258
259unsafe extern "C" fn default_table_options_fn_wrapper(
260 session: &FFI_SessionRef,
261) -> RHashMap<RString, RString> {
262 let session = session.inner();
263 let table_options = session.default_table_options();
264
265 table_options_to_rhash(&table_options)
266}
267
268unsafe extern "C" fn task_ctx_fn_wrapper(session: &FFI_SessionRef) -> FFI_TaskContext {
269 session.inner().task_ctx().into()
270}
271
272unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_SessionRef) {
273 unsafe {
274 let private_data =
275 Box::from_raw(provider.private_data as *mut SessionPrivateData);
276 drop(private_data);
277 }
278}
279
280unsafe extern "C" fn clone_fn_wrapper(provider: &FFI_SessionRef) -> FFI_SessionRef {
281 unsafe {
282 let old_private_data = provider.private_data as *const SessionPrivateData;
283
284 let private_data = Box::into_raw(Box::new(SessionPrivateData {
285 session: (*old_private_data).session,
286 runtime: (*old_private_data).runtime.clone(),
287 })) as *mut c_void;
288
289 FFI_SessionRef {
290 session_id: session_id_fn_wrapper,
291 config: config_fn_wrapper,
292 create_physical_plan: create_physical_plan_fn_wrapper,
293 create_physical_expr: create_physical_expr_fn_wrapper,
294 scalar_functions: scalar_functions_fn_wrapper,
295 aggregate_functions: aggregate_functions_fn_wrapper,
296 window_functions: window_functions_fn_wrapper,
297 table_options: table_options_fn_wrapper,
298 default_table_options: default_table_options_fn_wrapper,
299 task_ctx: task_ctx_fn_wrapper,
300 logical_codec: provider.logical_codec.clone(),
301
302 clone: clone_fn_wrapper,
303 release: release_fn_wrapper,
304 version: super::version,
305 private_data,
306 library_marker_id: crate::get_library_marker_id,
307 }
308 }
309}
310
311impl Drop for FFI_SessionRef {
312 fn drop(&mut self) {
313 unsafe { (self.release)(self) }
314 }
315}
316
317impl FFI_SessionRef {
318 pub fn new(
320 session: &(dyn Session + Send + Sync),
321 runtime: Option<Handle>,
322 logical_codec: FFI_LogicalExtensionCodec,
323 ) -> Self {
324 let private_data = Box::new(SessionPrivateData { session, runtime });
325
326 Self {
327 session_id: session_id_fn_wrapper,
328 config: config_fn_wrapper,
329 create_physical_plan: create_physical_plan_fn_wrapper,
330 create_physical_expr: create_physical_expr_fn_wrapper,
331 scalar_functions: scalar_functions_fn_wrapper,
332 aggregate_functions: aggregate_functions_fn_wrapper,
333 window_functions: window_functions_fn_wrapper,
334 table_options: table_options_fn_wrapper,
335 default_table_options: default_table_options_fn_wrapper,
336 task_ctx: task_ctx_fn_wrapper,
337 logical_codec,
338
339 clone: clone_fn_wrapper,
340 release: release_fn_wrapper,
341 version: super::version,
342 private_data: Box::into_raw(private_data) as *mut c_void,
343 library_marker_id: crate::get_library_marker_id,
344 }
345 }
346}
347
/// Consumer-side [`Session`] implementation backed by an [`FFI_SessionRef`].
///
/// Configuration, table options and the function registries are fetched once, when
/// the struct is built via `TryFrom<&FFI_SessionRef>`, and cached here. Planning,
/// `session_id`, `default_table_options` and `task_ctx` are forwarded through the
/// FFI handle on every call. `runtime_env` and `props` are local defaults, not the
/// originating session's values.
#[derive(Debug)]
pub struct ForeignSession {
    // FFI handle to the originating session.
    session: FFI_SessionRef,
    // Cached copy of the originating session's configuration.
    config: SessionConfig,
    // Cached function registries, keyed by name.
    scalar_functions: HashMap<String, Arc<ScalarUDF>>,
    aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
    window_functions: HashMap<String, Arc<WindowUDF>>,
    // Cached table options. Local mutations are not sent back over FFI.
    table_options: TableOptions,
    // Default-constructed at creation; not the originating session's runtime env.
    runtime_env: Arc<RuntimeEnv>,
    // Default-constructed at creation; not the originating session's props.
    props: ExecutionProps,
}
363
// NOTE(review): these are asserted manually. The `FFI_SessionRef` field is already
// `Send + Sync` through its own unsafe impls. Confirm whether the remaining fields
// are automatically `Send + Sync`; if so, these impls may be unnecessary.
unsafe impl Send for ForeignSession {}
unsafe impl Sync for ForeignSession {}
366
367impl FFI_SessionRef {
368 pub fn as_local(&self) -> Option<&(dyn Session + Send + Sync)> {
369 if (self.library_marker_id)() == crate::get_library_marker_id() {
370 return Some(self.inner());
371 }
372 None
373 }
374}
375
376impl TryFrom<&FFI_SessionRef> for ForeignSession {
377 type Error = DataFusionError;
378 fn try_from(session: &FFI_SessionRef) -> Result<Self, Self::Error> {
379 unsafe {
380 let table_options =
381 table_options_from_rhashmap((session.table_options)(session));
382
383 let config = (session.config)(session);
384 let config = SessionConfig::try_from(&config)?;
385
386 let scalar_functions = (session.scalar_functions)(session)
387 .into_iter()
388 .map(|kv_pair| {
389 let udf = <Arc<dyn ScalarUDFImpl>>::from(&kv_pair.1);
390
391 (
392 kv_pair.0.into_string(),
393 Arc::new(ScalarUDF::new_from_shared_impl(udf)),
394 )
395 })
396 .collect();
397 let aggregate_functions = (session.aggregate_functions)(session)
398 .into_iter()
399 .map(|kv_pair| {
400 let udaf = <Arc<dyn AggregateUDFImpl>>::from(&kv_pair.1);
401
402 (
403 kv_pair.0.into_string(),
404 Arc::new(AggregateUDF::new_from_shared_impl(udaf)),
405 )
406 })
407 .collect();
408 let window_functions = (session.window_functions)(session)
409 .into_iter()
410 .map(|kv_pair| {
411 let udwf = <Arc<dyn WindowUDFImpl>>::from(&kv_pair.1);
412
413 (
414 kv_pair.0.into_string(),
415 Arc::new(WindowUDF::new_from_shared_impl(udwf)),
416 )
417 })
418 .collect();
419
420 Ok(Self {
421 session: session.clone(),
422 config,
423 table_options,
424 scalar_functions,
425 aggregate_functions,
426 window_functions,
427 runtime_env: Default::default(),
428 props: Default::default(),
429 })
430 }
431 }
432}
433
434impl Clone for FFI_SessionRef {
435 fn clone(&self) -> Self {
436 unsafe { (self.clone)(self) }
437 }
438}
439
440fn table_options_from_rhashmap(options: RHashMap<RString, RString>) -> TableOptions {
441 let options = options
442 .into_iter()
443 .map(|kv_pair| (kv_pair.0.into_string(), kv_pair.1.into_string()))
444 .collect();
445
446 TableOptions::from_string_hash_map(&options).unwrap_or_else(|err| {
447 log::warn!("Error parsing default table options: {err}");
448 TableOptions::default()
449 })
450}
451
452#[async_trait]
453impl Session for ForeignSession {
454 fn session_id(&self) -> &str {
455 unsafe { (self.session.session_id)(&self.session).as_str() }
456 }
457
458 fn config(&self) -> &SessionConfig {
459 &self.config
460 }
461
462 fn config_options(&self) -> &ConfigOptions {
463 self.config.options()
464 }
465
466 async fn create_physical_plan(
467 &self,
468 logical_plan: &LogicalPlan,
469 ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
470 unsafe {
471 let logical_plan = logical_plan_to_bytes(logical_plan)?;
472 let physical_plan = df_result!(
473 (self.session.create_physical_plan)(
474 &self.session,
475 logical_plan.as_ref().into()
476 )
477 .await
478 )?;
479 let physical_plan = <Arc<dyn ExecutionPlan>>::try_from(&physical_plan)?;
480
481 Ok(physical_plan)
482 }
483 }
484
485 fn create_physical_expr(
486 &self,
487 expr: Expr,
488 df_schema: &DFSchema,
489 ) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
490 unsafe {
491 let codec: Arc<dyn LogicalExtensionCodec> =
492 (&self.session.logical_codec).into();
493 let logical_expr = serialize_expr(&expr, codec.as_ref())?.encode_to_vec();
494 let schema = WrappedSchema(FFI_ArrowSchema::try_from(df_schema.as_arrow())?);
495
496 let physical_expr = df_result!((self.session.create_physical_expr)(
497 &self.session,
498 logical_expr.into(),
499 schema
500 ))?;
501
502 Ok((&physical_expr).into())
503 }
504 }
505
506 fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
507 &self.scalar_functions
508 }
509
510 fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
511 &self.aggregate_functions
512 }
513
514 fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
515 &self.window_functions
516 }
517
518 fn runtime_env(&self) -> &Arc<RuntimeEnv> {
519 &self.runtime_env
520 }
521
522 fn execution_props(&self) -> &ExecutionProps {
523 &self.props
524 }
525
526 fn as_any(&self) -> &dyn Any {
527 self
528 }
529
530 fn table_options(&self) -> &TableOptions {
531 &self.table_options
532 }
533
534 fn default_table_options(&self) -> TableOptions {
535 unsafe {
536 table_options_from_rhashmap((self.session.default_table_options)(
537 &self.session,
538 ))
539 }
540 }
541
542 fn table_options_mut(&mut self) -> &mut TableOptions {
543 log::warn!(
544 "Mutating table options is not supported via FFI. Changes will not have an effect."
545 );
546 &mut self.table_options
547 }
548
549 fn task_ctx(&self) -> Arc<TaskContext> {
550 unsafe { (self.session.task_ctx)(&self.session).into() }
551 }
552}
553
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Schema};
    use datafusion_common::DataFusionError;
    use datafusion_expr::col;
    use datafusion_expr::registry::FunctionRegistry;
    use datafusion_proto::logical_plan::DefaultLogicalExtensionCodec;

    use super::*;

    /// Round-trips a local session through `FFI_SessionRef` / `ForeignSession`
    /// inside a single library. Checks that forwarded and cached accessors agree
    /// with the original session state.
    #[tokio::test]
    async fn test_ffi_session() -> Result<(), DataFusionError> {
        let (ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
        let state = ctx.state();
        let logical_codec = FFI_LogicalExtensionCodec::new(
            Arc::new(DefaultLogicalExtensionCodec {}),
            None,
            task_ctx_provider,
        );

        // No tokio runtime handle is attached to the FFI session.
        let local_session = FFI_SessionRef::new(&state, None, logical_codec);
        let foreign_session = ForeignSession::try_from(&local_session)?;

        // Physical expression creation round-trips the logical expression through
        // protobuf serialization.
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let df_schema = schema.try_into()?;
        let physical_expr = foreign_session.create_physical_expr(col("a"), &df_schema)?;
        assert_eq!(
            format!("{physical_expr:?}"),
            "Column { name: \"a\", index: 0 }"
        );

        assert_eq!(foreign_session.session_id(), state.session_id());

        // An empty logical plan should plan to `EmptyExec`. The expected value is the
        // full `Debug` output, so it must be updated whenever that output changes.
        let logical_plan = LogicalPlan::default();
        let physical_plan = foreign_session.create_physical_plan(&logical_plan).await?;
        assert_eq!(
            format!("{physical_plan:?}"),
            "EmptyExec { schema: Schema { fields: [], metadata: {} }, partitions: 1, cache: PlanProperties { eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { map: {}, classes: [] }, oeq_class: OrderingEquivalenceClass { orderings: [] }, oeq_cache: OrderingEquivalenceCache { normal_cls: OrderingEquivalenceClass { orderings: [] }, leading_map: {} }, constraints: Constraints { inner: [] }, schema: Schema { fields: [], metadata: {} } }, partitioning: UnknownPartitioning(1), emission_type: Incremental, boundedness: Bounded, evaluation_type: Lazy, scheduling_type: Cooperative, output_ordering: None } }"
        );

        // Table options survive the string round-trip (compared via `Debug`).
        assert_eq!(
            format!("{:?}", foreign_session.default_table_options()),
            format!("{:?}", state.default_table_options())
        );

        assert_eq!(
            format!("{:?}", foreign_session.table_options()),
            format!("{:?}", state.table_options())
        );

        // Every function exposed by the foreign session must be registered locally.
        let local_udfs = state.udfs();
        for udf in foreign_session.scalar_functions().keys() {
            assert!(local_udfs.contains(udf));
        }
        let local_udafs = state.udafs();
        for udaf in foreign_session.aggregate_functions().keys() {
            assert!(local_udafs.contains(udaf));
        }
        let local_udwfs = state.udwfs();
        for udwf in foreign_session.window_functions().keys() {
            assert!(local_udwfs.contains(udwf));
        }

        Ok(())
    }
}
621}