datafusion_remote_table/
codec.rs1use crate::{DFResult, RemoteTableExec, Transform};
2use datafusion::common::DataFusionError;
3use datafusion::execution::FunctionRegistry;
4use datafusion::physical_plan::ExecutionPlan;
5use datafusion_proto::physical_plan::PhysicalExtensionCodec;
6use std::fmt::Debug;
7use std::sync::Arc;
8
9pub trait TransformCodec: Debug + Send + Sync {
10 fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
11 fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
12}
13
14#[derive(Debug)]
15pub struct RemotePhysicalCodec {
16 transform_codec: Option<Arc<dyn TransformCodec>>,
17}
18
19impl RemotePhysicalCodec {
20 pub fn new(transform_codec: Option<Arc<dyn TransformCodec>>) -> Self {
21 Self { transform_codec }
22 }
23}
24
25impl PhysicalExtensionCodec for RemotePhysicalCodec {
26 fn try_decode(
27 &self,
28 _buf: &[u8],
29 _inputs: &[Arc<dyn ExecutionPlan>],
30 _registry: &dyn FunctionRegistry,
31 ) -> DFResult<Arc<dyn ExecutionPlan>> {
32 todo!()
33 }
34
35 fn try_encode(&self, node: Arc<dyn ExecutionPlan>, _buf: &mut Vec<u8>) -> DFResult<()> {
36 if let Some(exec) = node.as_any().downcast_ref::<RemoteTableExec>() {
37 let _serialized_transform = if let Some(transform) = exec.transform.as_ref() {
38 let Some(transform_codec) = self.transform_codec.as_ref() else {
39 return Err(DataFusionError::Execution(
40 "No transform codec found".to_string(),
41 ));
42 };
43 let bytes = transform_codec.try_encode(transform.as_ref())?;
44 Some(bytes)
45 } else {
46 None
47 };
48 todo!()
49 } else {
50 Err(DataFusionError::Execution(format!(
51 "Failed to encode {}",
52 RemoteTableExec::static_name()
53 )))
54 }
55 }
56}