1use std::fmt;
2use std::path::PathBuf;
3
4use polars_core::schema::Schema;
5use polars_utils::pl_str::PlSmallStr;
6
7use super::format::ExprIRSliceDisplay;
8use crate::constants::UNLIMITED_CACHE;
9use crate::prelude::ir::format::ColumnsDisplay;
10use crate::prelude::*;
11
/// Renders an IR plan as a Graphviz DOT graph through its [`fmt::Display`] implementation.
pub struct IRDotDisplay<'a> {
    /// When set, `_format` inserts an additional streaming marker node above the plan root.
    is_streaming: bool,
    /// The plan being rendered: `_format` reads its root node, its plan arena and its
    /// expression arena.
    lp: IRPlanRef<'a>,
}
16
/// Prefix written in front of every edge and node statement inside the graph body.
const INDENT: &str = " ";
18
/// Identifier of a node in the emitted DOT graph.
///
/// The two variants live in separate name spaces: a `Plain` node is printed as `p<n>`
/// and a `Cache` node as `c<n>`.
#[derive(Clone, Copy)]
enum DotNode {
    /// A regular plan node, numbered by the running counter of `_format`.
    Plain(usize),
    /// A cache node, numbered by the id stored in the cache itself.
    Cache(usize),
}

impl fmt::Display for DotNode {
    /// Writes the DOT node name: a one-letter kind prefix followed by the number.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let (prefix, index) = match *self {
            Self::Plain(index) => ('p', index),
            Self::Cache(index) => ('c', index),
        };

        write!(f, "{prefix}{index}")
    }
}
33
34#[inline(always)]
35fn write_label<'a, 'b>(
36 f: &'a mut fmt::Formatter<'b>,
37 id: DotNode,
38 mut w: impl FnMut(&mut EscapeLabel<'a>) -> fmt::Result,
39) -> fmt::Result {
40 write!(f, "{INDENT}{id}[label=\"")?;
41
42 let mut escaped = EscapeLabel(f);
43 w(&mut escaped)?;
44 let EscapeLabel(f) = escaped;
45
46 writeln!(f, "\"]")?;
47
48 Ok(())
49}
50
51impl<'a> IRDotDisplay<'a> {
52 pub fn new(lp: IRPlanRef<'a>) -> Self {
53 if let Some(streaming_lp) = lp.extract_streaming_plan() {
54 return Self::new_streaming(streaming_lp);
55 }
56
57 Self {
58 is_streaming: false,
59 lp,
60 }
61 }
62
63 fn new_streaming(lp: IRPlanRef<'a>) -> Self {
64 Self {
65 is_streaming: true,
66 lp,
67 }
68 }
69
70 fn with_root(&self, root: Node) -> Self {
71 Self {
72 is_streaming: false,
73 lp: self.lp.with_root(root),
74 }
75 }
76
77 fn display_expr(&self, expr: &'a ExprIR) -> ExprIRDisplay<'a> {
78 expr.display(self.lp.expr_arena)
79 }
80
81 fn display_exprs(&self, exprs: &'a [ExprIR]) -> ExprIRSliceDisplay<'a, ExprIR> {
82 ExprIRSliceDisplay {
83 exprs,
84 expr_arena: self.lp.expr_arena,
85 }
86 }
87
88 fn _format(
89 &self,
90 f: &mut fmt::Formatter<'_>,
91 parent: Option<DotNode>,
92 last: &mut usize,
93 ) -> std::fmt::Result {
94 use fmt::Write;
95
96 let root = self.lp.root();
97
98 let mut parent = parent;
99 if self.is_streaming {
100 *last += 1;
101 let streaming_node = DotNode::Plain(*last);
102
103 if let Some(parent) = parent {
104 writeln!(f, "{INDENT}{parent} -- {streaming_node}")?;
105 write_label(f, streaming_node, |f| f.write_str("STREAMING"))?;
106 }
107
108 parent = Some(streaming_node);
109 }
110 let parent = parent;
111
112 let id = if let IR::Cache { id, .. } = root {
113 DotNode::Cache(*id)
114 } else {
115 *last += 1;
116 DotNode::Plain(*last)
117 };
118
119 if let Some(parent) = parent {
120 writeln!(f, "{INDENT}{parent} -- {id}")?;
121 }
122
123 use IR::*;
124 match root {
125 Union { inputs, .. } => {
126 for input in inputs {
127 self.with_root(*input)._format(f, Some(id), last)?;
128 }
129
130 write_label(f, id, |f| f.write_str("UNION"))?;
131 },
132 HConcat { inputs, .. } => {
133 for input in inputs {
134 self.with_root(*input)._format(f, Some(id), last)?;
135 }
136
137 write_label(f, id, |f| f.write_str("HCONCAT"))?;
138 },
139 Cache {
140 input, cache_hits, ..
141 } => {
142 self.with_root(*input)._format(f, Some(id), last)?;
143
144 if *cache_hits == UNLIMITED_CACHE {
145 write_label(f, id, |f| f.write_str("CACHE"))?;
146 } else {
147 write_label(f, id, |f| write!(f, "CACHE: {cache_hits} times"))?;
148 };
149 },
150 Filter { predicate, input } => {
151 self.with_root(*input)._format(f, Some(id), last)?;
152
153 let pred = self.display_expr(predicate);
154 write_label(f, id, |f| write!(f, "FILTER BY {pred}"))?;
155 },
156 #[cfg(feature = "python")]
157 PythonScan { options } => {
158 let predicate = match &options.predicate {
159 PythonPredicate::Polars(e) => format!("{}", self.display_expr(e)),
160 PythonPredicate::PyArrow(s) => s.clone(),
161 PythonPredicate::None => "none".to_string(),
162 };
163 let with_columns = NumColumns(options.with_columns.as_ref().map(|s| s.as_ref()));
164 let total_columns = options.schema.len();
165
166 write_label(f, id, |f| {
167 write!(
168 f,
169 "PYTHON SCAN\nπ {with_columns}/{total_columns};\nσ {predicate}"
170 )
171 })?
172 },
173 Select {
174 expr,
175 input,
176 schema,
177 ..
178 } => {
179 self.with_root(*input)._format(f, Some(id), last)?;
180 write_label(f, id, |f| write!(f, "π {}/{}", expr.len(), schema.len()))?;
181 },
182 Sort {
183 input, by_column, ..
184 } => {
185 let by_column = self.display_exprs(by_column);
186 self.with_root(*input)._format(f, Some(id), last)?;
187 write_label(f, id, |f| write!(f, "SORT BY {by_column}"))?;
188 },
189 GroupBy {
190 input, keys, aggs, ..
191 } => {
192 let keys = self.display_exprs(keys);
193 let aggs = self.display_exprs(aggs);
194 self.with_root(*input)._format(f, Some(id), last)?;
195 write_label(f, id, |f| write!(f, "AGG {aggs}\nBY\n{keys}"))?;
196 },
197 HStack { input, exprs, .. } => {
198 let exprs = self.display_exprs(exprs);
199 self.with_root(*input)._format(f, Some(id), last)?;
200 write_label(f, id, |f| write!(f, "WITH COLUMNS {exprs}"))?;
201 },
202 Slice { input, offset, len } => {
203 self.with_root(*input)._format(f, Some(id), last)?;
204 write_label(f, id, |f| write!(f, "SLICE offset: {offset}; len: {len}"))?;
205 },
206 Distinct { input, options, .. } => {
207 self.with_root(*input)._format(f, Some(id), last)?;
208 write_label(f, id, |f| {
209 f.write_str("DISTINCT")?;
210
211 if let Some(subset) = &options.subset {
212 f.write_str(" BY ")?;
213
214 let mut subset = subset.iter();
215
216 if let Some(fst) = subset.next() {
217 f.write_str(fst)?;
218 for name in subset {
219 write!(f, ", \"{name}\"")?;
220 }
221 } else {
222 f.write_str("None")?;
223 }
224 }
225
226 Ok(())
227 })?;
228 },
229 DataFrameScan {
230 schema,
231 output_schema,
232 ..
233 } => {
234 let num_columns = NumColumnsSchema(output_schema.as_ref().map(|p| p.as_ref()));
235 let total_columns = schema.len();
236
237 write_label(f, id, |f| {
238 write!(f, "TABLE\nπ {num_columns}/{total_columns}")
239 })?;
240 },
241 Scan {
242 sources,
243 file_info,
244 hive_parts: _,
245 predicate,
246 scan_type,
247 unified_scan_args,
248 output_schema: _,
249 id: _,
250 } => {
251 let name: &str = (&**scan_type).into();
252 let path = ScanSourcesDisplay(sources);
253 let with_columns = unified_scan_args
254 .projection
255 .as_ref()
256 .map(|cols| cols.as_ref());
257 let with_columns = NumColumns(with_columns);
258 let total_columns =
259 file_info.schema.len() - usize::from(unified_scan_args.row_index.is_some());
260
261 write_label(f, id, |f| {
262 write!(f, "{name} SCAN {path}\nπ {with_columns}/{total_columns};",)?;
263
264 if let Some(predicate) = predicate.as_ref() {
265 write!(f, "\nσ {}", self.display_expr(predicate))?;
266 }
267
268 if let Some(row_index) = unified_scan_args.row_index.as_ref() {
269 write!(f, "\nrow index: {} (+{})", row_index.name, row_index.offset)?;
270 }
271
272 Ok(())
273 })?;
274 },
275 Join {
276 input_left,
277 input_right,
278 left_on,
279 right_on,
280 options,
281 ..
282 } => {
283 self.with_root(*input_left)._format(f, Some(id), last)?;
284 self.with_root(*input_right)._format(f, Some(id), last)?;
285
286 let left_on = self.display_exprs(left_on);
287 let right_on = self.display_exprs(right_on);
288
289 write_label(f, id, |f| {
290 write!(
291 f,
292 "JOIN {}\nleft: {left_on};\nright: {right_on}",
293 options.args.how
294 )
295 })?;
296 },
297 MapFunction {
298 input, function, ..
299 } => {
300 if let Some(streaming_lp) = function.to_streaming_lp() {
301 Self::new_streaming(streaming_lp)._format(f, Some(id), last)?;
302 } else {
303 self.with_root(*input)._format(f, Some(id), last)?;
304 write_label(f, id, |f| write!(f, "{function}"))?;
305 }
306 },
307 ExtContext { input, .. } => {
308 self.with_root(*input)._format(f, Some(id), last)?;
309 write_label(f, id, |f| f.write_str("EXTERNAL_CONTEXT"))?;
310 },
311 Sink { input, payload, .. } => {
312 self.with_root(*input)._format(f, Some(id), last)?;
313
314 write_label(f, id, |f| {
315 f.write_str(match payload {
316 SinkTypeIR::Memory => "SINK (MEMORY)",
317 SinkTypeIR::File { .. } => "SINK (FILE)",
318 SinkTypeIR::Partition { .. } => "SINK (PARTITION)",
319 })
320 })?;
321 },
322 SinkMultiple { inputs } => {
323 for input in inputs {
324 self.with_root(*input)._format(f, Some(id), last)?;
325 }
326
327 write_label(f, id, |f| f.write_str("SINK MULTIPLE"))?;
328 },
329 SimpleProjection { input, columns } => {
330 let num_columns = columns.as_ref().len();
331 let total_columns = self.lp.lp_arena.get(*input).schema(self.lp.lp_arena).len();
332
333 let columns = ColumnsDisplay(columns.as_ref());
334 self.with_root(*input)._format(f, Some(id), last)?;
335 write_label(f, id, |f| {
336 write!(f, "simple π {num_columns}/{total_columns}\n[{columns}]")
337 })?;
338 },
339 #[cfg(feature = "merge_sorted")]
340 MergeSorted {
341 input_left,
342 input_right,
343 key,
344 } => {
345 self.with_root(*input_left)._format(f, Some(id), last)?;
346 self.with_root(*input_right)._format(f, Some(id), last)?;
347
348 write_label(f, id, |f| write!(f, "MERGE_SORTED ON '{key}'",))?;
349 },
350 Invalid => write_label(f, id, |f| f.write_str("INVALID"))?,
351 }
352
353 Ok(())
354 }
355}
356
/// Display adapter that prints a slice of paths as a bracketed list, abbreviated when it
/// holds more than two entries.
pub struct PathsDisplay<'a>(pub &'a [PathBuf]);
/// Display adapter that prints scan sources as a bracketed list, abbreviated when there
/// are more than two sources.
pub struct ScanSourcesDisplay<'a>(pub &'a ScanSources);
/// Display adapter that prints the number of column names, or `*` when there is no list.
struct NumColumns<'a>(Option<&'a [PlSmallStr]>);
/// Display adapter that prints the number of columns of a schema, or `*` when there is no
/// schema.
struct NumColumnsSchema<'a>(Option<&'a Schema>);
362
363impl fmt::Display for ScanSourceRef<'_> {
364 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
365 match self {
366 ScanSourceRef::Path(path) => path.display().fmt(f),
367 ScanSourceRef::File(_) => f.write_str("open-file"),
368 ScanSourceRef::Buffer(buff) => write!(f, "{} in-mem bytes", buff.len()),
369 }
370 }
371}
372
373impl fmt::Display for ScanSourcesDisplay<'_> {
374 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
375 match self.0.len() {
376 0 => write!(f, "[]"),
377 1 => write!(f, "[{}]", self.0.at(0)),
378 2 => write!(f, "[{}, {}]", self.0.at(0), self.0.at(1)),
379 _ => write!(
380 f,
381 "[{}, ... {} other sources]",
382 self.0.at(0),
383 self.0.len() - 1,
384 ),
385 }
386 }
387}
388
389impl fmt::Display for PathsDisplay<'_> {
390 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
391 match self.0.len() {
392 0 => write!(f, "[]"),
393 1 => write!(f, "[{}]", self.0[0].display()),
394 2 => write!(f, "[{}, {}]", self.0[0].display(), self.0[1].display()),
395 _ => write!(
396 f,
397 "[{}, ... {} other files]",
398 self.0[0].display(),
399 self.0.len() - 1,
400 ),
401 }
402 }
403}
404
405impl fmt::Display for NumColumns<'_> {
406 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
407 match self.0 {
408 None => f.write_str("*"),
409 Some(columns) => columns.len().fmt(f),
410 }
411 }
412}
413
414impl fmt::Display for NumColumnsSchema<'_> {
415 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
416 match self.0 {
417 None => f.write_str("*"),
418 Some(columns) => columns.len().fmt(f),
419 }
420 }
421}
422
/// [`fmt::Write`] adapter that escapes text for use inside a double-quoted DOT label.
///
/// Everything written through the adapter is forwarded to the wrapped writer with
/// `"` replaced by `\"`, a line feed replaced by the two characters `\n`, and `\`
/// replaced by `\\`.
pub struct EscapeLabel<'a>(pub &'a mut dyn fmt::Write);

impl fmt::Write for EscapeLabel<'_> {
    /// Forwards `s` to the inner writer, replacing each character that needs escaping.
    ///
    /// Returns the first error reported by the inner writer.
    fn write_str(&mut self, mut s: &str) -> fmt::Result {
        loop {
            // Locate the next character that needs escaping and its replacement.
            let next_escape = s.char_indices().find_map(|(i, c)| match c {
                '"' => Some((i, r#"\""#)),
                '\n' => Some((i, r"\n")),
                // A raw backslash would otherwise start an escape sequence together with
                // whatever follows it, including the quote that closes the label.
                '\\' => Some((i, r"\\")),
                _ => None,
            });

            let Some((at, replacement)) = next_escape else {
                break;
            };

            self.0.write_str(&s[..at])?;
            self.0.write_str(replacement)?;
            // Every escaped character is a single byte, so `at + 1` is a char boundary.
            s = &s[at + 1..];
        }

        // Whatever is left contains nothing that needs escaping.
        self.0.write_str(s)
    }
}
454
455impl fmt::Display for IRDotDisplay<'_> {
456 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
457 writeln!(f, "graph polars_query {{")?;
458
459 let mut last = 0;
460 self._format(f, None, &mut last)?;
461
462 writeln!(f, "}}")?;
463
464 Ok(())
465 }
466}