Skip to main content

datafusion_ffi/proto/
logical_extension_codec.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::ffi::c_void;
20use std::sync::Arc;
21
22use arrow::datatypes::SchemaRef;
23use datafusion_catalog::TableProvider;
24use datafusion_common::error::Result;
25use datafusion_common::{TableReference, not_impl_err};
26use datafusion_datasource::file_format::FileFormatFactory;
27use datafusion_execution::{TaskContext, TaskContextProvider};
28use datafusion_expr::{
29    AggregateUDF, AggregateUDFImpl, Extension, LogicalPlan, ScalarUDF, ScalarUDFImpl,
30    WindowUDF, WindowUDFImpl,
31};
32use datafusion_proto::logical_plan::{
33    DefaultLogicalExtensionCodec, LogicalExtensionCodec,
34};
35
36use stabby::slice::Slice as SSlice;
37use stabby::str::Str as SStr;
38use stabby::vec::Vec as SVec;
39use tokio::runtime::Handle;
40
41use crate::arrow_wrappers::WrappedSchema;
42use crate::execution::FFI_TaskContextProvider;
43use crate::table_provider::FFI_TableProvider;
44use crate::udaf::FFI_AggregateUDF;
45use crate::udf::FFI_ScalarUDF;
46use crate::udwf::FFI_WindowUDF;
47use crate::util::FFI_Result;
48use crate::{df_result, sresult_return};
49
/// A stable struct for sharing [`LogicalExtensionCodec`] across FFI boundaries.
///
/// The struct is `#[repr(C)]` and consists of function pointers plus an opaque
/// `private_data` pointer, so the field order is part of the binary interface.
#[repr(C)]
#[derive(Debug)]
pub struct FFI_LogicalExtensionCodec {
    /// Decode bytes into a table provider.
    ///
    /// Takes the serialized bytes, the table reference rendered as a string,
    /// and the schema of the table.
    try_decode_table_provider: unsafe extern "C" fn(
        &Self,
        buf: SSlice<u8>,
        table_ref: SStr,
        schema: WrappedSchema,
    ) -> FFI_Result<FFI_TableProvider>,

    /// Encode a table provider into bytes.
    try_encode_table_provider: unsafe extern "C" fn(
        &Self,
        table_ref: SStr,
        node: FFI_TableProvider,
    ) -> FFI_Result<SVec<u8>>,

    /// Decode bytes into a user defined scalar function.
    try_decode_udf: unsafe extern "C" fn(
        &Self,
        name: SStr,
        buf: SSlice<u8>,
    ) -> FFI_Result<FFI_ScalarUDF>,

    /// Encode a user defined scalar function into bytes.
    try_encode_udf:
        unsafe extern "C" fn(&Self, node: FFI_ScalarUDF) -> FFI_Result<SVec<u8>>,

    /// Decode bytes into a user defined aggregate function.
    try_decode_udaf: unsafe extern "C" fn(
        &Self,
        name: SStr,
        buf: SSlice<u8>,
    ) -> FFI_Result<FFI_AggregateUDF>,

    /// Encode a user defined aggregate function into bytes.
    try_encode_udaf:
        unsafe extern "C" fn(&Self, node: FFI_AggregateUDF) -> FFI_Result<SVec<u8>>,

    /// Decode bytes into a user defined window function.
    try_decode_udwf: unsafe extern "C" fn(
        &Self,
        name: SStr,
        buf: SSlice<u8>,
    ) -> FFI_Result<FFI_WindowUDF>,

    /// Encode a user defined window function into bytes.
    try_encode_udwf:
        unsafe extern "C" fn(&Self, node: FFI_WindowUDF) -> FFI_Result<SVec<u8>>,

    /// Source of the [`TaskContext`] handed to the wrapped codec when decoding
    /// table providers.
    pub task_ctx_provider: FFI_TaskContextProvider,

    /// Creates a clone of this codec. The call is routed through this function
    /// pointer so that the copy is produced by the library that provided the
    /// codec.
    pub clone: unsafe extern "C" fn(plan: &Self) -> Self,

    /// Release the memory of the private data when it is no longer being used.
    pub release: unsafe extern "C" fn(arg: &mut Self),

    /// Return the major DataFusion version number of this provider.
    pub version: unsafe extern "C" fn() -> u64,

    /// Internal data. This is only to be accessed by the provider of the codec.
    /// A [`ForeignLogicalExtensionCodec`] should never attempt to access this data.
    pub private_data: *mut c_void,

    /// Utility to identify when FFI objects are accessed locally through
    /// the foreign interface.
    pub library_marker_id: extern "C" fn() -> usize,
}
122
// `private_data` is a raw pointer, so `Send` and `Sync` are not derived
// automatically and are asserted manually here.
// NOTE(review): soundness presumably relies on the wrapped codec, the runtime
// handle and the task context provider all being safe to share between
// threads — confirm against the provider-side types.
unsafe impl Send for FFI_LogicalExtensionCodec {}
unsafe impl Sync for FFI_LogicalExtensionCodec {}
125
/// Provider-side state that is boxed and stored behind the `private_data`
/// pointer of an [`FFI_LogicalExtensionCodec`].
struct LogicalExtensionCodecPrivateData {
    /// The wrapped codec that performs the actual encoding and decoding.
    codec: Arc<dyn LogicalExtensionCodec>,
    /// Optional Tokio runtime handle, passed on to the FFI table providers
    /// created when decoding.
    runtime: Option<Handle>,
}
130
131impl FFI_LogicalExtensionCodec {
132    fn inner(&self) -> &Arc<dyn LogicalExtensionCodec> {
133        let private_data = self.private_data as *const LogicalExtensionCodecPrivateData;
134        unsafe { &(*private_data).codec }
135    }
136
137    fn runtime(&self) -> &Option<Handle> {
138        let private_data = self.private_data as *const LogicalExtensionCodecPrivateData;
139        unsafe { &(*private_data).runtime }
140    }
141
142    fn task_ctx(&self) -> Result<Arc<TaskContext>> {
143        (&self.task_ctx_provider).try_into()
144    }
145}
146
147unsafe extern "C" fn try_decode_table_provider_fn_wrapper(
148    codec: &FFI_LogicalExtensionCodec,
149    buf: SSlice<u8>,
150    table_ref: SStr,
151    schema: WrappedSchema,
152) -> FFI_Result<FFI_TableProvider> {
153    let ctx = sresult_return!(codec.task_ctx());
154    let runtime = codec.runtime().clone();
155    let codec_inner = codec.inner();
156    let table_ref = TableReference::from(table_ref.as_str());
157    let schema: SchemaRef = schema.into();
158
159    let table_provider = sresult_return!(codec_inner.try_decode_table_provider(
160        buf.as_ref(),
161        &table_ref,
162        schema,
163        ctx.as_ref()
164    ));
165
166    FFI_Result::Ok(FFI_TableProvider::new_with_ffi_codec(
167        table_provider,
168        true,
169        runtime,
170        codec.clone(),
171    ))
172}
173
174unsafe extern "C" fn try_encode_table_provider_fn_wrapper(
175    codec: &FFI_LogicalExtensionCodec,
176    table_ref: SStr,
177    node: FFI_TableProvider,
178) -> FFI_Result<SVec<u8>> {
179    let table_ref = TableReference::from(table_ref.as_str());
180    let table_provider: Arc<dyn TableProvider> = (&node).into();
181    let codec = codec.inner();
182
183    let mut bytes = Vec::new();
184    sresult_return!(codec.try_encode_table_provider(
185        &table_ref,
186        table_provider,
187        &mut bytes
188    ));
189
190    FFI_Result::Ok(bytes.into_iter().collect())
191}
192
193unsafe extern "C" fn try_decode_udf_fn_wrapper(
194    codec: &FFI_LogicalExtensionCodec,
195    name: SStr,
196    buf: SSlice<u8>,
197) -> FFI_Result<FFI_ScalarUDF> {
198    let codec = codec.inner();
199
200    let udf = sresult_return!(codec.try_decode_udf(name.as_str(), buf.as_ref()));
201    let udf = FFI_ScalarUDF::from(udf);
202
203    FFI_Result::Ok(udf)
204}
205
206unsafe extern "C" fn try_encode_udf_fn_wrapper(
207    codec: &FFI_LogicalExtensionCodec,
208    node: FFI_ScalarUDF,
209) -> FFI_Result<SVec<u8>> {
210    let codec = codec.inner();
211    let node: Arc<dyn ScalarUDFImpl> = (&node).into();
212    let node = ScalarUDF::new_from_shared_impl(node);
213
214    let mut bytes = Vec::new();
215    sresult_return!(codec.try_encode_udf(&node, &mut bytes));
216
217    FFI_Result::Ok(bytes.into_iter().collect())
218}
219
220unsafe extern "C" fn try_decode_udaf_fn_wrapper(
221    codec: &FFI_LogicalExtensionCodec,
222    name: SStr,
223    buf: SSlice<u8>,
224) -> FFI_Result<FFI_AggregateUDF> {
225    let codec_inner = codec.inner();
226    let udaf = sresult_return!(codec_inner.try_decode_udaf(name.into(), buf.as_ref()));
227    let udaf = FFI_AggregateUDF::from(udaf);
228
229    FFI_Result::Ok(udaf)
230}
231
232unsafe extern "C" fn try_encode_udaf_fn_wrapper(
233    codec: &FFI_LogicalExtensionCodec,
234    node: FFI_AggregateUDF,
235) -> FFI_Result<SVec<u8>> {
236    let codec = codec.inner();
237    let udaf: Arc<dyn AggregateUDFImpl> = (&node).into();
238    let udaf = AggregateUDF::new_from_shared_impl(udaf);
239
240    let mut bytes = Vec::new();
241    sresult_return!(codec.try_encode_udaf(&udaf, &mut bytes));
242
243    FFI_Result::Ok(bytes.into_iter().collect())
244}
245
246unsafe extern "C" fn try_decode_udwf_fn_wrapper(
247    codec: &FFI_LogicalExtensionCodec,
248    name: SStr,
249    buf: SSlice<u8>,
250) -> FFI_Result<FFI_WindowUDF> {
251    let codec = codec.inner();
252    let udwf = sresult_return!(codec.try_decode_udwf(name.into(), buf.as_ref()));
253    let udwf = FFI_WindowUDF::from(udwf);
254
255    FFI_Result::Ok(udwf)
256}
257
258unsafe extern "C" fn try_encode_udwf_fn_wrapper(
259    codec: &FFI_LogicalExtensionCodec,
260    node: FFI_WindowUDF,
261) -> FFI_Result<SVec<u8>> {
262    let codec = codec.inner();
263    let udwf: Arc<dyn WindowUDFImpl> = (&node).into();
264    let udwf = WindowUDF::new_from_shared_impl(udwf);
265
266    let mut bytes = Vec::new();
267    sresult_return!(codec.try_encode_udwf(&udwf, &mut bytes));
268
269    FFI_Result::Ok(bytes.into_iter().collect())
270}
271
272unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_LogicalExtensionCodec) {
273    unsafe {
274        let private_data =
275            Box::from_raw(provider.private_data as *mut LogicalExtensionCodecPrivateData);
276        drop(private_data);
277    }
278}
279
280unsafe extern "C" fn clone_fn_wrapper(
281    codec: &FFI_LogicalExtensionCodec,
282) -> FFI_LogicalExtensionCodec {
283    let old_codec = Arc::clone(codec.inner());
284    let runtime = codec.runtime().clone();
285
286    FFI_LogicalExtensionCodec::new(old_codec, runtime, codec.task_ctx_provider.clone())
287}
288
289impl Drop for FFI_LogicalExtensionCodec {
290    fn drop(&mut self) {
291        unsafe { (self.release)(self) }
292    }
293}
294
295impl FFI_LogicalExtensionCodec {
296    /// Creates a new [`FFI_LogicalExtensionCodec`].
297    pub fn new(
298        codec: Arc<dyn LogicalExtensionCodec + Send>,
299        runtime: Option<Handle>,
300        task_ctx_provider: impl Into<FFI_TaskContextProvider>,
301    ) -> Self {
302        if let Some(codec) = (Arc::clone(&codec) as Arc<dyn Any>)
303            .downcast_ref::<ForeignLogicalExtensionCodec>()
304        {
305            return codec.0.clone();
306        }
307
308        let task_ctx_provider = task_ctx_provider.into();
309        let private_data = Box::new(LogicalExtensionCodecPrivateData { codec, runtime });
310
311        Self {
312            try_decode_table_provider: try_decode_table_provider_fn_wrapper,
313            try_encode_table_provider: try_encode_table_provider_fn_wrapper,
314            try_decode_udf: try_decode_udf_fn_wrapper,
315            try_encode_udf: try_encode_udf_fn_wrapper,
316            try_decode_udaf: try_decode_udaf_fn_wrapper,
317            try_encode_udaf: try_encode_udaf_fn_wrapper,
318            try_decode_udwf: try_decode_udwf_fn_wrapper,
319            try_encode_udwf: try_encode_udwf_fn_wrapper,
320            task_ctx_provider,
321
322            clone: clone_fn_wrapper,
323            release: release_fn_wrapper,
324            version: crate::version,
325            private_data: Box::into_raw(private_data) as *mut c_void,
326            library_marker_id: crate::get_library_marker_id,
327        }
328    }
329
330    pub fn new_default(task_ctx_provider: &Arc<dyn TaskContextProvider>) -> Self {
331        let task_ctx_provider = FFI_TaskContextProvider::from(task_ctx_provider);
332        let codec = Arc::new(DefaultLogicalExtensionCodec {});
333
334        Self::new(codec, None, task_ctx_provider)
335    }
336}
337
/// This wrapper struct exists on the receiver side of the FFI interface, so it has
/// no guarantees about being able to access the data in `private_data`. Any functions
/// defined on this struct must only use the stable functions provided in
/// [`FFI_LogicalExtensionCodec`] to interact with the foreign codec.
#[derive(Debug)]
pub struct ForeignLogicalExtensionCodec(pub FFI_LogicalExtensionCodec);
344
// NOTE(review): these impls are asserted manually; the wrapper only holds an
// `FFI_LogicalExtensionCodec`, so their soundness presumably follows from the
// same assumptions as the `Send`/`Sync` impls on that struct — confirm.
unsafe impl Send for ForeignLogicalExtensionCodec {}
unsafe impl Sync for ForeignLogicalExtensionCodec {}
347
348impl From<&FFI_LogicalExtensionCodec> for Arc<dyn LogicalExtensionCodec> {
349    fn from(provider: &FFI_LogicalExtensionCodec) -> Self {
350        if (provider.library_marker_id)() == crate::get_library_marker_id() {
351            Arc::clone(provider.inner())
352        } else {
353            Arc::new(ForeignLogicalExtensionCodec(provider.clone()))
354        }
355    }
356}
357
358impl Clone for FFI_LogicalExtensionCodec {
359    fn clone(&self) -> Self {
360        unsafe { (self.clone)(self) }
361    }
362}
363
364impl LogicalExtensionCodec for ForeignLogicalExtensionCodec {
365    fn try_decode(
366        &self,
367        _buf: &[u8],
368        _inputs: &[LogicalPlan],
369        _ctx: &TaskContext,
370    ) -> Result<Extension> {
371        not_impl_err!("FFI does not support decode of Extensions")
372    }
373
374    fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<()> {
375        not_impl_err!("FFI does not support encode of Extensions")
376    }
377
378    fn try_decode_table_provider(
379        &self,
380        buf: &[u8],
381        table_ref: &TableReference,
382        schema: SchemaRef,
383        _ctx: &TaskContext,
384    ) -> Result<Arc<dyn TableProvider>> {
385        let table_ref = table_ref.to_string();
386        let schema: WrappedSchema = schema.into();
387
388        let ffi_table_provider = unsafe {
389            df_result!((self.0.try_decode_table_provider)(
390                &self.0,
391                buf.into(),
392                table_ref.as_str().into(),
393                schema
394            ))
395        }?;
396
397        Ok((&ffi_table_provider).into())
398    }
399
400    fn try_encode_table_provider(
401        &self,
402        table_ref: &TableReference,
403        node: Arc<dyn TableProvider>,
404        buf: &mut Vec<u8>,
405    ) -> Result<()> {
406        let table_ref = table_ref.to_string();
407        let node =
408            FFI_TableProvider::new_with_ffi_codec(node, true, None, self.0.clone());
409
410        let bytes = df_result!(unsafe {
411            (self.0.try_encode_table_provider)(&self.0, table_ref.as_str().into(), node)
412        })?;
413
414        buf.extend(bytes);
415
416        Ok(())
417    }
418
419    fn try_decode_file_format(
420        &self,
421        _buf: &[u8],
422        _ctx: &TaskContext,
423    ) -> Result<Arc<dyn FileFormatFactory>> {
424        not_impl_err!("FFI does not support decode_file_format")
425    }
426
427    fn try_encode_file_format(
428        &self,
429        _buf: &mut Vec<u8>,
430        _node: Arc<dyn FileFormatFactory>,
431    ) -> Result<()> {
432        not_impl_err!("FFI does not support encode_file_format")
433    }
434
435    fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
436        let udf = unsafe {
437            df_result!((self.0.try_decode_udf)(&self.0, name.into(), buf.into()))
438        }?;
439        let udf: Arc<dyn ScalarUDFImpl> = (&udf).into();
440
441        Ok(Arc::new(ScalarUDF::new_from_shared_impl(udf)))
442    }
443
444    fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
445        let node = FFI_ScalarUDF::from(Arc::new(node.clone()));
446        let bytes = df_result!(unsafe { (self.0.try_encode_udf)(&self.0, node) })?;
447
448        buf.extend(bytes);
449
450        Ok(())
451    }
452
453    fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
454        let udaf = unsafe {
455            df_result!((self.0.try_decode_udaf)(&self.0, name.into(), buf.into()))
456        }?;
457        let udaf: Arc<dyn AggregateUDFImpl> = (&udaf).into();
458
459        Ok(Arc::new(AggregateUDF::new_from_shared_impl(udaf)))
460    }
461
462    fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
463        let node = Arc::new(node.clone());
464        let node = FFI_AggregateUDF::from(node);
465        let bytes = df_result!(unsafe { (self.0.try_encode_udaf)(&self.0, node) })?;
466
467        buf.extend(bytes);
468
469        Ok(())
470    }
471
472    fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> {
473        let udwf = unsafe {
474            df_result!((self.0.try_decode_udwf)(&self.0, name.into(), buf.into()))
475        }?;
476        let udwf: Arc<dyn WindowUDFImpl> = (&udwf).into();
477
478        Ok(Arc::new(WindowUDF::new_from_shared_impl(udwf)))
479    }
480
481    fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> {
482        let node = Arc::new(node.clone());
483        let node = FFI_WindowUDF::from(node);
484        let bytes = df_result!(unsafe { (self.0.try_encode_udwf)(&self.0, node) })?;
485
486        buf.extend(bytes);
487
488        Ok(())
489    }
490}
491
492#[cfg(test)]
493mod tests {
494    use std::sync::Arc;
495
496    use arrow::array::record_batch;
497    use arrow_schema::{DataType, Field, Schema, SchemaRef};
498    use datafusion_catalog::{MemTable, TableProvider};
499    use datafusion_common::{Result, TableReference, exec_err};
500    use datafusion_datasource::file_format::FileFormatFactory;
501    use datafusion_execution::TaskContext;
502    use datafusion_expr::ptr_eq::arc_ptr_eq;
503    use datafusion_expr::{AggregateUDF, Extension, LogicalPlan, ScalarUDF, WindowUDF};
504    use datafusion_functions::math::abs::AbsFunc;
505    use datafusion_functions_aggregate::sum::Sum;
506    use datafusion_functions_window::rank::{Rank, RankType};
507    use datafusion_proto::logical_plan::LogicalExtensionCodec;
508    use datafusion_proto::physical_plan::PhysicalExtensionCodec;
509
510    use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
511    use crate::proto::physical_extension_codec::tests::TestExtensionCodec;
512
513    fn create_test_table() -> MemTable {
514        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
515        let rb = record_batch!(("a", Int32, [1, 2, 3]))
516            .expect("should be able to create a record batch");
517        MemTable::try_new(schema, vec![vec![rb]])
518            .expect("should be able to create an in memory table")
519    }
520
521    impl LogicalExtensionCodec for TestExtensionCodec {
522        fn try_decode(
523            &self,
524            _buf: &[u8],
525            _inputs: &[LogicalPlan],
526            _ctx: &TaskContext,
527        ) -> Result<Extension> {
528            unimplemented!()
529        }
530
531        fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<()> {
532            unimplemented!()
533        }
534
535        fn try_decode_table_provider(
536            &self,
537            buf: &[u8],
538            _table_ref: &TableReference,
539            schema: SchemaRef,
540            _ctx: &TaskContext,
541        ) -> Result<Arc<dyn TableProvider>> {
542            if buf[0] != Self::MAGIC_NUMBER {
543                return exec_err!(
544                    "TestExtensionCodec input buffer does not start with magic number"
545                );
546            }
547
548            if schema != create_test_table().schema() {
549                return exec_err!("Incorrect test table schema");
550            }
551
552            if buf.len() != 2 || buf[1] != Self::MEMTABLE_SERIALIZED {
553                return exec_err!("TestExtensionCodec unable to decode table provider");
554            }
555
556            Ok(Arc::new(create_test_table()) as Arc<dyn TableProvider>)
557        }
558
559        fn try_encode_table_provider(
560            &self,
561            _table_ref: &TableReference,
562            node: Arc<dyn TableProvider>,
563            buf: &mut Vec<u8>,
564        ) -> Result<()> {
565            buf.push(Self::MAGIC_NUMBER);
566
567            if !node.is::<MemTable>() {
568                return exec_err!("TestExtensionCodec only expects MemTable");
569            };
570
571            if node.schema() != create_test_table().schema() {
572                return exec_err!("Unexpected schema for encoding.");
573            }
574
575            buf.push(Self::MEMTABLE_SERIALIZED);
576
577            Ok(())
578        }
579
580        fn try_decode_file_format(
581            &self,
582            _buf: &[u8],
583            _ctx: &TaskContext,
584        ) -> Result<Arc<dyn FileFormatFactory>> {
585            unimplemented!()
586        }
587
588        fn try_encode_file_format(
589            &self,
590            _buf: &mut Vec<u8>,
591            _node: Arc<dyn FileFormatFactory>,
592        ) -> Result<()> {
593            unimplemented!()
594        }
595
596        fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
597            PhysicalExtensionCodec::try_decode_udf(self, name, buf)
598        }
599
600        fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
601            PhysicalExtensionCodec::try_encode_udf(self, node, buf)
602        }
603
604        fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
605            PhysicalExtensionCodec::try_decode_udaf(self, name, buf)
606        }
607
608        fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
609            PhysicalExtensionCodec::try_encode_udaf(self, node, buf)
610        }
611
612        fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> {
613            PhysicalExtensionCodec::try_decode_udwf(self, name, buf)
614        }
615
616        fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> {
617            PhysicalExtensionCodec::try_encode_udwf(self, node, buf)
618        }
619    }
620
621    #[test]
622    fn roundtrip_ffi_logical_extension_codec_table_provider() -> Result<()> {
623        let codec = Arc::new(TestExtensionCodec {});
624        let (ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
625
626        let mut ffi_codec =
627            FFI_LogicalExtensionCodec::new(codec, None, task_ctx_provider);
628        ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
629        let foreign_codec: Arc<dyn LogicalExtensionCodec> = (&ffi_codec).into();
630
631        let table = Arc::new(create_test_table()) as Arc<dyn TableProvider>;
632        let mut bytes = Vec::new();
633        foreign_codec.try_encode_table_provider(&"my_table".into(), table, &mut bytes)?;
634
635        let returned_table = foreign_codec.try_decode_table_provider(
636            &bytes,
637            &"my_table".into(),
638            create_test_table().schema(),
639            ctx.task_ctx().as_ref(),
640        )?;
641
642        assert!(returned_table.is::<MemTable>());
643
644        Ok(())
645    }
646
647    #[test]
648    fn roundtrip_ffi_logical_extension_codec_udf() -> Result<()> {
649        let codec = Arc::new(TestExtensionCodec {});
650        let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
651
652        let mut ffi_codec =
653            FFI_LogicalExtensionCodec::new(codec, None, task_ctx_provider);
654        ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
655        let foreign_codec: Arc<dyn LogicalExtensionCodec> = (&ffi_codec).into();
656
657        let udf = Arc::new(ScalarUDF::from(AbsFunc::new()));
658        let mut bytes = Vec::new();
659        foreign_codec.try_encode_udf(udf.as_ref(), &mut bytes)?;
660
661        let returned_udf = foreign_codec.try_decode_udf(udf.name(), &bytes)?;
662
663        assert!(returned_udf.inner().is::<AbsFunc>());
664
665        Ok(())
666    }
667
668    #[test]
669    fn roundtrip_ffi_logical_extension_codec_udaf() -> Result<()> {
670        let codec = Arc::new(TestExtensionCodec {});
671        let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
672
673        let mut ffi_codec =
674            FFI_LogicalExtensionCodec::new(codec, None, task_ctx_provider);
675        ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
676        let foreign_codec: Arc<dyn LogicalExtensionCodec> = (&ffi_codec).into();
677
678        let udf = Arc::new(AggregateUDF::from(Sum::new()));
679        let mut bytes = Vec::new();
680        foreign_codec.try_encode_udaf(udf.as_ref(), &mut bytes)?;
681
682        let returned_udf = foreign_codec.try_decode_udaf(udf.name(), &bytes)?;
683
684        assert!(returned_udf.inner().is::<Sum>());
685
686        Ok(())
687    }
688
689    #[test]
690    fn roundtrip_ffi_logical_extension_codec_udwf() -> Result<()> {
691        let codec = Arc::new(TestExtensionCodec {});
692        let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
693
694        let mut ffi_codec =
695            FFI_LogicalExtensionCodec::new(codec, None, task_ctx_provider);
696        ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
697        let foreign_codec: Arc<dyn LogicalExtensionCodec> = (&ffi_codec).into();
698
699        let udf = Arc::new(WindowUDF::from(Rank::new(
700            "my_rank".to_owned(),
701            RankType::Basic,
702        )));
703        let mut bytes = Vec::new();
704        foreign_codec.try_encode_udwf(udf.as_ref(), &mut bytes)?;
705
706        let returned_udf = foreign_codec.try_decode_udwf(udf.name(), &bytes)?;
707
708        assert!(returned_udf.inner().is::<Rank>());
709
710        Ok(())
711    }
712
713    #[test]
714    fn ffi_logical_extension_codec_local_bypass() {
715        let codec =
716            Arc::new(TestExtensionCodec {}) as Arc<dyn LogicalExtensionCodec + Send>;
717        let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
718
719        let mut ffi_codec =
720            FFI_LogicalExtensionCodec::new(Arc::clone(&codec), None, task_ctx_provider);
721
722        let codec = codec as Arc<dyn LogicalExtensionCodec>;
723        // Verify local libraries can be downcast to their original
724        let foreign_codec: Arc<dyn LogicalExtensionCodec> = (&ffi_codec).into();
725        assert!(arc_ptr_eq(&foreign_codec, &codec));
726
727        // Verify different library markers generate foreign providers
728        ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
729        let foreign_codec: Arc<dyn LogicalExtensionCodec> = (&ffi_codec).into();
730        assert!(!arc_ptr_eq(&foreign_codec, &codec));
731    }
732}