datafusion_expr/logical_plan/
display.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module provides logic for displaying LogicalPlans in various styles
19
20use std::collections::HashMap;
21use std::fmt;
22
23use crate::{
24    expr_vec_fmt, Aggregate, DescribeTable, Distinct, DistinctOn, DmlStatement, Expr,
25    Filter, Join, Limit, LogicalPlan, Partitioning, Projection, RecursiveQuery,
26    Repartition, Sort, Subquery, SubqueryAlias, TableProviderFilterPushDown, TableScan,
27    Unnest, Values, Window,
28};
29
30use crate::dml::CopyTo;
31use arrow::datatypes::Schema;
32use datafusion_common::display::GraphvizBuilder;
33use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor};
34use datafusion_common::{internal_datafusion_err, Column, DataFusionError};
35use serde_json::json;
36
/// Formats plans with a single line per node, indenting every node two
/// spaces further than its parent. For example:
///
/// ```text
/// Projection: id
///   Filter: state Eq Utf8("CO")
///     CsvScan: employee.csv projection=Some([0, 3])
/// ```
pub struct IndentVisitor<'a, 'b> {
    /// Destination the formatted plan is written to
    f: &'a mut fmt::Formatter<'b>,
    /// If true, includes summarized schema information
    with_schema: bool,
    /// The current nesting depth; each level is rendered as two spaces
    indent: usize,
}
49
50impl<'a, 'b> IndentVisitor<'a, 'b> {
51    /// Create a visitor that will write a formatted LogicalPlan to f. If `with_schema` is
52    /// true, includes schema information on each line.
53    pub fn new(f: &'a mut fmt::Formatter<'b>, with_schema: bool) -> Self {
54        Self {
55            f,
56            with_schema,
57            indent: 0,
58        }
59    }
60}
61
62impl<'n> TreeNodeVisitor<'n> for IndentVisitor<'_, '_> {
63    type Node = LogicalPlan;
64
65    fn f_down(
66        &mut self,
67        plan: &'n LogicalPlan,
68    ) -> datafusion_common::Result<TreeNodeRecursion> {
69        if self.indent > 0 {
70            writeln!(self.f)?;
71        }
72        write!(self.f, "{:indent$}", "", indent = self.indent * 2)?;
73        write!(self.f, "{}", plan.display())?;
74        if self.with_schema {
75            write!(self.f, " {}", display_schema(plan.schema().as_arrow()))?;
76        }
77
78        self.indent += 1;
79        Ok(TreeNodeRecursion::Continue)
80    }
81
82    fn f_up(
83        &mut self,
84        _plan: &'n LogicalPlan,
85    ) -> datafusion_common::Result<TreeNodeRecursion> {
86        self.indent -= 1;
87        Ok(TreeNodeRecursion::Continue)
88    }
89}
90
91/// Print the schema in a compact representation to `buf`
92///
93/// For example: `foo:Utf8` if `foo` can not be null, and
94/// `foo:Utf8;N` if `foo` is nullable.
95///
96/// ```
97/// use arrow::datatypes::{DataType, Field, Schema};
98/// # use datafusion_expr::logical_plan::display_schema;
99/// let schema = Schema::new(vec![
100///     Field::new("id", DataType::Int32, false),
101///     Field::new("first_name", DataType::Utf8, true),
102/// ]);
103///
104/// assert_eq!(
105///     "[id:Int32, first_name:Utf8;N]",
106///     format!("{}", display_schema(&schema))
107/// );
108/// ```
109pub fn display_schema(schema: &Schema) -> impl fmt::Display + '_ {
110    struct Wrapper<'a>(&'a Schema);
111
112    impl fmt::Display for Wrapper<'_> {
113        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
114            write!(f, "[")?;
115            for (idx, field) in self.0.fields().iter().enumerate() {
116                if idx > 0 {
117                    write!(f, ", ")?;
118                }
119                let nullable_str = if field.is_nullable() { ";N" } else { "" };
120                write!(
121                    f,
122                    "{}:{:?}{}",
123                    field.name(),
124                    field.data_type(),
125                    nullable_str
126                )?;
127            }
128            write!(f, "]")
129        }
130    }
131    Wrapper(schema)
132}
133
/// Formats plans for graphical display using the `DOT` language. This
/// format can be visualized using software from
/// [`graphviz`](https://graphviz.org/)
pub struct GraphvizVisitor<'a, 'b> {
    /// Destination the `DOT` output is written to
    f: &'a mut fmt::Formatter<'b>,
    /// Hands out node ids and writes the graph, cluster, node and edge
    /// statements
    graphviz_builder: GraphvizBuilder,
    /// If true, includes summarized schema information
    with_schema: bool,

    /// Holds the ids (as generated from `graphviz_builder`) of all
    /// parent nodes of the node currently being visited; the direct
    /// parent is the last element
    parent_ids: Vec<usize>,
}
147
148impl<'a, 'b> GraphvizVisitor<'a, 'b> {
149    pub fn new(f: &'a mut fmt::Formatter<'b>) -> Self {
150        Self {
151            f,
152            graphviz_builder: GraphvizBuilder::default(),
153            with_schema: false,
154            parent_ids: Vec::new(),
155        }
156    }
157
158    /// Sets a flag which controls if the output schema is displayed
159    pub fn set_with_schema(&mut self, with_schema: bool) {
160        self.with_schema = with_schema;
161    }
162
163    pub fn pre_visit_plan(&mut self, label: &str) -> fmt::Result {
164        self.graphviz_builder.start_cluster(self.f, label)
165    }
166
167    pub fn post_visit_plan(&mut self) -> fmt::Result {
168        self.graphviz_builder.end_cluster(self.f)
169    }
170
171    pub fn start_graph(&mut self) -> fmt::Result {
172        self.graphviz_builder.start_graph(self.f)
173    }
174
175    pub fn end_graph(&mut self) -> fmt::Result {
176        self.graphviz_builder.end_graph(self.f)
177    }
178}
179
180impl<'n> TreeNodeVisitor<'n> for GraphvizVisitor<'_, '_> {
181    type Node = LogicalPlan;
182
183    fn f_down(
184        &mut self,
185        plan: &'n LogicalPlan,
186    ) -> datafusion_common::Result<TreeNodeRecursion> {
187        let id = self.graphviz_builder.next_id();
188
189        // Create a new graph node for `plan` such as
190        // id [label="foo"]
191        let label = if self.with_schema {
192            format!(
193                r"{}\nSchema: {}",
194                plan.display(),
195                display_schema(plan.schema().as_arrow())
196            )
197        } else {
198            format!("{}", plan.display())
199        };
200
201        self.graphviz_builder
202            .add_node(self.f, id, &label, None)
203            .map_err(|_e| internal_datafusion_err!("Fail to format"))?;
204
205        // Create an edge to our parent node, if any
206        //  parent_id -> id
207        if let Some(parent_id) = self.parent_ids.last() {
208            self.graphviz_builder
209                .add_edge(self.f, *parent_id, id)
210                .map_err(|_e| internal_datafusion_err!("Fail to format"))?;
211        }
212
213        self.parent_ids.push(id);
214        Ok(TreeNodeRecursion::Continue)
215    }
216
217    fn f_up(
218        &mut self,
219        _plan: &LogicalPlan,
220    ) -> datafusion_common::Result<TreeNodeRecursion> {
221        // always be non-empty as pre_visit always pushes
222        // So it should always be Ok(true)
223        let res = self.parent_ids.pop();
224        res.ok_or(internal_datafusion_err!("Fail to format"))
225            .map(|_| TreeNodeRecursion::Continue)
226    }
227}
228
/// Formats plans to display as postgresql plan json format.
///
/// There are already many existing visualizers for this format, for example [dalibo](https://explain.dalibo.com/).
/// Unfortunately, there is no formal spec for this format, but it is widely used in the PostgreSQL community.
///
/// Here is an example of the format:
///
/// ```json
/// [
///     {
///         "Plan": {
///             "Node Type": "Sort",
///             "Output": [
///                 "question_1.id",
///                 "question_1.title",
///                 "question_1.text",
///                 "question_1.file",
///                 "question_1.type",
///                 "question_1.source",
///                 "question_1.exam_id"
///             ],
///             "Sort Key": [
///                 "question_1.id"
///             ],
///             "Plans": [
///                 {
///                     "Node Type": "Seq Scan",
///                     "Parent Relationship": "Left",
///                     "Relation Name": "question",
///                     "Schema": "public",
///                     "Alias": "question_1",
///                     "Output": [
///                         "question_1.id",
///                         "question_1.title",
///                         "question_1.text",
///                         "question_1.file",
///                         "question_1.type",
///                         "question_1.source",
///                         "question_1.exam_id"
///                     ],
///                     "Filter": "(question_1.exam_id = 1)"
///                 }
///             ]
///         }
///     }
/// ]
/// ```
pub struct PgJsonVisitor<'a, 'b> {
    /// Destination the final json document is written to
    f: &'a mut fmt::Formatter<'b>,

    /// A mapping from plan node id to the plan node json representation.
    objects: HashMap<u32, serde_json::Value>,

    /// The id that will be assigned to the next visited plan node
    next_id: u32,

    /// If true, includes summarized schema information
    with_schema: bool,

    /// Holds the ids (as generated from `next_id`) of all parent nodes of
    /// the node currently being visited; the direct parent is the last
    /// element
    parent_ids: Vec<u32>,
}
291
292impl<'a, 'b> PgJsonVisitor<'a, 'b> {
293    pub fn new(f: &'a mut fmt::Formatter<'b>) -> Self {
294        Self {
295            f,
296            objects: HashMap::new(),
297            next_id: 0,
298            with_schema: false,
299            parent_ids: Vec::new(),
300        }
301    }
302
303    /// Sets a flag which controls if the output schema is displayed
304    pub fn with_schema(&mut self, with_schema: bool) {
305        self.with_schema = with_schema;
306    }
307
308    /// Converts a logical plan node to a json object.
309    fn to_json_value(node: &LogicalPlan) -> serde_json::Value {
310        match node {
311            LogicalPlan::EmptyRelation(_) => {
312                json!({
313                    "Node Type": "EmptyRelation",
314                })
315            }
316            LogicalPlan::RecursiveQuery(RecursiveQuery { is_distinct, .. }) => {
317                json!({
318                    "Node Type": "RecursiveQuery",
319                    "Is Distinct": is_distinct,
320                })
321            }
322            LogicalPlan::Values(Values { ref values, .. }) => {
323                let str_values = values
324                    .iter()
325                    // limit to only 5 values to avoid horrible display
326                    .take(5)
327                    .map(|row| {
328                        let item = row
329                            .iter()
330                            .map(|expr| expr.to_string())
331                            .collect::<Vec<_>>()
332                            .join(", ");
333                        format!("({item})")
334                    })
335                    .collect::<Vec<_>>()
336                    .join(", ");
337
338                let eclipse = if values.len() > 5 { "..." } else { "" };
339
340                let values_str = format!("{str_values}{eclipse}");
341                json!({
342                    "Node Type": "Values",
343                    "Values": values_str
344                })
345            }
346            LogicalPlan::TableScan(TableScan {
347                ref source,
348                ref table_name,
349                ref filters,
350                ref fetch,
351                ..
352            }) => {
353                let mut object = json!({
354                    "Node Type": "TableScan",
355                    "Relation Name": table_name.table(),
356                });
357
358                if let Some(s) = table_name.schema() {
359                    object["Schema"] = serde_json::Value::String(s.to_string());
360                }
361
362                if let Some(c) = table_name.catalog() {
363                    object["Catalog"] = serde_json::Value::String(c.to_string());
364                }
365
366                if !filters.is_empty() {
367                    let mut full_filter = vec![];
368                    let mut partial_filter = vec![];
369                    let mut unsupported_filters = vec![];
370                    let filters: Vec<&Expr> = filters.iter().collect();
371
372                    if let Ok(results) = source.supports_filters_pushdown(&filters) {
373                        filters.iter().zip(results.iter()).for_each(
374                            |(x, res)| match res {
375                                TableProviderFilterPushDown::Exact => full_filter.push(x),
376                                TableProviderFilterPushDown::Inexact => {
377                                    partial_filter.push(x)
378                                }
379                                TableProviderFilterPushDown::Unsupported => {
380                                    unsupported_filters.push(x)
381                                }
382                            },
383                        );
384                    }
385
386                    if !full_filter.is_empty() {
387                        object["Full Filters"] =
388                            serde_json::Value::String(expr_vec_fmt!(full_filter));
389                    };
390                    if !partial_filter.is_empty() {
391                        object["Partial Filters"] =
392                            serde_json::Value::String(expr_vec_fmt!(partial_filter));
393                    }
394                    if !unsupported_filters.is_empty() {
395                        object["Unsupported Filters"] =
396                            serde_json::Value::String(expr_vec_fmt!(unsupported_filters));
397                    }
398                }
399
400                if let Some(f) = fetch {
401                    object["Fetch"] = serde_json::Value::Number((*f).into());
402                }
403
404                object
405            }
406            LogicalPlan::Projection(Projection { ref expr, .. }) => {
407                json!({
408                    "Node Type": "Projection",
409                    "Expressions": expr.iter().map(|e| e.to_string()).collect::<Vec<_>>()
410                })
411            }
412            LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
413                json!({
414                    "Node Type": "Projection",
415                    "Operation": op.name(),
416                    "Table Name": table_name.table()
417                })
418            }
419            LogicalPlan::Copy(CopyTo {
420                input: _,
421                output_url,
422                file_type,
423                partition_by: _,
424                options,
425                output_schema: _,
426            }) => {
427                let op_str = options
428                    .iter()
429                    .map(|(k, v)| format!("{k}={v}"))
430                    .collect::<Vec<_>>()
431                    .join(", ");
432                json!({
433                    "Node Type": "CopyTo",
434                    "Output URL": output_url,
435                    "File Type": format!("{}", file_type.get_ext()),
436                    "Options": op_str
437                })
438            }
439            LogicalPlan::Ddl(ddl) => {
440                json!({
441                    "Node Type": "Ddl",
442                    "Operation": format!("{}", ddl.display())
443                })
444            }
445            LogicalPlan::Filter(Filter {
446                predicate: ref expr,
447                ..
448            }) => {
449                json!({
450                    "Node Type": "Filter",
451                    "Condition": format!("{}", expr)
452                })
453            }
454            LogicalPlan::Window(Window {
455                ref window_expr, ..
456            }) => {
457                json!({
458                    "Node Type": "WindowAggr",
459                    "Expressions": expr_vec_fmt!(window_expr)
460                })
461            }
462            LogicalPlan::Aggregate(Aggregate {
463                ref group_expr,
464                ref aggr_expr,
465                ..
466            }) => {
467                json!({
468                    "Node Type": "Aggregate",
469                    "Group By": expr_vec_fmt!(group_expr),
470                    "Aggregates": expr_vec_fmt!(aggr_expr)
471                })
472            }
473            LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
474                let mut object = json!({
475                    "Node Type": "Sort",
476                    "Sort Key": expr_vec_fmt!(expr),
477                });
478
479                if let Some(fetch) = fetch {
480                    object["Fetch"] = serde_json::Value::Number((*fetch).into());
481                }
482
483                object
484            }
485            LogicalPlan::Join(Join {
486                on: ref keys,
487                filter,
488                join_constraint,
489                join_type,
490                ..
491            }) => {
492                let join_expr: Vec<String> =
493                    keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
494                let filter_expr = filter
495                    .as_ref()
496                    .map(|expr| format!(" Filter: {expr}"))
497                    .unwrap_or_else(|| "".to_string());
498                json!({
499                    "Node Type": format!("{} Join", join_type),
500                    "Join Constraint": format!("{:?}", join_constraint),
501                    "Join Keys": join_expr.join(", "),
502                    "Filter": format!("{}", filter_expr)
503                })
504            }
505            LogicalPlan::Repartition(Repartition {
506                partitioning_scheme,
507                ..
508            }) => match partitioning_scheme {
509                Partitioning::RoundRobinBatch(n) => {
510                    json!({
511                        "Node Type": "Repartition",
512                        "Partitioning Scheme": "RoundRobinBatch",
513                        "Partition Count": n
514                    })
515                }
516                Partitioning::Hash(expr, n) => {
517                    let hash_expr: Vec<String> =
518                        expr.iter().map(|e| format!("{e}")).collect();
519
520                    json!({
521                        "Node Type": "Repartition",
522                        "Partitioning Scheme": "Hash",
523                        "Partition Count": n,
524                        "Partitioning Key": hash_expr
525                    })
526                }
527                Partitioning::DistributeBy(expr) => {
528                    let dist_by_expr: Vec<String> =
529                        expr.iter().map(|e| format!("{e}")).collect();
530                    json!({
531                        "Node Type": "Repartition",
532                        "Partitioning Scheme": "DistributeBy",
533                        "Partitioning Key": dist_by_expr
534                    })
535                }
536            },
537            LogicalPlan::Limit(Limit {
538                ref skip,
539                ref fetch,
540                ..
541            }) => {
542                let mut object = serde_json::json!(
543                    {
544                        "Node Type": "Limit",
545                    }
546                );
547                if let Some(s) = skip {
548                    object["Skip"] = s.to_string().into()
549                };
550                if let Some(f) = fetch {
551                    object["Fetch"] = f.to_string().into()
552                };
553                object
554            }
555            LogicalPlan::Subquery(Subquery { .. }) => {
556                json!({
557                    "Node Type": "Subquery"
558                })
559            }
560            LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
561                json!({
562                    "Node Type": "Subquery",
563                    "Alias": alias.table(),
564                })
565            }
566            LogicalPlan::Statement(statement) => {
567                json!({
568                    "Node Type": "Statement",
569                    "Statement": format!("{}", statement.display())
570                })
571            }
572            LogicalPlan::Distinct(distinct) => match distinct {
573                Distinct::All(_) => {
574                    json!({
575                        "Node Type": "DistinctAll"
576                    })
577                }
578                Distinct::On(DistinctOn {
579                    on_expr,
580                    select_expr,
581                    sort_expr,
582                    ..
583                }) => {
584                    let mut object = json!({
585                        "Node Type": "DistinctOn",
586                        "On": expr_vec_fmt!(on_expr),
587                        "Select": expr_vec_fmt!(select_expr),
588                    });
589                    if let Some(sort_expr) = sort_expr {
590                        object["Sort"] =
591                            serde_json::Value::String(expr_vec_fmt!(sort_expr));
592                    }
593
594                    object
595                }
596            },
597            LogicalPlan::Explain { .. } => {
598                json!({
599                    "Node Type": "Explain"
600                })
601            }
602            LogicalPlan::Analyze { .. } => {
603                json!({
604                    "Node Type": "Analyze"
605                })
606            }
607            LogicalPlan::Union(_) => {
608                json!({
609                    "Node Type": "Union"
610                })
611            }
612            LogicalPlan::Extension(e) => {
613                json!({
614                    "Node Type": e.node.name(),
615                    "Detail": format!("{:?}", e.node)
616                })
617            }
618            LogicalPlan::DescribeTable(DescribeTable { .. }) => {
619                json!({
620                    "Node Type": "DescribeTable"
621                })
622            }
623            LogicalPlan::Unnest(Unnest {
624                input: plan,
625                list_type_columns: list_col_indices,
626                struct_type_columns: struct_col_indices,
627                ..
628            }) => {
629                let input_columns = plan.schema().columns();
630                let list_type_columns = list_col_indices
631                    .iter()
632                    .map(|(i, unnest_info)| {
633                        format!(
634                            "{}|depth={:?}",
635                            &input_columns[*i].to_string(),
636                            unnest_info.depth
637                        )
638                    })
639                    .collect::<Vec<String>>();
640                let struct_type_columns = struct_col_indices
641                    .iter()
642                    .map(|i| &input_columns[*i])
643                    .collect::<Vec<&Column>>();
644                json!({
645                    "Node Type": "Unnest",
646                    "ListColumn": expr_vec_fmt!(list_type_columns),
647                    "StructColumn": expr_vec_fmt!(struct_type_columns),
648                })
649            }
650        }
651    }
652}
653
654impl<'n> TreeNodeVisitor<'n> for PgJsonVisitor<'_, '_> {
655    type Node = LogicalPlan;
656
657    fn f_down(
658        &mut self,
659        node: &'n LogicalPlan,
660    ) -> datafusion_common::Result<TreeNodeRecursion> {
661        let id = self.next_id;
662        self.next_id += 1;
663        let mut object = Self::to_json_value(node);
664
665        object["Plans"] = serde_json::Value::Array(vec![]);
666
667        if self.with_schema {
668            object["Output"] = serde_json::Value::Array(
669                node.schema()
670                    .fields()
671                    .iter()
672                    .map(|f| f.name().to_string())
673                    .map(serde_json::Value::String)
674                    .collect(),
675            );
676        };
677
678        self.objects.insert(id, object);
679        self.parent_ids.push(id);
680        Ok(TreeNodeRecursion::Continue)
681    }
682
683    fn f_up(
684        &mut self,
685        _node: &Self::Node,
686    ) -> datafusion_common::Result<TreeNodeRecursion> {
687        let id = self.parent_ids.pop().unwrap();
688
689        let current_node = self
690            .objects
691            .remove(&id)
692            .ok_or_else(|| internal_datafusion_err!("Missing current node!"))?;
693
694        if let Some(parent_id) = self.parent_ids.last() {
695            let parent_node = self
696                .objects
697                .get_mut(parent_id)
698                .expect("Missing parent node!");
699            let plans = parent_node
700                .get_mut("Plans")
701                .and_then(|p| p.as_array_mut())
702                .expect("Plans should be an array");
703
704            plans.push(current_node);
705        } else {
706            // This is the root node
707            let plan = serde_json::json!([{"Plan": current_node}]);
708            write!(
709                self.f,
710                "{}",
711                serde_json::to_string_pretty(&plan)
712                    .map_err(|e| DataFusionError::External(Box::new(e)))?
713            )?;
714        }
715
716        Ok(TreeNodeRecursion::Continue)
717    }
718}
719
#[cfg(test)]
mod tests {
    use arrow::datatypes::{DataType, Field};
    use insta::assert_snapshot;

    use super::*;

    /// A schema without any fields renders as an empty pair of brackets.
    #[test]
    fn test_display_empty_schema() {
        let no_fields = Schema::empty();
        assert_snapshot!(display_schema(&no_fields), @"[]");
    }

    /// Nullable fields carry the `;N` suffix, non-nullable fields do not.
    #[test]
    fn test_display_schema() {
        let id = Field::new("id", DataType::Int32, false);
        let first_name = Field::new("first_name", DataType::Utf8, true);
        let two_fields = Schema::new(vec![id, first_name]);

        assert_snapshot!(display_schema(&two_fields), @"[id:Int32, first_name:Utf8;N]");
    }
}