datafusion_cli/
print_options.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::{Display, Formatter};
19use std::io::Write;
20use std::pin::Pin;
21use std::str::FromStr;
22
23use crate::print_format::PrintFormat;
24
25use arrow::datatypes::SchemaRef;
26use arrow::record_batch::RecordBatch;
27use datafusion::common::instant::Instant;
28use datafusion::common::DataFusionError;
29use datafusion::error::Result;
30use datafusion::physical_plan::RecordBatchStream;
31
32use futures::StreamExt;
33
/// Upper bound on the number of result rows shown when printing query output.
///
/// Equality is total (the only payload is a `usize`), so `Eq` is derived
/// alongside `PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MaxRows {
    /// Show all rows in the output
    Unlimited,
    /// Only show n rows
    Limited(usize),
}
41
42impl FromStr for MaxRows {
43    type Err = String;
44
45    fn from_str(maxrows: &str) -> Result<Self, Self::Err> {
46        if maxrows.to_lowercase() == "inf"
47            || maxrows.to_lowercase() == "infinite"
48            || maxrows.to_lowercase() == "none"
49        {
50            Ok(Self::Unlimited)
51        } else {
52            match maxrows.parse::<usize>() {
53                Ok(nrows)  => Ok(Self::Limited(nrows)),
54                _ => Err(format!("Invalid maxrows {}. Valid inputs are natural numbers or \'none\', \'inf\', or \'infinite\' for no limit.", maxrows)),
55            }
56        }
57    }
58}
59
60impl Display for MaxRows {
61    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
62        match self {
63            Self::Unlimited => write!(f, "unlimited"),
64            Self::Limited(max_rows) => write!(f, "at most {max_rows}"),
65        }
66    }
67}
68
/// Settings that control how query results are printed to stdout.
#[derive(Debug, Clone)]
pub struct PrintOptions {
    /// Output format used to render the record batches.
    pub format: PrintFormat,
    /// When `true`, the row-count / elapsed-time summary is not printed
    /// after the results.
    pub quiet: bool,
    /// Row limit handed to the formatter by `print_batches`;
    /// `print_stream` always prints with `MaxRows::Unlimited`.
    pub maxrows: MaxRows,
    /// NOTE(review): not read anywhere in this file; presumably toggles
    /// colored output elsewhere in the CLI — confirm against callers.
    pub color: bool,
}
76
77// Returns the query execution details formatted
78fn get_execution_details_formatted(
79    row_count: usize,
80    maxrows: MaxRows,
81    query_start_time: Instant,
82) -> String {
83    let nrows_shown_msg = match maxrows {
84        MaxRows::Limited(nrows) if nrows < row_count => {
85            format!("(First {nrows} displayed. Use --maxrows to adjust)")
86        }
87        _ => String::new(),
88    };
89
90    format!(
91        "{} row(s) fetched. {}\nElapsed {:.3} seconds.\n",
92        row_count,
93        nrows_shown_msg,
94        query_start_time.elapsed().as_secs_f64()
95    )
96}
97
98impl PrintOptions {
99    /// Print the batches to stdout using the specified format
100    pub fn print_batches(
101        &self,
102        schema: SchemaRef,
103        batches: &[RecordBatch],
104        query_start_time: Instant,
105        row_count: usize,
106    ) -> Result<()> {
107        let stdout = std::io::stdout();
108        let mut writer = stdout.lock();
109
110        self.format
111            .print_batches(&mut writer, schema, batches, self.maxrows, true)?;
112
113        let formatted_exec_details = get_execution_details_formatted(
114            row_count,
115            if self.format == PrintFormat::Table {
116                self.maxrows
117            } else {
118                MaxRows::Unlimited
119            },
120            query_start_time,
121        );
122
123        if !self.quiet {
124            writeln!(writer, "{formatted_exec_details}")?;
125        }
126
127        Ok(())
128    }
129
130    /// Print the stream to stdout using the specified format
131    pub async fn print_stream(
132        &self,
133        mut stream: Pin<Box<dyn RecordBatchStream>>,
134        query_start_time: Instant,
135    ) -> Result<()> {
136        if self.format == PrintFormat::Table {
137            return Err(DataFusionError::External(
138                "PrintFormat::Table is not implemented".to_string().into(),
139            ));
140        };
141
142        let stdout = std::io::stdout();
143        let mut writer = stdout.lock();
144
145        let mut row_count = 0_usize;
146        let mut with_header = true;
147
148        while let Some(maybe_batch) = stream.next().await {
149            let batch = maybe_batch?;
150            row_count += batch.num_rows();
151            self.format.print_batches(
152                &mut writer,
153                batch.schema(),
154                &[batch],
155                MaxRows::Unlimited,
156                with_header,
157            )?;
158            with_header = false;
159        }
160
161        let formatted_exec_details = get_execution_details_formatted(
162            row_count,
163            MaxRows::Unlimited,
164            query_start_time,
165        );
166
167        if !self.quiet {
168            writeln!(writer, "{formatted_exec_details}")?;
169        }
170
171        Ok(())
172    }
173}