1use std::collections::HashMap;
21use std::fmt;
22
23use crate::{
24 expr_vec_fmt, Aggregate, DescribeTable, Distinct, DistinctOn, DmlStatement, Expr,
25 Filter, Join, Limit, LogicalPlan, Partitioning, Projection, RecursiveQuery,
26 Repartition, Sort, Subquery, SubqueryAlias, TableProviderFilterPushDown, TableScan,
27 Unnest, Values, Window,
28};
29
30use crate::dml::CopyTo;
31use arrow::datatypes::Schema;
32use datafusion_common::display::GraphvizBuilder;
33use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor};
34use datafusion_common::{internal_datafusion_err, Column, DataFusionError};
35use serde_json::json;
36
/// Plan visitor that renders a [`LogicalPlan`] tree as indented text.
///
/// Each node is written on its own line and children are padded two
/// spaces deeper than their parent.
pub struct IndentVisitor<'a, 'b> {
    /// Destination the rendered plan is written to.
    f: &'a mut fmt::Formatter<'b>,
    /// Current nesting depth; the root node is at depth 0.
    indent: usize,
    /// When `true`, every node is followed by a compact rendering of its schema.
    with_schema: bool,
}
49
50impl<'a, 'b> IndentVisitor<'a, 'b> {
51 pub fn new(f: &'a mut fmt::Formatter<'b>, with_schema: bool) -> Self {
54 Self {
55 f,
56 with_schema,
57 indent: 0,
58 }
59 }
60}
61
62impl<'n> TreeNodeVisitor<'n> for IndentVisitor<'_, '_> {
63 type Node = LogicalPlan;
64
65 fn f_down(
66 &mut self,
67 plan: &'n LogicalPlan,
68 ) -> datafusion_common::Result<TreeNodeRecursion> {
69 if self.indent > 0 {
70 writeln!(self.f)?;
71 }
72 write!(self.f, "{:indent$}", "", indent = self.indent * 2)?;
73 write!(self.f, "{}", plan.display())?;
74 if self.with_schema {
75 write!(self.f, " {}", display_schema(plan.schema().as_arrow()))?;
76 }
77
78 self.indent += 1;
79 Ok(TreeNodeRecursion::Continue)
80 }
81
82 fn f_up(
83 &mut self,
84 _plan: &'n LogicalPlan,
85 ) -> datafusion_common::Result<TreeNodeRecursion> {
86 self.indent -= 1;
87 Ok(TreeNodeRecursion::Continue)
88 }
89}
90
91pub fn display_schema(schema: &Schema) -> impl fmt::Display + '_ {
110 struct Wrapper<'a>(&'a Schema);
111
112 impl fmt::Display for Wrapper<'_> {
113 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
114 write!(f, "[")?;
115 for (idx, field) in self.0.fields().iter().enumerate() {
116 if idx > 0 {
117 write!(f, ", ")?;
118 }
119 let nullable_str = if field.is_nullable() { ";N" } else { "" };
120 write!(
121 f,
122 "{}:{:?}{}",
123 field.name(),
124 field.data_type(),
125 nullable_str
126 )?;
127 }
128 write!(f, "]")
129 }
130 }
131 Wrapper(schema)
132}
133
134pub struct GraphvizVisitor<'a, 'b> {
138 f: &'a mut fmt::Formatter<'b>,
139 graphviz_builder: GraphvizBuilder,
140 with_schema: bool,
142
143 parent_ids: Vec<usize>,
146}
147
148impl<'a, 'b> GraphvizVisitor<'a, 'b> {
149 pub fn new(f: &'a mut fmt::Formatter<'b>) -> Self {
150 Self {
151 f,
152 graphviz_builder: GraphvizBuilder::default(),
153 with_schema: false,
154 parent_ids: Vec::new(),
155 }
156 }
157
158 pub fn set_with_schema(&mut self, with_schema: bool) {
160 self.with_schema = with_schema;
161 }
162
163 pub fn pre_visit_plan(&mut self, label: &str) -> fmt::Result {
164 self.graphviz_builder.start_cluster(self.f, label)
165 }
166
167 pub fn post_visit_plan(&mut self) -> fmt::Result {
168 self.graphviz_builder.end_cluster(self.f)
169 }
170
171 pub fn start_graph(&mut self) -> fmt::Result {
172 self.graphviz_builder.start_graph(self.f)
173 }
174
175 pub fn end_graph(&mut self) -> fmt::Result {
176 self.graphviz_builder.end_graph(self.f)
177 }
178}
179
180impl<'n> TreeNodeVisitor<'n> for GraphvizVisitor<'_, '_> {
181 type Node = LogicalPlan;
182
183 fn f_down(
184 &mut self,
185 plan: &'n LogicalPlan,
186 ) -> datafusion_common::Result<TreeNodeRecursion> {
187 let id = self.graphviz_builder.next_id();
188
189 let label = if self.with_schema {
192 format!(
193 r"{}\nSchema: {}",
194 plan.display(),
195 display_schema(plan.schema().as_arrow())
196 )
197 } else {
198 format!("{}", plan.display())
199 };
200
201 self.graphviz_builder
202 .add_node(self.f, id, &label, None)
203 .map_err(|_e| internal_datafusion_err!("Fail to format"))?;
204
205 if let Some(parent_id) = self.parent_ids.last() {
208 self.graphviz_builder
209 .add_edge(self.f, *parent_id, id)
210 .map_err(|_e| internal_datafusion_err!("Fail to format"))?;
211 }
212
213 self.parent_ids.push(id);
214 Ok(TreeNodeRecursion::Continue)
215 }
216
217 fn f_up(
218 &mut self,
219 _plan: &LogicalPlan,
220 ) -> datafusion_common::Result<TreeNodeRecursion> {
221 let res = self.parent_ids.pop();
224 res.ok_or(internal_datafusion_err!("Fail to format"))
225 .map(|_| TreeNodeRecursion::Continue)
226 }
227}
228
229pub struct PgJsonVisitor<'a, 'b> {
277 f: &'a mut fmt::Formatter<'b>,
278
279 objects: HashMap<u32, serde_json::Value>,
281
282 next_id: u32,
283
284 with_schema: bool,
286
287 parent_ids: Vec<u32>,
290}
291
292impl<'a, 'b> PgJsonVisitor<'a, 'b> {
293 pub fn new(f: &'a mut fmt::Formatter<'b>) -> Self {
294 Self {
295 f,
296 objects: HashMap::new(),
297 next_id: 0,
298 with_schema: false,
299 parent_ids: Vec::new(),
300 }
301 }
302
303 pub fn with_schema(&mut self, with_schema: bool) {
305 self.with_schema = with_schema;
306 }
307
308 fn to_json_value(node: &LogicalPlan) -> serde_json::Value {
310 match node {
311 LogicalPlan::EmptyRelation(_) => {
312 json!({
313 "Node Type": "EmptyRelation",
314 })
315 }
316 LogicalPlan::RecursiveQuery(RecursiveQuery { is_distinct, .. }) => {
317 json!({
318 "Node Type": "RecursiveQuery",
319 "Is Distinct": is_distinct,
320 })
321 }
322 LogicalPlan::Values(Values { ref values, .. }) => {
323 let str_values = values
324 .iter()
325 .take(5)
327 .map(|row| {
328 let item = row
329 .iter()
330 .map(|expr| expr.to_string())
331 .collect::<Vec<_>>()
332 .join(", ");
333 format!("({item})")
334 })
335 .collect::<Vec<_>>()
336 .join(", ");
337
338 let eclipse = if values.len() > 5 { "..." } else { "" };
339
340 let values_str = format!("{str_values}{eclipse}");
341 json!({
342 "Node Type": "Values",
343 "Values": values_str
344 })
345 }
346 LogicalPlan::TableScan(TableScan {
347 ref source,
348 ref table_name,
349 ref filters,
350 ref fetch,
351 ..
352 }) => {
353 let mut object = json!({
354 "Node Type": "TableScan",
355 "Relation Name": table_name.table(),
356 });
357
358 if let Some(s) = table_name.schema() {
359 object["Schema"] = serde_json::Value::String(s.to_string());
360 }
361
362 if let Some(c) = table_name.catalog() {
363 object["Catalog"] = serde_json::Value::String(c.to_string());
364 }
365
366 if !filters.is_empty() {
367 let mut full_filter = vec![];
368 let mut partial_filter = vec![];
369 let mut unsupported_filters = vec![];
370 let filters: Vec<&Expr> = filters.iter().collect();
371
372 if let Ok(results) = source.supports_filters_pushdown(&filters) {
373 filters.iter().zip(results.iter()).for_each(
374 |(x, res)| match res {
375 TableProviderFilterPushDown::Exact => full_filter.push(x),
376 TableProviderFilterPushDown::Inexact => {
377 partial_filter.push(x)
378 }
379 TableProviderFilterPushDown::Unsupported => {
380 unsupported_filters.push(x)
381 }
382 },
383 );
384 }
385
386 if !full_filter.is_empty() {
387 object["Full Filters"] =
388 serde_json::Value::String(expr_vec_fmt!(full_filter));
389 };
390 if !partial_filter.is_empty() {
391 object["Partial Filters"] =
392 serde_json::Value::String(expr_vec_fmt!(partial_filter));
393 }
394 if !unsupported_filters.is_empty() {
395 object["Unsupported Filters"] =
396 serde_json::Value::String(expr_vec_fmt!(unsupported_filters));
397 }
398 }
399
400 if let Some(f) = fetch {
401 object["Fetch"] = serde_json::Value::Number((*f).into());
402 }
403
404 object
405 }
406 LogicalPlan::Projection(Projection { ref expr, .. }) => {
407 json!({
408 "Node Type": "Projection",
409 "Expressions": expr.iter().map(|e| e.to_string()).collect::<Vec<_>>()
410 })
411 }
412 LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
413 json!({
414 "Node Type": "Projection",
415 "Operation": op.name(),
416 "Table Name": table_name.table()
417 })
418 }
419 LogicalPlan::Copy(CopyTo {
420 input: _,
421 output_url,
422 file_type,
423 partition_by: _,
424 options,
425 output_schema: _,
426 }) => {
427 let op_str = options
428 .iter()
429 .map(|(k, v)| format!("{k}={v}"))
430 .collect::<Vec<_>>()
431 .join(", ");
432 json!({
433 "Node Type": "CopyTo",
434 "Output URL": output_url,
435 "File Type": format!("{}", file_type.get_ext()),
436 "Options": op_str
437 })
438 }
439 LogicalPlan::Ddl(ddl) => {
440 json!({
441 "Node Type": "Ddl",
442 "Operation": format!("{}", ddl.display())
443 })
444 }
445 LogicalPlan::Filter(Filter {
446 predicate: ref expr,
447 ..
448 }) => {
449 json!({
450 "Node Type": "Filter",
451 "Condition": format!("{}", expr)
452 })
453 }
454 LogicalPlan::Window(Window {
455 ref window_expr, ..
456 }) => {
457 json!({
458 "Node Type": "WindowAggr",
459 "Expressions": expr_vec_fmt!(window_expr)
460 })
461 }
462 LogicalPlan::Aggregate(Aggregate {
463 ref group_expr,
464 ref aggr_expr,
465 ..
466 }) => {
467 json!({
468 "Node Type": "Aggregate",
469 "Group By": expr_vec_fmt!(group_expr),
470 "Aggregates": expr_vec_fmt!(aggr_expr)
471 })
472 }
473 LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
474 let mut object = json!({
475 "Node Type": "Sort",
476 "Sort Key": expr_vec_fmt!(expr),
477 });
478
479 if let Some(fetch) = fetch {
480 object["Fetch"] = serde_json::Value::Number((*fetch).into());
481 }
482
483 object
484 }
485 LogicalPlan::Join(Join {
486 on: ref keys,
487 filter,
488 join_constraint,
489 join_type,
490 ..
491 }) => {
492 let join_expr: Vec<String> =
493 keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
494 let filter_expr = filter
495 .as_ref()
496 .map(|expr| format!(" Filter: {expr}"))
497 .unwrap_or_else(|| "".to_string());
498 json!({
499 "Node Type": format!("{} Join", join_type),
500 "Join Constraint": format!("{:?}", join_constraint),
501 "Join Keys": join_expr.join(", "),
502 "Filter": format!("{}", filter_expr)
503 })
504 }
505 LogicalPlan::Repartition(Repartition {
506 partitioning_scheme,
507 ..
508 }) => match partitioning_scheme {
509 Partitioning::RoundRobinBatch(n) => {
510 json!({
511 "Node Type": "Repartition",
512 "Partitioning Scheme": "RoundRobinBatch",
513 "Partition Count": n
514 })
515 }
516 Partitioning::Hash(expr, n) => {
517 let hash_expr: Vec<String> =
518 expr.iter().map(|e| format!("{e}")).collect();
519
520 json!({
521 "Node Type": "Repartition",
522 "Partitioning Scheme": "Hash",
523 "Partition Count": n,
524 "Partitioning Key": hash_expr
525 })
526 }
527 Partitioning::DistributeBy(expr) => {
528 let dist_by_expr: Vec<String> =
529 expr.iter().map(|e| format!("{e}")).collect();
530 json!({
531 "Node Type": "Repartition",
532 "Partitioning Scheme": "DistributeBy",
533 "Partitioning Key": dist_by_expr
534 })
535 }
536 },
537 LogicalPlan::Limit(Limit {
538 ref skip,
539 ref fetch,
540 ..
541 }) => {
542 let mut object = serde_json::json!(
543 {
544 "Node Type": "Limit",
545 }
546 );
547 if let Some(s) = skip {
548 object["Skip"] = s.to_string().into()
549 };
550 if let Some(f) = fetch {
551 object["Fetch"] = f.to_string().into()
552 };
553 object
554 }
555 LogicalPlan::Subquery(Subquery { .. }) => {
556 json!({
557 "Node Type": "Subquery"
558 })
559 }
560 LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
561 json!({
562 "Node Type": "Subquery",
563 "Alias": alias.table(),
564 })
565 }
566 LogicalPlan::Statement(statement) => {
567 json!({
568 "Node Type": "Statement",
569 "Statement": format!("{}", statement.display())
570 })
571 }
572 LogicalPlan::Distinct(distinct) => match distinct {
573 Distinct::All(_) => {
574 json!({
575 "Node Type": "DistinctAll"
576 })
577 }
578 Distinct::On(DistinctOn {
579 on_expr,
580 select_expr,
581 sort_expr,
582 ..
583 }) => {
584 let mut object = json!({
585 "Node Type": "DistinctOn",
586 "On": expr_vec_fmt!(on_expr),
587 "Select": expr_vec_fmt!(select_expr),
588 });
589 if let Some(sort_expr) = sort_expr {
590 object["Sort"] =
591 serde_json::Value::String(expr_vec_fmt!(sort_expr));
592 }
593
594 object
595 }
596 },
597 LogicalPlan::Explain { .. } => {
598 json!({
599 "Node Type": "Explain"
600 })
601 }
602 LogicalPlan::Analyze { .. } => {
603 json!({
604 "Node Type": "Analyze"
605 })
606 }
607 LogicalPlan::Union(_) => {
608 json!({
609 "Node Type": "Union"
610 })
611 }
612 LogicalPlan::Extension(e) => {
613 json!({
614 "Node Type": e.node.name(),
615 "Detail": format!("{:?}", e.node)
616 })
617 }
618 LogicalPlan::DescribeTable(DescribeTable { .. }) => {
619 json!({
620 "Node Type": "DescribeTable"
621 })
622 }
623 LogicalPlan::Unnest(Unnest {
624 input: plan,
625 list_type_columns: list_col_indices,
626 struct_type_columns: struct_col_indices,
627 ..
628 }) => {
629 let input_columns = plan.schema().columns();
630 let list_type_columns = list_col_indices
631 .iter()
632 .map(|(i, unnest_info)| {
633 format!(
634 "{}|depth={:?}",
635 &input_columns[*i].to_string(),
636 unnest_info.depth
637 )
638 })
639 .collect::<Vec<String>>();
640 let struct_type_columns = struct_col_indices
641 .iter()
642 .map(|i| &input_columns[*i])
643 .collect::<Vec<&Column>>();
644 json!({
645 "Node Type": "Unnest",
646 "ListColumn": expr_vec_fmt!(list_type_columns),
647 "StructColumn": expr_vec_fmt!(struct_type_columns),
648 })
649 }
650 }
651 }
652}
653
654impl<'n> TreeNodeVisitor<'n> for PgJsonVisitor<'_, '_> {
655 type Node = LogicalPlan;
656
657 fn f_down(
658 &mut self,
659 node: &'n LogicalPlan,
660 ) -> datafusion_common::Result<TreeNodeRecursion> {
661 let id = self.next_id;
662 self.next_id += 1;
663 let mut object = Self::to_json_value(node);
664
665 object["Plans"] = serde_json::Value::Array(vec![]);
666
667 if self.with_schema {
668 object["Output"] = serde_json::Value::Array(
669 node.schema()
670 .fields()
671 .iter()
672 .map(|f| f.name().to_string())
673 .map(serde_json::Value::String)
674 .collect(),
675 );
676 };
677
678 self.objects.insert(id, object);
679 self.parent_ids.push(id);
680 Ok(TreeNodeRecursion::Continue)
681 }
682
683 fn f_up(
684 &mut self,
685 _node: &Self::Node,
686 ) -> datafusion_common::Result<TreeNodeRecursion> {
687 let id = self.parent_ids.pop().unwrap();
688
689 let current_node = self
690 .objects
691 .remove(&id)
692 .ok_or_else(|| internal_datafusion_err!("Missing current node!"))?;
693
694 if let Some(parent_id) = self.parent_ids.last() {
695 let parent_node = self
696 .objects
697 .get_mut(parent_id)
698 .expect("Missing parent node!");
699 let plans = parent_node
700 .get_mut("Plans")
701 .and_then(|p| p.as_array_mut())
702 .expect("Plans should be an array");
703
704 plans.push(current_node);
705 } else {
706 let plan = serde_json::json!([{"Plan": current_node}]);
708 write!(
709 self.f,
710 "{}",
711 serde_json::to_string_pretty(&plan)
712 .map_err(|e| DataFusionError::External(Box::new(e)))?
713 )?;
714 }
715
716 Ok(TreeNodeRecursion::Continue)
717 }
718}
719
#[cfg(test)]
mod tests {
    use arrow::datatypes::{DataType, Field};
    use insta::assert_snapshot;

    use super::*;

    /// A schema without fields renders as an empty pair of brackets.
    #[test]
    fn test_display_empty_schema() {
        let empty = Schema::empty();
        assert_snapshot!(display_schema(&empty), @"[]");
    }

    /// Nullable fields carry the `;N` suffix, non-nullable ones do not.
    #[test]
    fn test_display_schema() {
        let fields = vec![
            Field::new("id", DataType::Int32, false),
            Field::new("first_name", DataType::Utf8, true),
        ];
        let schema = Schema::new(fields);

        assert_snapshot!(display_schema(&schema), @"[id:Int32, first_name:Utf8;N]");
    }
}