1use std::any::Any;
19use std::ffi::c_void;
20use std::sync::Arc;
21
22use datafusion_common::error::Result;
23use datafusion_execution::TaskContext;
24use datafusion_expr::{
25 AggregateUDF, AggregateUDFImpl, ScalarUDF, ScalarUDFImpl, WindowUDF, WindowUDFImpl,
26};
27use datafusion_physical_plan::ExecutionPlan;
28use datafusion_proto::physical_plan::PhysicalExtensionCodec;
29
30use stabby::slice::Slice as SSlice;
31use stabby::str::Str as SStr;
32use stabby::vec::Vec as SVec;
33use tokio::runtime::Handle;
34
35use crate::execution::FFI_TaskContextProvider;
36use crate::execution_plan::FFI_ExecutionPlan;
37use crate::udaf::FFI_AggregateUDF;
38use crate::udf::FFI_ScalarUDF;
39use crate::udwf::FFI_WindowUDF;
40use crate::util::FFI_Result;
41use crate::{df_result, sresult_return};
42
/// FFI-safe handle that exposes a [`PhysicalExtensionCodec`] as a table of
/// `extern "C"` function pointers plus an opaque data pointer.
///
/// The struct is `#[repr(C)]`, so the order and types of its fields are part of
/// its layout and must not be rearranged.
#[repr(C)]
#[derive(Debug)]
pub struct FFI_PhysicalExtensionCodec {
    /// Decodes an execution plan from `buf`, given the plan's `inputs`.
    try_decode: unsafe extern "C" fn(
        &Self,
        buf: SSlice<u8>,
        inputs: SVec<FFI_ExecutionPlan>,
    ) -> FFI_Result<FFI_ExecutionPlan>,

    /// Encodes the execution plan `node` into a byte vector.
    try_encode:
        unsafe extern "C" fn(&Self, node: FFI_ExecutionPlan) -> FFI_Result<SVec<u8>>,

    /// Decodes the scalar UDF called `name` from `buf`.
    try_decode_udf: unsafe extern "C" fn(
        &Self,
        name: SStr,
        buf: SSlice<u8>,
    ) -> FFI_Result<FFI_ScalarUDF>,

    /// Encodes the scalar UDF `node` into a byte vector.
    try_encode_udf:
        unsafe extern "C" fn(&Self, node: FFI_ScalarUDF) -> FFI_Result<SVec<u8>>,

    /// Decodes the aggregate UDF called `name` from `buf`.
    try_decode_udaf: unsafe extern "C" fn(
        &Self,
        name: SStr,
        buf: SSlice<u8>,
    ) -> FFI_Result<FFI_AggregateUDF>,

    /// Encodes the aggregate UDF `node` into a byte vector.
    try_encode_udaf:
        unsafe extern "C" fn(&Self, node: FFI_AggregateUDF) -> FFI_Result<SVec<u8>>,

    /// Decodes the window UDF called `name` from `buf`.
    try_decode_udwf: unsafe extern "C" fn(
        &Self,
        name: SStr,
        buf: SSlice<u8>,
    ) -> FFI_Result<FFI_WindowUDF>,

    /// Encodes the window UDF `node` into a byte vector.
    try_encode_udwf:
        unsafe extern "C" fn(&Self, node: FFI_WindowUDF) -> FFI_Result<SVec<u8>>,

    /// Provider that is converted into an `Arc<TaskContext>` whenever an
    /// execution plan is decoded.
    task_ctx_provider: FFI_TaskContextProvider,

    /// Produces a copy of this struct; invoked by the `Clone` implementation.
    pub clone: unsafe extern "C" fn(plan: &Self) -> Self,

    /// Frees the state behind `private_data`; invoked by the `Drop`
    /// implementation.
    pub release: unsafe extern "C" fn(arg: &mut Self),

    /// Returns a `u64` version number; `new` sets this to `crate::version`.
    // NOTE(review): presumably the FFI crate version used for compatibility
    // checks by consumers -- confirm.
    pub version: unsafe extern "C" fn() -> u64,

    /// Opaque pointer to implementation state. For instances built by `new` it
    /// points to a boxed `PhysicalExtensionCodecPrivateData`; consumers should
    /// not interpret it themselves.
    pub private_data: *mut c_void,

    /// Returns an identifier for the library that created this struct. It is
    /// compared with `crate::get_library_marker_id()` when converting back to
    /// `Arc<dyn PhysicalExtensionCodec>` to decide whether the wrapped codec
    /// can be used directly.
    pub library_marker_id: extern "C" fn() -> usize,
}
112
// The struct contains a raw pointer (`private_data`), so the compiler does not
// derive `Send`/`Sync` for it; they are asserted manually here.
// NOTE(review): soundness relies on the pointed-to state being safe to move and
// share across threads. For instances built by `new` that state is an
// `Arc<dyn PhysicalExtensionCodec>` plus an `Option<Handle>` -- confirm the
// same holds for structs produced by other libraries.
unsafe impl Send for FFI_PhysicalExtensionCodec {}
unsafe impl Sync for FFI_PhysicalExtensionCodec {}
115
/// State stored behind `FFI_PhysicalExtensionCodec::private_data` for structs
/// created by `FFI_PhysicalExtensionCodec::new`.
struct PhysicalExtensionCodecPrivateData {
    /// The codec every function-pointer wrapper delegates to.
    codec: Arc<dyn PhysicalExtensionCodec>,
    /// Optional Tokio runtime handle; passed on to `FFI_ExecutionPlan::new`
    /// when a decoded plan is wrapped for return.
    runtime: Option<Handle>,
}
120
121impl FFI_PhysicalExtensionCodec {
122 fn inner(&self) -> &Arc<dyn PhysicalExtensionCodec> {
123 let private_data = self.private_data as *const PhysicalExtensionCodecPrivateData;
124 unsafe { &(*private_data).codec }
125 }
126
127 fn runtime(&self) -> &Option<Handle> {
128 let private_data = self.private_data as *const PhysicalExtensionCodecPrivateData;
129 unsafe { &(*private_data).runtime }
130 }
131}
132
133unsafe extern "C" fn try_decode_fn_wrapper(
134 codec: &FFI_PhysicalExtensionCodec,
135 buf: SSlice<u8>,
136 inputs: SVec<FFI_ExecutionPlan>,
137) -> FFI_Result<FFI_ExecutionPlan> {
138 let runtime = codec.runtime().clone();
139 let task_ctx: Arc<TaskContext> =
140 sresult_return!((&codec.task_ctx_provider).try_into());
141 let codec = codec.inner();
142 let inputs = inputs
143 .into_iter()
144 .map(|plan| <Arc<dyn ExecutionPlan>>::try_from(&plan))
145 .collect::<Result<Vec<_>>>();
146 let inputs = sresult_return!(inputs);
147
148 let plan =
149 sresult_return!(codec.try_decode(buf.as_ref(), &inputs, task_ctx.as_ref()));
150
151 FFI_Result::Ok(FFI_ExecutionPlan::new(plan, runtime))
152}
153
154unsafe extern "C" fn try_encode_fn_wrapper(
155 codec: &FFI_PhysicalExtensionCodec,
156 node: FFI_ExecutionPlan,
157) -> FFI_Result<SVec<u8>> {
158 let codec = codec.inner();
159
160 let plan: Arc<dyn ExecutionPlan> = sresult_return!((&node).try_into());
161
162 let mut bytes = Vec::new();
163 sresult_return!(codec.try_encode(plan, &mut bytes));
164
165 FFI_Result::Ok(bytes.into_iter().collect())
166}
167
168unsafe extern "C" fn try_decode_udf_fn_wrapper(
169 codec: &FFI_PhysicalExtensionCodec,
170 name: SStr,
171 buf: SSlice<u8>,
172) -> FFI_Result<FFI_ScalarUDF> {
173 let codec = codec.inner();
174
175 let udf = sresult_return!(codec.try_decode_udf(name.as_str(), buf.as_ref()));
176 let udf = FFI_ScalarUDF::from(udf);
177
178 FFI_Result::Ok(udf)
179}
180
181unsafe extern "C" fn try_encode_udf_fn_wrapper(
182 codec: &FFI_PhysicalExtensionCodec,
183 node: FFI_ScalarUDF,
184) -> FFI_Result<SVec<u8>> {
185 let codec = codec.inner();
186 let node: Arc<dyn ScalarUDFImpl> = (&node).into();
187 let node = ScalarUDF::new_from_shared_impl(node);
188
189 let mut bytes = Vec::new();
190 sresult_return!(codec.try_encode_udf(&node, &mut bytes));
191
192 FFI_Result::Ok(bytes.into_iter().collect())
193}
194
195unsafe extern "C" fn try_decode_udaf_fn_wrapper(
196 codec: &FFI_PhysicalExtensionCodec,
197 name: SStr,
198 buf: SSlice<u8>,
199) -> FFI_Result<FFI_AggregateUDF> {
200 let codec_inner = codec.inner();
201 let udaf = sresult_return!(codec_inner.try_decode_udaf(name.into(), buf.as_ref()));
202 let udaf = FFI_AggregateUDF::from(udaf);
203
204 FFI_Result::Ok(udaf)
205}
206
207unsafe extern "C" fn try_encode_udaf_fn_wrapper(
208 codec: &FFI_PhysicalExtensionCodec,
209 node: FFI_AggregateUDF,
210) -> FFI_Result<SVec<u8>> {
211 let codec = codec.inner();
212 let udaf: Arc<dyn AggregateUDFImpl> = (&node).into();
213 let udaf = AggregateUDF::new_from_shared_impl(udaf);
214
215 let mut bytes = Vec::new();
216 sresult_return!(codec.try_encode_udaf(&udaf, &mut bytes));
217
218 FFI_Result::Ok(bytes.into_iter().collect())
219}
220
221unsafe extern "C" fn try_decode_udwf_fn_wrapper(
222 codec: &FFI_PhysicalExtensionCodec,
223 name: SStr,
224 buf: SSlice<u8>,
225) -> FFI_Result<FFI_WindowUDF> {
226 let codec = codec.inner();
227 let udwf = sresult_return!(codec.try_decode_udwf(name.into(), buf.as_ref()));
228 let udwf = FFI_WindowUDF::from(udwf);
229
230 FFI_Result::Ok(udwf)
231}
232
233unsafe extern "C" fn try_encode_udwf_fn_wrapper(
234 codec: &FFI_PhysicalExtensionCodec,
235 node: FFI_WindowUDF,
236) -> FFI_Result<SVec<u8>> {
237 let codec = codec.inner();
238 let udwf: Arc<dyn WindowUDFImpl> = (&node).into();
239 let udwf = WindowUDF::new_from_shared_impl(udwf);
240
241 let mut bytes = Vec::new();
242 sresult_return!(codec.try_encode_udwf(&udwf, &mut bytes));
243
244 FFI_Result::Ok(bytes.into_iter().collect())
245}
246
247unsafe extern "C" fn release_fn_wrapper(codec: &mut FFI_PhysicalExtensionCodec) {
248 unsafe {
249 let private_data =
250 Box::from_raw(codec.private_data as *mut PhysicalExtensionCodecPrivateData);
251 drop(private_data);
252 }
253}
254
255unsafe extern "C" fn clone_fn_wrapper(
256 codec: &FFI_PhysicalExtensionCodec,
257) -> FFI_PhysicalExtensionCodec {
258 let old_codec = Arc::clone(codec.inner());
259 let runtime = codec.runtime().clone();
260
261 FFI_PhysicalExtensionCodec::new(old_codec, runtime, codec.task_ctx_provider.clone())
262}
263
264impl Drop for FFI_PhysicalExtensionCodec {
265 fn drop(&mut self) {
266 unsafe { (self.release)(self) }
267 }
268}
269
270impl FFI_PhysicalExtensionCodec {
271 pub fn new(
273 codec: Arc<dyn PhysicalExtensionCodec + Send>,
274 runtime: Option<Handle>,
275 task_ctx_provider: impl Into<FFI_TaskContextProvider>,
276 ) -> Self {
277 if let Some(codec) = (Arc::clone(&codec) as Arc<dyn Any>)
278 .downcast_ref::<ForeignPhysicalExtensionCodec>()
279 {
280 return codec.0.clone();
281 }
282
283 let task_ctx_provider = task_ctx_provider.into();
284 let private_data = Box::new(PhysicalExtensionCodecPrivateData { codec, runtime });
285
286 Self {
287 try_decode: try_decode_fn_wrapper,
288 try_encode: try_encode_fn_wrapper,
289 try_decode_udf: try_decode_udf_fn_wrapper,
290 try_encode_udf: try_encode_udf_fn_wrapper,
291 try_decode_udaf: try_decode_udaf_fn_wrapper,
292 try_encode_udaf: try_encode_udaf_fn_wrapper,
293 try_decode_udwf: try_decode_udwf_fn_wrapper,
294 try_encode_udwf: try_encode_udwf_fn_wrapper,
295 task_ctx_provider,
296
297 clone: clone_fn_wrapper,
298 release: release_fn_wrapper,
299 version: crate::version,
300 private_data: Box::into_raw(private_data) as *mut c_void,
301 library_marker_id: crate::get_library_marker_id,
302 }
303 }
304}
305
/// Adapter that implements [`PhysicalExtensionCodec`] on top of an
/// [`FFI_PhysicalExtensionCodec`] by calling through its function pointers.
///
/// It is created when an FFI struct whose library marker differs from this
/// library's is converted into an `Arc<dyn PhysicalExtensionCodec>`.
#[derive(Debug)]
pub struct ForeignPhysicalExtensionCodec(pub FFI_PhysicalExtensionCodec);
312
// The only field is an `FFI_PhysicalExtensionCodec`, which is itself declared
// `Send + Sync` in this file; these impls restate that guarantee for the
// wrapper.
// NOTE(review): soundness therefore rests on the same assumptions as the
// impls for `FFI_PhysicalExtensionCodec` -- confirm.
unsafe impl Send for ForeignPhysicalExtensionCodec {}
unsafe impl Sync for ForeignPhysicalExtensionCodec {}
315
316impl From<&FFI_PhysicalExtensionCodec> for Arc<dyn PhysicalExtensionCodec> {
317 fn from(codec: &FFI_PhysicalExtensionCodec) -> Self {
318 if (codec.library_marker_id)() == crate::get_library_marker_id() {
319 Arc::clone(codec.inner())
320 } else {
321 Arc::new(ForeignPhysicalExtensionCodec(codec.clone()))
322 }
323 }
324}
325
326impl Clone for FFI_PhysicalExtensionCodec {
327 fn clone(&self) -> Self {
328 unsafe { (self.clone)(self) }
329 }
330}
331
332impl PhysicalExtensionCodec for ForeignPhysicalExtensionCodec {
333 fn try_decode(
334 &self,
335 buf: &[u8],
336 inputs: &[Arc<dyn ExecutionPlan>],
337 _ctx: &TaskContext,
338 ) -> Result<Arc<dyn ExecutionPlan>> {
339 let inputs = inputs
340 .iter()
341 .map(|plan| FFI_ExecutionPlan::new(Arc::clone(plan), None))
342 .collect();
343
344 let plan =
345 df_result!(unsafe { (self.0.try_decode)(&self.0, buf.into(), inputs) })?;
346 let plan: Arc<dyn ExecutionPlan> = (&plan).try_into()?;
347
348 Ok(plan)
349 }
350
351 fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
352 let plan = FFI_ExecutionPlan::new(node, None);
353 let bytes = df_result!(unsafe { (self.0.try_encode)(&self.0, plan) })?;
354
355 buf.extend(bytes);
356 Ok(())
357 }
358
359 fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
360 let udf = unsafe {
361 df_result!((self.0.try_decode_udf)(&self.0, name.into(), buf.into()))
362 }?;
363 let udf: Arc<dyn ScalarUDFImpl> = (&udf).into();
364
365 Ok(Arc::new(ScalarUDF::new_from_shared_impl(udf)))
366 }
367
368 fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
369 let node = FFI_ScalarUDF::from(Arc::new(node.clone()));
370 let bytes = df_result!(unsafe { (self.0.try_encode_udf)(&self.0, node) })?;
371
372 buf.extend(bytes);
373
374 Ok(())
375 }
376
377 fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
378 let udaf = unsafe {
379 df_result!((self.0.try_decode_udaf)(&self.0, name.into(), buf.into()))
380 }?;
381 let udaf: Arc<dyn AggregateUDFImpl> = (&udaf).into();
382
383 Ok(Arc::new(AggregateUDF::new_from_shared_impl(udaf)))
384 }
385
386 fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
387 let node = Arc::new(node.clone());
388 let node = FFI_AggregateUDF::from(node);
389 let bytes = df_result!(unsafe { (self.0.try_encode_udaf)(&self.0, node) })?;
390
391 buf.extend(bytes);
392
393 Ok(())
394 }
395
396 fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> {
397 let udwf = unsafe {
398 df_result!((self.0.try_decode_udwf)(&self.0, name.into(), buf.into()))
399 }?;
400 let udwf: Arc<dyn WindowUDFImpl> = (&udwf).into();
401
402 Ok(Arc::new(WindowUDF::new_from_shared_impl(udwf)))
403 }
404
405 fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> {
406 let node = Arc::new(node.clone());
407 let node = FFI_WindowUDF::from(node);
408 let bytes = df_result!(unsafe { (self.0.try_encode_udwf)(&self.0, node) })?;
409
410 buf.extend(bytes);
411
412 Ok(())
413 }
414}
415
416#[cfg(test)]
417pub(crate) mod tests {
418 use std::sync::Arc;
419
420 use arrow_schema::{DataType, Field, Schema};
421 use datafusion_common::{Result, exec_err};
422 use datafusion_execution::TaskContext;
423 use datafusion_expr::ptr_eq::arc_ptr_eq;
424 use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF, WindowUDFImpl};
425 use datafusion_functions::math::abs::AbsFunc;
426 use datafusion_functions_aggregate::sum::Sum;
427 use datafusion_functions_window::rank::{Rank, RankType};
428 use datafusion_physical_plan::ExecutionPlan;
429 use datafusion_proto::physical_plan::PhysicalExtensionCodec;
430
431 use crate::execution_plan::tests::EmptyExec;
432 use crate::proto::physical_extension_codec::FFI_PhysicalExtensionCodec;
433
/// Minimal codec used by the tests in this module. It serializes each
/// supported item as two bytes: a magic number followed by a tag identifying
/// the item.
#[derive(Debug)]
pub(crate) struct TestExtensionCodec;
436
impl TestExtensionCodec {
    /// First byte of every buffer this codec writes and expects to read.
    pub(crate) const MAGIC_NUMBER: u8 = 127;
    /// Tag byte for an encoded `EmptyExec` plan.
    pub(crate) const EMPTY_EXEC_SERIALIZED: u8 = 1;
    /// Tag byte for the encoded `AbsFunc` scalar UDF.
    pub(crate) const ABS_FUNC_SERIALIZED: u8 = 2;
    /// Tag byte for the encoded `Sum` aggregate UDF.
    pub(crate) const SUM_UDAF_SERIALIZED: u8 = 3;
    /// Tag byte for the encoded `Rank` window UDF.
    pub(crate) const RANK_UDWF_SERIALIZED: u8 = 4;
    // NOTE(review): not referenced in this module; presumably used by tests
    // elsewhere in the crate -- confirm.
    pub(crate) const MEMTABLE_SERIALIZED: u8 = 5;
}
445
446 impl PhysicalExtensionCodec for TestExtensionCodec {
447 fn try_decode(
448 &self,
449 buf: &[u8],
450 _inputs: &[Arc<dyn ExecutionPlan>],
451 _ctx: &TaskContext,
452 ) -> Result<Arc<dyn ExecutionPlan>> {
453 if buf[0] != Self::MAGIC_NUMBER {
454 return exec_err!(
455 "TestExtensionCodec input buffer does not start with magic number"
456 );
457 }
458
459 if buf.len() != 2 || buf[1] != Self::EMPTY_EXEC_SERIALIZED {
460 return exec_err!("TestExtensionCodec unable to decode execution plan");
461 }
462
463 Ok(create_test_exec())
464 }
465
466 fn try_encode(
467 &self,
468 node: Arc<dyn ExecutionPlan>,
469 buf: &mut Vec<u8>,
470 ) -> Result<()> {
471 buf.push(Self::MAGIC_NUMBER);
472
473 let Some(_) = node.downcast_ref::<EmptyExec>() else {
474 return exec_err!("TestExtensionCodec only expects EmptyExec");
475 };
476
477 buf.push(Self::EMPTY_EXEC_SERIALIZED);
478
479 Ok(())
480 }
481
482 fn try_decode_udf(&self, _name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
483 if buf[0] != Self::MAGIC_NUMBER {
484 return exec_err!(
485 "TestExtensionCodec input buffer does not start with magic number"
486 );
487 }
488
489 if buf.len() != 2 || buf[1] != Self::ABS_FUNC_SERIALIZED {
490 return exec_err!("TestExtensionCodec unable to decode udf");
491 }
492
493 Ok(Arc::new(ScalarUDF::from(AbsFunc::new())))
494 }
495
496 fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
497 buf.push(Self::MAGIC_NUMBER);
498
499 let udf = node.inner();
500 if !udf.is::<AbsFunc>() {
501 return exec_err!("TestExtensionCodec only expects Abs UDF");
502 };
503
504 buf.push(Self::ABS_FUNC_SERIALIZED);
505
506 Ok(())
507 }
508
509 fn try_decode_udaf(&self, _name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
510 if buf[0] != Self::MAGIC_NUMBER {
511 return exec_err!(
512 "TestExtensionCodec input buffer does not start with magic number"
513 );
514 }
515
516 if buf.len() != 2 || buf[1] != Self::SUM_UDAF_SERIALIZED {
517 return exec_err!("TestExtensionCodec unable to decode udaf");
518 }
519
520 Ok(Arc::new(AggregateUDF::from(Sum::new())))
521 }
522
523 fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
524 buf.push(Self::MAGIC_NUMBER);
525
526 let udf = node.inner();
527 let Some(_udf) = udf.downcast_ref::<Sum>() else {
528 return exec_err!("TestExtensionCodec only expects Sum UDAF");
529 };
530
531 buf.push(Self::SUM_UDAF_SERIALIZED);
532
533 Ok(())
534 }
535
536 fn try_decode_udwf(&self, _name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> {
537 if buf[0] != Self::MAGIC_NUMBER {
538 return exec_err!(
539 "TestExtensionCodec input buffer does not start with magic number"
540 );
541 }
542
543 if buf.len() != 2 || buf[1] != Self::RANK_UDWF_SERIALIZED {
544 return exec_err!("TestExtensionCodec unable to decode udwf");
545 }
546
547 Ok(Arc::new(WindowUDF::from(Rank::new(
548 "my_rank".to_owned(),
549 RankType::Basic,
550 ))))
551 }
552
553 fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> {
554 buf.push(Self::MAGIC_NUMBER);
555
556 let udf = node.inner();
557 let Some(udf) = udf.downcast_ref::<Rank>() else {
558 return exec_err!("TestExtensionCodec only expects Rank UDWF");
559 };
560
561 if udf.name() != "my_rank" {
562 return exec_err!("TestExtensionCodec only expects my_rank UDWF name");
563 }
564
565 buf.push(Self::RANK_UDWF_SERIALIZED);
566
567 Ok(())
568 }
569 }
570
571 fn create_test_exec() -> Arc<dyn ExecutionPlan> {
572 let schema =
573 Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
574 Arc::new(EmptyExec::new(schema)) as Arc<dyn ExecutionPlan>
575 }
576
/// Encodes and then decodes an `EmptyExec` through the foreign code path.
#[test]
fn roundtrip_ffi_physical_extension_codec_exec_plan() -> Result<()> {
    let (ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
    let local_codec = Arc::new(TestExtensionCodec {});

    let mut ffi_codec =
        FFI_PhysicalExtensionCodec::new(local_codec, None, task_ctx_provider);
    // Swap the marker so the conversion below wraps the struct instead of
    // sharing the local codec directly.
    ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
    let foreign_codec = <Arc<dyn PhysicalExtensionCodec>>::from(&ffi_codec);

    let plan = create_test_exec();
    let children = [create_test_exec()];

    let mut encoded = Vec::new();
    foreign_codec.try_encode(Arc::clone(&plan), &mut encoded)?;

    let task_ctx = ctx.task_ctx();
    let decoded = foreign_codec.try_decode(&encoded, &children, task_ctx.as_ref())?;

    assert!(decoded.is::<EmptyExec>());

    Ok(())
}
599
/// Encodes and then decodes the `AbsFunc` scalar UDF through the foreign code
/// path.
#[test]
fn roundtrip_ffi_physical_extension_codec_udf() -> Result<()> {
    let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
    let local_codec = Arc::new(TestExtensionCodec {});

    let mut ffi_codec =
        FFI_PhysicalExtensionCodec::new(local_codec, None, task_ctx_provider);
    // Swap the marker so the conversion below wraps the struct instead of
    // sharing the local codec directly.
    ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
    let foreign_codec = <Arc<dyn PhysicalExtensionCodec>>::from(&ffi_codec);

    let abs = Arc::new(ScalarUDF::from(AbsFunc::new()));

    let mut encoded = Vec::new();
    foreign_codec.try_encode_udf(abs.as_ref(), &mut encoded)?;

    let decoded = foreign_codec.try_decode_udf(abs.name(), &encoded)?;

    assert!(decoded.inner().is::<AbsFunc>());

    Ok(())
}
620
/// Encodes and then decodes the `Sum` aggregate UDF through the foreign code
/// path.
#[test]
fn roundtrip_ffi_physical_extension_codec_udaf() -> Result<()> {
    let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
    let local_codec = Arc::new(TestExtensionCodec {});

    let mut ffi_codec =
        FFI_PhysicalExtensionCodec::new(local_codec, None, task_ctx_provider);
    // Swap the marker so the conversion below wraps the struct instead of
    // sharing the local codec directly.
    ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
    let foreign_codec = <Arc<dyn PhysicalExtensionCodec>>::from(&ffi_codec);

    let sum = Arc::new(AggregateUDF::from(Sum::new()));

    let mut encoded = Vec::new();
    foreign_codec.try_encode_udaf(sum.as_ref(), &mut encoded)?;

    let decoded = foreign_codec.try_decode_udaf(sum.name(), &encoded)?;

    assert!(decoded.inner().is::<Sum>());

    Ok(())
}
641
/// Encodes and then decodes the `my_rank` window UDF through the foreign code
/// path.
#[test]
fn roundtrip_ffi_physical_extension_codec_udwf() -> Result<()> {
    let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
    let local_codec = Arc::new(TestExtensionCodec {});

    let mut ffi_codec =
        FFI_PhysicalExtensionCodec::new(local_codec, None, task_ctx_provider);
    // Swap the marker so the conversion below wraps the struct instead of
    // sharing the local codec directly.
    ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
    let foreign_codec = <Arc<dyn PhysicalExtensionCodec>>::from(&ffi_codec);

    let rank = Rank::new("my_rank".to_owned(), RankType::Basic);
    let rank = Arc::new(WindowUDF::from(rank));

    let mut encoded = Vec::new();
    foreign_codec.try_encode_udwf(rank.as_ref(), &mut encoded)?;

    let decoded = foreign_codec.try_decode_udwf(rank.name(), &encoded)?;

    assert!(decoded.inner().is::<Rank>());

    Ok(())
}
665
/// Checks that converting an FFI struct back to a codec shares the original
/// `Arc` when the library marker is local, and produces a different object
/// once the marker is replaced with the mock foreign one.
#[test]
fn ffi_physical_extension_codec_local_bypass() {
    let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
    let original =
        Arc::new(TestExtensionCodec {}) as Arc<dyn PhysicalExtensionCodec + Send>;

    let mut ffi_codec =
        FFI_PhysicalExtensionCodec::new(Arc::clone(&original), None, task_ctx_provider);
    let original = original as Arc<dyn PhysicalExtensionCodec>;

    // Local marker: the conversion returns the very same allocation.
    let roundtripped = <Arc<dyn PhysicalExtensionCodec>>::from(&ffi_codec);
    assert!(arc_ptr_eq(&roundtripped, &original));

    // Foreign marker: the conversion wraps the struct instead.
    ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
    let roundtripped = <Arc<dyn PhysicalExtensionCodec>>::from(&ffi_codec);
    assert!(!arc_ptr_eq(&roundtripped, &original));
}
685}