Skip to main content

datafusion_ffi/proto/
physical_extension_codec.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::ffi::c_void;
20use std::sync::Arc;
21
22use datafusion_common::error::Result;
23use datafusion_execution::TaskContext;
24use datafusion_expr::{
25    AggregateUDF, AggregateUDFImpl, ScalarUDF, ScalarUDFImpl, WindowUDF, WindowUDFImpl,
26};
27use datafusion_physical_plan::ExecutionPlan;
28use datafusion_proto::physical_plan::PhysicalExtensionCodec;
29
30use stabby::slice::Slice as SSlice;
31use stabby::str::Str as SStr;
32use stabby::vec::Vec as SVec;
33use tokio::runtime::Handle;
34
35use crate::execution::FFI_TaskContextProvider;
36use crate::execution_plan::FFI_ExecutionPlan;
37use crate::udaf::FFI_AggregateUDF;
38use crate::udf::FFI_ScalarUDF;
39use crate::udwf::FFI_WindowUDF;
40use crate::util::FFI_Result;
41use crate::{df_result, sresult_return};
42
43/// A stable struct for sharing [`PhysicalExtensionCodec`] across FFI boundaries.
44#[repr(C)]
45#[derive(Debug)]
46pub struct FFI_PhysicalExtensionCodec {
47    /// Decode bytes into an execution plan.
48    try_decode: unsafe extern "C" fn(
49        &Self,
50        buf: SSlice<u8>,
51        inputs: SVec<FFI_ExecutionPlan>,
52    ) -> FFI_Result<FFI_ExecutionPlan>,
53
54    /// Encode an execution plan into bytes.
55    try_encode:
56        unsafe extern "C" fn(&Self, node: FFI_ExecutionPlan) -> FFI_Result<SVec<u8>>,
57
58    /// Decode bytes into a user defined scalar function.
59    try_decode_udf: unsafe extern "C" fn(
60        &Self,
61        name: SStr,
62        buf: SSlice<u8>,
63    ) -> FFI_Result<FFI_ScalarUDF>,
64
65    /// Encode a user defined scalar function into bytes.
66    try_encode_udf:
67        unsafe extern "C" fn(&Self, node: FFI_ScalarUDF) -> FFI_Result<SVec<u8>>,
68
69    /// Decode bytes into a user defined aggregate function.
70    try_decode_udaf: unsafe extern "C" fn(
71        &Self,
72        name: SStr,
73        buf: SSlice<u8>,
74    ) -> FFI_Result<FFI_AggregateUDF>,
75
76    /// Encode a user defined aggregate function into bytes.
77    try_encode_udaf:
78        unsafe extern "C" fn(&Self, node: FFI_AggregateUDF) -> FFI_Result<SVec<u8>>,
79
80    /// Decode bytes into a user defined window function.
81    try_decode_udwf: unsafe extern "C" fn(
82        &Self,
83        name: SStr,
84        buf: SSlice<u8>,
85    ) -> FFI_Result<FFI_WindowUDF>,
86
87    /// Encode a user defined window function into bytes.
88    try_encode_udwf:
89        unsafe extern "C" fn(&Self, node: FFI_WindowUDF) -> FFI_Result<SVec<u8>>,
90
91    /// Access the current [`TaskContext`].
92    task_ctx_provider: FFI_TaskContextProvider,
93
94    /// Used to create a clone on the provider of the execution plan. This should
95    /// only need to be called by the receiver of the plan.
96    pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
97
98    /// Release the memory of the private data when it is no longer being used.
99    pub release: unsafe extern "C" fn(arg: &mut Self),
100
101    /// Return the major DataFusion version number of this provider.
102    pub version: unsafe extern "C" fn() -> u64,
103
104    /// Internal data. This is only to be accessed by the provider of the plan.
105    /// A [`ForeignPhysicalExtensionCodec`] should never attempt to access this data.
106    pub private_data: *mut c_void,
107
108    /// Utility to identify when FFI objects are accessed locally through
109    /// the foreign interface.
110    pub library_marker_id: extern "C" fn() -> usize,
111}
112
113unsafe impl Send for FFI_PhysicalExtensionCodec {}
114unsafe impl Sync for FFI_PhysicalExtensionCodec {}
115
116struct PhysicalExtensionCodecPrivateData {
117    codec: Arc<dyn PhysicalExtensionCodec>,
118    runtime: Option<Handle>,
119}
120
121impl FFI_PhysicalExtensionCodec {
122    fn inner(&self) -> &Arc<dyn PhysicalExtensionCodec> {
123        let private_data = self.private_data as *const PhysicalExtensionCodecPrivateData;
124        unsafe { &(*private_data).codec }
125    }
126
127    fn runtime(&self) -> &Option<Handle> {
128        let private_data = self.private_data as *const PhysicalExtensionCodecPrivateData;
129        unsafe { &(*private_data).runtime }
130    }
131}
132
133unsafe extern "C" fn try_decode_fn_wrapper(
134    codec: &FFI_PhysicalExtensionCodec,
135    buf: SSlice<u8>,
136    inputs: SVec<FFI_ExecutionPlan>,
137) -> FFI_Result<FFI_ExecutionPlan> {
138    let runtime = codec.runtime().clone();
139    let task_ctx: Arc<TaskContext> =
140        sresult_return!((&codec.task_ctx_provider).try_into());
141    let codec = codec.inner();
142    let inputs = inputs
143        .into_iter()
144        .map(|plan| <Arc<dyn ExecutionPlan>>::try_from(&plan))
145        .collect::<Result<Vec<_>>>();
146    let inputs = sresult_return!(inputs);
147
148    let plan =
149        sresult_return!(codec.try_decode(buf.as_ref(), &inputs, task_ctx.as_ref()));
150
151    FFI_Result::Ok(FFI_ExecutionPlan::new(plan, runtime))
152}
153
154unsafe extern "C" fn try_encode_fn_wrapper(
155    codec: &FFI_PhysicalExtensionCodec,
156    node: FFI_ExecutionPlan,
157) -> FFI_Result<SVec<u8>> {
158    let codec = codec.inner();
159
160    let plan: Arc<dyn ExecutionPlan> = sresult_return!((&node).try_into());
161
162    let mut bytes = Vec::new();
163    sresult_return!(codec.try_encode(plan, &mut bytes));
164
165    FFI_Result::Ok(bytes.into_iter().collect())
166}
167
168unsafe extern "C" fn try_decode_udf_fn_wrapper(
169    codec: &FFI_PhysicalExtensionCodec,
170    name: SStr,
171    buf: SSlice<u8>,
172) -> FFI_Result<FFI_ScalarUDF> {
173    let codec = codec.inner();
174
175    let udf = sresult_return!(codec.try_decode_udf(name.as_str(), buf.as_ref()));
176    let udf = FFI_ScalarUDF::from(udf);
177
178    FFI_Result::Ok(udf)
179}
180
181unsafe extern "C" fn try_encode_udf_fn_wrapper(
182    codec: &FFI_PhysicalExtensionCodec,
183    node: FFI_ScalarUDF,
184) -> FFI_Result<SVec<u8>> {
185    let codec = codec.inner();
186    let node: Arc<dyn ScalarUDFImpl> = (&node).into();
187    let node = ScalarUDF::new_from_shared_impl(node);
188
189    let mut bytes = Vec::new();
190    sresult_return!(codec.try_encode_udf(&node, &mut bytes));
191
192    FFI_Result::Ok(bytes.into_iter().collect())
193}
194
195unsafe extern "C" fn try_decode_udaf_fn_wrapper(
196    codec: &FFI_PhysicalExtensionCodec,
197    name: SStr,
198    buf: SSlice<u8>,
199) -> FFI_Result<FFI_AggregateUDF> {
200    let codec_inner = codec.inner();
201    let udaf = sresult_return!(codec_inner.try_decode_udaf(name.into(), buf.as_ref()));
202    let udaf = FFI_AggregateUDF::from(udaf);
203
204    FFI_Result::Ok(udaf)
205}
206
207unsafe extern "C" fn try_encode_udaf_fn_wrapper(
208    codec: &FFI_PhysicalExtensionCodec,
209    node: FFI_AggregateUDF,
210) -> FFI_Result<SVec<u8>> {
211    let codec = codec.inner();
212    let udaf: Arc<dyn AggregateUDFImpl> = (&node).into();
213    let udaf = AggregateUDF::new_from_shared_impl(udaf);
214
215    let mut bytes = Vec::new();
216    sresult_return!(codec.try_encode_udaf(&udaf, &mut bytes));
217
218    FFI_Result::Ok(bytes.into_iter().collect())
219}
220
221unsafe extern "C" fn try_decode_udwf_fn_wrapper(
222    codec: &FFI_PhysicalExtensionCodec,
223    name: SStr,
224    buf: SSlice<u8>,
225) -> FFI_Result<FFI_WindowUDF> {
226    let codec = codec.inner();
227    let udwf = sresult_return!(codec.try_decode_udwf(name.into(), buf.as_ref()));
228    let udwf = FFI_WindowUDF::from(udwf);
229
230    FFI_Result::Ok(udwf)
231}
232
233unsafe extern "C" fn try_encode_udwf_fn_wrapper(
234    codec: &FFI_PhysicalExtensionCodec,
235    node: FFI_WindowUDF,
236) -> FFI_Result<SVec<u8>> {
237    let codec = codec.inner();
238    let udwf: Arc<dyn WindowUDFImpl> = (&node).into();
239    let udwf = WindowUDF::new_from_shared_impl(udwf);
240
241    let mut bytes = Vec::new();
242    sresult_return!(codec.try_encode_udwf(&udwf, &mut bytes));
243
244    FFI_Result::Ok(bytes.into_iter().collect())
245}
246
247unsafe extern "C" fn release_fn_wrapper(codec: &mut FFI_PhysicalExtensionCodec) {
248    unsafe {
249        let private_data =
250            Box::from_raw(codec.private_data as *mut PhysicalExtensionCodecPrivateData);
251        drop(private_data);
252    }
253}
254
255unsafe extern "C" fn clone_fn_wrapper(
256    codec: &FFI_PhysicalExtensionCodec,
257) -> FFI_PhysicalExtensionCodec {
258    let old_codec = Arc::clone(codec.inner());
259    let runtime = codec.runtime().clone();
260
261    FFI_PhysicalExtensionCodec::new(old_codec, runtime, codec.task_ctx_provider.clone())
262}
263
264impl Drop for FFI_PhysicalExtensionCodec {
265    fn drop(&mut self) {
266        unsafe { (self.release)(self) }
267    }
268}
269
270impl FFI_PhysicalExtensionCodec {
271    /// Creates a new [`FFI_PhysicalExtensionCodec`].
272    pub fn new(
273        codec: Arc<dyn PhysicalExtensionCodec + Send>,
274        runtime: Option<Handle>,
275        task_ctx_provider: impl Into<FFI_TaskContextProvider>,
276    ) -> Self {
277        if let Some(codec) = (Arc::clone(&codec) as Arc<dyn Any>)
278            .downcast_ref::<ForeignPhysicalExtensionCodec>()
279        {
280            return codec.0.clone();
281        }
282
283        let task_ctx_provider = task_ctx_provider.into();
284        let private_data = Box::new(PhysicalExtensionCodecPrivateData { codec, runtime });
285
286        Self {
287            try_decode: try_decode_fn_wrapper,
288            try_encode: try_encode_fn_wrapper,
289            try_decode_udf: try_decode_udf_fn_wrapper,
290            try_encode_udf: try_encode_udf_fn_wrapper,
291            try_decode_udaf: try_decode_udaf_fn_wrapper,
292            try_encode_udaf: try_encode_udaf_fn_wrapper,
293            try_decode_udwf: try_decode_udwf_fn_wrapper,
294            try_encode_udwf: try_encode_udwf_fn_wrapper,
295            task_ctx_provider,
296
297            clone: clone_fn_wrapper,
298            release: release_fn_wrapper,
299            version: crate::version,
300            private_data: Box::into_raw(private_data) as *mut c_void,
301            library_marker_id: crate::get_library_marker_id,
302        }
303    }
304}
305
306/// This wrapper struct exists on the receiver side of the FFI interface, so it has
307/// no guarantees about being able to access the data in `private_data`. Any functions
308/// defined on this struct must only use the stable functions provided in
309/// FFI_PhysicalExtensionCodec to interact with the foreign table provider.
310#[derive(Debug)]
311pub struct ForeignPhysicalExtensionCodec(pub FFI_PhysicalExtensionCodec);
312
313unsafe impl Send for ForeignPhysicalExtensionCodec {}
314unsafe impl Sync for ForeignPhysicalExtensionCodec {}
315
316impl From<&FFI_PhysicalExtensionCodec> for Arc<dyn PhysicalExtensionCodec> {
317    fn from(codec: &FFI_PhysicalExtensionCodec) -> Self {
318        if (codec.library_marker_id)() == crate::get_library_marker_id() {
319            Arc::clone(codec.inner())
320        } else {
321            Arc::new(ForeignPhysicalExtensionCodec(codec.clone()))
322        }
323    }
324}
325
326impl Clone for FFI_PhysicalExtensionCodec {
327    fn clone(&self) -> Self {
328        unsafe { (self.clone)(self) }
329    }
330}
331
332impl PhysicalExtensionCodec for ForeignPhysicalExtensionCodec {
333    fn try_decode(
334        &self,
335        buf: &[u8],
336        inputs: &[Arc<dyn ExecutionPlan>],
337        _ctx: &TaskContext,
338    ) -> Result<Arc<dyn ExecutionPlan>> {
339        let inputs = inputs
340            .iter()
341            .map(|plan| FFI_ExecutionPlan::new(Arc::clone(plan), None))
342            .collect();
343
344        let plan =
345            df_result!(unsafe { (self.0.try_decode)(&self.0, buf.into(), inputs) })?;
346        let plan: Arc<dyn ExecutionPlan> = (&plan).try_into()?;
347
348        Ok(plan)
349    }
350
351    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
352        let plan = FFI_ExecutionPlan::new(node, None);
353        let bytes = df_result!(unsafe { (self.0.try_encode)(&self.0, plan) })?;
354
355        buf.extend(bytes);
356        Ok(())
357    }
358
359    fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
360        let udf = unsafe {
361            df_result!((self.0.try_decode_udf)(&self.0, name.into(), buf.into()))
362        }?;
363        let udf: Arc<dyn ScalarUDFImpl> = (&udf).into();
364
365        Ok(Arc::new(ScalarUDF::new_from_shared_impl(udf)))
366    }
367
368    fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
369        let node = FFI_ScalarUDF::from(Arc::new(node.clone()));
370        let bytes = df_result!(unsafe { (self.0.try_encode_udf)(&self.0, node) })?;
371
372        buf.extend(bytes);
373
374        Ok(())
375    }
376
377    fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
378        let udaf = unsafe {
379            df_result!((self.0.try_decode_udaf)(&self.0, name.into(), buf.into()))
380        }?;
381        let udaf: Arc<dyn AggregateUDFImpl> = (&udaf).into();
382
383        Ok(Arc::new(AggregateUDF::new_from_shared_impl(udaf)))
384    }
385
386    fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
387        let node = Arc::new(node.clone());
388        let node = FFI_AggregateUDF::from(node);
389        let bytes = df_result!(unsafe { (self.0.try_encode_udaf)(&self.0, node) })?;
390
391        buf.extend(bytes);
392
393        Ok(())
394    }
395
396    fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> {
397        let udwf = unsafe {
398            df_result!((self.0.try_decode_udwf)(&self.0, name.into(), buf.into()))
399        }?;
400        let udwf: Arc<dyn WindowUDFImpl> = (&udwf).into();
401
402        Ok(Arc::new(WindowUDF::new_from_shared_impl(udwf)))
403    }
404
405    fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> {
406        let node = Arc::new(node.clone());
407        let node = FFI_WindowUDF::from(node);
408        let bytes = df_result!(unsafe { (self.0.try_encode_udwf)(&self.0, node) })?;
409
410        buf.extend(bytes);
411
412        Ok(())
413    }
414}
415
416#[cfg(test)]
417pub(crate) mod tests {
418    use std::sync::Arc;
419
420    use arrow_schema::{DataType, Field, Schema};
421    use datafusion_common::{Result, exec_err};
422    use datafusion_execution::TaskContext;
423    use datafusion_expr::ptr_eq::arc_ptr_eq;
424    use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF, WindowUDFImpl};
425    use datafusion_functions::math::abs::AbsFunc;
426    use datafusion_functions_aggregate::sum::Sum;
427    use datafusion_functions_window::rank::{Rank, RankType};
428    use datafusion_physical_plan::ExecutionPlan;
429    use datafusion_proto::physical_plan::PhysicalExtensionCodec;
430
431    use crate::execution_plan::tests::EmptyExec;
432    use crate::proto::physical_extension_codec::FFI_PhysicalExtensionCodec;
433
434    #[derive(Debug)]
435    pub(crate) struct TestExtensionCodec;
436
437    impl TestExtensionCodec {
438        pub(crate) const MAGIC_NUMBER: u8 = 127;
439        pub(crate) const EMPTY_EXEC_SERIALIZED: u8 = 1;
440        pub(crate) const ABS_FUNC_SERIALIZED: u8 = 2;
441        pub(crate) const SUM_UDAF_SERIALIZED: u8 = 3;
442        pub(crate) const RANK_UDWF_SERIALIZED: u8 = 4;
443        pub(crate) const MEMTABLE_SERIALIZED: u8 = 5;
444    }
445
446    impl PhysicalExtensionCodec for TestExtensionCodec {
447        fn try_decode(
448            &self,
449            buf: &[u8],
450            _inputs: &[Arc<dyn ExecutionPlan>],
451            _ctx: &TaskContext,
452        ) -> Result<Arc<dyn ExecutionPlan>> {
453            if buf[0] != Self::MAGIC_NUMBER {
454                return exec_err!(
455                    "TestExtensionCodec input buffer does not start with magic number"
456                );
457            }
458
459            if buf.len() != 2 || buf[1] != Self::EMPTY_EXEC_SERIALIZED {
460                return exec_err!("TestExtensionCodec unable to decode execution plan");
461            }
462
463            Ok(create_test_exec())
464        }
465
466        fn try_encode(
467            &self,
468            node: Arc<dyn ExecutionPlan>,
469            buf: &mut Vec<u8>,
470        ) -> Result<()> {
471            buf.push(Self::MAGIC_NUMBER);
472
473            let Some(_) = node.downcast_ref::<EmptyExec>() else {
474                return exec_err!("TestExtensionCodec only expects EmptyExec");
475            };
476
477            buf.push(Self::EMPTY_EXEC_SERIALIZED);
478
479            Ok(())
480        }
481
482        fn try_decode_udf(&self, _name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
483            if buf[0] != Self::MAGIC_NUMBER {
484                return exec_err!(
485                    "TestExtensionCodec input buffer does not start with magic number"
486                );
487            }
488
489            if buf.len() != 2 || buf[1] != Self::ABS_FUNC_SERIALIZED {
490                return exec_err!("TestExtensionCodec unable to decode udf");
491            }
492
493            Ok(Arc::new(ScalarUDF::from(AbsFunc::new())))
494        }
495
496        fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
497            buf.push(Self::MAGIC_NUMBER);
498
499            let udf = node.inner();
500            if !udf.is::<AbsFunc>() {
501                return exec_err!("TestExtensionCodec only expects Abs UDF");
502            };
503
504            buf.push(Self::ABS_FUNC_SERIALIZED);
505
506            Ok(())
507        }
508
509        fn try_decode_udaf(&self, _name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
510            if buf[0] != Self::MAGIC_NUMBER {
511                return exec_err!(
512                    "TestExtensionCodec input buffer does not start with magic number"
513                );
514            }
515
516            if buf.len() != 2 || buf[1] != Self::SUM_UDAF_SERIALIZED {
517                return exec_err!("TestExtensionCodec unable to decode udaf");
518            }
519
520            Ok(Arc::new(AggregateUDF::from(Sum::new())))
521        }
522
523        fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
524            buf.push(Self::MAGIC_NUMBER);
525
526            let udf = node.inner();
527            let Some(_udf) = udf.downcast_ref::<Sum>() else {
528                return exec_err!("TestExtensionCodec only expects Sum UDAF");
529            };
530
531            buf.push(Self::SUM_UDAF_SERIALIZED);
532
533            Ok(())
534        }
535
536        fn try_decode_udwf(&self, _name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> {
537            if buf[0] != Self::MAGIC_NUMBER {
538                return exec_err!(
539                    "TestExtensionCodec input buffer does not start with magic number"
540                );
541            }
542
543            if buf.len() != 2 || buf[1] != Self::RANK_UDWF_SERIALIZED {
544                return exec_err!("TestExtensionCodec unable to decode udwf");
545            }
546
547            Ok(Arc::new(WindowUDF::from(Rank::new(
548                "my_rank".to_owned(),
549                RankType::Basic,
550            ))))
551        }
552
553        fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> {
554            buf.push(Self::MAGIC_NUMBER);
555
556            let udf = node.inner();
557            let Some(udf) = udf.downcast_ref::<Rank>() else {
558                return exec_err!("TestExtensionCodec only expects Rank UDWF");
559            };
560
561            if udf.name() != "my_rank" {
562                return exec_err!("TestExtensionCodec only expects my_rank UDWF name");
563            }
564
565            buf.push(Self::RANK_UDWF_SERIALIZED);
566
567            Ok(())
568        }
569    }
570
571    fn create_test_exec() -> Arc<dyn ExecutionPlan> {
572        let schema =
573            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
574        Arc::new(EmptyExec::new(schema)) as Arc<dyn ExecutionPlan>
575    }
576
577    #[test]
578    fn roundtrip_ffi_physical_extension_codec_exec_plan() -> Result<()> {
579        let codec = Arc::new(TestExtensionCodec {});
580        let (ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
581
582        let mut ffi_codec =
583            FFI_PhysicalExtensionCodec::new(codec, None, task_ctx_provider);
584        ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
585        let foreign_codec: Arc<dyn PhysicalExtensionCodec> = (&ffi_codec).into();
586
587        let exec = create_test_exec();
588        let input_execs = [create_test_exec()];
589        let mut bytes = Vec::new();
590        foreign_codec.try_encode(Arc::clone(&exec), &mut bytes)?;
591
592        let returned_exec =
593            foreign_codec.try_decode(&bytes, &input_execs, ctx.task_ctx().as_ref())?;
594
595        assert!(returned_exec.is::<EmptyExec>());
596
597        Ok(())
598    }
599
600    #[test]
601    fn roundtrip_ffi_physical_extension_codec_udf() -> Result<()> {
602        let codec = Arc::new(TestExtensionCodec {});
603        let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
604
605        let mut ffi_codec =
606            FFI_PhysicalExtensionCodec::new(codec, None, task_ctx_provider);
607        ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
608        let foreign_codec: Arc<dyn PhysicalExtensionCodec> = (&ffi_codec).into();
609
610        let udf = Arc::new(ScalarUDF::from(AbsFunc::new()));
611        let mut bytes = Vec::new();
612        foreign_codec.try_encode_udf(udf.as_ref(), &mut bytes)?;
613
614        let returned_udf = foreign_codec.try_decode_udf(udf.name(), &bytes)?;
615
616        assert!(returned_udf.inner().is::<AbsFunc>());
617
618        Ok(())
619    }
620
621    #[test]
622    fn roundtrip_ffi_physical_extension_codec_udaf() -> Result<()> {
623        let codec = Arc::new(TestExtensionCodec {});
624        let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
625
626        let mut ffi_codec =
627            FFI_PhysicalExtensionCodec::new(codec, None, task_ctx_provider);
628        ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
629        let foreign_codec: Arc<dyn PhysicalExtensionCodec> = (&ffi_codec).into();
630
631        let udf = Arc::new(AggregateUDF::from(Sum::new()));
632        let mut bytes = Vec::new();
633        foreign_codec.try_encode_udaf(udf.as_ref(), &mut bytes)?;
634
635        let returned_udf = foreign_codec.try_decode_udaf(udf.name(), &bytes)?;
636
637        assert!(returned_udf.inner().is::<Sum>());
638
639        Ok(())
640    }
641
642    #[test]
643    fn roundtrip_ffi_physical_extension_codec_udwf() -> Result<()> {
644        let codec = Arc::new(TestExtensionCodec {});
645        let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
646
647        let mut ffi_codec =
648            FFI_PhysicalExtensionCodec::new(codec, None, task_ctx_provider);
649        ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
650        let foreign_codec: Arc<dyn PhysicalExtensionCodec> = (&ffi_codec).into();
651
652        let udf = Arc::new(WindowUDF::from(Rank::new(
653            "my_rank".to_owned(),
654            RankType::Basic,
655        )));
656        let mut bytes = Vec::new();
657        foreign_codec.try_encode_udwf(udf.as_ref(), &mut bytes)?;
658
659        let returned_udf = foreign_codec.try_decode_udwf(udf.name(), &bytes)?;
660
661        assert!(returned_udf.inner().is::<Rank>());
662
663        Ok(())
664    }
665
666    #[test]
667    fn ffi_physical_extension_codec_local_bypass() {
668        let codec =
669            Arc::new(TestExtensionCodec {}) as Arc<dyn PhysicalExtensionCodec + Send>;
670        let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
671
672        let mut ffi_codec =
673            FFI_PhysicalExtensionCodec::new(Arc::clone(&codec), None, task_ctx_provider);
674
675        let codec = codec as Arc<dyn PhysicalExtensionCodec>;
676        // Verify local libraries can be downcast to their original
677        let foreign_codec: Arc<dyn PhysicalExtensionCodec> = (&ffi_codec).into();
678        assert!(arc_ptr_eq(&foreign_codec, &codec));
679
680        // Verify different library markers generate foreign providers
681        ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
682        let foreign_codec: Arc<dyn PhysicalExtensionCodec> = (&ffi_codec).into();
683        assert!(!arc_ptr_eq(&foreign_codec, &codec));
684    }
685}