datafusion_cli/
print_format.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Print format variants
19
20use std::str::FromStr;
21
22use crate::print_options::MaxRows;
23
24use arrow::csv::writer::WriterBuilder;
25use arrow::datatypes::SchemaRef;
26use arrow::json::{ArrayWriter, LineDelimitedWriter};
27use arrow::record_batch::RecordBatch;
28use arrow::util::pretty::pretty_format_batches_with_options;
29use datafusion::config::FormatOptions;
30use datafusion::error::Result;
31
/// Allow records to be printed in different formats
// NOTE(review): the per-variant notes below are plain `//` comments on
// purpose — `clap::ValueEnum` may surface `///` variant docs in generated
// help text; confirm before converting them to doc comments.
#[derive(Debug, PartialEq, Eq, clap::ValueEnum, Clone, Copy)]
pub enum PrintFormat {
    // Delimiter-separated text using `,` between fields.
    Csv,
    // Delimiter-separated text using a tab between fields.
    Tsv,
    // Pretty-printed table; the only format that honors `MaxRows` and the
    // only one that prints column headers for a result without rows.
    Table,
    // All rows written through arrow's `ArrayWriter` (one JSON array).
    Json,
    // Rows written through arrow's `LineDelimitedWriter` (one object per line).
    NdJson,
    // Handled exactly like `Csv` by `print_batches`.
    Automatic,
}
42
43impl FromStr for PrintFormat {
44    type Err = String;
45
46    fn from_str(s: &str) -> Result<Self, Self::Err> {
47        clap::ValueEnum::from_str(s, true)
48    }
49}
50
// Serialize `$batches` as JSON into `$writer` using the arrow JSON writer
// type named by `$WRITER` (`ArrayWriter` or `LineDelimitedWriter`).
//
// Nothing at all is written when `$batches` is empty. Otherwise every batch is
// written, the JSON writer is finished, and `json_finish!` gets the chance to
// emit a format-specific trailer. The macro evaluates to `Result<()>` and
// uses `?` internally, so it must be expanded inside a function that returns
// a compatible `Result`.
macro_rules! batches_to_json {
    ($WRITER: ident, $writer: expr, $batches: expr) => {{
        if !$batches.is_empty() {
            // Re-borrow so that `$writer` is usable again once the encoder is done.
            let mut encoder = $WRITER::new(&mut *$writer);
            for record_batch in $batches {
                encoder.write(record_batch)?;
            }
            encoder.finish()?;
            json_finish!($WRITER, $writer);
        }
        Ok(()) as Result<()>
    }};
}
66
// Emit the trailer that follows the JSON payload, chosen by writer type.
// The first token is matched literally, so only the two writer names below
// are accepted.
macro_rules! json_finish {
    // Line-delimited output needs no trailer.
    (LineDelimitedWriter, $writer: expr) => {{}};
    // Array output is terminated with a newline after the closing bracket.
    (ArrayWriter, $writer: expr) => {{
        writeln!($writer)?;
    }};
}
73
74fn print_batches_with_sep<W: std::io::Write>(
75    writer: &mut W,
76    batches: &[RecordBatch],
77    delimiter: u8,
78    with_header: bool,
79) -> Result<()> {
80    let builder = WriterBuilder::new()
81        .with_header(with_header)
82        .with_delimiter(delimiter);
83    let mut csv_writer = builder.build(writer);
84
85    for batch in batches {
86        csv_writer.write(batch)?;
87    }
88
89    Ok(())
90}
91
/// Cut a pretty-printed table down to its first `maxrows` data rows.
///
/// `s` is expected to be a rendered table: three header lines (top border,
/// column names, separator), one line per data row and a closing border.
/// The result keeps the header plus the first `maxrows` row lines, followed
/// by three `| .   |` marker lines (padded to the width of the closing
/// border) that signal omitted rows, and finally the closing border. Lines
/// are joined with `\n` and no trailing newline is added.
///
/// If `s` has fewer than `maxrows + 4` lines there is nothing to cut, so `s`
/// is returned unchanged instead of panicking.
fn keep_only_maxrows(s: &str, maxrows: usize) -> String {
    // Borrow the lines; only the final result needs to be owned.
    let lines: Vec<&str> = s.lines().collect();

    // 3 header lines + `maxrows` rows + 1 bottom border are required.
    // `saturating_add` keeps a huge `maxrows` from overflowing.
    if lines.len() < maxrows.saturating_add(4) {
        return s.to_string();
    }

    // The guard above guarantees at least four lines, so indexing is safe.
    let last_line = lines[lines.len() - 1]; // bottom border line

    // "| ." and the closing "|" take 4 columns; pad the rest with spaces.
    let spaces = last_line.len().saturating_sub(4);
    let dotted_line = format!("| .{}|", " ".repeat(spaces));

    lines[..maxrows + 3] // header and the first `maxrows` rows
        .iter()
        .copied()
        .chain(std::iter::repeat(dotted_line.as_str()).take(3))
        .chain(std::iter::once(last_line))
        .collect::<Vec<_>>()
        .join("\n")
}
108
109fn format_batches_with_maxrows<W: std::io::Write>(
110    writer: &mut W,
111    batches: &[RecordBatch],
112    maxrows: MaxRows,
113    format_options: &FormatOptions,
114) -> Result<()> {
115    let options: arrow::util::display::FormatOptions = format_options.try_into()?;
116
117    match maxrows {
118        MaxRows::Limited(maxrows) => {
119            // Filter batches to meet the maxrows condition
120            let mut filtered_batches = Vec::new();
121            let mut row_count: usize = 0;
122            let mut over_limit = false;
123            for batch in batches {
124                if row_count + batch.num_rows() > maxrows {
125                    // If adding this batch exceeds maxrows, slice the batch
126                    let limit = maxrows - row_count;
127                    let sliced_batch = batch.slice(0, limit);
128                    filtered_batches.push(sliced_batch);
129                    over_limit = true;
130                    break;
131                } else {
132                    filtered_batches.push(batch.clone());
133                    row_count += batch.num_rows();
134                }
135            }
136
137            let formatted =
138                pretty_format_batches_with_options(&filtered_batches, &options)?;
139            if over_limit {
140                let mut formatted_str = format!("{formatted}");
141                formatted_str = keep_only_maxrows(&formatted_str, maxrows);
142                writeln!(writer, "{formatted_str}")?;
143            } else {
144                writeln!(writer, "{formatted}")?;
145            }
146        }
147        MaxRows::Unlimited => {
148            let formatted = pretty_format_batches_with_options(batches, &options)?;
149            writeln!(writer, "{formatted}")?;
150        }
151    }
152
153    Ok(())
154}
155
156impl PrintFormat {
157    /// Print the batches to a writer using the specified format
158    pub fn print_batches<W: std::io::Write>(
159        &self,
160        writer: &mut W,
161        schema: SchemaRef,
162        batches: &[RecordBatch],
163        maxrows: MaxRows,
164        with_header: bool,
165        format_options: &FormatOptions,
166    ) -> Result<()> {
167        // filter out any empty batches
168        let batches: Vec<_> = batches
169            .iter()
170            .filter(|b| b.num_rows() > 0)
171            .cloned()
172            .collect();
173        if batches.is_empty() {
174            return self.print_empty(writer, schema, format_options);
175        }
176
177        match self {
178            Self::Csv | Self::Automatic => {
179                print_batches_with_sep(writer, &batches, b',', with_header)
180            }
181            Self::Tsv => print_batches_with_sep(writer, &batches, b'\t', with_header),
182            Self::Table => {
183                if maxrows == MaxRows::Limited(0) {
184                    return Ok(());
185                }
186                format_batches_with_maxrows(writer, &batches, maxrows, format_options)
187            }
188            Self::Json => batches_to_json!(ArrayWriter, writer, &batches),
189            Self::NdJson => batches_to_json!(LineDelimitedWriter, writer, &batches),
190        }
191    }
192
193    /// Print when the result batches contain no rows
194    fn print_empty<W: std::io::Write>(
195        &self,
196        writer: &mut W,
197        schema: SchemaRef,
198        format_options: &FormatOptions,
199    ) -> Result<()> {
200        match self {
201            // Print column headers for Table format
202            Self::Table if !schema.fields().is_empty() => {
203                let format_options: arrow::util::display::FormatOptions =
204                    format_options.try_into()?;
205
206                let empty_batch = RecordBatch::new_empty(schema);
207                let formatted =
208                    pretty_format_batches_with_options(&[empty_batch], &format_options)?;
209                writeln!(writer, "{formatted}")?;
210            }
211            _ => {}
212        }
213        Ok(())
214    }
215}
216
/// Tests for `PrintFormat::print_batches`, driven through the
/// `PrintBatchesTest` builder defined below and checked with inline snapshots.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use insta::{allow_duplicates, assert_snapshot};

    /// With no batches, only the Table format prints anything (the headers).
    #[test]
    fn print_empty() {
        for format in [
            PrintFormat::Csv,
            PrintFormat::Tsv,
            PrintFormat::Json,
            PrintFormat::NdJson,
            PrintFormat::Automatic,
        ] {
            // no output for empty batches, even with header set
            let output = PrintBatchesTest::new()
                .with_format(format)
                .with_schema(three_column_schema())
                .with_batches(vec![])
                .run();
            assert_eq!(output, "")
        }

        // output column headers for empty batches when format is Table
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_schema(three_column_schema())
            .with_batches(vec![])
            .run();
        assert_snapshot!(output, @r#"
        +---+---+---+
        | a | b | c |
        +---+---+---+
        +---+---+---+
        "#);
    }

    /// CSV output without a header row.
    #[test]
    fn print_csv_no_header() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Csv)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::No)
            .run();
        assert_snapshot!(output, @r#"
        1,4,7
        2,5,8
        3,6,9
        "#);
    }

    /// CSV output with a header row; the header appears once even though the
    /// input is split into two batches.
    #[test]
    fn print_csv_with_header() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Csv)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Yes)
            .run();
        assert_snapshot!(output, @r#"
        a,b,c
        1,4,7
        2,5,8
        3,6,9
        "#);
    }

    /// TSV output without a header row.
    #[test]
    fn print_tsv_no_header() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Tsv)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::No)
            .run();
        assert_snapshot!(output, @"
        1\t4\t7
        2\t5\t8
        3\t6\t9
        ")
    }

    /// TSV output with a header row.
    #[test]
    fn print_tsv_with_header() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Tsv)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Yes)
            .run();
        assert_snapshot!(output, @"
        a\tb\tc
        1\t4\t7
        2\t5\t8
        3\t6\t9
        ");
    }

    /// Table output; `with_header` must not influence the result.
    #[test]
    fn print_table() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Ignored)
            .run();
        assert_snapshot!(output, @r#"
        +---+---+---+
        | a | b | c |
        +---+---+---+
        | 1 | 4 | 7 |
        | 2 | 5 | 8 |
        | 3 | 6 | 9 |
        +---+---+---+
        "#);
    }
    /// JSON output is a single array holding the rows of all batches.
    #[test]
    fn print_json() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Json)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Ignored)
            .run();
        assert_snapshot!(output, @r#"
        [{"a":1,"b":4,"c":7},{"a":2,"b":5,"c":8},{"a":3,"b":6,"c":9}]
        "#);
    }

    /// NDJSON output is one JSON object per line.
    #[test]
    fn print_ndjson() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::NdJson)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Ignored)
            .run();
        assert_snapshot!(output, @r#"
        {"a":1,"b":4,"c":7}
        {"a":2,"b":5,"c":8}
        {"a":3,"b":6,"c":9}
        "#);
    }

    /// Automatic behaves like CSV (no header variant).
    #[test]
    fn print_automatic_no_header() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Automatic)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::No)
            .run();
        assert_snapshot!(output, @r#"
        1,4,7
        2,5,8
        3,6,9
        "#);
    }
    /// Automatic behaves like CSV (header variant).
    #[test]
    fn print_automatic_with_header() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Automatic)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Yes)
            .run();
        assert_snapshot!(output, @r#"
        a,b,c
        1,4,7
        2,5,8
        3,6,9
        "#);
    }

    /// No truncation markers appear when the row limit is not exceeded.
    #[test]
    fn print_maxrows_unlimited() {
        // should print out entire output with no truncation if unlimited or
        // limit greater than or equal to the number of rows
        for max_rows in [MaxRows::Unlimited, MaxRows::Limited(5), MaxRows::Limited(3)] {
            let output = PrintBatchesTest::new()
                .with_format(PrintFormat::Table)
                .with_schema(one_column_schema())
                .with_batches(vec![one_column_batch()])
                .with_maxrows(max_rows)
                .run();
            // the same inline snapshot is asserted once per loop iteration
            allow_duplicates! {
                assert_snapshot!(output, @r#"
                +---+
                | a |
                +---+
                | 1 |
                | 2 |
                | 3 |
                +---+
                "#);
            }
        }
    }

    /// A single batch larger than the limit is cut and followed by `.` rows.
    #[test]
    fn print_maxrows_limited_one_batch() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_batches(vec![one_column_batch()])
            .with_maxrows(MaxRows::Limited(1))
            .run();
        assert_snapshot!(output, @r#"
        +---+
        | a |
        +---+
        | 1 |
        | . |
        | . |
        | . |
        +---+
        "#);
    }

    /// The limit applies to the total row count across batches: the second
    /// batch is sliced and the third is dropped.
    #[test]
    fn print_maxrows_limited_multi_batched() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_batches(vec![
                one_column_batch(),
                one_column_batch(),
                one_column_batch(),
            ])
            .with_maxrows(MaxRows::Limited(5))
            .run();
        assert_snapshot!(output, @r#"
        +---+
        | a |
        +---+
        | 1 |
        | 2 |
        | 3 |
        | 1 |
        | 2 |
        | . |
        | . |
        | . |
        +---+
        "#);
    }

    /// Empty batches mixed with a non-empty one do not affect the output.
    #[test]
    fn test_print_batches_empty_batches() {
        let batch = one_column_batch();
        let empty_batch = RecordBatch::new_empty(batch.schema());

        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_batches(vec![empty_batch.clone(), batch, empty_batch])
            .run();
        assert_snapshot!(output, @r#"
        +---+
        | a |
        +---+
        | 1 |
        | 2 |
        | 3 |
        +---+
        "#);
    }

    /// A lone empty batch prints headers for Table, unless the schema has no
    /// columns, in which case nothing is printed.
    #[test]
    fn test_print_batches_empty_batch() {
        let empty_batch = RecordBatch::new_empty(one_column_batch().schema());

        // Print column headers for empty batch when format is Table
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_schema(one_column_schema())
            .with_batches(vec![empty_batch])
            .with_header(WithHeader::Yes)
            .run();
        assert_snapshot!(output, @r#"
        +---+
        | a |
        +---+
        +---+
        "#);

        // No output for empty batch when schema contains no columns
        let empty_batch = RecordBatch::new_empty(Arc::new(Schema::empty()));
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_schema(Arc::new(Schema::empty()))
            .with_batches(vec![empty_batch])
            .with_header(WithHeader::Yes)
            .run();
        assert_eq!(output, "")
    }

    /// Builder that collects the arguments of `PrintFormat::print_batches`
    /// and returns what was printed as a `String`.
    #[derive(Debug)]
    struct PrintBatchesTest {
        format: PrintFormat,
        schema: SchemaRef,
        batches: Vec<RecordBatch>,
        maxrows: MaxRows,
        with_header: WithHeader,
    }

    /// How to test with_header
    #[derive(Debug, Clone)]
    enum WithHeader {
        Yes,
        No,
        /// output should be the same with or without header
        Ignored,
    }

    impl PrintBatchesTest {
        /// Defaults: Table format, empty schema, no batches, unlimited rows,
        /// header setting ignored.
        fn new() -> Self {
            Self {
                format: PrintFormat::Table,
                schema: Arc::new(Schema::empty()),
                batches: vec![],
                maxrows: MaxRows::Unlimited,
                with_header: WithHeader::Ignored,
            }
        }

        /// set the format
        fn with_format(mut self, format: PrintFormat) -> Self {
            self.format = format;
            self
        }

        /// set the schema
        fn with_schema(mut self, schema: SchemaRef) -> Self {
            self.schema = schema;
            self
        }

        /// set the batches to convert
        fn with_batches(mut self, batches: Vec<RecordBatch>) -> Self {
            self.batches = batches;
            self
        }

        /// set maxrows
        fn with_maxrows(mut self, maxrows: MaxRows) -> Self {
            self.maxrows = maxrows;
            self
        }

        /// set with_header
        fn with_header(mut self, with_header: WithHeader) -> Self {
            self.with_header = with_header;
            self
        }

        /// run the test
        /// formats batches using parameters and returns the resulting output
        fn run(self) -> String {
            match self.with_header {
                WithHeader::Yes => self.output_with_header(true),
                WithHeader::No => self.output_with_header(false),
                WithHeader::Ignored => {
                    let output = self.output_with_header(true);
                    // ensure the output is the same without header
                    let output_without_header = self.output_with_header(false);
                    assert_eq!(
                        output, output_without_header,
                        "Expected output to be the same with or without header"
                    );
                    output
                }
            }
        }

        /// Print into an in-memory buffer with the given header setting and
        /// default format options; panics if printing fails or the output is
        /// not valid UTF-8.
        fn output_with_header(&self, with_header: bool) -> String {
            let mut buffer: Vec<u8> = vec![];
            self.format
                .print_batches(
                    &mut buffer,
                    self.schema.clone(),
                    &self.batches,
                    self.maxrows,
                    with_header,
                    &FormatOptions::default(),
                )
                .unwrap();
            String::from_utf8(buffer).unwrap()
        }
    }

    /// Return a schema with three columns
    fn three_column_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Int32, false),
        ]))
    }

    /// Return a batch with three columns and three rows
    fn three_column_batch() -> RecordBatch {
        RecordBatch::try_new(
            three_column_schema(),
            vec![
                Arc::new(Int32Array::from(vec![1, 2, 3])),
                Arc::new(Int32Array::from(vec![4, 5, 6])),
                Arc::new(Int32Array::from(vec![7, 8, 9])),
            ],
        )
        .unwrap()
    }

    /// Return a schema with one column
    fn one_column_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]))
    }

    /// return a batch with one column and three rows
    fn one_column_batch() -> RecordBatch {
        RecordBatch::try_new(
            one_column_schema(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )
        .unwrap()
    }

    /// Slice the record batch into 2 batches
    fn split_batch(batch: RecordBatch) -> Vec<RecordBatch> {
        // a single-row batch cannot be split into two non-empty halves
        assert!(batch.num_rows() > 1);
        let split = batch.num_rows() / 2;
        vec![
            batch.slice(0, split),
            batch.slice(split, batch.num_rows() - split),
        ]
    }
}