datafusion_expr/logical_plan/
display.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module provides logic for displaying LogicalPlans in various styles
19
20use std::collections::HashMap;
21use std::fmt;
22
23use crate::{
24    Aggregate, DescribeTable, Distinct, DistinctOn, DmlStatement, Expr, Filter, Join,
25    Limit, LogicalPlan, Partitioning, Projection, RecursiveQuery, Repartition, Sort,
26    Subquery, SubqueryAlias, TableProviderFilterPushDown, TableScan, Unnest, Values,
27    Window, expr_vec_fmt,
28};
29
30use crate::dml::CopyTo;
31use arrow::datatypes::Schema;
32use datafusion_common::display::GraphvizBuilder;
33use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor};
34use datafusion_common::{Column, DataFusionError, internal_datafusion_err};
35use serde_json::json;
36
/// Formats plans with a single line per node. For example:
///
/// ```text
/// Projection: id
///   Filter: state Eq Utf8("CO")
///     CsvScan: employee.csv projection=Some([0, 3])
/// ```
///
/// Each level of the plan tree is indented by two additional spaces.
pub struct IndentVisitor<'a, 'b> {
    /// Destination the formatted plan is written to
    f: &'a mut fmt::Formatter<'b>,
    /// If true, includes summarized schema information
    with_schema: bool,
    /// The current nesting depth; each level is rendered as two spaces
    indent: usize,
}
49
50impl<'a, 'b> IndentVisitor<'a, 'b> {
51    /// Create a visitor that will write a formatted LogicalPlan to f. If `with_schema` is
52    /// true, includes schema information on each line.
53    pub fn new(f: &'a mut fmt::Formatter<'b>, with_schema: bool) -> Self {
54        Self {
55            f,
56            with_schema,
57            indent: 0,
58        }
59    }
60}
61
62impl<'n> TreeNodeVisitor<'n> for IndentVisitor<'_, '_> {
63    type Node = LogicalPlan;
64
65    fn f_down(
66        &mut self,
67        plan: &'n LogicalPlan,
68    ) -> datafusion_common::Result<TreeNodeRecursion> {
69        if self.indent > 0 {
70            writeln!(self.f)?;
71        }
72        write!(self.f, "{:indent$}", "", indent = self.indent * 2)?;
73        write!(self.f, "{}", plan.display())?;
74        if self.with_schema {
75            write!(self.f, " {}", display_schema(plan.schema().as_arrow()))?;
76        }
77
78        self.indent += 1;
79        Ok(TreeNodeRecursion::Continue)
80    }
81
82    fn f_up(
83        &mut self,
84        _plan: &'n LogicalPlan,
85    ) -> datafusion_common::Result<TreeNodeRecursion> {
86        self.indent -= 1;
87        Ok(TreeNodeRecursion::Continue)
88    }
89}
90
91/// Print the schema in a compact representation to `buf`
92///
93/// For example: `foo:Utf8` if `foo` can not be null, and
94/// `foo:Utf8;N` if `foo` is nullable.
95///
96/// ```
97/// use arrow::datatypes::{DataType, Field, Schema};
98/// # use datafusion_expr::logical_plan::display_schema;
99/// let schema = Schema::new(vec![
100///     Field::new("id", DataType::Int32, false),
101///     Field::new("first_name", DataType::Utf8, true),
102/// ]);
103///
104/// assert_eq!(
105///     "[id:Int32, first_name:Utf8;N]",
106///     format!("{}", display_schema(&schema))
107/// );
108/// ```
109pub fn display_schema(schema: &Schema) -> impl fmt::Display + '_ {
110    struct Wrapper<'a>(&'a Schema);
111
112    impl fmt::Display for Wrapper<'_> {
113        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
114            write!(f, "[")?;
115            for (idx, field) in self.0.fields().iter().enumerate() {
116                if idx > 0 {
117                    write!(f, ", ")?;
118                }
119                let nullable_str = if field.is_nullable() { ";N" } else { "" };
120                write!(
121                    f,
122                    "{}:{:?}{}",
123                    field.name(),
124                    field.data_type(),
125                    nullable_str
126                )?;
127            }
128            write!(f, "]")
129        }
130    }
131    Wrapper(schema)
132}
133
/// Formats plans for graphical display using the `DOT` language. This
/// format can be visualized using software from
/// [`graphviz`](https://graphviz.org/)
pub struct GraphvizVisitor<'a, 'b> {
    /// Destination the output is written to
    f: &'a mut fmt::Formatter<'b>,
    /// Hands out node ids and writes the graph, cluster, node and edge
    /// statements
    graphviz_builder: GraphvizBuilder,
    /// If true, includes summarized schema information
    with_schema: bool,

    /// Holds the ids (as generated from `graphviz_builder`) of all
    /// parent nodes of the node being visited; the direct parent is last
    parent_ids: Vec<usize>,
}
147
impl<'a, 'b> GraphvizVisitor<'a, 'b> {
    /// Creates a visitor that writes to `f`.
    ///
    /// Schema information is left out of node labels unless it is enabled
    /// through [`Self::set_with_schema`].
    pub fn new(f: &'a mut fmt::Formatter<'b>) -> Self {
        Self {
            f,
            graphviz_builder: GraphvizBuilder::default(),
            with_schema: false,
            parent_ids: Vec::new(),
        }
    }

    /// Sets a flag which controls if the output schema is displayed
    pub fn set_with_schema(&mut self, with_schema: bool) {
        self.with_schema = with_schema;
    }

    /// Delegates to `GraphvizBuilder::start_cluster` with `label`, writing
    /// to the wrapped formatter.
    pub fn pre_visit_plan(&mut self, label: &str) -> fmt::Result {
        self.graphviz_builder.start_cluster(self.f, label)
    }

    /// Delegates to `GraphvizBuilder::end_cluster`, writing to the wrapped
    /// formatter.
    pub fn post_visit_plan(&mut self) -> fmt::Result {
        self.graphviz_builder.end_cluster(self.f)
    }

    /// Delegates to `GraphvizBuilder::start_graph`, writing to the wrapped
    /// formatter.
    pub fn start_graph(&mut self) -> fmt::Result {
        self.graphviz_builder.start_graph(self.f)
    }

    /// Delegates to `GraphvizBuilder::end_graph`, writing to the wrapped
    /// formatter.
    pub fn end_graph(&mut self) -> fmt::Result {
        self.graphviz_builder.end_graph(self.f)
    }
}
179
180impl<'n> TreeNodeVisitor<'n> for GraphvizVisitor<'_, '_> {
181    type Node = LogicalPlan;
182
183    fn f_down(
184        &mut self,
185        plan: &'n LogicalPlan,
186    ) -> datafusion_common::Result<TreeNodeRecursion> {
187        let id = self.graphviz_builder.next_id();
188
189        // Create a new graph node for `plan` such as
190        // id [label="foo"]
191        let label = if self.with_schema {
192            format!(
193                r"{}\nSchema: {}",
194                plan.display(),
195                display_schema(plan.schema().as_arrow())
196            )
197        } else {
198            format!("{}", plan.display())
199        };
200
201        self.graphviz_builder
202            .add_node(self.f, id, &label, None)
203            .map_err(|_e| internal_datafusion_err!("Fail to format"))?;
204
205        // Create an edge to our parent node, if any
206        //  parent_id -> id
207        if let Some(parent_id) = self.parent_ids.last() {
208            self.graphviz_builder
209                .add_edge(self.f, *parent_id, id)
210                .map_err(|_e| internal_datafusion_err!("Fail to format"))?;
211        }
212
213        self.parent_ids.push(id);
214        Ok(TreeNodeRecursion::Continue)
215    }
216
217    fn f_up(
218        &mut self,
219        _plan: &LogicalPlan,
220    ) -> datafusion_common::Result<TreeNodeRecursion> {
221        // always be non-empty as pre_visit always pushes
222        // So it should always be Ok(true)
223        let res = self.parent_ids.pop();
224        res.ok_or(internal_datafusion_err!("Fail to format"))
225            .map(|_| TreeNodeRecursion::Continue)
226    }
227}
228
/// Formats plans to display as postgresql plan json format.
///
/// There are already many existing visualizers for this format, for example [dalibo](https://explain.dalibo.com/).
/// Unfortunately, there is no formal spec for this format, but it is widely used in the PostgreSQL community.
///
/// Here is an example of the format:
///
/// ```json
/// [
///     {
///         "Plan": {
///             "Node Type": "Sort",
///             "Output": [
///                 "question_1.id",
///                 "question_1.title",
///                 "question_1.text",
///                 "question_1.file",
///                 "question_1.type",
///                 "question_1.source",
///                 "question_1.exam_id"
///             ],
///             "Sort Key": [
///                 "question_1.id"
///             ],
///             "Plans": [
///                 {
///                     "Node Type": "Seq Scan",
///                     "Parent Relationship": "Left",
///                     "Relation Name": "question",
///                     "Schema": "public",
///                     "Alias": "question_1",
///                     "Output": [
///                         "question_1.id",
///                         "question_1.title",
///                         "question_1.text",
///                         "question_1.file",
///                         "question_1.type",
///                         "question_1.source",
///                         "question_1.exam_id"
///                     ],
///                     "Filter": "(question_1.exam_id = 1)"
///                 }
///             ]
///         }
///     }
/// ]
/// ```
pub struct PgJsonVisitor<'a, 'b> {
    /// Destination the finished json document is written to
    f: &'a mut fmt::Formatter<'b>,

    /// A mapping from plan node id to the plan node json representation.
    objects: HashMap<u32, serde_json::Value>,

    /// The id that will be assigned to the next visited plan node
    next_id: u32,

    /// If true, includes summarized schema information
    with_schema: bool,

    /// Holds the ids (as assigned from `next_id`) of the node being
    /// visited and all of its parent nodes; the innermost node is last
    parent_ids: Vec<u32>,
}
291
292impl<'a, 'b> PgJsonVisitor<'a, 'b> {
293    pub fn new(f: &'a mut fmt::Formatter<'b>) -> Self {
294        Self {
295            f,
296            objects: HashMap::new(),
297            next_id: 0,
298            with_schema: false,
299            parent_ids: Vec::new(),
300        }
301    }
302
303    /// Sets a flag which controls if the output schema is displayed
304    pub fn with_schema(&mut self, with_schema: bool) {
305        self.with_schema = with_schema;
306    }
307
308    /// Converts a logical plan node to a json object.
309    fn to_json_value(node: &LogicalPlan) -> serde_json::Value {
310        match node {
311            LogicalPlan::EmptyRelation(_) => {
312                json!({
313                    "Node Type": "EmptyRelation",
314                })
315            }
316            LogicalPlan::RecursiveQuery(RecursiveQuery { is_distinct, .. }) => {
317                json!({
318                    "Node Type": "RecursiveQuery",
319                    "Is Distinct": is_distinct,
320                })
321            }
322            LogicalPlan::Values(Values { values, .. }) => {
323                let str_values = values
324                    .iter()
325                    // limit to only 5 values to avoid horrible display
326                    .take(5)
327                    .map(|row| {
328                        let item = row
329                            .iter()
330                            .map(|expr| expr.to_string())
331                            .collect::<Vec<_>>()
332                            .join(", ");
333                        format!("({item})")
334                    })
335                    .collect::<Vec<_>>()
336                    .join(", ");
337
338                let eclipse = if values.len() > 5 { "..." } else { "" };
339
340                let values_str = format!("{str_values}{eclipse}");
341                json!({
342                    "Node Type": "Values",
343                    "Values": values_str
344                })
345            }
346            LogicalPlan::TableScan(TableScan {
347                source,
348                table_name,
349                filters,
350                fetch,
351                ..
352            }) => {
353                let mut object = json!({
354                    "Node Type": "TableScan",
355                    "Relation Name": table_name.table(),
356                });
357
358                if let Some(s) = table_name.schema() {
359                    object["Schema"] = serde_json::Value::String(s.to_string());
360                }
361
362                if let Some(c) = table_name.catalog() {
363                    object["Catalog"] = serde_json::Value::String(c.to_string());
364                }
365
366                if !filters.is_empty() {
367                    let mut full_filter = vec![];
368                    let mut partial_filter = vec![];
369                    let mut unsupported_filters = vec![];
370                    let filters: Vec<&Expr> = filters.iter().collect();
371
372                    if let Ok(results) = source.supports_filters_pushdown(&filters) {
373                        filters.iter().zip(results.iter()).for_each(
374                            |(x, res)| match res {
375                                TableProviderFilterPushDown::Exact => full_filter.push(x),
376                                TableProviderFilterPushDown::Inexact => {
377                                    partial_filter.push(x)
378                                }
379                                TableProviderFilterPushDown::Unsupported => {
380                                    unsupported_filters.push(x)
381                                }
382                            },
383                        );
384                    }
385
386                    if !full_filter.is_empty() {
387                        object["Full Filters"] =
388                            serde_json::Value::String(expr_vec_fmt!(full_filter));
389                    };
390                    if !partial_filter.is_empty() {
391                        object["Partial Filters"] =
392                            serde_json::Value::String(expr_vec_fmt!(partial_filter));
393                    }
394                    if !unsupported_filters.is_empty() {
395                        object["Unsupported Filters"] =
396                            serde_json::Value::String(expr_vec_fmt!(unsupported_filters));
397                    }
398                }
399
400                if let Some(f) = fetch {
401                    object["Fetch"] = serde_json::Value::Number((*f).into());
402                }
403
404                object
405            }
406            LogicalPlan::Projection(Projection { expr, .. }) => {
407                json!({
408                    "Node Type": "Projection",
409                    "Expressions": expr.iter().map(|e| e.to_string()).collect::<Vec<_>>()
410                })
411            }
412            LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
413                json!({
414                    "Node Type": "Projection",
415                    "Operation": op.name(),
416                    "Table Name": table_name.table()
417                })
418            }
419            LogicalPlan::Copy(CopyTo {
420                input: _,
421                output_url,
422                file_type,
423                partition_by: _,
424                options,
425                output_schema: _,
426            }) => {
427                let op_str = options
428                    .iter()
429                    .map(|(k, v)| format!("{k}={v}"))
430                    .collect::<Vec<_>>()
431                    .join(", ");
432                json!({
433                    "Node Type": "CopyTo",
434                    "Output URL": output_url,
435                    "File Type": format!("{}", file_type.get_ext()),
436                    "Options": op_str
437                })
438            }
439            LogicalPlan::Ddl(ddl) => {
440                json!({
441                    "Node Type": "Ddl",
442                    "Operation": format!("{}", ddl.display())
443                })
444            }
445            LogicalPlan::Filter(Filter {
446                predicate: expr, ..
447            }) => {
448                json!({
449                    "Node Type": "Filter",
450                    "Condition": format!("{}", expr)
451                })
452            }
453            LogicalPlan::Window(Window { window_expr, .. }) => {
454                json!({
455                    "Node Type": "WindowAggr",
456                    "Expressions": expr_vec_fmt!(window_expr)
457                })
458            }
459            LogicalPlan::Aggregate(Aggregate {
460                group_expr,
461                aggr_expr,
462                ..
463            }) => {
464                json!({
465                    "Node Type": "Aggregate",
466                    "Group By": expr_vec_fmt!(group_expr),
467                    "Aggregates": expr_vec_fmt!(aggr_expr)
468                })
469            }
470            LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
471                let mut object = json!({
472                    "Node Type": "Sort",
473                    "Sort Key": expr_vec_fmt!(expr),
474                });
475
476                if let Some(fetch) = fetch {
477                    object["Fetch"] = serde_json::Value::Number((*fetch).into());
478                }
479
480                object
481            }
482            LogicalPlan::Join(Join {
483                on: keys,
484                filter,
485                join_constraint,
486                join_type,
487                ..
488            }) => {
489                let join_expr: Vec<String> =
490                    keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
491                let filter_expr = filter
492                    .as_ref()
493                    .map(|expr| format!(" Filter: {expr}"))
494                    .unwrap_or_else(|| "".to_string());
495                json!({
496                    "Node Type": format!("{} Join", join_type),
497                    "Join Constraint": format!("{:?}", join_constraint),
498                    "Join Keys": join_expr.join(", "),
499                    "Filter": format!("{}", filter_expr)
500                })
501            }
502            LogicalPlan::Repartition(Repartition {
503                partitioning_scheme,
504                ..
505            }) => match partitioning_scheme {
506                Partitioning::RoundRobinBatch(n) => {
507                    json!({
508                        "Node Type": "Repartition",
509                        "Partitioning Scheme": "RoundRobinBatch",
510                        "Partition Count": n
511                    })
512                }
513                Partitioning::Hash(expr, n) => {
514                    let hash_expr: Vec<String> =
515                        expr.iter().map(|e| format!("{e}")).collect();
516
517                    json!({
518                        "Node Type": "Repartition",
519                        "Partitioning Scheme": "Hash",
520                        "Partition Count": n,
521                        "Partitioning Key": hash_expr
522                    })
523                }
524                Partitioning::DistributeBy(expr) => {
525                    let dist_by_expr: Vec<String> =
526                        expr.iter().map(|e| format!("{e}")).collect();
527                    json!({
528                        "Node Type": "Repartition",
529                        "Partitioning Scheme": "DistributeBy",
530                        "Partitioning Key": dist_by_expr
531                    })
532                }
533            },
534            LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
535                let mut object = serde_json::json!(
536                    {
537                        "Node Type": "Limit",
538                    }
539                );
540                if let Some(s) = skip {
541                    object["Skip"] = s.to_string().into()
542                };
543                if let Some(f) = fetch {
544                    object["Fetch"] = f.to_string().into()
545                };
546                object
547            }
548            LogicalPlan::Subquery(Subquery { .. }) => {
549                json!({
550                    "Node Type": "Subquery"
551                })
552            }
553            LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
554                json!({
555                    "Node Type": "Subquery",
556                    "Alias": alias.table(),
557                })
558            }
559            LogicalPlan::Statement(statement) => {
560                json!({
561                    "Node Type": "Statement",
562                    "Statement": format!("{}", statement.display())
563                })
564            }
565            LogicalPlan::Distinct(distinct) => match distinct {
566                Distinct::All(_) => {
567                    json!({
568                        "Node Type": "DistinctAll"
569                    })
570                }
571                Distinct::On(DistinctOn {
572                    on_expr,
573                    select_expr,
574                    sort_expr,
575                    ..
576                }) => {
577                    let mut object = json!({
578                        "Node Type": "DistinctOn",
579                        "On": expr_vec_fmt!(on_expr),
580                        "Select": expr_vec_fmt!(select_expr),
581                    });
582                    if let Some(sort_expr) = sort_expr {
583                        object["Sort"] =
584                            serde_json::Value::String(expr_vec_fmt!(sort_expr));
585                    }
586
587                    object
588                }
589            },
590            LogicalPlan::Explain { .. } => {
591                json!({
592                    "Node Type": "Explain"
593                })
594            }
595            LogicalPlan::Analyze { .. } => {
596                json!({
597                    "Node Type": "Analyze"
598                })
599            }
600            LogicalPlan::Union(_) => {
601                json!({
602                    "Node Type": "Union"
603                })
604            }
605            LogicalPlan::Extension(e) => {
606                json!({
607                    "Node Type": e.node.name(),
608                    "Detail": format!("{:?}", e.node)
609                })
610            }
611            LogicalPlan::DescribeTable(DescribeTable { .. }) => {
612                json!({
613                    "Node Type": "DescribeTable"
614                })
615            }
616            LogicalPlan::Unnest(Unnest {
617                input: plan,
618                list_type_columns: list_col_indices,
619                struct_type_columns: struct_col_indices,
620                ..
621            }) => {
622                let input_columns = plan.schema().columns();
623                let list_type_columns = list_col_indices
624                    .iter()
625                    .map(|(i, unnest_info)| {
626                        format!(
627                            "{}|depth={:?}",
628                            &input_columns[*i].to_string(),
629                            unnest_info.depth
630                        )
631                    })
632                    .collect::<Vec<String>>();
633                let struct_type_columns = struct_col_indices
634                    .iter()
635                    .map(|i| &input_columns[*i])
636                    .collect::<Vec<&Column>>();
637                json!({
638                    "Node Type": "Unnest",
639                    "ListColumn": expr_vec_fmt!(list_type_columns),
640                    "StructColumn": expr_vec_fmt!(struct_type_columns),
641                })
642            }
643        }
644    }
645}
646
647impl<'n> TreeNodeVisitor<'n> for PgJsonVisitor<'_, '_> {
648    type Node = LogicalPlan;
649
650    fn f_down(
651        &mut self,
652        node: &'n LogicalPlan,
653    ) -> datafusion_common::Result<TreeNodeRecursion> {
654        let id = self.next_id;
655        self.next_id += 1;
656        let mut object = Self::to_json_value(node);
657
658        object["Plans"] = serde_json::Value::Array(vec![]);
659
660        if self.with_schema {
661            object["Output"] = serde_json::Value::Array(
662                node.schema()
663                    .fields()
664                    .iter()
665                    .map(|f| f.name().to_string())
666                    .map(serde_json::Value::String)
667                    .collect(),
668            );
669        };
670
671        self.objects.insert(id, object);
672        self.parent_ids.push(id);
673        Ok(TreeNodeRecursion::Continue)
674    }
675
676    fn f_up(
677        &mut self,
678        _node: &Self::Node,
679    ) -> datafusion_common::Result<TreeNodeRecursion> {
680        let id = self.parent_ids.pop().unwrap();
681
682        let current_node = self
683            .objects
684            .remove(&id)
685            .ok_or_else(|| internal_datafusion_err!("Missing current node!"))?;
686
687        if let Some(parent_id) = self.parent_ids.last() {
688            let parent_node = self
689                .objects
690                .get_mut(parent_id)
691                .expect("Missing parent node!");
692            let plans = parent_node
693                .get_mut("Plans")
694                .and_then(|p| p.as_array_mut())
695                .expect("Plans should be an array");
696
697            plans.push(current_node);
698        } else {
699            // This is the root node
700            let plan = serde_json::json!([{"Plan": current_node}]);
701            write!(
702                self.f,
703                "{}",
704                serde_json::to_string_pretty(&plan)
705                    .map_err(|e| DataFusionError::External(Box::new(e)))?
706            )?;
707        }
708
709        Ok(TreeNodeRecursion::Continue)
710    }
711}
712
#[cfg(test)]
mod tests {
    use arrow::datatypes::{DataType, Field};
    use insta::assert_snapshot;

    use super::*;

    /// A schema without any fields is displayed as an empty pair of brackets.
    #[test]
    fn test_display_empty_schema() {
        let schema = Schema::empty();
        assert_snapshot!(display_schema(&schema), @"[]");
    }

    /// Fields are displayed as `name:type`, separated by `, `; only the
    /// nullable field carries the `;N` suffix.
    #[test]
    fn test_display_schema() {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("first_name", DataType::Utf8, true),
        ]);

        assert_snapshot!(display_schema(&schema), @"[id:Int32, first_name:Utf8;N]");
    }
}