1use std::any::Any;
19use std::collections::HashMap;
20use std::ffi::c_void;
21use std::sync::Arc;
22
23use abi_stable::StableAbi;
24use abi_stable::std_types::{RHashMap, RResult, RStr, RString, RVec};
25use arrow_schema::SchemaRef;
26use arrow_schema::ffi::FFI_ArrowSchema;
27use async_ffi::{FfiFuture, FutureExt};
28use async_trait::async_trait;
29use datafusion_common::config::{ConfigFileType, ConfigOptions, TableOptions};
30use datafusion_common::{DFSchema, DataFusionError};
31use datafusion_execution::TaskContext;
32use datafusion_execution::config::SessionConfig;
33use datafusion_execution::runtime_env::RuntimeEnv;
34use datafusion_expr::execution_props::ExecutionProps;
35use datafusion_expr::{
36 AggregateUDF, AggregateUDFImpl, Expr, LogicalPlan, ScalarUDF, ScalarUDFImpl,
37 WindowUDF, WindowUDFImpl,
38};
39use datafusion_physical_expr::PhysicalExpr;
40use datafusion_physical_plan::ExecutionPlan;
41use datafusion_proto::bytes::{logical_plan_from_bytes, logical_plan_to_bytes};
42use datafusion_proto::logical_plan::LogicalExtensionCodec;
43use datafusion_proto::logical_plan::from_proto::parse_expr;
44use datafusion_proto::logical_plan::to_proto::serialize_expr;
45use datafusion_proto::protobuf::LogicalExprNode;
46use datafusion_session::Session;
47use prost::Message;
48use tokio::runtime::Handle;
49
50use crate::arrow_wrappers::WrappedSchema;
51use crate::execution::FFI_TaskContext;
52use crate::execution_plan::FFI_ExecutionPlan;
53use crate::physical_expr::FFI_PhysicalExpr;
54use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
55use crate::session::config::FFI_SessionConfig;
56use crate::udaf::FFI_AggregateUDF;
57use crate::udf::FFI_ScalarUDF;
58use crate::udwf::FFI_WindowUDF;
59use crate::util::FFIResult;
60use crate::{df_result, rresult, rresult_return};
61
62pub mod config;
63
/// FFI-stable handle to a borrowed DataFusion `Session`.
///
/// The library that owns the session builds this with `FFI_SessionRef::new`.
/// The receiving library wraps it in `ForeignSession` so it can be used as a
/// `Session`. Every operation goes through the function pointers below, which
/// the owning library fills with the `*_fn_wrapper` functions of this module.
#[repr(C)]
#[derive(Debug, StableAbi)]
pub(crate) struct FFI_SessionRef {
    /// Return the wrapped session's `session_id`.
    session_id: unsafe extern "C" fn(&Self) -> RStr,

    /// Return the wrapped session's configuration as an `FFI_SessionConfig`.
    config: unsafe extern "C" fn(&Self) -> FFI_SessionConfig,

    /// Plan a logical plan serialized with
    /// `datafusion_proto::bytes::logical_plan_to_bytes`. The returned future
    /// resolves to the exported execution plan or an error.
    create_physical_plan: unsafe extern "C" fn(
        &Self,
        logical_plan_serialized: RVec<u8>,
    )
        -> FfiFuture<FFIResult<FFI_ExecutionPlan>>,

    /// Plan a protobuf-encoded `LogicalExprNode` against `schema`.
    create_physical_expr: unsafe extern "C" fn(
        &Self,
        expr_serialized: RVec<u8>,
        schema: WrappedSchema,
    ) -> FFIResult<FFI_PhysicalExpr>,

    /// Snapshot of the session's scalar UDFs, keyed by registered name.
    scalar_functions: unsafe extern "C" fn(&Self) -> RHashMap<RString, FFI_ScalarUDF>,

    /// Snapshot of the session's aggregate UDFs, keyed by registered name.
    aggregate_functions:
        unsafe extern "C" fn(&Self) -> RHashMap<RString, FFI_AggregateUDF>,

    /// Snapshot of the session's window UDFs, keyed by registered name.
    window_functions: unsafe extern "C" fn(&Self) -> RHashMap<RString, FFI_WindowUDF>,

    /// The session's current table options, flattened to `key -> value`
    /// strings. The current file format, if any, is carried under the
    /// `datafusion_ffi.table_current_format` key.
    table_options: unsafe extern "C" fn(&Self) -> RHashMap<RString, RString>,

    /// The session's default table options, in the same flattened encoding as
    /// `table_options`.
    default_table_options: unsafe extern "C" fn(&Self) -> RHashMap<RString, RString>,

    /// Return the session's task context.
    task_ctx: unsafe extern "C" fn(&Self) -> FFI_TaskContext,

    /// Codec used on both sides to (de)serialize the logical expressions passed
    /// to `create_physical_expr`.
    logical_codec: FFI_LogicalExtensionCodec,

    /// Produce an independent handle to the same session. Used by `Clone`.
    clone: unsafe extern "C" fn(plan: &Self) -> Self,

    /// Free this handle's private data. Called from `Drop`. Because it frees a
    /// heap allocation, it must not run more than once per handle.
    release: unsafe extern "C" fn(arg: &mut Self),

    /// Version reported by the library that produced this handle.
    /// NOTE(review): filled with `super::version`, presumably the datafusion-ffi
    /// crate version — confirm.
    pub version: unsafe extern "C" fn() -> u64,

    /// Opaque pointer to this handle's boxed `SessionPrivateData`. Only the
    /// library that created the handle knows its layout.
    private_data: *mut c_void,

    /// Identifies the library that created this handle. `as_local` compares it
    /// with the current library's marker before reading `private_data` directly.
    pub library_marker_id: extern "C" fn() -> usize,
}
128
// SAFETY: apart from `private_data`, the fields are function pointers and the
// logical codec handle. `private_data` points at a `SessionPrivateData`, which
// holds a `&(dyn Session + Send + Sync)` and an `Option<Handle>`. The wrappers
// in this module only read it through `&self`, except `release`, which takes
// `&mut self`.
// NOTE(review): this also relies on `FFI_LogicalExtensionCodec` being safe to
// share across threads — confirm.
unsafe impl Send for FFI_SessionRef {}
unsafe impl Sync for FFI_SessionRef {}
131
/// Host-side state behind `FFI_SessionRef::private_data`.
///
/// It is boxed when a handle is created (`FFI_SessionRef::new`,
/// `clone_fn_wrapper`) and freed by `release_fn_wrapper`. Each handle owns its
/// own box.
struct SessionPrivateData<'a> {
    /// The borrowed session that every FFI callback forwards to. The `'a`
    /// lifetime is erased once this struct is stored behind `*mut c_void`, so
    /// nothing enforces that the session outlives the handle.
    session: &'a (dyn Session + Send + Sync),
    /// Runtime handle passed to `FFI_ExecutionPlan::new` for plans produced by
    /// `create_physical_plan`. May be `None`.
    runtime: Option<Handle>,
}
136
137impl FFI_SessionRef {
138 fn inner(&self) -> &(dyn Session + Send + Sync) {
139 let private_data = self.private_data as *const SessionPrivateData;
140 unsafe { (*private_data).session }
141 }
142
143 unsafe fn runtime(&self) -> &Option<Handle> {
144 unsafe {
145 let private_data = self.private_data as *const SessionPrivateData;
146 &(*private_data).runtime
147 }
148 }
149}
150
151unsafe extern "C" fn session_id_fn_wrapper(session: &FFI_SessionRef) -> RStr<'_> {
152 let session = session.inner();
153 session.session_id().into()
154}
155
156unsafe extern "C" fn config_fn_wrapper(session: &FFI_SessionRef) -> FFI_SessionConfig {
157 let session = session.inner();
158 session.config().into()
159}
160
161unsafe extern "C" fn create_physical_plan_fn_wrapper(
162 session: &FFI_SessionRef,
163 logical_plan_serialized: RVec<u8>,
164) -> FfiFuture<FFIResult<FFI_ExecutionPlan>> {
165 unsafe {
166 let runtime = session.runtime().clone();
167 let session = session.clone();
168 async move {
169 let session = session.inner();
170 let task_ctx = session.task_ctx();
171
172 let logical_plan = rresult_return!(logical_plan_from_bytes(
173 logical_plan_serialized.as_slice(),
174 task_ctx.as_ref(),
175 ));
176
177 let physical_plan = session.create_physical_plan(&logical_plan).await;
178
179 rresult!(physical_plan.map(|plan| FFI_ExecutionPlan::new(plan, runtime)))
180 }
181 .into_ffi()
182 }
183}
184
185unsafe extern "C" fn create_physical_expr_fn_wrapper(
186 session: &FFI_SessionRef,
187 expr_serialized: RVec<u8>,
188 schema: WrappedSchema,
189) -> FFIResult<FFI_PhysicalExpr> {
190 let codec: Arc<dyn LogicalExtensionCodec> = (&session.logical_codec).into();
191 let session = session.inner();
192
193 let logical_expr = LogicalExprNode::decode(expr_serialized.as_slice()).unwrap();
194 let logical_expr =
195 parse_expr(&logical_expr, session.task_ctx().as_ref(), codec.as_ref()).unwrap();
196 let schema: SchemaRef = schema.into();
197 let schema: DFSchema = rresult_return!(schema.try_into());
198
199 let physical_expr =
200 rresult_return!(session.create_physical_expr(logical_expr, &schema));
201
202 RResult::ROk(physical_expr.into())
203}
204
205unsafe extern "C" fn scalar_functions_fn_wrapper(
206 session: &FFI_SessionRef,
207) -> RHashMap<RString, FFI_ScalarUDF> {
208 let session = session.inner();
209 session
210 .scalar_functions()
211 .iter()
212 .map(|(name, udf)| (name.clone().into(), FFI_ScalarUDF::from(Arc::clone(udf))))
213 .collect()
214}
215
216unsafe extern "C" fn aggregate_functions_fn_wrapper(
217 session: &FFI_SessionRef,
218) -> RHashMap<RString, FFI_AggregateUDF> {
219 let session = session.inner();
220 session
221 .aggregate_functions()
222 .iter()
223 .map(|(name, udaf)| {
224 (
225 name.clone().into(),
226 FFI_AggregateUDF::from(Arc::clone(udaf)),
227 )
228 })
229 .collect()
230}
231
232unsafe extern "C" fn window_functions_fn_wrapper(
233 session: &FFI_SessionRef,
234) -> RHashMap<RString, FFI_WindowUDF> {
235 let session = session.inner();
236 session
237 .window_functions()
238 .iter()
239 .map(|(name, udwf)| (name.clone().into(), FFI_WindowUDF::from(Arc::clone(udwf))))
240 .collect()
241}
242
243fn table_options_to_rhash(mut options: TableOptions) -> RHashMap<RString, RString> {
244 let current_format = options.current_format.take();
249 let mut options: HashMap<RString, RString> = options
250 .entries()
251 .into_iter()
252 .filter_map(|entry| entry.value.map(|v| (entry.key.into(), v.into())))
253 .collect();
254 if let Some(current_format) = current_format {
255 options.insert(
256 "datafusion_ffi.table_current_format".into(),
257 match current_format {
258 ConfigFileType::JSON => "json",
259 ConfigFileType::PARQUET => "parquet",
260 ConfigFileType::CSV => "csv",
261 }
262 .into(),
263 );
264 }
265
266 options.into()
267}
268
269unsafe extern "C" fn table_options_fn_wrapper(
270 session: &FFI_SessionRef,
271) -> RHashMap<RString, RString> {
272 let session = session.inner();
273 let table_options = session.table_options();
274 table_options_to_rhash(table_options.clone())
275}
276
277unsafe extern "C" fn default_table_options_fn_wrapper(
278 session: &FFI_SessionRef,
279) -> RHashMap<RString, RString> {
280 let session = session.inner();
281 let table_options = session.default_table_options();
282
283 table_options_to_rhash(table_options)
284}
285
286unsafe extern "C" fn task_ctx_fn_wrapper(session: &FFI_SessionRef) -> FFI_TaskContext {
287 session.inner().task_ctx().into()
288}
289
290unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_SessionRef) {
291 unsafe {
292 let private_data =
293 Box::from_raw(provider.private_data as *mut SessionPrivateData);
294 drop(private_data);
295 }
296}
297
298unsafe extern "C" fn clone_fn_wrapper(provider: &FFI_SessionRef) -> FFI_SessionRef {
299 unsafe {
300 let old_private_data = provider.private_data as *const SessionPrivateData;
301
302 let private_data = Box::into_raw(Box::new(SessionPrivateData {
303 session: (*old_private_data).session,
304 runtime: (*old_private_data).runtime.clone(),
305 })) as *mut c_void;
306
307 FFI_SessionRef {
308 session_id: session_id_fn_wrapper,
309 config: config_fn_wrapper,
310 create_physical_plan: create_physical_plan_fn_wrapper,
311 create_physical_expr: create_physical_expr_fn_wrapper,
312 scalar_functions: scalar_functions_fn_wrapper,
313 aggregate_functions: aggregate_functions_fn_wrapper,
314 window_functions: window_functions_fn_wrapper,
315 table_options: table_options_fn_wrapper,
316 default_table_options: default_table_options_fn_wrapper,
317 task_ctx: task_ctx_fn_wrapper,
318 logical_codec: provider.logical_codec.clone(),
319
320 clone: clone_fn_wrapper,
321 release: release_fn_wrapper,
322 version: super::version,
323 private_data,
324 library_marker_id: crate::get_library_marker_id,
325 }
326 }
327}
328
329impl Drop for FFI_SessionRef {
330 fn drop(&mut self) {
331 unsafe { (self.release)(self) }
332 }
333}
334
335impl FFI_SessionRef {
336 pub fn new(
338 session: &(dyn Session + Send + Sync),
339 runtime: Option<Handle>,
340 logical_codec: FFI_LogicalExtensionCodec,
341 ) -> Self {
342 let private_data = Box::new(SessionPrivateData { session, runtime });
343
344 Self {
345 session_id: session_id_fn_wrapper,
346 config: config_fn_wrapper,
347 create_physical_plan: create_physical_plan_fn_wrapper,
348 create_physical_expr: create_physical_expr_fn_wrapper,
349 scalar_functions: scalar_functions_fn_wrapper,
350 aggregate_functions: aggregate_functions_fn_wrapper,
351 window_functions: window_functions_fn_wrapper,
352 table_options: table_options_fn_wrapper,
353 default_table_options: default_table_options_fn_wrapper,
354 task_ctx: task_ctx_fn_wrapper,
355 logical_codec,
356
357 clone: clone_fn_wrapper,
358 release: release_fn_wrapper,
359 version: super::version,
360 private_data: Box::into_raw(private_data) as *mut c_void,
361 library_marker_id: crate::get_library_marker_id,
362 }
363 }
364}
365
/// A `Session` implementation backed by an `FFI_SessionRef` that may have been
/// produced by a different library.
///
/// Configuration, table options and the function registries are copied once
/// when the value is built with `TryFrom<&FFI_SessionRef>`. Planning,
/// `session_id`, `task_ctx` and `default_table_options` are forwarded across the
/// FFI boundary on every call.
#[derive(Debug)]
pub struct ForeignSession {
    /// Handle used to call back into the library that owns the session.
    session: FFI_SessionRef,
    /// Session configuration, converted once at construction.
    config: SessionConfig,
    /// Scalar UDFs snapshotted at construction, keyed by name.
    scalar_functions: HashMap<String, Arc<ScalarUDF>>,
    /// Aggregate UDFs snapshotted at construction, keyed by name.
    aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
    /// Window UDFs snapshotted at construction, keyed by name.
    window_functions: HashMap<String, Arc<WindowUDF>>,
    /// Table options snapshotted at construction. Local mutations are not sent
    /// back to the owning session.
    table_options: TableOptions,
    /// Not transferred over FFI: initialised with `Default::default()`.
    runtime_env: Arc<RuntimeEnv>,
    /// Not transferred over FFI: initialised with `Default::default()`.
    props: ExecutionProps,
}
381
// SAFETY: `FFI_SessionRef` is declared `Send + Sync` for this crate. The
// remaining fields are owned DataFusion values (config, UDF maps, table options,
// runtime env, execution props).
// NOTE(review): if every field type is already `Send + Sync`, these impls are
// redundant and could be removed so the compiler checks it instead — confirm.
unsafe impl Send for ForeignSession {}
unsafe impl Sync for ForeignSession {}
384
385impl FFI_SessionRef {
386 pub fn as_local(&self) -> Option<&(dyn Session + Send + Sync)> {
387 if (self.library_marker_id)() == crate::get_library_marker_id() {
388 return Some(self.inner());
389 }
390 None
391 }
392}
393
394impl TryFrom<&FFI_SessionRef> for ForeignSession {
395 type Error = DataFusionError;
396 fn try_from(session: &FFI_SessionRef) -> Result<Self, Self::Error> {
397 unsafe {
398 let table_options =
399 table_options_from_rhashmap((session.table_options)(session));
400
401 let config = (session.config)(session);
402 let config = SessionConfig::try_from(&config)?;
403
404 let scalar_functions = (session.scalar_functions)(session)
405 .into_iter()
406 .map(|kv_pair| {
407 let udf = <Arc<dyn ScalarUDFImpl>>::from(&kv_pair.1);
408
409 (
410 kv_pair.0.into_string(),
411 Arc::new(ScalarUDF::new_from_shared_impl(udf)),
412 )
413 })
414 .collect();
415 let aggregate_functions = (session.aggregate_functions)(session)
416 .into_iter()
417 .map(|kv_pair| {
418 let udaf = <Arc<dyn AggregateUDFImpl>>::from(&kv_pair.1);
419
420 (
421 kv_pair.0.into_string(),
422 Arc::new(AggregateUDF::new_from_shared_impl(udaf)),
423 )
424 })
425 .collect();
426 let window_functions = (session.window_functions)(session)
427 .into_iter()
428 .map(|kv_pair| {
429 let udwf = <Arc<dyn WindowUDFImpl>>::from(&kv_pair.1);
430
431 (
432 kv_pair.0.into_string(),
433 Arc::new(WindowUDF::new_from_shared_impl(udwf)),
434 )
435 })
436 .collect();
437
438 Ok(Self {
439 session: session.clone(),
440 config,
441 table_options,
442 scalar_functions,
443 aggregate_functions,
444 window_functions,
445 runtime_env: Default::default(),
446 props: Default::default(),
447 })
448 }
449 }
450}
451
452impl Clone for FFI_SessionRef {
453 fn clone(&self) -> Self {
454 unsafe { (self.clone)(self) }
455 }
456}
457
458fn table_options_from_rhashmap(options: RHashMap<RString, RString>) -> TableOptions {
459 let mut options: HashMap<String, String> = options
460 .into_iter()
461 .map(|kv_pair| (kv_pair.0.into_string(), kv_pair.1.into_string()))
462 .collect();
463 let current_format = options.remove("datafusion_ffi.table_current_format");
464
465 let mut table_options = TableOptions::default();
466 let formats = [
467 ConfigFileType::CSV,
468 ConfigFileType::JSON,
469 ConfigFileType::PARQUET,
470 ];
471 for format in formats {
472 let format_name = match &format {
475 ConfigFileType::CSV => "csv",
476 ConfigFileType::PARQUET => "parquet",
477 ConfigFileType::JSON => "json",
478 };
479 let format_options: HashMap<String, String> = options
480 .iter()
481 .filter_map(|(k, v)| {
482 let (prefix, key) = k.split_once(".")?;
483 if prefix == format_name {
484 Some((format!("format.{key}"), v.to_owned()))
485 } else {
486 None
487 }
488 })
489 .collect();
490 if !format_options.is_empty() {
491 table_options.current_format = Some(format.clone());
492 table_options
493 .alter_with_string_hash_map(&format_options)
494 .unwrap_or_else(|err| log::warn!("Error parsing table options: {err}"));
495 }
496 }
497
498 let extension_options: HashMap<String, String> = options
499 .iter()
500 .filter_map(|(k, v)| {
501 let (prefix, _) = k.split_once(".")?;
502 if !["json", "parquet", "csv"].contains(&prefix) {
503 Some((k.to_owned(), v.to_owned()))
504 } else {
505 None
506 }
507 })
508 .collect();
509 if !extension_options.is_empty() {
510 table_options
511 .alter_with_string_hash_map(&extension_options)
512 .unwrap_or_else(|err| log::warn!("Error parsing table options: {err}"));
513 }
514
515 table_options.current_format =
516 current_format.and_then(|format| match format.as_str() {
517 "csv" => Some(ConfigFileType::CSV),
518 "parquet" => Some(ConfigFileType::PARQUET),
519 "json" => Some(ConfigFileType::JSON),
520 _ => None,
521 });
522 table_options
523}
524
525#[async_trait]
526impl Session for ForeignSession {
527 fn session_id(&self) -> &str {
528 unsafe { (self.session.session_id)(&self.session).as_str() }
529 }
530
531 fn config(&self) -> &SessionConfig {
532 &self.config
533 }
534
535 fn config_options(&self) -> &ConfigOptions {
536 self.config.options()
537 }
538
539 async fn create_physical_plan(
540 &self,
541 logical_plan: &LogicalPlan,
542 ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
543 unsafe {
544 let logical_plan = logical_plan_to_bytes(logical_plan)?;
545 let physical_plan = df_result!(
546 (self.session.create_physical_plan)(
547 &self.session,
548 logical_plan.as_ref().into()
549 )
550 .await
551 )?;
552 let physical_plan = <Arc<dyn ExecutionPlan>>::try_from(&physical_plan)?;
553
554 Ok(physical_plan)
555 }
556 }
557
558 fn create_physical_expr(
559 &self,
560 expr: Expr,
561 df_schema: &DFSchema,
562 ) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
563 unsafe {
564 let codec: Arc<dyn LogicalExtensionCodec> =
565 (&self.session.logical_codec).into();
566 let logical_expr = serialize_expr(&expr, codec.as_ref())?.encode_to_vec();
567 let schema = WrappedSchema(FFI_ArrowSchema::try_from(df_schema.as_arrow())?);
568
569 let physical_expr = df_result!((self.session.create_physical_expr)(
570 &self.session,
571 logical_expr.into(),
572 schema
573 ))?;
574
575 Ok((&physical_expr).into())
576 }
577 }
578
579 fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
580 &self.scalar_functions
581 }
582
583 fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
584 &self.aggregate_functions
585 }
586
587 fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
588 &self.window_functions
589 }
590
591 fn runtime_env(&self) -> &Arc<RuntimeEnv> {
592 &self.runtime_env
593 }
594
595 fn execution_props(&self) -> &ExecutionProps {
596 &self.props
597 }
598
599 fn as_any(&self) -> &dyn Any {
600 self
601 }
602
603 fn table_options(&self) -> &TableOptions {
604 &self.table_options
605 }
606
607 fn default_table_options(&self) -> TableOptions {
608 unsafe {
609 table_options_from_rhashmap((self.session.default_table_options)(
610 &self.session,
611 ))
612 }
613 }
614
615 fn table_options_mut(&mut self) -> &mut TableOptions {
616 log::warn!(
617 "Mutating table options is not supported via FFI. Changes will not have an effect."
618 );
619 &mut self.table_options
620 }
621
622 fn task_ctx(&self) -> Arc<TaskContext> {
623 unsafe { (self.session.task_ctx)(&self.session).into() }
624 }
625}
626
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Schema};
    use datafusion::execution::SessionStateBuilder;
    use datafusion_common::DataFusionError;
    use datafusion_expr::col;
    use datafusion_expr::registry::FunctionRegistry;
    use datafusion_proto::logical_plan::DefaultLogicalExtensionCodec;

    use super::*;

    /// Round-trips a `SessionState` through `FFI_SessionRef` and
    /// `ForeignSession` within one library. Checks that each forwarded or
    /// snapshotted `Session` method agrees with the local state.
    #[tokio::test]
    async fn test_ffi_session() -> Result<(), DataFusionError> {
        let (ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
        // Non-default values for every file format plus a current format, so
        // the table-options round trip below covers all of them.
        let mut table_options = TableOptions::default();
        table_options.csv.has_header = Some(true);
        table_options.json.schema_infer_max_rec = Some(10);
        table_options.parquet.global.coerce_int96 = Some("123456789".into());
        table_options.current_format = Some(ConfigFileType::JSON);

        let state = SessionStateBuilder::new_from_existing(ctx.state())
            .with_table_options(table_options)
            .build();

        let logical_codec = FFI_LogicalExtensionCodec::new(
            Arc::new(DefaultLogicalExtensionCodec {}),
            None,
            task_ctx_provider,
        );

        let local_session = FFI_SessionRef::new(&state, None, logical_codec);
        let foreign_session = ForeignSession::try_from(&local_session)?;

        // Expression planning goes through protobuf serialization and back.
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let df_schema = schema.try_into()?;
        let physical_expr = foreign_session.create_physical_expr(col("a"), &df_schema)?;
        assert_eq!(
            format!("{physical_expr:?}"),
            "Column { name: \"a\", index: 0 }"
        );

        assert_eq!(foreign_session.session_id(), state.session_id());

        // An empty logical plan should plan to an `EmptyExec`. The expected
        // string is the full `Debug` output, so it depends on the exact
        // `EmptyExec` / `PlanProperties` formatting.
        let logical_plan = LogicalPlan::default();
        let physical_plan = foreign_session.create_physical_plan(&logical_plan).await?;
        assert_eq!(
            format!("{physical_plan:?}"),
            "EmptyExec { schema: Schema { fields: [], metadata: {} }, partitions: 1, cache: PlanProperties { eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { map: {}, classes: [] }, oeq_class: OrderingEquivalenceClass { orderings: [] }, oeq_cache: OrderingEquivalenceCache { normal_cls: OrderingEquivalenceClass { orderings: [] }, leading_map: {} }, constraints: Constraints { inner: [] }, schema: Schema { fields: [], metadata: {} } }, partitioning: UnknownPartitioning(1), emission_type: Incremental, boundedness: Bounded, evaluation_type: Lazy, scheduling_type: Cooperative, output_ordering: None } }"
        );

        // Table options are compared through `Debug` because the round trip
        // rebuilds them from flattened strings.
        assert_eq!(
            format!("{:?}", foreign_session.default_table_options()),
            format!("{:?}", state.default_table_options())
        );

        assert_eq!(
            format!("{:?}", foreign_session.table_options()),
            format!("{:?}", state.table_options())
        );

        // Every function name exported over FFI must exist in the local registries.
        let local_udfs = state.udfs();
        for udf in foreign_session.scalar_functions().keys() {
            assert!(local_udfs.contains(udf));
        }
        let local_udafs = state.udafs();
        for udaf in foreign_session.aggregate_functions().keys() {
            assert!(local_udafs.contains(udaf));
        }
        let local_udwfs = state.udwfs();
        for udwf in foreign_session.window_functions().keys() {
            assert!(local_udwfs.contains(udwf));
        }

        Ok(())
    }
}