datafusion_cli/
print_options.rs1use std::fmt::{Display, Formatter};
19use std::io;
20use std::pin::Pin;
21use std::str::FromStr;
22use std::sync::Arc;
23
24use crate::object_storage::instrumented::{
25 InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry, RequestSummaries,
26};
27use crate::print_format::PrintFormat;
28
29use arrow::datatypes::SchemaRef;
30use arrow::record_batch::RecordBatch;
31use datafusion::common::DataFusionError;
32use datafusion::common::instant::Instant;
33use datafusion::error::Result;
34use datafusion::physical_plan::RecordBatchStream;
35
36use datafusion::config::FormatOptions;
37use futures::StreamExt;
38
/// Upper bound on the number of result rows that are displayed.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum MaxRows {
    /// No upper bound is applied.
    Unlimited,
    /// At most the wrapped number of rows is displayed.
    Limited(usize),
}
46
47impl FromStr for MaxRows {
48 type Err = String;
49
50 fn from_str(maxrows: &str) -> Result<Self, Self::Err> {
51 if maxrows.to_lowercase() == "inf"
52 || maxrows.to_lowercase() == "infinite"
53 || maxrows.to_lowercase() == "none"
54 {
55 Ok(Self::Unlimited)
56 } else {
57 match maxrows.parse::<usize>() {
58 Ok(nrows) => Ok(Self::Limited(nrows)),
59 _ => Err(format!(
60 "Invalid maxrows {maxrows}. Valid inputs are natural numbers or \'none\', \'inf\', or \'infinite\' for no limit."
61 )),
62 }
63 }
64 }
65}
66
67impl Display for MaxRows {
68 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
69 match self {
70 Self::Unlimited => write!(f, "unlimited"),
71 Self::Limited(max_rows) => write!(f, "at most {max_rows}"),
72 }
73 }
74}
75
/// Heading line written before the per-store request output when object store
/// instrumentation is not disabled.
const OBJECT_STORE_PROFILING_HEADER: &str = "Object Store Profiling";
77
78#[derive(Debug, Clone)]
79pub struct PrintOptions {
80 pub format: PrintFormat,
81 pub quiet: bool,
82 pub maxrows: MaxRows,
83 pub color: bool,
84 pub instrumented_registry: Arc<InstrumentedObjectStoreRegistry>,
85}
86
87fn get_execution_details_formatted(
89 row_count: usize,
90 maxrows: MaxRows,
91 query_start_time: Instant,
92) -> String {
93 let nrows_shown_msg = match maxrows {
94 MaxRows::Limited(nrows) if nrows < row_count => {
95 format!("(First {nrows} displayed. Use --maxrows to adjust)")
96 }
97 _ => String::new(),
98 };
99
100 format!(
101 "{} row(s) fetched. {}\nElapsed {:.3} seconds.\n",
102 row_count,
103 nrows_shown_msg,
104 query_start_time.elapsed().as_secs_f64()
105 )
106}
107
108impl PrintOptions {
109 pub fn print_batches(
111 &self,
112 schema: SchemaRef,
113 batches: &[RecordBatch],
114 query_start_time: Instant,
115 row_count: usize,
116 format_options: &FormatOptions,
117 ) -> Result<()> {
118 let stdout = std::io::stdout();
119 let mut writer = stdout.lock();
120
121 self.format.print_batches(
122 &mut writer,
123 schema,
124 batches,
125 self.maxrows,
126 true,
127 format_options,
128 )?;
129
130 let formatted_exec_details = get_execution_details_formatted(
131 row_count,
132 if self.format == PrintFormat::Table {
133 self.maxrows
134 } else {
135 MaxRows::Unlimited
136 },
137 query_start_time,
138 );
139
140 self.write_output(&mut writer, formatted_exec_details)
141 }
142
143 pub async fn print_stream(
145 &self,
146 mut stream: Pin<Box<dyn RecordBatchStream>>,
147 query_start_time: Instant,
148 format_options: &FormatOptions,
149 ) -> Result<()> {
150 if self.format == PrintFormat::Table {
151 return Err(DataFusionError::External(
152 "PrintFormat::Table is not implemented".to_string().into(),
153 ));
154 };
155
156 let stdout = std::io::stdout();
157 let mut writer = stdout.lock();
158
159 let mut row_count = 0_usize;
160 let mut with_header = true;
161
162 while let Some(maybe_batch) = stream.next().await {
163 let batch = maybe_batch?;
164 row_count += batch.num_rows();
165 self.format.print_batches(
166 &mut writer,
167 batch.schema(),
168 &[batch],
169 MaxRows::Unlimited,
170 with_header,
171 format_options,
172 )?;
173 with_header = false;
174 }
175
176 let formatted_exec_details = get_execution_details_formatted(
177 row_count,
178 MaxRows::Unlimited,
179 query_start_time,
180 );
181
182 self.write_output(&mut writer, formatted_exec_details)
183 }
184
185 fn write_output<W: io::Write>(
186 &self,
187 writer: &mut W,
188 formatted_exec_details: String,
189 ) -> Result<()> {
190 if !self.quiet {
191 writeln!(writer, "{formatted_exec_details}")?;
192
193 let instrument_mode = self.instrumented_registry.instrument_mode();
194 if instrument_mode != InstrumentedObjectStoreMode::Disabled {
195 writeln!(writer, "{OBJECT_STORE_PROFILING_HEADER}")?;
196 for store in self.instrumented_registry.stores() {
197 let requests = store.take_requests();
198
199 if !requests.is_empty() {
200 writeln!(writer, "{store}")?;
201 if instrument_mode == InstrumentedObjectStoreMode::Trace {
202 for req in requests.iter() {
203 writeln!(writer, "{req}")?;
204 }
205 writeln!(writer)?;
207 }
208
209 writeln!(writer, "Summaries:")?;
210 let summaries = RequestSummaries::new(&requests);
211 writeln!(writer, "{summaries}")?;
212 }
213 }
214 }
215 }
216
217 Ok(())
218 }
219}
220
#[cfg(test)]
mod tests {
    use datafusion::error::Result;

    use super::*;

    /// Runs `write_output` against a fresh in-memory buffer and returns the
    /// bytes it produced as text.
    fn render(options: &PrintOptions, details: &str) -> Result<String> {
        let mut sink: Vec<u8> = Vec::new();
        options.write_output(&mut sink, details.to_owned())?;
        Ok(String::from_utf8(sink).expect("Expected successful String conversion"))
    }

    #[test]
    fn write_output() -> Result<()> {
        let details = "Formatted Exec Output";
        let mut print_options = PrintOptions {
            format: PrintFormat::Automatic,
            quiet: true,
            maxrows: MaxRows::Unlimited,
            color: true,
            instrumented_registry: Arc::new(InstrumentedObjectStoreRegistry::new()),
        };

        // Quiet mode writes nothing at all.
        let quiet_out = render(&print_options, details)?;
        assert!(quiet_out.is_empty());

        // With quiet off, the execution details are written.
        print_options.quiet = false;
        let plain_out = render(&print_options, details)?;
        assert!(plain_out.contains(details));

        // With tracing enabled, the profiling header is written as well.
        print_options
            .instrumented_registry
            .set_instrument_mode(InstrumentedObjectStoreMode::Trace);
        let traced_out = render(&print_options, details)?;
        assert!(traced_out.contains(details));
        assert!(traced_out.contains(OBJECT_STORE_PROFILING_HEADER));

        Ok(())
    }
}