datafusion_expr/logical_plan/
display.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module provides logic for displaying LogicalPlans in various styles
19
20use std::collections::HashMap;
21use std::fmt;
22
23use crate::{
24    expr_vec_fmt, Aggregate, DescribeTable, Distinct, DistinctOn, DmlStatement, Expr,
25    Filter, Join, Limit, LogicalPlan, Partitioning, Projection, RecursiveQuery,
26    Repartition, Sort, Subquery, SubqueryAlias, TableProviderFilterPushDown, TableScan,
27    Unnest, Values, Window,
28};
29
30use crate::dml::CopyTo;
31use arrow::datatypes::Schema;
32use datafusion_common::display::GraphvizBuilder;
33use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor};
34use datafusion_common::{Column, DataFusionError};
35use serde_json::json;
36
/// Formats plans with a single line per node, indenting every child two
/// spaces deeper than its parent. For example:
///
/// ```text
/// Projection: id
///   Filter: state Eq Utf8("CO")
///     CsvScan: employee.csv projection=Some([0, 3])
/// ```
pub struct IndentVisitor<'a, 'b> {
    /// Destination the formatted plan is written to
    f: &'a mut fmt::Formatter<'b>,
    /// If true, includes summarized schema information
    with_schema: bool,
    /// The current indent level: incremented when descending into a node,
    /// decremented when leaving it
    indent: usize,
}
49
50impl<'a, 'b> IndentVisitor<'a, 'b> {
51    /// Create a visitor that will write a formatted LogicalPlan to f. If `with_schema` is
52    /// true, includes schema information on each line.
53    pub fn new(f: &'a mut fmt::Formatter<'b>, with_schema: bool) -> Self {
54        Self {
55            f,
56            with_schema,
57            indent: 0,
58        }
59    }
60}
61
62impl<'n> TreeNodeVisitor<'n> for IndentVisitor<'_, '_> {
63    type Node = LogicalPlan;
64
65    fn f_down(
66        &mut self,
67        plan: &'n LogicalPlan,
68    ) -> datafusion_common::Result<TreeNodeRecursion> {
69        if self.indent > 0 {
70            writeln!(self.f)?;
71        }
72        write!(self.f, "{:indent$}", "", indent = self.indent * 2)?;
73        write!(self.f, "{}", plan.display())?;
74        if self.with_schema {
75            write!(
76                self.f,
77                " {}",
78                display_schema(&plan.schema().as_ref().to_owned().into())
79            )?;
80        }
81
82        self.indent += 1;
83        Ok(TreeNodeRecursion::Continue)
84    }
85
86    fn f_up(
87        &mut self,
88        _plan: &'n LogicalPlan,
89    ) -> datafusion_common::Result<TreeNodeRecursion> {
90        self.indent -= 1;
91        Ok(TreeNodeRecursion::Continue)
92    }
93}
94
95/// Print the schema in a compact representation to `buf`
96///
97/// For example: `foo:Utf8` if `foo` can not be null, and
98/// `foo:Utf8;N` if `foo` is nullable.
99///
100/// ```
101/// use arrow::datatypes::{Field, Schema, DataType};
102/// # use datafusion_expr::logical_plan::display_schema;
103/// let schema = Schema::new(vec![
104///     Field::new("id", DataType::Int32, false),
105///     Field::new("first_name", DataType::Utf8, true),
106///  ]);
107///
108///  assert_eq!(
109///      "[id:Int32, first_name:Utf8;N]",
110///      format!("{}", display_schema(&schema))
111///  );
112/// ```
113pub fn display_schema(schema: &Schema) -> impl fmt::Display + '_ {
114    struct Wrapper<'a>(&'a Schema);
115
116    impl fmt::Display for Wrapper<'_> {
117        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
118            write!(f, "[")?;
119            for (idx, field) in self.0.fields().iter().enumerate() {
120                if idx > 0 {
121                    write!(f, ", ")?;
122                }
123                let nullable_str = if field.is_nullable() { ";N" } else { "" };
124                write!(
125                    f,
126                    "{}:{:?}{}",
127                    field.name(),
128                    field.data_type(),
129                    nullable_str
130                )?;
131            }
132            write!(f, "]")
133        }
134    }
135    Wrapper(schema)
136}
137
138/// Formats plans for graphical display using the `DOT` language. This
139/// format can be visualized using software from
140/// [`graphviz`](https://graphviz.org/)
141pub struct GraphvizVisitor<'a, 'b> {
142    f: &'a mut fmt::Formatter<'b>,
143    graphviz_builder: GraphvizBuilder,
144    /// If true, includes summarized schema information
145    with_schema: bool,
146
147    /// Holds the ids (as generated from `graphviz_builder` of all
148    /// parent nodes
149    parent_ids: Vec<usize>,
150}
151
152impl<'a, 'b> GraphvizVisitor<'a, 'b> {
153    pub fn new(f: &'a mut fmt::Formatter<'b>) -> Self {
154        Self {
155            f,
156            graphviz_builder: GraphvizBuilder::default(),
157            with_schema: false,
158            parent_ids: Vec::new(),
159        }
160    }
161
162    /// Sets a flag which controls if the output schema is displayed
163    pub fn set_with_schema(&mut self, with_schema: bool) {
164        self.with_schema = with_schema;
165    }
166
167    pub fn pre_visit_plan(&mut self, label: &str) -> fmt::Result {
168        self.graphviz_builder.start_cluster(self.f, label)
169    }
170
171    pub fn post_visit_plan(&mut self) -> fmt::Result {
172        self.graphviz_builder.end_cluster(self.f)
173    }
174
175    pub fn start_graph(&mut self) -> fmt::Result {
176        self.graphviz_builder.start_graph(self.f)
177    }
178
179    pub fn end_graph(&mut self) -> fmt::Result {
180        self.graphviz_builder.end_graph(self.f)
181    }
182}
183
184impl<'n> TreeNodeVisitor<'n> for GraphvizVisitor<'_, '_> {
185    type Node = LogicalPlan;
186
187    fn f_down(
188        &mut self,
189        plan: &'n LogicalPlan,
190    ) -> datafusion_common::Result<TreeNodeRecursion> {
191        let id = self.graphviz_builder.next_id();
192
193        // Create a new graph node for `plan` such as
194        // id [label="foo"]
195        let label = if self.with_schema {
196            format!(
197                r"{}\nSchema: {}",
198                plan.display(),
199                display_schema(&plan.schema().as_ref().to_owned().into())
200            )
201        } else {
202            format!("{}", plan.display())
203        };
204
205        self.graphviz_builder
206            .add_node(self.f, id, &label, None)
207            .map_err(|_e| DataFusionError::Internal("Fail to format".to_string()))?;
208
209        // Create an edge to our parent node, if any
210        //  parent_id -> id
211        if let Some(parent_id) = self.parent_ids.last() {
212            self.graphviz_builder
213                .add_edge(self.f, *parent_id, id)
214                .map_err(|_e| DataFusionError::Internal("Fail to format".to_string()))?;
215        }
216
217        self.parent_ids.push(id);
218        Ok(TreeNodeRecursion::Continue)
219    }
220
221    fn f_up(
222        &mut self,
223        _plan: &LogicalPlan,
224    ) -> datafusion_common::Result<TreeNodeRecursion> {
225        // always be non-empty as pre_visit always pushes
226        // So it should always be Ok(true)
227        let res = self.parent_ids.pop();
228        res.ok_or(DataFusionError::Internal("Fail to format".to_string()))
229            .map(|_| TreeNodeRecursion::Continue)
230    }
231}
232
233/// Formats plans to display as postgresql plan json format.
234///
235/// There are already many existing visualizer for this format, for example [dalibo](https://explain.dalibo.com/).
236/// Unfortunately, there is no formal spec for this format, but it is widely used in the PostgreSQL community.
237///
238/// Here is an example of the format:
239///
240/// ```json
241/// [
242///     {
243///         "Plan": {
244///             "Node Type": "Sort",
245///             "Output": [
246///                 "question_1.id",
247///                 "question_1.title",
248///                 "question_1.text",
249///                 "question_1.file",
250///                 "question_1.type",
251///                 "question_1.source",
252///                 "question_1.exam_id"
253///             ],
254///             "Sort Key": [
255///                 "question_1.id"
256///             ],
257///             "Plans": [
258///                 {
259///                     "Node Type": "Seq Scan",
260///                     "Parent Relationship": "Left",
261///                     "Relation Name": "question",
262///                     "Schema": "public",
263///                     "Alias": "question_1",
264///                     "Output": [
265///                        "question_1.id",
266///                         "question_1.title",
267///                        "question_1.text",
268///                         "question_1.file",
269///                         "question_1.type",
270///                         "question_1.source",
271///                         "question_1.exam_id"
272///                     ],
273///                     "Filter": "(question_1.exam_id = 1)"
274///                 }
275///             ]
276///         }
277///     }
278/// ]
279/// ```
280pub struct PgJsonVisitor<'a, 'b> {
281    f: &'a mut fmt::Formatter<'b>,
282
283    /// A mapping from plan node id to the plan node json representation.
284    objects: HashMap<u32, serde_json::Value>,
285
286    next_id: u32,
287
288    /// If true, includes summarized schema information
289    with_schema: bool,
290
291    /// Holds the ids (as generated from `graphviz_builder` of all
292    /// parent nodes
293    parent_ids: Vec<u32>,
294}
295
296impl<'a, 'b> PgJsonVisitor<'a, 'b> {
297    pub fn new(f: &'a mut fmt::Formatter<'b>) -> Self {
298        Self {
299            f,
300            objects: HashMap::new(),
301            next_id: 0,
302            with_schema: false,
303            parent_ids: Vec::new(),
304        }
305    }
306
307    /// Sets a flag which controls if the output schema is displayed
308    pub fn with_schema(&mut self, with_schema: bool) {
309        self.with_schema = with_schema;
310    }
311
312    /// Converts a logical plan node to a json object.
313    fn to_json_value(node: &LogicalPlan) -> serde_json::Value {
314        match node {
315            LogicalPlan::EmptyRelation(_) => {
316                json!({
317                    "Node Type": "EmptyRelation",
318                })
319            }
320            LogicalPlan::RecursiveQuery(RecursiveQuery { is_distinct, .. }) => {
321                json!({
322                    "Node Type": "RecursiveQuery",
323                    "Is Distinct": is_distinct,
324                })
325            }
326            LogicalPlan::Values(Values { ref values, .. }) => {
327                let str_values = values
328                    .iter()
329                    // limit to only 5 values to avoid horrible display
330                    .take(5)
331                    .map(|row| {
332                        let item = row
333                            .iter()
334                            .map(|expr| expr.to_string())
335                            .collect::<Vec<_>>()
336                            .join(", ");
337                        format!("({item})")
338                    })
339                    .collect::<Vec<_>>()
340                    .join(", ");
341
342                let eclipse = if values.len() > 5 { "..." } else { "" };
343
344                let values_str = format!("{str_values}{eclipse}");
345                json!({
346                    "Node Type": "Values",
347                    "Values": values_str
348                })
349            }
350            LogicalPlan::TableScan(TableScan {
351                ref source,
352                ref table_name,
353                ref filters,
354                ref fetch,
355                ..
356            }) => {
357                let mut object = json!({
358                    "Node Type": "TableScan",
359                    "Relation Name": table_name.table(),
360                });
361
362                if let Some(s) = table_name.schema() {
363                    object["Schema"] = serde_json::Value::String(s.to_string());
364                }
365
366                if let Some(c) = table_name.catalog() {
367                    object["Catalog"] = serde_json::Value::String(c.to_string());
368                }
369
370                if !filters.is_empty() {
371                    let mut full_filter = vec![];
372                    let mut partial_filter = vec![];
373                    let mut unsupported_filters = vec![];
374                    let filters: Vec<&Expr> = filters.iter().collect();
375
376                    if let Ok(results) = source.supports_filters_pushdown(&filters) {
377                        filters.iter().zip(results.iter()).for_each(
378                            |(x, res)| match res {
379                                TableProviderFilterPushDown::Exact => full_filter.push(x),
380                                TableProviderFilterPushDown::Inexact => {
381                                    partial_filter.push(x)
382                                }
383                                TableProviderFilterPushDown::Unsupported => {
384                                    unsupported_filters.push(x)
385                                }
386                            },
387                        );
388                    }
389
390                    if !full_filter.is_empty() {
391                        object["Full Filters"] =
392                            serde_json::Value::String(expr_vec_fmt!(full_filter));
393                    };
394                    if !partial_filter.is_empty() {
395                        object["Partial Filters"] =
396                            serde_json::Value::String(expr_vec_fmt!(partial_filter));
397                    }
398                    if !unsupported_filters.is_empty() {
399                        object["Unsupported Filters"] =
400                            serde_json::Value::String(expr_vec_fmt!(unsupported_filters));
401                    }
402                }
403
404                if let Some(f) = fetch {
405                    object["Fetch"] = serde_json::Value::Number((*f).into());
406                }
407
408                object
409            }
410            LogicalPlan::Projection(Projection { ref expr, .. }) => {
411                json!({
412                    "Node Type": "Projection",
413                    "Expressions": expr.iter().map(|e| e.to_string()).collect::<Vec<_>>()
414                })
415            }
416            LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
417                json!({
418                    "Node Type": "Projection",
419                    "Operation": op.name(),
420                    "Table Name": table_name.table()
421                })
422            }
423            LogicalPlan::Copy(CopyTo {
424                input: _,
425                output_url,
426                file_type,
427                partition_by: _,
428                options,
429                output_schema: _,
430            }) => {
431                let op_str = options
432                    .iter()
433                    .map(|(k, v)| format!("{k}={v}"))
434                    .collect::<Vec<_>>()
435                    .join(", ");
436                json!({
437                    "Node Type": "CopyTo",
438                    "Output URL": output_url,
439                    "File Type": format!("{}", file_type.get_ext()),
440                    "Options": op_str
441                })
442            }
443            LogicalPlan::Ddl(ddl) => {
444                json!({
445                    "Node Type": "Ddl",
446                    "Operation": format!("{}", ddl.display())
447                })
448            }
449            LogicalPlan::Filter(Filter {
450                predicate: ref expr,
451                ..
452            }) => {
453                json!({
454                    "Node Type": "Filter",
455                    "Condition": format!("{}", expr)
456                })
457            }
458            LogicalPlan::Window(Window {
459                ref window_expr, ..
460            }) => {
461                json!({
462                    "Node Type": "WindowAggr",
463                    "Expressions": expr_vec_fmt!(window_expr)
464                })
465            }
466            LogicalPlan::Aggregate(Aggregate {
467                ref group_expr,
468                ref aggr_expr,
469                ..
470            }) => {
471                json!({
472                    "Node Type": "Aggregate",
473                    "Group By": expr_vec_fmt!(group_expr),
474                    "Aggregates": expr_vec_fmt!(aggr_expr)
475                })
476            }
477            LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
478                let mut object = json!({
479                    "Node Type": "Sort",
480                    "Sort Key": expr_vec_fmt!(expr),
481                });
482
483                if let Some(fetch) = fetch {
484                    object["Fetch"] = serde_json::Value::Number((*fetch).into());
485                }
486
487                object
488            }
489            LogicalPlan::Join(Join {
490                on: ref keys,
491                filter,
492                join_constraint,
493                join_type,
494                ..
495            }) => {
496                let join_expr: Vec<String> =
497                    keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
498                let filter_expr = filter
499                    .as_ref()
500                    .map(|expr| format!(" Filter: {expr}"))
501                    .unwrap_or_else(|| "".to_string());
502                json!({
503                    "Node Type": format!("{} Join", join_type),
504                    "Join Constraint": format!("{:?}", join_constraint),
505                    "Join Keys": join_expr.join(", "),
506                    "Filter": format!("{}", filter_expr)
507                })
508            }
509            LogicalPlan::Repartition(Repartition {
510                partitioning_scheme,
511                ..
512            }) => match partitioning_scheme {
513                Partitioning::RoundRobinBatch(n) => {
514                    json!({
515                        "Node Type": "Repartition",
516                        "Partitioning Scheme": "RoundRobinBatch",
517                        "Partition Count": n
518                    })
519                }
520                Partitioning::Hash(expr, n) => {
521                    let hash_expr: Vec<String> =
522                        expr.iter().map(|e| format!("{e}")).collect();
523
524                    json!({
525                        "Node Type": "Repartition",
526                        "Partitioning Scheme": "Hash",
527                        "Partition Count": n,
528                        "Partitioning Key": hash_expr
529                    })
530                }
531                Partitioning::DistributeBy(expr) => {
532                    let dist_by_expr: Vec<String> =
533                        expr.iter().map(|e| format!("{e}")).collect();
534                    json!({
535                        "Node Type": "Repartition",
536                        "Partitioning Scheme": "DistributeBy",
537                        "Partitioning Key": dist_by_expr
538                    })
539                }
540            },
541            LogicalPlan::Limit(Limit {
542                ref skip,
543                ref fetch,
544                ..
545            }) => {
546                let mut object = serde_json::json!(
547                    {
548                        "Node Type": "Limit",
549                    }
550                );
551                if let Some(s) = skip {
552                    object["Skip"] = s.to_string().into()
553                };
554                if let Some(f) = fetch {
555                    object["Fetch"] = f.to_string().into()
556                };
557                object
558            }
559            LogicalPlan::Subquery(Subquery { .. }) => {
560                json!({
561                    "Node Type": "Subquery"
562                })
563            }
564            LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
565                json!({
566                    "Node Type": "Subquery",
567                    "Alias": alias.table(),
568                })
569            }
570            LogicalPlan::Statement(statement) => {
571                json!({
572                    "Node Type": "Statement",
573                    "Statement": format!("{}", statement.display())
574                })
575            }
576            LogicalPlan::Distinct(distinct) => match distinct {
577                Distinct::All(_) => {
578                    json!({
579                        "Node Type": "DistinctAll"
580                    })
581                }
582                Distinct::On(DistinctOn {
583                    on_expr,
584                    select_expr,
585                    sort_expr,
586                    ..
587                }) => {
588                    let mut object = json!({
589                        "Node Type": "DistinctOn",
590                        "On": expr_vec_fmt!(on_expr),
591                        "Select": expr_vec_fmt!(select_expr),
592                    });
593                    if let Some(sort_expr) = sort_expr {
594                        object["Sort"] =
595                            serde_json::Value::String(expr_vec_fmt!(sort_expr));
596                    }
597
598                    object
599                }
600            },
601            LogicalPlan::Explain { .. } => {
602                json!({
603                    "Node Type": "Explain"
604                })
605            }
606            LogicalPlan::Analyze { .. } => {
607                json!({
608                    "Node Type": "Analyze"
609                })
610            }
611            LogicalPlan::Union(_) => {
612                json!({
613                    "Node Type": "Union"
614                })
615            }
616            LogicalPlan::Extension(e) => {
617                json!({
618                    "Node Type": e.node.name(),
619                    "Detail": format!("{:?}", e.node)
620                })
621            }
622            LogicalPlan::DescribeTable(DescribeTable { .. }) => {
623                json!({
624                    "Node Type": "DescribeTable"
625                })
626            }
627            LogicalPlan::Unnest(Unnest {
628                input: plan,
629                list_type_columns: list_col_indices,
630                struct_type_columns: struct_col_indices,
631                ..
632            }) => {
633                let input_columns = plan.schema().columns();
634                let list_type_columns = list_col_indices
635                    .iter()
636                    .map(|(i, unnest_info)| {
637                        format!(
638                            "{}|depth={:?}",
639                            &input_columns[*i].to_string(),
640                            unnest_info.depth
641                        )
642                    })
643                    .collect::<Vec<String>>();
644                let struct_type_columns = struct_col_indices
645                    .iter()
646                    .map(|i| &input_columns[*i])
647                    .collect::<Vec<&Column>>();
648                json!({
649                    "Node Type": "Unnest",
650                    "ListColumn": expr_vec_fmt!(list_type_columns),
651                    "StructColumn": expr_vec_fmt!(struct_type_columns),
652                })
653            }
654        }
655    }
656}
657
658impl<'n> TreeNodeVisitor<'n> for PgJsonVisitor<'_, '_> {
659    type Node = LogicalPlan;
660
661    fn f_down(
662        &mut self,
663        node: &'n LogicalPlan,
664    ) -> datafusion_common::Result<TreeNodeRecursion> {
665        let id = self.next_id;
666        self.next_id += 1;
667        let mut object = Self::to_json_value(node);
668
669        object["Plans"] = serde_json::Value::Array(vec![]);
670
671        if self.with_schema {
672            object["Output"] = serde_json::Value::Array(
673                node.schema()
674                    .fields()
675                    .iter()
676                    .map(|f| f.name().to_string())
677                    .map(serde_json::Value::String)
678                    .collect(),
679            );
680        };
681
682        self.objects.insert(id, object);
683        self.parent_ids.push(id);
684        Ok(TreeNodeRecursion::Continue)
685    }
686
687    fn f_up(
688        &mut self,
689        _node: &Self::Node,
690    ) -> datafusion_common::Result<TreeNodeRecursion> {
691        let id = self.parent_ids.pop().unwrap();
692
693        let current_node = self.objects.remove(&id).ok_or_else(|| {
694            DataFusionError::Internal("Missing current node!".to_string())
695        })?;
696
697        if let Some(parent_id) = self.parent_ids.last() {
698            let parent_node = self
699                .objects
700                .get_mut(parent_id)
701                .expect("Missing parent node!");
702            let plans = parent_node
703                .get_mut("Plans")
704                .and_then(|p| p.as_array_mut())
705                .expect("Plans should be an array");
706
707            plans.push(current_node);
708        } else {
709            // This is the root node
710            let plan = serde_json::json!([{"Plan": current_node}]);
711            write!(
712                self.f,
713                "{}",
714                serde_json::to_string_pretty(&plan)
715                    .map_err(|e| DataFusionError::External(Box::new(e)))?
716            )?;
717        }
718
719        Ok(TreeNodeRecursion::Continue)
720    }
721}
722
#[cfg(test)]
mod tests {
    use arrow::datatypes::{DataType, Field};
    use insta::assert_snapshot;

    use super::*;

    /// A schema without fields renders as an empty pair of brackets.
    #[test]
    fn test_display_empty_schema() {
        let empty = Schema::empty();
        assert_snapshot!(display_schema(&empty), @"[]");
    }

    /// Non-nullable fields render as `name:Type`; nullable ones get `;N`.
    #[test]
    fn test_display_schema() {
        let fields = vec![
            Field::new("id", DataType::Int32, false),
            Field::new("first_name", DataType::Utf8, true),
        ];
        let two_fields = Schema::new(fields);

        assert_snapshot!(display_schema(&two_fields), @"[id:Int32, first_name:Utf8;N]");
    }
}