datafusion_substrait/
serializer.rs1use crate::logical_plan::producer;
19
20use datafusion::common::DataFusionError;
21use datafusion::error::Result;
22use datafusion::prelude::*;
23
24use prost::Message;
25use std::path::Path;
26use substrait::proto::Plan;
27use tokio::{
28 fs::OpenOptions,
29 io::{AsyncReadExt, AsyncWriteExt},
30};
31
32pub async fn serialize(
37 sql: &str,
38 ctx: &SessionContext,
39 path: impl AsRef<Path>,
40) -> Result<()> {
41 let protobuf_out = serialize_bytes(sql, ctx).await?;
42
43 let mut file = OpenOptions::new()
44 .write(true)
45 .create_new(true)
46 .open(path)
47 .await?;
48 file.write_all(&protobuf_out).await?;
49 Ok(())
50}
51
52pub async fn serialize_bytes(sql: &str, ctx: &SessionContext) -> Result<Vec<u8>> {
54 let df = ctx.sql(sql).await?;
55 let plan = df.into_optimized_plan()?;
56 let proto = producer::to_substrait_plan(&plan, &ctx.state())?;
57
58 let mut protobuf_out = Vec::<u8>::new();
59 proto
60 .encode(&mut protobuf_out)
61 .map_err(|e| DataFusionError::Substrait(format!("Failed to encode plan: {e}")))?;
62 Ok(protobuf_out)
63}
64
65pub async fn deserialize(path: impl AsRef<Path>) -> Result<Box<Plan>> {
67 let mut protobuf_in = Vec::<u8>::new();
68
69 let mut file = OpenOptions::new().read(true).open(path).await?;
70 file.read_to_end(&mut protobuf_in).await?;
71
72 deserialize_bytes(protobuf_in).await
73}
74
75pub async fn deserialize_bytes(proto_bytes: Vec<u8>) -> Result<Box<Plan>> {
77 Ok(Box::new(Message::decode(&*proto_bytes).map_err(|e| {
78 DataFusionError::Substrait(format!("Failed to decode plan: {e}"))
79 })?))
80}