datafusion_substrait/
serializer.rs1use crate::logical_plan::producer;
19
20use datafusion::common::DataFusionError;
21use datafusion::error::Result;
22use datafusion::prelude::*;
23
24use prost::Message;
25use std::path::Path;
26use substrait::proto::Plan;
27use tokio::{
28 fs::OpenOptions,
29 io::{AsyncReadExt, AsyncWriteExt},
30};
31
32pub async fn serialize(
37 sql: &str,
38 ctx: &SessionContext,
39 path: impl AsRef<Path>,
40) -> Result<()> {
41 let protobuf_out = serialize_bytes(sql, ctx).await?;
42
43 let mut file = OpenOptions::new()
44 .write(true)
45 .create_new(true)
46 .open(path)
47 .await?;
48 file.write_all(&protobuf_out).await?;
49 file.flush().await?;
50 Ok(())
51}
52
53pub async fn serialize_bytes(sql: &str, ctx: &SessionContext) -> Result<Vec<u8>> {
55 let df = ctx.sql(sql).await?;
56 let plan = df.into_optimized_plan()?;
57 let proto = producer::to_substrait_plan(&plan, &ctx.state())?;
58
59 let mut protobuf_out = Vec::<u8>::new();
60 proto
61 .encode(&mut protobuf_out)
62 .map_err(|e| DataFusionError::Substrait(format!("Failed to encode plan: {e}")))?;
63 Ok(protobuf_out)
64}
65
66pub async fn deserialize(path: impl AsRef<Path>) -> Result<Box<Plan>> {
68 let mut protobuf_in = Vec::<u8>::new();
69
70 let mut file = OpenOptions::new().read(true).open(path).await?;
71 file.read_to_end(&mut protobuf_in).await?;
72
73 deserialize_bytes(protobuf_in).await
74}
75
76pub async fn deserialize_bytes(proto_bytes: Vec<u8>) -> Result<Box<Plan>> {
78 Ok(Box::new(Message::decode(&*proto_bytes).map_err(|e| {
79 DataFusionError::Substrait(format!("Failed to decode plan: {e}"))
80 })?))
81}