datafusion_cli/
print_options.rs1use std::fmt::{Display, Formatter};
19use std::io::Write;
20use std::pin::Pin;
21use std::str::FromStr;
22
23use crate::print_format::PrintFormat;
24
25use arrow::datatypes::SchemaRef;
26use arrow::record_batch::RecordBatch;
27use datafusion::common::instant::Instant;
28use datafusion::common::DataFusionError;
29use datafusion::error::Result;
30use datafusion::physical_plan::RecordBatchStream;
31
32use futures::StreamExt;
33
/// Upper bound on the number of result rows displayed to the user.
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
pub enum MaxRows {
    /// Display every row.
    Unlimited,
    /// Display at most this many rows.
    Limited(usize),
}

impl FromStr for MaxRows {
    type Err = String;

    /// Parses a row limit from its textual form.
    ///
    /// `inf`, `infinite` and `none` (in any letter case) produce
    /// [`MaxRows::Unlimited`]; any text that parses as a `usize` produces
    /// [`MaxRows::Limited`].
    ///
    /// # Errors
    ///
    /// Returns a message listing the accepted inputs when `maxrows` is
    /// neither one of the keywords nor a valid `usize`.
    // The return type is spelled out in full so this impl does not depend on
    // whichever `Result` alias happens to be imported at file level.
    fn from_str(maxrows: &str) -> std::result::Result<Self, Self::Err> {
        // All keywords are pure ASCII, so an in-place case-insensitive
        // comparison accepts exactly the same inputs as lowercasing first,
        // without allocating a temporary `String` per comparison.
        const UNLIMITED_KEYWORDS: [&str; 3] = ["inf", "infinite", "none"];

        if UNLIMITED_KEYWORDS
            .iter()
            .any(|keyword| maxrows.eq_ignore_ascii_case(keyword))
        {
            return Ok(Self::Unlimited);
        }

        maxrows.parse::<usize>().map(Self::Limited).map_err(|_| {
            format!(
                "Invalid maxrows {maxrows}. Valid inputs are natural numbers or 'none', 'inf', or 'infinite' for no limit."
            )
        })
    }
}

impl Display for MaxRows {
    /// Writes `unlimited`, or `at most N` for a finite limit of `N` rows.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Unlimited => write!(f, "unlimited"),
            Self::Limited(max_rows) => write!(f, "at most {max_rows}"),
        }
    }
}
68
69#[derive(Debug, Clone)]
70pub struct PrintOptions {
71 pub format: PrintFormat,
72 pub quiet: bool,
73 pub maxrows: MaxRows,
74 pub color: bool,
75}
76
77fn get_execution_details_formatted(
79 row_count: usize,
80 maxrows: MaxRows,
81 query_start_time: Instant,
82) -> String {
83 let nrows_shown_msg = match maxrows {
84 MaxRows::Limited(nrows) if nrows < row_count => {
85 format!("(First {nrows} displayed. Use --maxrows to adjust)")
86 }
87 _ => String::new(),
88 };
89
90 format!(
91 "{} row(s) fetched. {}\nElapsed {:.3} seconds.\n",
92 row_count,
93 nrows_shown_msg,
94 query_start_time.elapsed().as_secs_f64()
95 )
96}
97
98impl PrintOptions {
99 pub fn print_batches(
101 &self,
102 schema: SchemaRef,
103 batches: &[RecordBatch],
104 query_start_time: Instant,
105 row_count: usize,
106 ) -> Result<()> {
107 let stdout = std::io::stdout();
108 let mut writer = stdout.lock();
109
110 self.format
111 .print_batches(&mut writer, schema, batches, self.maxrows, true)?;
112
113 let formatted_exec_details = get_execution_details_formatted(
114 row_count,
115 if self.format == PrintFormat::Table {
116 self.maxrows
117 } else {
118 MaxRows::Unlimited
119 },
120 query_start_time,
121 );
122
123 if !self.quiet {
124 writeln!(writer, "{formatted_exec_details}")?;
125 }
126
127 Ok(())
128 }
129
130 pub async fn print_stream(
132 &self,
133 mut stream: Pin<Box<dyn RecordBatchStream>>,
134 query_start_time: Instant,
135 ) -> Result<()> {
136 if self.format == PrintFormat::Table {
137 return Err(DataFusionError::External(
138 "PrintFormat::Table is not implemented".to_string().into(),
139 ));
140 };
141
142 let stdout = std::io::stdout();
143 let mut writer = stdout.lock();
144
145 let mut row_count = 0_usize;
146 let mut with_header = true;
147
148 while let Some(maybe_batch) = stream.next().await {
149 let batch = maybe_batch?;
150 row_count += batch.num_rows();
151 self.format.print_batches(
152 &mut writer,
153 batch.schema(),
154 &[batch],
155 MaxRows::Unlimited,
156 with_header,
157 )?;
158 with_header = false;
159 }
160
161 let formatted_exec_details = get_execution_details_formatted(
162 row_count,
163 MaxRows::Unlimited,
164 query_start_time,
165 );
166
167 if !self.quiet {
168 writeln!(writer, "{formatted_exec_details}")?;
169 }
170
171 Ok(())
172 }
173}