datafusion_cli/
print_options.rs1use std::fmt::{Display, Formatter};
19use std::io;
20use std::pin::Pin;
21use std::str::FromStr;
22use std::sync::Arc;
23
24use crate::object_storage::instrumented::{
25 InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry, RequestSummaries,
26};
27use crate::print_format::PrintFormat;
28
29use arrow::datatypes::SchemaRef;
30use arrow::record_batch::RecordBatch;
31use datafusion::common::instant::Instant;
32use datafusion::common::DataFusionError;
33use datafusion::error::Result;
34use datafusion::physical_plan::RecordBatchStream;
35
36use datafusion::config::FormatOptions;
37use futures::StreamExt;
38
/// Upper bound on the number of result rows that are displayed.
///
/// Parsed from text via [`FromStr`]: a natural number yields
/// [`MaxRows::Limited`], while `none`, `inf` or `infinite` (matched
/// case-insensitively) yield [`MaxRows::Unlimited`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MaxRows {
    /// Show every row.
    Unlimited,
    /// Show at most this many rows.
    Limited(usize),
}

impl FromStr for MaxRows {
    type Err = String;

    /// Parses a row limit.
    ///
    /// # Errors
    ///
    /// Returns a human-readable message when `maxrows` is neither one of the
    /// "no limit" keywords nor a value that parses as `usize`.
    fn from_str(maxrows: &str) -> Result<Self, Self::Err> {
        // The keywords are pure ASCII, so an ASCII case-insensitive comparison
        // accepts exactly the inputs a lowercase-then-compare would, without
        // allocating a lowercased copy of the input for every comparison.
        const UNLIMITED_KEYWORDS: [&str; 3] = ["inf", "infinite", "none"];

        if UNLIMITED_KEYWORDS
            .iter()
            .any(|keyword| maxrows.eq_ignore_ascii_case(keyword))
        {
            return Ok(Self::Unlimited);
        }

        maxrows.parse::<usize>().map(Self::Limited).map_err(|_| {
            format!("Invalid maxrows {maxrows}. Valid inputs are natural numbers or 'none', 'inf', or 'infinite' for no limit.")
        })
    }
}

impl Display for MaxRows {
    /// Renders the limit as `unlimited` or `at most N`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Unlimited => f.write_str("unlimited"),
            Self::Limited(max_rows) => write!(f, "at most {max_rows}"),
        }
    }
}
73
74const OBJECT_STORE_PROFILING_HEADER: &str = "Object Store Profiling";
75
76#[derive(Debug, Clone)]
77pub struct PrintOptions {
78 pub format: PrintFormat,
79 pub quiet: bool,
80 pub maxrows: MaxRows,
81 pub color: bool,
82 pub instrumented_registry: Arc<InstrumentedObjectStoreRegistry>,
83}
84
85fn get_execution_details_formatted(
87 row_count: usize,
88 maxrows: MaxRows,
89 query_start_time: Instant,
90) -> String {
91 let nrows_shown_msg = match maxrows {
92 MaxRows::Limited(nrows) if nrows < row_count => {
93 format!("(First {nrows} displayed. Use --maxrows to adjust)")
94 }
95 _ => String::new(),
96 };
97
98 format!(
99 "{} row(s) fetched. {}\nElapsed {:.3} seconds.\n",
100 row_count,
101 nrows_shown_msg,
102 query_start_time.elapsed().as_secs_f64()
103 )
104}
105
106impl PrintOptions {
107 pub fn print_batches(
109 &self,
110 schema: SchemaRef,
111 batches: &[RecordBatch],
112 query_start_time: Instant,
113 row_count: usize,
114 format_options: &FormatOptions,
115 ) -> Result<()> {
116 let stdout = std::io::stdout();
117 let mut writer = stdout.lock();
118
119 self.format.print_batches(
120 &mut writer,
121 schema,
122 batches,
123 self.maxrows,
124 true,
125 format_options,
126 )?;
127
128 let formatted_exec_details = get_execution_details_formatted(
129 row_count,
130 if self.format == PrintFormat::Table {
131 self.maxrows
132 } else {
133 MaxRows::Unlimited
134 },
135 query_start_time,
136 );
137
138 self.write_output(&mut writer, formatted_exec_details)
139 }
140
141 pub async fn print_stream(
143 &self,
144 mut stream: Pin<Box<dyn RecordBatchStream>>,
145 query_start_time: Instant,
146 format_options: &FormatOptions,
147 ) -> Result<()> {
148 if self.format == PrintFormat::Table {
149 return Err(DataFusionError::External(
150 "PrintFormat::Table is not implemented".to_string().into(),
151 ));
152 };
153
154 let stdout = std::io::stdout();
155 let mut writer = stdout.lock();
156
157 let mut row_count = 0_usize;
158 let mut with_header = true;
159
160 while let Some(maybe_batch) = stream.next().await {
161 let batch = maybe_batch?;
162 row_count += batch.num_rows();
163 self.format.print_batches(
164 &mut writer,
165 batch.schema(),
166 &[batch],
167 MaxRows::Unlimited,
168 with_header,
169 format_options,
170 )?;
171 with_header = false;
172 }
173
174 let formatted_exec_details = get_execution_details_formatted(
175 row_count,
176 MaxRows::Unlimited,
177 query_start_time,
178 );
179
180 self.write_output(&mut writer, formatted_exec_details)
181 }
182
183 fn write_output<W: io::Write>(
184 &self,
185 writer: &mut W,
186 formatted_exec_details: String,
187 ) -> Result<()> {
188 if !self.quiet {
189 writeln!(writer, "{formatted_exec_details}")?;
190
191 let instrument_mode = self.instrumented_registry.instrument_mode();
192 if instrument_mode != InstrumentedObjectStoreMode::Disabled {
193 writeln!(writer, "{OBJECT_STORE_PROFILING_HEADER}")?;
194 for store in self.instrumented_registry.stores() {
195 let requests = store.take_requests();
196
197 if !requests.is_empty() {
198 writeln!(writer, "{store}")?;
199 if instrument_mode == InstrumentedObjectStoreMode::Trace {
200 for req in requests.iter() {
201 writeln!(writer, "{req}")?;
202 }
203 writeln!(writer)?;
205 }
206
207 writeln!(writer, "Summaries:")?;
208 let summaries = RequestSummaries::new(&requests);
209 writeln!(writer, "{summaries}")?;
210 }
211 }
212 }
213 }
214
215 Ok(())
216 }
217}
218
#[cfg(test)]
mod tests {
    use datafusion::error::Result;

    use super::*;

    /// Runs `write_output` against a fresh in-memory buffer and returns what
    /// was written as text.
    fn render(options: &PrintOptions, details: &str) -> Result<String> {
        let mut buffer: Vec<u8> = Vec::new();
        options.write_output(&mut buffer, details.to_owned())?;
        Ok(String::from_utf8(buffer).expect("Expected successful String conversion"))
    }

    #[test]
    fn write_output() -> Result<()> {
        let exec_out = "Formatted Exec Output";
        let mut print_options = PrintOptions {
            format: PrintFormat::Automatic,
            quiet: true,
            maxrows: MaxRows::Unlimited,
            color: true,
            instrumented_registry: Arc::new(InstrumentedObjectStoreRegistry::new()),
        };

        // Quiet mode writes nothing.
        assert!(render(&print_options, exec_out)?.is_empty());

        // With quiet off, the execution details are written.
        print_options.quiet = false;
        let plain = render(&print_options, exec_out)?;
        assert!(plain.contains(exec_out));

        // With tracing enabled, the profiling header follows the details.
        print_options
            .instrumented_registry
            .set_instrument_mode(InstrumentedObjectStoreMode::Trace);
        let traced = render(&print_options, exec_out)?;
        assert!(traced.contains(exec_out));
        assert!(traced.contains(OBJECT_STORE_PROFILING_HEADER));

        Ok(())
    }
}