datafusion_remote_table/
codec.rs

1use crate::{DFResult, RemoteTableExec, Transform};
2use datafusion::common::DataFusionError;
3use datafusion::execution::FunctionRegistry;
4use datafusion::physical_plan::ExecutionPlan;
5use datafusion_proto::physical_plan::PhysicalExtensionCodec;
6use std::fmt::Debug;
7use std::sync::Arc;
8
/// Converts [`Transform`] trait objects to and from raw bytes.
///
/// Implementors must be `Debug + Send + Sync`. `RemotePhysicalCodec` holds an
/// optional `Arc<dyn TransformCodec>` and calls [`TransformCodec::try_encode`]
/// when it encodes a `RemoteTableExec` that carries a transform. The byte layout
/// is not constrained by this trait; it is whatever the implementor chooses.
pub trait TransformCodec: Debug + Send + Sync {
    /// Serializes `value` into a byte vector.
    ///
    /// # Errors
    ///
    /// Returns an error if the implementor cannot encode `value`.
    fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
    /// Rebuilds a shared [`Transform`] from the bytes in `value`.
    ///
    /// # Errors
    ///
    /// Returns an error if the implementor cannot decode `value`.
    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
}
13
/// [`PhysicalExtensionCodec`] implementation that handles `RemoteTableExec` nodes.
///
/// Construct it with `RemotePhysicalCodec::new`.
#[derive(Debug)]
pub struct RemotePhysicalCodec {
    /// Codec consulted when an encoded `RemoteTableExec` has a transform set.
    /// When this is `None` and the node does carry a transform, encoding fails
    /// with an execution error.
    transform_codec: Option<Arc<dyn TransformCodec>>,
}
18
19impl RemotePhysicalCodec {
20    pub fn new(transform_codec: Option<Arc<dyn TransformCodec>>) -> Self {
21        Self { transform_codec }
22    }
23}
24
25impl PhysicalExtensionCodec for RemotePhysicalCodec {
26    fn try_decode(
27        &self,
28        _buf: &[u8],
29        _inputs: &[Arc<dyn ExecutionPlan>],
30        _registry: &dyn FunctionRegistry,
31    ) -> DFResult<Arc<dyn ExecutionPlan>> {
32        todo!()
33    }
34
35    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, _buf: &mut Vec<u8>) -> DFResult<()> {
36        if let Some(exec) = node.as_any().downcast_ref::<RemoteTableExec>() {
37            let _serialized_transform = if let Some(transform) = exec.transform.as_ref() {
38                let Some(transform_codec) = self.transform_codec.as_ref() else {
39                    return Err(DataFusionError::Execution(
40                        "No transform codec found".to_string(),
41                    ));
42                };
43                let bytes = transform_codec.try_encode(transform.as_ref())?;
44                Some(bytes)
45            } else {
46                None
47            };
48            todo!()
49        } else {
50            Err(DataFusionError::Execution(format!(
51                "Failed to encode {}",
52                RemoteTableExec::static_name()
53            )))
54        }
55    }
56}