1use std::any::Any;
19use std::ffi::c_void;
20use std::sync::Arc;
21
22use arrow::datatypes::SchemaRef;
23use datafusion_catalog::TableProvider;
24use datafusion_common::error::Result;
25use datafusion_common::{TableReference, not_impl_err};
26use datafusion_datasource::file_format::FileFormatFactory;
27use datafusion_execution::{TaskContext, TaskContextProvider};
28use datafusion_expr::{
29 AggregateUDF, AggregateUDFImpl, Extension, LogicalPlan, ScalarUDF, ScalarUDFImpl,
30 WindowUDF, WindowUDFImpl,
31};
32use datafusion_proto::logical_plan::{
33 DefaultLogicalExtensionCodec, LogicalExtensionCodec,
34};
35
36use stabby::slice::Slice as SSlice;
37use stabby::str::Str as SStr;
38use stabby::vec::Vec as SVec;
39use tokio::runtime::Handle;
40
41use crate::arrow_wrappers::WrappedSchema;
42use crate::execution::FFI_TaskContextProvider;
43use crate::table_provider::FFI_TableProvider;
44use crate::udaf::FFI_AggregateUDF;
45use crate::udf::FFI_ScalarUDF;
46use crate::udwf::FFI_WindowUDF;
47use crate::util::FFI_Result;
48use crate::{df_result, sresult_return};
49
50#[repr(C)]
52#[derive(Debug)]
53pub struct FFI_LogicalExtensionCodec {
54 try_decode_table_provider: unsafe extern "C" fn(
56 &Self,
57 buf: SSlice<u8>,
58 table_ref: SStr,
59 schema: WrappedSchema,
60 ) -> FFI_Result<FFI_TableProvider>,
61
62 try_encode_table_provider: unsafe extern "C" fn(
64 &Self,
65 table_ref: SStr,
66 node: FFI_TableProvider,
67 ) -> FFI_Result<SVec<u8>>,
68
69 try_decode_udf: unsafe extern "C" fn(
71 &Self,
72 name: SStr,
73 buf: SSlice<u8>,
74 ) -> FFI_Result<FFI_ScalarUDF>,
75
76 try_encode_udf:
78 unsafe extern "C" fn(&Self, node: FFI_ScalarUDF) -> FFI_Result<SVec<u8>>,
79
80 try_decode_udaf: unsafe extern "C" fn(
82 &Self,
83 name: SStr,
84 buf: SSlice<u8>,
85 ) -> FFI_Result<FFI_AggregateUDF>,
86
87 try_encode_udaf:
89 unsafe extern "C" fn(&Self, node: FFI_AggregateUDF) -> FFI_Result<SVec<u8>>,
90
91 try_decode_udwf: unsafe extern "C" fn(
93 &Self,
94 name: SStr,
95 buf: SSlice<u8>,
96 ) -> FFI_Result<FFI_WindowUDF>,
97
98 try_encode_udwf:
100 unsafe extern "C" fn(&Self, node: FFI_WindowUDF) -> FFI_Result<SVec<u8>>,
101
102 pub task_ctx_provider: FFI_TaskContextProvider,
103
104 pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
107
108 pub release: unsafe extern "C" fn(arg: &mut Self),
110
111 pub version: unsafe extern "C" fn() -> u64,
113
114 pub private_data: *mut c_void,
117
118 pub library_marker_id: extern "C" fn() -> usize,
121}
122
123unsafe impl Send for FFI_LogicalExtensionCodec {}
124unsafe impl Sync for FFI_LogicalExtensionCodec {}
125
126struct LogicalExtensionCodecPrivateData {
127 codec: Arc<dyn LogicalExtensionCodec>,
128 runtime: Option<Handle>,
129}
130
131impl FFI_LogicalExtensionCodec {
132 fn inner(&self) -> &Arc<dyn LogicalExtensionCodec> {
133 let private_data = self.private_data as *const LogicalExtensionCodecPrivateData;
134 unsafe { &(*private_data).codec }
135 }
136
137 fn runtime(&self) -> &Option<Handle> {
138 let private_data = self.private_data as *const LogicalExtensionCodecPrivateData;
139 unsafe { &(*private_data).runtime }
140 }
141
142 fn task_ctx(&self) -> Result<Arc<TaskContext>> {
143 (&self.task_ctx_provider).try_into()
144 }
145}
146
147unsafe extern "C" fn try_decode_table_provider_fn_wrapper(
148 codec: &FFI_LogicalExtensionCodec,
149 buf: SSlice<u8>,
150 table_ref: SStr,
151 schema: WrappedSchema,
152) -> FFI_Result<FFI_TableProvider> {
153 let ctx = sresult_return!(codec.task_ctx());
154 let runtime = codec.runtime().clone();
155 let codec_inner = codec.inner();
156 let table_ref = TableReference::from(table_ref.as_str());
157 let schema: SchemaRef = schema.into();
158
159 let table_provider = sresult_return!(codec_inner.try_decode_table_provider(
160 buf.as_ref(),
161 &table_ref,
162 schema,
163 ctx.as_ref()
164 ));
165
166 FFI_Result::Ok(FFI_TableProvider::new_with_ffi_codec(
167 table_provider,
168 true,
169 runtime,
170 codec.clone(),
171 ))
172}
173
174unsafe extern "C" fn try_encode_table_provider_fn_wrapper(
175 codec: &FFI_LogicalExtensionCodec,
176 table_ref: SStr,
177 node: FFI_TableProvider,
178) -> FFI_Result<SVec<u8>> {
179 let table_ref = TableReference::from(table_ref.as_str());
180 let table_provider: Arc<dyn TableProvider> = (&node).into();
181 let codec = codec.inner();
182
183 let mut bytes = Vec::new();
184 sresult_return!(codec.try_encode_table_provider(
185 &table_ref,
186 table_provider,
187 &mut bytes
188 ));
189
190 FFI_Result::Ok(bytes.into_iter().collect())
191}
192
193unsafe extern "C" fn try_decode_udf_fn_wrapper(
194 codec: &FFI_LogicalExtensionCodec,
195 name: SStr,
196 buf: SSlice<u8>,
197) -> FFI_Result<FFI_ScalarUDF> {
198 let codec = codec.inner();
199
200 let udf = sresult_return!(codec.try_decode_udf(name.as_str(), buf.as_ref()));
201 let udf = FFI_ScalarUDF::from(udf);
202
203 FFI_Result::Ok(udf)
204}
205
206unsafe extern "C" fn try_encode_udf_fn_wrapper(
207 codec: &FFI_LogicalExtensionCodec,
208 node: FFI_ScalarUDF,
209) -> FFI_Result<SVec<u8>> {
210 let codec = codec.inner();
211 let node: Arc<dyn ScalarUDFImpl> = (&node).into();
212 let node = ScalarUDF::new_from_shared_impl(node);
213
214 let mut bytes = Vec::new();
215 sresult_return!(codec.try_encode_udf(&node, &mut bytes));
216
217 FFI_Result::Ok(bytes.into_iter().collect())
218}
219
220unsafe extern "C" fn try_decode_udaf_fn_wrapper(
221 codec: &FFI_LogicalExtensionCodec,
222 name: SStr,
223 buf: SSlice<u8>,
224) -> FFI_Result<FFI_AggregateUDF> {
225 let codec_inner = codec.inner();
226 let udaf = sresult_return!(codec_inner.try_decode_udaf(name.into(), buf.as_ref()));
227 let udaf = FFI_AggregateUDF::from(udaf);
228
229 FFI_Result::Ok(udaf)
230}
231
232unsafe extern "C" fn try_encode_udaf_fn_wrapper(
233 codec: &FFI_LogicalExtensionCodec,
234 node: FFI_AggregateUDF,
235) -> FFI_Result<SVec<u8>> {
236 let codec = codec.inner();
237 let udaf: Arc<dyn AggregateUDFImpl> = (&node).into();
238 let udaf = AggregateUDF::new_from_shared_impl(udaf);
239
240 let mut bytes = Vec::new();
241 sresult_return!(codec.try_encode_udaf(&udaf, &mut bytes));
242
243 FFI_Result::Ok(bytes.into_iter().collect())
244}
245
246unsafe extern "C" fn try_decode_udwf_fn_wrapper(
247 codec: &FFI_LogicalExtensionCodec,
248 name: SStr,
249 buf: SSlice<u8>,
250) -> FFI_Result<FFI_WindowUDF> {
251 let codec = codec.inner();
252 let udwf = sresult_return!(codec.try_decode_udwf(name.into(), buf.as_ref()));
253 let udwf = FFI_WindowUDF::from(udwf);
254
255 FFI_Result::Ok(udwf)
256}
257
258unsafe extern "C" fn try_encode_udwf_fn_wrapper(
259 codec: &FFI_LogicalExtensionCodec,
260 node: FFI_WindowUDF,
261) -> FFI_Result<SVec<u8>> {
262 let codec = codec.inner();
263 let udwf: Arc<dyn WindowUDFImpl> = (&node).into();
264 let udwf = WindowUDF::new_from_shared_impl(udwf);
265
266 let mut bytes = Vec::new();
267 sresult_return!(codec.try_encode_udwf(&udwf, &mut bytes));
268
269 FFI_Result::Ok(bytes.into_iter().collect())
270}
271
272unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_LogicalExtensionCodec) {
273 unsafe {
274 let private_data =
275 Box::from_raw(provider.private_data as *mut LogicalExtensionCodecPrivateData);
276 drop(private_data);
277 }
278}
279
280unsafe extern "C" fn clone_fn_wrapper(
281 codec: &FFI_LogicalExtensionCodec,
282) -> FFI_LogicalExtensionCodec {
283 let old_codec = Arc::clone(codec.inner());
284 let runtime = codec.runtime().clone();
285
286 FFI_LogicalExtensionCodec::new(old_codec, runtime, codec.task_ctx_provider.clone())
287}
288
289impl Drop for FFI_LogicalExtensionCodec {
290 fn drop(&mut self) {
291 unsafe { (self.release)(self) }
292 }
293}
294
295impl FFI_LogicalExtensionCodec {
296 pub fn new(
298 codec: Arc<dyn LogicalExtensionCodec + Send>,
299 runtime: Option<Handle>,
300 task_ctx_provider: impl Into<FFI_TaskContextProvider>,
301 ) -> Self {
302 if let Some(codec) = (Arc::clone(&codec) as Arc<dyn Any>)
303 .downcast_ref::<ForeignLogicalExtensionCodec>()
304 {
305 return codec.0.clone();
306 }
307
308 let task_ctx_provider = task_ctx_provider.into();
309 let private_data = Box::new(LogicalExtensionCodecPrivateData { codec, runtime });
310
311 Self {
312 try_decode_table_provider: try_decode_table_provider_fn_wrapper,
313 try_encode_table_provider: try_encode_table_provider_fn_wrapper,
314 try_decode_udf: try_decode_udf_fn_wrapper,
315 try_encode_udf: try_encode_udf_fn_wrapper,
316 try_decode_udaf: try_decode_udaf_fn_wrapper,
317 try_encode_udaf: try_encode_udaf_fn_wrapper,
318 try_decode_udwf: try_decode_udwf_fn_wrapper,
319 try_encode_udwf: try_encode_udwf_fn_wrapper,
320 task_ctx_provider,
321
322 clone: clone_fn_wrapper,
323 release: release_fn_wrapper,
324 version: crate::version,
325 private_data: Box::into_raw(private_data) as *mut c_void,
326 library_marker_id: crate::get_library_marker_id,
327 }
328 }
329
330 pub fn new_default(task_ctx_provider: &Arc<dyn TaskContextProvider>) -> Self {
331 let task_ctx_provider = FFI_TaskContextProvider::from(task_ctx_provider);
332 let codec = Arc::new(DefaultLogicalExtensionCodec {});
333
334 Self::new(codec, None, task_ctx_provider)
335 }
336}
337
338#[derive(Debug)]
343pub struct ForeignLogicalExtensionCodec(pub FFI_LogicalExtensionCodec);
344
345unsafe impl Send for ForeignLogicalExtensionCodec {}
346unsafe impl Sync for ForeignLogicalExtensionCodec {}
347
348impl From<&FFI_LogicalExtensionCodec> for Arc<dyn LogicalExtensionCodec> {
349 fn from(provider: &FFI_LogicalExtensionCodec) -> Self {
350 if (provider.library_marker_id)() == crate::get_library_marker_id() {
351 Arc::clone(provider.inner())
352 } else {
353 Arc::new(ForeignLogicalExtensionCodec(provider.clone()))
354 }
355 }
356}
357
358impl Clone for FFI_LogicalExtensionCodec {
359 fn clone(&self) -> Self {
360 unsafe { (self.clone)(self) }
361 }
362}
363
364impl LogicalExtensionCodec for ForeignLogicalExtensionCodec {
365 fn try_decode(
366 &self,
367 _buf: &[u8],
368 _inputs: &[LogicalPlan],
369 _ctx: &TaskContext,
370 ) -> Result<Extension> {
371 not_impl_err!("FFI does not support decode of Extensions")
372 }
373
374 fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<()> {
375 not_impl_err!("FFI does not support encode of Extensions")
376 }
377
378 fn try_decode_table_provider(
379 &self,
380 buf: &[u8],
381 table_ref: &TableReference,
382 schema: SchemaRef,
383 _ctx: &TaskContext,
384 ) -> Result<Arc<dyn TableProvider>> {
385 let table_ref = table_ref.to_string();
386 let schema: WrappedSchema = schema.into();
387
388 let ffi_table_provider = unsafe {
389 df_result!((self.0.try_decode_table_provider)(
390 &self.0,
391 buf.into(),
392 table_ref.as_str().into(),
393 schema
394 ))
395 }?;
396
397 Ok((&ffi_table_provider).into())
398 }
399
400 fn try_encode_table_provider(
401 &self,
402 table_ref: &TableReference,
403 node: Arc<dyn TableProvider>,
404 buf: &mut Vec<u8>,
405 ) -> Result<()> {
406 let table_ref = table_ref.to_string();
407 let node =
408 FFI_TableProvider::new_with_ffi_codec(node, true, None, self.0.clone());
409
410 let bytes = df_result!(unsafe {
411 (self.0.try_encode_table_provider)(&self.0, table_ref.as_str().into(), node)
412 })?;
413
414 buf.extend(bytes);
415
416 Ok(())
417 }
418
419 fn try_decode_file_format(
420 &self,
421 _buf: &[u8],
422 _ctx: &TaskContext,
423 ) -> Result<Arc<dyn FileFormatFactory>> {
424 not_impl_err!("FFI does not support decode_file_format")
425 }
426
427 fn try_encode_file_format(
428 &self,
429 _buf: &mut Vec<u8>,
430 _node: Arc<dyn FileFormatFactory>,
431 ) -> Result<()> {
432 not_impl_err!("FFI does not support encode_file_format")
433 }
434
435 fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
436 let udf = unsafe {
437 df_result!((self.0.try_decode_udf)(&self.0, name.into(), buf.into()))
438 }?;
439 let udf: Arc<dyn ScalarUDFImpl> = (&udf).into();
440
441 Ok(Arc::new(ScalarUDF::new_from_shared_impl(udf)))
442 }
443
444 fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
445 let node = FFI_ScalarUDF::from(Arc::new(node.clone()));
446 let bytes = df_result!(unsafe { (self.0.try_encode_udf)(&self.0, node) })?;
447
448 buf.extend(bytes);
449
450 Ok(())
451 }
452
453 fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
454 let udaf = unsafe {
455 df_result!((self.0.try_decode_udaf)(&self.0, name.into(), buf.into()))
456 }?;
457 let udaf: Arc<dyn AggregateUDFImpl> = (&udaf).into();
458
459 Ok(Arc::new(AggregateUDF::new_from_shared_impl(udaf)))
460 }
461
462 fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
463 let node = Arc::new(node.clone());
464 let node = FFI_AggregateUDF::from(node);
465 let bytes = df_result!(unsafe { (self.0.try_encode_udaf)(&self.0, node) })?;
466
467 buf.extend(bytes);
468
469 Ok(())
470 }
471
472 fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> {
473 let udwf = unsafe {
474 df_result!((self.0.try_decode_udwf)(&self.0, name.into(), buf.into()))
475 }?;
476 let udwf: Arc<dyn WindowUDFImpl> = (&udwf).into();
477
478 Ok(Arc::new(WindowUDF::new_from_shared_impl(udwf)))
479 }
480
481 fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> {
482 let node = Arc::new(node.clone());
483 let node = FFI_WindowUDF::from(node);
484 let bytes = df_result!(unsafe { (self.0.try_encode_udwf)(&self.0, node) })?;
485
486 buf.extend(bytes);
487
488 Ok(())
489 }
490}
491
492#[cfg(test)]
493mod tests {
494 use std::sync::Arc;
495
496 use arrow::array::record_batch;
497 use arrow_schema::{DataType, Field, Schema, SchemaRef};
498 use datafusion_catalog::{MemTable, TableProvider};
499 use datafusion_common::{Result, TableReference, exec_err};
500 use datafusion_datasource::file_format::FileFormatFactory;
501 use datafusion_execution::TaskContext;
502 use datafusion_expr::ptr_eq::arc_ptr_eq;
503 use datafusion_expr::{AggregateUDF, Extension, LogicalPlan, ScalarUDF, WindowUDF};
504 use datafusion_functions::math::abs::AbsFunc;
505 use datafusion_functions_aggregate::sum::Sum;
506 use datafusion_functions_window::rank::{Rank, RankType};
507 use datafusion_proto::logical_plan::LogicalExtensionCodec;
508 use datafusion_proto::physical_plan::PhysicalExtensionCodec;
509
510 use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
511 use crate::proto::physical_extension_codec::tests::TestExtensionCodec;
512
513 fn create_test_table() -> MemTable {
514 let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
515 let rb = record_batch!(("a", Int32, [1, 2, 3]))
516 .expect("should be able to create a record batch");
517 MemTable::try_new(schema, vec![vec![rb]])
518 .expect("should be able to create an in memory table")
519 }
520
521 impl LogicalExtensionCodec for TestExtensionCodec {
522 fn try_decode(
523 &self,
524 _buf: &[u8],
525 _inputs: &[LogicalPlan],
526 _ctx: &TaskContext,
527 ) -> Result<Extension> {
528 unimplemented!()
529 }
530
531 fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<()> {
532 unimplemented!()
533 }
534
535 fn try_decode_table_provider(
536 &self,
537 buf: &[u8],
538 _table_ref: &TableReference,
539 schema: SchemaRef,
540 _ctx: &TaskContext,
541 ) -> Result<Arc<dyn TableProvider>> {
542 if buf[0] != Self::MAGIC_NUMBER {
543 return exec_err!(
544 "TestExtensionCodec input buffer does not start with magic number"
545 );
546 }
547
548 if schema != create_test_table().schema() {
549 return exec_err!("Incorrect test table schema");
550 }
551
552 if buf.len() != 2 || buf[1] != Self::MEMTABLE_SERIALIZED {
553 return exec_err!("TestExtensionCodec unable to decode table provider");
554 }
555
556 Ok(Arc::new(create_test_table()) as Arc<dyn TableProvider>)
557 }
558
559 fn try_encode_table_provider(
560 &self,
561 _table_ref: &TableReference,
562 node: Arc<dyn TableProvider>,
563 buf: &mut Vec<u8>,
564 ) -> Result<()> {
565 buf.push(Self::MAGIC_NUMBER);
566
567 if !node.is::<MemTable>() {
568 return exec_err!("TestExtensionCodec only expects MemTable");
569 };
570
571 if node.schema() != create_test_table().schema() {
572 return exec_err!("Unexpected schema for encoding.");
573 }
574
575 buf.push(Self::MEMTABLE_SERIALIZED);
576
577 Ok(())
578 }
579
580 fn try_decode_file_format(
581 &self,
582 _buf: &[u8],
583 _ctx: &TaskContext,
584 ) -> Result<Arc<dyn FileFormatFactory>> {
585 unimplemented!()
586 }
587
588 fn try_encode_file_format(
589 &self,
590 _buf: &mut Vec<u8>,
591 _node: Arc<dyn FileFormatFactory>,
592 ) -> Result<()> {
593 unimplemented!()
594 }
595
596 fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
597 PhysicalExtensionCodec::try_decode_udf(self, name, buf)
598 }
599
600 fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
601 PhysicalExtensionCodec::try_encode_udf(self, node, buf)
602 }
603
604 fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
605 PhysicalExtensionCodec::try_decode_udaf(self, name, buf)
606 }
607
608 fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
609 PhysicalExtensionCodec::try_encode_udaf(self, node, buf)
610 }
611
612 fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> {
613 PhysicalExtensionCodec::try_decode_udwf(self, name, buf)
614 }
615
616 fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> {
617 PhysicalExtensionCodec::try_encode_udwf(self, node, buf)
618 }
619 }
620
621 #[test]
622 fn roundtrip_ffi_logical_extension_codec_table_provider() -> Result<()> {
623 let codec = Arc::new(TestExtensionCodec {});
624 let (ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
625
626 let mut ffi_codec =
627 FFI_LogicalExtensionCodec::new(codec, None, task_ctx_provider);
628 ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
629 let foreign_codec: Arc<dyn LogicalExtensionCodec> = (&ffi_codec).into();
630
631 let table = Arc::new(create_test_table()) as Arc<dyn TableProvider>;
632 let mut bytes = Vec::new();
633 foreign_codec.try_encode_table_provider(&"my_table".into(), table, &mut bytes)?;
634
635 let returned_table = foreign_codec.try_decode_table_provider(
636 &bytes,
637 &"my_table".into(),
638 create_test_table().schema(),
639 ctx.task_ctx().as_ref(),
640 )?;
641
642 assert!(returned_table.is::<MemTable>());
643
644 Ok(())
645 }
646
647 #[test]
648 fn roundtrip_ffi_logical_extension_codec_udf() -> Result<()> {
649 let codec = Arc::new(TestExtensionCodec {});
650 let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
651
652 let mut ffi_codec =
653 FFI_LogicalExtensionCodec::new(codec, None, task_ctx_provider);
654 ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
655 let foreign_codec: Arc<dyn LogicalExtensionCodec> = (&ffi_codec).into();
656
657 let udf = Arc::new(ScalarUDF::from(AbsFunc::new()));
658 let mut bytes = Vec::new();
659 foreign_codec.try_encode_udf(udf.as_ref(), &mut bytes)?;
660
661 let returned_udf = foreign_codec.try_decode_udf(udf.name(), &bytes)?;
662
663 assert!(returned_udf.inner().is::<AbsFunc>());
664
665 Ok(())
666 }
667
668 #[test]
669 fn roundtrip_ffi_logical_extension_codec_udaf() -> Result<()> {
670 let codec = Arc::new(TestExtensionCodec {});
671 let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
672
673 let mut ffi_codec =
674 FFI_LogicalExtensionCodec::new(codec, None, task_ctx_provider);
675 ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
676 let foreign_codec: Arc<dyn LogicalExtensionCodec> = (&ffi_codec).into();
677
678 let udf = Arc::new(AggregateUDF::from(Sum::new()));
679 let mut bytes = Vec::new();
680 foreign_codec.try_encode_udaf(udf.as_ref(), &mut bytes)?;
681
682 let returned_udf = foreign_codec.try_decode_udaf(udf.name(), &bytes)?;
683
684 assert!(returned_udf.inner().is::<Sum>());
685
686 Ok(())
687 }
688
689 #[test]
690 fn roundtrip_ffi_logical_extension_codec_udwf() -> Result<()> {
691 let codec = Arc::new(TestExtensionCodec {});
692 let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
693
694 let mut ffi_codec =
695 FFI_LogicalExtensionCodec::new(codec, None, task_ctx_provider);
696 ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
697 let foreign_codec: Arc<dyn LogicalExtensionCodec> = (&ffi_codec).into();
698
699 let udf = Arc::new(WindowUDF::from(Rank::new(
700 "my_rank".to_owned(),
701 RankType::Basic,
702 )));
703 let mut bytes = Vec::new();
704 foreign_codec.try_encode_udwf(udf.as_ref(), &mut bytes)?;
705
706 let returned_udf = foreign_codec.try_decode_udwf(udf.name(), &bytes)?;
707
708 assert!(returned_udf.inner().is::<Rank>());
709
710 Ok(())
711 }
712
713 #[test]
714 fn ffi_logical_extension_codec_local_bypass() {
715 let codec =
716 Arc::new(TestExtensionCodec {}) as Arc<dyn LogicalExtensionCodec + Send>;
717 let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
718
719 let mut ffi_codec =
720 FFI_LogicalExtensionCodec::new(Arc::clone(&codec), None, task_ctx_provider);
721
722 let codec = codec as Arc<dyn LogicalExtensionCodec>;
723 let foreign_codec: Arc<dyn LogicalExtensionCodec> = (&ffi_codec).into();
725 assert!(arc_ptr_eq(&foreign_codec, &codec));
726
727 ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
729 let foreign_codec: Arc<dyn LogicalExtensionCodec> = (&ffi_codec).into();
730 assert!(!arc_ptr_eq(&foreign_codec, &codec));
731 }
732}