1use std::collections::HashMap;
21use std::fmt;
22
23use crate::{
24 expr_vec_fmt, Aggregate, DescribeTable, Distinct, DistinctOn, DmlStatement, Expr,
25 Filter, Join, Limit, LogicalPlan, Partitioning, Projection, RecursiveQuery,
26 Repartition, Sort, Subquery, SubqueryAlias, TableProviderFilterPushDown, TableScan,
27 Unnest, Values, Window,
28};
29
30use crate::dml::CopyTo;
31use arrow::datatypes::Schema;
32use datafusion_common::display::GraphvizBuilder;
33use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor};
34use datafusion_common::{Column, DataFusionError};
35use serde_json::json;
36
/// Visitor that renders a plan tree as text, one node per line, with each
/// nesting level indented further than its parent.
pub struct IndentVisitor<'a, 'b> {
    /// Formatter that receives the rendered text.
    f: &'a mut fmt::Formatter<'b>,
    /// When `true`, the schema of each node is appended to its line.
    with_schema: bool,
    /// Current nesting depth; a freshly created visitor starts at zero.
    indent: usize,
}

impl<'a, 'b> IndentVisitor<'a, 'b> {
    /// Creates a visitor that writes to `f`, starting at nesting depth zero.
    ///
    /// `with_schema` selects whether each node line also shows the node's
    /// schema.
    pub fn new(f: &'a mut fmt::Formatter<'b>, with_schema: bool) -> Self {
        IndentVisitor {
            indent: 0,
            with_schema,
            f,
        }
    }
}
61
62impl<'n> TreeNodeVisitor<'n> for IndentVisitor<'_, '_> {
63 type Node = LogicalPlan;
64
65 fn f_down(
66 &mut self,
67 plan: &'n LogicalPlan,
68 ) -> datafusion_common::Result<TreeNodeRecursion> {
69 if self.indent > 0 {
70 writeln!(self.f)?;
71 }
72 write!(self.f, "{:indent$}", "", indent = self.indent * 2)?;
73 write!(self.f, "{}", plan.display())?;
74 if self.with_schema {
75 write!(
76 self.f,
77 " {}",
78 display_schema(&plan.schema().as_ref().to_owned().into())
79 )?;
80 }
81
82 self.indent += 1;
83 Ok(TreeNodeRecursion::Continue)
84 }
85
86 fn f_up(
87 &mut self,
88 _plan: &'n LogicalPlan,
89 ) -> datafusion_common::Result<TreeNodeRecursion> {
90 self.indent -= 1;
91 Ok(TreeNodeRecursion::Continue)
92 }
93}
94
95pub fn display_schema(schema: &Schema) -> impl fmt::Display + '_ {
114 struct Wrapper<'a>(&'a Schema);
115
116 impl fmt::Display for Wrapper<'_> {
117 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
118 write!(f, "[")?;
119 for (idx, field) in self.0.fields().iter().enumerate() {
120 if idx > 0 {
121 write!(f, ", ")?;
122 }
123 let nullable_str = if field.is_nullable() { ";N" } else { "" };
124 write!(
125 f,
126 "{}:{:?}{}",
127 field.name(),
128 field.data_type(),
129 nullable_str
130 )?;
131 }
132 write!(f, "]")
133 }
134 }
135 Wrapper(schema)
136}
137
/// Visitor that emits a `LogicalPlan` tree as graph statements produced by a
/// `GraphvizBuilder`.
pub struct GraphvizVisitor<'a, 'b> {
    /// Formatter that receives the generated output.
    f: &'a mut fmt::Formatter<'b>,
    /// Builder that hands out node ids and writes the graph, cluster, node
    /// and edge statements.
    graphviz_builder: GraphvizBuilder,
    /// When `true`, node labels also include the node's schema.
    with_schema: bool,

    /// Ids of the nodes that have been entered but not yet left; the last
    /// entry is the parent of the next node that is visited.
    parent_ids: Vec<usize>,
}
151
152impl<'a, 'b> GraphvizVisitor<'a, 'b> {
153 pub fn new(f: &'a mut fmt::Formatter<'b>) -> Self {
154 Self {
155 f,
156 graphviz_builder: GraphvizBuilder::default(),
157 with_schema: false,
158 parent_ids: Vec::new(),
159 }
160 }
161
162 pub fn set_with_schema(&mut self, with_schema: bool) {
164 self.with_schema = with_schema;
165 }
166
167 pub fn pre_visit_plan(&mut self, label: &str) -> fmt::Result {
168 self.graphviz_builder.start_cluster(self.f, label)
169 }
170
171 pub fn post_visit_plan(&mut self) -> fmt::Result {
172 self.graphviz_builder.end_cluster(self.f)
173 }
174
175 pub fn start_graph(&mut self) -> fmt::Result {
176 self.graphviz_builder.start_graph(self.f)
177 }
178
179 pub fn end_graph(&mut self) -> fmt::Result {
180 self.graphviz_builder.end_graph(self.f)
181 }
182}
183
184impl<'n> TreeNodeVisitor<'n> for GraphvizVisitor<'_, '_> {
185 type Node = LogicalPlan;
186
187 fn f_down(
188 &mut self,
189 plan: &'n LogicalPlan,
190 ) -> datafusion_common::Result<TreeNodeRecursion> {
191 let id = self.graphviz_builder.next_id();
192
193 let label = if self.with_schema {
196 format!(
197 r"{}\nSchema: {}",
198 plan.display(),
199 display_schema(&plan.schema().as_ref().to_owned().into())
200 )
201 } else {
202 format!("{}", plan.display())
203 };
204
205 self.graphviz_builder
206 .add_node(self.f, id, &label, None)
207 .map_err(|_e| DataFusionError::Internal("Fail to format".to_string()))?;
208
209 if let Some(parent_id) = self.parent_ids.last() {
212 self.graphviz_builder
213 .add_edge(self.f, *parent_id, id)
214 .map_err(|_e| DataFusionError::Internal("Fail to format".to_string()))?;
215 }
216
217 self.parent_ids.push(id);
218 Ok(TreeNodeRecursion::Continue)
219 }
220
221 fn f_up(
222 &mut self,
223 _plan: &LogicalPlan,
224 ) -> datafusion_common::Result<TreeNodeRecursion> {
225 let res = self.parent_ids.pop();
228 res.ok_or(DataFusionError::Internal("Fail to format".to_string()))
229 .map(|_| TreeNodeRecursion::Continue)
230 }
231}
232
/// Visitor that assembles a JSON document describing a `LogicalPlan` tree and
/// writes it once the root node has been left.
pub struct PgJsonVisitor<'a, 'b> {
    /// Formatter that receives the finished JSON text.
    f: &'a mut fmt::Formatter<'b>,

    /// JSON objects of the nodes that have been entered but not yet left,
    /// keyed by the id assigned to them on entry.
    objects: HashMap<u32, serde_json::Value>,

    /// Id handed to the next node that is entered.
    next_id: u32,

    /// When `true`, every node object also gets an `"Output"` array holding
    /// the field names of the node's schema.
    with_schema: bool,

    /// Ids of the nodes that have been entered but not yet left; the last
    /// entry is the node currently being processed.
    parent_ids: Vec<u32>,
}
295
296impl<'a, 'b> PgJsonVisitor<'a, 'b> {
297 pub fn new(f: &'a mut fmt::Formatter<'b>) -> Self {
298 Self {
299 f,
300 objects: HashMap::new(),
301 next_id: 0,
302 with_schema: false,
303 parent_ids: Vec::new(),
304 }
305 }
306
307 pub fn with_schema(&mut self, with_schema: bool) {
309 self.with_schema = with_schema;
310 }
311
312 fn to_json_value(node: &LogicalPlan) -> serde_json::Value {
314 match node {
315 LogicalPlan::EmptyRelation(_) => {
316 json!({
317 "Node Type": "EmptyRelation",
318 })
319 }
320 LogicalPlan::RecursiveQuery(RecursiveQuery { is_distinct, .. }) => {
321 json!({
322 "Node Type": "RecursiveQuery",
323 "Is Distinct": is_distinct,
324 })
325 }
326 LogicalPlan::Values(Values { ref values, .. }) => {
327 let str_values = values
328 .iter()
329 .take(5)
331 .map(|row| {
332 let item = row
333 .iter()
334 .map(|expr| expr.to_string())
335 .collect::<Vec<_>>()
336 .join(", ");
337 format!("({item})")
338 })
339 .collect::<Vec<_>>()
340 .join(", ");
341
342 let eclipse = if values.len() > 5 { "..." } else { "" };
343
344 let values_str = format!("{str_values}{eclipse}");
345 json!({
346 "Node Type": "Values",
347 "Values": values_str
348 })
349 }
350 LogicalPlan::TableScan(TableScan {
351 ref source,
352 ref table_name,
353 ref filters,
354 ref fetch,
355 ..
356 }) => {
357 let mut object = json!({
358 "Node Type": "TableScan",
359 "Relation Name": table_name.table(),
360 });
361
362 if let Some(s) = table_name.schema() {
363 object["Schema"] = serde_json::Value::String(s.to_string());
364 }
365
366 if let Some(c) = table_name.catalog() {
367 object["Catalog"] = serde_json::Value::String(c.to_string());
368 }
369
370 if !filters.is_empty() {
371 let mut full_filter = vec![];
372 let mut partial_filter = vec![];
373 let mut unsupported_filters = vec![];
374 let filters: Vec<&Expr> = filters.iter().collect();
375
376 if let Ok(results) = source.supports_filters_pushdown(&filters) {
377 filters.iter().zip(results.iter()).for_each(
378 |(x, res)| match res {
379 TableProviderFilterPushDown::Exact => full_filter.push(x),
380 TableProviderFilterPushDown::Inexact => {
381 partial_filter.push(x)
382 }
383 TableProviderFilterPushDown::Unsupported => {
384 unsupported_filters.push(x)
385 }
386 },
387 );
388 }
389
390 if !full_filter.is_empty() {
391 object["Full Filters"] =
392 serde_json::Value::String(expr_vec_fmt!(full_filter));
393 };
394 if !partial_filter.is_empty() {
395 object["Partial Filters"] =
396 serde_json::Value::String(expr_vec_fmt!(partial_filter));
397 }
398 if !unsupported_filters.is_empty() {
399 object["Unsupported Filters"] =
400 serde_json::Value::String(expr_vec_fmt!(unsupported_filters));
401 }
402 }
403
404 if let Some(f) = fetch {
405 object["Fetch"] = serde_json::Value::Number((*f).into());
406 }
407
408 object
409 }
410 LogicalPlan::Projection(Projection { ref expr, .. }) => {
411 json!({
412 "Node Type": "Projection",
413 "Expressions": expr.iter().map(|e| e.to_string()).collect::<Vec<_>>()
414 })
415 }
416 LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
417 json!({
418 "Node Type": "Projection",
419 "Operation": op.name(),
420 "Table Name": table_name.table()
421 })
422 }
423 LogicalPlan::Copy(CopyTo {
424 input: _,
425 output_url,
426 file_type,
427 partition_by: _,
428 options,
429 output_schema: _,
430 }) => {
431 let op_str = options
432 .iter()
433 .map(|(k, v)| format!("{k}={v}"))
434 .collect::<Vec<_>>()
435 .join(", ");
436 json!({
437 "Node Type": "CopyTo",
438 "Output URL": output_url,
439 "File Type": format!("{}", file_type.get_ext()),
440 "Options": op_str
441 })
442 }
443 LogicalPlan::Ddl(ddl) => {
444 json!({
445 "Node Type": "Ddl",
446 "Operation": format!("{}", ddl.display())
447 })
448 }
449 LogicalPlan::Filter(Filter {
450 predicate: ref expr,
451 ..
452 }) => {
453 json!({
454 "Node Type": "Filter",
455 "Condition": format!("{}", expr)
456 })
457 }
458 LogicalPlan::Window(Window {
459 ref window_expr, ..
460 }) => {
461 json!({
462 "Node Type": "WindowAggr",
463 "Expressions": expr_vec_fmt!(window_expr)
464 })
465 }
466 LogicalPlan::Aggregate(Aggregate {
467 ref group_expr,
468 ref aggr_expr,
469 ..
470 }) => {
471 json!({
472 "Node Type": "Aggregate",
473 "Group By": expr_vec_fmt!(group_expr),
474 "Aggregates": expr_vec_fmt!(aggr_expr)
475 })
476 }
477 LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
478 let mut object = json!({
479 "Node Type": "Sort",
480 "Sort Key": expr_vec_fmt!(expr),
481 });
482
483 if let Some(fetch) = fetch {
484 object["Fetch"] = serde_json::Value::Number((*fetch).into());
485 }
486
487 object
488 }
489 LogicalPlan::Join(Join {
490 on: ref keys,
491 filter,
492 join_constraint,
493 join_type,
494 ..
495 }) => {
496 let join_expr: Vec<String> =
497 keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
498 let filter_expr = filter
499 .as_ref()
500 .map(|expr| format!(" Filter: {expr}"))
501 .unwrap_or_else(|| "".to_string());
502 json!({
503 "Node Type": format!("{} Join", join_type),
504 "Join Constraint": format!("{:?}", join_constraint),
505 "Join Keys": join_expr.join(", "),
506 "Filter": format!("{}", filter_expr)
507 })
508 }
509 LogicalPlan::Repartition(Repartition {
510 partitioning_scheme,
511 ..
512 }) => match partitioning_scheme {
513 Partitioning::RoundRobinBatch(n) => {
514 json!({
515 "Node Type": "Repartition",
516 "Partitioning Scheme": "RoundRobinBatch",
517 "Partition Count": n
518 })
519 }
520 Partitioning::Hash(expr, n) => {
521 let hash_expr: Vec<String> =
522 expr.iter().map(|e| format!("{e}")).collect();
523
524 json!({
525 "Node Type": "Repartition",
526 "Partitioning Scheme": "Hash",
527 "Partition Count": n,
528 "Partitioning Key": hash_expr
529 })
530 }
531 Partitioning::DistributeBy(expr) => {
532 let dist_by_expr: Vec<String> =
533 expr.iter().map(|e| format!("{e}")).collect();
534 json!({
535 "Node Type": "Repartition",
536 "Partitioning Scheme": "DistributeBy",
537 "Partitioning Key": dist_by_expr
538 })
539 }
540 },
541 LogicalPlan::Limit(Limit {
542 ref skip,
543 ref fetch,
544 ..
545 }) => {
546 let mut object = serde_json::json!(
547 {
548 "Node Type": "Limit",
549 }
550 );
551 if let Some(s) = skip {
552 object["Skip"] = s.to_string().into()
553 };
554 if let Some(f) = fetch {
555 object["Fetch"] = f.to_string().into()
556 };
557 object
558 }
559 LogicalPlan::Subquery(Subquery { .. }) => {
560 json!({
561 "Node Type": "Subquery"
562 })
563 }
564 LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
565 json!({
566 "Node Type": "Subquery",
567 "Alias": alias.table(),
568 })
569 }
570 LogicalPlan::Statement(statement) => {
571 json!({
572 "Node Type": "Statement",
573 "Statement": format!("{}", statement.display())
574 })
575 }
576 LogicalPlan::Distinct(distinct) => match distinct {
577 Distinct::All(_) => {
578 json!({
579 "Node Type": "DistinctAll"
580 })
581 }
582 Distinct::On(DistinctOn {
583 on_expr,
584 select_expr,
585 sort_expr,
586 ..
587 }) => {
588 let mut object = json!({
589 "Node Type": "DistinctOn",
590 "On": expr_vec_fmt!(on_expr),
591 "Select": expr_vec_fmt!(select_expr),
592 });
593 if let Some(sort_expr) = sort_expr {
594 object["Sort"] =
595 serde_json::Value::String(expr_vec_fmt!(sort_expr));
596 }
597
598 object
599 }
600 },
601 LogicalPlan::Explain { .. } => {
602 json!({
603 "Node Type": "Explain"
604 })
605 }
606 LogicalPlan::Analyze { .. } => {
607 json!({
608 "Node Type": "Analyze"
609 })
610 }
611 LogicalPlan::Union(_) => {
612 json!({
613 "Node Type": "Union"
614 })
615 }
616 LogicalPlan::Extension(e) => {
617 json!({
618 "Node Type": e.node.name(),
619 "Detail": format!("{:?}", e.node)
620 })
621 }
622 LogicalPlan::DescribeTable(DescribeTable { .. }) => {
623 json!({
624 "Node Type": "DescribeTable"
625 })
626 }
627 LogicalPlan::Unnest(Unnest {
628 input: plan,
629 list_type_columns: list_col_indices,
630 struct_type_columns: struct_col_indices,
631 ..
632 }) => {
633 let input_columns = plan.schema().columns();
634 let list_type_columns = list_col_indices
635 .iter()
636 .map(|(i, unnest_info)| {
637 format!(
638 "{}|depth={:?}",
639 &input_columns[*i].to_string(),
640 unnest_info.depth
641 )
642 })
643 .collect::<Vec<String>>();
644 let struct_type_columns = struct_col_indices
645 .iter()
646 .map(|i| &input_columns[*i])
647 .collect::<Vec<&Column>>();
648 json!({
649 "Node Type": "Unnest",
650 "ListColumn": expr_vec_fmt!(list_type_columns),
651 "StructColumn": expr_vec_fmt!(struct_type_columns),
652 })
653 }
654 }
655 }
656}
657
658impl<'n> TreeNodeVisitor<'n> for PgJsonVisitor<'_, '_> {
659 type Node = LogicalPlan;
660
661 fn f_down(
662 &mut self,
663 node: &'n LogicalPlan,
664 ) -> datafusion_common::Result<TreeNodeRecursion> {
665 let id = self.next_id;
666 self.next_id += 1;
667 let mut object = Self::to_json_value(node);
668
669 object["Plans"] = serde_json::Value::Array(vec![]);
670
671 if self.with_schema {
672 object["Output"] = serde_json::Value::Array(
673 node.schema()
674 .fields()
675 .iter()
676 .map(|f| f.name().to_string())
677 .map(serde_json::Value::String)
678 .collect(),
679 );
680 };
681
682 self.objects.insert(id, object);
683 self.parent_ids.push(id);
684 Ok(TreeNodeRecursion::Continue)
685 }
686
687 fn f_up(
688 &mut self,
689 _node: &Self::Node,
690 ) -> datafusion_common::Result<TreeNodeRecursion> {
691 let id = self.parent_ids.pop().unwrap();
692
693 let current_node = self.objects.remove(&id).ok_or_else(|| {
694 DataFusionError::Internal("Missing current node!".to_string())
695 })?;
696
697 if let Some(parent_id) = self.parent_ids.last() {
698 let parent_node = self
699 .objects
700 .get_mut(parent_id)
701 .expect("Missing parent node!");
702 let plans = parent_node
703 .get_mut("Plans")
704 .and_then(|p| p.as_array_mut())
705 .expect("Plans should be an array");
706
707 plans.push(current_node);
708 } else {
709 let plan = serde_json::json!([{"Plan": current_node}]);
711 write!(
712 self.f,
713 "{}",
714 serde_json::to_string_pretty(&plan)
715 .map_err(|e| DataFusionError::External(Box::new(e)))?
716 )?;
717 }
718
719 Ok(TreeNodeRecursion::Continue)
720 }
721}
722
#[cfg(test)]
mod tests {
    use arrow::datatypes::{DataType, Field};
    use insta::assert_snapshot;

    use super::*;

    /// A schema without fields renders as an empty pair of brackets.
    #[test]
    fn test_display_empty_schema() {
        let empty = Schema::empty();
        assert_snapshot!(display_schema(&empty), @"[]");
    }

    /// Fields render as `name:type`, with `;N` appended to nullable ones.
    #[test]
    fn test_display_schema() {
        let id = Field::new("id", DataType::Int32, false);
        let first_name = Field::new("first_name", DataType::Utf8, true);
        let schema = Schema::new(vec![id, first_name]);

        assert_snapshot!(display_schema(&schema), @"[id:Int32, first_name:Utf8;N]");
    }
}