use std::collections::HashMap;
use std::fmt;
use crate::{
Aggregate, DescribeTable, Distinct, DistinctOn, DmlStatement, Expr, Filter, Join,
Limit, LogicalPlan, Partitioning, Projection, RecursiveQuery, Repartition, Sort,
Subquery, SubqueryAlias, TableProviderFilterPushDown, TableScan, Unnest, Values,
Window, expr_vec_fmt,
};
use crate::dml::CopyTo;
use arrow::datatypes::Schema;
use datafusion_common::display::GraphvizBuilder;
use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::{Column, DataFusionError, internal_datafusion_err};
use serde_json::json;
/// Tree visitor that writes a [`LogicalPlan`] as indented text.
///
/// Each visited node is written on its own line, indented by two spaces per
/// level of depth (see the `TreeNodeVisitor` implementation for this type).
pub struct IndentVisitor<'a, 'b> {
/// Destination formatter the plan text is written to.
f: &'a mut fmt::Formatter<'b>,
/// When `true`, each node is followed by its schema rendered with
/// [`display_schema`].
with_schema: bool,
/// Current depth in the tree: incremented in `f_down`, decremented in `f_up`.
indent: usize,
}
impl<'a, 'b> IndentVisitor<'a, 'b> {
    /// Builds a visitor that writes into `f`, starting at indentation depth 0.
    ///
    /// `with_schema` controls whether each node's schema is appended after
    /// the node's own display text.
    pub fn new(f: &'a mut fmt::Formatter<'b>, with_schema: bool) -> Self {
        // Depth starts at zero; `f_down` / `f_up` adjust it during traversal.
        let indent = 0;
        Self {
            indent,
            with_schema,
            f,
        }
    }
}
impl<'n> TreeNodeVisitor<'n> for IndentVisitor<'_, '_> {
    type Node = LogicalPlan;

    /// Writes `plan` on its own line at the current indentation, then descends
    /// one level.
    fn f_down(
        &mut self,
        plan: &'n LogicalPlan,
    ) -> datafusion_common::Result<TreeNodeRecursion> {
        // Nodes are separated by a newline emitted *before* each node that is
        // not at depth 0, so the output never ends with a trailing newline.
        if self.indent > 0 {
            writeln!(self.f)?;
        }

        // Two spaces of padding per level, followed by the node itself.
        write!(
            self.f,
            "{:width$}{}",
            "",
            plan.display(),
            width = self.indent * 2
        )?;

        if self.with_schema {
            let schema = display_schema(plan.schema().as_arrow());
            write!(self.f, " {schema}")?;
        }

        self.indent += 1;
        Ok(TreeNodeRecursion::Continue)
    }

    /// Leaves the current node, restoring the parent's indentation depth.
    fn f_up(
        &mut self,
        _plan: &'n LogicalPlan,
    ) -> datafusion_common::Result<TreeNodeRecursion> {
        self.indent -= 1;
        Ok(TreeNodeRecursion::Continue)
    }
}
/// Returns a value whose `Display` output is a compact, bracketed rendering
/// of `schema`.
///
/// Fields are written as `name:data_type`, separated by `", "`. Nullable
/// fields get a `;N` suffix. An empty schema renders as `[]`.
pub fn display_schema(schema: &Schema) -> impl fmt::Display + '_ {
    /// Borrowing adapter that defers formatting until it is displayed.
    struct SchemaDisplay<'a>(&'a Schema);

    impl fmt::Display for SchemaDisplay<'_> {
        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
            f.write_str("[")?;
            // Empty before the first field, ", " before every later one.
            let mut separator = "";
            for field in self.0.fields().iter() {
                f.write_str(separator)?;
                separator = ", ";
                let nullable_marker = if field.is_nullable() { ";N" } else { "" };
                write!(
                    f,
                    "{}:{}{}",
                    field.name(),
                    field.data_type(),
                    nullable_marker
                )?;
            }
            f.write_str("]")
        }
    }

    SchemaDisplay(schema)
}
/// Tree visitor that writes a [`LogicalPlan`] as graph nodes and edges
/// through a [`GraphvizBuilder`].
pub struct GraphvizVisitor<'a, 'b> {
/// Destination formatter the graph text is written to.
f: &'a mut fmt::Formatter<'b>,
/// Builder that hands out node ids and writes the graph/cluster/node/edge
/// statements.
graphviz_builder: GraphvizBuilder,
/// When `true`, node labels also include the node's schema.
with_schema: bool,
/// Stack of ids of the nodes currently being visited: pushed in `f_down`,
/// popped in `f_up`. The top entry is the parent of the next node entered.
parent_ids: Vec<usize>,
}
impl<'a, 'b> GraphvizVisitor<'a, 'b> {
    /// Builds a visitor that writes into `f` with schema labels disabled and
    /// a default [`GraphvizBuilder`].
    pub fn new(f: &'a mut fmt::Formatter<'b>) -> Self {
        let graphviz_builder = GraphvizBuilder::default();
        let parent_ids = Vec::new();
        Self {
            f,
            graphviz_builder,
            with_schema: false,
            parent_ids,
        }
    }

    /// Sets whether node labels should also include the node's schema.
    pub fn set_with_schema(&mut self, with_schema: bool) {
        self.with_schema = with_schema;
    }

    /// Opens a cluster titled `label`; delegates to
    /// `GraphvizBuilder::start_cluster`.
    pub fn pre_visit_plan(&mut self, label: &str) -> fmt::Result {
        let builder = &mut self.graphviz_builder;
        builder.start_cluster(self.f, label)
    }

    /// Closes the current cluster; delegates to `GraphvizBuilder::end_cluster`.
    pub fn post_visit_plan(&mut self) -> fmt::Result {
        let builder = &mut self.graphviz_builder;
        builder.end_cluster(self.f)
    }

    /// Opens the graph; delegates to `GraphvizBuilder::start_graph`.
    pub fn start_graph(&mut self) -> fmt::Result {
        let builder = &mut self.graphviz_builder;
        builder.start_graph(self.f)
    }

    /// Closes the graph; delegates to `GraphvizBuilder::end_graph`.
    pub fn end_graph(&mut self) -> fmt::Result {
        let builder = &mut self.graphviz_builder;
        builder.end_graph(self.f)
    }
}
impl<'n> TreeNodeVisitor<'n> for GraphvizVisitor<'_, '_> {
    type Node = LogicalPlan;

    /// Writes a graph node for `plan`, plus an edge from its parent when one
    /// is currently being visited, then records `plan` as the current parent.
    ///
    /// Returns an internal error if the builder fails to write to the
    /// formatter.
    fn f_down(
        &mut self,
        plan: &'n LogicalPlan,
    ) -> datafusion_common::Result<TreeNodeRecursion> {
        let id = self.graphviz_builder.next_id();

        // The `\n` in the raw string is intentionally a literal backslash-n:
        // it is passed through to the graph output rather than being a Rust
        // newline.
        let label = if self.with_schema {
            format!(
                r"{}\nSchema: {}",
                plan.display(),
                display_schema(plan.schema().as_arrow())
            )
        } else {
            plan.display().to_string()
        };

        self.graphviz_builder
            .add_node(self.f, id, &label, None)
            .map_err(|_e| internal_datafusion_err!("Fail to format"))?;

        // The top of the stack is the node we descended from.
        if let Some(parent_id) = self.parent_ids.last() {
            self.graphviz_builder
                .add_edge(self.f, *parent_id, id)
                .map_err(|_e| internal_datafusion_err!("Fail to format"))?;
        }

        self.parent_ids.push(id);
        Ok(TreeNodeRecursion::Continue)
    }

    /// Leaves the current node by popping its id off the parent stack.
    ///
    /// Returns an internal error if the stack is already empty, i.e. `f_up`
    /// was called without a matching `f_down`.
    fn f_up(
        &mut self,
        _plan: &LogicalPlan,
    ) -> datafusion_common::Result<TreeNodeRecursion> {
        // `ok_or_else` builds the error only on the failure path; the eager
        // `ok_or` form constructed (and dropped) one on every successful pop.
        self.parent_ids
            .pop()
            .map(|_| TreeNodeRecursion::Continue)
            .ok_or_else(|| internal_datafusion_err!("Fail to format"))
    }
}
/// Tree visitor that writes a [`LogicalPlan`] as a nested JSON document.
///
/// Every node becomes a JSON object with a `"Node Type"` key and a `"Plans"`
/// array holding its children; the finished tree is written as
/// `[{"Plan": <root>}]`.
// NOTE(review): the name and key names suggest this mirrors PostgreSQL's
// `EXPLAIN (FORMAT JSON)` layout; confirm against the intended consumer.
pub struct PgJsonVisitor<'a, 'b> {
/// Destination formatter the final JSON text is written to.
f: &'a mut fmt::Formatter<'b>,
/// JSON objects of nodes that have been entered (`f_down`) but not yet
/// left (`f_up`), keyed by the id assigned on entry.
objects: HashMap<u32, serde_json::Value>,
/// Id that will be assigned to the next node entered.
next_id: u32,
/// When `true`, each node object gets an `"Output"` array of field names.
with_schema: bool,
/// Stack of ids of the nodes currently being visited; the top entry is the
/// node most recently entered.
parent_ids: Vec<u32>,
}
impl<'a, 'b> PgJsonVisitor<'a, 'b> {
    /// Builds a visitor that writes into `f`, with schema output disabled.
    pub fn new(f: &'a mut fmt::Formatter<'b>) -> Self {
        Self {
            f,
            objects: HashMap::new(),
            next_id: 0,
            with_schema: false,
            parent_ids: Vec::new(),
        }
    }

    /// Sets whether each node object should include an `"Output"` array
    /// listing the node's schema field names.
    pub fn with_schema(&mut self, with_schema: bool) {
        self.with_schema = with_schema;
    }

    /// Converts a single plan node (without its children) into a JSON object.
    ///
    /// Every object carries a `"Node Type"` key; the remaining keys depend on
    /// the node variant. Children are attached later by the visitor's
    /// `f_down` / `f_up` methods.
    fn to_json_value(node: &LogicalPlan) -> serde_json::Value {
        match node {
            LogicalPlan::EmptyRelation(_) => {
                json!({
                    "Node Type": "EmptyRelation",
                })
            }
            LogicalPlan::RecursiveQuery(RecursiveQuery { is_distinct, .. }) => {
                json!({
                    "Node Type": "RecursiveQuery",
                    "Is Distinct": is_distinct,
                })
            }
            LogicalPlan::Values(Values { values, .. }) => {
                // Only the first five rows are rendered; longer lists are
                // abbreviated with a trailing "...".
                let str_values = values
                    .iter()
                    .take(5)
                    .map(|row| {
                        let item = row
                            .iter()
                            .map(|expr| expr.to_string())
                            .collect::<Vec<_>>()
                            .join(", ");
                        format!("({item})")
                    })
                    .collect::<Vec<_>>()
                    .join(", ");
                let ellipsis = if values.len() > 5 { "..." } else { "" };
                let values_str = format!("{str_values}{ellipsis}");
                json!({
                    "Node Type": "Values",
                    "Values": values_str
                })
            }
            LogicalPlan::TableScan(TableScan {
                source,
                table_name,
                filters,
                fetch,
                ..
            }) => {
                let mut object = json!({
                    "Node Type": "TableScan",
                    "Relation Name": table_name.table(),
                });
                if let Some(s) = table_name.schema() {
                    object["Schema"] = serde_json::Value::String(s.to_string());
                }
                if let Some(c) = table_name.catalog() {
                    object["Catalog"] = serde_json::Value::String(c.to_string());
                }
                if !filters.is_empty() {
                    // Bucket each filter by how the source reports it can be
                    // pushed down.
                    let mut full_filter = vec![];
                    let mut partial_filter = vec![];
                    let mut unsupported_filters = vec![];
                    let filters: Vec<&Expr> = filters.iter().collect();
                    // Best effort: if the source cannot report pushdown
                    // support, no filter keys are emitted at all.
                    if let Ok(results) = source.supports_filters_pushdown(&filters) {
                        for (filter, support) in filters.iter().zip(results.iter()) {
                            match support {
                                TableProviderFilterPushDown::Exact => {
                                    full_filter.push(filter)
                                }
                                TableProviderFilterPushDown::Inexact => {
                                    partial_filter.push(filter)
                                }
                                TableProviderFilterPushDown::Unsupported => {
                                    unsupported_filters.push(filter)
                                }
                            }
                        }
                    }
                    if !full_filter.is_empty() {
                        object["Full Filters"] =
                            serde_json::Value::String(expr_vec_fmt!(full_filter));
                    }
                    if !partial_filter.is_empty() {
                        object["Partial Filters"] =
                            serde_json::Value::String(expr_vec_fmt!(partial_filter));
                    }
                    if !unsupported_filters.is_empty() {
                        object["Unsupported Filters"] = serde_json::Value::String(
                            expr_vec_fmt!(unsupported_filters),
                        );
                    }
                }
                if let Some(f) = fetch {
                    object["Fetch"] = serde_json::Value::Number((*f).into());
                }
                object
            }
            LogicalPlan::Projection(Projection { expr, .. }) => {
                json!({
                    "Node Type": "Projection",
                    "Expressions": expr.iter().map(|e| e.to_string()).collect::<Vec<_>>()
                })
            }
            LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
                // Previously reported as "Projection" (copied from the arm
                // above), which mislabelled DML nodes in the output.
                json!({
                    "Node Type": "Dml",
                    "Operation": op.name(),
                    "Table Name": table_name.table()
                })
            }
            // All fields are listed explicitly so that adding a field to
            // `CopyTo` forces this arm to be revisited.
            LogicalPlan::Copy(CopyTo {
                input: _,
                output_url,
                file_type,
                partition_by: _,
                options,
                output_schema: _,
            }) => {
                let op_str = options
                    .iter()
                    .map(|(k, v)| format!("{k}={v}"))
                    .collect::<Vec<_>>()
                    .join(", ");
                json!({
                    "Node Type": "CopyTo",
                    "Output URL": output_url,
                    "File Type": file_type.get_ext().to_string(),
                    "Options": op_str
                })
            }
            LogicalPlan::Ddl(ddl) => {
                json!({
                    "Node Type": "Ddl",
                    "Operation": ddl.display().to_string()
                })
            }
            LogicalPlan::Filter(Filter {
                predicate: expr, ..
            }) => {
                json!({
                    "Node Type": "Filter",
                    "Condition": expr.to_string()
                })
            }
            LogicalPlan::Window(Window { window_expr, .. }) => {
                json!({
                    "Node Type": "WindowAggr",
                    "Expressions": expr_vec_fmt!(window_expr)
                })
            }
            LogicalPlan::Aggregate(Aggregate {
                group_expr,
                aggr_expr,
                ..
            }) => {
                json!({
                    "Node Type": "Aggregate",
                    "Group By": expr_vec_fmt!(group_expr),
                    "Aggregates": expr_vec_fmt!(aggr_expr)
                })
            }
            LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
                let mut object = json!({
                    "Node Type": "Sort",
                    "Sort Key": expr_vec_fmt!(expr),
                });
                if let Some(fetch) = fetch {
                    object["Fetch"] = serde_json::Value::Number((*fetch).into());
                }
                object
            }
            LogicalPlan::Join(Join {
                on: keys,
                filter,
                join_constraint,
                join_type,
                ..
            }) => {
                let join_expr: Vec<String> =
                    keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
                // The " Filter: " prefix inside the value is kept as-is so the
                // emitted text is unchanged; an absent filter yields "".
                let filter_expr = filter
                    .as_ref()
                    .map(|expr| format!(" Filter: {expr}"))
                    .unwrap_or_default();
                json!({
                    "Node Type": format!("{join_type} Join"),
                    "Join Constraint": format!("{join_constraint:?}"),
                    "Join Keys": join_expr.join(", "),
                    "Filter": filter_expr
                })
            }
            LogicalPlan::Repartition(Repartition {
                partitioning_scheme,
                ..
            }) => match partitioning_scheme {
                Partitioning::RoundRobinBatch(n) => {
                    json!({
                        "Node Type": "Repartition",
                        "Partitioning Scheme": "RoundRobinBatch",
                        "Partition Count": n
                    })
                }
                Partitioning::Hash(expr, n) => {
                    let hash_expr: Vec<String> =
                        expr.iter().map(|e| e.to_string()).collect();
                    json!({
                        "Node Type": "Repartition",
                        "Partitioning Scheme": "Hash",
                        "Partition Count": n,
                        "Partitioning Key": hash_expr
                    })
                }
                Partitioning::DistributeBy(expr) => {
                    let dist_by_expr: Vec<String> =
                        expr.iter().map(|e| e.to_string()).collect();
                    json!({
                        "Node Type": "Repartition",
                        "Partitioning Scheme": "DistributeBy",
                        "Partitioning Key": dist_by_expr
                    })
                }
            },
            LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
                let mut object = json!({
                    "Node Type": "Limit",
                });
                // Skip and fetch are rendered through `to_string`, so they
                // appear as JSON strings rather than numbers.
                if let Some(s) = skip {
                    object["Skip"] = s.to_string().into();
                }
                if let Some(f) = fetch {
                    object["Fetch"] = f.to_string().into();
                }
                object
            }
            LogicalPlan::Subquery(Subquery { .. }) => {
                json!({
                    "Node Type": "Subquery"
                })
            }
            LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
                json!({
                    "Node Type": "Subquery",
                    "Alias": alias.table(),
                })
            }
            LogicalPlan::Statement(statement) => {
                json!({
                    "Node Type": "Statement",
                    "Statement": statement.display().to_string()
                })
            }
            LogicalPlan::Distinct(distinct) => match distinct {
                Distinct::All(_) => {
                    json!({
                        "Node Type": "DistinctAll"
                    })
                }
                Distinct::On(DistinctOn {
                    on_expr,
                    select_expr,
                    sort_expr,
                    ..
                }) => {
                    let mut object = json!({
                        "Node Type": "DistinctOn",
                        "On": expr_vec_fmt!(on_expr),
                        "Select": expr_vec_fmt!(select_expr),
                    });
                    if let Some(sort_expr) = sort_expr {
                        object["Sort"] =
                            serde_json::Value::String(expr_vec_fmt!(sort_expr));
                    }
                    object
                }
            },
            LogicalPlan::Explain { .. } => {
                json!({
                    "Node Type": "Explain"
                })
            }
            LogicalPlan::Analyze { .. } => {
                json!({
                    "Node Type": "Analyze"
                })
            }
            LogicalPlan::Union(_) => {
                json!({
                    "Node Type": "Union"
                })
            }
            LogicalPlan::Extension(e) => {
                json!({
                    "Node Type": e.node.name(),
                    "Detail": format!("{:?}", e.node)
                })
            }
            LogicalPlan::DescribeTable(DescribeTable { .. }) => {
                json!({
                    "Node Type": "DescribeTable"
                })
            }
            LogicalPlan::Unnest(Unnest {
                input: plan,
                list_type_columns: list_col_indices,
                struct_type_columns: struct_col_indices,
                ..
            }) => {
                // The stored indices refer to columns of the input plan's
                // schema; resolve them to column names for display.
                let input_columns = plan.schema().columns();
                let list_type_columns = list_col_indices
                    .iter()
                    .map(|(i, unnest_info)| {
                        format!(
                            "{}|depth={:?}",
                            &input_columns[*i].to_string(),
                            unnest_info.depth
                        )
                    })
                    .collect::<Vec<String>>();
                let struct_type_columns = struct_col_indices
                    .iter()
                    .map(|i| &input_columns[*i])
                    .collect::<Vec<&Column>>();
                json!({
                    "Node Type": "Unnest",
                    "ListColumn": expr_vec_fmt!(list_type_columns),
                    "StructColumn": expr_vec_fmt!(struct_type_columns),
                })
            }
        }
    }
}
impl<'n> TreeNodeVisitor<'n> for PgJsonVisitor<'_, '_> {
    type Node = LogicalPlan;

    /// Assigns `node` a fresh id, builds its JSON object with an empty
    /// `"Plans"` array (and an `"Output"` array of field names when schema
    /// output is enabled), and records it as the node currently being visited.
    fn f_down(
        &mut self,
        node: &'n LogicalPlan,
    ) -> datafusion_common::Result<TreeNodeRecursion> {
        let id = self.next_id;
        self.next_id += 1;

        let mut object = Self::to_json_value(node);
        // Children append themselves to this array as they are left in `f_up`.
        object["Plans"] = serde_json::Value::Array(vec![]);

        if self.with_schema {
            object["Output"] = serde_json::Value::Array(
                node.schema()
                    .fields()
                    .iter()
                    .map(|f| serde_json::Value::String(f.name().to_string()))
                    .collect(),
            );
        }

        self.objects.insert(id, object);
        self.parent_ids.push(id);
        Ok(TreeNodeRecursion::Continue)
    }

    /// Leaves the current node: appends its JSON object to its parent's
    /// `"Plans"` array or, when it has no parent, writes the finished document
    /// `[{"Plan": <node>}]` to the formatter as pretty-printed JSON.
    ///
    /// Returns an internal error, rather than panicking, if the visitor's
    /// bookkeeping is inconsistent (no current node, or a parent object that
    /// is missing or has no `"Plans"` array).
    fn f_up(
        &mut self,
        _node: &Self::Node,
    ) -> datafusion_common::Result<TreeNodeRecursion> {
        let id = self
            .parent_ids
            .pop()
            .ok_or_else(|| internal_datafusion_err!("Missing current node id!"))?;

        let current_node = self
            .objects
            .remove(&id)
            .ok_or_else(|| internal_datafusion_err!("Missing current node!"))?;

        if let Some(parent_id) = self.parent_ids.last() {
            let parent_node = self
                .objects
                .get_mut(parent_id)
                .ok_or_else(|| internal_datafusion_err!("Missing parent node!"))?;
            let plans = parent_node
                .get_mut("Plans")
                .and_then(|p| p.as_array_mut())
                .ok_or_else(|| internal_datafusion_err!("Plans should be an array"))?;
            plans.push(current_node);
        } else {
            // No parent left on the stack: this node is the last one to be
            // left, so the whole tree is complete and can be written out.
            let plan = serde_json::json!([{"Plan": current_node}]);
            write!(
                self.f,
                "{}",
                serde_json::to_string_pretty(&plan)
                    .map_err(|e| DataFusionError::External(Box::new(e)))?
            )?;
        }

        Ok(TreeNodeRecursion::Continue)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::{DataType, Field};
    use insta::assert_snapshot;

    /// A schema with no fields renders as an empty pair of brackets.
    #[test]
    fn test_display_empty_schema() {
        let empty = Schema::empty();
        let rendered = display_schema(&empty);
        assert_snapshot!(rendered, @"[]");
    }

    /// Fields render as `name:type`; only the nullable one gets the `;N` suffix.
    #[test]
    fn test_display_schema() {
        let fields = vec![
            Field::new("id", DataType::Int32, false),
            Field::new("first_name", DataType::Utf8, true),
        ];
        let schema = Schema::new(fields);
        let rendered = display_schema(&schema);
        assert_snapshot!(rendered, @"[id:Int32, first_name:Utf8;N]");
    }
}