datafusion_cli/
print_options.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::{Display, Formatter};
19use std::io::Write;
20use std::pin::Pin;
21use std::str::FromStr;
22
23use crate::print_format::PrintFormat;
24
25use arrow::datatypes::SchemaRef;
26use arrow::record_batch::RecordBatch;
27use datafusion::common::instant::Instant;
28use datafusion::common::DataFusionError;
29use datafusion::error::Result;
30use datafusion::physical_plan::RecordBatchStream;
31
32use datafusion::config::FormatOptions;
33use futures::StreamExt;
34
/// Upper bound on the number of rows shown when printing query results.
///
/// `Eq` is derived alongside `PartialEq` because equality on both variants
/// is total (`usize` is `Eq`), which lets the type be used wherever a full
/// equivalence relation is required.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MaxRows {
    /// Show all rows in the output
    Unlimited,
    /// Only show `n` rows
    Limited(usize),
}
42
43impl FromStr for MaxRows {
44    type Err = String;
45
46    fn from_str(maxrows: &str) -> Result<Self, Self::Err> {
47        if maxrows.to_lowercase() == "inf"
48            || maxrows.to_lowercase() == "infinite"
49            || maxrows.to_lowercase() == "none"
50        {
51            Ok(Self::Unlimited)
52        } else {
53            match maxrows.parse::<usize>() {
54                Ok(nrows)  => Ok(Self::Limited(nrows)),
55                _ => Err(format!("Invalid maxrows {maxrows}. Valid inputs are natural numbers or \'none\', \'inf\', or \'infinite\' for no limit.")),
56            }
57        }
58    }
59}
60
61impl Display for MaxRows {
62    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
63        match self {
64            Self::Unlimited => write!(f, "unlimited"),
65            Self::Limited(max_rows) => write!(f, "at most {max_rows}"),
66        }
67    }
68}
69
70#[derive(Debug, Clone)]
71pub struct PrintOptions {
72    pub format: PrintFormat,
73    pub quiet: bool,
74    pub maxrows: MaxRows,
75    pub color: bool,
76}
77
78// Returns the query execution details formatted
79fn get_execution_details_formatted(
80    row_count: usize,
81    maxrows: MaxRows,
82    query_start_time: Instant,
83) -> String {
84    let nrows_shown_msg = match maxrows {
85        MaxRows::Limited(nrows) if nrows < row_count => {
86            format!("(First {nrows} displayed. Use --maxrows to adjust)")
87        }
88        _ => String::new(),
89    };
90
91    format!(
92        "{} row(s) fetched. {}\nElapsed {:.3} seconds.\n",
93        row_count,
94        nrows_shown_msg,
95        query_start_time.elapsed().as_secs_f64()
96    )
97}
98
99impl PrintOptions {
100    /// Print the batches to stdout using the specified format
101    pub fn print_batches(
102        &self,
103        schema: SchemaRef,
104        batches: &[RecordBatch],
105        query_start_time: Instant,
106        row_count: usize,
107        format_options: &FormatOptions,
108    ) -> Result<()> {
109        let stdout = std::io::stdout();
110        let mut writer = stdout.lock();
111
112        self.format.print_batches(
113            &mut writer,
114            schema,
115            batches,
116            self.maxrows,
117            true,
118            format_options,
119        )?;
120
121        let formatted_exec_details = get_execution_details_formatted(
122            row_count,
123            if self.format == PrintFormat::Table {
124                self.maxrows
125            } else {
126                MaxRows::Unlimited
127            },
128            query_start_time,
129        );
130
131        if !self.quiet {
132            writeln!(writer, "{formatted_exec_details}")?;
133        }
134
135        Ok(())
136    }
137
138    /// Print the stream to stdout using the specified format
139    pub async fn print_stream(
140        &self,
141        mut stream: Pin<Box<dyn RecordBatchStream>>,
142        query_start_time: Instant,
143        format_options: &FormatOptions,
144    ) -> Result<()> {
145        if self.format == PrintFormat::Table {
146            return Err(DataFusionError::External(
147                "PrintFormat::Table is not implemented".to_string().into(),
148            ));
149        };
150
151        let stdout = std::io::stdout();
152        let mut writer = stdout.lock();
153
154        let mut row_count = 0_usize;
155        let mut with_header = true;
156
157        while let Some(maybe_batch) = stream.next().await {
158            let batch = maybe_batch?;
159            row_count += batch.num_rows();
160            self.format.print_batches(
161                &mut writer,
162                batch.schema(),
163                &[batch],
164                MaxRows::Unlimited,
165                with_header,
166                format_options,
167            )?;
168            with_header = false;
169        }
170
171        let formatted_exec_details = get_execution_details_formatted(
172            row_count,
173            MaxRows::Unlimited,
174            query_start_time,
175        );
176
177        if !self.quiet {
178            writeln!(writer, "{formatted_exec_details}")?;
179        }
180
181        Ok(())
182    }
183}