datafusion_substrait/
serializer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::logical_plan::producer;
19
20use datafusion::common::DataFusionError;
21use datafusion::error::Result;
22use datafusion::prelude::*;
23
24use prost::Message;
25use std::path::Path;
26use substrait::proto::Plan;
27use tokio::{
28    fs::OpenOptions,
29    io::{AsyncReadExt, AsyncWriteExt},
30};
31
32/// Plans a sql and serializes the generated logical plan to bytes.
33/// The bytes are then written into a file at `path`.
34///
35/// Returns an error if the file already exists.
36pub async fn serialize(
37    sql: &str,
38    ctx: &SessionContext,
39    path: impl AsRef<Path>,
40) -> Result<()> {
41    let protobuf_out = serialize_bytes(sql, ctx).await?;
42
43    let mut file = OpenOptions::new()
44        .write(true)
45        .create_new(true)
46        .open(path)
47        .await?;
48    file.write_all(&protobuf_out).await?;
49    Ok(())
50}
51
52/// Plans a sql and serializes the generated logical plan to bytes.
53pub async fn serialize_bytes(sql: &str, ctx: &SessionContext) -> Result<Vec<u8>> {
54    let df = ctx.sql(sql).await?;
55    let plan = df.into_optimized_plan()?;
56    let proto = producer::to_substrait_plan(&plan, &ctx.state())?;
57
58    let mut protobuf_out = Vec::<u8>::new();
59    proto
60        .encode(&mut protobuf_out)
61        .map_err(|e| DataFusionError::Substrait(format!("Failed to encode plan: {e}")))?;
62    Ok(protobuf_out)
63}
64
65/// Reads the file at `path` and deserializes a plan from the bytes.
66pub async fn deserialize(path: impl AsRef<Path>) -> Result<Box<Plan>> {
67    let mut protobuf_in = Vec::<u8>::new();
68
69    let mut file = OpenOptions::new().read(true).open(path).await?;
70    file.read_to_end(&mut protobuf_in).await?;
71
72    deserialize_bytes(protobuf_in).await
73}
74
75/// Deserializes a plan from the bytes.
76pub async fn deserialize_bytes(proto_bytes: Vec<u8>) -> Result<Box<Plan>> {
77    Ok(Box::new(Message::decode(&*proto_bytes).map_err(|e| {
78        DataFusionError::Substrait(format!("Failed to decode plan: {e}"))
79    })?))
80}