datafusion_cli/
print_options.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::{Display, Formatter};
19use std::io;
20use std::pin::Pin;
21use std::str::FromStr;
22use std::sync::Arc;
23
24use crate::object_storage::instrumented::{
25    InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry, RequestSummaries,
26};
27use crate::print_format::PrintFormat;
28
29use arrow::datatypes::SchemaRef;
30use arrow::record_batch::RecordBatch;
31use datafusion::common::DataFusionError;
32use datafusion::common::instant::Instant;
33use datafusion::error::Result;
34use datafusion::physical_plan::RecordBatchStream;
35
36use datafusion::config::FormatOptions;
37use futures::StreamExt;
38
/// Upper bound on the number of result rows to display.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum MaxRows {
    /// No cap: every row is shown in the output.
    Unlimited,
    /// Show no more than the contained number of rows.
    Limited(usize),
}
46
47impl FromStr for MaxRows {
48    type Err = String;
49
50    fn from_str(maxrows: &str) -> Result<Self, Self::Err> {
51        if maxrows.to_lowercase() == "inf"
52            || maxrows.to_lowercase() == "infinite"
53            || maxrows.to_lowercase() == "none"
54        {
55            Ok(Self::Unlimited)
56        } else {
57            match maxrows.parse::<usize>() {
58                Ok(nrows) => Ok(Self::Limited(nrows)),
59                _ => Err(format!(
60                    "Invalid maxrows {maxrows}. Valid inputs are natural numbers or \'none\', \'inf\', or \'infinite\' for no limit."
61                )),
62            }
63        }
64    }
65}
66
67impl Display for MaxRows {
68    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
69        match self {
70            Self::Unlimited => write!(f, "unlimited"),
71            Self::Limited(max_rows) => write!(f, "at most {max_rows}"),
72        }
73    }
74}
75
/// Heading line that `PrintOptions::write_output` prints before the per-store
/// profiling output whenever object store instrumentation is not disabled.
const OBJECT_STORE_PROFILING_HEADER: &str = "Object Store Profiling";
77
/// Settings that control how query results and execution details are printed.
#[derive(Debug, Clone)]
pub struct PrintOptions {
    /// Output format used to render record batches.
    pub format: PrintFormat,
    /// When `true`, `write_output` emits nothing: neither the execution
    /// details nor any object store profiling output.
    pub quiet: bool,
    /// Row limit handed to the formatter by `print_batches`; it is also
    /// reflected in the execution summary when the format is
    /// `PrintFormat::Table`.
    pub maxrows: MaxRows,
    /// NOTE(review): not read anywhere in this file; presumably toggles
    /// colored output elsewhere in the CLI — confirm against callers.
    pub color: bool,
    /// Registry that `write_output` queries for the current instrumentation
    /// mode and for the requests recorded by each store.
    pub instrumented_registry: Arc<InstrumentedObjectStoreRegistry>,
}
86
87// Returns the query execution details formatted
88fn get_execution_details_formatted(
89    row_count: usize,
90    maxrows: MaxRows,
91    query_start_time: Instant,
92) -> String {
93    let nrows_shown_msg = match maxrows {
94        MaxRows::Limited(nrows) if nrows < row_count => {
95            format!("(First {nrows} displayed. Use --maxrows to adjust)")
96        }
97        _ => String::new(),
98    };
99
100    format!(
101        "{} row(s) fetched. {}\nElapsed {:.3} seconds.\n",
102        row_count,
103        nrows_shown_msg,
104        query_start_time.elapsed().as_secs_f64()
105    )
106}
107
108impl PrintOptions {
109    /// Print the batches to stdout using the specified format
110    pub fn print_batches(
111        &self,
112        schema: SchemaRef,
113        batches: &[RecordBatch],
114        query_start_time: Instant,
115        row_count: usize,
116        format_options: &FormatOptions,
117    ) -> Result<()> {
118        let stdout = std::io::stdout();
119        let mut writer = stdout.lock();
120
121        self.format.print_batches(
122            &mut writer,
123            schema,
124            batches,
125            self.maxrows,
126            true,
127            format_options,
128        )?;
129
130        let formatted_exec_details = get_execution_details_formatted(
131            row_count,
132            if self.format == PrintFormat::Table {
133                self.maxrows
134            } else {
135                MaxRows::Unlimited
136            },
137            query_start_time,
138        );
139
140        self.write_output(&mut writer, formatted_exec_details)
141    }
142
143    /// Print the stream to stdout using the specified format
144    pub async fn print_stream(
145        &self,
146        mut stream: Pin<Box<dyn RecordBatchStream>>,
147        query_start_time: Instant,
148        format_options: &FormatOptions,
149    ) -> Result<()> {
150        if self.format == PrintFormat::Table {
151            return Err(DataFusionError::External(
152                "PrintFormat::Table is not implemented".to_string().into(),
153            ));
154        };
155
156        let stdout = std::io::stdout();
157        let mut writer = stdout.lock();
158
159        let mut row_count = 0_usize;
160        let mut with_header = true;
161
162        while let Some(maybe_batch) = stream.next().await {
163            let batch = maybe_batch?;
164            row_count += batch.num_rows();
165            self.format.print_batches(
166                &mut writer,
167                batch.schema(),
168                &[batch],
169                MaxRows::Unlimited,
170                with_header,
171                format_options,
172            )?;
173            with_header = false;
174        }
175
176        let formatted_exec_details = get_execution_details_formatted(
177            row_count,
178            MaxRows::Unlimited,
179            query_start_time,
180        );
181
182        self.write_output(&mut writer, formatted_exec_details)
183    }
184
185    fn write_output<W: io::Write>(
186        &self,
187        writer: &mut W,
188        formatted_exec_details: String,
189    ) -> Result<()> {
190        if !self.quiet {
191            writeln!(writer, "{formatted_exec_details}")?;
192
193            let instrument_mode = self.instrumented_registry.instrument_mode();
194            if instrument_mode != InstrumentedObjectStoreMode::Disabled {
195                writeln!(writer, "{OBJECT_STORE_PROFILING_HEADER}")?;
196                for store in self.instrumented_registry.stores() {
197                    let requests = store.take_requests();
198
199                    if !requests.is_empty() {
200                        writeln!(writer, "{store}")?;
201                        if instrument_mode == InstrumentedObjectStoreMode::Trace {
202                            for req in requests.iter() {
203                                writeln!(writer, "{req}")?;
204                            }
205                            // Add an extra blank line to help visually organize the output
206                            writeln!(writer)?;
207                        }
208
209                        writeln!(writer, "Summaries:")?;
210                        let summaries = RequestSummaries::new(&requests);
211                        writeln!(writer, "{summaries}")?;
212                    }
213                }
214            }
215        }
216
217        Ok(())
218    }
219}
220
#[cfg(test)]
mod tests {
    use datafusion::error::Result;

    use super::*;

    /// Decodes everything captured by the in-memory writer as UTF-8 text.
    fn captured_text(buffer: &[u8]) -> String {
        String::from_utf8(buffer.to_vec()).expect("Expected successful String conversion")
    }

    /// Exercises `PrintOptions::write_output` in quiet mode, in non-quiet
    /// mode, and with object store instrumentation set to `Trace`.
    #[test]
    fn write_output() -> Result<()> {
        let registry = Arc::new(InstrumentedObjectStoreRegistry::new());
        let mut print_options = PrintOptions {
            format: PrintFormat::Automatic,
            quiet: true,
            maxrows: MaxRows::Unlimited,
            color: true,
            instrumented_registry: Arc::clone(&registry),
        };

        let exec_out = String::from("Formatted Exec Output");
        let mut captured: Vec<u8> = Vec::new();

        // Quiet mode must not write anything.
        print_options.write_output(&mut captured, exec_out.clone())?;
        assert!(captured.is_empty());

        // With quiet off, the execution details must appear in the output.
        print_options.quiet = false;
        print_options.write_output(&mut captured, exec_out.clone())?;
        assert!(captured_text(&captured).contains(&exec_out));

        // Start from an empty buffer so earlier output cannot satisfy the
        // assertions below.
        captured.clear();
        print_options
            .instrumented_registry
            .set_instrument_mode(InstrumentedObjectStoreMode::Trace);
        print_options.write_output(&mut captured, exec_out.clone())?;
        let out_str = captured_text(&captured);
        assert!(out_str.contains(&exec_out));
        assert!(out_str.contains(OBJECT_STORE_PROFILING_HEADER));

        Ok(())
    }
}
266}