datafusion_table_providers/flight/
codec.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Codec to enable running in a distributed environment
19
20use std::sync::Arc;
21
22use crate::flight::exec::{FlightConfig, FlightExec};
23use crate::flight::to_df_err;
24use datafusion::common::DataFusionError;
25use datafusion::logical_expr::registry::FunctionRegistry;
26use datafusion::physical_plan::ExecutionPlan;
27use datafusion_proto::physical_plan::PhysicalExtensionCodec;
28
29/// Physical extension codec for FlightExec
30#[derive(Clone, Debug, Default)]
31pub struct FlightPhysicalCodec {}
32
33impl PhysicalExtensionCodec for FlightPhysicalCodec {
34    fn try_decode(
35        &self,
36        buf: &[u8],
37        inputs: &[Arc<dyn ExecutionPlan>],
38        _registry: &dyn FunctionRegistry,
39    ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
40        if inputs.is_empty() {
41            let config: FlightConfig = serde_json::from_slice(buf).map_err(to_df_err)?;
42            Ok(Arc::from(FlightExec::from(config)))
43        } else {
44            Err(DataFusionError::Internal(
45                "FlightExec is not supposed to have any inputs".into(),
46            ))
47        }
48    }
49
50    fn try_encode(
51        &self,
52        node: Arc<dyn ExecutionPlan>,
53        buf: &mut Vec<u8>,
54    ) -> datafusion::common::Result<()> {
55        if let Some(flight) = node.as_any().downcast_ref::<FlightExec>() {
56            let mut bytes = serde_json::to_vec(flight.config()).map_err(to_df_err)?;
57            buf.append(&mut bytes);
58            Ok(())
59        } else {
60            Err(DataFusionError::Internal(
61                "This codec only supports the FlightExec physical plan".into(),
62            ))
63        }
64    }
65}