datafusion_cli/
print_format.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Print format variants
19
20use std::str::FromStr;
21
22use crate::print_options::MaxRows;
23
24use arrow::csv::writer::WriterBuilder;
25use arrow::datatypes::SchemaRef;
26use arrow::json::{ArrayWriter, LineDelimitedWriter};
27use arrow::record_batch::RecordBatch;
28use arrow::util::pretty::pretty_format_batches_with_options;
29use datafusion::config::FormatOptions;
30use datafusion::error::Result;
31
32/// Allow records to be printed in different formats
33#[derive(Debug, PartialEq, Eq, clap::ValueEnum, Clone, Copy)]
34pub enum PrintFormat {
35    Csv,
36    Tsv,
37    Table,
38    Json,
39    NdJson,
40    Automatic,
41}
42
43impl FromStr for PrintFormat {
44    type Err = String;
45
46    fn from_str(s: &str) -> Result<Self, Self::Err> {
47        clap::ValueEnum::from_str(s, true)
48    }
49}
50
51macro_rules! batches_to_json {
52    ($WRITER: ident, $writer: expr, $batches: expr) => {{
53        {
54            if !$batches.is_empty() {
55                let mut json_writer = $WRITER::new(&mut *$writer);
56                for batch in $batches {
57                    json_writer.write(batch)?;
58                }
59                json_writer.finish()?;
60                json_finish!($WRITER, $writer);
61            }
62        }
63        Ok(()) as Result<()>
64    }};
65}
66
67macro_rules! json_finish {
68    (ArrayWriter, $writer: expr) => {{
69        writeln!($writer)?;
70    }};
71    (LineDelimitedWriter, $writer: expr) => {{}};
72}
73
74fn print_batches_with_sep<W: std::io::Write>(
75    writer: &mut W,
76    batches: &[RecordBatch],
77    delimiter: u8,
78    with_header: bool,
79) -> Result<()> {
80    let builder = WriterBuilder::new()
81        .with_header(with_header)
82        .with_delimiter(delimiter);
83    let mut csv_writer = builder.build(writer);
84
85    for batch in batches {
86        csv_writer.write(batch)?;
87    }
88
89    Ok(())
90}
91
92fn keep_only_maxrows(s: &str, maxrows: usize) -> String {
93    let lines: Vec<String> = s.lines().map(String::from).collect();
94
95    assert!(lines.len() >= maxrows + 4); // 4 lines for top and bottom border
96
97    let last_line = &lines[lines.len() - 1]; // bottom border line
98
99    let spaces = last_line.len().saturating_sub(4);
100    let dotted_line = format!("| .{:<spaces$}|", "", spaces = spaces);
101
102    let mut result = lines[0..(maxrows + 3)].to_vec(); // Keep top border and `maxrows` lines
103    result.extend(vec![dotted_line; 3]); // Append ... lines
104    result.push(last_line.clone());
105
106    result.join("\n")
107}
108
109fn format_batches_with_maxrows<W: std::io::Write>(
110    writer: &mut W,
111    batches: &[RecordBatch],
112    maxrows: MaxRows,
113    format_options: &FormatOptions,
114) -> Result<()> {
115    let options: arrow::util::display::FormatOptions = format_options.try_into()?;
116
117    match maxrows {
118        MaxRows::Limited(maxrows) => {
119            // Filter batches to meet the maxrows condition
120            let mut filtered_batches = Vec::new();
121            let mut row_count: usize = 0;
122            let mut over_limit = false;
123            for batch in batches {
124                if row_count + batch.num_rows() > maxrows {
125                    // If adding this batch exceeds maxrows, slice the batch
126                    let limit = maxrows - row_count;
127                    let sliced_batch = batch.slice(0, limit);
128                    filtered_batches.push(sliced_batch);
129                    over_limit = true;
130                    break;
131                } else {
132                    filtered_batches.push(batch.clone());
133                    row_count += batch.num_rows();
134                }
135            }
136
137            let formatted =
138                pretty_format_batches_with_options(&filtered_batches, &options)?;
139            if over_limit {
140                let mut formatted_str = format!("{formatted}");
141                formatted_str = keep_only_maxrows(&formatted_str, maxrows);
142                writeln!(writer, "{formatted_str}")?;
143            } else {
144                writeln!(writer, "{formatted}")?;
145            }
146        }
147        MaxRows::Unlimited => {
148            let formatted = pretty_format_batches_with_options(batches, &options)?;
149            writeln!(writer, "{formatted}")?;
150        }
151    }
152
153    Ok(())
154}
155
156impl PrintFormat {
157    /// Print the batches to a writer using the specified format
158    pub fn print_batches<W: std::io::Write>(
159        &self,
160        writer: &mut W,
161        schema: SchemaRef,
162        batches: &[RecordBatch],
163        maxrows: MaxRows,
164        with_header: bool,
165        format_options: &FormatOptions,
166    ) -> Result<()> {
167        // filter out any empty batches
168        let batches: Vec<_> = batches
169            .iter()
170            .filter(|b| b.num_rows() > 0)
171            .cloned()
172            .collect();
173        if batches.is_empty() {
174            return self.print_empty(writer, schema, format_options);
175        }
176
177        match self {
178            Self::Csv | Self::Automatic => {
179                print_batches_with_sep(writer, &batches, b',', with_header)
180            }
181            Self::Tsv => print_batches_with_sep(writer, &batches, b'\t', with_header),
182            Self::Table => {
183                if maxrows == MaxRows::Limited(0) {
184                    return Ok(());
185                }
186                format_batches_with_maxrows(writer, &batches, maxrows, format_options)
187            }
188            Self::Json => batches_to_json!(ArrayWriter, writer, &batches),
189            Self::NdJson => batches_to_json!(LineDelimitedWriter, writer, &batches),
190        }
191    }
192
193    /// Print when the result batches contain no rows
194    fn print_empty<W: std::io::Write>(
195        &self,
196        writer: &mut W,
197        schema: SchemaRef,
198        format_options: &FormatOptions,
199    ) -> Result<()> {
200        match self {
201            // Print column headers for Table format
202            Self::Table if !schema.fields().is_empty() => {
203                let format_options: arrow::util::display::FormatOptions =
204                    format_options.try_into()?;
205
206                let empty_batch = RecordBatch::new_empty(schema);
207                let formatted =
208                    pretty_format_batches_with_options(&[empty_batch], &format_options)?;
209                writeln!(writer, "{formatted}")?;
210            }
211            _ => {}
212        }
213        Ok(())
214    }
215}
216
217#[cfg(test)]
218mod tests {
219    use super::*;
220    use std::sync::Arc;
221
222    use arrow::array::Int32Array;
223    use arrow::datatypes::{DataType, Field, Schema};
224    use insta::{allow_duplicates, assert_snapshot};
225
226    #[test]
227    fn print_empty() {
228        for format in [
229            PrintFormat::Csv,
230            PrintFormat::Tsv,
231            PrintFormat::Json,
232            PrintFormat::NdJson,
233            PrintFormat::Automatic,
234        ] {
235            // no output for empty batches, even with header set
236            let output = PrintBatchesTest::new()
237                .with_format(format)
238                .with_schema(three_column_schema())
239                .with_batches(vec![])
240                .run();
241            assert_eq!(output, "")
242        }
243
244        // output column headers for empty batches when format is Table
245        let output = PrintBatchesTest::new()
246            .with_format(PrintFormat::Table)
247            .with_schema(three_column_schema())
248            .with_batches(vec![])
249            .run();
250        assert_snapshot!(output, @r"
251        +---+---+---+
252        | a | b | c |
253        +---+---+---+
254        +---+---+---+
255        ");
256    }
257
258    #[test]
259    fn print_csv_no_header() {
260        let output = PrintBatchesTest::new()
261            .with_format(PrintFormat::Csv)
262            .with_batches(split_batch(three_column_batch()))
263            .with_header(WithHeader::No)
264            .run();
265        assert_snapshot!(output, @r"
266        1,4,7
267        2,5,8
268        3,6,9
269        ");
270    }
271
272    #[test]
273    fn print_csv_with_header() {
274        let output = PrintBatchesTest::new()
275            .with_format(PrintFormat::Csv)
276            .with_batches(split_batch(three_column_batch()))
277            .with_header(WithHeader::Yes)
278            .run();
279        assert_snapshot!(output, @r"
280        a,b,c
281        1,4,7
282        2,5,8
283        3,6,9
284        ");
285    }
286
287    #[test]
288    fn print_tsv_no_header() {
289        let output = PrintBatchesTest::new()
290            .with_format(PrintFormat::Tsv)
291            .with_batches(split_batch(three_column_batch()))
292            .with_header(WithHeader::No)
293            .run();
294        assert_snapshot!(output, @r"
295        1	4	7
296        2	5	8
297        3	6	9
298        ")
299    }
300
301    #[test]
302    fn print_tsv_with_header() {
303        let output = PrintBatchesTest::new()
304            .with_format(PrintFormat::Tsv)
305            .with_batches(split_batch(three_column_batch()))
306            .with_header(WithHeader::Yes)
307            .run();
308        assert_snapshot!(output, @r"
309        a	b	c
310        1	4	7
311        2	5	8
312        3	6	9
313        ");
314    }
315
316    #[test]
317    fn print_table() {
318        let output = PrintBatchesTest::new()
319            .with_format(PrintFormat::Table)
320            .with_batches(split_batch(three_column_batch()))
321            .with_header(WithHeader::Ignored)
322            .run();
323        assert_snapshot!(output, @r"
324        +---+---+---+
325        | a | b | c |
326        +---+---+---+
327        | 1 | 4 | 7 |
328        | 2 | 5 | 8 |
329        | 3 | 6 | 9 |
330        +---+---+---+
331        ");
332    }
333    #[test]
334    fn print_json() {
335        let output = PrintBatchesTest::new()
336            .with_format(PrintFormat::Json)
337            .with_batches(split_batch(three_column_batch()))
338            .with_header(WithHeader::Ignored)
339            .run();
340        assert_snapshot!(output, @r#"[{"a":1,"b":4,"c":7},{"a":2,"b":5,"c":8},{"a":3,"b":6,"c":9}]"#);
341    }
342
343    #[test]
344    fn print_ndjson() {
345        let output = PrintBatchesTest::new()
346            .with_format(PrintFormat::NdJson)
347            .with_batches(split_batch(three_column_batch()))
348            .with_header(WithHeader::Ignored)
349            .run();
350        assert_snapshot!(output, @r#"
351        {"a":1,"b":4,"c":7}
352        {"a":2,"b":5,"c":8}
353        {"a":3,"b":6,"c":9}
354        "#);
355    }
356
357    #[test]
358    fn print_automatic_no_header() {
359        let output = PrintBatchesTest::new()
360            .with_format(PrintFormat::Automatic)
361            .with_batches(split_batch(three_column_batch()))
362            .with_header(WithHeader::No)
363            .run();
364        assert_snapshot!(output, @r"
365        1,4,7
366        2,5,8
367        3,6,9
368        ");
369    }
370    #[test]
371    fn print_automatic_with_header() {
372        let output = PrintBatchesTest::new()
373            .with_format(PrintFormat::Automatic)
374            .with_batches(split_batch(three_column_batch()))
375            .with_header(WithHeader::Yes)
376            .run();
377        assert_snapshot!(output, @r"
378        a,b,c
379        1,4,7
380        2,5,8
381        3,6,9
382        ");
383    }
384
385    #[test]
386    fn print_maxrows_unlimited() {
387        // should print out entire output with no truncation if unlimited or
388        // limit greater than number of batches or equal to the number of batches
389        for max_rows in [MaxRows::Unlimited, MaxRows::Limited(5), MaxRows::Limited(3)] {
390            let output = PrintBatchesTest::new()
391                .with_format(PrintFormat::Table)
392                .with_schema(one_column_schema())
393                .with_batches(vec![one_column_batch()])
394                .with_maxrows(max_rows)
395                .run();
396            allow_duplicates! {
397                assert_snapshot!(output, @r"
398                +---+
399                | a |
400                +---+
401                | 1 |
402                | 2 |
403                | 3 |
404                +---+
405                ");
406            }
407        }
408    }
409
410    #[test]
411    fn print_maxrows_limited_one_batch() {
412        let output = PrintBatchesTest::new()
413            .with_format(PrintFormat::Table)
414            .with_batches(vec![one_column_batch()])
415            .with_maxrows(MaxRows::Limited(1))
416            .run();
417        assert_snapshot!(output, @r"
418        +---+
419        | a |
420        +---+
421        | 1 |
422        | . |
423        | . |
424        | . |
425        +---+
426        ");
427    }
428
429    #[test]
430    fn print_maxrows_limited_multi_batched() {
431        let output = PrintBatchesTest::new()
432            .with_format(PrintFormat::Table)
433            .with_batches(vec![
434                one_column_batch(),
435                one_column_batch(),
436                one_column_batch(),
437            ])
438            .with_maxrows(MaxRows::Limited(5))
439            .run();
440        assert_snapshot!(output, @r"
441        +---+
442        | a |
443        +---+
444        | 1 |
445        | 2 |
446        | 3 |
447        | 1 |
448        | 2 |
449        | . |
450        | . |
451        | . |
452        +---+
453        ");
454    }
455
456    #[test]
457    fn test_print_batches_empty_batches() {
458        let batch = one_column_batch();
459        let empty_batch = RecordBatch::new_empty(batch.schema());
460
461        let output = PrintBatchesTest::new()
462            .with_format(PrintFormat::Table)
463            .with_batches(vec![empty_batch.clone(), batch, empty_batch])
464            .run();
465        assert_snapshot!(output, @r"
466        +---+
467        | a |
468        +---+
469        | 1 |
470        | 2 |
471        | 3 |
472        +---+
473        ");
474    }
475
476    #[test]
477    fn test_print_batches_empty_batch() {
478        let empty_batch = RecordBatch::new_empty(one_column_batch().schema());
479
480        // Print column headers for empty batch when format is Table
481        let output = PrintBatchesTest::new()
482            .with_format(PrintFormat::Table)
483            .with_schema(one_column_schema())
484            .with_batches(vec![empty_batch])
485            .with_header(WithHeader::Yes)
486            .run();
487        assert_snapshot!(output, @r"
488        +---+
489        | a |
490        +---+
491        +---+
492        ");
493
494        // No output for empty batch when schema contains no columns
495        let empty_batch = RecordBatch::new_empty(Arc::new(Schema::empty()));
496        let output = PrintBatchesTest::new()
497            .with_format(PrintFormat::Table)
498            .with_schema(Arc::new(Schema::empty()))
499            .with_batches(vec![empty_batch])
500            .with_header(WithHeader::Yes)
501            .run();
502        assert_eq!(output, "")
503    }
504
505    #[derive(Debug)]
506    struct PrintBatchesTest {
507        format: PrintFormat,
508        schema: SchemaRef,
509        batches: Vec<RecordBatch>,
510        maxrows: MaxRows,
511        with_header: WithHeader,
512    }
513
514    /// How to test with_header
515    #[derive(Debug, Clone)]
516    enum WithHeader {
517        Yes,
518        No,
519        /// output should be the same with or without header
520        Ignored,
521    }
522
523    impl PrintBatchesTest {
524        fn new() -> Self {
525            Self {
526                format: PrintFormat::Table,
527                schema: Arc::new(Schema::empty()),
528                batches: vec![],
529                maxrows: MaxRows::Unlimited,
530                with_header: WithHeader::Ignored,
531            }
532        }
533
534        /// set the format
535        fn with_format(mut self, format: PrintFormat) -> Self {
536            self.format = format;
537            self
538        }
539
540        // set the schema
541        fn with_schema(mut self, schema: SchemaRef) -> Self {
542            self.schema = schema;
543            self
544        }
545
546        /// set the batches to convert
547        fn with_batches(mut self, batches: Vec<RecordBatch>) -> Self {
548            self.batches = batches;
549            self
550        }
551
552        /// set maxrows
553        fn with_maxrows(mut self, maxrows: MaxRows) -> Self {
554            self.maxrows = maxrows;
555            self
556        }
557
558        /// set with_header
559        fn with_header(mut self, with_header: WithHeader) -> Self {
560            self.with_header = with_header;
561            self
562        }
563
564        /// run the test
565        /// formats batches using parameters and returns the resulting output
566        fn run(self) -> String {
567            match self.with_header {
568                WithHeader::Yes => self.output_with_header(true),
569                WithHeader::No => self.output_with_header(false),
570                WithHeader::Ignored => {
571                    let output = self.output_with_header(true);
572                    // ensure the output is the same without header
573                    let output_without_header = self.output_with_header(false);
574                    assert_eq!(
575                        output, output_without_header,
576                        "Expected output to be the same with or without header"
577                    );
578                    output
579                }
580            }
581        }
582
583        fn output_with_header(&self, with_header: bool) -> String {
584            let mut buffer: Vec<u8> = vec![];
585            self.format
586                .print_batches(
587                    &mut buffer,
588                    self.schema.clone(),
589                    &self.batches,
590                    self.maxrows,
591                    with_header,
592                    &FormatOptions::default(),
593                )
594                .unwrap();
595            String::from_utf8(buffer).unwrap()
596        }
597    }
598
599    /// Return a schema with three columns
600    fn three_column_schema() -> SchemaRef {
601        Arc::new(Schema::new(vec![
602            Field::new("a", DataType::Int32, false),
603            Field::new("b", DataType::Int32, false),
604            Field::new("c", DataType::Int32, false),
605        ]))
606    }
607
608    /// Return a batch with three columns and three rows
609    fn three_column_batch() -> RecordBatch {
610        RecordBatch::try_new(
611            three_column_schema(),
612            vec![
613                Arc::new(Int32Array::from(vec![1, 2, 3])),
614                Arc::new(Int32Array::from(vec![4, 5, 6])),
615                Arc::new(Int32Array::from(vec![7, 8, 9])),
616            ],
617        )
618        .unwrap()
619    }
620
621    /// Return a schema with one column
622    fn one_column_schema() -> SchemaRef {
623        Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]))
624    }
625
626    /// return a batch with one column and three rows
627    fn one_column_batch() -> RecordBatch {
628        RecordBatch::try_new(
629            one_column_schema(),
630            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
631        )
632        .unwrap()
633    }
634
635    /// Slice the record batch into 2 batches
636    fn split_batch(batch: RecordBatch) -> Vec<RecordBatch> {
637        assert!(batch.num_rows() > 1);
638        let split = batch.num_rows() / 2;
639        vec![
640            batch.slice(0, split),
641            batch.slice(split, batch.num_rows() - split),
642        ]
643    }
644}