datafusion_cli/
print_format.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Print format variants
19
20use std::str::FromStr;
21
22use crate::print_options::MaxRows;
23
24use arrow::csv::writer::WriterBuilder;
25use arrow::datatypes::SchemaRef;
26use arrow::json::{ArrayWriter, LineDelimitedWriter};
27use arrow::record_batch::RecordBatch;
28use arrow::util::pretty::pretty_format_batches_with_options;
29use datafusion::config::FormatOptions;
30use datafusion::error::Result;
31
32/// Allow records to be printed in different formats
33#[derive(Debug, PartialEq, Eq, clap::ValueEnum, Clone, Copy)]
34pub enum PrintFormat {
35    Csv,
36    Tsv,
37    Table,
38    Json,
39    NdJson,
40    Automatic,
41}
42
43impl FromStr for PrintFormat {
44    type Err = String;
45
46    fn from_str(s: &str) -> Result<Self, Self::Err> {
47        clap::ValueEnum::from_str(s, true)
48    }
49}
50
/// Writes `$batches` to `$writer` as JSON using the writer type `$WRITER`
/// and evaluates to a `Result<()>`.
///
/// * `$WRITER`  - identifier of the JSON writer type to construct
///   (`ArrayWriter` or `LineDelimitedWriter`, the two forms `json_finish!`
///   accepts).
/// * `$writer`  - expression yielding a mutable reference to the output sink.
/// * `$batches` - expression yielding a reference to the batches to write.
///
/// Nothing at all is written when `$batches` is empty. The expansion uses
/// `?`, so it must be used inside a function whose error type can absorb
/// the errors raised by the JSON writer and by the sink.
macro_rules! batches_to_json {
    ($WRITER: ident, $writer: expr, $batches: expr) => {{
        if !$batches.is_empty() {
            // Reborrow the sink so it is usable again once the JSON writer
            // has finished with it.
            let mut json_writer = $WRITER::new(&mut *$writer);
            $batches
                .iter()
                .try_for_each(|batch| json_writer.write(batch))?;
            json_writer.finish()?;
            // Writer-specific trailer (see `json_finish!`).
            json_finish!($WRITER, $writer);
        }
        Ok(()) as Result<()>
    }};
}
66
/// Emits the trailer that follows a finished JSON document, selected by the
/// writer type name passed as the first token.
///
/// * `LineDelimitedWriter` - expands to an empty block; nothing is written.
/// * `ArrayWriter`         - writes a single newline to `$writer`; uses `?`,
///   so it must be expanded inside a function returning a compatible
///   `Result`.
macro_rules! json_finish {
    (LineDelimitedWriter, $writer: expr) => {{}};
    (ArrayWriter, $writer: expr) => {{
        writeln!($writer)?;
    }};
}
73
74fn print_batches_with_sep<W: std::io::Write>(
75    writer: &mut W,
76    batches: &[RecordBatch],
77    delimiter: u8,
78    with_header: bool,
79) -> Result<()> {
80    let builder = WriterBuilder::new()
81        .with_header(with_header)
82        .with_delimiter(delimiter);
83    let mut csv_writer = builder.build(writer);
84
85    for batch in batches {
86        csv_writer.write(batch)?;
87    }
88
89    Ok(())
90}
91
/// Truncates an already rendered table to its first `maxrows` data lines.
///
/// `s` is expected to be laid out as: top border, header line, separator,
/// data lines, bottom border. The result keeps the three leading lines plus
/// `maxrows` data lines, then three `| .   |` filler lines padded to the
/// width of the bottom border, then the bottom border itself. Lines are
/// joined with `\n` and there is no trailing newline.
///
/// # Panics
///
/// Panics if `s` has fewer than `maxrows + 4` lines.
fn keep_only_maxrows(s: &str, maxrows: usize) -> String {
    let lines: Vec<&str> = s.lines().collect();

    // 3 leading lines (border, header, separator) + 1 bottom border.
    assert!(lines.len() >= maxrows + 4);

    let bottom_border = lines[lines.len() - 1];

    // The filler line is "| ." + padding + "|", i.e. 4 fixed characters.
    let padding = bottom_border.len().saturating_sub(4);
    let filler = format!("| .{}|", " ".repeat(padding));

    let head = &lines[..maxrows + 3];
    let mut kept: Vec<&str> = Vec::with_capacity(head.len() + 4);
    kept.extend_from_slice(head);
    kept.extend(std::iter::repeat(filler.as_str()).take(3));
    kept.push(bottom_border);

    kept.join("\n")
}
108
109fn format_batches_with_maxrows<W: std::io::Write>(
110    writer: &mut W,
111    batches: &[RecordBatch],
112    maxrows: MaxRows,
113    format_options: &FormatOptions,
114) -> Result<()> {
115    let options: arrow::util::display::FormatOptions = format_options.try_into()?;
116
117    match maxrows {
118        MaxRows::Limited(maxrows) => {
119            // Filter batches to meet the maxrows condition
120            let mut filtered_batches = Vec::new();
121            let mut row_count: usize = 0;
122            let mut over_limit = false;
123            for batch in batches {
124                if row_count + batch.num_rows() > maxrows {
125                    // If adding this batch exceeds maxrows, slice the batch
126                    let limit = maxrows - row_count;
127                    let sliced_batch = batch.slice(0, limit);
128                    filtered_batches.push(sliced_batch);
129                    over_limit = true;
130                    break;
131                } else {
132                    filtered_batches.push(batch.clone());
133                    row_count += batch.num_rows();
134                }
135            }
136
137            let formatted =
138                pretty_format_batches_with_options(&filtered_batches, &options)?;
139            if over_limit {
140                let mut formatted_str = format!("{formatted}");
141                formatted_str = keep_only_maxrows(&formatted_str, maxrows);
142                writeln!(writer, "{formatted_str}")?;
143            } else {
144                writeln!(writer, "{formatted}")?;
145            }
146        }
147        MaxRows::Unlimited => {
148            let formatted = pretty_format_batches_with_options(batches, &options)?;
149            writeln!(writer, "{formatted}")?;
150        }
151    }
152
153    Ok(())
154}
155
156impl PrintFormat {
157    /// Print the batches to a writer using the specified format
158    pub fn print_batches<W: std::io::Write>(
159        &self,
160        writer: &mut W,
161        schema: SchemaRef,
162        batches: &[RecordBatch],
163        maxrows: MaxRows,
164        with_header: bool,
165        format_options: &FormatOptions,
166    ) -> Result<()> {
167        // filter out any empty batches
168        let batches: Vec<_> = batches
169            .iter()
170            .filter(|b| b.num_rows() > 0)
171            .cloned()
172            .collect();
173        if batches.is_empty() {
174            return self.print_empty(writer, schema, format_options);
175        }
176
177        match self {
178            Self::Csv | Self::Automatic => {
179                print_batches_with_sep(writer, &batches, b',', with_header)
180            }
181            Self::Tsv => print_batches_with_sep(writer, &batches, b'\t', with_header),
182            Self::Table => {
183                if maxrows == MaxRows::Limited(0) {
184                    return Ok(());
185                }
186                format_batches_with_maxrows(writer, &batches, maxrows, format_options)
187            }
188            Self::Json => batches_to_json!(ArrayWriter, writer, &batches),
189            Self::NdJson => batches_to_json!(LineDelimitedWriter, writer, &batches),
190        }
191    }
192
193    /// Print when the result batches contain no rows
194    fn print_empty<W: std::io::Write>(
195        &self,
196        writer: &mut W,
197        schema: SchemaRef,
198        format_options: &FormatOptions,
199    ) -> Result<()> {
200        match self {
201            // Print column headers for Table format
202            Self::Table if !schema.fields().is_empty() => {
203                let format_options: arrow::util::display::FormatOptions =
204                    format_options.try_into()?;
205
206                let empty_batch = RecordBatch::new_empty(schema);
207                let formatted =
208                    pretty_format_batches_with_options(&[empty_batch], &format_options)?;
209                writeln!(writer, "{formatted}")?;
210            }
211            _ => {}
212        }
213        Ok(())
214    }
215}
216
#[cfg(test)]
mod tests {
    //! Tests for [`PrintFormat`]: each case builds a [`PrintBatchesTest`],
    //! prints into an in-memory buffer and compares the output line by line.

    use super::*;
    use std::sync::Arc;

    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};

    #[test]
    fn parse_print_format() {
        // parsing goes through clap's `ValueEnum` with case-insensitive matching
        assert_eq!("csv".parse::<PrintFormat>(), Ok(PrintFormat::Csv));
        assert_eq!("TABLE".parse::<PrintFormat>(), Ok(PrintFormat::Table));
        assert!("not-a-format".parse::<PrintFormat>().is_err());
    }

    #[test]
    fn print_empty() {
        for format in [
            PrintFormat::Csv,
            PrintFormat::Tsv,
            PrintFormat::Json,
            PrintFormat::NdJson,
            PrintFormat::Automatic,
        ] {
            // no output for empty batches, even with header set
            PrintBatchesTest::new()
                .with_format(format)
                .with_schema(three_column_schema())
                .with_batches(vec![])
                .with_expected(&[""])
                .run();
        }

        // output column headers for empty batches when format is Table
        #[rustfmt::skip]
        let expected = &[
            "+---+---+---+",
            "| a | b | c |",
            "+---+---+---+",
            "+---+---+---+",
        ];
        PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_schema(three_column_schema())
            .with_batches(vec![])
            .with_expected(expected)
            .run();
    }

    #[test]
    fn print_csv_no_header() {
        #[rustfmt::skip]
        let expected = &[
            "1,4,7",
            "2,5,8",
            "3,6,9",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Csv)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::No)
            .with_expected(expected)
            .run();
    }

    #[test]
    fn print_csv_with_header() {
        #[rustfmt::skip]
        let expected = &[
            "a,b,c",
            "1,4,7",
            "2,5,8",
            "3,6,9",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Csv)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Yes)
            .with_expected(expected)
            .run();
    }

    #[test]
    fn print_tsv_no_header() {
        #[rustfmt::skip]
        let expected = &[
            "1\t4\t7",
            "2\t5\t8",
            "3\t6\t9",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Tsv)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::No)
            .with_expected(expected)
            .run();
    }

    #[test]
    fn print_tsv_with_header() {
        #[rustfmt::skip]
        let expected = &[
            "a\tb\tc",
            "1\t4\t7",
            "2\t5\t8",
            "3\t6\t9",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Tsv)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Yes)
            .with_expected(expected)
            .run();
    }

    #[test]
    fn print_table() {
        let expected = &[
            "+---+---+---+",
            "| a | b | c |",
            "+---+---+---+",
            "| 1 | 4 | 7 |",
            "| 2 | 5 | 8 |",
            "| 3 | 6 | 9 |",
            "+---+---+---+",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Ignored)
            .with_expected(expected)
            .run();
    }

    #[test]
    fn print_json() {
        let expected =
            &[r#"[{"a":1,"b":4,"c":7},{"a":2,"b":5,"c":8},{"a":3,"b":6,"c":9}]"#];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Json)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Ignored)
            .with_expected(expected)
            .run();
    }

    #[test]
    fn print_ndjson() {
        let expected = &[
            r#"{"a":1,"b":4,"c":7}"#,
            r#"{"a":2,"b":5,"c":8}"#,
            r#"{"a":3,"b":6,"c":9}"#,
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::NdJson)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Ignored)
            .with_expected(expected)
            .run();
    }

    #[test]
    fn print_automatic_no_header() {
        #[rustfmt::skip]
        let expected = &[
            "1,4,7",
            "2,5,8",
            "3,6,9",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Automatic)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::No)
            .with_expected(expected)
            .run();
    }

    #[test]
    fn print_automatic_with_header() {
        #[rustfmt::skip]
        let expected = &[
            "a,b,c",
            "1,4,7",
            "2,5,8",
            "3,6,9",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Automatic)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Yes)
            .with_expected(expected)
            .run();
    }

    #[test]
    fn print_maxrows_unlimited() {
        #[rustfmt::skip]
        let expected = &[
            "+---+",
            "| a |",
            "+---+",
            "| 1 |",
            "| 2 |",
            "| 3 |",
            "+---+",
        ];

        // should print out entire output with no truncation if unlimited, or
        // if the limit is greater than or equal to the number of rows
        for max_rows in [MaxRows::Unlimited, MaxRows::Limited(5), MaxRows::Limited(3)] {
            PrintBatchesTest::new()
                .with_format(PrintFormat::Table)
                .with_schema(one_column_schema())
                .with_batches(vec![one_column_batch()])
                .with_maxrows(max_rows)
                .with_expected(expected)
                .run();
        }
    }

    #[test]
    fn print_maxrows_zero() {
        // a limit of zero rows suppresses the table entirely, header included
        PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_schema(one_column_schema())
            .with_batches(vec![one_column_batch()])
            .with_maxrows(MaxRows::Limited(0))
            .with_expected(&[""])
            .run();
    }

    #[test]
    fn print_maxrows_limited_one_batch() {
        #[rustfmt::skip]
        let expected = &[
            "+---+",
            "| a |",
            "+---+",
            "| 1 |",
            "| . |",
            "| . |",
            "| . |",
            "+---+",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_batches(vec![one_column_batch()])
            .with_maxrows(MaxRows::Limited(1))
            .with_expected(expected)
            .run();
    }

    #[test]
    fn print_maxrows_limited_multi_batched() {
        #[rustfmt::skip]
        let expected = &[
            "+---+",
            "| a |",
            "+---+",
            "| 1 |",
            "| 2 |",
            "| 3 |",
            "| 1 |",
            "| 2 |",
            "| . |",
            "| . |",
            "| . |",
            "+---+",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_batches(vec![
                one_column_batch(),
                one_column_batch(),
                one_column_batch(),
            ])
            .with_maxrows(MaxRows::Limited(5))
            .with_expected(expected)
            .run();
    }

    #[test]
    fn test_print_batches_empty_batches() {
        let batch = one_column_batch();
        let empty_batch = RecordBatch::new_empty(batch.schema());

        // zero-row batches mixed in with real data must not affect the output
        #[rustfmt::skip]
        let expected = &[
            "+---+",
            "| a |",
            "+---+",
            "| 1 |",
            "| 2 |",
            "| 3 |",
            "+---+",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_batches(vec![empty_batch.clone(), batch, empty_batch])
            .with_expected(expected)
            .run();
    }

    #[test]
    fn test_print_batches_empty_batch() {
        let empty_batch = RecordBatch::new_empty(one_column_batch().schema());

        // Print column headers for empty batch when format is Table
        #[rustfmt::skip]
        let expected = &[
            "+---+",
            "| a |",
            "+---+",
            "+---+",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_schema(one_column_schema())
            .with_batches(vec![empty_batch])
            .with_header(WithHeader::Yes)
            .with_expected(expected)
            .run();

        // No output for empty batch when schema contains no columns
        let empty_batch = RecordBatch::new_empty(Arc::new(Schema::empty()));
        let expected = &[""];
        PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_schema(Arc::new(Schema::empty()))
            .with_batches(vec![empty_batch])
            .with_header(WithHeader::Yes)
            .with_expected(expected)
            .run();
    }

    /// Builder describing one `print_batches` invocation and the output
    /// lines it is expected to produce
    #[derive(Debug)]
    struct PrintBatchesTest {
        format: PrintFormat,
        schema: SchemaRef,
        batches: Vec<RecordBatch>,
        maxrows: MaxRows,
        with_header: WithHeader,
        expected: Vec<&'static str>,
    }

    /// How to test with_header
    #[derive(Debug, Clone)]
    enum WithHeader {
        Yes,
        No,
        /// output should be the same with or without header
        Ignored,
    }

    impl PrintBatchesTest {
        /// defaults: Table format, empty schema, no batches, unlimited
        /// rows, header ignored, no expected lines
        fn new() -> Self {
            Self {
                format: PrintFormat::Table,
                schema: Arc::new(Schema::empty()),
                batches: vec![],
                maxrows: MaxRows::Unlimited,
                with_header: WithHeader::Ignored,
                expected: vec![],
            }
        }

        /// set the format
        fn with_format(mut self, format: PrintFormat) -> Self {
            self.format = format;
            self
        }

        /// set the schema
        fn with_schema(mut self, schema: SchemaRef) -> Self {
            self.schema = schema;
            self
        }

        /// set the batches to convert
        fn with_batches(mut self, batches: Vec<RecordBatch>) -> Self {
            self.batches = batches;
            self
        }

        /// set maxrows
        fn with_maxrows(mut self, maxrows: MaxRows) -> Self {
            self.maxrows = maxrows;
            self
        }

        /// set with_header
        fn with_header(mut self, with_header: WithHeader) -> Self {
            self.with_header = with_header;
            self
        }

        /// set expected output
        fn with_expected(mut self, expected: &[&'static str]) -> Self {
            self.expected = expected.to_vec();
            self
        }

        /// run the test
        fn run(self) {
            let actual = self.output();
            // `split` (not `lines`) so that empty output compares equal to `[""]`
            let actual: Vec<_> = actual.trim_end().split('\n').collect();
            let expected = self.expected;
            assert_eq!(
                actual, expected,
                "\n\nactual:\n{actual:#?}\n\nexpected:\n{expected:#?}"
            );
        }

        /// formats batches using parameters and returns the resulting output
        fn output(&self) -> String {
            match self.with_header {
                WithHeader::Yes => self.output_with_header(true),
                WithHeader::No => self.output_with_header(false),
                WithHeader::Ignored => {
                    let output = self.output_with_header(true);
                    // ensure the output is the same without header
                    let output_without_header = self.output_with_header(false);
                    assert_eq!(
                        output, output_without_header,
                        "Expected output to be the same with or without header"
                    );
                    output
                }
            }
        }

        /// prints into an in-memory buffer with default format options and
        /// returns the buffer as a string
        fn output_with_header(&self, with_header: bool) -> String {
            let mut buffer: Vec<u8> = vec![];
            self.format
                .print_batches(
                    &mut buffer,
                    self.schema.clone(),
                    &self.batches,
                    self.maxrows,
                    with_header,
                    &FormatOptions::default(),
                )
                .unwrap();
            String::from_utf8(buffer).unwrap()
        }
    }

    /// Return a schema with three columns
    fn three_column_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Int32, false),
        ]))
    }

    /// Return a batch with three columns and three rows
    fn three_column_batch() -> RecordBatch {
        RecordBatch::try_new(
            three_column_schema(),
            vec![
                Arc::new(Int32Array::from(vec![1, 2, 3])),
                Arc::new(Int32Array::from(vec![4, 5, 6])),
                Arc::new(Int32Array::from(vec![7, 8, 9])),
            ],
        )
        .unwrap()
    }

    /// Return a schema with one column
    fn one_column_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]))
    }

    /// Return a batch with one column and three rows
    fn one_column_batch() -> RecordBatch {
        RecordBatch::try_new(
            one_column_schema(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )
        .unwrap()
    }

    /// Slice the record batch into 2 batches
    fn split_batch(batch: RecordBatch) -> Vec<RecordBatch> {
        assert!(batch.num_rows() > 1);
        let split = batch.num_rows() / 2;
        vec![
            batch.slice(0, split),
            batch.slice(split, batch.num_rows() - split),
        ]
    }
}