1use std::ffi::c_void;
19use std::sync::Arc;
20
21use abi_stable::StableAbi;
22use abi_stable::std_types::{RResult, RSlice, RStr, RVec};
23use arrow::datatypes::SchemaRef;
24use datafusion_catalog::TableProvider;
25use datafusion_common::error::Result;
26use datafusion_common::{TableReference, not_impl_err};
27use datafusion_datasource::file_format::FileFormatFactory;
28use datafusion_execution::{TaskContext, TaskContextProvider};
29use datafusion_expr::{
30 AggregateUDF, AggregateUDFImpl, Extension, LogicalPlan, ScalarUDF, ScalarUDFImpl,
31 WindowUDF, WindowUDFImpl,
32};
33use datafusion_proto::logical_plan::{
34 DefaultLogicalExtensionCodec, LogicalExtensionCodec,
35};
36use tokio::runtime::Handle;
37
38use crate::arrow_wrappers::WrappedSchema;
39use crate::execution::FFI_TaskContextProvider;
40use crate::table_provider::FFI_TableProvider;
41use crate::udaf::FFI_AggregateUDF;
42use crate::udf::FFI_ScalarUDF;
43use crate::udwf::FFI_WindowUDF;
44use crate::util::FFIResult;
45use crate::{df_result, rresult_return};
46
/// `#[repr(C)]` table of function pointers plus opaque state that exposes a
/// [`LogicalExtensionCodec`] through `extern "C"` calls.
///
/// Instances are built by [`FFI_LogicalExtensionCodec::new`], which fills
/// every function pointer with the matching `*_fn_wrapper` defined in this
/// file and stores the wrapped codec behind `private_data`.
#[repr(C)]
#[derive(Debug, StableAbi)]
pub struct FFI_LogicalExtensionCodec {
    /// Decodes a table provider from `buf` for the table named by `table_ref`
    /// with the supplied `schema`.
    try_decode_table_provider: unsafe extern "C" fn(
        &Self,
        buf: RSlice<u8>,
        table_ref: RStr,
        schema: WrappedSchema,
    ) -> FFIResult<FFI_TableProvider>,

    /// Encodes the table provider `node`, registered as `table_ref`, into bytes.
    try_encode_table_provider: unsafe extern "C" fn(
        &Self,
        table_ref: RStr,
        node: FFI_TableProvider,
    ) -> FFIResult<RVec<u8>>,

    /// Decodes the scalar UDF called `name` from `buf`.
    try_decode_udf: unsafe extern "C" fn(
        &Self,
        name: RStr,
        buf: RSlice<u8>,
    ) -> FFIResult<FFI_ScalarUDF>,

    /// Encodes the scalar UDF `node` into bytes.
    try_encode_udf:
        unsafe extern "C" fn(&Self, node: FFI_ScalarUDF) -> FFIResult<RVec<u8>>,

    /// Decodes the aggregate UDF called `name` from `buf`.
    try_decode_udaf: unsafe extern "C" fn(
        &Self,
        name: RStr,
        buf: RSlice<u8>,
    ) -> FFIResult<FFI_AggregateUDF>,

    /// Encodes the aggregate UDF `node` into bytes.
    try_encode_udaf:
        unsafe extern "C" fn(&Self, node: FFI_AggregateUDF) -> FFIResult<RVec<u8>>,

    /// Decodes the window UDF called `name` from `buf`.
    try_decode_udwf: unsafe extern "C" fn(
        &Self,
        name: RStr,
        buf: RSlice<u8>,
    ) -> FFIResult<FFI_WindowUDF>,

    /// Encodes the window UDF `node` into bytes.
    try_encode_udwf:
        unsafe extern "C" fn(&Self, node: FFI_WindowUDF) -> FFIResult<RVec<u8>>,

    /// Provider from which a `TaskContext` is obtained when decoding table
    /// providers.
    pub task_ctx_provider: FFI_TaskContextProvider,

    /// Produces a new codec struct from `plan`; invoked by the `Clone` impl.
    pub clone: unsafe extern "C" fn(plan: &Self) -> Self,

    /// Frees the state behind `private_data`; invoked by the `Drop` impl.
    pub release: unsafe extern "C" fn(arg: &mut Self),

    /// Returns a version number; set to `crate::version` by `new`.
    // NOTE(review): presumably the library/ABI version used for compatibility
    // checks -- confirm against `crate::version`.
    pub version: unsafe extern "C" fn() -> u64,

    /// Opaque pointer to the `LogicalExtensionCodecPrivateData` allocated in
    /// `new`. Only the wrappers in this file dereference it.
    pub private_data: *mut c_void,

    /// Returns an identifier that is compared with
    /// `crate::get_library_marker_id()` to decide whether this struct was
    /// created by the current library (see the `From` conversion to
    /// `Arc<dyn LogicalExtensionCodec>`).
    pub library_marker_id: extern "C" fn() -> usize,
}
119
// The struct holds a raw `private_data` pointer, so `Send`/`Sync` are not
// derived automatically and are asserted manually here.
// NOTE(review): soundness relies on the pointee (an `Arc<dyn LogicalExtensionCodec>`
// and an `Option<Handle>`) and on `FFI_TaskContextProvider` being safe to share
// across threads -- confirm.
unsafe impl Send for FFI_LogicalExtensionCodec {}
unsafe impl Sync for FFI_LogicalExtensionCodec {}
122
/// State stored behind `FFI_LogicalExtensionCodec::private_data`.
struct LogicalExtensionCodecPrivateData {
    /// The wrapped codec that every `*_fn_wrapper` delegates to.
    codec: Arc<dyn LogicalExtensionCodec>,
    /// Optional tokio runtime handle, forwarded to table providers created by
    /// `try_decode_table_provider_fn_wrapper`.
    runtime: Option<Handle>,
}
127
128impl FFI_LogicalExtensionCodec {
129 fn inner(&self) -> &Arc<dyn LogicalExtensionCodec> {
130 let private_data = self.private_data as *const LogicalExtensionCodecPrivateData;
131 unsafe { &(*private_data).codec }
132 }
133
134 fn runtime(&self) -> &Option<Handle> {
135 let private_data = self.private_data as *const LogicalExtensionCodecPrivateData;
136 unsafe { &(*private_data).runtime }
137 }
138
139 fn task_ctx(&self) -> Result<Arc<TaskContext>> {
140 (&self.task_ctx_provider).try_into()
141 }
142}
143
144unsafe extern "C" fn try_decode_table_provider_fn_wrapper(
145 codec: &FFI_LogicalExtensionCodec,
146 buf: RSlice<u8>,
147 table_ref: RStr,
148 schema: WrappedSchema,
149) -> FFIResult<FFI_TableProvider> {
150 let ctx = rresult_return!(codec.task_ctx());
151 let runtime = codec.runtime().clone();
152 let codec_inner = codec.inner();
153 let table_ref = TableReference::from(table_ref.as_str());
154 let schema: SchemaRef = schema.into();
155
156 let table_provider = rresult_return!(codec_inner.try_decode_table_provider(
157 buf.as_ref(),
158 &table_ref,
159 schema,
160 ctx.as_ref()
161 ));
162
163 RResult::ROk(FFI_TableProvider::new_with_ffi_codec(
164 table_provider,
165 true,
166 runtime,
167 codec.clone(),
168 ))
169}
170
171unsafe extern "C" fn try_encode_table_provider_fn_wrapper(
172 codec: &FFI_LogicalExtensionCodec,
173 table_ref: RStr,
174 node: FFI_TableProvider,
175) -> FFIResult<RVec<u8>> {
176 let table_ref = TableReference::from(table_ref.as_str());
177 let table_provider: Arc<dyn TableProvider> = (&node).into();
178 let codec = codec.inner();
179
180 let mut bytes = Vec::new();
181 rresult_return!(codec.try_encode_table_provider(
182 &table_ref,
183 table_provider,
184 &mut bytes
185 ));
186
187 RResult::ROk(bytes.into())
188}
189
190unsafe extern "C" fn try_decode_udf_fn_wrapper(
191 codec: &FFI_LogicalExtensionCodec,
192 name: RStr,
193 buf: RSlice<u8>,
194) -> FFIResult<FFI_ScalarUDF> {
195 let codec = codec.inner();
196
197 let udf = rresult_return!(codec.try_decode_udf(name.as_str(), buf.as_ref()));
198 let udf = FFI_ScalarUDF::from(udf);
199
200 RResult::ROk(udf)
201}
202
203unsafe extern "C" fn try_encode_udf_fn_wrapper(
204 codec: &FFI_LogicalExtensionCodec,
205 node: FFI_ScalarUDF,
206) -> FFIResult<RVec<u8>> {
207 let codec = codec.inner();
208 let node: Arc<dyn ScalarUDFImpl> = (&node).into();
209 let node = ScalarUDF::new_from_shared_impl(node);
210
211 let mut bytes = Vec::new();
212 rresult_return!(codec.try_encode_udf(&node, &mut bytes));
213
214 RResult::ROk(bytes.into())
215}
216
217unsafe extern "C" fn try_decode_udaf_fn_wrapper(
218 codec: &FFI_LogicalExtensionCodec,
219 name: RStr,
220 buf: RSlice<u8>,
221) -> FFIResult<FFI_AggregateUDF> {
222 let codec_inner = codec.inner();
223 let udaf = rresult_return!(codec_inner.try_decode_udaf(name.into(), buf.as_ref()));
224 let udaf = FFI_AggregateUDF::from(udaf);
225
226 RResult::ROk(udaf)
227}
228
229unsafe extern "C" fn try_encode_udaf_fn_wrapper(
230 codec: &FFI_LogicalExtensionCodec,
231 node: FFI_AggregateUDF,
232) -> FFIResult<RVec<u8>> {
233 let codec = codec.inner();
234 let udaf: Arc<dyn AggregateUDFImpl> = (&node).into();
235 let udaf = AggregateUDF::new_from_shared_impl(udaf);
236
237 let mut bytes = Vec::new();
238 rresult_return!(codec.try_encode_udaf(&udaf, &mut bytes));
239
240 RResult::ROk(bytes.into())
241}
242
243unsafe extern "C" fn try_decode_udwf_fn_wrapper(
244 codec: &FFI_LogicalExtensionCodec,
245 name: RStr,
246 buf: RSlice<u8>,
247) -> FFIResult<FFI_WindowUDF> {
248 let codec = codec.inner();
249 let udwf = rresult_return!(codec.try_decode_udwf(name.into(), buf.as_ref()));
250 let udwf = FFI_WindowUDF::from(udwf);
251
252 RResult::ROk(udwf)
253}
254
255unsafe extern "C" fn try_encode_udwf_fn_wrapper(
256 codec: &FFI_LogicalExtensionCodec,
257 node: FFI_WindowUDF,
258) -> FFIResult<RVec<u8>> {
259 let codec = codec.inner();
260 let udwf: Arc<dyn WindowUDFImpl> = (&node).into();
261 let udwf = WindowUDF::new_from_shared_impl(udwf);
262
263 let mut bytes = Vec::new();
264 rresult_return!(codec.try_encode_udwf(&udwf, &mut bytes));
265
266 RResult::ROk(bytes.into())
267}
268
269unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_LogicalExtensionCodec) {
270 unsafe {
271 let private_data =
272 Box::from_raw(provider.private_data as *mut LogicalExtensionCodecPrivateData);
273 drop(private_data);
274 }
275}
276
277unsafe extern "C" fn clone_fn_wrapper(
278 codec: &FFI_LogicalExtensionCodec,
279) -> FFI_LogicalExtensionCodec {
280 let old_codec = Arc::clone(codec.inner());
281 let runtime = codec.runtime().clone();
282
283 FFI_LogicalExtensionCodec::new(old_codec, runtime, codec.task_ctx_provider.clone())
284}
285
286impl Drop for FFI_LogicalExtensionCodec {
287 fn drop(&mut self) {
288 unsafe { (self.release)(self) }
289 }
290}
291
292impl FFI_LogicalExtensionCodec {
293 pub fn new(
295 codec: Arc<dyn LogicalExtensionCodec + Send>,
296 runtime: Option<Handle>,
297 task_ctx_provider: impl Into<FFI_TaskContextProvider>,
298 ) -> Self {
299 let task_ctx_provider = task_ctx_provider.into();
300 let private_data = Box::new(LogicalExtensionCodecPrivateData { codec, runtime });
301
302 Self {
303 try_decode_table_provider: try_decode_table_provider_fn_wrapper,
304 try_encode_table_provider: try_encode_table_provider_fn_wrapper,
305 try_decode_udf: try_decode_udf_fn_wrapper,
306 try_encode_udf: try_encode_udf_fn_wrapper,
307 try_decode_udaf: try_decode_udaf_fn_wrapper,
308 try_encode_udaf: try_encode_udaf_fn_wrapper,
309 try_decode_udwf: try_decode_udwf_fn_wrapper,
310 try_encode_udwf: try_encode_udwf_fn_wrapper,
311 task_ctx_provider,
312
313 clone: clone_fn_wrapper,
314 release: release_fn_wrapper,
315 version: crate::version,
316 private_data: Box::into_raw(private_data) as *mut c_void,
317 library_marker_id: crate::get_library_marker_id,
318 }
319 }
320
321 pub fn new_default(task_ctx_provider: &Arc<dyn TaskContextProvider>) -> Self {
322 let task_ctx_provider = FFI_TaskContextProvider::from(task_ctx_provider);
323 let codec = Arc::new(DefaultLogicalExtensionCodec {});
324
325 Self::new(codec, None, task_ctx_provider)
326 }
327}
328
/// Wrapper around an [`FFI_LogicalExtensionCodec`] that implements
/// [`LogicalExtensionCodec`] by calling through the struct's function
/// pointers.
///
/// The `From<&FFI_LogicalExtensionCodec>` conversion creates one of these
/// when the struct's `library_marker_id` differs from the current library's.
#[derive(Debug)]
pub struct ForeignLogicalExtensionCodec(pub FFI_LogicalExtensionCodec);
335
// Manually asserted thread-safety for the wrapper around the FFI struct.
// NOTE(review): this rests on the same assumptions as the `Send`/`Sync`
// impls for `FFI_LogicalExtensionCodec` -- confirm.
unsafe impl Send for ForeignLogicalExtensionCodec {}
unsafe impl Sync for ForeignLogicalExtensionCodec {}
338
339impl From<&FFI_LogicalExtensionCodec> for Arc<dyn LogicalExtensionCodec> {
340 fn from(provider: &FFI_LogicalExtensionCodec) -> Self {
341 if (provider.library_marker_id)() == crate::get_library_marker_id() {
342 Arc::clone(provider.inner())
343 } else {
344 Arc::new(ForeignLogicalExtensionCodec(provider.clone()))
345 }
346 }
347}
348
349impl Clone for FFI_LogicalExtensionCodec {
350 fn clone(&self) -> Self {
351 unsafe { (self.clone)(self) }
352 }
353}
354
355impl LogicalExtensionCodec for ForeignLogicalExtensionCodec {
356 fn try_decode(
357 &self,
358 _buf: &[u8],
359 _inputs: &[LogicalPlan],
360 _ctx: &TaskContext,
361 ) -> Result<Extension> {
362 not_impl_err!("FFI does not support decode of Extensions")
363 }
364
365 fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<()> {
366 not_impl_err!("FFI does not support encode of Extensions")
367 }
368
369 fn try_decode_table_provider(
370 &self,
371 buf: &[u8],
372 table_ref: &TableReference,
373 schema: SchemaRef,
374 _ctx: &TaskContext,
375 ) -> Result<Arc<dyn TableProvider>> {
376 let table_ref = table_ref.to_string();
377 let schema: WrappedSchema = schema.into();
378
379 let ffi_table_provider = unsafe {
380 df_result!((self.0.try_decode_table_provider)(
381 &self.0,
382 buf.into(),
383 table_ref.as_str().into(),
384 schema
385 ))
386 }?;
387
388 Ok((&ffi_table_provider).into())
389 }
390
391 fn try_encode_table_provider(
392 &self,
393 table_ref: &TableReference,
394 node: Arc<dyn TableProvider>,
395 buf: &mut Vec<u8>,
396 ) -> Result<()> {
397 let table_ref = table_ref.to_string();
398 let node =
399 FFI_TableProvider::new_with_ffi_codec(node, true, None, self.0.clone());
400
401 let bytes = df_result!(unsafe {
402 (self.0.try_encode_table_provider)(&self.0, table_ref.as_str().into(), node)
403 })?;
404
405 buf.extend(bytes);
406
407 Ok(())
408 }
409
410 fn try_decode_file_format(
411 &self,
412 _buf: &[u8],
413 _ctx: &TaskContext,
414 ) -> Result<Arc<dyn FileFormatFactory>> {
415 not_impl_err!("FFI does not support decode_file_format")
416 }
417
418 fn try_encode_file_format(
419 &self,
420 _buf: &mut Vec<u8>,
421 _node: Arc<dyn FileFormatFactory>,
422 ) -> Result<()> {
423 not_impl_err!("FFI does not support encode_file_format")
424 }
425
426 fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
427 let udf = unsafe {
428 df_result!((self.0.try_decode_udf)(&self.0, name.into(), buf.into()))
429 }?;
430 let udf: Arc<dyn ScalarUDFImpl> = (&udf).into();
431
432 Ok(Arc::new(ScalarUDF::new_from_shared_impl(udf)))
433 }
434
435 fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
436 let node = FFI_ScalarUDF::from(Arc::new(node.clone()));
437 let bytes = df_result!(unsafe { (self.0.try_encode_udf)(&self.0, node) })?;
438
439 buf.extend(bytes);
440
441 Ok(())
442 }
443
444 fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
445 let udaf = unsafe {
446 df_result!((self.0.try_decode_udaf)(&self.0, name.into(), buf.into()))
447 }?;
448 let udaf: Arc<dyn AggregateUDFImpl> = (&udaf).into();
449
450 Ok(Arc::new(AggregateUDF::new_from_shared_impl(udaf)))
451 }
452
453 fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
454 let node = Arc::new(node.clone());
455 let node = FFI_AggregateUDF::from(node);
456 let bytes = df_result!(unsafe { (self.0.try_encode_udaf)(&self.0, node) })?;
457
458 buf.extend(bytes);
459
460 Ok(())
461 }
462
463 fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> {
464 let udwf = unsafe {
465 df_result!((self.0.try_decode_udwf)(&self.0, name.into(), buf.into()))
466 }?;
467 let udwf: Arc<dyn WindowUDFImpl> = (&udwf).into();
468
469 Ok(Arc::new(WindowUDF::new_from_shared_impl(udwf)))
470 }
471
472 fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> {
473 let node = Arc::new(node.clone());
474 let node = FFI_WindowUDF::from(node);
475 let bytes = df_result!(unsafe { (self.0.try_encode_udwf)(&self.0, node) })?;
476
477 buf.extend(bytes);
478
479 Ok(())
480 }
481}
482
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::record_batch;
    use arrow_schema::{DataType, Field, Schema, SchemaRef};
    use datafusion_catalog::{MemTable, TableProvider};
    use datafusion_common::{Result, TableReference, exec_err};
    use datafusion_datasource::file_format::FileFormatFactory;
    use datafusion_execution::TaskContext;
    use datafusion_expr::ptr_eq::arc_ptr_eq;
    use datafusion_expr::{AggregateUDF, Extension, LogicalPlan, ScalarUDF, WindowUDF};
    use datafusion_functions::math::abs::AbsFunc;
    use datafusion_functions_aggregate::sum::Sum;
    use datafusion_functions_window::rank::{Rank, RankType};
    use datafusion_proto::logical_plan::LogicalExtensionCodec;
    use datafusion_proto::physical_plan::PhysicalExtensionCodec;

    use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
    use crate::proto::physical_extension_codec::tests::TestExtensionCodec;

    /// Builds the single-column (`a: Int32`) in-memory table used by the
    /// round-trip tests.
    fn create_test_table() -> MemTable {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
        let rb = record_batch!(("a", Int32, [1, 2, 3]))
            .expect("should be able to create a record batch");
        MemTable::try_new(schema, vec![vec![rb]])
            .expect("should be able to create an in memory table")
    }

    /// Logical-codec implementation for the shared test codec. Table
    /// providers use a two-byte encoding (magic number + marker); the UDF
    /// methods delegate to the codec's `PhysicalExtensionCodec` impl.
    impl LogicalExtensionCodec for TestExtensionCodec {
        fn try_decode(
            &self,
            _buf: &[u8],
            _inputs: &[LogicalPlan],
            _ctx: &TaskContext,
        ) -> Result<Extension> {
            unimplemented!()
        }

        fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<()> {
            unimplemented!()
        }

        fn try_decode_table_provider(
            &self,
            buf: &[u8],
            _table_ref: &TableReference,
            schema: SchemaRef,
            _ctx: &TaskContext,
        ) -> Result<Arc<dyn TableProvider>> {
            // `first()` rather than `buf[0]`: an empty buffer must yield an
            // error instead of panicking, since this runs underneath an
            // `extern "C"` wrapper.
            if buf.first() != Some(&Self::MAGIC_NUMBER) {
                return exec_err!(
                    "TestExtensionCodec input buffer does not start with magic number"
                );
            }

            if schema != create_test_table().schema() {
                return exec_err!("Incorrect test table schema");
            }

            if buf.len() != 2 || buf[1] != Self::MEMTABLE_SERIALIZED {
                return exec_err!("TestExtensionCodec unable to decode table provider");
            }

            Ok(Arc::new(create_test_table()) as Arc<dyn TableProvider>)
        }

        fn try_encode_table_provider(
            &self,
            _table_ref: &TableReference,
            node: Arc<dyn TableProvider>,
            buf: &mut Vec<u8>,
        ) -> Result<()> {
            buf.push(Self::MAGIC_NUMBER);

            if !node.as_any().is::<MemTable>() {
                return exec_err!("TestExtensionCodec only expects MemTable");
            }

            if node.schema() != create_test_table().schema() {
                return exec_err!("Unexpected schema for encoding.");
            }

            buf.push(Self::MEMTABLE_SERIALIZED);

            Ok(())
        }

        fn try_decode_file_format(
            &self,
            _buf: &[u8],
            _ctx: &TaskContext,
        ) -> Result<Arc<dyn FileFormatFactory>> {
            unimplemented!()
        }

        fn try_encode_file_format(
            &self,
            _buf: &mut Vec<u8>,
            _node: Arc<dyn FileFormatFactory>,
        ) -> Result<()> {
            unimplemented!()
        }

        fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
            PhysicalExtensionCodec::try_decode_udf(self, name, buf)
        }

        fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
            PhysicalExtensionCodec::try_encode_udf(self, node, buf)
        }

        fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
            PhysicalExtensionCodec::try_decode_udaf(self, name, buf)
        }

        fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
            PhysicalExtensionCodec::try_encode_udaf(self, node, buf)
        }

        fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> {
            PhysicalExtensionCodec::try_decode_udwf(self, name, buf)
        }

        fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> {
            PhysicalExtensionCodec::try_encode_udwf(self, node, buf)
        }
    }

    /// Encodes and decodes a `MemTable` through a codec forced onto the
    /// foreign code path.
    #[test]
    fn roundtrip_ffi_logical_extension_codec_table_provider() -> Result<()> {
        let codec = Arc::new(TestExtensionCodec {});
        let (ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();

        let mut ffi_codec =
            FFI_LogicalExtensionCodec::new(codec, None, task_ctx_provider);
        // Swap the marker so the conversion below takes the foreign branch.
        ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
        let foreign_codec: Arc<dyn LogicalExtensionCodec> = (&ffi_codec).into();

        let table = Arc::new(create_test_table()) as Arc<dyn TableProvider>;
        let mut bytes = Vec::new();
        foreign_codec.try_encode_table_provider(&"my_table".into(), table, &mut bytes)?;

        let returned_table = foreign_codec.try_decode_table_provider(
            &bytes,
            &"my_table".into(),
            create_test_table().schema(),
            ctx.task_ctx().as_ref(),
        )?;

        assert!(returned_table.as_any().is::<MemTable>());

        Ok(())
    }

    /// Encodes and decodes a scalar UDF through the foreign code path.
    #[test]
    fn roundtrip_ffi_logical_extension_codec_udf() -> Result<()> {
        let codec = Arc::new(TestExtensionCodec {});
        let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();

        let mut ffi_codec =
            FFI_LogicalExtensionCodec::new(codec, None, task_ctx_provider);
        ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
        let foreign_codec: Arc<dyn LogicalExtensionCodec> = (&ffi_codec).into();

        let udf = Arc::new(ScalarUDF::from(AbsFunc::new()));
        let mut bytes = Vec::new();
        foreign_codec.try_encode_udf(udf.as_ref(), &mut bytes)?;

        let returned_udf = foreign_codec.try_decode_udf(udf.name(), &bytes)?;

        assert!(returned_udf.inner().as_any().is::<AbsFunc>());

        Ok(())
    }

    /// Encodes and decodes an aggregate UDF through the foreign code path.
    #[test]
    fn roundtrip_ffi_logical_extension_codec_udaf() -> Result<()> {
        let codec = Arc::new(TestExtensionCodec {});
        let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();

        let mut ffi_codec =
            FFI_LogicalExtensionCodec::new(codec, None, task_ctx_provider);
        ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
        let foreign_codec: Arc<dyn LogicalExtensionCodec> = (&ffi_codec).into();

        let udf = Arc::new(AggregateUDF::from(Sum::new()));
        let mut bytes = Vec::new();
        foreign_codec.try_encode_udaf(udf.as_ref(), &mut bytes)?;

        let returned_udf = foreign_codec.try_decode_udaf(udf.name(), &bytes)?;

        assert!(returned_udf.inner().as_any().is::<Sum>());

        Ok(())
    }

    /// Encodes and decodes a window UDF through the foreign code path.
    #[test]
    fn roundtrip_ffi_logical_extension_codec_udwf() -> Result<()> {
        let codec = Arc::new(TestExtensionCodec {});
        let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();

        let mut ffi_codec =
            FFI_LogicalExtensionCodec::new(codec, None, task_ctx_provider);
        ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
        let foreign_codec: Arc<dyn LogicalExtensionCodec> = (&ffi_codec).into();

        let udf = Arc::new(WindowUDF::from(Rank::new(
            "my_rank".to_owned(),
            RankType::Basic,
        )));
        let mut bytes = Vec::new();
        foreign_codec.try_encode_udwf(udf.as_ref(), &mut bytes)?;

        let returned_udf = foreign_codec.try_decode_udwf(udf.name(), &bytes)?;

        assert!(returned_udf.inner().as_any().is::<Rank>());

        Ok(())
    }

    /// Checks that a locally created codec converts back to the original
    /// `Arc`, while a codec carrying a foreign marker gets wrapped instead.
    #[test]
    fn ffi_logical_extension_codec_local_bypass() {
        let codec =
            Arc::new(TestExtensionCodec {}) as Arc<dyn LogicalExtensionCodec + Send>;
        let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();

        let mut ffi_codec =
            FFI_LogicalExtensionCodec::new(Arc::clone(&codec), None, task_ctx_provider);

        let codec = codec as Arc<dyn LogicalExtensionCodec>;
        // Matching marker ids: the conversion must hand back the same Arc.
        let foreign_codec: Arc<dyn LogicalExtensionCodec> = (&ffi_codec).into();
        assert!(arc_ptr_eq(&foreign_codec, &codec));

        // Foreign marker id: the conversion must produce a distinct wrapper.
        ffi_codec.library_marker_id = crate::mock_foreign_marker_id;
        let foreign_codec: Arc<dyn LogicalExtensionCodec> = (&ffi_codec).into();
        assert!(!arc_ptr_eq(&foreign_codec, &codec));
    }
}