1use std::collections::HashMap;
21use std::fmt;
22
23use crate::{
24 Aggregate, DescribeTable, Distinct, DistinctOn, DmlStatement, Expr, Filter, Join,
25 Limit, LogicalPlan, Partitioning, Projection, RecursiveQuery, Repartition, Sort,
26 Subquery, SubqueryAlias, TableProviderFilterPushDown, TableScan, Unnest, Values,
27 Window, expr_vec_fmt,
28};
29
30use crate::dml::CopyTo;
31use arrow::datatypes::Schema;
32use datafusion_common::display::GraphvizBuilder;
33use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor};
34use datafusion_common::{Column, DataFusionError, internal_datafusion_err};
35use serde_json::json;
36
/// Renders a [`LogicalPlan`] as an indented tree: one plan node per line,
/// each line indented by two spaces per level of depth.
pub struct IndentVisitor<'a, 'b> {
    /// Destination for the rendered text.
    f: &'a mut fmt::Formatter<'b>,
    /// Depth of the node currently being visited (0 before the root is entered).
    indent: usize,
    /// When `true`, each node's Arrow schema is appended after its description.
    with_schema: bool,
}
49
50impl<'a, 'b> IndentVisitor<'a, 'b> {
51 pub fn new(f: &'a mut fmt::Formatter<'b>, with_schema: bool) -> Self {
54 Self {
55 f,
56 with_schema,
57 indent: 0,
58 }
59 }
60}
61
62impl<'n> TreeNodeVisitor<'n> for IndentVisitor<'_, '_> {
63 type Node = LogicalPlan;
64
65 fn f_down(
66 &mut self,
67 plan: &'n LogicalPlan,
68 ) -> datafusion_common::Result<TreeNodeRecursion> {
69 if self.indent > 0 {
70 writeln!(self.f)?;
71 }
72 write!(self.f, "{:indent$}", "", indent = self.indent * 2)?;
73 write!(self.f, "{}", plan.display())?;
74 if self.with_schema {
75 write!(self.f, " {}", display_schema(plan.schema().as_arrow()))?;
76 }
77
78 self.indent += 1;
79 Ok(TreeNodeRecursion::Continue)
80 }
81
82 fn f_up(
83 &mut self,
84 _plan: &'n LogicalPlan,
85 ) -> datafusion_common::Result<TreeNodeRecursion> {
86 self.indent -= 1;
87 Ok(TreeNodeRecursion::Continue)
88 }
89}
90
91pub fn display_schema(schema: &Schema) -> impl fmt::Display + '_ {
110 struct Wrapper<'a>(&'a Schema);
111
112 impl fmt::Display for Wrapper<'_> {
113 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
114 write!(f, "[")?;
115 for (idx, field) in self.0.fields().iter().enumerate() {
116 if idx > 0 {
117 write!(f, ", ")?;
118 }
119 let nullable_str = if field.is_nullable() { ";N" } else { "" };
120 write!(
121 f,
122 "{}:{:?}{}",
123 field.name(),
124 field.data_type(),
125 nullable_str
126 )?;
127 }
128 write!(f, "]")
129 }
130 }
131 Wrapper(schema)
132}
133
134pub struct GraphvizVisitor<'a, 'b> {
138 f: &'a mut fmt::Formatter<'b>,
139 graphviz_builder: GraphvizBuilder,
140 with_schema: bool,
142
143 parent_ids: Vec<usize>,
146}
147
148impl<'a, 'b> GraphvizVisitor<'a, 'b> {
149 pub fn new(f: &'a mut fmt::Formatter<'b>) -> Self {
150 Self {
151 f,
152 graphviz_builder: GraphvizBuilder::default(),
153 with_schema: false,
154 parent_ids: Vec::new(),
155 }
156 }
157
158 pub fn set_with_schema(&mut self, with_schema: bool) {
160 self.with_schema = with_schema;
161 }
162
163 pub fn pre_visit_plan(&mut self, label: &str) -> fmt::Result {
164 self.graphviz_builder.start_cluster(self.f, label)
165 }
166
167 pub fn post_visit_plan(&mut self) -> fmt::Result {
168 self.graphviz_builder.end_cluster(self.f)
169 }
170
171 pub fn start_graph(&mut self) -> fmt::Result {
172 self.graphviz_builder.start_graph(self.f)
173 }
174
175 pub fn end_graph(&mut self) -> fmt::Result {
176 self.graphviz_builder.end_graph(self.f)
177 }
178}
179
180impl<'n> TreeNodeVisitor<'n> for GraphvizVisitor<'_, '_> {
181 type Node = LogicalPlan;
182
183 fn f_down(
184 &mut self,
185 plan: &'n LogicalPlan,
186 ) -> datafusion_common::Result<TreeNodeRecursion> {
187 let id = self.graphviz_builder.next_id();
188
189 let label = if self.with_schema {
192 format!(
193 r"{}\nSchema: {}",
194 plan.display(),
195 display_schema(plan.schema().as_arrow())
196 )
197 } else {
198 format!("{}", plan.display())
199 };
200
201 self.graphviz_builder
202 .add_node(self.f, id, &label, None)
203 .map_err(|_e| internal_datafusion_err!("Fail to format"))?;
204
205 if let Some(parent_id) = self.parent_ids.last() {
208 self.graphviz_builder
209 .add_edge(self.f, *parent_id, id)
210 .map_err(|_e| internal_datafusion_err!("Fail to format"))?;
211 }
212
213 self.parent_ids.push(id);
214 Ok(TreeNodeRecursion::Continue)
215 }
216
217 fn f_up(
218 &mut self,
219 _plan: &LogicalPlan,
220 ) -> datafusion_common::Result<TreeNodeRecursion> {
221 let res = self.parent_ids.pop();
224 res.ok_or(internal_datafusion_err!("Fail to format"))
225 .map(|_| TreeNodeRecursion::Continue)
226 }
227}
228
229pub struct PgJsonVisitor<'a, 'b> {
277 f: &'a mut fmt::Formatter<'b>,
278
279 objects: HashMap<u32, serde_json::Value>,
281
282 next_id: u32,
283
284 with_schema: bool,
286
287 parent_ids: Vec<u32>,
290}
291
292impl<'a, 'b> PgJsonVisitor<'a, 'b> {
293 pub fn new(f: &'a mut fmt::Formatter<'b>) -> Self {
294 Self {
295 f,
296 objects: HashMap::new(),
297 next_id: 0,
298 with_schema: false,
299 parent_ids: Vec::new(),
300 }
301 }
302
303 pub fn with_schema(&mut self, with_schema: bool) {
305 self.with_schema = with_schema;
306 }
307
308 fn to_json_value(node: &LogicalPlan) -> serde_json::Value {
310 match node {
311 LogicalPlan::EmptyRelation(_) => {
312 json!({
313 "Node Type": "EmptyRelation",
314 })
315 }
316 LogicalPlan::RecursiveQuery(RecursiveQuery { is_distinct, .. }) => {
317 json!({
318 "Node Type": "RecursiveQuery",
319 "Is Distinct": is_distinct,
320 })
321 }
322 LogicalPlan::Values(Values { values, .. }) => {
323 let str_values = values
324 .iter()
325 .take(5)
327 .map(|row| {
328 let item = row
329 .iter()
330 .map(|expr| expr.to_string())
331 .collect::<Vec<_>>()
332 .join(", ");
333 format!("({item})")
334 })
335 .collect::<Vec<_>>()
336 .join(", ");
337
338 let eclipse = if values.len() > 5 { "..." } else { "" };
339
340 let values_str = format!("{str_values}{eclipse}");
341 json!({
342 "Node Type": "Values",
343 "Values": values_str
344 })
345 }
346 LogicalPlan::TableScan(TableScan {
347 source,
348 table_name,
349 filters,
350 fetch,
351 ..
352 }) => {
353 let mut object = json!({
354 "Node Type": "TableScan",
355 "Relation Name": table_name.table(),
356 });
357
358 if let Some(s) = table_name.schema() {
359 object["Schema"] = serde_json::Value::String(s.to_string());
360 }
361
362 if let Some(c) = table_name.catalog() {
363 object["Catalog"] = serde_json::Value::String(c.to_string());
364 }
365
366 if !filters.is_empty() {
367 let mut full_filter = vec![];
368 let mut partial_filter = vec![];
369 let mut unsupported_filters = vec![];
370 let filters: Vec<&Expr> = filters.iter().collect();
371
372 if let Ok(results) = source.supports_filters_pushdown(&filters) {
373 filters.iter().zip(results.iter()).for_each(
374 |(x, res)| match res {
375 TableProviderFilterPushDown::Exact => full_filter.push(x),
376 TableProviderFilterPushDown::Inexact => {
377 partial_filter.push(x)
378 }
379 TableProviderFilterPushDown::Unsupported => {
380 unsupported_filters.push(x)
381 }
382 },
383 );
384 }
385
386 if !full_filter.is_empty() {
387 object["Full Filters"] =
388 serde_json::Value::String(expr_vec_fmt!(full_filter));
389 };
390 if !partial_filter.is_empty() {
391 object["Partial Filters"] =
392 serde_json::Value::String(expr_vec_fmt!(partial_filter));
393 }
394 if !unsupported_filters.is_empty() {
395 object["Unsupported Filters"] =
396 serde_json::Value::String(expr_vec_fmt!(unsupported_filters));
397 }
398 }
399
400 if let Some(f) = fetch {
401 object["Fetch"] = serde_json::Value::Number((*f).into());
402 }
403
404 object
405 }
406 LogicalPlan::Projection(Projection { expr, .. }) => {
407 json!({
408 "Node Type": "Projection",
409 "Expressions": expr.iter().map(|e| e.to_string()).collect::<Vec<_>>()
410 })
411 }
412 LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
413 json!({
414 "Node Type": "Projection",
415 "Operation": op.name(),
416 "Table Name": table_name.table()
417 })
418 }
419 LogicalPlan::Copy(CopyTo {
420 input: _,
421 output_url,
422 file_type,
423 partition_by: _,
424 options,
425 output_schema: _,
426 }) => {
427 let op_str = options
428 .iter()
429 .map(|(k, v)| format!("{k}={v}"))
430 .collect::<Vec<_>>()
431 .join(", ");
432 json!({
433 "Node Type": "CopyTo",
434 "Output URL": output_url,
435 "File Type": format!("{}", file_type.get_ext()),
436 "Options": op_str
437 })
438 }
439 LogicalPlan::Ddl(ddl) => {
440 json!({
441 "Node Type": "Ddl",
442 "Operation": format!("{}", ddl.display())
443 })
444 }
445 LogicalPlan::Filter(Filter {
446 predicate: expr, ..
447 }) => {
448 json!({
449 "Node Type": "Filter",
450 "Condition": format!("{}", expr)
451 })
452 }
453 LogicalPlan::Window(Window { window_expr, .. }) => {
454 json!({
455 "Node Type": "WindowAggr",
456 "Expressions": expr_vec_fmt!(window_expr)
457 })
458 }
459 LogicalPlan::Aggregate(Aggregate {
460 group_expr,
461 aggr_expr,
462 ..
463 }) => {
464 json!({
465 "Node Type": "Aggregate",
466 "Group By": expr_vec_fmt!(group_expr),
467 "Aggregates": expr_vec_fmt!(aggr_expr)
468 })
469 }
470 LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
471 let mut object = json!({
472 "Node Type": "Sort",
473 "Sort Key": expr_vec_fmt!(expr),
474 });
475
476 if let Some(fetch) = fetch {
477 object["Fetch"] = serde_json::Value::Number((*fetch).into());
478 }
479
480 object
481 }
482 LogicalPlan::Join(Join {
483 on: keys,
484 filter,
485 join_constraint,
486 join_type,
487 ..
488 }) => {
489 let join_expr: Vec<String> =
490 keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
491 let filter_expr = filter
492 .as_ref()
493 .map(|expr| format!(" Filter: {expr}"))
494 .unwrap_or_else(|| "".to_string());
495 json!({
496 "Node Type": format!("{} Join", join_type),
497 "Join Constraint": format!("{:?}", join_constraint),
498 "Join Keys": join_expr.join(", "),
499 "Filter": format!("{}", filter_expr)
500 })
501 }
502 LogicalPlan::Repartition(Repartition {
503 partitioning_scheme,
504 ..
505 }) => match partitioning_scheme {
506 Partitioning::RoundRobinBatch(n) => {
507 json!({
508 "Node Type": "Repartition",
509 "Partitioning Scheme": "RoundRobinBatch",
510 "Partition Count": n
511 })
512 }
513 Partitioning::Hash(expr, n) => {
514 let hash_expr: Vec<String> =
515 expr.iter().map(|e| format!("{e}")).collect();
516
517 json!({
518 "Node Type": "Repartition",
519 "Partitioning Scheme": "Hash",
520 "Partition Count": n,
521 "Partitioning Key": hash_expr
522 })
523 }
524 Partitioning::DistributeBy(expr) => {
525 let dist_by_expr: Vec<String> =
526 expr.iter().map(|e| format!("{e}")).collect();
527 json!({
528 "Node Type": "Repartition",
529 "Partitioning Scheme": "DistributeBy",
530 "Partitioning Key": dist_by_expr
531 })
532 }
533 },
534 LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
535 let mut object = serde_json::json!(
536 {
537 "Node Type": "Limit",
538 }
539 );
540 if let Some(s) = skip {
541 object["Skip"] = s.to_string().into()
542 };
543 if let Some(f) = fetch {
544 object["Fetch"] = f.to_string().into()
545 };
546 object
547 }
548 LogicalPlan::Subquery(Subquery { .. }) => {
549 json!({
550 "Node Type": "Subquery"
551 })
552 }
553 LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
554 json!({
555 "Node Type": "Subquery",
556 "Alias": alias.table(),
557 })
558 }
559 LogicalPlan::Statement(statement) => {
560 json!({
561 "Node Type": "Statement",
562 "Statement": format!("{}", statement.display())
563 })
564 }
565 LogicalPlan::Distinct(distinct) => match distinct {
566 Distinct::All(_) => {
567 json!({
568 "Node Type": "DistinctAll"
569 })
570 }
571 Distinct::On(DistinctOn {
572 on_expr,
573 select_expr,
574 sort_expr,
575 ..
576 }) => {
577 let mut object = json!({
578 "Node Type": "DistinctOn",
579 "On": expr_vec_fmt!(on_expr),
580 "Select": expr_vec_fmt!(select_expr),
581 });
582 if let Some(sort_expr) = sort_expr {
583 object["Sort"] =
584 serde_json::Value::String(expr_vec_fmt!(sort_expr));
585 }
586
587 object
588 }
589 },
590 LogicalPlan::Explain { .. } => {
591 json!({
592 "Node Type": "Explain"
593 })
594 }
595 LogicalPlan::Analyze { .. } => {
596 json!({
597 "Node Type": "Analyze"
598 })
599 }
600 LogicalPlan::Union(_) => {
601 json!({
602 "Node Type": "Union"
603 })
604 }
605 LogicalPlan::Extension(e) => {
606 json!({
607 "Node Type": e.node.name(),
608 "Detail": format!("{:?}", e.node)
609 })
610 }
611 LogicalPlan::DescribeTable(DescribeTable { .. }) => {
612 json!({
613 "Node Type": "DescribeTable"
614 })
615 }
616 LogicalPlan::Unnest(Unnest {
617 input: plan,
618 list_type_columns: list_col_indices,
619 struct_type_columns: struct_col_indices,
620 ..
621 }) => {
622 let input_columns = plan.schema().columns();
623 let list_type_columns = list_col_indices
624 .iter()
625 .map(|(i, unnest_info)| {
626 format!(
627 "{}|depth={:?}",
628 &input_columns[*i].to_string(),
629 unnest_info.depth
630 )
631 })
632 .collect::<Vec<String>>();
633 let struct_type_columns = struct_col_indices
634 .iter()
635 .map(|i| &input_columns[*i])
636 .collect::<Vec<&Column>>();
637 json!({
638 "Node Type": "Unnest",
639 "ListColumn": expr_vec_fmt!(list_type_columns),
640 "StructColumn": expr_vec_fmt!(struct_type_columns),
641 })
642 }
643 }
644 }
645}
646
647impl<'n> TreeNodeVisitor<'n> for PgJsonVisitor<'_, '_> {
648 type Node = LogicalPlan;
649
650 fn f_down(
651 &mut self,
652 node: &'n LogicalPlan,
653 ) -> datafusion_common::Result<TreeNodeRecursion> {
654 let id = self.next_id;
655 self.next_id += 1;
656 let mut object = Self::to_json_value(node);
657
658 object["Plans"] = serde_json::Value::Array(vec![]);
659
660 if self.with_schema {
661 object["Output"] = serde_json::Value::Array(
662 node.schema()
663 .fields()
664 .iter()
665 .map(|f| f.name().to_string())
666 .map(serde_json::Value::String)
667 .collect(),
668 );
669 };
670
671 self.objects.insert(id, object);
672 self.parent_ids.push(id);
673 Ok(TreeNodeRecursion::Continue)
674 }
675
676 fn f_up(
677 &mut self,
678 _node: &Self::Node,
679 ) -> datafusion_common::Result<TreeNodeRecursion> {
680 let id = self.parent_ids.pop().unwrap();
681
682 let current_node = self
683 .objects
684 .remove(&id)
685 .ok_or_else(|| internal_datafusion_err!("Missing current node!"))?;
686
687 if let Some(parent_id) = self.parent_ids.last() {
688 let parent_node = self
689 .objects
690 .get_mut(parent_id)
691 .expect("Missing parent node!");
692 let plans = parent_node
693 .get_mut("Plans")
694 .and_then(|p| p.as_array_mut())
695 .expect("Plans should be an array");
696
697 plans.push(current_node);
698 } else {
699 let plan = serde_json::json!([{"Plan": current_node}]);
701 write!(
702 self.f,
703 "{}",
704 serde_json::to_string_pretty(&plan)
705 .map_err(|e| DataFusionError::External(Box::new(e)))?
706 )?;
707 }
708
709 Ok(TreeNodeRecursion::Continue)
710 }
711}
712
#[cfg(test)]
mod tests {
    use super::*;

    use arrow::datatypes::{DataType, Field};
    use insta::assert_snapshot;

    /// A schema with no fields renders as bare brackets.
    #[test]
    fn test_display_empty_schema() {
        let empty = Schema::empty();
        assert_snapshot!(display_schema(&empty), @"[]");
    }

    /// Fields are comma-separated as `name:Type`; nullable fields get `;N`.
    #[test]
    fn test_display_schema() {
        let id = Field::new("id", DataType::Int32, false);
        let first_name = Field::new("first_name", DataType::Utf8, true);
        let schema = Schema::new(vec![id, first_name]);

        assert_snapshot!(display_schema(&schema), @"[id:Int32, first_name:Utf8;N]");
    }
}