datafusion_cli/
print_format.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Print format variants
19
20use std::str::FromStr;
21
22use crate::print_options::MaxRows;
23
24use arrow::csv::writer::WriterBuilder;
25use arrow::datatypes::SchemaRef;
26use arrow::json::{ArrayWriter, LineDelimitedWriter};
27use arrow::record_batch::RecordBatch;
28use arrow::util::pretty::pretty_format_batches_with_options;
29use datafusion::common::format::DEFAULT_CLI_FORMAT_OPTIONS;
30use datafusion::error::Result;
31
32/// Allow records to be printed in different formats
33#[derive(Debug, PartialEq, Eq, clap::ValueEnum, Clone, Copy)]
34pub enum PrintFormat {
35    Csv,
36    Tsv,
37    Table,
38    Json,
39    NdJson,
40    Automatic,
41}
42
43impl FromStr for PrintFormat {
44    type Err = String;
45
46    fn from_str(s: &str) -> Result<Self, Self::Err> {
47        clap::ValueEnum::from_str(s, true)
48    }
49}
50
51macro_rules! batches_to_json {
52    ($WRITER: ident, $writer: expr, $batches: expr) => {{
53        {
54            if !$batches.is_empty() {
55                let mut json_writer = $WRITER::new(&mut *$writer);
56                for batch in $batches {
57                    json_writer.write(batch)?;
58                }
59                json_writer.finish()?;
60                json_finish!($WRITER, $writer);
61            }
62        }
63        Ok(()) as Result<()>
64    }};
65}
66
67macro_rules! json_finish {
68    (ArrayWriter, $writer: expr) => {{
69        writeln!($writer)?;
70    }};
71    (LineDelimitedWriter, $writer: expr) => {{}};
72}
73
74fn print_batches_with_sep<W: std::io::Write>(
75    writer: &mut W,
76    batches: &[RecordBatch],
77    delimiter: u8,
78    with_header: bool,
79) -> Result<()> {
80    let builder = WriterBuilder::new()
81        .with_header(with_header)
82        .with_delimiter(delimiter);
83    let mut csv_writer = builder.build(writer);
84
85    for batch in batches {
86        csv_writer.write(batch)?;
87    }
88
89    Ok(())
90}
91
92fn keep_only_maxrows(s: &str, maxrows: usize) -> String {
93    let lines: Vec<String> = s.lines().map(String::from).collect();
94
95    assert!(lines.len() >= maxrows + 4); // 4 lines for top and bottom border
96
97    let last_line = &lines[lines.len() - 1]; // bottom border line
98
99    let spaces = last_line.len().saturating_sub(4);
100    let dotted_line = format!("| .{:<spaces$}|", "", spaces = spaces);
101
102    let mut result = lines[0..(maxrows + 3)].to_vec(); // Keep top border and `maxrows` lines
103    result.extend(vec![dotted_line; 3]); // Append ... lines
104    result.push(last_line.clone());
105
106    result.join("\n")
107}
108
109fn format_batches_with_maxrows<W: std::io::Write>(
110    writer: &mut W,
111    batches: &[RecordBatch],
112    maxrows: MaxRows,
113) -> Result<()> {
114    match maxrows {
115        MaxRows::Limited(maxrows) => {
116            // Filter batches to meet the maxrows condition
117            let mut filtered_batches = Vec::new();
118            let mut row_count: usize = 0;
119            let mut over_limit = false;
120            for batch in batches {
121                if row_count + batch.num_rows() > maxrows {
122                    // If adding this batch exceeds maxrows, slice the batch
123                    let limit = maxrows - row_count;
124                    let sliced_batch = batch.slice(0, limit);
125                    filtered_batches.push(sliced_batch);
126                    over_limit = true;
127                    break;
128                } else {
129                    filtered_batches.push(batch.clone());
130                    row_count += batch.num_rows();
131                }
132            }
133
134            let formatted = pretty_format_batches_with_options(
135                &filtered_batches,
136                &DEFAULT_CLI_FORMAT_OPTIONS,
137            )?;
138            if over_limit {
139                let mut formatted_str = format!("{}", formatted);
140                formatted_str = keep_only_maxrows(&formatted_str, maxrows);
141                writeln!(writer, "{}", formatted_str)?;
142            } else {
143                writeln!(writer, "{}", formatted)?;
144            }
145        }
146        MaxRows::Unlimited => {
147            let formatted =
148                pretty_format_batches_with_options(batches, &DEFAULT_CLI_FORMAT_OPTIONS)?;
149            writeln!(writer, "{}", formatted)?;
150        }
151    }
152
153    Ok(())
154}
155
156impl PrintFormat {
157    /// Print the batches to a writer using the specified format
158    pub fn print_batches<W: std::io::Write>(
159        &self,
160        writer: &mut W,
161        schema: SchemaRef,
162        batches: &[RecordBatch],
163        maxrows: MaxRows,
164        with_header: bool,
165    ) -> Result<()> {
166        // filter out any empty batches
167        let batches: Vec<_> = batches
168            .iter()
169            .filter(|b| b.num_rows() > 0)
170            .cloned()
171            .collect();
172        if batches.is_empty() {
173            return self.print_empty(writer, schema);
174        }
175
176        match self {
177            Self::Csv | Self::Automatic => {
178                print_batches_with_sep(writer, &batches, b',', with_header)
179            }
180            Self::Tsv => print_batches_with_sep(writer, &batches, b'\t', with_header),
181            Self::Table => {
182                if maxrows == MaxRows::Limited(0) {
183                    return Ok(());
184                }
185                format_batches_with_maxrows(writer, &batches, maxrows)
186            }
187            Self::Json => batches_to_json!(ArrayWriter, writer, &batches),
188            Self::NdJson => batches_to_json!(LineDelimitedWriter, writer, &batches),
189        }
190    }
191
192    /// Print when the result batches contain no rows
193    fn print_empty<W: std::io::Write>(
194        &self,
195        writer: &mut W,
196        schema: SchemaRef,
197    ) -> Result<()> {
198        match self {
199            // Print column headers for Table format
200            Self::Table if !schema.fields().is_empty() => {
201                let empty_batch = RecordBatch::new_empty(schema);
202                let formatted = pretty_format_batches_with_options(
203                    &[empty_batch],
204                    &DEFAULT_CLI_FORMAT_OPTIONS,
205                )?;
206                writeln!(writer, "{}", formatted)?;
207            }
208            _ => {}
209        }
210        Ok(())
211    }
212}
213
214#[cfg(test)]
215mod tests {
216    use super::*;
217    use std::sync::Arc;
218
219    use arrow::array::Int32Array;
220    use arrow::datatypes::{DataType, Field, Schema};
221
222    #[test]
223    fn print_empty() {
224        for format in [
225            PrintFormat::Csv,
226            PrintFormat::Tsv,
227            PrintFormat::Json,
228            PrintFormat::NdJson,
229            PrintFormat::Automatic,
230        ] {
231            // no output for empty batches, even with header set
232            PrintBatchesTest::new()
233                .with_format(format)
234                .with_schema(three_column_schema())
235                .with_batches(vec![])
236                .with_expected(&[""])
237                .run();
238        }
239
240        // output column headers for empty batches when format is Table
241        #[rustfmt::skip]
242        let expected = &[
243            "+---+---+---+",
244            "| a | b | c |",
245            "+---+---+---+",
246            "+---+---+---+",
247        ];
248        PrintBatchesTest::new()
249            .with_format(PrintFormat::Table)
250            .with_schema(three_column_schema())
251            .with_batches(vec![])
252            .with_expected(expected)
253            .run();
254    }
255
256    #[test]
257    fn print_csv_no_header() {
258        #[rustfmt::skip]
259        let expected = &[
260            "1,4,7",
261            "2,5,8",
262            "3,6,9",
263        ];
264
265        PrintBatchesTest::new()
266            .with_format(PrintFormat::Csv)
267            .with_batches(split_batch(three_column_batch()))
268            .with_header(WithHeader::No)
269            .with_expected(expected)
270            .run();
271    }
272
273    #[test]
274    fn print_csv_with_header() {
275        #[rustfmt::skip]
276        let expected = &[
277            "a,b,c",
278            "1,4,7",
279            "2,5,8",
280            "3,6,9",
281        ];
282
283        PrintBatchesTest::new()
284            .with_format(PrintFormat::Csv)
285            .with_batches(split_batch(three_column_batch()))
286            .with_header(WithHeader::Yes)
287            .with_expected(expected)
288            .run();
289    }
290
291    #[test]
292    fn print_tsv_no_header() {
293        #[rustfmt::skip]
294        let expected = &[
295            "1\t4\t7",
296            "2\t5\t8",
297            "3\t6\t9",
298        ];
299
300        PrintBatchesTest::new()
301            .with_format(PrintFormat::Tsv)
302            .with_batches(split_batch(three_column_batch()))
303            .with_header(WithHeader::No)
304            .with_expected(expected)
305            .run();
306    }
307
308    #[test]
309    fn print_tsv_with_header() {
310        #[rustfmt::skip]
311        let expected = &[
312            "a\tb\tc",
313            "1\t4\t7",
314            "2\t5\t8",
315            "3\t6\t9",
316        ];
317
318        PrintBatchesTest::new()
319            .with_format(PrintFormat::Tsv)
320            .with_batches(split_batch(three_column_batch()))
321            .with_header(WithHeader::Yes)
322            .with_expected(expected)
323            .run();
324    }
325
326    #[test]
327    fn print_table() {
328        let expected = &[
329            "+---+---+---+",
330            "| a | b | c |",
331            "+---+---+---+",
332            "| 1 | 4 | 7 |",
333            "| 2 | 5 | 8 |",
334            "| 3 | 6 | 9 |",
335            "+---+---+---+",
336        ];
337
338        PrintBatchesTest::new()
339            .with_format(PrintFormat::Table)
340            .with_batches(split_batch(three_column_batch()))
341            .with_header(WithHeader::Ignored)
342            .with_expected(expected)
343            .run();
344    }
345    #[test]
346    fn print_json() {
347        let expected =
348            &[r#"[{"a":1,"b":4,"c":7},{"a":2,"b":5,"c":8},{"a":3,"b":6,"c":9}]"#];
349
350        PrintBatchesTest::new()
351            .with_format(PrintFormat::Json)
352            .with_batches(split_batch(three_column_batch()))
353            .with_header(WithHeader::Ignored)
354            .with_expected(expected)
355            .run();
356    }
357
358    #[test]
359    fn print_ndjson() {
360        let expected = &[
361            r#"{"a":1,"b":4,"c":7}"#,
362            r#"{"a":2,"b":5,"c":8}"#,
363            r#"{"a":3,"b":6,"c":9}"#,
364        ];
365
366        PrintBatchesTest::new()
367            .with_format(PrintFormat::NdJson)
368            .with_batches(split_batch(three_column_batch()))
369            .with_header(WithHeader::Ignored)
370            .with_expected(expected)
371            .run();
372    }
373
374    #[test]
375    fn print_automatic_no_header() {
376        #[rustfmt::skip]
377            let expected = &[
378            "1,4,7",
379            "2,5,8",
380            "3,6,9",
381        ];
382
383        PrintBatchesTest::new()
384            .with_format(PrintFormat::Automatic)
385            .with_batches(split_batch(three_column_batch()))
386            .with_header(WithHeader::No)
387            .with_expected(expected)
388            .run();
389    }
390    #[test]
391    fn print_automatic_with_header() {
392        #[rustfmt::skip]
393            let expected = &[
394            "a,b,c",
395            "1,4,7",
396            "2,5,8",
397            "3,6,9",
398        ];
399
400        PrintBatchesTest::new()
401            .with_format(PrintFormat::Automatic)
402            .with_batches(split_batch(three_column_batch()))
403            .with_header(WithHeader::Yes)
404            .with_expected(expected)
405            .run();
406    }
407
408    #[test]
409    fn print_maxrows_unlimited() {
410        #[rustfmt::skip]
411            let expected = &[
412            "+---+",
413            "| a |",
414            "+---+",
415            "| 1 |",
416            "| 2 |",
417            "| 3 |",
418            "+---+",
419        ];
420
421        // should print out entire output with no truncation if unlimited or
422        // limit greater than number of batches or equal to the number of batches
423        for max_rows in [MaxRows::Unlimited, MaxRows::Limited(5), MaxRows::Limited(3)] {
424            PrintBatchesTest::new()
425                .with_format(PrintFormat::Table)
426                .with_schema(one_column_schema())
427                .with_batches(vec![one_column_batch()])
428                .with_maxrows(max_rows)
429                .with_expected(expected)
430                .run();
431        }
432    }
433
434    #[test]
435    fn print_maxrows_limited_one_batch() {
436        #[rustfmt::skip]
437            let expected = &[
438            "+---+",
439            "| a |",
440            "+---+",
441            "| 1 |",
442            "| . |",
443            "| . |",
444            "| . |",
445            "+---+",
446        ];
447
448        PrintBatchesTest::new()
449            .with_format(PrintFormat::Table)
450            .with_batches(vec![one_column_batch()])
451            .with_maxrows(MaxRows::Limited(1))
452            .with_expected(expected)
453            .run();
454    }
455
456    #[test]
457    fn print_maxrows_limited_multi_batched() {
458        #[rustfmt::skip]
459            let expected = &[
460            "+---+",
461            "| a |",
462            "+---+",
463            "| 1 |",
464            "| 2 |",
465            "| 3 |",
466            "| 1 |",
467            "| 2 |",
468            "| . |",
469            "| . |",
470            "| . |",
471            "+---+",
472        ];
473
474        PrintBatchesTest::new()
475            .with_format(PrintFormat::Table)
476            .with_batches(vec![
477                one_column_batch(),
478                one_column_batch(),
479                one_column_batch(),
480            ])
481            .with_maxrows(MaxRows::Limited(5))
482            .with_expected(expected)
483            .run();
484    }
485
486    #[test]
487    fn test_print_batches_empty_batches() {
488        let batch = one_column_batch();
489        let empty_batch = RecordBatch::new_empty(batch.schema());
490
491        #[rustfmt::skip]
492        let expected =&[
493            "+---+",
494            "| a |",
495            "+---+",
496            "| 1 |",
497            "| 2 |",
498            "| 3 |",
499            "+---+",
500        ];
501
502        PrintBatchesTest::new()
503            .with_format(PrintFormat::Table)
504            .with_batches(vec![empty_batch.clone(), batch, empty_batch])
505            .with_expected(expected)
506            .run();
507    }
508
509    #[test]
510    fn test_print_batches_empty_batch() {
511        let empty_batch = RecordBatch::new_empty(one_column_batch().schema());
512
513        // Print column headers for empty batch when format is Table
514        #[rustfmt::skip]
515        let expected =&[
516            "+---+",
517            "| a |",
518            "+---+",
519            "+---+",
520        ];
521
522        PrintBatchesTest::new()
523            .with_format(PrintFormat::Table)
524            .with_schema(one_column_schema())
525            .with_batches(vec![empty_batch])
526            .with_header(WithHeader::Yes)
527            .with_expected(expected)
528            .run();
529
530        // No output for empty batch when schema contains no columns
531        let empty_batch = RecordBatch::new_empty(Arc::new(Schema::empty()));
532        let expected = &[""];
533        PrintBatchesTest::new()
534            .with_format(PrintFormat::Table)
535            .with_schema(Arc::new(Schema::empty()))
536            .with_batches(vec![empty_batch])
537            .with_header(WithHeader::Yes)
538            .with_expected(expected)
539            .run();
540    }
541
542    #[derive(Debug)]
543    struct PrintBatchesTest {
544        format: PrintFormat,
545        schema: SchemaRef,
546        batches: Vec<RecordBatch>,
547        maxrows: MaxRows,
548        with_header: WithHeader,
549        expected: Vec<&'static str>,
550    }
551
552    /// How to test with_header
553    #[derive(Debug, Clone)]
554    enum WithHeader {
555        Yes,
556        No,
557        /// output should be the same with or without header
558        Ignored,
559    }
560
561    impl PrintBatchesTest {
562        fn new() -> Self {
563            Self {
564                format: PrintFormat::Table,
565                schema: Arc::new(Schema::empty()),
566                batches: vec![],
567                maxrows: MaxRows::Unlimited,
568                with_header: WithHeader::Ignored,
569                expected: vec![],
570            }
571        }
572
573        /// set the format
574        fn with_format(mut self, format: PrintFormat) -> Self {
575            self.format = format;
576            self
577        }
578
579        // set the schema
580        fn with_schema(mut self, schema: SchemaRef) -> Self {
581            self.schema = schema;
582            self
583        }
584
585        /// set the batches to convert
586        fn with_batches(mut self, batches: Vec<RecordBatch>) -> Self {
587            self.batches = batches;
588            self
589        }
590
591        /// set maxrows
592        fn with_maxrows(mut self, maxrows: MaxRows) -> Self {
593            self.maxrows = maxrows;
594            self
595        }
596
597        /// set with_header
598        fn with_header(mut self, with_header: WithHeader) -> Self {
599            self.with_header = with_header;
600            self
601        }
602
603        /// set expected output
604        fn with_expected(mut self, expected: &[&'static str]) -> Self {
605            self.expected = expected.to_vec();
606            self
607        }
608
609        /// run the test
610        fn run(self) {
611            let actual = self.output();
612            let actual: Vec<_> = actual.trim_end().split('\n').collect();
613            let expected = self.expected;
614            assert_eq!(
615                actual, expected,
616                "\n\nactual:\n{actual:#?}\n\nexpected:\n{expected:#?}"
617            );
618        }
619
620        /// formats batches using parameters and returns the resulting output
621        fn output(&self) -> String {
622            match self.with_header {
623                WithHeader::Yes => self.output_with_header(true),
624                WithHeader::No => self.output_with_header(false),
625                WithHeader::Ignored => {
626                    let output = self.output_with_header(true);
627                    // ensure the output is the same without header
628                    let output_without_header = self.output_with_header(false);
629                    assert_eq!(
630                        output, output_without_header,
631                        "Expected output to be the same with or without header"
632                    );
633                    output
634                }
635            }
636        }
637
638        fn output_with_header(&self, with_header: bool) -> String {
639            let mut buffer: Vec<u8> = vec![];
640            self.format
641                .print_batches(
642                    &mut buffer,
643                    self.schema.clone(),
644                    &self.batches,
645                    self.maxrows,
646                    with_header,
647                )
648                .unwrap();
649            String::from_utf8(buffer).unwrap()
650        }
651    }
652
653    /// Return a schema with three columns
654    fn three_column_schema() -> SchemaRef {
655        Arc::new(Schema::new(vec![
656            Field::new("a", DataType::Int32, false),
657            Field::new("b", DataType::Int32, false),
658            Field::new("c", DataType::Int32, false),
659        ]))
660    }
661
662    /// Return a batch with three columns and three rows
663    fn three_column_batch() -> RecordBatch {
664        RecordBatch::try_new(
665            three_column_schema(),
666            vec![
667                Arc::new(Int32Array::from(vec![1, 2, 3])),
668                Arc::new(Int32Array::from(vec![4, 5, 6])),
669                Arc::new(Int32Array::from(vec![7, 8, 9])),
670            ],
671        )
672        .unwrap()
673    }
674
675    /// Return a schema with one column
676    fn one_column_schema() -> SchemaRef {
677        Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]))
678    }
679
680    /// return a batch with one column and three rows
681    fn one_column_batch() -> RecordBatch {
682        RecordBatch::try_new(
683            one_column_schema(),
684            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
685        )
686        .unwrap()
687    }
688
689    /// Slice the record batch into 2 batches
690    fn split_batch(batch: RecordBatch) -> Vec<RecordBatch> {
691        assert!(batch.num_rows() > 1);
692        let split = batch.num_rows() / 2;
693        vec![
694            batch.slice(0, split),
695            batch.slice(split, batch.num_rows() - split),
696        ]
697    }
698}