use std::sync::Arc;
use crate::flight::exec::{FlightConfig, FlightExec};
use crate::flight::to_df_err;
use datafusion::common::DataFusionError;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
#[derive(Clone, Debug, Default)]
pub struct FlightPhysicalCodec {}
impl PhysicalExtensionCodec for FlightPhysicalCodec {
fn try_decode(
&self,
buf: &[u8],
inputs: &[Arc<dyn ExecutionPlan>],
_registry: &TaskContext,
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
if inputs.is_empty() {
let config: FlightConfig = serde_json::from_slice(buf).map_err(to_df_err)?;
Ok(Arc::from(FlightExec::from(config)))
} else {
Err(DataFusionError::Internal(
"FlightExec is not supposed to have any inputs".into(),
))
}
}
fn try_encode(
&self,
node: Arc<dyn ExecutionPlan>,
buf: &mut Vec<u8>,
) -> datafusion::common::Result<()> {
if let Some(flight) = node.downcast_ref::<FlightExec>() {
let mut bytes = serde_json::to_vec(flight.config()).map_err(to_df_err)?;
buf.append(&mut bytes);
Ok(())
} else {
Err(DataFusionError::Internal(
"This codec only supports the FlightExec physical plan".into(),
))
}
}
}