1use std::ffi::c_void;
19use std::sync::Arc;
20
21use abi_stable::StableAbi;
22use abi_stable::std_types::{RResult, RSlice, RStr, RVec};
23use datafusion_common::error::Result;
24use datafusion_execution::TaskContext;
25use datafusion_expr::{
26 AggregateUDF, AggregateUDFImpl, ScalarUDF, ScalarUDFImpl, WindowUDF, WindowUDFImpl,
27};
28use datafusion_physical_plan::ExecutionPlan;
29use datafusion_proto::physical_plan::PhysicalExtensionCodec;
30use tokio::runtime::Handle;
31
32use crate::execution::FFI_TaskContextProvider;
33use crate::execution_plan::FFI_ExecutionPlan;
34use crate::udaf::FFI_AggregateUDF;
35use crate::udf::FFI_ScalarUDF;
36use crate::udwf::FFI_WindowUDF;
37use crate::util::FFIResult;
38use crate::{df_result, rresult_return};
39
40#[repr(C)]
42#[derive(Debug, StableAbi)]
43pub struct FFI_PhysicalExtensionCodec {
44 try_decode: unsafe extern "C" fn(
46 &Self,
47 buf: RSlice<u8>,
48 inputs: RVec<FFI_ExecutionPlan>,
49 ) -> FFIResult<FFI_ExecutionPlan>,
50
51 try_encode:
53 unsafe extern "C" fn(&Self, node: FFI_ExecutionPlan) -> FFIResult<RVec<u8>>,
54
55 try_decode_udf: unsafe extern "C" fn(
57 &Self,
58 name: RStr,
59 buf: RSlice<u8>,
60 ) -> FFIResult<FFI_ScalarUDF>,
61
62 try_encode_udf:
64 unsafe extern "C" fn(&Self, node: FFI_ScalarUDF) -> FFIResult<RVec<u8>>,
65
66 try_decode_udaf: unsafe extern "C" fn(
68 &Self,
69 name: RStr,
70 buf: RSlice<u8>,
71 ) -> FFIResult<FFI_AggregateUDF>,
72
73 try_encode_udaf:
75 unsafe extern "C" fn(&Self, node: FFI_AggregateUDF) -> FFIResult<RVec<u8>>,
76
77 try_decode_udwf: unsafe extern "C" fn(
79 &Self,
80 name: RStr,
81 buf: RSlice<u8>,
82 ) -> FFIResult<FFI_WindowUDF>,
83
84 try_encode_udwf:
86 unsafe extern "C" fn(&Self, node: FFI_WindowUDF) -> FFIResult<RVec<u8>>,
87
88 task_ctx_provider: FFI_TaskContextProvider,
90
91 pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
94
95 pub release: unsafe extern "C" fn(arg: &mut Self),
97
98 pub version: unsafe extern "C" fn() -> u64,
100
101 pub private_data: *mut c_void,
104
105 pub library_marker_id: extern "C" fn() -> usize,
108}
109
110unsafe impl Send for FFI_PhysicalExtensionCodec {}
111unsafe impl Sync for FFI_PhysicalExtensionCodec {}
112
113struct PhysicalExtensionCodecPrivateData {
114 provider: Arc<dyn PhysicalExtensionCodec>,
115 runtime: Option<Handle>,
116}
117
118impl FFI_PhysicalExtensionCodec {
119 fn inner(&self) -> &Arc<dyn PhysicalExtensionCodec> {
120 let private_data = self.private_data as *const PhysicalExtensionCodecPrivateData;
121 unsafe { &(*private_data).provider }
122 }
123
124 fn runtime(&self) -> &Option<Handle> {
125 let private_data = self.private_data as *const PhysicalExtensionCodecPrivateData;
126 unsafe { &(*private_data).runtime }
127 }
128}
129
130unsafe extern "C" fn try_decode_fn_wrapper(
131 codec: &FFI_PhysicalExtensionCodec,
132 buf: RSlice<u8>,
133 inputs: RVec<FFI_ExecutionPlan>,
134) -> FFIResult<FFI_ExecutionPlan> {
135 let task_ctx: Arc<TaskContext> =
136 rresult_return!((&codec.task_ctx_provider).try_into());
137 let codec = codec.inner();
138 let inputs = inputs
139 .into_iter()
140 .map(|plan| <Arc<dyn ExecutionPlan>>::try_from(&plan))
141 .collect::<Result<Vec<_>>>();
142 let inputs = rresult_return!(inputs);
143
144 let plan =
145 rresult_return!(codec.try_decode(buf.as_ref(), &inputs, task_ctx.as_ref()));
146
147 RResult::ROk(FFI_ExecutionPlan::new(plan, None))
148}
149
150unsafe extern "C" fn try_encode_fn_wrapper(
151 codec: &FFI_PhysicalExtensionCodec,
152 node: FFI_ExecutionPlan,
153) -> FFIResult<RVec<u8>> {
154 let codec = codec.inner();
155
156 let plan: Arc<dyn ExecutionPlan> = rresult_return!((&node).try_into());
157
158 let mut bytes = Vec::new();
159 rresult_return!(codec.try_encode(plan, &mut bytes));
160
161 RResult::ROk(bytes.into())
162}
163
164unsafe extern "C" fn try_decode_udf_fn_wrapper(
165 codec: &FFI_PhysicalExtensionCodec,
166 name: RStr,
167 buf: RSlice<u8>,
168) -> FFIResult<FFI_ScalarUDF> {
169 let codec = codec.inner();
170
171 let udf = rresult_return!(codec.try_decode_udf(name.as_str(), buf.as_ref()));
172 let udf = FFI_ScalarUDF::from(udf);
173
174 RResult::ROk(udf)
175}
176
177unsafe extern "C" fn try_encode_udf_fn_wrapper(
178 codec: &FFI_PhysicalExtensionCodec,
179 node: FFI_ScalarUDF,
180) -> FFIResult<RVec<u8>> {
181 let codec = codec.inner();
182 let node: Arc<dyn ScalarUDFImpl> = (&node).into();
183 let node = ScalarUDF::new_from_shared_impl(node);
184
185 let mut bytes = Vec::new();
186 rresult_return!(codec.try_encode_udf(&node, &mut bytes));
187
188 RResult::ROk(bytes.into())
189}
190
191unsafe extern "C" fn try_decode_udaf_fn_wrapper(
192 codec: &FFI_PhysicalExtensionCodec,
193 name: RStr,
194 buf: RSlice<u8>,
195) -> FFIResult<FFI_AggregateUDF> {
196 let codec_inner = codec.inner();
197 let udaf = rresult_return!(codec_inner.try_decode_udaf(name.into(), buf.as_ref()));
198 let udaf = FFI_AggregateUDF::from(udaf);
199
200 RResult::ROk(udaf)
201}
202
203unsafe extern "C" fn try_encode_udaf_fn_wrapper(
204 codec: &FFI_PhysicalExtensionCodec,
205 node: FFI_AggregateUDF,
206) -> FFIResult<RVec<u8>> {
207 let codec = codec.inner();
208 let udaf: Arc<dyn AggregateUDFImpl> = (&node).into();
209 let udaf = AggregateUDF::new_from_shared_impl(udaf);
210
211 let mut bytes = Vec::new();
212 rresult_return!(codec.try_encode_udaf(&udaf, &mut bytes));
213
214 RResult::ROk(bytes.into())
215}
216
217unsafe extern "C" fn try_decode_udwf_fn_wrapper(
218 codec: &FFI_PhysicalExtensionCodec,
219 name: RStr,
220 buf: RSlice<u8>,
221) -> FFIResult<FFI_WindowUDF> {
222 let codec = codec.inner();
223 let udwf = rresult_return!(codec.try_decode_udwf(name.into(), buf.as_ref()));
224 let udwf = FFI_WindowUDF::from(udwf);
225
226 RResult::ROk(udwf)
227}
228
229unsafe extern "C" fn try_encode_udwf_fn_wrapper(
230 codec: &FFI_PhysicalExtensionCodec,
231 node: FFI_WindowUDF,
232) -> FFIResult<RVec<u8>> {
233 let codec = codec.inner();
234 let udwf: Arc<dyn WindowUDFImpl> = (&node).into();
235 let udwf = WindowUDF::new_from_shared_impl(udwf);
236
237 let mut bytes = Vec::new();
238 rresult_return!(codec.try_encode_udwf(&udwf, &mut bytes));
239
240 RResult::ROk(bytes.into())
241}
242
243unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_PhysicalExtensionCodec) {
244 unsafe {
245 let private_data = Box::from_raw(
246 provider.private_data as *mut PhysicalExtensionCodecPrivateData,
247 );
248 drop(private_data);
249 }
250}
251
252unsafe extern "C" fn clone_fn_wrapper(
253 codec: &FFI_PhysicalExtensionCodec,
254) -> FFI_PhysicalExtensionCodec {
255 let old_codec = Arc::clone(codec.inner());
256 let runtime = codec.runtime().clone();
257
258 FFI_PhysicalExtensionCodec::new(old_codec, runtime, codec.task_ctx_provider.clone())
259}
260
261impl Drop for FFI_PhysicalExtensionCodec {
262 fn drop(&mut self) {
263 unsafe { (self.release)(self) }
264 }
265}
266
267impl FFI_PhysicalExtensionCodec {
268 pub fn new(
270 provider: Arc<dyn PhysicalExtensionCodec + Send>,
271 runtime: Option<Handle>,
272 task_ctx_provider: impl Into<FFI_TaskContextProvider>,
273 ) -> Self {
274 let task_ctx_provider = task_ctx_provider.into();
275 let private_data =
276 Box::new(PhysicalExtensionCodecPrivateData { provider, runtime });
277
278 Self {
279 try_decode: try_decode_fn_wrapper,
280 try_encode: try_encode_fn_wrapper,
281 try_decode_udf: try_decode_udf_fn_wrapper,
282 try_encode_udf: try_encode_udf_fn_wrapper,
283 try_decode_udaf: try_decode_udaf_fn_wrapper,
284 try_encode_udaf: try_encode_udaf_fn_wrapper,
285 try_decode_udwf: try_decode_udwf_fn_wrapper,
286 try_encode_udwf: try_encode_udwf_fn_wrapper,
287 task_ctx_provider,
288
289 clone: clone_fn_wrapper,
290 release: release_fn_wrapper,
291 version: crate::version,
292 private_data: Box::into_raw(private_data) as *mut c_void,
293 library_marker_id: crate::get_library_marker_id,
294 }
295 }
296}
297
298#[derive(Debug)]
303pub struct ForeignPhysicalExtensionCodec(pub FFI_PhysicalExtensionCodec);
304
305unsafe impl Send for ForeignPhysicalExtensionCodec {}
306unsafe impl Sync for ForeignPhysicalExtensionCodec {}
307
308impl From<&FFI_PhysicalExtensionCodec> for Arc<dyn PhysicalExtensionCodec> {
309 fn from(provider: &FFI_PhysicalExtensionCodec) -> Self {
310 if (provider.library_marker_id)() == crate::get_library_marker_id() {
311 Arc::clone(provider.inner())
312 } else {
313 Arc::new(ForeignPhysicalExtensionCodec(provider.clone()))
314 }
315 }
316}
317
318impl Clone for FFI_PhysicalExtensionCodec {
319 fn clone(&self) -> Self {
320 unsafe { (self.clone)(self) }
321 }
322}
323
324impl PhysicalExtensionCodec for ForeignPhysicalExtensionCodec {
325 fn try_decode(
326 &self,
327 buf: &[u8],
328 inputs: &[Arc<dyn ExecutionPlan>],
329 _ctx: &TaskContext,
330 ) -> Result<Arc<dyn ExecutionPlan>> {
331 let inputs = inputs
332 .iter()
333 .map(|plan| FFI_ExecutionPlan::new(Arc::clone(plan), None))
334 .collect();
335
336 let plan =
337 df_result!(unsafe { (self.0.try_decode)(&self.0, buf.into(), inputs) })?;
338 let plan: Arc<dyn ExecutionPlan> = (&plan).try_into()?;
339
340 Ok(plan)
341 }
342
343 fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
344 let plan = FFI_ExecutionPlan::new(node, None);
345 let bytes = df_result!(unsafe { (self.0.try_encode)(&self.0, plan) })?;
346
347 buf.extend(bytes);
348 Ok(())
349 }
350
351 fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
352 let udf = unsafe {
353 df_result!((self.0.try_decode_udf)(&self.0, name.into(), buf.into()))
354 }?;
355 let udf: Arc<dyn ScalarUDFImpl> = (&udf).into();
356
357 Ok(Arc::new(ScalarUDF::new_from_shared_impl(udf)))
358 }
359
360 fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
361 let node = FFI_ScalarUDF::from(Arc::new(node.clone()));
362 let bytes = df_result!(unsafe { (self.0.try_encode_udf)(&self.0, node) })?;
363
364 buf.extend(bytes);
365
366 Ok(())
367 }
368
369 fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
370 let udaf = unsafe {
371 df_result!((self.0.try_decode_udaf)(&self.0, name.into(), buf.into()))
372 }?;
373 let udaf: Arc<dyn AggregateUDFImpl> = (&udaf).into();
374
375 Ok(Arc::new(AggregateUDF::new_from_shared_impl(udaf)))
376 }
377
378 fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
379 let node = Arc::new(node.clone());
380 let node = FFI_AggregateUDF::from(node);
381 let bytes = df_result!(unsafe { (self.0.try_encode_udaf)(&self.0, node) })?;
382
383 buf.extend(bytes);
384
385 Ok(())
386 }
387
388 fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> {
389 let udwf = unsafe {
390 df_result!((self.0.try_decode_udwf)(&self.0, name.into(), buf.into()))
391 }?;
392 let udwf: Arc<dyn WindowUDFImpl> = (&udwf).into();
393
394 Ok(Arc::new(WindowUDF::new_from_shared_impl(udwf)))
395 }
396
397 fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> {
398 let node = Arc::new(node.clone());
399 let node = FFI_WindowUDF::from(node);
400 let bytes = df_result!(unsafe { (self.0.try_encode_udwf)(&self.0, node) })?;
401
402 buf.extend(bytes);
403
404 Ok(())
405 }
406}
407
408#[cfg(test)]
409pub(crate) mod tests {
410 use std::sync::Arc;
411
412 use arrow_schema::{DataType, Field, Schema};
413 use datafusion_common::{Result, exec_err};
414 use datafusion_execution::TaskContext;
415 use datafusion_expr::ptr_eq::arc_ptr_eq;
416 use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF, WindowUDFImpl};
417 use datafusion_functions::math::abs::AbsFunc;
418 use datafusion_functions_aggregate::sum::Sum;
419 use datafusion_functions_window::rank::{Rank, RankType};
420 use datafusion_physical_plan::ExecutionPlan;
421 use datafusion_proto::physical_plan::PhysicalExtensionCodec;
422
423 use crate::execution_plan::tests::EmptyExec;
424 use crate::proto::physical_extension_codec::FFI_PhysicalExtensionCodec;
425
426 #[derive(Debug)]
427 pub(crate) struct TestExtensionCodec;
428
429 impl TestExtensionCodec {
430 pub(crate) const MAGIC_NUMBER: u8 = 127;
431 pub(crate) const EMPTY_EXEC_SERIALIZED: u8 = 1;
432 pub(crate) const ABS_FUNC_SERIALIZED: u8 = 2;
433 pub(crate) const SUM_UDAF_SERIALIZED: u8 = 3;
434 pub(crate) const RANK_UDWF_SERIALIZED: u8 = 4;
435 pub(crate) const MEMTABLE_SERIALIZED: u8 = 5;
436 }
437
438 impl PhysicalExtensionCodec for TestExtensionCodec {
439 fn try_decode(
440 &self,
441 buf: &[u8],
442 _inputs: &[Arc<dyn ExecutionPlan>],
443 _ctx: &TaskContext,
444 ) -> Result<Arc<dyn ExecutionPlan>> {
445 if buf[0] != Self::MAGIC_NUMBER {
446 return exec_err!(
447 "TestExtensionCodec input buffer does not start with magic number"
448 );
449 }
450
451 if buf.len() != 2 || buf[1] != Self::EMPTY_EXEC_SERIALIZED {
452 return exec_err!("TestExtensionCodec unable to decode execution plan");
453 }
454
455 Ok(create_test_exec())
456 }
457
458 fn try_encode(
459 &self,
460 node: Arc<dyn ExecutionPlan>,
461 buf: &mut Vec<u8>,
462 ) -> Result<()> {
463 buf.push(Self::MAGIC_NUMBER);
464
465 let Some(_) = node.as_any().downcast_ref::<EmptyExec>() else {
466 return exec_err!("TestExtensionCodec only expects EmptyExec");
467 };
468
469 buf.push(Self::EMPTY_EXEC_SERIALIZED);
470
471 Ok(())
472 }
473
474 fn try_decode_udf(&self, _name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
475 if buf[0] != Self::MAGIC_NUMBER {
476 return exec_err!(
477 "TestExtensionCodec input buffer does not start with magic number"
478 );
479 }
480
481 if buf.len() != 2 || buf[1] != Self::ABS_FUNC_SERIALIZED {
482 return exec_err!("TestExtensionCodec unable to decode udf");
483 }
484
485 Ok(Arc::new(ScalarUDF::from(AbsFunc::new())))
486 }
487
488 fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
489 buf.push(Self::MAGIC_NUMBER);
490
491 let udf = node.inner();
492 if !udf.as_any().is::<AbsFunc>() {
493 return exec_err!("TestExtensionCodec only expects Abs UDF");
494 };
495
496 buf.push(Self::ABS_FUNC_SERIALIZED);
497
498 Ok(())
499 }
500
501 fn try_decode_udaf(&self, _name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
502 if buf[0] != Self::MAGIC_NUMBER {
503 return exec_err!(
504 "TestExtensionCodec input buffer does not start with magic number"
505 );
506 }
507
508 if buf.len() != 2 || buf[1] != Self::SUM_UDAF_SERIALIZED {
509 return exec_err!("TestExtensionCodec unable to decode udaf");
510 }
511
512 Ok(Arc::new(AggregateUDF::from(Sum::new())))
513 }
514
515 fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
516 buf.push(Self::MAGIC_NUMBER);
517
518 let udf = node.inner();
519 let Some(_udf) = udf.as_any().downcast_ref::<Sum>() else {
520 return exec_err!("TestExtensionCodec only expects Sum UDAF");
521 };
522
523 buf.push(Self::SUM_UDAF_SERIALIZED);
524
525 Ok(())
526 }
527
528 fn try_decode_udwf(&self, _name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> {
529 if buf[0] != Self::MAGIC_NUMBER {
530 return exec_err!(
531 "TestExtensionCodec input buffer does not start with magic number"
532 );
533 }
534
535 if buf.len() != 2 || buf[1] != Self::RANK_UDWF_SERIALIZED {
536 return exec_err!("TestExtensionCodec unable to decode udwf");
537 }
538
539 Ok(Arc::new(WindowUDF::from(Rank::new(
540 "my_rank".to_owned(),
541 RankType::Basic,
542 ))))
543 }
544
545 fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> {
546 buf.push(Self::MAGIC_NUMBER);
547
548 let udf = node.inner();
549 let Some(udf) = udf.as_any().downcast_ref::<Rank>() else {
550 return exec_err!("TestExtensionCodec only expects Rank UDWF");
551 };
552
553 if udf.name() != "my_rank" {
554 return exec_err!("TestExtensionCodec only expects my_rank UDWF name");
555 }
556
557 buf.push(Self::RANK_UDWF_SERIALIZED);
558
559 Ok(())
560 }
561 }
562
563 fn create_test_exec() -> Arc<dyn ExecutionPlan> {
564 let schema =
565 Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
566 Arc::new(EmptyExec::new(schema)) as Arc<dyn ExecutionPlan>
567 }
568
569 #[test]
570 fn roundtrip_ffi_physical_extension_codec_exec_plan() -> Result<()> {
571 let codec = Arc::new(TestExtensionCodec {});
572 let (ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
573
574 let mut ffi_codec =
575 FFI_PhysicalExtensionCodec::new(codec, None, task_ctx_provider);
576 ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
577 let foreign_codec: Arc<dyn PhysicalExtensionCodec> = (&ffi_codec).into();
578
579 let exec = create_test_exec();
580 let input_execs = [create_test_exec()];
581 let mut bytes = Vec::new();
582 foreign_codec.try_encode(Arc::clone(&exec), &mut bytes)?;
583
584 let returned_exec =
585 foreign_codec.try_decode(&bytes, &input_execs, ctx.task_ctx().as_ref())?;
586
587 assert!(returned_exec.as_any().is::<EmptyExec>());
588
589 Ok(())
590 }
591
592 #[test]
593 fn roundtrip_ffi_physical_extension_codec_udf() -> Result<()> {
594 let codec = Arc::new(TestExtensionCodec {});
595 let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
596
597 let mut ffi_codec =
598 FFI_PhysicalExtensionCodec::new(codec, None, task_ctx_provider);
599 ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
600 let foreign_codec: Arc<dyn PhysicalExtensionCodec> = (&ffi_codec).into();
601
602 let udf = Arc::new(ScalarUDF::from(AbsFunc::new()));
603 let mut bytes = Vec::new();
604 foreign_codec.try_encode_udf(udf.as_ref(), &mut bytes)?;
605
606 let returned_udf = foreign_codec.try_decode_udf(udf.name(), &bytes)?;
607
608 assert!(returned_udf.inner().as_any().is::<AbsFunc>());
609
610 Ok(())
611 }
612
613 #[test]
614 fn roundtrip_ffi_physical_extension_codec_udaf() -> Result<()> {
615 let codec = Arc::new(TestExtensionCodec {});
616 let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
617
618 let mut ffi_codec =
619 FFI_PhysicalExtensionCodec::new(codec, None, task_ctx_provider);
620 ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
621 let foreign_codec: Arc<dyn PhysicalExtensionCodec> = (&ffi_codec).into();
622
623 let udf = Arc::new(AggregateUDF::from(Sum::new()));
624 let mut bytes = Vec::new();
625 foreign_codec.try_encode_udaf(udf.as_ref(), &mut bytes)?;
626
627 let returned_udf = foreign_codec.try_decode_udaf(udf.name(), &bytes)?;
628
629 assert!(returned_udf.inner().as_any().is::<Sum>());
630
631 Ok(())
632 }
633
634 #[test]
635 fn roundtrip_ffi_physical_extension_codec_udwf() -> Result<()> {
636 let codec = Arc::new(TestExtensionCodec {});
637 let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
638
639 let mut ffi_codec =
640 FFI_PhysicalExtensionCodec::new(codec, None, task_ctx_provider);
641 ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
642 let foreign_codec: Arc<dyn PhysicalExtensionCodec> = (&ffi_codec).into();
643
644 let udf = Arc::new(WindowUDF::from(Rank::new(
645 "my_rank".to_owned(),
646 RankType::Basic,
647 )));
648 let mut bytes = Vec::new();
649 foreign_codec.try_encode_udwf(udf.as_ref(), &mut bytes)?;
650
651 let returned_udf = foreign_codec.try_decode_udwf(udf.name(), &bytes)?;
652
653 assert!(returned_udf.inner().as_any().is::<Rank>());
654
655 Ok(())
656 }
657
658 #[test]
659 fn ffi_physical_extension_codec_local_bypass() {
660 let codec =
661 Arc::new(TestExtensionCodec {}) as Arc<dyn PhysicalExtensionCodec + Send>;
662 let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
663
664 let mut ffi_codec =
665 FFI_PhysicalExtensionCodec::new(Arc::clone(&codec), None, task_ctx_provider);
666
667 let codec = codec as Arc<dyn PhysicalExtensionCodec>;
668 let foreign_codec: Arc<dyn PhysicalExtensionCodec> = (&ffi_codec).into();
670 assert!(arc_ptr_eq(&foreign_codec, &codec));
671
672 ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
674 let foreign_codec: Arc<dyn PhysicalExtensionCodec> = (&ffi_codec).into();
675 assert!(!arc_ptr_eq(&foreign_codec, &codec));
676 }
677}