datafusion_cli/
print_options.rs1use std::fmt::{Display, Formatter};
19use std::io::Write;
20use std::pin::Pin;
21use std::str::FromStr;
22
23use crate::print_format::PrintFormat;
24
25use arrow::datatypes::SchemaRef;
26use arrow::record_batch::RecordBatch;
27use datafusion::common::instant::Instant;
28use datafusion::common::DataFusionError;
29use datafusion::error::Result;
30use datafusion::physical_plan::RecordBatchStream;
31
32use datafusion::config::FormatOptions;
33use futures::StreamExt;
34
/// Limit on how many result rows the CLI displays.
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
pub enum MaxRows {
    /// Show all rows.
    Unlimited,
    /// Show at most this many rows.
    Limited(usize),
}

impl FromStr for MaxRows {
    type Err = String;

    /// Parses a row limit.
    ///
    /// Accepts a non-negative integer, or one of `inf`, `infinite` or `none`
    /// (compared case-insensitively) meaning "no limit".
    ///
    /// # Errors
    /// Returns a human-readable message when `maxrows` is neither one of the
    /// keywords nor parseable as a `usize`.
    fn from_str(maxrows: &str) -> Result<Self, Self::Err> {
        // Lowercase once and match, instead of allocating a lowercase copy per keyword.
        match maxrows.to_lowercase().as_str() {
            "inf" | "infinite" | "none" => Ok(Self::Unlimited),
            _ => maxrows.parse::<usize>().map(Self::Limited).map_err(|_| {
                format!(
                    "Invalid maxrows {maxrows}. Valid inputs are natural numbers or 'none', 'inf', or 'infinite' for no limit."
                )
            }),
        }
    }
}

impl Display for MaxRows {
    /// Formats as `unlimited` or `at most N`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Unlimited => write!(f, "unlimited"),
            Self::Limited(max_rows) => write!(f, "at most {max_rows}"),
        }
    }
}
69
70#[derive(Debug, Clone)]
71pub struct PrintOptions {
72 pub format: PrintFormat,
73 pub quiet: bool,
74 pub maxrows: MaxRows,
75 pub color: bool,
76}
77
78fn get_execution_details_formatted(
80 row_count: usize,
81 maxrows: MaxRows,
82 query_start_time: Instant,
83) -> String {
84 let nrows_shown_msg = match maxrows {
85 MaxRows::Limited(nrows) if nrows < row_count => {
86 format!("(First {nrows} displayed. Use --maxrows to adjust)")
87 }
88 _ => String::new(),
89 };
90
91 format!(
92 "{} row(s) fetched. {}\nElapsed {:.3} seconds.\n",
93 row_count,
94 nrows_shown_msg,
95 query_start_time.elapsed().as_secs_f64()
96 )
97}
98
99impl PrintOptions {
100 pub fn print_batches(
102 &self,
103 schema: SchemaRef,
104 batches: &[RecordBatch],
105 query_start_time: Instant,
106 row_count: usize,
107 format_options: &FormatOptions,
108 ) -> Result<()> {
109 let stdout = std::io::stdout();
110 let mut writer = stdout.lock();
111
112 self.format.print_batches(
113 &mut writer,
114 schema,
115 batches,
116 self.maxrows,
117 true,
118 format_options,
119 )?;
120
121 let formatted_exec_details = get_execution_details_formatted(
122 row_count,
123 if self.format == PrintFormat::Table {
124 self.maxrows
125 } else {
126 MaxRows::Unlimited
127 },
128 query_start_time,
129 );
130
131 if !self.quiet {
132 writeln!(writer, "{formatted_exec_details}")?;
133 }
134
135 Ok(())
136 }
137
138 pub async fn print_stream(
140 &self,
141 mut stream: Pin<Box<dyn RecordBatchStream>>,
142 query_start_time: Instant,
143 format_options: &FormatOptions,
144 ) -> Result<()> {
145 if self.format == PrintFormat::Table {
146 return Err(DataFusionError::External(
147 "PrintFormat::Table is not implemented".to_string().into(),
148 ));
149 };
150
151 let stdout = std::io::stdout();
152 let mut writer = stdout.lock();
153
154 let mut row_count = 0_usize;
155 let mut with_header = true;
156
157 while let Some(maybe_batch) = stream.next().await {
158 let batch = maybe_batch?;
159 row_count += batch.num_rows();
160 self.format.print_batches(
161 &mut writer,
162 batch.schema(),
163 &[batch],
164 MaxRows::Unlimited,
165 with_header,
166 format_options,
167 )?;
168 with_header = false;
169 }
170
171 let formatted_exec_details = get_execution_details_formatted(
172 row_count,
173 MaxRows::Unlimited,
174 query_start_time,
175 );
176
177 if !self.quiet {
178 writeln!(writer, "{formatted_exec_details}")?;
179 }
180
181 Ok(())
182 }
183}