datafusion_table_providers/flight/
codec.rs1use std::sync::Arc;
21
22use crate::flight::exec::{FlightConfig, FlightExec};
23use crate::flight::to_df_err;
24use datafusion::common::DataFusionError;
25use datafusion::logical_expr::registry::FunctionRegistry;
26use datafusion::physical_plan::ExecutionPlan;
27use datafusion_proto::physical_plan::PhysicalExtensionCodec;
28
/// Stateless physical-plan extension codec for `FlightExec` nodes.
///
/// The plan is (de)serialized as the JSON form of its `FlightConfig`;
/// the codec itself carries no data, so it is cheap to clone or default-construct.
#[derive(Debug, Default, Clone)]
pub struct FlightPhysicalCodec {}
32
33impl PhysicalExtensionCodec for FlightPhysicalCodec {
34 fn try_decode(
35 &self,
36 buf: &[u8],
37 inputs: &[Arc<dyn ExecutionPlan>],
38 _registry: &dyn FunctionRegistry,
39 ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
40 if inputs.is_empty() {
41 let config: FlightConfig = serde_json::from_slice(buf).map_err(to_df_err)?;
42 Ok(Arc::from(FlightExec::from(config)))
43 } else {
44 Err(DataFusionError::Internal(
45 "FlightExec is not supposed to have any inputs".into(),
46 ))
47 }
48 }
49
50 fn try_encode(
51 &self,
52 node: Arc<dyn ExecutionPlan>,
53 buf: &mut Vec<u8>,
54 ) -> datafusion::common::Result<()> {
55 if let Some(flight) = node.as_any().downcast_ref::<FlightExec>() {
56 let mut bytes = serde_json::to_vec(flight.config()).map_err(to_df_err)?;
57 buf.append(&mut bytes);
58 Ok(())
59 } else {
60 Err(DataFusionError::Internal(
61 "This codec only supports the FlightExec physical plan".into(),
62 ))
63 }
64 }
65}