datafusion_cli/
print_options.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::{Display, Formatter};
19use std::io;
20use std::pin::Pin;
21use std::str::FromStr;
22use std::sync::Arc;
23
24use crate::object_storage::instrumented::{
25    InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry, RequestSummaries,
26};
27use crate::print_format::PrintFormat;
28
29use arrow::datatypes::SchemaRef;
30use arrow::record_batch::RecordBatch;
31use datafusion::common::instant::Instant;
32use datafusion::common::DataFusionError;
33use datafusion::error::Result;
34use datafusion::physical_plan::RecordBatchStream;
35
36use datafusion::config::FormatOptions;
37use futures::StreamExt;
38
/// Upper bound on the number of rows shown when printing query results.
///
/// Parsed from user input through its `FromStr` implementation and rendered
/// for humans through its `Display` implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MaxRows {
    /// Show all rows in the output
    Unlimited,
    /// Only show n rows
    Limited(usize),
}
46
47impl FromStr for MaxRows {
48    type Err = String;
49
50    fn from_str(maxrows: &str) -> Result<Self, Self::Err> {
51        if maxrows.to_lowercase() == "inf"
52            || maxrows.to_lowercase() == "infinite"
53            || maxrows.to_lowercase() == "none"
54        {
55            Ok(Self::Unlimited)
56        } else {
57            match maxrows.parse::<usize>() {
58                Ok(nrows)  => Ok(Self::Limited(nrows)),
59                _ => Err(format!("Invalid maxrows {maxrows}. Valid inputs are natural numbers or \'none\', \'inf\', or \'infinite\' for no limit.")),
60            }
61        }
62    }
63}
64
65impl Display for MaxRows {
66    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
67        match self {
68            Self::Unlimited => write!(f, "unlimited"),
69            Self::Limited(max_rows) => write!(f, "at most {max_rows}"),
70        }
71    }
72}
73
/// Heading line written by `PrintOptions::write_output` ahead of the
/// per-store object store profiling output.
const OBJECT_STORE_PROFILING_HEADER: &str = "Object Store Profiling";
75
/// Settings that control how query results and execution details are printed.
#[derive(Debug, Clone)]
pub struct PrintOptions {
    /// Output format used to render record batches.
    pub format: PrintFormat,
    /// When `true`, neither the execution summary nor the object store
    /// profiling output is written.
    pub quiet: bool,
    /// Row limit handed to the formatter by `print_batches`.
    pub maxrows: MaxRows,
    /// NOTE(review): not read anywhere in this file; presumably toggles
    /// colored output — confirm against callers.
    pub color: bool,
    /// Registry queried for the instrumentation mode and for the stores whose
    /// recorded requests are reported after a query.
    pub instrumented_registry: Arc<InstrumentedObjectStoreRegistry>,
}
84
85// Returns the query execution details formatted
86fn get_execution_details_formatted(
87    row_count: usize,
88    maxrows: MaxRows,
89    query_start_time: Instant,
90) -> String {
91    let nrows_shown_msg = match maxrows {
92        MaxRows::Limited(nrows) if nrows < row_count => {
93            format!("(First {nrows} displayed. Use --maxrows to adjust)")
94        }
95        _ => String::new(),
96    };
97
98    format!(
99        "{} row(s) fetched. {}\nElapsed {:.3} seconds.\n",
100        row_count,
101        nrows_shown_msg,
102        query_start_time.elapsed().as_secs_f64()
103    )
104}
105
106impl PrintOptions {
107    /// Print the batches to stdout using the specified format
108    pub fn print_batches(
109        &self,
110        schema: SchemaRef,
111        batches: &[RecordBatch],
112        query_start_time: Instant,
113        row_count: usize,
114        format_options: &FormatOptions,
115    ) -> Result<()> {
116        let stdout = std::io::stdout();
117        let mut writer = stdout.lock();
118
119        self.format.print_batches(
120            &mut writer,
121            schema,
122            batches,
123            self.maxrows,
124            true,
125            format_options,
126        )?;
127
128        let formatted_exec_details = get_execution_details_formatted(
129            row_count,
130            if self.format == PrintFormat::Table {
131                self.maxrows
132            } else {
133                MaxRows::Unlimited
134            },
135            query_start_time,
136        );
137
138        self.write_output(&mut writer, formatted_exec_details)
139    }
140
141    /// Print the stream to stdout using the specified format
142    pub async fn print_stream(
143        &self,
144        mut stream: Pin<Box<dyn RecordBatchStream>>,
145        query_start_time: Instant,
146        format_options: &FormatOptions,
147    ) -> Result<()> {
148        if self.format == PrintFormat::Table {
149            return Err(DataFusionError::External(
150                "PrintFormat::Table is not implemented".to_string().into(),
151            ));
152        };
153
154        let stdout = std::io::stdout();
155        let mut writer = stdout.lock();
156
157        let mut row_count = 0_usize;
158        let mut with_header = true;
159
160        while let Some(maybe_batch) = stream.next().await {
161            let batch = maybe_batch?;
162            row_count += batch.num_rows();
163            self.format.print_batches(
164                &mut writer,
165                batch.schema(),
166                &[batch],
167                MaxRows::Unlimited,
168                with_header,
169                format_options,
170            )?;
171            with_header = false;
172        }
173
174        let formatted_exec_details = get_execution_details_formatted(
175            row_count,
176            MaxRows::Unlimited,
177            query_start_time,
178        );
179
180        self.write_output(&mut writer, formatted_exec_details)
181    }
182
183    fn write_output<W: io::Write>(
184        &self,
185        writer: &mut W,
186        formatted_exec_details: String,
187    ) -> Result<()> {
188        if !self.quiet {
189            writeln!(writer, "{formatted_exec_details}")?;
190
191            let instrument_mode = self.instrumented_registry.instrument_mode();
192            if instrument_mode != InstrumentedObjectStoreMode::Disabled {
193                writeln!(writer, "{OBJECT_STORE_PROFILING_HEADER}")?;
194                for store in self.instrumented_registry.stores() {
195                    let requests = store.take_requests();
196
197                    if !requests.is_empty() {
198                        writeln!(writer, "{store}")?;
199                        if instrument_mode == InstrumentedObjectStoreMode::Trace {
200                            for req in requests.iter() {
201                                writeln!(writer, "{req}")?;
202                            }
203                            // Add an extra blank line to help visually organize the output
204                            writeln!(writer)?;
205                        }
206
207                        writeln!(writer, "Summaries:")?;
208                        let summaries = RequestSummaries::new(&requests);
209                        writeln!(writer, "{summaries}")?;
210                    }
211                }
212            }
213        }
214
215        Ok(())
216    }
217}
218
#[cfg(test)]
mod tests {
    use datafusion::error::Result;

    use super::*;

    /// Runs `write_output` against a fresh in-memory buffer and returns the
    /// captured bytes as text.
    fn captured_output(options: &PrintOptions, exec_details: &str) -> Result<String> {
        let mut buffer: Vec<u8> = Vec::new();
        options.write_output(&mut buffer, exec_details.to_string())?;
        let text: String = buffer
            .try_into()
            .expect("Expected successful String conversion");
        Ok(text)
    }

    #[test]
    fn write_output() -> Result<()> {
        let exec_details = "Formatted Exec Output";
        let mut options = PrintOptions {
            format: PrintFormat::Automatic,
            quiet: true,
            maxrows: MaxRows::Unlimited,
            color: true,
            instrumented_registry: Arc::new(InstrumentedObjectStoreRegistry::new()),
        };

        // Quiet mode writes nothing at all.
        assert!(captured_output(&options, exec_details)?.is_empty());

        // With quiet off, the execution details are written.
        options.quiet = false;
        let plain = captured_output(&options, exec_details)?;
        assert!(plain.contains(exec_details));

        // With tracing enabled, the profiling header is written as well.
        options
            .instrumented_registry
            .set_instrument_mode(InstrumentedObjectStoreMode::Trace);
        let traced = captured_output(&options, exec_details)?;
        assert!(traced.contains(exec_details));
        assert!(traced.contains(OBJECT_STORE_PROFILING_HEADER));

        Ok(())
    }
}