datafusion_ffi/proto/
physical_extension_codec.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::ffi::c_void;
19use std::sync::Arc;
20
21use abi_stable::StableAbi;
22use abi_stable::std_types::{RResult, RSlice, RStr, RVec};
23use datafusion_common::error::Result;
24use datafusion_execution::TaskContext;
25use datafusion_expr::{
26    AggregateUDF, AggregateUDFImpl, ScalarUDF, ScalarUDFImpl, WindowUDF, WindowUDFImpl,
27};
28use datafusion_physical_plan::ExecutionPlan;
29use datafusion_proto::physical_plan::PhysicalExtensionCodec;
30use tokio::runtime::Handle;
31
32use crate::execution::FFI_TaskContextProvider;
33use crate::execution_plan::FFI_ExecutionPlan;
34use crate::udaf::FFI_AggregateUDF;
35use crate::udf::FFI_ScalarUDF;
36use crate::udwf::FFI_WindowUDF;
37use crate::util::FFIResult;
38use crate::{df_result, rresult_return};
39
40/// A stable struct for sharing [`PhysicalExtensionCodec`] across FFI boundaries.
41#[repr(C)]
42#[derive(Debug, StableAbi)]
43pub struct FFI_PhysicalExtensionCodec {
44    /// Decode bytes into an execution plan.
45    try_decode: unsafe extern "C" fn(
46        &Self,
47        buf: RSlice<u8>,
48        inputs: RVec<FFI_ExecutionPlan>,
49    ) -> FFIResult<FFI_ExecutionPlan>,
50
51    /// Encode an execution plan into bytes.
52    try_encode:
53        unsafe extern "C" fn(&Self, node: FFI_ExecutionPlan) -> FFIResult<RVec<u8>>,
54
55    /// Decode bytes into a user defined scalar function.
56    try_decode_udf: unsafe extern "C" fn(
57        &Self,
58        name: RStr,
59        buf: RSlice<u8>,
60    ) -> FFIResult<FFI_ScalarUDF>,
61
62    /// Encode a user defined scalar function into bytes.
63    try_encode_udf:
64        unsafe extern "C" fn(&Self, node: FFI_ScalarUDF) -> FFIResult<RVec<u8>>,
65
66    /// Decode bytes into a user defined aggregate function.
67    try_decode_udaf: unsafe extern "C" fn(
68        &Self,
69        name: RStr,
70        buf: RSlice<u8>,
71    ) -> FFIResult<FFI_AggregateUDF>,
72
73    /// Encode a user defined aggregate function into bytes.
74    try_encode_udaf:
75        unsafe extern "C" fn(&Self, node: FFI_AggregateUDF) -> FFIResult<RVec<u8>>,
76
77    /// Decode bytes into a user defined window function.
78    try_decode_udwf: unsafe extern "C" fn(
79        &Self,
80        name: RStr,
81        buf: RSlice<u8>,
82    ) -> FFIResult<FFI_WindowUDF>,
83
84    /// Encode a user defined window function into bytes.
85    try_encode_udwf:
86        unsafe extern "C" fn(&Self, node: FFI_WindowUDF) -> FFIResult<RVec<u8>>,
87
88    /// Access the current [`TaskContext`].
89    task_ctx_provider: FFI_TaskContextProvider,
90
91    /// Used to create a clone on the provider of the execution plan. This should
92    /// only need to be called by the receiver of the plan.
93    pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
94
95    /// Release the memory of the private data when it is no longer being used.
96    pub release: unsafe extern "C" fn(arg: &mut Self),
97
98    /// Return the major DataFusion version number of this provider.
99    pub version: unsafe extern "C" fn() -> u64,
100
101    /// Internal data. This is only to be accessed by the provider of the plan.
102    /// A [`ForeignPhysicalExtensionCodec`] should never attempt to access this data.
103    pub private_data: *mut c_void,
104
105    /// Utility to identify when FFI objects are accessed locally through
106    /// the foreign interface.
107    pub library_marker_id: extern "C" fn() -> usize,
108}
109
110unsafe impl Send for FFI_PhysicalExtensionCodec {}
111unsafe impl Sync for FFI_PhysicalExtensionCodec {}
112
113struct PhysicalExtensionCodecPrivateData {
114    provider: Arc<dyn PhysicalExtensionCodec>,
115    runtime: Option<Handle>,
116}
117
118impl FFI_PhysicalExtensionCodec {
119    fn inner(&self) -> &Arc<dyn PhysicalExtensionCodec> {
120        let private_data = self.private_data as *const PhysicalExtensionCodecPrivateData;
121        unsafe { &(*private_data).provider }
122    }
123
124    fn runtime(&self) -> &Option<Handle> {
125        let private_data = self.private_data as *const PhysicalExtensionCodecPrivateData;
126        unsafe { &(*private_data).runtime }
127    }
128}
129
130unsafe extern "C" fn try_decode_fn_wrapper(
131    codec: &FFI_PhysicalExtensionCodec,
132    buf: RSlice<u8>,
133    inputs: RVec<FFI_ExecutionPlan>,
134) -> FFIResult<FFI_ExecutionPlan> {
135    let task_ctx: Arc<TaskContext> =
136        rresult_return!((&codec.task_ctx_provider).try_into());
137    let codec = codec.inner();
138    let inputs = inputs
139        .into_iter()
140        .map(|plan| <Arc<dyn ExecutionPlan>>::try_from(&plan))
141        .collect::<Result<Vec<_>>>();
142    let inputs = rresult_return!(inputs);
143
144    let plan =
145        rresult_return!(codec.try_decode(buf.as_ref(), &inputs, task_ctx.as_ref()));
146
147    RResult::ROk(FFI_ExecutionPlan::new(plan, None))
148}
149
150unsafe extern "C" fn try_encode_fn_wrapper(
151    codec: &FFI_PhysicalExtensionCodec,
152    node: FFI_ExecutionPlan,
153) -> FFIResult<RVec<u8>> {
154    let codec = codec.inner();
155
156    let plan: Arc<dyn ExecutionPlan> = rresult_return!((&node).try_into());
157
158    let mut bytes = Vec::new();
159    rresult_return!(codec.try_encode(plan, &mut bytes));
160
161    RResult::ROk(bytes.into())
162}
163
164unsafe extern "C" fn try_decode_udf_fn_wrapper(
165    codec: &FFI_PhysicalExtensionCodec,
166    name: RStr,
167    buf: RSlice<u8>,
168) -> FFIResult<FFI_ScalarUDF> {
169    let codec = codec.inner();
170
171    let udf = rresult_return!(codec.try_decode_udf(name.as_str(), buf.as_ref()));
172    let udf = FFI_ScalarUDF::from(udf);
173
174    RResult::ROk(udf)
175}
176
177unsafe extern "C" fn try_encode_udf_fn_wrapper(
178    codec: &FFI_PhysicalExtensionCodec,
179    node: FFI_ScalarUDF,
180) -> FFIResult<RVec<u8>> {
181    let codec = codec.inner();
182    let node: Arc<dyn ScalarUDFImpl> = (&node).into();
183    let node = ScalarUDF::new_from_shared_impl(node);
184
185    let mut bytes = Vec::new();
186    rresult_return!(codec.try_encode_udf(&node, &mut bytes));
187
188    RResult::ROk(bytes.into())
189}
190
191unsafe extern "C" fn try_decode_udaf_fn_wrapper(
192    codec: &FFI_PhysicalExtensionCodec,
193    name: RStr,
194    buf: RSlice<u8>,
195) -> FFIResult<FFI_AggregateUDF> {
196    let codec_inner = codec.inner();
197    let udaf = rresult_return!(codec_inner.try_decode_udaf(name.into(), buf.as_ref()));
198    let udaf = FFI_AggregateUDF::from(udaf);
199
200    RResult::ROk(udaf)
201}
202
203unsafe extern "C" fn try_encode_udaf_fn_wrapper(
204    codec: &FFI_PhysicalExtensionCodec,
205    node: FFI_AggregateUDF,
206) -> FFIResult<RVec<u8>> {
207    let codec = codec.inner();
208    let udaf: Arc<dyn AggregateUDFImpl> = (&node).into();
209    let udaf = AggregateUDF::new_from_shared_impl(udaf);
210
211    let mut bytes = Vec::new();
212    rresult_return!(codec.try_encode_udaf(&udaf, &mut bytes));
213
214    RResult::ROk(bytes.into())
215}
216
217unsafe extern "C" fn try_decode_udwf_fn_wrapper(
218    codec: &FFI_PhysicalExtensionCodec,
219    name: RStr,
220    buf: RSlice<u8>,
221) -> FFIResult<FFI_WindowUDF> {
222    let codec = codec.inner();
223    let udwf = rresult_return!(codec.try_decode_udwf(name.into(), buf.as_ref()));
224    let udwf = FFI_WindowUDF::from(udwf);
225
226    RResult::ROk(udwf)
227}
228
229unsafe extern "C" fn try_encode_udwf_fn_wrapper(
230    codec: &FFI_PhysicalExtensionCodec,
231    node: FFI_WindowUDF,
232) -> FFIResult<RVec<u8>> {
233    let codec = codec.inner();
234    let udwf: Arc<dyn WindowUDFImpl> = (&node).into();
235    let udwf = WindowUDF::new_from_shared_impl(udwf);
236
237    let mut bytes = Vec::new();
238    rresult_return!(codec.try_encode_udwf(&udwf, &mut bytes));
239
240    RResult::ROk(bytes.into())
241}
242
243unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_PhysicalExtensionCodec) {
244    unsafe {
245        let private_data = Box::from_raw(
246            provider.private_data as *mut PhysicalExtensionCodecPrivateData,
247        );
248        drop(private_data);
249    }
250}
251
252unsafe extern "C" fn clone_fn_wrapper(
253    codec: &FFI_PhysicalExtensionCodec,
254) -> FFI_PhysicalExtensionCodec {
255    let old_codec = Arc::clone(codec.inner());
256    let runtime = codec.runtime().clone();
257
258    FFI_PhysicalExtensionCodec::new(old_codec, runtime, codec.task_ctx_provider.clone())
259}
260
261impl Drop for FFI_PhysicalExtensionCodec {
262    fn drop(&mut self) {
263        unsafe { (self.release)(self) }
264    }
265}
266
267impl FFI_PhysicalExtensionCodec {
268    /// Creates a new [`FFI_PhysicalExtensionCodec`].
269    pub fn new(
270        provider: Arc<dyn PhysicalExtensionCodec + Send>,
271        runtime: Option<Handle>,
272        task_ctx_provider: impl Into<FFI_TaskContextProvider>,
273    ) -> Self {
274        let task_ctx_provider = task_ctx_provider.into();
275        let private_data =
276            Box::new(PhysicalExtensionCodecPrivateData { provider, runtime });
277
278        Self {
279            try_decode: try_decode_fn_wrapper,
280            try_encode: try_encode_fn_wrapper,
281            try_decode_udf: try_decode_udf_fn_wrapper,
282            try_encode_udf: try_encode_udf_fn_wrapper,
283            try_decode_udaf: try_decode_udaf_fn_wrapper,
284            try_encode_udaf: try_encode_udaf_fn_wrapper,
285            try_decode_udwf: try_decode_udwf_fn_wrapper,
286            try_encode_udwf: try_encode_udwf_fn_wrapper,
287            task_ctx_provider,
288
289            clone: clone_fn_wrapper,
290            release: release_fn_wrapper,
291            version: crate::version,
292            private_data: Box::into_raw(private_data) as *mut c_void,
293            library_marker_id: crate::get_library_marker_id,
294        }
295    }
296}
297
298/// This wrapper struct exists on the receiver side of the FFI interface, so it has
299/// no guarantees about being able to access the data in `private_data`. Any functions
300/// defined on this struct must only use the stable functions provided in
301/// FFI_PhysicalExtensionCodec to interact with the foreign table provider.
302#[derive(Debug)]
303pub struct ForeignPhysicalExtensionCodec(pub FFI_PhysicalExtensionCodec);
304
305unsafe impl Send for ForeignPhysicalExtensionCodec {}
306unsafe impl Sync for ForeignPhysicalExtensionCodec {}
307
308impl From<&FFI_PhysicalExtensionCodec> for Arc<dyn PhysicalExtensionCodec> {
309    fn from(provider: &FFI_PhysicalExtensionCodec) -> Self {
310        if (provider.library_marker_id)() == crate::get_library_marker_id() {
311            Arc::clone(provider.inner())
312        } else {
313            Arc::new(ForeignPhysicalExtensionCodec(provider.clone()))
314        }
315    }
316}
317
318impl Clone for FFI_PhysicalExtensionCodec {
319    fn clone(&self) -> Self {
320        unsafe { (self.clone)(self) }
321    }
322}
323
324impl PhysicalExtensionCodec for ForeignPhysicalExtensionCodec {
325    fn try_decode(
326        &self,
327        buf: &[u8],
328        inputs: &[Arc<dyn ExecutionPlan>],
329        _ctx: &TaskContext,
330    ) -> Result<Arc<dyn ExecutionPlan>> {
331        let inputs = inputs
332            .iter()
333            .map(|plan| FFI_ExecutionPlan::new(Arc::clone(plan), None))
334            .collect();
335
336        let plan =
337            df_result!(unsafe { (self.0.try_decode)(&self.0, buf.into(), inputs) })?;
338        let plan: Arc<dyn ExecutionPlan> = (&plan).try_into()?;
339
340        Ok(plan)
341    }
342
343    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
344        let plan = FFI_ExecutionPlan::new(node, None);
345        let bytes = df_result!(unsafe { (self.0.try_encode)(&self.0, plan) })?;
346
347        buf.extend(bytes);
348        Ok(())
349    }
350
351    fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
352        let udf = unsafe {
353            df_result!((self.0.try_decode_udf)(&self.0, name.into(), buf.into()))
354        }?;
355        let udf: Arc<dyn ScalarUDFImpl> = (&udf).into();
356
357        Ok(Arc::new(ScalarUDF::new_from_shared_impl(udf)))
358    }
359
360    fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
361        let node = FFI_ScalarUDF::from(Arc::new(node.clone()));
362        let bytes = df_result!(unsafe { (self.0.try_encode_udf)(&self.0, node) })?;
363
364        buf.extend(bytes);
365
366        Ok(())
367    }
368
369    fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
370        let udaf = unsafe {
371            df_result!((self.0.try_decode_udaf)(&self.0, name.into(), buf.into()))
372        }?;
373        let udaf: Arc<dyn AggregateUDFImpl> = (&udaf).into();
374
375        Ok(Arc::new(AggregateUDF::new_from_shared_impl(udaf)))
376    }
377
378    fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
379        let node = Arc::new(node.clone());
380        let node = FFI_AggregateUDF::from(node);
381        let bytes = df_result!(unsafe { (self.0.try_encode_udaf)(&self.0, node) })?;
382
383        buf.extend(bytes);
384
385        Ok(())
386    }
387
388    fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> {
389        let udwf = unsafe {
390            df_result!((self.0.try_decode_udwf)(&self.0, name.into(), buf.into()))
391        }?;
392        let udwf: Arc<dyn WindowUDFImpl> = (&udwf).into();
393
394        Ok(Arc::new(WindowUDF::new_from_shared_impl(udwf)))
395    }
396
397    fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> {
398        let node = Arc::new(node.clone());
399        let node = FFI_WindowUDF::from(node);
400        let bytes = df_result!(unsafe { (self.0.try_encode_udwf)(&self.0, node) })?;
401
402        buf.extend(bytes);
403
404        Ok(())
405    }
406}
407
408#[cfg(test)]
409pub(crate) mod tests {
410    use std::sync::Arc;
411
412    use arrow_schema::{DataType, Field, Schema};
413    use datafusion_common::{Result, exec_err};
414    use datafusion_execution::TaskContext;
415    use datafusion_expr::ptr_eq::arc_ptr_eq;
416    use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF, WindowUDFImpl};
417    use datafusion_functions::math::abs::AbsFunc;
418    use datafusion_functions_aggregate::sum::Sum;
419    use datafusion_functions_window::rank::{Rank, RankType};
420    use datafusion_physical_plan::ExecutionPlan;
421    use datafusion_proto::physical_plan::PhysicalExtensionCodec;
422
423    use crate::execution_plan::tests::EmptyExec;
424    use crate::proto::physical_extension_codec::FFI_PhysicalExtensionCodec;
425
426    #[derive(Debug)]
427    pub(crate) struct TestExtensionCodec;
428
429    impl TestExtensionCodec {
430        pub(crate) const MAGIC_NUMBER: u8 = 127;
431        pub(crate) const EMPTY_EXEC_SERIALIZED: u8 = 1;
432        pub(crate) const ABS_FUNC_SERIALIZED: u8 = 2;
433        pub(crate) const SUM_UDAF_SERIALIZED: u8 = 3;
434        pub(crate) const RANK_UDWF_SERIALIZED: u8 = 4;
435        pub(crate) const MEMTABLE_SERIALIZED: u8 = 5;
436    }
437
438    impl PhysicalExtensionCodec for TestExtensionCodec {
439        fn try_decode(
440            &self,
441            buf: &[u8],
442            _inputs: &[Arc<dyn ExecutionPlan>],
443            _ctx: &TaskContext,
444        ) -> Result<Arc<dyn ExecutionPlan>> {
445            if buf[0] != Self::MAGIC_NUMBER {
446                return exec_err!(
447                    "TestExtensionCodec input buffer does not start with magic number"
448                );
449            }
450
451            if buf.len() != 2 || buf[1] != Self::EMPTY_EXEC_SERIALIZED {
452                return exec_err!("TestExtensionCodec unable to decode execution plan");
453            }
454
455            Ok(create_test_exec())
456        }
457
458        fn try_encode(
459            &self,
460            node: Arc<dyn ExecutionPlan>,
461            buf: &mut Vec<u8>,
462        ) -> Result<()> {
463            buf.push(Self::MAGIC_NUMBER);
464
465            let Some(_) = node.as_any().downcast_ref::<EmptyExec>() else {
466                return exec_err!("TestExtensionCodec only expects EmptyExec");
467            };
468
469            buf.push(Self::EMPTY_EXEC_SERIALIZED);
470
471            Ok(())
472        }
473
474        fn try_decode_udf(&self, _name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
475            if buf[0] != Self::MAGIC_NUMBER {
476                return exec_err!(
477                    "TestExtensionCodec input buffer does not start with magic number"
478                );
479            }
480
481            if buf.len() != 2 || buf[1] != Self::ABS_FUNC_SERIALIZED {
482                return exec_err!("TestExtensionCodec unable to decode udf");
483            }
484
485            Ok(Arc::new(ScalarUDF::from(AbsFunc::new())))
486        }
487
488        fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
489            buf.push(Self::MAGIC_NUMBER);
490
491            let udf = node.inner();
492            if !udf.as_any().is::<AbsFunc>() {
493                return exec_err!("TestExtensionCodec only expects Abs UDF");
494            };
495
496            buf.push(Self::ABS_FUNC_SERIALIZED);
497
498            Ok(())
499        }
500
501        fn try_decode_udaf(&self, _name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
502            if buf[0] != Self::MAGIC_NUMBER {
503                return exec_err!(
504                    "TestExtensionCodec input buffer does not start with magic number"
505                );
506            }
507
508            if buf.len() != 2 || buf[1] != Self::SUM_UDAF_SERIALIZED {
509                return exec_err!("TestExtensionCodec unable to decode udaf");
510            }
511
512            Ok(Arc::new(AggregateUDF::from(Sum::new())))
513        }
514
515        fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
516            buf.push(Self::MAGIC_NUMBER);
517
518            let udf = node.inner();
519            let Some(_udf) = udf.as_any().downcast_ref::<Sum>() else {
520                return exec_err!("TestExtensionCodec only expects Sum UDAF");
521            };
522
523            buf.push(Self::SUM_UDAF_SERIALIZED);
524
525            Ok(())
526        }
527
528        fn try_decode_udwf(&self, _name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> {
529            if buf[0] != Self::MAGIC_NUMBER {
530                return exec_err!(
531                    "TestExtensionCodec input buffer does not start with magic number"
532                );
533            }
534
535            if buf.len() != 2 || buf[1] != Self::RANK_UDWF_SERIALIZED {
536                return exec_err!("TestExtensionCodec unable to decode udwf");
537            }
538
539            Ok(Arc::new(WindowUDF::from(Rank::new(
540                "my_rank".to_owned(),
541                RankType::Basic,
542            ))))
543        }
544
545        fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> {
546            buf.push(Self::MAGIC_NUMBER);
547
548            let udf = node.inner();
549            let Some(udf) = udf.as_any().downcast_ref::<Rank>() else {
550                return exec_err!("TestExtensionCodec only expects Rank UDWF");
551            };
552
553            if udf.name() != "my_rank" {
554                return exec_err!("TestExtensionCodec only expects my_rank UDWF name");
555            }
556
557            buf.push(Self::RANK_UDWF_SERIALIZED);
558
559            Ok(())
560        }
561    }
562
563    fn create_test_exec() -> Arc<dyn ExecutionPlan> {
564        let schema =
565            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
566        Arc::new(EmptyExec::new(schema)) as Arc<dyn ExecutionPlan>
567    }
568
569    #[test]
570    fn roundtrip_ffi_physical_extension_codec_exec_plan() -> Result<()> {
571        let codec = Arc::new(TestExtensionCodec {});
572        let (ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
573
574        let mut ffi_codec =
575            FFI_PhysicalExtensionCodec::new(codec, None, task_ctx_provider);
576        ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
577        let foreign_codec: Arc<dyn PhysicalExtensionCodec> = (&ffi_codec).into();
578
579        let exec = create_test_exec();
580        let input_execs = [create_test_exec()];
581        let mut bytes = Vec::new();
582        foreign_codec.try_encode(Arc::clone(&exec), &mut bytes)?;
583
584        let returned_exec =
585            foreign_codec.try_decode(&bytes, &input_execs, ctx.task_ctx().as_ref())?;
586
587        assert!(returned_exec.as_any().is::<EmptyExec>());
588
589        Ok(())
590    }
591
592    #[test]
593    fn roundtrip_ffi_physical_extension_codec_udf() -> Result<()> {
594        let codec = Arc::new(TestExtensionCodec {});
595        let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
596
597        let mut ffi_codec =
598            FFI_PhysicalExtensionCodec::new(codec, None, task_ctx_provider);
599        ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
600        let foreign_codec: Arc<dyn PhysicalExtensionCodec> = (&ffi_codec).into();
601
602        let udf = Arc::new(ScalarUDF::from(AbsFunc::new()));
603        let mut bytes = Vec::new();
604        foreign_codec.try_encode_udf(udf.as_ref(), &mut bytes)?;
605
606        let returned_udf = foreign_codec.try_decode_udf(udf.name(), &bytes)?;
607
608        assert!(returned_udf.inner().as_any().is::<AbsFunc>());
609
610        Ok(())
611    }
612
613    #[test]
614    fn roundtrip_ffi_physical_extension_codec_udaf() -> Result<()> {
615        let codec = Arc::new(TestExtensionCodec {});
616        let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
617
618        let mut ffi_codec =
619            FFI_PhysicalExtensionCodec::new(codec, None, task_ctx_provider);
620        ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
621        let foreign_codec: Arc<dyn PhysicalExtensionCodec> = (&ffi_codec).into();
622
623        let udf = Arc::new(AggregateUDF::from(Sum::new()));
624        let mut bytes = Vec::new();
625        foreign_codec.try_encode_udaf(udf.as_ref(), &mut bytes)?;
626
627        let returned_udf = foreign_codec.try_decode_udaf(udf.name(), &bytes)?;
628
629        assert!(returned_udf.inner().as_any().is::<Sum>());
630
631        Ok(())
632    }
633
634    #[test]
635    fn roundtrip_ffi_physical_extension_codec_udwf() -> Result<()> {
636        let codec = Arc::new(TestExtensionCodec {});
637        let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
638
639        let mut ffi_codec =
640            FFI_PhysicalExtensionCodec::new(codec, None, task_ctx_provider);
641        ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
642        let foreign_codec: Arc<dyn PhysicalExtensionCodec> = (&ffi_codec).into();
643
644        let udf = Arc::new(WindowUDF::from(Rank::new(
645            "my_rank".to_owned(),
646            RankType::Basic,
647        )));
648        let mut bytes = Vec::new();
649        foreign_codec.try_encode_udwf(udf.as_ref(), &mut bytes)?;
650
651        let returned_udf = foreign_codec.try_decode_udwf(udf.name(), &bytes)?;
652
653        assert!(returned_udf.inner().as_any().is::<Rank>());
654
655        Ok(())
656    }
657
658    #[test]
659    fn ffi_physical_extension_codec_local_bypass() {
660        let codec =
661            Arc::new(TestExtensionCodec {}) as Arc<dyn PhysicalExtensionCodec + Send>;
662        let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
663
664        let mut ffi_codec =
665            FFI_PhysicalExtensionCodec::new(Arc::clone(&codec), None, task_ctx_provider);
666
667        let codec = codec as Arc<dyn PhysicalExtensionCodec>;
668        // Verify local libraries can be downcast to their original
669        let foreign_codec: Arc<dyn PhysicalExtensionCodec> = (&ffi_codec).into();
670        assert!(arc_ptr_eq(&foreign_codec, &codec));
671
672        // Verify different library markers generate foreign providers
673        ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
674        let foreign_codec: Arc<dyn PhysicalExtensionCodec> = (&ffi_codec).into();
675        assert!(!arc_ptr_eq(&foreign_codec, &codec));
676    }
677}