use std::fmt::{Display, Formatter};
use std::io::Write;
use std::pin::Pin;
use std::str::FromStr;
use std::time::Instant;
use crate::print_format::PrintFormat;
use arrow::record_batch::RecordBatch;
use datafusion::common::DataFusionError;
use datafusion::error::Result;
use datafusion::physical_plan::RecordBatchStream;
use futures::StreamExt;
/// Upper bound on the number of result rows the CLI displays.
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
pub enum MaxRows {
    /// No limit: every row is displayed.
    Unlimited,
    /// Display at most this many rows.
    Limited(usize),
}

impl FromStr for MaxRows {
    type Err = String;

    /// Parse a row limit from user input.
    ///
    /// `inf`, `infinite` and `none` (compared case-insensitively) yield
    /// [`MaxRows::Unlimited`]. Anything `usize` can parse yields
    /// [`MaxRows::Limited`].
    ///
    /// # Errors
    ///
    /// Returns a human-readable message for any other input.
    fn from_str(maxrows: &str) -> Result<Self, Self::Err> {
        // Lowercase once instead of re-allocating for every keyword comparison.
        match maxrows.to_lowercase().as_str() {
            "inf" | "infinite" | "none" => Ok(Self::Unlimited),
            _ => maxrows
                .parse::<usize>()
                .map(Self::Limited)
                .map_err(|_| format!("Invalid maxrows {}. Valid inputs are natural numbers or \'none\', \'inf\', or \'infinite\' for no limit.", maxrows)),
        }
    }
}

impl Display for MaxRows {
    /// Render as `unlimited` or `at most N`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Unlimited => write!(f, "unlimited"),
            Self::Limited(max_rows) => write!(f, "at most {max_rows}"),
        }
    }
}
/// Options controlling how query results are printed to stdout.
#[derive(Debug, Clone)]
pub struct PrintOptions {
    /// Output format used to render record batches.
    pub format: PrintFormat,
    /// When `true`, the row-count / elapsed-time summary line is not printed.
    pub quiet: bool,
    /// Row limit handed to the formatter by `print_batches`; also used for
    /// the "(N shown)" note when the format is `PrintFormat::Table`.
    pub maxrows: MaxRows,
}
/// Build the summary line printed after a result set.
///
/// The line reports `row_count`, adds a " (N shown)" note when `maxrows` is a
/// limit smaller than `row_count`, and gives the seconds elapsed since
/// `query_start_time` with millisecond precision. The returned string ends
/// with a newline.
fn get_timing_info_str(
    row_count: usize,
    maxrows: MaxRows,
    query_start_time: Instant,
) -> String {
    let noun = match row_count {
        1 => "row",
        _ => "rows",
    };
    let shown_note = match maxrows {
        MaxRows::Limited(limit) if limit < row_count => format!(" ({limit} shown)"),
        MaxRows::Limited(_) | MaxRows::Unlimited => String::new(),
    };
    format!(
        "{row_count} {noun} in set{shown_note}. Query took {:.3} seconds.\n",
        query_start_time.elapsed().as_secs_f64()
    )
}
impl PrintOptions {
pub fn print_batches(
&self,
batches: &[RecordBatch],
query_start_time: Instant,
) -> Result<()> {
let stdout = std::io::stdout();
let mut writer = stdout.lock();
self.format
.print_batches(&mut writer, batches, self.maxrows, true)?;
let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
let timing_info = get_timing_info_str(
row_count,
if self.format == PrintFormat::Table {
self.maxrows
} else {
MaxRows::Unlimited
},
query_start_time,
);
if !self.quiet {
writeln!(writer, "{timing_info}")?;
}
Ok(())
}
pub async fn print_stream(
&self,
mut stream: Pin<Box<dyn RecordBatchStream>>,
query_start_time: Instant,
) -> Result<()> {
if self.format == PrintFormat::Table {
return Err(DataFusionError::External(
"PrintFormat::Table is not implemented".to_string().into(),
));
};
let stdout = std::io::stdout();
let mut writer = stdout.lock();
let mut row_count = 0_usize;
let mut with_header = true;
while let Some(maybe_batch) = stream.next().await {
let batch = maybe_batch?;
row_count += batch.num_rows();
self.format.print_batches(
&mut writer,
&[batch],
MaxRows::Unlimited,
with_header,
)?;
with_header = false;
}
let timing_info =
get_timing_info_str(row_count, MaxRows::Unlimited, query_start_time);
if !self.quiet {
writeln!(writer, "{timing_info}")?;
}
Ok(())
}
}