Skip to main content

polars_stream/physical_plan/
fmt.rs

1use std::fmt::Write;
2
3use polars_plan::dsl::PartitionStrategyIR;
4use polars_plan::plans::expr_ir::ExprIR;
5use polars_plan::plans::{AExpr, EscapeLabel};
6use polars_plan::prelude::FileWriteFormat;
7use polars_time::ClosedWindow;
8#[cfg(feature = "dynamic_group_by")]
9use polars_time::DynamicGroupOptions;
10use polars_utils::arena::Arena;
11use polars_utils::slice_enum::Slice;
12use slotmap::{Key, SecondaryMap, SlotMap};
13
14use super::{PhysNode, PhysNodeKey, PhysNodeKind};
15use crate::physical_plan::ZipBehavior;
16
17/// A style of a graph node.
18pub enum NodeStyle {
19    InMemoryFallback,
20    MemoryIntensive,
21    Generic,
22}
23
24impl NodeStyle {
25    const COLOR_IN_MEM_FALLBACK: &str = "0.0 0.3 1.0"; // Pastel red
26    const COLOR_MEM_INTENSIVE: &str = "0.16 0.3 1.0"; // Pastel yellow
27
28    /// Returns a style for a node kind.
29    pub fn for_node_kind(kind: &PhysNodeKind) -> Self {
30        use PhysNodeKind as K;
31        match kind {
32            K::InMemoryMap { .. } | K::InMemoryJoin { .. } => Self::InMemoryFallback,
33            K::InMemorySource { .. }
34            | K::InputIndependentSelect { .. }
35            | K::NegativeSlice { .. }
36            | K::InMemorySink { .. }
37            | K::Sort { .. }
38            | K::GroupBy { .. }
39            | K::EquiJoin { .. }
40            | K::SemiAntiJoin { .. }
41            | K::Multiplexer { .. } => Self::MemoryIntensive,
42            #[cfg(feature = "merge_sorted")]
43            K::MergeSorted { .. } => Self::MemoryIntensive,
44            _ => Self::Generic,
45        }
46    }
47
48    /// Returns extra styling attributes (if any) for the graph node.
49    pub fn node_attrs(&self) -> Option<String> {
50        match self {
51            Self::InMemoryFallback => Some(format!(
52                "style=filled,fillcolor=\"{}\"",
53                Self::COLOR_IN_MEM_FALLBACK
54            )),
55            Self::MemoryIntensive => Some(format!(
56                "style=filled,fillcolor=\"{}\"",
57                Self::COLOR_MEM_INTENSIVE
58            )),
59            Self::Generic => None,
60        }
61    }
62
63    /// Returns a legend explaining the node style meaning.
64    pub fn legend() -> String {
65        format!(
66            "fontname=\"Helvetica\"\nfontsize=\"10\"\nlabelloc=\"b\"\nlabel=<<BR/><BR/><B>Legend</B><BR/><BR/>◯ streaming engine node <FONT COLOR=\"{}\">⬤</FONT> potentially memory-intensive node <FONT COLOR=\"{}\">⬤</FONT> in-memory engine fallback>",
67            Self::COLOR_MEM_INTENSIVE,
68            Self::COLOR_IN_MEM_FALLBACK,
69        )
70    }
71}
72
73fn escape_graphviz(s: &str) -> String {
74    s.replace('\\', "\\\\")
75        .replace('\n', "\\n")
76        .replace('"', "\\\"")
77}
78
79fn fmt_expr(f: &mut dyn Write, expr: &ExprIR, expr_arena: &Arena<AExpr>) -> std::fmt::Result {
80    // Remove the alias to make the display better
81    let without_alias = ExprIR::from_node(expr.node(), expr_arena);
82    write!(
83        f,
84        "{} = {}",
85        expr.output_name(),
86        without_alias.display(expr_arena)
87    )
88}
89
90pub enum FormatExprStyle {
91    Select,
92    NoAliases,
93}
94
95pub fn fmt_exprs_to_label(
96    exprs: &[ExprIR],
97    expr_arena: &Arena<AExpr>,
98    style: FormatExprStyle,
99) -> String {
100    let mut buffer = String::new();
101    let mut f = EscapeLabel(&mut buffer);
102    fmt_exprs(&mut f, exprs, expr_arena, style);
103    buffer
104}
105
106pub fn fmt_exprs(
107    f: &mut dyn Write,
108    exprs: &[ExprIR],
109    expr_arena: &Arena<AExpr>,
110    style: FormatExprStyle,
111) {
112    if matches!(style, FormatExprStyle::Select) {
113        let mut formatted = Vec::new();
114
115        let mut max_name_width = 0;
116        let mut max_expr_width = 0;
117
118        for e in exprs {
119            let mut name = String::new();
120            let mut expr = String::new();
121
122            // Remove the alias to make the display better
123            let without_alias = ExprIR::from_node(e.node(), expr_arena);
124
125            write!(name, "{}", e.output_name()).unwrap();
126            write!(expr, "{}", without_alias.display(expr_arena)).unwrap();
127
128            max_name_width = max_name_width.max(name.chars().count());
129            max_expr_width = max_expr_width.max(expr.chars().count());
130
131            formatted.push((name, expr));
132        }
133
134        for (name, expr) in formatted {
135            writeln!(f, "{name:>max_name_width$} = {expr:<max_expr_width$}").unwrap();
136        }
137    } else {
138        let Some(e) = exprs.first() else {
139            return;
140        };
141
142        fmt_expr(f, e, expr_arena).unwrap();
143
144        for e in &exprs[1..] {
145            f.write_str("\n").unwrap();
146            fmt_expr(f, e, expr_arena).unwrap();
147        }
148    }
149}
150
151#[recursive::recursive]
152fn visualize_plan_rec(
153    node_key: PhysNodeKey,
154    phys_sm: &SlotMap<PhysNodeKey, PhysNode>,
155    expr_arena: &Arena<AExpr>,
156    visited: &mut SecondaryMap<PhysNodeKey, ()>,
157    out: &mut Vec<String>,
158) {
159    if visited.contains_key(node_key) {
160        return;
161    }
162    visited.insert(node_key, ());
163
164    let kind = &phys_sm[node_key].kind;
165
166    use std::slice::from_ref;
167    let (label, inputs) = match kind {
168        PhysNodeKind::InMemorySource {
169            df,
170            disable_morsel_split: _,
171        } => (
172            format!(
173                "in-memory-source\\ncols: {}",
174                df.get_column_names_owned().join(", ")
175            ),
176            &[][..],
177        ),
178        #[cfg(feature = "python")]
179        PhysNodeKind::PythonScan { .. } => ("python-scan".to_string(), &[][..]),
180        PhysNodeKind::SinkMultiple { sinks } => {
181            for sink in sinks {
182                visualize_plan_rec(*sink, phys_sm, expr_arena, visited, out);
183            }
184            return;
185        },
186        PhysNodeKind::Select {
187            input,
188            selectors,
189            extend_original,
190        } => {
191            let label = if *extend_original {
192                "with-columns"
193            } else {
194                "select"
195            };
196            (
197                format!(
198                    "{label}\\n{}",
199                    fmt_exprs_to_label(selectors, expr_arena, FormatExprStyle::Select)
200                ),
201                from_ref(input),
202            )
203        },
204        PhysNodeKind::WithRowIndex {
205            input,
206            name,
207            offset,
208        } => (
209            format!("with-row-index\\nname: {name}\\noffset: {offset:?}"),
210            from_ref(input),
211        ),
212        PhysNodeKind::InputIndependentSelect { selectors } => (
213            format!(
214                "input-independent-select\\n{}",
215                fmt_exprs_to_label(selectors, expr_arena, FormatExprStyle::Select)
216            ),
217            &[][..],
218        ),
219        PhysNodeKind::Reduce { input, exprs } => (
220            format!(
221                "reduce\\n{}",
222                fmt_exprs_to_label(exprs, expr_arena, FormatExprStyle::Select)
223            ),
224            from_ref(input),
225        ),
226        PhysNodeKind::StreamingSlice {
227            input,
228            offset,
229            length,
230        } => (
231            format!("slice\\noffset: {offset}, length: {length}"),
232            from_ref(input),
233        ),
234        PhysNodeKind::NegativeSlice {
235            input,
236            offset,
237            length,
238        } => (
239            format!("slice\\noffset: {offset}, length: {length}"),
240            from_ref(input),
241        ),
242        PhysNodeKind::DynamicSlice {
243            input,
244            offset,
245            length,
246        } => ("slice".to_owned(), &[*input, *offset, *length][..]),
247        PhysNodeKind::Shift {
248            input,
249            offset,
250            fill: Some(fill),
251        } => ("shift".to_owned(), &[*input, *offset, *fill][..]),
252        PhysNodeKind::Shift {
253            input,
254            offset,
255            fill: None,
256        } => ("shift".to_owned(), &[*input, *offset][..]),
257        PhysNodeKind::Filter { input, predicate } => (
258            format!(
259                "filter\\n{}",
260                fmt_exprs_to_label(from_ref(predicate), expr_arena, FormatExprStyle::Select)
261            ),
262            from_ref(input),
263        ),
264        PhysNodeKind::SimpleProjection { input, columns } => (
265            format!("select\\ncols: {}", columns.join(", ")),
266            from_ref(input),
267        ),
268        PhysNodeKind::InMemorySink { input } => ("in-memory-sink".to_string(), from_ref(input)),
269        PhysNodeKind::CallbackSink { input, .. } => ("callback-sink".to_string(), from_ref(input)),
270        PhysNodeKind::FileSink { input, options } => match options.file_format {
271            #[cfg(feature = "parquet")]
272            FileWriteFormat::Parquet(_) => ("parquet-sink".to_string(), from_ref(input)),
273            #[cfg(feature = "ipc")]
274            FileWriteFormat::Ipc(_) => ("ipc-sink".to_string(), from_ref(input)),
275            #[cfg(feature = "csv")]
276            FileWriteFormat::Csv(_) => ("csv-sink".to_string(), from_ref(input)),
277            #[cfg(feature = "json")]
278            FileWriteFormat::NDJson(_) => ("ndjson-sink".to_string(), from_ref(input)),
279            #[allow(unreachable_patterns)]
280            _ => todo!(),
281        },
282        PhysNodeKind::PartitionedSink { input, options } => {
283            let variant = match options.partition_strategy {
284                PartitionStrategyIR::Keyed { .. } => "partition-keyed",
285                PartitionStrategyIR::FileSize => "partition-file-size",
286            };
287
288            match options.file_format {
289                #[cfg(feature = "parquet")]
290                FileWriteFormat::Parquet(_) => (format!("{variant}[parquet]"), from_ref(input)),
291                #[cfg(feature = "ipc")]
292                FileWriteFormat::Ipc(_) => (format!("{variant}[ipc]"), from_ref(input)),
293                #[cfg(feature = "csv")]
294                FileWriteFormat::Csv(_) => (format!("{variant}[csv]"), from_ref(input)),
295                #[cfg(feature = "json")]
296                FileWriteFormat::NDJson(_) => (format!("{variant}[ndjson]"), from_ref(input)),
297                #[allow(unreachable_patterns)]
298                _ => todo!(),
299            }
300        },
301        PhysNodeKind::InMemoryMap {
302            input,
303            map: _,
304            format_str,
305        } => {
306            let mut label = String::new();
307            label.push_str("in-memory-map");
308            if let Some(format_str) = format_str {
309                label.write_str("\\n").unwrap();
310
311                let mut f = EscapeLabel(&mut label);
312                f.write_str(format_str).unwrap();
313            }
314            (label, from_ref(input))
315        },
316        PhysNodeKind::Map {
317            input,
318            map: _,
319            format_str,
320        } => {
321            let mut label = String::new();
322            label.push_str("map");
323            if let Some(format_str) = format_str {
324                label.push_str("\\n");
325
326                let mut f = EscapeLabel(&mut label);
327                f.write_str(format_str).unwrap();
328            }
329            (label, from_ref(input))
330        },
331        PhysNodeKind::SortedGroupBy {
332            input,
333            key,
334            aggs,
335            slice,
336        } => {
337            let mut s = String::new();
338            s.push_str("sorted-group-by\\n");
339            let f = &mut s;
340            write!(f, "key: {key}\\n").unwrap();
341            if let Some((offset, length)) = slice {
342                write!(f, "slice: {offset}, {length}\\n").unwrap();
343            }
344            write!(
345                f,
346                "aggs:\\n{}",
347                fmt_exprs_to_label(aggs, expr_arena, FormatExprStyle::Select)
348            )
349            .unwrap();
350
351            (s, from_ref(input))
352        },
353        PhysNodeKind::Sort {
354            input,
355            by_column,
356            slice: _,
357            sort_options: _,
358        } => (
359            format!(
360                "sort\\n{}",
361                fmt_exprs_to_label(by_column, expr_arena, FormatExprStyle::NoAliases)
362            ),
363            from_ref(input),
364        ),
365        PhysNodeKind::TopK {
366            input,
367            k,
368            by_column,
369            reverse,
370            nulls_last: _,
371        } => {
372            let name = if reverse.iter().all(|r| *r) {
373                "bottom-k"
374            } else {
375                "top-k"
376            };
377            (
378                format!(
379                    "{name}\\n{}",
380                    fmt_exprs_to_label(by_column, expr_arena, FormatExprStyle::NoAliases)
381                ),
382                &[*input, *k][..],
383            )
384        },
385        PhysNodeKind::Repeat { value, repeats } => ("repeat".to_owned(), &[*value, *repeats][..]),
386        #[cfg(feature = "cum_agg")]
387        PhysNodeKind::CumAgg { input, kind } => {
388            use crate::nodes::cum_agg::CumAggKind;
389
390            (
391                format!(
392                    "cum_{}",
393                    match kind {
394                        CumAggKind::Min => "min",
395                        CumAggKind::Max => "max",
396                        CumAggKind::Sum => "sum",
397                        CumAggKind::Count => "count",
398                        CumAggKind::Prod => "prod",
399                    }
400                ),
401                &[*input][..],
402            )
403        },
404        PhysNodeKind::GatherEvery { input, n, offset } => (
405            format!("gather_every\\nn: {n}, offset: {offset}"),
406            &[*input][..],
407        ),
408        PhysNodeKind::Rle(input) => ("rle".to_owned(), &[*input][..]),
409        PhysNodeKind::RleId(input) => ("rle_id".to_owned(), &[*input][..]),
410        PhysNodeKind::PeakMinMax { input, is_peak_max } => (
411            if *is_peak_max { "peak_max" } else { "peak_min" }.to_owned(),
412            &[*input][..],
413        ),
414        PhysNodeKind::OrderedUnion { inputs } => ("ordered-union".to_string(), inputs.as_slice()),
415        PhysNodeKind::UnorderedUnion { inputs } => {
416            ("unordered-union".to_string(), inputs.as_slice())
417        },
418        PhysNodeKind::Zip {
419            inputs,
420            zip_behavior,
421        } => {
422            let label = match zip_behavior {
423                ZipBehavior::NullExtend => "zip-null-extend",
424                ZipBehavior::Broadcast => "zip-broadcast",
425                ZipBehavior::Strict => "zip-strict",
426            };
427            (label.to_string(), inputs.as_slice())
428        },
429        PhysNodeKind::Multiplexer { input } => ("multiplexer".to_string(), from_ref(input)),
430        PhysNodeKind::MultiScan {
431            scan_sources,
432            file_reader_builder,
433            cloud_options: _,
434            file_projection_builder,
435            output_schema,
436            row_index,
437            pre_slice,
438            predicate,
439            predicate_file_skip_applied: _,
440            hive_parts,
441            include_file_paths,
442            cast_columns_policy: _,
443            missing_columns_policy: _,
444            forbid_extra_columns: _,
445            deletion_files,
446            table_statistics: _,
447            file_schema: _,
448            disable_morsel_split: _,
449        } => {
450            let mut out = format!("multi-scan[{}]", file_reader_builder.reader_name());
451            let mut f = EscapeLabel(&mut out);
452
453            write!(f, "\n{} source", scan_sources.len()).unwrap();
454
455            if scan_sources.len() != 1 {
456                write!(f, "s").unwrap();
457            }
458
459            write!(
460                f,
461                "\nproject: {} total, {} from file",
462                output_schema.len(),
463                file_projection_builder.num_projections(),
464            )
465            .unwrap();
466
467            if let Some(ri) = row_index {
468                write!(f, "\nrow index: name: {}, offset: {:?}", ri.name, ri.offset).unwrap();
469            }
470
471            if let Some(col_name) = include_file_paths {
472                write!(f, "\nfile path column: {col_name}").unwrap();
473            }
474
475            if let Some(pre_slice) = pre_slice {
476                write!(f, "\nslice: offset: ").unwrap();
477
478                match pre_slice {
479                    Slice::Positive { offset, len: _ } => write!(f, "{}", *offset),
480                    Slice::Negative {
481                        offset_from_end,
482                        len: _,
483                    } => write!(f, "-{}", *offset_from_end),
484                }
485                .unwrap();
486
487                write!(f, ", len: {}", pre_slice.len()).unwrap()
488            }
489
490            if let Some(predicate) = predicate {
491                write!(f, "\nfilter: {}", predicate.display(expr_arena)).unwrap();
492            }
493
494            if let Some(v) = hive_parts.as_ref().map(|h| h.df().width()) {
495                write!(f, "\nhive: {v} column").unwrap();
496
497                if v != 1 {
498                    write!(f, "s").unwrap();
499                }
500            }
501
502            if let Some(deletion_files) = deletion_files {
503                write!(f, "\n{deletion_files}").unwrap();
504            }
505
506            (out, &[][..])
507        },
508        PhysNodeKind::GroupBy {
509            inputs,
510            key_per_input,
511            aggs_per_input,
512        } => {
513            let mut out = String::from("group-by");
514            for (key, aggs) in key_per_input.iter().zip(aggs_per_input) {
515                write!(
516                    &mut out,
517                    "\\nkey:\\n{}\\naggs:\\n{}",
518                    fmt_exprs_to_label(key, expr_arena, FormatExprStyle::Select),
519                    fmt_exprs_to_label(aggs, expr_arena, FormatExprStyle::Select)
520                )
521                .ok();
522            }
523            (out, inputs.as_slice())
524        },
525        #[cfg(feature = "dynamic_group_by")]
526        PhysNodeKind::DynamicGroupBy {
527            input,
528            options,
529            aggs,
530            slice,
531        } => {
532            use polars_time::prelude::{Label, StartBy};
533
534            let DynamicGroupOptions {
535                index_column,
536                every,
537                period,
538                offset,
539                label,
540                include_boundaries,
541                closed_window,
542                start_by,
543            } = options;
544            let mut s = String::new();
545            let f = &mut s;
546            f.write_str("dynamic-group-by\\n").unwrap();
547            write!(f, "index column: {index_column}\\n").unwrap();
548            write!(f, "every: {every}").unwrap();
549            if every != period {
550                write!(f, ", period: {period}").unwrap();
551            }
552            if !offset.is_zero() {
553                write!(f, ", offset: {offset}").unwrap();
554            }
555            f.write_str("\\n").unwrap();
556            if *label != Label::Left {
557                write!(f, "label: {}\\n", <&'static str>::from(label)).unwrap();
558            }
559            if *include_boundaries {
560                write!(f, "include_boundaries: true\\n").unwrap();
561            }
562            if *start_by != StartBy::WindowBound {
563                write!(f, "start_by: {}\\n", <&'static str>::from(start_by)).unwrap();
564            }
565            if *closed_window != ClosedWindow::Left {
566                write!(
567                    f,
568                    "closed_window: {}\\n",
569                    <&'static str>::from(closed_window)
570                )
571                .unwrap();
572            }
573            if let Some((offset, length)) = slice {
574                write!(f, "slice: {offset}, {length}\\n").unwrap();
575            }
576            write!(
577                f,
578                "aggs:\\n{}",
579                fmt_exprs_to_label(aggs, expr_arena, FormatExprStyle::Select)
580            )
581            .unwrap();
582
583            (s, from_ref(input))
584        },
585        #[cfg(feature = "dynamic_group_by")]
586        PhysNodeKind::RollingGroupBy {
587            input,
588            index_column,
589            period,
590            offset,
591            closed,
592            slice,
593            aggs,
594        } => {
595            let mut s = String::new();
596            let f = &mut s;
597            f.write_str("rolling-group-by\\n").unwrap();
598            write!(f, "index column: {index_column}\\n").unwrap();
599            write!(f, "period: {period}, offset: {offset}\\n").unwrap();
600            write!(f, "closed: {}\\n", <&'static str>::from(*closed)).unwrap();
601            if let Some((offset, length)) = slice {
602                write!(f, "slice: {offset}, {length}\\n").unwrap();
603            }
604            write!(
605                f,
606                "aggs:\\n{}",
607                fmt_exprs_to_label(aggs, expr_arena, FormatExprStyle::Select)
608            )
609            .unwrap();
610
611            (s, from_ref(input))
612        },
613        PhysNodeKind::MergeJoin {
614            input_left,
615            input_right,
616            left_on,
617            right_on,
618            args,
619            ..
620        } => {
621            let mut label = "merge-join".to_string();
622            let how: &'static str = (&args.how).into();
623            write!(
624                label,
625                r"\nleft_on:\n{}",
626                left_on
627                    .iter()
628                    .map(|s| escape_graphviz(&s[..]))
629                    .collect::<Vec<_>>()
630                    .join("\n"),
631            )
632            .unwrap();
633            write!(
634                label,
635                r"\nright_on:\n{}",
636                right_on
637                    .iter()
638                    .map(|s| escape_graphviz(&s[..]))
639                    .collect::<Vec<_>>()
640                    .join("\n"),
641            )
642            .unwrap();
643            write!(label, r"\nhow: {}", escape_graphviz(how)).unwrap();
644            if args.nulls_equal {
645                write!(label, r"\njoin-nulls").unwrap();
646            }
647            (label, &[*input_left, *input_right][..])
648        },
649        PhysNodeKind::InMemoryJoin {
650            input_left,
651            input_right,
652            left_on,
653            right_on,
654            args,
655            ..
656        }
657        | PhysNodeKind::EquiJoin {
658            input_left,
659            input_right,
660            left_on,
661            right_on,
662            args,
663        }
664        | PhysNodeKind::SemiAntiJoin {
665            input_left,
666            input_right,
667            left_on,
668            right_on,
669            args,
670            output_bool: _,
671        } => {
672            let label = match phys_sm[node_key].kind {
673                PhysNodeKind::MergeJoin { .. } => "merge-join",
674                PhysNodeKind::EquiJoin { .. } => "equi-join",
675                PhysNodeKind::InMemoryJoin { .. } => "in-memory-join",
676                PhysNodeKind::CrossJoin { .. } => "cross-join",
677                PhysNodeKind::SemiAntiJoin {
678                    output_bool: false, ..
679                } if args.how.is_semi() => "semi-join",
680                PhysNodeKind::SemiAntiJoin {
681                    output_bool: false, ..
682                } if args.how.is_anti() => "anti-join",
683                PhysNodeKind::SemiAntiJoin {
684                    output_bool: true, ..
685                } if args.how.is_semi() => "is-in",
686                PhysNodeKind::SemiAntiJoin {
687                    output_bool: true, ..
688                } if args.how.is_anti() => "is-not-in",
689                _ => unreachable!(),
690            };
691            let mut label = label.to_string();
692            write!(
693                label,
694                r"\nleft_on:\n{}",
695                fmt_exprs_to_label(left_on, expr_arena, FormatExprStyle::NoAliases)
696            )
697            .unwrap();
698            write!(
699                label,
700                r"\nright_on:\n{}",
701                fmt_exprs_to_label(right_on, expr_arena, FormatExprStyle::NoAliases)
702            )
703            .unwrap();
704            if args.how.is_equi() {
705                write!(
706                    label,
707                    r"\nhow: {}",
708                    escape_graphviz(&format!("{:?}", args.how))
709                )
710                .unwrap();
711            }
712            if args.nulls_equal {
713                write!(label, r"\njoin-nulls").unwrap();
714            }
715            (label, &[*input_left, *input_right][..])
716        },
717        PhysNodeKind::CrossJoin {
718            input_left,
719            input_right,
720            args: _,
721        } => ("cross-join".to_string(), &[*input_left, *input_right][..]),
722        #[cfg(feature = "merge_sorted")]
723        PhysNodeKind::MergeSorted {
724            input_left,
725            input_right,
726        } => ("merge-sorted".to_string(), &[*input_left, *input_right][..]),
727        #[cfg(feature = "ewma")]
728        PhysNodeKind::EwmMean { input, options: _ } => ("ewm-mean".to_string(), &[*input][..]),
729        #[cfg(feature = "ewma")]
730        PhysNodeKind::EwmVar { input, options: _ } => ("ewm-var".to_string(), &[*input][..]),
731        #[cfg(feature = "ewma")]
732        PhysNodeKind::EwmStd { input, options: _ } => ("ewm-std".to_string(), &[*input][..]),
733    };
734
735    let node_id = node_key.data().as_ffi();
736    let style = NodeStyle::for_node_kind(kind);
737
738    if let Some(attrs) = style.node_attrs() {
739        out.push(format!("{node_id} [label=\"{label}\",{attrs}];"));
740    } else {
741        out.push(format!("{node_id} [label=\"{label}\"];"));
742    }
743    for input in inputs {
744        visualize_plan_rec(input.node, phys_sm, expr_arena, visited, out);
745        out.push(format!(
746            "{} -> {};",
747            input.node.data().as_ffi(),
748            node_key.data().as_ffi()
749        ));
750    }
751}
752
753pub fn visualize_plan(
754    root: PhysNodeKey,
755    phys_sm: &SlotMap<PhysNodeKey, PhysNode>,
756    expr_arena: &Arena<AExpr>,
757) -> String {
758    let mut visited: SecondaryMap<PhysNodeKey, ()> = SecondaryMap::new();
759    let mut out = Vec::with_capacity(phys_sm.len() + 3);
760    out.push("digraph polars {\nrankdir=\"BT\"\nnode [fontname=\"Monospace\"]".to_string());
761    out.push(NodeStyle::legend());
762    visualize_plan_rec(root, phys_sm, expr_arena, &mut visited, &mut out);
763    out.push("}".to_string());
764    out.join("\n")
765}