1use std::collections::HashMap;
21use std::fmt;
22
23use crate::{
24 Aggregate, DescribeTable, Distinct, DistinctOn, DmlStatement, Expr, Filter, Join,
25 Limit, LogicalPlan, Partitioning, Projection, RecursiveQuery, Repartition, Sort,
26 Subquery, SubqueryAlias, TableProviderFilterPushDown, TableScan, Unnest, Values,
27 Window, expr_vec_fmt,
28};
29
30use crate::dml::CopyTo;
31use arrow::datatypes::Schema;
32use datafusion_common::display::GraphvizBuilder;
33use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor};
34use datafusion_common::{Column, DataFusionError, internal_datafusion_err};
35use serde_json::json;
36
/// Visitor state for rendering a [`LogicalPlan`] as indented text, one node
/// per line.
pub struct IndentVisitor<'a, 'b> {
    /// Current nesting depth; each level is rendered as two spaces.
    indent: usize,
    /// When `true`, each node is followed by its schema.
    with_schema: bool,
    /// Destination that the formatted plan is written to.
    f: &'a mut fmt::Formatter<'b>,
}
49
50impl<'a, 'b> IndentVisitor<'a, 'b> {
51 pub fn new(f: &'a mut fmt::Formatter<'b>, with_schema: bool) -> Self {
54 Self {
55 f,
56 with_schema,
57 indent: 0,
58 }
59 }
60}
61
62impl<'n> TreeNodeVisitor<'n> for IndentVisitor<'_, '_> {
63 type Node = LogicalPlan;
64
65 fn f_down(
66 &mut self,
67 plan: &'n LogicalPlan,
68 ) -> datafusion_common::Result<TreeNodeRecursion> {
69 if self.indent > 0 {
70 writeln!(self.f)?;
71 }
72 write!(self.f, "{:indent$}", "", indent = self.indent * 2)?;
73 write!(self.f, "{}", plan.display())?;
74 if self.with_schema {
75 write!(self.f, " {}", display_schema(plan.schema().as_arrow()))?;
76 }
77
78 self.indent += 1;
79 Ok(TreeNodeRecursion::Continue)
80 }
81
82 fn f_up(
83 &mut self,
84 _plan: &'n LogicalPlan,
85 ) -> datafusion_common::Result<TreeNodeRecursion> {
86 self.indent -= 1;
87 Ok(TreeNodeRecursion::Continue)
88 }
89}
90
91pub fn display_schema(schema: &Schema) -> impl fmt::Display + '_ {
110 struct Wrapper<'a>(&'a Schema);
111
112 impl fmt::Display for Wrapper<'_> {
113 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
114 write!(f, "[")?;
115 for (idx, field) in self.0.fields().iter().enumerate() {
116 if idx > 0 {
117 write!(f, ", ")?;
118 }
119 let nullable_str = if field.is_nullable() { ";N" } else { "" };
120 write!(f, "{}:{}{}", field.name(), field.data_type(), nullable_str)?;
121 }
122 write!(f, "]")
123 }
124 }
125 Wrapper(schema)
126}
127
128pub struct GraphvizVisitor<'a, 'b> {
132 f: &'a mut fmt::Formatter<'b>,
133 graphviz_builder: GraphvizBuilder,
134 with_schema: bool,
136
137 parent_ids: Vec<usize>,
140}
141
142impl<'a, 'b> GraphvizVisitor<'a, 'b> {
143 pub fn new(f: &'a mut fmt::Formatter<'b>) -> Self {
144 Self {
145 f,
146 graphviz_builder: GraphvizBuilder::default(),
147 with_schema: false,
148 parent_ids: Vec::new(),
149 }
150 }
151
152 pub fn set_with_schema(&mut self, with_schema: bool) {
154 self.with_schema = with_schema;
155 }
156
157 pub fn pre_visit_plan(&mut self, label: &str) -> fmt::Result {
158 self.graphviz_builder.start_cluster(self.f, label)
159 }
160
161 pub fn post_visit_plan(&mut self) -> fmt::Result {
162 self.graphviz_builder.end_cluster(self.f)
163 }
164
165 pub fn start_graph(&mut self) -> fmt::Result {
166 self.graphviz_builder.start_graph(self.f)
167 }
168
169 pub fn end_graph(&mut self) -> fmt::Result {
170 self.graphviz_builder.end_graph(self.f)
171 }
172}
173
174impl<'n> TreeNodeVisitor<'n> for GraphvizVisitor<'_, '_> {
175 type Node = LogicalPlan;
176
177 fn f_down(
178 &mut self,
179 plan: &'n LogicalPlan,
180 ) -> datafusion_common::Result<TreeNodeRecursion> {
181 let id = self.graphviz_builder.next_id();
182
183 let label = if self.with_schema {
186 format!(
187 r"{}\nSchema: {}",
188 plan.display(),
189 display_schema(plan.schema().as_arrow())
190 )
191 } else {
192 format!("{}", plan.display())
193 };
194
195 self.graphviz_builder
196 .add_node(self.f, id, &label, None)
197 .map_err(|_e| internal_datafusion_err!("Fail to format"))?;
198
199 if let Some(parent_id) = self.parent_ids.last() {
202 self.graphviz_builder
203 .add_edge(self.f, *parent_id, id)
204 .map_err(|_e| internal_datafusion_err!("Fail to format"))?;
205 }
206
207 self.parent_ids.push(id);
208 Ok(TreeNodeRecursion::Continue)
209 }
210
211 fn f_up(
212 &mut self,
213 _plan: &LogicalPlan,
214 ) -> datafusion_common::Result<TreeNodeRecursion> {
215 let res = self.parent_ids.pop();
218 res.ok_or(internal_datafusion_err!("Fail to format"))
219 .map(|_| TreeNodeRecursion::Continue)
220 }
221}
222
223pub struct PgJsonVisitor<'a, 'b> {
271 f: &'a mut fmt::Formatter<'b>,
272
273 objects: HashMap<u32, serde_json::Value>,
275
276 next_id: u32,
277
278 with_schema: bool,
280
281 parent_ids: Vec<u32>,
284}
285
286impl<'a, 'b> PgJsonVisitor<'a, 'b> {
287 pub fn new(f: &'a mut fmt::Formatter<'b>) -> Self {
288 Self {
289 f,
290 objects: HashMap::new(),
291 next_id: 0,
292 with_schema: false,
293 parent_ids: Vec::new(),
294 }
295 }
296
297 pub fn with_schema(&mut self, with_schema: bool) {
299 self.with_schema = with_schema;
300 }
301
302 fn to_json_value(node: &LogicalPlan) -> serde_json::Value {
304 match node {
305 LogicalPlan::EmptyRelation(_) => {
306 json!({
307 "Node Type": "EmptyRelation",
308 })
309 }
310 LogicalPlan::RecursiveQuery(RecursiveQuery { is_distinct, .. }) => {
311 json!({
312 "Node Type": "RecursiveQuery",
313 "Is Distinct": is_distinct,
314 })
315 }
316 LogicalPlan::Values(Values { values, .. }) => {
317 let str_values = values
318 .iter()
319 .take(5)
321 .map(|row| {
322 let item = row
323 .iter()
324 .map(|expr| expr.to_string())
325 .collect::<Vec<_>>()
326 .join(", ");
327 format!("({item})")
328 })
329 .collect::<Vec<_>>()
330 .join(", ");
331
332 let eclipse = if values.len() > 5 { "..." } else { "" };
333
334 let values_str = format!("{str_values}{eclipse}");
335 json!({
336 "Node Type": "Values",
337 "Values": values_str
338 })
339 }
340 LogicalPlan::TableScan(TableScan {
341 source,
342 table_name,
343 filters,
344 fetch,
345 ..
346 }) => {
347 let mut object = json!({
348 "Node Type": "TableScan",
349 "Relation Name": table_name.table(),
350 });
351
352 if let Some(s) = table_name.schema() {
353 object["Schema"] = serde_json::Value::String(s.to_string());
354 }
355
356 if let Some(c) = table_name.catalog() {
357 object["Catalog"] = serde_json::Value::String(c.to_string());
358 }
359
360 if !filters.is_empty() {
361 let mut full_filter = vec![];
362 let mut partial_filter = vec![];
363 let mut unsupported_filters = vec![];
364 let filters: Vec<&Expr> = filters.iter().collect();
365
366 if let Ok(results) = source.supports_filters_pushdown(&filters) {
367 filters.iter().zip(results.iter()).for_each(
368 |(x, res)| match res {
369 TableProviderFilterPushDown::Exact => full_filter.push(x),
370 TableProviderFilterPushDown::Inexact => {
371 partial_filter.push(x)
372 }
373 TableProviderFilterPushDown::Unsupported => {
374 unsupported_filters.push(x)
375 }
376 },
377 );
378 }
379
380 if !full_filter.is_empty() {
381 object["Full Filters"] =
382 serde_json::Value::String(expr_vec_fmt!(full_filter));
383 };
384 if !partial_filter.is_empty() {
385 object["Partial Filters"] =
386 serde_json::Value::String(expr_vec_fmt!(partial_filter));
387 }
388 if !unsupported_filters.is_empty() {
389 object["Unsupported Filters"] =
390 serde_json::Value::String(expr_vec_fmt!(unsupported_filters));
391 }
392 }
393
394 if let Some(f) = fetch {
395 object["Fetch"] = serde_json::Value::Number((*f).into());
396 }
397
398 object
399 }
400 LogicalPlan::Projection(Projection { expr, .. }) => {
401 json!({
402 "Node Type": "Projection",
403 "Expressions": expr.iter().map(|e| e.to_string()).collect::<Vec<_>>()
404 })
405 }
406 LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
407 json!({
408 "Node Type": "Projection",
409 "Operation": op.name(),
410 "Table Name": table_name.table()
411 })
412 }
413 LogicalPlan::Copy(CopyTo {
414 input: _,
415 output_url,
416 file_type,
417 partition_by: _,
418 options,
419 output_schema: _,
420 }) => {
421 let op_str = options
422 .iter()
423 .map(|(k, v)| format!("{k}={v}"))
424 .collect::<Vec<_>>()
425 .join(", ");
426 json!({
427 "Node Type": "CopyTo",
428 "Output URL": output_url,
429 "File Type": format!("{}", file_type.get_ext()),
430 "Options": op_str
431 })
432 }
433 LogicalPlan::Ddl(ddl) => {
434 json!({
435 "Node Type": "Ddl",
436 "Operation": format!("{}", ddl.display())
437 })
438 }
439 LogicalPlan::Filter(Filter {
440 predicate: expr, ..
441 }) => {
442 json!({
443 "Node Type": "Filter",
444 "Condition": format!("{}", expr)
445 })
446 }
447 LogicalPlan::Window(Window { window_expr, .. }) => {
448 json!({
449 "Node Type": "WindowAggr",
450 "Expressions": expr_vec_fmt!(window_expr)
451 })
452 }
453 LogicalPlan::Aggregate(Aggregate {
454 group_expr,
455 aggr_expr,
456 ..
457 }) => {
458 json!({
459 "Node Type": "Aggregate",
460 "Group By": expr_vec_fmt!(group_expr),
461 "Aggregates": expr_vec_fmt!(aggr_expr)
462 })
463 }
464 LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
465 let mut object = json!({
466 "Node Type": "Sort",
467 "Sort Key": expr_vec_fmt!(expr),
468 });
469
470 if let Some(fetch) = fetch {
471 object["Fetch"] = serde_json::Value::Number((*fetch).into());
472 }
473
474 object
475 }
476 LogicalPlan::Join(Join {
477 on: keys,
478 filter,
479 join_constraint,
480 join_type,
481 ..
482 }) => {
483 let join_expr: Vec<String> =
484 keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
485 let filter_expr = filter
486 .as_ref()
487 .map(|expr| format!(" Filter: {expr}"))
488 .unwrap_or_else(|| "".to_string());
489 json!({
490 "Node Type": format!("{} Join", join_type),
491 "Join Constraint": format!("{:?}", join_constraint),
492 "Join Keys": join_expr.join(", "),
493 "Filter": format!("{}", filter_expr)
494 })
495 }
496 LogicalPlan::Repartition(Repartition {
497 partitioning_scheme,
498 ..
499 }) => match partitioning_scheme {
500 Partitioning::RoundRobinBatch(n) => {
501 json!({
502 "Node Type": "Repartition",
503 "Partitioning Scheme": "RoundRobinBatch",
504 "Partition Count": n
505 })
506 }
507 Partitioning::Hash(expr, n) => {
508 let hash_expr: Vec<String> =
509 expr.iter().map(|e| format!("{e}")).collect();
510
511 json!({
512 "Node Type": "Repartition",
513 "Partitioning Scheme": "Hash",
514 "Partition Count": n,
515 "Partitioning Key": hash_expr
516 })
517 }
518 Partitioning::DistributeBy(expr) => {
519 let dist_by_expr: Vec<String> =
520 expr.iter().map(|e| format!("{e}")).collect();
521 json!({
522 "Node Type": "Repartition",
523 "Partitioning Scheme": "DistributeBy",
524 "Partitioning Key": dist_by_expr
525 })
526 }
527 },
528 LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
529 let mut object = serde_json::json!(
530 {
531 "Node Type": "Limit",
532 }
533 );
534 if let Some(s) = skip {
535 object["Skip"] = s.to_string().into()
536 };
537 if let Some(f) = fetch {
538 object["Fetch"] = f.to_string().into()
539 };
540 object
541 }
542 LogicalPlan::Subquery(Subquery { .. }) => {
543 json!({
544 "Node Type": "Subquery"
545 })
546 }
547 LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
548 json!({
549 "Node Type": "Subquery",
550 "Alias": alias.table(),
551 })
552 }
553 LogicalPlan::Statement(statement) => {
554 json!({
555 "Node Type": "Statement",
556 "Statement": format!("{}", statement.display())
557 })
558 }
559 LogicalPlan::Distinct(distinct) => match distinct {
560 Distinct::All(_) => {
561 json!({
562 "Node Type": "DistinctAll"
563 })
564 }
565 Distinct::On(DistinctOn {
566 on_expr,
567 select_expr,
568 sort_expr,
569 ..
570 }) => {
571 let mut object = json!({
572 "Node Type": "DistinctOn",
573 "On": expr_vec_fmt!(on_expr),
574 "Select": expr_vec_fmt!(select_expr),
575 });
576 if let Some(sort_expr) = sort_expr {
577 object["Sort"] =
578 serde_json::Value::String(expr_vec_fmt!(sort_expr));
579 }
580
581 object
582 }
583 },
584 LogicalPlan::Explain { .. } => {
585 json!({
586 "Node Type": "Explain"
587 })
588 }
589 LogicalPlan::Analyze { .. } => {
590 json!({
591 "Node Type": "Analyze"
592 })
593 }
594 LogicalPlan::Union(_) => {
595 json!({
596 "Node Type": "Union"
597 })
598 }
599 LogicalPlan::Extension(e) => {
600 json!({
601 "Node Type": e.node.name(),
602 "Detail": format!("{:?}", e.node)
603 })
604 }
605 LogicalPlan::DescribeTable(DescribeTable { .. }) => {
606 json!({
607 "Node Type": "DescribeTable"
608 })
609 }
610 LogicalPlan::Unnest(Unnest {
611 input: plan,
612 list_type_columns: list_col_indices,
613 struct_type_columns: struct_col_indices,
614 ..
615 }) => {
616 let input_columns = plan.schema().columns();
617 let list_type_columns = list_col_indices
618 .iter()
619 .map(|(i, unnest_info)| {
620 format!(
621 "{}|depth={:?}",
622 &input_columns[*i].to_string(),
623 unnest_info.depth
624 )
625 })
626 .collect::<Vec<String>>();
627 let struct_type_columns = struct_col_indices
628 .iter()
629 .map(|i| &input_columns[*i])
630 .collect::<Vec<&Column>>();
631 json!({
632 "Node Type": "Unnest",
633 "ListColumn": expr_vec_fmt!(list_type_columns),
634 "StructColumn": expr_vec_fmt!(struct_type_columns),
635 })
636 }
637 }
638 }
639}
640
641impl<'n> TreeNodeVisitor<'n> for PgJsonVisitor<'_, '_> {
642 type Node = LogicalPlan;
643
644 fn f_down(
645 &mut self,
646 node: &'n LogicalPlan,
647 ) -> datafusion_common::Result<TreeNodeRecursion> {
648 let id = self.next_id;
649 self.next_id += 1;
650 let mut object = Self::to_json_value(node);
651
652 object["Plans"] = serde_json::Value::Array(vec![]);
653
654 if self.with_schema {
655 object["Output"] = serde_json::Value::Array(
656 node.schema()
657 .fields()
658 .iter()
659 .map(|f| f.name().to_string())
660 .map(serde_json::Value::String)
661 .collect(),
662 );
663 };
664
665 self.objects.insert(id, object);
666 self.parent_ids.push(id);
667 Ok(TreeNodeRecursion::Continue)
668 }
669
670 fn f_up(
671 &mut self,
672 _node: &Self::Node,
673 ) -> datafusion_common::Result<TreeNodeRecursion> {
674 let id = self.parent_ids.pop().unwrap();
675
676 let current_node = self
677 .objects
678 .remove(&id)
679 .ok_or_else(|| internal_datafusion_err!("Missing current node!"))?;
680
681 if let Some(parent_id) = self.parent_ids.last() {
682 let parent_node = self
683 .objects
684 .get_mut(parent_id)
685 .expect("Missing parent node!");
686 let plans = parent_node
687 .get_mut("Plans")
688 .and_then(|p| p.as_array_mut())
689 .expect("Plans should be an array");
690
691 plans.push(current_node);
692 } else {
693 let plan = serde_json::json!([{"Plan": current_node}]);
695 write!(
696 self.f,
697 "{}",
698 serde_json::to_string_pretty(&plan)
699 .map_err(|e| DataFusionError::External(Box::new(e)))?
700 )?;
701 }
702
703 Ok(TreeNodeRecursion::Continue)
704 }
705}
706
#[cfg(test)]
mod tests {
    use super::*;

    use arrow::datatypes::{DataType, Field};
    use insta::assert_snapshot;

    /// A schema without fields renders as an empty pair of brackets.
    #[test]
    fn test_display_empty_schema() {
        let empty = Schema::empty();
        assert_snapshot!(display_schema(&empty), @"[]");
    }

    /// Fields render as `name:type`, with `;N` appended to nullable ones.
    #[test]
    fn test_display_schema() {
        let id = Field::new("id", DataType::Int32, false);
        let first_name = Field::new("first_name", DataType::Utf8, true);
        let two_columns = Schema::new(vec![id, first_name]);

        assert_snapshot!(display_schema(&two_columns), @"[id:Int32, first_name:Utf8;N]");
    }
}