datafusion_ffi/proto/
logical_extension_codec.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::ffi::c_void;
19use std::sync::Arc;
20
21use abi_stable::StableAbi;
22use abi_stable::std_types::{RResult, RSlice, RStr, RVec};
23use arrow::datatypes::SchemaRef;
24use datafusion_catalog::TableProvider;
25use datafusion_common::error::Result;
26use datafusion_common::{TableReference, not_impl_err};
27use datafusion_datasource::file_format::FileFormatFactory;
28use datafusion_execution::{TaskContext, TaskContextProvider};
29use datafusion_expr::{
30    AggregateUDF, AggregateUDFImpl, Extension, LogicalPlan, ScalarUDF, ScalarUDFImpl,
31    WindowUDF, WindowUDFImpl,
32};
33use datafusion_proto::logical_plan::{
34    DefaultLogicalExtensionCodec, LogicalExtensionCodec,
35};
36use tokio::runtime::Handle;
37
38use crate::arrow_wrappers::WrappedSchema;
39use crate::execution::FFI_TaskContextProvider;
40use crate::table_provider::FFI_TableProvider;
41use crate::udaf::FFI_AggregateUDF;
42use crate::udf::FFI_ScalarUDF;
43use crate::udwf::FFI_WindowUDF;
44use crate::util::FFIResult;
45use crate::{df_result, rresult_return};
46
/// A stable struct for sharing [`LogicalExtensionCodec`] across FFI boundaries.
///
/// Each codec operation is exposed as an `extern "C"` function pointer that takes
/// the struct itself as its first argument. The function pointers are populated
/// by [`FFI_LogicalExtensionCodec::new`]; consumers on the other side of the FFI
/// boundary should go through [`ForeignLogicalExtensionCodec`] rather than
/// calling them directly.
#[repr(C)]
#[derive(Debug, StableAbi)]
pub struct FFI_LogicalExtensionCodec {
    /// Decode bytes into a table provider for the table named `table_ref` with
    /// the given `schema`.
    try_decode_table_provider: unsafe extern "C" fn(
        &Self,
        buf: RSlice<u8>,
        table_ref: RStr,
        schema: WrappedSchema,
    ) -> FFIResult<FFI_TableProvider>,

    /// Encode a table provider into bytes.
    try_encode_table_provider: unsafe extern "C" fn(
        &Self,
        table_ref: RStr,
        node: FFI_TableProvider,
    ) -> FFIResult<RVec<u8>>,

    /// Decode bytes into a user defined scalar function.
    try_decode_udf: unsafe extern "C" fn(
        &Self,
        name: RStr,
        buf: RSlice<u8>,
    ) -> FFIResult<FFI_ScalarUDF>,

    /// Encode a user defined scalar function into bytes.
    try_encode_udf:
        unsafe extern "C" fn(&Self, node: FFI_ScalarUDF) -> FFIResult<RVec<u8>>,

    /// Decode bytes into a user defined aggregate function.
    try_decode_udaf: unsafe extern "C" fn(
        &Self,
        name: RStr,
        buf: RSlice<u8>,
    ) -> FFIResult<FFI_AggregateUDF>,

    /// Encode a user defined aggregate function into bytes.
    try_encode_udaf:
        unsafe extern "C" fn(&Self, node: FFI_AggregateUDF) -> FFIResult<RVec<u8>>,

    /// Decode bytes into a user defined window function.
    try_decode_udwf: unsafe extern "C" fn(
        &Self,
        name: RStr,
        buf: RSlice<u8>,
    ) -> FFIResult<FFI_WindowUDF>,

    /// Encode a user defined window function into bytes.
    try_encode_udwf:
        unsafe extern "C" fn(&Self, node: FFI_WindowUDF) -> FFIResult<RVec<u8>>,

    /// Source of the [`TaskContext`] handed to the wrapped codec when a table
    /// provider is decoded. It is cloned along with the codec.
    pub task_ctx_provider: FFI_TaskContextProvider,

    /// Used to create a clone of the codec. The clone shares the same
    /// underlying [`LogicalExtensionCodec`] but owns its own private data.
    pub clone: unsafe extern "C" fn(plan: &Self) -> Self,

    /// Release the memory of the private data when it is no longer being used.
    /// This is invoked automatically when the struct is dropped.
    pub release: unsafe extern "C" fn(arg: &mut Self),

    /// Return the major DataFusion version number of this provider.
    pub version: unsafe extern "C" fn() -> u64,

    /// Internal data. This is only to be accessed by the library that created
    /// the codec. A [`ForeignLogicalExtensionCodec`] should never attempt to
    /// access this data.
    pub private_data: *mut c_void,

    /// Utility to identify when FFI objects are accessed locally through
    /// the foreign interface. When the returned id matches the id of the
    /// current library the wrapped codec is used directly instead of going
    /// through the function pointers.
    pub library_marker_id: extern "C" fn() -> usize,
}
119
// The raw `private_data` pointer prevents the compiler from deriving these
// automatically, so they are asserted by hand.
// NOTE(review): soundness presumably relies on the wrapped codec, the runtime
// handle and the task context provider all being thread-safe — confirm against
// their definitions.
unsafe impl Send for FFI_LogicalExtensionCodec {}
unsafe impl Sync for FFI_LogicalExtensionCodec {}
122
/// Provider-side state stored behind the `private_data` pointer of
/// [`FFI_LogicalExtensionCodec`].
struct LogicalExtensionCodecPrivateData {
    /// The codec that performs the actual encoding and decoding.
    codec: Arc<dyn LogicalExtensionCodec>,
    /// Optional Tokio runtime handle, passed on to the FFI table providers
    /// created when decoding.
    runtime: Option<Handle>,
}
127
128impl FFI_LogicalExtensionCodec {
129    fn inner(&self) -> &Arc<dyn LogicalExtensionCodec> {
130        let private_data = self.private_data as *const LogicalExtensionCodecPrivateData;
131        unsafe { &(*private_data).codec }
132    }
133
134    fn runtime(&self) -> &Option<Handle> {
135        let private_data = self.private_data as *const LogicalExtensionCodecPrivateData;
136        unsafe { &(*private_data).runtime }
137    }
138
139    fn task_ctx(&self) -> Result<Arc<TaskContext>> {
140        (&self.task_ctx_provider).try_into()
141    }
142}
143
144unsafe extern "C" fn try_decode_table_provider_fn_wrapper(
145    codec: &FFI_LogicalExtensionCodec,
146    buf: RSlice<u8>,
147    table_ref: RStr,
148    schema: WrappedSchema,
149) -> FFIResult<FFI_TableProvider> {
150    let ctx = rresult_return!(codec.task_ctx());
151    let runtime = codec.runtime().clone();
152    let codec_inner = codec.inner();
153    let table_ref = TableReference::from(table_ref.as_str());
154    let schema: SchemaRef = schema.into();
155
156    let table_provider = rresult_return!(codec_inner.try_decode_table_provider(
157        buf.as_ref(),
158        &table_ref,
159        schema,
160        ctx.as_ref()
161    ));
162
163    RResult::ROk(FFI_TableProvider::new_with_ffi_codec(
164        table_provider,
165        true,
166        runtime,
167        codec.clone(),
168    ))
169}
170
171unsafe extern "C" fn try_encode_table_provider_fn_wrapper(
172    codec: &FFI_LogicalExtensionCodec,
173    table_ref: RStr,
174    node: FFI_TableProvider,
175) -> FFIResult<RVec<u8>> {
176    let table_ref = TableReference::from(table_ref.as_str());
177    let table_provider: Arc<dyn TableProvider> = (&node).into();
178    let codec = codec.inner();
179
180    let mut bytes = Vec::new();
181    rresult_return!(codec.try_encode_table_provider(
182        &table_ref,
183        table_provider,
184        &mut bytes
185    ));
186
187    RResult::ROk(bytes.into())
188}
189
190unsafe extern "C" fn try_decode_udf_fn_wrapper(
191    codec: &FFI_LogicalExtensionCodec,
192    name: RStr,
193    buf: RSlice<u8>,
194) -> FFIResult<FFI_ScalarUDF> {
195    let codec = codec.inner();
196
197    let udf = rresult_return!(codec.try_decode_udf(name.as_str(), buf.as_ref()));
198    let udf = FFI_ScalarUDF::from(udf);
199
200    RResult::ROk(udf)
201}
202
203unsafe extern "C" fn try_encode_udf_fn_wrapper(
204    codec: &FFI_LogicalExtensionCodec,
205    node: FFI_ScalarUDF,
206) -> FFIResult<RVec<u8>> {
207    let codec = codec.inner();
208    let node: Arc<dyn ScalarUDFImpl> = (&node).into();
209    let node = ScalarUDF::new_from_shared_impl(node);
210
211    let mut bytes = Vec::new();
212    rresult_return!(codec.try_encode_udf(&node, &mut bytes));
213
214    RResult::ROk(bytes.into())
215}
216
217unsafe extern "C" fn try_decode_udaf_fn_wrapper(
218    codec: &FFI_LogicalExtensionCodec,
219    name: RStr,
220    buf: RSlice<u8>,
221) -> FFIResult<FFI_AggregateUDF> {
222    let codec_inner = codec.inner();
223    let udaf = rresult_return!(codec_inner.try_decode_udaf(name.into(), buf.as_ref()));
224    let udaf = FFI_AggregateUDF::from(udaf);
225
226    RResult::ROk(udaf)
227}
228
229unsafe extern "C" fn try_encode_udaf_fn_wrapper(
230    codec: &FFI_LogicalExtensionCodec,
231    node: FFI_AggregateUDF,
232) -> FFIResult<RVec<u8>> {
233    let codec = codec.inner();
234    let udaf: Arc<dyn AggregateUDFImpl> = (&node).into();
235    let udaf = AggregateUDF::new_from_shared_impl(udaf);
236
237    let mut bytes = Vec::new();
238    rresult_return!(codec.try_encode_udaf(&udaf, &mut bytes));
239
240    RResult::ROk(bytes.into())
241}
242
243unsafe extern "C" fn try_decode_udwf_fn_wrapper(
244    codec: &FFI_LogicalExtensionCodec,
245    name: RStr,
246    buf: RSlice<u8>,
247) -> FFIResult<FFI_WindowUDF> {
248    let codec = codec.inner();
249    let udwf = rresult_return!(codec.try_decode_udwf(name.into(), buf.as_ref()));
250    let udwf = FFI_WindowUDF::from(udwf);
251
252    RResult::ROk(udwf)
253}
254
255unsafe extern "C" fn try_encode_udwf_fn_wrapper(
256    codec: &FFI_LogicalExtensionCodec,
257    node: FFI_WindowUDF,
258) -> FFIResult<RVec<u8>> {
259    let codec = codec.inner();
260    let udwf: Arc<dyn WindowUDFImpl> = (&node).into();
261    let udwf = WindowUDF::new_from_shared_impl(udwf);
262
263    let mut bytes = Vec::new();
264    rresult_return!(codec.try_encode_udwf(&udwf, &mut bytes));
265
266    RResult::ROk(bytes.into())
267}
268
269unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_LogicalExtensionCodec) {
270    unsafe {
271        let private_data =
272            Box::from_raw(provider.private_data as *mut LogicalExtensionCodecPrivateData);
273        drop(private_data);
274    }
275}
276
277unsafe extern "C" fn clone_fn_wrapper(
278    codec: &FFI_LogicalExtensionCodec,
279) -> FFI_LogicalExtensionCodec {
280    let old_codec = Arc::clone(codec.inner());
281    let runtime = codec.runtime().clone();
282
283    FFI_LogicalExtensionCodec::new(old_codec, runtime, codec.task_ctx_provider.clone())
284}
285
286impl Drop for FFI_LogicalExtensionCodec {
287    fn drop(&mut self) {
288        unsafe { (self.release)(self) }
289    }
290}
291
292impl FFI_LogicalExtensionCodec {
293    /// Creates a new [`FFI_LogicalExtensionCodec`].
294    pub fn new(
295        codec: Arc<dyn LogicalExtensionCodec + Send>,
296        runtime: Option<Handle>,
297        task_ctx_provider: impl Into<FFI_TaskContextProvider>,
298    ) -> Self {
299        let task_ctx_provider = task_ctx_provider.into();
300        let private_data = Box::new(LogicalExtensionCodecPrivateData { codec, runtime });
301
302        Self {
303            try_decode_table_provider: try_decode_table_provider_fn_wrapper,
304            try_encode_table_provider: try_encode_table_provider_fn_wrapper,
305            try_decode_udf: try_decode_udf_fn_wrapper,
306            try_encode_udf: try_encode_udf_fn_wrapper,
307            try_decode_udaf: try_decode_udaf_fn_wrapper,
308            try_encode_udaf: try_encode_udaf_fn_wrapper,
309            try_decode_udwf: try_decode_udwf_fn_wrapper,
310            try_encode_udwf: try_encode_udwf_fn_wrapper,
311            task_ctx_provider,
312
313            clone: clone_fn_wrapper,
314            release: release_fn_wrapper,
315            version: crate::version,
316            private_data: Box::into_raw(private_data) as *mut c_void,
317            library_marker_id: crate::get_library_marker_id,
318        }
319    }
320
321    pub fn new_default(task_ctx_provider: &Arc<dyn TaskContextProvider>) -> Self {
322        let task_ctx_provider = FFI_TaskContextProvider::from(task_ctx_provider);
323        let codec = Arc::new(DefaultLogicalExtensionCodec {});
324
325        Self::new(codec, None, task_ctx_provider)
326    }
327}
328
/// This wrapper struct exists on the receiver side of the FFI interface, so it has
/// no guarantees about being able to access the data in `private_data`. Any functions
/// defined on this struct must only use the stable functions provided in
/// [`FFI_LogicalExtensionCodec`] to interact with the foreign codec.
#[derive(Debug)]
pub struct ForeignLogicalExtensionCodec(pub FFI_LogicalExtensionCodec);
335
// NOTE(review): the wrapped `FFI_LogicalExtensionCodec` already has manual
// `Send`/`Sync` impls in this file, so these look redundant with the auto
// traits of a single-field tuple struct — confirm whether they can be dropped.
unsafe impl Send for ForeignLogicalExtensionCodec {}
unsafe impl Sync for ForeignLogicalExtensionCodec {}
338
339impl From<&FFI_LogicalExtensionCodec> for Arc<dyn LogicalExtensionCodec> {
340    fn from(provider: &FFI_LogicalExtensionCodec) -> Self {
341        if (provider.library_marker_id)() == crate::get_library_marker_id() {
342            Arc::clone(provider.inner())
343        } else {
344            Arc::new(ForeignLogicalExtensionCodec(provider.clone()))
345        }
346    }
347}
348
349impl Clone for FFI_LogicalExtensionCodec {
350    fn clone(&self) -> Self {
351        unsafe { (self.clone)(self) }
352    }
353}
354
355impl LogicalExtensionCodec for ForeignLogicalExtensionCodec {
356    fn try_decode(
357        &self,
358        _buf: &[u8],
359        _inputs: &[LogicalPlan],
360        _ctx: &TaskContext,
361    ) -> Result<Extension> {
362        not_impl_err!("FFI does not support decode of Extensions")
363    }
364
365    fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<()> {
366        not_impl_err!("FFI does not support encode of Extensions")
367    }
368
369    fn try_decode_table_provider(
370        &self,
371        buf: &[u8],
372        table_ref: &TableReference,
373        schema: SchemaRef,
374        _ctx: &TaskContext,
375    ) -> Result<Arc<dyn TableProvider>> {
376        let table_ref = table_ref.to_string();
377        let schema: WrappedSchema = schema.into();
378
379        let ffi_table_provider = unsafe {
380            df_result!((self.0.try_decode_table_provider)(
381                &self.0,
382                buf.into(),
383                table_ref.as_str().into(),
384                schema
385            ))
386        }?;
387
388        Ok((&ffi_table_provider).into())
389    }
390
391    fn try_encode_table_provider(
392        &self,
393        table_ref: &TableReference,
394        node: Arc<dyn TableProvider>,
395        buf: &mut Vec<u8>,
396    ) -> Result<()> {
397        let table_ref = table_ref.to_string();
398        let node =
399            FFI_TableProvider::new_with_ffi_codec(node, true, None, self.0.clone());
400
401        let bytes = df_result!(unsafe {
402            (self.0.try_encode_table_provider)(&self.0, table_ref.as_str().into(), node)
403        })?;
404
405        buf.extend(bytes);
406
407        Ok(())
408    }
409
410    fn try_decode_file_format(
411        &self,
412        _buf: &[u8],
413        _ctx: &TaskContext,
414    ) -> Result<Arc<dyn FileFormatFactory>> {
415        not_impl_err!("FFI does not support decode_file_format")
416    }
417
418    fn try_encode_file_format(
419        &self,
420        _buf: &mut Vec<u8>,
421        _node: Arc<dyn FileFormatFactory>,
422    ) -> Result<()> {
423        not_impl_err!("FFI does not support encode_file_format")
424    }
425
426    fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
427        let udf = unsafe {
428            df_result!((self.0.try_decode_udf)(&self.0, name.into(), buf.into()))
429        }?;
430        let udf: Arc<dyn ScalarUDFImpl> = (&udf).into();
431
432        Ok(Arc::new(ScalarUDF::new_from_shared_impl(udf)))
433    }
434
435    fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
436        let node = FFI_ScalarUDF::from(Arc::new(node.clone()));
437        let bytes = df_result!(unsafe { (self.0.try_encode_udf)(&self.0, node) })?;
438
439        buf.extend(bytes);
440
441        Ok(())
442    }
443
444    fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
445        let udaf = unsafe {
446            df_result!((self.0.try_decode_udaf)(&self.0, name.into(), buf.into()))
447        }?;
448        let udaf: Arc<dyn AggregateUDFImpl> = (&udaf).into();
449
450        Ok(Arc::new(AggregateUDF::new_from_shared_impl(udaf)))
451    }
452
453    fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
454        let node = Arc::new(node.clone());
455        let node = FFI_AggregateUDF::from(node);
456        let bytes = df_result!(unsafe { (self.0.try_encode_udaf)(&self.0, node) })?;
457
458        buf.extend(bytes);
459
460        Ok(())
461    }
462
463    fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> {
464        let udwf = unsafe {
465            df_result!((self.0.try_decode_udwf)(&self.0, name.into(), buf.into()))
466        }?;
467        let udwf: Arc<dyn WindowUDFImpl> = (&udwf).into();
468
469        Ok(Arc::new(WindowUDF::new_from_shared_impl(udwf)))
470    }
471
472    fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> {
473        let node = Arc::new(node.clone());
474        let node = FFI_WindowUDF::from(node);
475        let bytes = df_result!(unsafe { (self.0.try_encode_udwf)(&self.0, node) })?;
476
477        buf.extend(bytes);
478
479        Ok(())
480    }
481}
482
/// Round-trip tests that force the codec through the foreign (FFI) code path
/// by swapping in a mock library marker.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::record_batch;
    use arrow_schema::{DataType, Field, Schema, SchemaRef};
    use datafusion_catalog::{MemTable, TableProvider};
    use datafusion_common::{Result, TableReference, exec_err};
    use datafusion_datasource::file_format::FileFormatFactory;
    use datafusion_execution::TaskContext;
    use datafusion_expr::ptr_eq::arc_ptr_eq;
    use datafusion_expr::{AggregateUDF, Extension, LogicalPlan, ScalarUDF, WindowUDF};
    use datafusion_functions::math::abs::AbsFunc;
    use datafusion_functions_aggregate::sum::Sum;
    use datafusion_functions_window::rank::{Rank, RankType};
    use datafusion_proto::logical_plan::LogicalExtensionCodec;
    use datafusion_proto::physical_plan::PhysicalExtensionCodec;

    use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
    use crate::proto::physical_extension_codec::tests::TestExtensionCodec;

    /// Builds the single-column in-memory table used as the fixture for the
    /// table provider round trip.
    fn create_test_table() -> MemTable {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
        let rb = record_batch!(("a", Int32, [1, 2, 3]))
            .expect("should be able to create a record batch");
        MemTable::try_new(schema, vec![vec![rb]])
            .expect("should be able to create an in memory table")
    }

    /// Logical counterpart of the physical test codec. Table providers use a
    /// two byte encoding (magic number + tag); UDF handling is delegated to
    /// the `PhysicalExtensionCodec` implementation of the same type.
    impl LogicalExtensionCodec for TestExtensionCodec {
        fn try_decode(
            &self,
            _buf: &[u8],
            _inputs: &[LogicalPlan],
            _ctx: &TaskContext,
        ) -> Result<Extension> {
            unimplemented!()
        }

        fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<()> {
            unimplemented!()
        }

        fn try_decode_table_provider(
            &self,
            buf: &[u8],
            _table_ref: &TableReference,
            schema: SchemaRef,
            _ctx: &TaskContext,
        ) -> Result<Arc<dyn TableProvider>> {
            // `first()` instead of `buf[0]` so an empty buffer is reported as
            // an error rather than panicking.
            if buf.first() != Some(&Self::MAGIC_NUMBER) {
                return exec_err!(
                    "TestExtensionCodec input buffer does not start with magic number"
                );
            }

            if schema != create_test_table().schema() {
                return exec_err!("Incorrect test table schema");
            }

            if buf.len() != 2 || buf[1] != Self::MEMTABLE_SERIALIZED {
                return exec_err!("TestExtensionCodec unable to decode table provider");
            }

            Ok(Arc::new(create_test_table()) as Arc<dyn TableProvider>)
        }

        fn try_encode_table_provider(
            &self,
            _table_ref: &TableReference,
            node: Arc<dyn TableProvider>,
            buf: &mut Vec<u8>,
        ) -> Result<()> {
            buf.push(Self::MAGIC_NUMBER);

            if !node.as_any().is::<MemTable>() {
                return exec_err!("TestExtensionCodec only expects MemTable");
            };

            if node.schema() != create_test_table().schema() {
                return exec_err!("Unexpected schema for encoding.");
            }

            buf.push(Self::MEMTABLE_SERIALIZED);

            Ok(())
        }

        fn try_decode_file_format(
            &self,
            _buf: &[u8],
            _ctx: &TaskContext,
        ) -> Result<Arc<dyn FileFormatFactory>> {
            unimplemented!()
        }

        fn try_encode_file_format(
            &self,
            _buf: &mut Vec<u8>,
            _node: Arc<dyn FileFormatFactory>,
        ) -> Result<()> {
            unimplemented!()
        }

        fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
            PhysicalExtensionCodec::try_decode_udf(self, name, buf)
        }

        fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
            PhysicalExtensionCodec::try_encode_udf(self, node, buf)
        }

        fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
            PhysicalExtensionCodec::try_decode_udaf(self, name, buf)
        }

        fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
            PhysicalExtensionCodec::try_encode_udaf(self, node, buf)
        }

        fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> {
            PhysicalExtensionCodec::try_decode_udwf(self, name, buf)
        }

        fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> {
            PhysicalExtensionCodec::try_encode_udwf(self, node, buf)
        }
    }

    /// Encodes and decodes a `MemTable` through the foreign code path.
    #[test]
    fn roundtrip_ffi_logical_extension_codec_table_provider() -> Result<()> {
        let codec = Arc::new(TestExtensionCodec {});
        let (ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();

        let mut ffi_codec =
            FFI_LogicalExtensionCodec::new(codec, None, task_ctx_provider);
        // Pretend the codec came from another library so the FFI path is used.
        ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
        let foreign_codec: Arc<dyn LogicalExtensionCodec> = (&ffi_codec).into();

        let table = Arc::new(create_test_table()) as Arc<dyn TableProvider>;
        let mut bytes = Vec::new();
        foreign_codec.try_encode_table_provider(&"my_table".into(), table, &mut bytes)?;

        let returned_table = foreign_codec.try_decode_table_provider(
            &bytes,
            &"my_table".into(),
            create_test_table().schema(),
            ctx.task_ctx().as_ref(),
        )?;

        assert!(returned_table.as_any().is::<MemTable>());

        Ok(())
    }

    /// Encodes and decodes a scalar UDF through the foreign code path.
    #[test]
    fn roundtrip_ffi_logical_extension_codec_udf() -> Result<()> {
        let codec = Arc::new(TestExtensionCodec {});
        let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();

        let mut ffi_codec =
            FFI_LogicalExtensionCodec::new(codec, None, task_ctx_provider);
        ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
        let foreign_codec: Arc<dyn LogicalExtensionCodec> = (&ffi_codec).into();

        let udf = Arc::new(ScalarUDF::from(AbsFunc::new()));
        let mut bytes = Vec::new();
        foreign_codec.try_encode_udf(udf.as_ref(), &mut bytes)?;

        let returned_udf = foreign_codec.try_decode_udf(udf.name(), &bytes)?;

        assert!(returned_udf.inner().as_any().is::<AbsFunc>());

        Ok(())
    }

    /// Encodes and decodes an aggregate UDF through the foreign code path.
    #[test]
    fn roundtrip_ffi_logical_extension_codec_udaf() -> Result<()> {
        let codec = Arc::new(TestExtensionCodec {});
        let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();

        let mut ffi_codec =
            FFI_LogicalExtensionCodec::new(codec, None, task_ctx_provider);
        ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
        let foreign_codec: Arc<dyn LogicalExtensionCodec> = (&ffi_codec).into();

        let udf = Arc::new(AggregateUDF::from(Sum::new()));
        let mut bytes = Vec::new();
        foreign_codec.try_encode_udaf(udf.as_ref(), &mut bytes)?;

        let returned_udf = foreign_codec.try_decode_udaf(udf.name(), &bytes)?;

        assert!(returned_udf.inner().as_any().is::<Sum>());

        Ok(())
    }

    /// Encodes and decodes a window UDF through the foreign code path.
    #[test]
    fn roundtrip_ffi_logical_extension_codec_udwf() -> Result<()> {
        let codec = Arc::new(TestExtensionCodec {});
        let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();

        let mut ffi_codec =
            FFI_LogicalExtensionCodec::new(codec, None, task_ctx_provider);
        ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
        let foreign_codec: Arc<dyn LogicalExtensionCodec> = (&ffi_codec).into();

        let udf = Arc::new(WindowUDF::from(Rank::new(
            "my_rank".to_owned(),
            RankType::Basic,
        )));
        let mut bytes = Vec::new();
        foreign_codec.try_encode_udwf(udf.as_ref(), &mut bytes)?;

        let returned_udf = foreign_codec.try_decode_udwf(udf.name(), &bytes)?;

        assert!(returned_udf.inner().as_any().is::<Rank>());

        Ok(())
    }

    /// A codec created by this library must be unwrapped to the original
    /// `Arc`, while one carrying a different marker must be wrapped.
    #[test]
    fn ffi_logical_extension_codec_local_bypass() {
        let codec =
            Arc::new(TestExtensionCodec {}) as Arc<dyn LogicalExtensionCodec + Send>;
        let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();

        let mut ffi_codec =
            FFI_LogicalExtensionCodec::new(Arc::clone(&codec), None, task_ctx_provider);

        let codec = codec as Arc<dyn LogicalExtensionCodec>;
        // Verify local libraries can be downcast to their original
        let foreign_codec: Arc<dyn LogicalExtensionCodec> = (&ffi_codec).into();
        assert!(arc_ptr_eq(&foreign_codec, &codec));

        // Verify different library markers generate foreign providers
        ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
        let foreign_codec: Arc<dyn LogicalExtensionCodec> = (&ffi_codec).into();
        assert!(!arc_ptr_eq(&foreign_codec, &codec));
    }
}
723}