1use std::any::Any;
19use std::collections::HashMap;
20use std::ffi::c_void;
21use std::sync::Arc;
22
23use arrow_schema::SchemaRef;
24use arrow_schema::ffi::FFI_ArrowSchema;
25use async_ffi::{FfiFuture, FutureExt};
26use async_trait::async_trait;
27use datafusion_common::config::{ConfigFileType, ConfigOptions, TableOptions};
28use datafusion_common::{DFSchema, DataFusionError};
29use datafusion_execution::TaskContext;
30use datafusion_execution::config::SessionConfig;
31use datafusion_execution::runtime_env::RuntimeEnv;
32use datafusion_expr::execution_props::ExecutionProps;
33use datafusion_expr::registry::{ExtensionTypeRegistryRef, MemoryExtensionTypeRegistry};
34use datafusion_expr::{
35 AggregateUDF, AggregateUDFImpl, Expr, HigherOrderUDF, LogicalPlan, ScalarUDF,
36 ScalarUDFImpl, WindowUDF, WindowUDFImpl,
37};
38use datafusion_physical_expr::PhysicalExpr;
39use datafusion_physical_plan::ExecutionPlan;
40use datafusion_proto::bytes::{logical_plan_from_bytes, logical_plan_to_bytes};
41use datafusion_proto::logical_plan::LogicalExtensionCodec;
42use datafusion_proto::logical_plan::from_proto::parse_expr;
43use datafusion_proto::logical_plan::to_proto::serialize_expr;
44use datafusion_proto::protobuf::LogicalExprNode;
45use datafusion_session::Session;
46use prost::Message;
47
48use stabby::str::Str as SStr;
49use stabby::string::String as SString;
50use stabby::vec::Vec as SVec;
51use tokio::runtime::Handle;
52
53use crate::arrow_wrappers::WrappedSchema;
54use crate::execution::FFI_TaskContext;
55use crate::execution_plan::FFI_ExecutionPlan;
56use crate::physical_expr::FFI_PhysicalExpr;
57use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
58use crate::session::config::FFI_SessionConfig;
59use crate::udaf::FFI_AggregateUDF;
60use crate::udf::FFI_ScalarUDF;
61use crate::udwf::FFI_WindowUDF;
62use crate::util::FFI_Result;
63use crate::{df_result, sresult, sresult_return};
64
65pub mod config;
66
/// A C-compatible table of function pointers used to share a [`Session`]
/// across an FFI boundary.
///
/// The private data behind this struct stores a *borrowed* session (see
/// `SessionPrivateData`). The lifetime of that borrow is erased once it is
/// hidden behind `*mut c_void`, so whoever creates this struct must keep the
/// underlying session alive for as long as this struct, or any clone of it,
/// is in use.
#[repr(C)]
#[derive(Debug)]
pub(crate) struct FFI_SessionRef {
    /// Returns the identifier of the wrapped session.
    session_id: unsafe extern "C" fn(&Self) -> SStr,

    /// Returns the session configuration in its FFI representation.
    config: unsafe extern "C" fn(&Self) -> FFI_SessionConfig,

    /// Builds a physical plan from a serialized logical plan. The work is
    /// returned as an FFI-safe future.
    create_physical_plan:
        unsafe extern "C" fn(
            &Self,
            logical_plan_serialized: SVec<u8>,
        ) -> FfiFuture<FFI_Result<FFI_ExecutionPlan>>,

    /// Builds a physical expression from a protobuf-encoded logical
    /// expression and the schema it should be resolved against.
    create_physical_expr: unsafe extern "C" fn(
        &Self,
        expr_serialized: SVec<u8>,
        schema: WrappedSchema,
    ) -> FFI_Result<FFI_PhysicalExpr>,

    /// Returns the registered scalar functions as `(name, function)` pairs.
    scalar_functions: unsafe extern "C" fn(&Self) -> SVec<(SString, FFI_ScalarUDF)>,

    /// Returns the registered aggregate functions as `(name, function)` pairs.
    aggregate_functions: unsafe extern "C" fn(&Self) -> SVec<(SString, FFI_AggregateUDF)>,

    /// Returns the registered window functions as `(name, function)` pairs.
    window_functions: unsafe extern "C" fn(&Self) -> SVec<(SString, FFI_WindowUDF)>,

    /// Returns the session's table options flattened into `(key, value)`
    /// string pairs.
    table_options: unsafe extern "C" fn(&Self) -> SVec<(SString, SString)>,

    /// Returns the session's default table options flattened into
    /// `(key, value)` string pairs.
    default_table_options: unsafe extern "C" fn(&Self) -> SVec<(SString, SString)>,

    /// Returns the session's task context in its FFI representation.
    task_ctx: unsafe extern "C" fn(&Self) -> FFI_TaskContext,

    /// Codec used to encode and decode logical expressions that cross the
    /// boundary.
    logical_codec: FFI_LogicalExtensionCodec,

    /// Creates a new reference to the same session with its own private data
    /// allocation.
    clone: unsafe extern "C" fn(plan: &Self) -> Self,

    /// Frees the private data. Invoked from `Drop`.
    release: unsafe extern "C" fn(arg: &mut Self),

    /// Returns a version number for this wrapper.
    // NOTE(review): presumably the major version of the FFI crate; confirm
    // against `super::version`.
    pub version: unsafe extern "C" fn() -> u64,

    /// Opaque pointer to a boxed `SessionPrivateData`. It must only be
    /// dereferenced by the library that created this struct.
    private_data: *mut c_void,

    /// Returns an identifier for the library that created this struct.
    /// `as_local` compares it with the current library's identifier to decide
    /// whether the private data may be accessed directly.
    pub library_marker_id: extern "C" fn() -> usize,
}
130
// The raw `private_data` pointer prevents these traits from being derived
// automatically. The pointee is a `SessionPrivateData`, which holds a
// `&(dyn Session + Send + Sync)` and an `Option<Handle>`.
// NOTE(review): soundness also depends on `FFI_LogicalExtensionCodec` being
// thread-safe; confirm.
unsafe impl Send for FFI_SessionRef {}
unsafe impl Sync for FFI_SessionRef {}
133
/// Data stored behind [`FFI_SessionRef`]'s `private_data` pointer.
///
/// The session is held by reference, not by ownership. The lifetime `'a` is
/// lost when this struct is boxed and cast to `*mut c_void`, so it is not
/// checked by the compiler after that point.
struct SessionPrivateData<'a> {
    /// The borrowed session that all wrapper functions delegate to.
    session: &'a (dyn Session + Send + Sync),
    /// Optional Tokio runtime handle. It is forwarded to
    /// `FFI_ExecutionPlan::new` when a physical plan is created.
    runtime: Option<Handle>,
}
138
139impl FFI_SessionRef {
140 fn inner(&self) -> &(dyn Session + Send + Sync) {
141 let private_data = self.private_data as *const SessionPrivateData;
142 unsafe { (*private_data).session }
143 }
144
145 unsafe fn runtime(&self) -> &Option<Handle> {
146 unsafe {
147 let private_data = self.private_data as *const SessionPrivateData;
148 &(*private_data).runtime
149 }
150 }
151}
152
153unsafe extern "C" fn session_id_fn_wrapper(session: &FFI_SessionRef) -> SStr<'_> {
154 let session = session.inner();
155 session.session_id().into()
156}
157
158unsafe extern "C" fn config_fn_wrapper(session: &FFI_SessionRef) -> FFI_SessionConfig {
159 let session = session.inner();
160 session.config().into()
161}
162
163unsafe extern "C" fn create_physical_plan_fn_wrapper(
164 session: &FFI_SessionRef,
165 logical_plan_serialized: SVec<u8>,
166) -> FfiFuture<FFI_Result<FFI_ExecutionPlan>> {
167 unsafe {
168 let runtime = session.runtime().clone();
169 let session = session.clone();
170 async move {
171 let session = session.inner();
172 let task_ctx = session.task_ctx();
173
174 let logical_plan = sresult_return!(logical_plan_from_bytes(
175 logical_plan_serialized.as_slice(),
176 task_ctx.as_ref(),
177 ));
178
179 let physical_plan = session.create_physical_plan(&logical_plan).await;
180
181 sresult!(physical_plan.map(|plan| FFI_ExecutionPlan::new(plan, runtime)))
182 }
183 .into_ffi()
184 }
185}
186
187unsafe extern "C" fn create_physical_expr_fn_wrapper(
188 session: &FFI_SessionRef,
189 expr_serialized: SVec<u8>,
190 schema: WrappedSchema,
191) -> FFI_Result<FFI_PhysicalExpr> {
192 let codec: Arc<dyn LogicalExtensionCodec> = (&session.logical_codec).into();
193 let session = session.inner();
194
195 let logical_expr = LogicalExprNode::decode(expr_serialized.as_slice()).unwrap();
196 let logical_expr =
197 parse_expr(&logical_expr, session.task_ctx().as_ref(), codec.as_ref()).unwrap();
198 let schema: SchemaRef = schema.into();
199 let schema: DFSchema = sresult_return!(schema.try_into());
200
201 let physical_expr =
202 sresult_return!(session.create_physical_expr(logical_expr, &schema));
203
204 FFI_Result::Ok(physical_expr.into())
205}
206
207unsafe extern "C" fn scalar_functions_fn_wrapper(
208 session: &FFI_SessionRef,
209) -> SVec<(SString, FFI_ScalarUDF)> {
210 let session = session.inner();
211 session
212 .scalar_functions()
213 .iter()
214 .map(|(name, udf)| (name.clone().into(), FFI_ScalarUDF::from(Arc::clone(udf))))
215 .collect()
216}
217
218unsafe extern "C" fn aggregate_functions_fn_wrapper(
219 session: &FFI_SessionRef,
220) -> SVec<(SString, FFI_AggregateUDF)> {
221 let session = session.inner();
222 session
223 .aggregate_functions()
224 .iter()
225 .map(|(name, udaf)| {
226 (
227 name.clone().into(),
228 FFI_AggregateUDF::from(Arc::clone(udaf)),
229 )
230 })
231 .collect()
232}
233
234unsafe extern "C" fn window_functions_fn_wrapper(
235 session: &FFI_SessionRef,
236) -> SVec<(SString, FFI_WindowUDF)> {
237 let session = session.inner();
238 session
239 .window_functions()
240 .iter()
241 .map(|(name, udwf)| (name.clone().into(), FFI_WindowUDF::from(Arc::clone(udwf))))
242 .collect()
243}
244
245fn table_options_to_rhash(mut options: TableOptions) -> SVec<(SString, SString)> {
246 let current_format = options.current_format.take();
251 let mut options: HashMap<SString, SString> = options
252 .entries()
253 .into_iter()
254 .filter_map(|entry| entry.value.map(|v| (entry.key.into(), v.into())))
255 .collect();
256 if let Some(current_format) = current_format {
257 options.insert(
258 "datafusion_ffi.table_current_format".into(),
259 match current_format {
260 ConfigFileType::JSON => "json",
261 ConfigFileType::PARQUET => "parquet",
262 ConfigFileType::CSV => "csv",
263 }
264 .into(),
265 );
266 }
267
268 options.into_iter().collect()
269}
270
271unsafe extern "C" fn table_options_fn_wrapper(
272 session: &FFI_SessionRef,
273) -> SVec<(SString, SString)> {
274 let session = session.inner();
275 let table_options = session.table_options();
276 table_options_to_rhash(table_options.clone())
277}
278
279unsafe extern "C" fn default_table_options_fn_wrapper(
280 session: &FFI_SessionRef,
281) -> SVec<(SString, SString)> {
282 let session = session.inner();
283 let table_options = session.default_table_options();
284
285 table_options_to_rhash(table_options)
286}
287
288unsafe extern "C" fn task_ctx_fn_wrapper(session: &FFI_SessionRef) -> FFI_TaskContext {
289 session.inner().task_ctx().into()
290}
291
292unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_SessionRef) {
293 unsafe {
294 let private_data =
295 Box::from_raw(provider.private_data as *mut SessionPrivateData);
296 drop(private_data);
297 }
298}
299
300unsafe extern "C" fn clone_fn_wrapper(provider: &FFI_SessionRef) -> FFI_SessionRef {
301 unsafe {
302 let old_private_data = provider.private_data as *const SessionPrivateData;
303
304 let private_data = Box::into_raw(Box::new(SessionPrivateData {
305 session: (*old_private_data).session,
306 runtime: (*old_private_data).runtime.clone(),
307 })) as *mut c_void;
308
309 FFI_SessionRef {
310 session_id: session_id_fn_wrapper,
311 config: config_fn_wrapper,
312 create_physical_plan: create_physical_plan_fn_wrapper,
313 create_physical_expr: create_physical_expr_fn_wrapper,
314 scalar_functions: scalar_functions_fn_wrapper,
315 aggregate_functions: aggregate_functions_fn_wrapper,
316 window_functions: window_functions_fn_wrapper,
317 table_options: table_options_fn_wrapper,
318 default_table_options: default_table_options_fn_wrapper,
319 task_ctx: task_ctx_fn_wrapper,
320 logical_codec: provider.logical_codec.clone(),
321
322 clone: clone_fn_wrapper,
323 release: release_fn_wrapper,
324 version: super::version,
325 private_data,
326 library_marker_id: crate::get_library_marker_id,
327 }
328 }
329}
330
impl Drop for FFI_SessionRef {
    /// Frees the private data by invoking the `release` callback stored in
    /// this instance.
    fn drop(&mut self) {
        // The callback is read from the struct and not called by name;
        // presumably so that the library that allocated the private data is
        // also the one that frees it.
        unsafe { (self.release)(self) }
    }
}
336
337impl FFI_SessionRef {
338 pub fn new(
340 session: &(dyn Session + Send + Sync),
341 runtime: Option<Handle>,
342 logical_codec: FFI_LogicalExtensionCodec,
343 ) -> Self {
344 if let Some(session) = session.as_any().downcast_ref::<ForeignSession>() {
345 return session.session.clone();
346 }
347
348 let private_data = Box::new(SessionPrivateData { session, runtime });
349
350 Self {
351 session_id: session_id_fn_wrapper,
352 config: config_fn_wrapper,
353 create_physical_plan: create_physical_plan_fn_wrapper,
354 create_physical_expr: create_physical_expr_fn_wrapper,
355 scalar_functions: scalar_functions_fn_wrapper,
356 aggregate_functions: aggregate_functions_fn_wrapper,
357 window_functions: window_functions_fn_wrapper,
358 table_options: table_options_fn_wrapper,
359 default_table_options: default_table_options_fn_wrapper,
360 task_ctx: task_ctx_fn_wrapper,
361 logical_codec,
362
363 clone: clone_fn_wrapper,
364 release: release_fn_wrapper,
365 version: super::version,
366 private_data: Box::into_raw(private_data) as *mut c_void,
367 library_marker_id: crate::get_library_marker_id,
368 }
369 }
370}
371
/// A [`Session`] implementation backed by an [`FFI_SessionRef`].
///
/// Configuration, registered functions and table options are copied out of
/// the FFI session once, when this struct is built. Plan and expression
/// creation, the session id, default table options and the task context are
/// forwarded to the FFI session on every call.
#[derive(Debug)]
pub struct ForeignSession {
    /// The FFI session that calls are forwarded to.
    session: FFI_SessionRef,
    /// Session configuration copied from the FFI session at construction.
    config: SessionConfig,
    /// Scalar functions copied from the FFI session at construction.
    scalar_functions: HashMap<String, Arc<ScalarUDF>>,
    /// Higher-order functions. The `TryFrom` conversion always leaves this
    /// empty.
    higher_order_functions: HashMap<String, Arc<HigherOrderUDF>>,
    /// Aggregate functions copied from the FFI session at construction.
    aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
    /// Window functions copied from the FFI session at construction.
    window_functions: HashMap<String, Arc<WindowUDF>>,
    /// Extension type registry. The `TryFrom` conversion creates a fresh
    /// default in-memory registry; it is not read from the FFI session.
    extension_types: ExtensionTypeRegistryRef,
    /// Table options rebuilt from the FFI session's flattened key/value pairs.
    table_options: TableOptions,
    /// Runtime environment. The `TryFrom` conversion uses the default value;
    /// it is not read from the FFI session.
    runtime_env: Arc<RuntimeEnv>,
    /// Execution properties. The `TryFrom` conversion uses the default value;
    /// they are not read from the FFI session.
    props: ExecutionProps,
}
389
// NOTE(review): `FFI_SessionRef` already has `unsafe impl Send/Sync`, so these
// impls presumably exist for another field that is not automatically
// thread-safe; confirm which one requires them.
unsafe impl Send for ForeignSession {}
unsafe impl Sync for ForeignSession {}
392
393impl FFI_SessionRef {
394 pub fn as_local(&self) -> Option<&(dyn Session + Send + Sync)> {
395 if (self.library_marker_id)() == crate::get_library_marker_id() {
396 return Some(self.inner());
397 }
398 None
399 }
400}
401
402impl TryFrom<&FFI_SessionRef> for ForeignSession {
403 type Error = DataFusionError;
404 fn try_from(session: &FFI_SessionRef) -> Result<Self, Self::Error> {
405 unsafe {
406 let table_options =
407 table_options_from_rhashmap((session.table_options)(session));
408
409 let config = (session.config)(session);
410 let config = SessionConfig::try_from(&config)?;
411
412 let scalar_functions = (session.scalar_functions)(session)
413 .into_iter()
414 .map(|kv_pair| {
415 let udf = <Arc<dyn ScalarUDFImpl>>::from(&kv_pair.1);
416
417 (
418 kv_pair.0.to_string(),
419 Arc::new(ScalarUDF::new_from_shared_impl(udf)),
420 )
421 })
422 .collect();
423 let aggregate_functions = (session.aggregate_functions)(session)
424 .into_iter()
425 .map(|kv_pair| {
426 let udaf = <Arc<dyn AggregateUDFImpl>>::from(&kv_pair.1);
427
428 (
429 kv_pair.0.to_string(),
430 Arc::new(AggregateUDF::new_from_shared_impl(udaf)),
431 )
432 })
433 .collect();
434 let window_functions = (session.window_functions)(session)
435 .into_iter()
436 .map(|kv_pair| {
437 let udwf = <Arc<dyn WindowUDFImpl>>::from(&kv_pair.1);
438
439 (
440 kv_pair.0.to_string(),
441 Arc::new(WindowUDF::new_from_shared_impl(udwf)),
442 )
443 })
444 .collect();
445
446 Ok(Self {
447 session: session.clone(),
448 config,
449 table_options,
450 scalar_functions,
451 higher_order_functions: HashMap::new(),
452 aggregate_functions,
453 window_functions,
454 extension_types: Arc::new(MemoryExtensionTypeRegistry::default()),
455 runtime_env: Default::default(),
456 props: Default::default(),
457 })
458 }
459 }
460}
461
impl Clone for FFI_SessionRef {
    /// Clones this reference by invoking the `clone` callback stored in this
    /// instance.
    fn clone(&self) -> Self {
        // `self.clone` in parentheses is the function-pointer field, not a
        // recursive call to this method.
        unsafe { (self.clone)(self) }
    }
}
467
468fn table_options_from_rhashmap(options: SVec<(SString, SString)>) -> TableOptions {
469 let mut options: HashMap<String, String> = options
470 .into_iter()
471 .map(|kv_pair| (kv_pair.0.to_string(), kv_pair.1.to_string()))
472 .collect();
473 let current_format = options.remove("datafusion_ffi.table_current_format");
474
475 let mut table_options = TableOptions::default();
476 let formats = [
477 ConfigFileType::CSV,
478 ConfigFileType::JSON,
479 ConfigFileType::PARQUET,
480 ];
481 for format in formats {
482 let format_name = match &format {
485 ConfigFileType::CSV => "csv",
486 ConfigFileType::PARQUET => "parquet",
487 ConfigFileType::JSON => "json",
488 };
489 let format_options: HashMap<String, String> = options
490 .iter()
491 .filter_map(|(k, v)| {
492 let (prefix, key) = k.split_once(".")?;
493 if prefix == format_name {
494 Some((format!("format.{key}"), v.to_owned()))
495 } else {
496 None
497 }
498 })
499 .collect();
500 if !format_options.is_empty() {
501 table_options.current_format = Some(format.clone());
502 table_options
503 .alter_with_string_hash_map(&format_options)
504 .unwrap_or_else(|err| log::warn!("Error parsing table options: {err}"));
505 }
506 }
507
508 let extension_options: HashMap<String, String> = options
509 .iter()
510 .filter_map(|(k, v)| {
511 let (prefix, _) = k.split_once(".")?;
512 if !["json", "parquet", "csv"].contains(&prefix) {
513 Some((k.to_owned(), v.to_owned()))
514 } else {
515 None
516 }
517 })
518 .collect();
519 if !extension_options.is_empty() {
520 table_options
521 .alter_with_string_hash_map(&extension_options)
522 .unwrap_or_else(|err| log::warn!("Error parsing table options: {err}"));
523 }
524
525 table_options.current_format =
526 current_format.and_then(|format| match format.as_str() {
527 "csv" => Some(ConfigFileType::CSV),
528 "parquet" => Some(ConfigFileType::PARQUET),
529 "json" => Some(ConfigFileType::JSON),
530 _ => None,
531 });
532 table_options
533}
534
535#[async_trait]
536impl Session for ForeignSession {
537 fn session_id(&self) -> &str {
538 unsafe { (self.session.session_id)(&self.session).as_str() }
539 }
540
541 fn config(&self) -> &SessionConfig {
542 &self.config
543 }
544
545 fn config_options(&self) -> &ConfigOptions {
546 self.config.options()
547 }
548
549 async fn create_physical_plan(
550 &self,
551 logical_plan: &LogicalPlan,
552 ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
553 unsafe {
554 let logical_plan = logical_plan_to_bytes(logical_plan)?;
555 let physical_plan = df_result!(
556 (self.session.create_physical_plan)(
557 &self.session,
558 logical_plan.as_ref().into()
559 )
560 .await
561 )?;
562 let physical_plan = <Arc<dyn ExecutionPlan>>::try_from(&physical_plan)?;
563
564 Ok(physical_plan)
565 }
566 }
567
568 fn create_physical_expr(
569 &self,
570 expr: Expr,
571 df_schema: &DFSchema,
572 ) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
573 unsafe {
574 let codec: Arc<dyn LogicalExtensionCodec> =
575 (&self.session.logical_codec).into();
576 let logical_expr = serialize_expr(&expr, codec.as_ref())?.encode_to_vec();
577 let schema = WrappedSchema(FFI_ArrowSchema::try_from(df_schema.as_arrow())?);
578
579 let physical_expr = df_result!((self.session.create_physical_expr)(
580 &self.session,
581 logical_expr.into_iter().collect(),
582 schema
583 ))?;
584
585 Ok((&physical_expr).into())
586 }
587 }
588
589 fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
590 &self.scalar_functions
591 }
592
593 fn higher_order_functions(&self) -> &HashMap<String, Arc<HigherOrderUDF>> {
594 &self.higher_order_functions
595 }
596
597 fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
598 &self.aggregate_functions
599 }
600
601 fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
602 &self.window_functions
603 }
604
605 fn extension_type_registry(&self) -> &ExtensionTypeRegistryRef {
606 &self.extension_types
607 }
608
609 fn runtime_env(&self) -> &Arc<RuntimeEnv> {
610 &self.runtime_env
611 }
612
613 fn execution_props(&self) -> &ExecutionProps {
614 &self.props
615 }
616
617 fn as_any(&self) -> &dyn Any {
618 self
619 }
620
621 fn table_options(&self) -> &TableOptions {
622 &self.table_options
623 }
624
625 fn default_table_options(&self) -> TableOptions {
626 unsafe {
627 table_options_from_rhashmap((self.session.default_table_options)(
628 &self.session,
629 ))
630 }
631 }
632
633 fn table_options_mut(&mut self) -> &mut TableOptions {
634 log::warn!(
635 "Mutating table options is not supported via FFI. Changes will not have an effect."
636 );
637 &mut self.table_options
638 }
639
640 fn task_ctx(&self) -> Arc<TaskContext> {
641 unsafe { (self.session.task_ctx)(&self.session).into() }
642 }
643}
644
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Schema};
    use datafusion::execution::SessionStateBuilder;
    use datafusion_common::DataFusionError;
    use datafusion_expr::col;
    use datafusion_expr::registry::FunctionRegistry;
    use datafusion_proto::logical_plan::DefaultLogicalExtensionCodec;

    use super::*;

    /// Round-trips a session state through `FFI_SessionRef` and
    /// `ForeignSession`, then checks that the foreign view agrees with the
    /// local state.
    #[tokio::test]
    async fn test_ffi_session() -> Result<(), DataFusionError> {
        let (ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
        // Set one option per built-in format plus a current format, so the
        // flatten/rebuild round trip of table options is exercised.
        let mut table_options = TableOptions::default();
        table_options.csv.has_header = Some(true);
        table_options.json.schema_infer_max_rec = Some(10);
        table_options.parquet.global.coerce_int96 = Some("123456789".into());
        table_options.current_format = Some(ConfigFileType::JSON);

        let state = SessionStateBuilder::new_from_existing(ctx.state())
            .with_table_options(table_options)
            .build();

        let logical_codec = FFI_LogicalExtensionCodec::new(
            Arc::new(DefaultLogicalExtensionCodec {}),
            None,
            task_ctx_provider,
        );

        // `state` must outlive `local_session`, which only borrows it.
        let local_session = FFI_SessionRef::new(&state, None, logical_codec);
        let foreign_session = ForeignSession::try_from(&local_session)?;

        // Physical expression creation goes through protobuf encoding and the
        // FFI callback.
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let df_schema = schema.try_into()?;
        let physical_expr = foreign_session.create_physical_expr(col("a"), &df_schema)?;
        assert_eq!(
            format!("{physical_expr:?}"),
            "Column { name: \"a\", index: 0 }"
        );

        assert_eq!(foreign_session.session_id(), state.session_id());

        // Physical plan creation for the default (empty) logical plan.
        let logical_plan = LogicalPlan::default();
        let physical_plan = foreign_session.create_physical_plan(&logical_plan).await?;
        assert_eq!(
            format!("{physical_plan:?}"),
            "EmptyExec { schema: Schema { fields: [], metadata: {} }, partitions: 1, cache: PlanProperties { eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { map: {}, classes: [] }, oeq_class: OrderingEquivalenceClass { orderings: [] }, oeq_cache: OrderingEquivalenceCache { normal_cls: OrderingEquivalenceClass { orderings: [] }, leading_map: {} }, constraints: Constraints { inner: [] }, schema: Schema { fields: [], metadata: {} } }, partitioning: UnknownPartitioning(1), emission_type: Incremental, boundedness: Bounded, evaluation_type: Lazy, scheduling_type: Cooperative, output_ordering: None } }"
        );

        // Table options are compared through their debug output.
        assert_eq!(
            format!("{:?}", foreign_session.default_table_options()),
            format!("{:?}", state.default_table_options())
        );

        assert_eq!(
            format!("{:?}", foreign_session.table_options()),
            format!("{:?}", state.table_options())
        );

        // Every function name seen through the foreign session must also be
        // registered locally. These loops check nothing if the foreign maps
        // are empty.
        let local_udfs = state.udfs();
        for udf in foreign_session.scalar_functions().keys() {
            assert!(local_udfs.contains(udf));
        }
        let local_udafs = state.udafs();
        for udaf in foreign_session.aggregate_functions().keys() {
            assert!(local_udafs.contains(udaf));
        }
        let local_udwfs = state.udwfs();
        for udwf in foreign_session.window_functions().keys() {
            assert!(local_udwfs.contains(udwf));
        }

        Ok(())
    }
}