Skip to main content

datafusion_expr/logical_plan/
display.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module provides logic for displaying LogicalPlans in various styles
19
20use std::collections::HashMap;
21use std::fmt;
22
23use crate::{
24    Aggregate, DescribeTable, Distinct, DistinctOn, DmlStatement, Expr, Filter, Join,
25    Limit, LogicalPlan, Partitioning, Projection, RecursiveQuery, Repartition, Sort,
26    Subquery, SubqueryAlias, TableProviderFilterPushDown, TableScan, Unnest, Values,
27    Window, expr_vec_fmt,
28};
29
30use crate::dml::CopyTo;
31use arrow::datatypes::Schema;
32use datafusion_common::display::GraphvizBuilder;
33use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor};
34use datafusion_common::{Column, DataFusionError, internal_datafusion_err};
35use serde_json::json;
36
/// Formats plans with a single line per node, indenting every child two
/// spaces deeper than its parent. For example:
///
/// ```text
/// Projection: employee_csv.id
///   Filter: employee_csv.state = Utf8("CO")
///     TableScan: employee_csv projection=[id, state]
/// ```
pub struct IndentVisitor<'a, 'b> {
    /// Destination the formatted plan is written to
    f: &'a mut fmt::Formatter<'b>,
    /// If true, includes summarized schema information
    with_schema: bool,
    /// The current nesting depth; each level is rendered as two spaces
    indent: usize,
}
49
50impl<'a, 'b> IndentVisitor<'a, 'b> {
51    /// Create a visitor that will write a formatted LogicalPlan to f. If `with_schema` is
52    /// true, includes schema information on each line.
53    pub fn new(f: &'a mut fmt::Formatter<'b>, with_schema: bool) -> Self {
54        Self {
55            f,
56            with_schema,
57            indent: 0,
58        }
59    }
60}
61
62impl<'n> TreeNodeVisitor<'n> for IndentVisitor<'_, '_> {
63    type Node = LogicalPlan;
64
65    fn f_down(
66        &mut self,
67        plan: &'n LogicalPlan,
68    ) -> datafusion_common::Result<TreeNodeRecursion> {
69        if self.indent > 0 {
70            writeln!(self.f)?;
71        }
72        write!(self.f, "{:indent$}", "", indent = self.indent * 2)?;
73        write!(self.f, "{}", plan.display())?;
74        if self.with_schema {
75            write!(self.f, " {}", display_schema(plan.schema().as_arrow()))?;
76        }
77
78        self.indent += 1;
79        Ok(TreeNodeRecursion::Continue)
80    }
81
82    fn f_up(
83        &mut self,
84        _plan: &'n LogicalPlan,
85    ) -> datafusion_common::Result<TreeNodeRecursion> {
86        self.indent -= 1;
87        Ok(TreeNodeRecursion::Continue)
88    }
89}
90
91/// Print the schema in a compact representation to `buf`
92///
93/// For example: `foo:Utf8` if `foo` can not be null, and
94/// `foo:Utf8;N` if `foo` is nullable.
95///
96/// ```
97/// use arrow::datatypes::{DataType, Field, Schema};
98/// # use datafusion_expr::logical_plan::display_schema;
99/// let schema = Schema::new(vec![
100///     Field::new("id", DataType::Int32, false),
101///     Field::new("first_name", DataType::Utf8, true),
102/// ]);
103///
104/// assert_eq!(
105///     "[id:Int32, first_name:Utf8;N]",
106///     format!("{}", display_schema(&schema))
107/// );
108/// ```
109pub fn display_schema(schema: &Schema) -> impl fmt::Display + '_ {
110    struct Wrapper<'a>(&'a Schema);
111
112    impl fmt::Display for Wrapper<'_> {
113        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
114            write!(f, "[")?;
115            for (idx, field) in self.0.fields().iter().enumerate() {
116                if idx > 0 {
117                    write!(f, ", ")?;
118                }
119                let nullable_str = if field.is_nullable() { ";N" } else { "" };
120                write!(f, "{}:{}{}", field.name(), field.data_type(), nullable_str)?;
121            }
122            write!(f, "]")
123        }
124    }
125    Wrapper(schema)
126}
127
128/// Formats plans for graphical display using the `DOT` language. This
129/// format can be visualized using software from
130/// [`graphviz`](https://graphviz.org/)
131pub struct GraphvizVisitor<'a, 'b> {
132    f: &'a mut fmt::Formatter<'b>,
133    graphviz_builder: GraphvizBuilder,
134    /// If true, includes summarized schema information
135    with_schema: bool,
136
137    /// Holds the ids (as generated from `graphviz_builder` of all
138    /// parent nodes
139    parent_ids: Vec<usize>,
140}
141
142impl<'a, 'b> GraphvizVisitor<'a, 'b> {
143    pub fn new(f: &'a mut fmt::Formatter<'b>) -> Self {
144        Self {
145            f,
146            graphviz_builder: GraphvizBuilder::default(),
147            with_schema: false,
148            parent_ids: Vec::new(),
149        }
150    }
151
152    /// Sets a flag which controls if the output schema is displayed
153    pub fn set_with_schema(&mut self, with_schema: bool) {
154        self.with_schema = with_schema;
155    }
156
157    pub fn pre_visit_plan(&mut self, label: &str) -> fmt::Result {
158        self.graphviz_builder.start_cluster(self.f, label)
159    }
160
161    pub fn post_visit_plan(&mut self) -> fmt::Result {
162        self.graphviz_builder.end_cluster(self.f)
163    }
164
165    pub fn start_graph(&mut self) -> fmt::Result {
166        self.graphviz_builder.start_graph(self.f)
167    }
168
169    pub fn end_graph(&mut self) -> fmt::Result {
170        self.graphviz_builder.end_graph(self.f)
171    }
172}
173
174impl<'n> TreeNodeVisitor<'n> for GraphvizVisitor<'_, '_> {
175    type Node = LogicalPlan;
176
177    fn f_down(
178        &mut self,
179        plan: &'n LogicalPlan,
180    ) -> datafusion_common::Result<TreeNodeRecursion> {
181        let id = self.graphviz_builder.next_id();
182
183        // Create a new graph node for `plan` such as
184        // id [label="foo"]
185        let label = if self.with_schema {
186            format!(
187                r"{}\nSchema: {}",
188                plan.display(),
189                display_schema(plan.schema().as_arrow())
190            )
191        } else {
192            format!("{}", plan.display())
193        };
194
195        self.graphviz_builder
196            .add_node(self.f, id, &label, None)
197            .map_err(|_e| internal_datafusion_err!("Fail to format"))?;
198
199        // Create an edge to our parent node, if any
200        //  parent_id -> id
201        if let Some(parent_id) = self.parent_ids.last() {
202            self.graphviz_builder
203                .add_edge(self.f, *parent_id, id)
204                .map_err(|_e| internal_datafusion_err!("Fail to format"))?;
205        }
206
207        self.parent_ids.push(id);
208        Ok(TreeNodeRecursion::Continue)
209    }
210
211    fn f_up(
212        &mut self,
213        _plan: &LogicalPlan,
214    ) -> datafusion_common::Result<TreeNodeRecursion> {
215        // always be non-empty as pre_visit always pushes
216        // So it should always be Ok(true)
217        let res = self.parent_ids.pop();
218        res.ok_or(internal_datafusion_err!("Fail to format"))
219            .map(|_| TreeNodeRecursion::Continue)
220    }
221}
222
223/// Formats plans to display as postgresql plan json format.
224///
225/// There are already many existing visualizer for this format, for example [dalibo](https://explain.dalibo.com/).
226/// Unfortunately, there is no formal spec for this format, but it is widely used in the PostgreSQL community.
227///
228/// Here is an example of the format:
229///
230/// ```json
231/// [
232///     {
233///         "Plan": {
234///             "Node Type": "Sort",
235///             "Output": [
236///                 "question_1.id",
237///                 "question_1.title",
238///                 "question_1.text",
239///                 "question_1.file",
240///                 "question_1.type",
241///                 "question_1.source",
242///                 "question_1.exam_id"
243///             ],
244///             "Sort Key": [
245///                 "question_1.id"
246///             ],
247///             "Plans": [
248///                 {
249///                     "Node Type": "Seq Scan",
250///                     "Parent Relationship": "Left",
251///                     "Relation Name": "question",
252///                     "Schema": "public",
253///                     "Alias": "question_1",
254///                     "Output": [
255///                        "question_1.id",
256///                         "question_1.title",
257///                        "question_1.text",
258///                         "question_1.file",
259///                         "question_1.type",
260///                         "question_1.source",
261///                         "question_1.exam_id"
262///                     ],
263///                     "Filter": "(question_1.exam_id = 1)"
264///                 }
265///             ]
266///         }
267///     }
268/// ]
269/// ```
270pub struct PgJsonVisitor<'a, 'b> {
271    f: &'a mut fmt::Formatter<'b>,
272
273    /// A mapping from plan node id to the plan node json representation.
274    objects: HashMap<u32, serde_json::Value>,
275
276    next_id: u32,
277
278    /// If true, includes summarized schema information
279    with_schema: bool,
280
281    /// Holds the ids (as generated from `graphviz_builder` of all
282    /// parent nodes
283    parent_ids: Vec<u32>,
284}
285
286impl<'a, 'b> PgJsonVisitor<'a, 'b> {
287    pub fn new(f: &'a mut fmt::Formatter<'b>) -> Self {
288        Self {
289            f,
290            objects: HashMap::new(),
291            next_id: 0,
292            with_schema: false,
293            parent_ids: Vec::new(),
294        }
295    }
296
297    /// Sets a flag which controls if the output schema is displayed
298    pub fn with_schema(&mut self, with_schema: bool) {
299        self.with_schema = with_schema;
300    }
301
302    /// Converts a logical plan node to a json object.
303    fn to_json_value(node: &LogicalPlan) -> serde_json::Value {
304        match node {
305            LogicalPlan::EmptyRelation(_) => {
306                json!({
307                    "Node Type": "EmptyRelation",
308                })
309            }
310            LogicalPlan::RecursiveQuery(RecursiveQuery { is_distinct, .. }) => {
311                json!({
312                    "Node Type": "RecursiveQuery",
313                    "Is Distinct": is_distinct,
314                })
315            }
316            LogicalPlan::Values(Values { values, .. }) => {
317                let str_values = values
318                    .iter()
319                    // limit to only 5 values to avoid horrible display
320                    .take(5)
321                    .map(|row| {
322                        let item = row
323                            .iter()
324                            .map(|expr| expr.to_string())
325                            .collect::<Vec<_>>()
326                            .join(", ");
327                        format!("({item})")
328                    })
329                    .collect::<Vec<_>>()
330                    .join(", ");
331
332                let eclipse = if values.len() > 5 { "..." } else { "" };
333
334                let values_str = format!("{str_values}{eclipse}");
335                json!({
336                    "Node Type": "Values",
337                    "Values": values_str
338                })
339            }
340            LogicalPlan::TableScan(TableScan {
341                source,
342                table_name,
343                filters,
344                fetch,
345                ..
346            }) => {
347                let mut object = json!({
348                    "Node Type": "TableScan",
349                    "Relation Name": table_name.table(),
350                });
351
352                if let Some(s) = table_name.schema() {
353                    object["Schema"] = serde_json::Value::String(s.to_string());
354                }
355
356                if let Some(c) = table_name.catalog() {
357                    object["Catalog"] = serde_json::Value::String(c.to_string());
358                }
359
360                if !filters.is_empty() {
361                    let mut full_filter = vec![];
362                    let mut partial_filter = vec![];
363                    let mut unsupported_filters = vec![];
364                    let filters: Vec<&Expr> = filters.iter().collect();
365
366                    if let Ok(results) = source.supports_filters_pushdown(&filters) {
367                        filters.iter().zip(results.iter()).for_each(
368                            |(x, res)| match res {
369                                TableProviderFilterPushDown::Exact => full_filter.push(x),
370                                TableProviderFilterPushDown::Inexact => {
371                                    partial_filter.push(x)
372                                }
373                                TableProviderFilterPushDown::Unsupported => {
374                                    unsupported_filters.push(x)
375                                }
376                            },
377                        );
378                    }
379
380                    if !full_filter.is_empty() {
381                        object["Full Filters"] =
382                            serde_json::Value::String(expr_vec_fmt!(full_filter));
383                    };
384                    if !partial_filter.is_empty() {
385                        object["Partial Filters"] =
386                            serde_json::Value::String(expr_vec_fmt!(partial_filter));
387                    }
388                    if !unsupported_filters.is_empty() {
389                        object["Unsupported Filters"] =
390                            serde_json::Value::String(expr_vec_fmt!(unsupported_filters));
391                    }
392                }
393
394                if let Some(f) = fetch {
395                    object["Fetch"] = serde_json::Value::Number((*f).into());
396                }
397
398                object
399            }
400            LogicalPlan::Projection(Projection { expr, .. }) => {
401                json!({
402                    "Node Type": "Projection",
403                    "Expressions": expr.iter().map(|e| e.to_string()).collect::<Vec<_>>()
404                })
405            }
406            LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
407                json!({
408                    "Node Type": "Projection",
409                    "Operation": op.name(),
410                    "Table Name": table_name.table()
411                })
412            }
413            LogicalPlan::Copy(CopyTo {
414                input: _,
415                output_url,
416                file_type,
417                partition_by: _,
418                options,
419                output_schema: _,
420            }) => {
421                let op_str = options
422                    .iter()
423                    .map(|(k, v)| format!("{k}={v}"))
424                    .collect::<Vec<_>>()
425                    .join(", ");
426                json!({
427                    "Node Type": "CopyTo",
428                    "Output URL": output_url,
429                    "File Type": format!("{}", file_type.get_ext()),
430                    "Options": op_str
431                })
432            }
433            LogicalPlan::Ddl(ddl) => {
434                json!({
435                    "Node Type": "Ddl",
436                    "Operation": format!("{}", ddl.display())
437                })
438            }
439            LogicalPlan::Filter(Filter {
440                predicate: expr, ..
441            }) => {
442                json!({
443                    "Node Type": "Filter",
444                    "Condition": format!("{}", expr)
445                })
446            }
447            LogicalPlan::Window(Window { window_expr, .. }) => {
448                json!({
449                    "Node Type": "WindowAggr",
450                    "Expressions": expr_vec_fmt!(window_expr)
451                })
452            }
453            LogicalPlan::Aggregate(Aggregate {
454                group_expr,
455                aggr_expr,
456                ..
457            }) => {
458                json!({
459                    "Node Type": "Aggregate",
460                    "Group By": expr_vec_fmt!(group_expr),
461                    "Aggregates": expr_vec_fmt!(aggr_expr)
462                })
463            }
464            LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
465                let mut object = json!({
466                    "Node Type": "Sort",
467                    "Sort Key": expr_vec_fmt!(expr),
468                });
469
470                if let Some(fetch) = fetch {
471                    object["Fetch"] = serde_json::Value::Number((*fetch).into());
472                }
473
474                object
475            }
476            LogicalPlan::Join(Join {
477                on: keys,
478                filter,
479                join_constraint,
480                join_type,
481                ..
482            }) => {
483                let join_expr: Vec<String> =
484                    keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
485                let filter_expr = filter
486                    .as_ref()
487                    .map(|expr| format!(" Filter: {expr}"))
488                    .unwrap_or_else(|| "".to_string());
489                json!({
490                    "Node Type": format!("{} Join", join_type),
491                    "Join Constraint": format!("{:?}", join_constraint),
492                    "Join Keys": join_expr.join(", "),
493                    "Filter": format!("{}", filter_expr)
494                })
495            }
496            LogicalPlan::Repartition(Repartition {
497                partitioning_scheme,
498                ..
499            }) => match partitioning_scheme {
500                Partitioning::RoundRobinBatch(n) => {
501                    json!({
502                        "Node Type": "Repartition",
503                        "Partitioning Scheme": "RoundRobinBatch",
504                        "Partition Count": n
505                    })
506                }
507                Partitioning::Hash(expr, n) => {
508                    let hash_expr: Vec<String> =
509                        expr.iter().map(|e| format!("{e}")).collect();
510
511                    json!({
512                        "Node Type": "Repartition",
513                        "Partitioning Scheme": "Hash",
514                        "Partition Count": n,
515                        "Partitioning Key": hash_expr
516                    })
517                }
518                Partitioning::DistributeBy(expr) => {
519                    let dist_by_expr: Vec<String> =
520                        expr.iter().map(|e| format!("{e}")).collect();
521                    json!({
522                        "Node Type": "Repartition",
523                        "Partitioning Scheme": "DistributeBy",
524                        "Partitioning Key": dist_by_expr
525                    })
526                }
527            },
528            LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
529                let mut object = serde_json::json!(
530                    {
531                        "Node Type": "Limit",
532                    }
533                );
534                if let Some(s) = skip {
535                    object["Skip"] = s.to_string().into()
536                };
537                if let Some(f) = fetch {
538                    object["Fetch"] = f.to_string().into()
539                };
540                object
541            }
542            LogicalPlan::Subquery(Subquery { .. }) => {
543                json!({
544                    "Node Type": "Subquery"
545                })
546            }
547            LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
548                json!({
549                    "Node Type": "Subquery",
550                    "Alias": alias.table(),
551                })
552            }
553            LogicalPlan::Statement(statement) => {
554                json!({
555                    "Node Type": "Statement",
556                    "Statement": format!("{}", statement.display())
557                })
558            }
559            LogicalPlan::Distinct(distinct) => match distinct {
560                Distinct::All(_) => {
561                    json!({
562                        "Node Type": "DistinctAll"
563                    })
564                }
565                Distinct::On(DistinctOn {
566                    on_expr,
567                    select_expr,
568                    sort_expr,
569                    ..
570                }) => {
571                    let mut object = json!({
572                        "Node Type": "DistinctOn",
573                        "On": expr_vec_fmt!(on_expr),
574                        "Select": expr_vec_fmt!(select_expr),
575                    });
576                    if let Some(sort_expr) = sort_expr {
577                        object["Sort"] =
578                            serde_json::Value::String(expr_vec_fmt!(sort_expr));
579                    }
580
581                    object
582                }
583            },
584            LogicalPlan::Explain { .. } => {
585                json!({
586                    "Node Type": "Explain"
587                })
588            }
589            LogicalPlan::Analyze { .. } => {
590                json!({
591                    "Node Type": "Analyze"
592                })
593            }
594            LogicalPlan::Union(_) => {
595                json!({
596                    "Node Type": "Union"
597                })
598            }
599            LogicalPlan::Extension(e) => {
600                json!({
601                    "Node Type": e.node.name(),
602                    "Detail": format!("{:?}", e.node)
603                })
604            }
605            LogicalPlan::DescribeTable(DescribeTable { .. }) => {
606                json!({
607                    "Node Type": "DescribeTable"
608                })
609            }
610            LogicalPlan::Unnest(Unnest {
611                input: plan,
612                list_type_columns: list_col_indices,
613                struct_type_columns: struct_col_indices,
614                ..
615            }) => {
616                let input_columns = plan.schema().columns();
617                let list_type_columns = list_col_indices
618                    .iter()
619                    .map(|(i, unnest_info)| {
620                        format!(
621                            "{}|depth={:?}",
622                            &input_columns[*i].to_string(),
623                            unnest_info.depth
624                        )
625                    })
626                    .collect::<Vec<String>>();
627                let struct_type_columns = struct_col_indices
628                    .iter()
629                    .map(|i| &input_columns[*i])
630                    .collect::<Vec<&Column>>();
631                json!({
632                    "Node Type": "Unnest",
633                    "ListColumn": expr_vec_fmt!(list_type_columns),
634                    "StructColumn": expr_vec_fmt!(struct_type_columns),
635                })
636            }
637        }
638    }
639}
640
641impl<'n> TreeNodeVisitor<'n> for PgJsonVisitor<'_, '_> {
642    type Node = LogicalPlan;
643
644    fn f_down(
645        &mut self,
646        node: &'n LogicalPlan,
647    ) -> datafusion_common::Result<TreeNodeRecursion> {
648        let id = self.next_id;
649        self.next_id += 1;
650        let mut object = Self::to_json_value(node);
651
652        object["Plans"] = serde_json::Value::Array(vec![]);
653
654        if self.with_schema {
655            object["Output"] = serde_json::Value::Array(
656                node.schema()
657                    .fields()
658                    .iter()
659                    .map(|f| f.name().to_string())
660                    .map(serde_json::Value::String)
661                    .collect(),
662            );
663        };
664
665        self.objects.insert(id, object);
666        self.parent_ids.push(id);
667        Ok(TreeNodeRecursion::Continue)
668    }
669
670    fn f_up(
671        &mut self,
672        _node: &Self::Node,
673    ) -> datafusion_common::Result<TreeNodeRecursion> {
674        let id = self.parent_ids.pop().unwrap();
675
676        let current_node = self
677            .objects
678            .remove(&id)
679            .ok_or_else(|| internal_datafusion_err!("Missing current node!"))?;
680
681        if let Some(parent_id) = self.parent_ids.last() {
682            let parent_node = self
683                .objects
684                .get_mut(parent_id)
685                .expect("Missing parent node!");
686            let plans = parent_node
687                .get_mut("Plans")
688                .and_then(|p| p.as_array_mut())
689                .expect("Plans should be an array");
690
691            plans.push(current_node);
692        } else {
693            // This is the root node
694            let plan = serde_json::json!([{"Plan": current_node}]);
695            write!(
696                self.f,
697                "{}",
698                serde_json::to_string_pretty(&plan)
699                    .map_err(|e| DataFusionError::External(Box::new(e)))?
700            )?;
701        }
702
703        Ok(TreeNodeRecursion::Continue)
704    }
705}
706
#[cfg(test)]
mod tests {
    use arrow::datatypes::{DataType, Field};
    use insta::assert_snapshot;

    use super::*;

    /// A schema without fields renders as an empty bracket pair.
    #[test]
    fn test_display_empty_schema() {
        let no_fields = Schema::empty();
        assert_snapshot!(display_schema(&no_fields), @"[]");
    }

    /// Nullable fields carry the `;N` marker, non-nullable ones do not.
    #[test]
    fn test_display_schema() {
        let required_id = Field::new("id", DataType::Int32, false);
        let optional_name = Field::new("first_name", DataType::Utf8, true);
        let schema = Schema::new(vec![required_id, optional_name]);

        assert_snapshot!(display_schema(&schema), @"[id:Int32, first_name:Utf8;N]");
    }
}