1use std::fmt;
2use std::path::PathBuf;
3
4use polars_core::schema::Schema;
5use polars_utils::pl_str::PlSmallStr;
6
7use super::format::ExprIRSliceDisplay;
8use crate::constants::UNLIMITED_CACHE;
9use crate::prelude::ir::format::ColumnsDisplay;
10use crate::prelude::*;
11
12pub struct IRDotDisplay<'a> {
13 is_streaming: bool,
14 lp: IRPlanRef<'a>,
15}
16
/// Leading whitespace written in front of every statement inside the DOT graph body.
const INDENT: &str = " ";
18
/// Identifier of a node in the emitted DOT graph.
///
/// The two variants are rendered with different prefixes so that their
/// numeric ranges can never collide.
#[derive(Clone, Copy)]
enum DotNode {
    /// A regular plan node, rendered as `p<n>`.
    Plain(usize),
    /// A cache node, rendered as `c<n>`.
    Cache(usize),
}

impl fmt::Display for DotNode {
    /// Writes the node identifier: a one-letter prefix followed by the index.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let (prefix, index) = match *self {
            DotNode::Plain(index) => ('p', index),
            DotNode::Cache(index) => ('c', index),
        };

        write!(f, "{prefix}{index}")
    }
}
33
34#[inline(always)]
35fn write_label<'a, 'b>(
36 f: &'a mut fmt::Formatter<'b>,
37 id: DotNode,
38 mut w: impl FnMut(&mut EscapeLabel<'a>) -> fmt::Result,
39) -> fmt::Result {
40 write!(f, "{INDENT}{id}[label=\"")?;
41
42 let mut escaped = EscapeLabel(f);
43 w(&mut escaped)?;
44 let EscapeLabel(f) = escaped;
45
46 writeln!(f, "\"]")?;
47
48 Ok(())
49}
50
51impl<'a> IRDotDisplay<'a> {
52 pub fn new(lp: IRPlanRef<'a>) -> Self {
53 if let Some(streaming_lp) = lp.extract_streaming_plan() {
54 return Self::new_streaming(streaming_lp);
55 }
56
57 Self {
58 is_streaming: false,
59 lp,
60 }
61 }
62
63 fn new_streaming(lp: IRPlanRef<'a>) -> Self {
64 Self {
65 is_streaming: true,
66 lp,
67 }
68 }
69
70 fn with_root(&self, root: Node) -> Self {
71 Self {
72 is_streaming: false,
73 lp: self.lp.with_root(root),
74 }
75 }
76
77 fn display_expr(&self, expr: &'a ExprIR) -> ExprIRDisplay<'a> {
78 expr.display(self.lp.expr_arena)
79 }
80
81 fn display_exprs(&self, exprs: &'a [ExprIR]) -> ExprIRSliceDisplay<'a, ExprIR> {
82 ExprIRSliceDisplay {
83 exprs,
84 expr_arena: self.lp.expr_arena,
85 }
86 }
87
88 fn _format(
89 &self,
90 f: &mut fmt::Formatter<'_>,
91 parent: Option<DotNode>,
92 last: &mut usize,
93 ) -> std::fmt::Result {
94 use fmt::Write;
95
96 let root = self.lp.root();
97
98 let mut parent = parent;
99 if self.is_streaming {
100 *last += 1;
101 let streaming_node = DotNode::Plain(*last);
102
103 if let Some(parent) = parent {
104 writeln!(f, "{INDENT}{parent} -- {streaming_node}")?;
105 write_label(f, streaming_node, |f| f.write_str("STREAMING"))?;
106 }
107
108 parent = Some(streaming_node);
109 }
110 let parent = parent;
111
112 let id = if let IR::Cache { id, .. } = root {
113 DotNode::Cache(*id)
114 } else {
115 *last += 1;
116 DotNode::Plain(*last)
117 };
118
119 if let Some(parent) = parent {
120 writeln!(f, "{INDENT}{parent} -- {id}")?;
121 }
122
123 use IR::*;
124 match root {
125 Union { inputs, .. } => {
126 for input in inputs {
127 self.with_root(*input)._format(f, Some(id), last)?;
128 }
129
130 write_label(f, id, |f| f.write_str("UNION"))?;
131 },
132 HConcat { inputs, .. } => {
133 for input in inputs {
134 self.with_root(*input)._format(f, Some(id), last)?;
135 }
136
137 write_label(f, id, |f| f.write_str("HCONCAT"))?;
138 },
139 Cache {
140 input, cache_hits, ..
141 } => {
142 self.with_root(*input)._format(f, Some(id), last)?;
143
144 if *cache_hits == UNLIMITED_CACHE {
145 write_label(f, id, |f| f.write_str("CACHE"))?;
146 } else {
147 write_label(f, id, |f| write!(f, "CACHE: {cache_hits} times"))?;
148 };
149 },
150 Filter { predicate, input } => {
151 self.with_root(*input)._format(f, Some(id), last)?;
152
153 let pred = self.display_expr(predicate);
154 write_label(f, id, |f| write!(f, "FILTER BY {pred}"))?;
155 },
156 #[cfg(feature = "python")]
157 PythonScan { options } => {
158 let predicate = match &options.predicate {
159 PythonPredicate::Polars(e) => format!("{}", self.display_expr(e)),
160 PythonPredicate::PyArrow(s) => s.clone(),
161 PythonPredicate::None => "none".to_string(),
162 };
163 let with_columns = NumColumns(options.with_columns.as_ref().map(|s| s.as_ref()));
164 let total_columns = options.schema.len();
165
166 write_label(f, id, |f| {
167 write!(
168 f,
169 "PYTHON SCAN\nπ {with_columns}/{total_columns};\nσ {predicate}"
170 )
171 })?
172 },
173 Select {
174 expr,
175 input,
176 schema,
177 ..
178 } => {
179 self.with_root(*input)._format(f, Some(id), last)?;
180 write_label(f, id, |f| write!(f, "π {}/{}", expr.len(), schema.len()))?;
181 },
182 Sort {
183 input, by_column, ..
184 } => {
185 let by_column = self.display_exprs(by_column);
186 self.with_root(*input)._format(f, Some(id), last)?;
187 write_label(f, id, |f| write!(f, "SORT BY {by_column}"))?;
188 },
189 GroupBy {
190 input, keys, aggs, ..
191 } => {
192 let keys = self.display_exprs(keys);
193 let aggs = self.display_exprs(aggs);
194 self.with_root(*input)._format(f, Some(id), last)?;
195 write_label(f, id, |f| write!(f, "AGG {aggs}\nBY\n{keys}"))?;
196 },
197 HStack { input, exprs, .. } => {
198 let exprs = self.display_exprs(exprs);
199 self.with_root(*input)._format(f, Some(id), last)?;
200 write_label(f, id, |f| write!(f, "WITH COLUMNS {exprs}"))?;
201 },
202 Slice { input, offset, len } => {
203 self.with_root(*input)._format(f, Some(id), last)?;
204 write_label(f, id, |f| write!(f, "SLICE offset: {offset}; len: {len}"))?;
205 },
206 Distinct { input, options, .. } => {
207 self.with_root(*input)._format(f, Some(id), last)?;
208 write_label(f, id, |f| {
209 f.write_str("DISTINCT")?;
210
211 if let Some(subset) = &options.subset {
212 f.write_str(" BY ")?;
213
214 let mut subset = subset.iter();
215
216 if let Some(fst) = subset.next() {
217 f.write_str(fst)?;
218 for name in subset {
219 write!(f, ", \"{name}\"")?;
220 }
221 } else {
222 f.write_str("None")?;
223 }
224 }
225
226 Ok(())
227 })?;
228 },
229 DataFrameScan {
230 schema,
231 output_schema,
232 ..
233 } => {
234 let num_columns = NumColumnsSchema(output_schema.as_ref().map(|p| p.as_ref()));
235 let total_columns = schema.len();
236
237 write_label(f, id, |f| {
238 write!(f, "TABLE\nπ {num_columns}/{total_columns}")
239 })?;
240 },
241 Scan {
242 sources,
243 file_info,
244 hive_parts: _,
245 predicate,
246 scan_type,
247 unified_scan_args,
248 output_schema: _,
249 } => {
250 let name: &str = (&**scan_type).into();
251 let path = ScanSourcesDisplay(sources);
252 let with_columns = unified_scan_args
253 .projection
254 .as_ref()
255 .map(|cols| cols.as_ref());
256 let with_columns = NumColumns(with_columns);
257 let total_columns =
258 file_info.schema.len() - usize::from(unified_scan_args.row_index.is_some());
259
260 write_label(f, id, |f| {
261 write!(f, "{name} SCAN {path}\nπ {with_columns}/{total_columns};",)?;
262
263 if let Some(predicate) = predicate.as_ref() {
264 write!(f, "\nσ {}", self.display_expr(predicate))?;
265 }
266
267 if let Some(row_index) = unified_scan_args.row_index.as_ref() {
268 write!(f, "\nrow index: {} (+{})", row_index.name, row_index.offset)?;
269 }
270
271 Ok(())
272 })?;
273 },
274 Join {
275 input_left,
276 input_right,
277 left_on,
278 right_on,
279 options,
280 ..
281 } => {
282 self.with_root(*input_left)._format(f, Some(id), last)?;
283 self.with_root(*input_right)._format(f, Some(id), last)?;
284
285 let left_on = self.display_exprs(left_on);
286 let right_on = self.display_exprs(right_on);
287
288 write_label(f, id, |f| {
289 write!(
290 f,
291 "JOIN {}\nleft: {left_on};\nright: {right_on}",
292 options.args.how
293 )
294 })?;
295 },
296 MapFunction {
297 input, function, ..
298 } => {
299 if let Some(streaming_lp) = function.to_streaming_lp() {
300 Self::new_streaming(streaming_lp)._format(f, Some(id), last)?;
301 } else {
302 self.with_root(*input)._format(f, Some(id), last)?;
303 write_label(f, id, |f| write!(f, "{function}"))?;
304 }
305 },
306 ExtContext { input, .. } => {
307 self.with_root(*input)._format(f, Some(id), last)?;
308 write_label(f, id, |f| f.write_str("EXTERNAL_CONTEXT"))?;
309 },
310 Sink { input, payload, .. } => {
311 self.with_root(*input)._format(f, Some(id), last)?;
312
313 write_label(f, id, |f| {
314 f.write_str(match payload {
315 SinkTypeIR::Memory => "SINK (MEMORY)",
316 SinkTypeIR::File { .. } => "SINK (FILE)",
317 SinkTypeIR::Partition { .. } => "SINK (PARTITION)",
318 })
319 })?;
320 },
321 SinkMultiple { inputs } => {
322 for input in inputs {
323 self.with_root(*input)._format(f, Some(id), last)?;
324 }
325
326 write_label(f, id, |f| f.write_str("SINK MULTIPLE"))?;
327 },
328 SimpleProjection { input, columns } => {
329 let num_columns = columns.as_ref().len();
330 let total_columns = self.lp.lp_arena.get(*input).schema(self.lp.lp_arena).len();
331
332 let columns = ColumnsDisplay(columns.as_ref());
333 self.with_root(*input)._format(f, Some(id), last)?;
334 write_label(f, id, |f| {
335 write!(f, "simple π {num_columns}/{total_columns}\n[{columns}]")
336 })?;
337 },
338 #[cfg(feature = "merge_sorted")]
339 MergeSorted {
340 input_left,
341 input_right,
342 key,
343 } => {
344 self.with_root(*input_left)._format(f, Some(id), last)?;
345 self.with_root(*input_right)._format(f, Some(id), last)?;
346
347 write_label(f, id, |f| write!(f, "MERGE_SORTED ON '{key}'",))?;
348 },
349 Invalid => write_label(f, id, |f| f.write_str("INVALID"))?,
350 }
351
352 Ok(())
353 }
354}
355
356pub struct PathsDisplay<'a>(pub &'a [PathBuf]);
358pub struct ScanSourcesDisplay<'a>(pub &'a ScanSources);
359struct NumColumns<'a>(Option<&'a [PlSmallStr]>);
360struct NumColumnsSchema<'a>(Option<&'a Schema>);
361
362impl fmt::Display for ScanSourceRef<'_> {
363 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
364 match self {
365 ScanSourceRef::Path(path) => path.display().fmt(f),
366 ScanSourceRef::File(_) => f.write_str("open-file"),
367 ScanSourceRef::Buffer(buff) => write!(f, "{} in-mem bytes", buff.len()),
368 }
369 }
370}
371
372impl fmt::Display for ScanSourcesDisplay<'_> {
373 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
374 match self.0.len() {
375 0 => write!(f, "[]"),
376 1 => write!(f, "[{}]", self.0.at(0)),
377 2 => write!(f, "[{}, {}]", self.0.at(0), self.0.at(1)),
378 _ => write!(
379 f,
380 "[{}, ... {} other sources]",
381 self.0.at(0),
382 self.0.len() - 1,
383 ),
384 }
385 }
386}
387
388impl fmt::Display for PathsDisplay<'_> {
389 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
390 match self.0.len() {
391 0 => write!(f, "[]"),
392 1 => write!(f, "[{}]", self.0[0].display()),
393 2 => write!(f, "[{}, {}]", self.0[0].display(), self.0[1].display()),
394 _ => write!(
395 f,
396 "[{}, ... {} other files]",
397 self.0[0].display(),
398 self.0.len() - 1,
399 ),
400 }
401 }
402}
403
404impl fmt::Display for NumColumns<'_> {
405 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
406 match self.0 {
407 None => f.write_str("*"),
408 Some(columns) => columns.len().fmt(f),
409 }
410 }
411}
412
413impl fmt::Display for NumColumnsSchema<'_> {
414 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
415 match self.0 {
416 None => f.write_str("*"),
417 Some(columns) => columns.len().fmt(f),
418 }
419 }
420}
421
/// A [`fmt::Write`] adapter that escapes text for use inside a double-quoted
/// DOT label before forwarding it to the wrapped writer.
///
/// The following characters are rewritten:
///
/// * `\` becomes `\\`, so backslashes in the text are shown literally instead
///   of being read as the start of a DOT escape sequence,
/// * `"` becomes `\"`, so it cannot terminate the quoted label,
/// * a line feed becomes the two characters `\n`, the DOT line-break escape.
///
/// All other text is forwarded unchanged.
pub struct EscapeLabel<'a>(pub &'a mut dyn fmt::Write);

impl fmt::Write for EscapeLabel<'_> {
    /// Writes `s` to the wrapped writer with the escapes described on
    /// [`EscapeLabel`] applied. Errors of the wrapped writer are propagated.
    fn write_str(&mut self, mut s: &str) -> fmt::Result {
        loop {
            // Locate the next character that needs escaping, together with its
            // encoded width and its replacement.
            let next = s.char_indices().find_map(|(at, c)| {
                let replacement = match c {
                    '\\' => r"\\",
                    '"' => r#"\""#,
                    '\n' => r"\n",
                    _ => return None,
                };
                Some((at, c.len_utf8(), replacement))
            });

            let Some((at, width, replacement)) = next else {
                break;
            };

            // Forward the untouched prefix, then the escape sequence, and
            // continue after the character that was replaced.
            self.0.write_str(&s[..at])?;
            self.0.write_str(replacement)?;
            s = &s[at + width..];
        }

        // Whatever remains contains nothing that needs escaping.
        self.0.write_str(s)
    }
}
453
454impl fmt::Display for IRDotDisplay<'_> {
455 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
456 writeln!(f, "graph polars_query {{")?;
457
458 let mut last = 0;
459 self._format(f, None, &mut last)?;
460
461 writeln!(f, "}}")?;
462
463 Ok(())
464 }
465}