use std::fmt::{Display, Formatter};
use std::io;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use crate::object_storage::instrumented::{
InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry, RequestSummaries,
};
use crate::print_format::PrintFormat;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion::common::instant::Instant;
use datafusion::common::DataFusionError;
use datafusion::error::Result;
use datafusion::physical_plan::RecordBatchStream;
use datafusion::config::FormatOptions;
use futures::StreamExt;
/// Upper bound on how many result rows are displayed (e.g. from a `--maxrows` value).
///
/// Parsed from text via [`FromStr`] and rendered for humans via [`Display`].
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
pub enum MaxRows {
    /// Display every row.
    Unlimited,
    /// Display at most this many rows.
    Limited(usize),
}

impl FromStr for MaxRows {
    type Err = String;

    /// Parses a row limit.
    ///
    /// The keywords `inf`, `infinite` and `none` (compared after lowercasing)
    /// select [`MaxRows::Unlimited`]; anything accepted by `usize::from_str`
    /// becomes [`MaxRows::Limited`].
    ///
    /// # Errors
    ///
    /// Returns a human-readable message when the input is neither a keyword
    /// nor a non-negative integer.
    fn from_str(maxrows: &str) -> Result<Self, Self::Err> {
        // Lowercase once rather than allocating a fresh lowercase copy per keyword.
        let lowered = maxrows.to_lowercase();
        if matches!(lowered.as_str(), "inf" | "infinite" | "none") {
            return Ok(Self::Unlimited);
        }
        maxrows.parse::<usize>().map(Self::Limited).map_err(|_| {
            format!("Invalid maxrows {maxrows}. Valid inputs are natural numbers or \'none\', \'inf\', or \'infinite\' for no limit.")
        })
    }
}

impl Display for MaxRows {
    /// Writes `unlimited` or `at most N`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Unlimited => write!(f, "unlimited"),
            Self::Limited(max_rows) => write!(f, "at most {max_rows}"),
        }
    }
}
/// Heading line written by `PrintOptions::write_output` before object-store
/// profiling output, when instrumentation is not disabled.
const OBJECT_STORE_PROFILING_HEADER: &str = "Object Store Profiling";
/// Settings that control how query results and execution summaries are printed.
#[derive(Debug, Clone)]
pub struct PrintOptions {
    /// Output format used to render record batches.
    pub format: PrintFormat,
    /// When `true`, `write_output` emits nothing: the execution summary and
    /// object-store profiling are suppressed (result rows are still printed).
    pub quiet: bool,
    /// Row limit forwarded to the formatter by `print_batches`;
    /// `print_stream` always prints every row.
    pub maxrows: MaxRows,
    /// Color preference. Not read by the code in this file; presumably
    /// consumed elsewhere — confirm.
    pub color: bool,
    /// Registry of instrumented object stores whose recorded requests
    /// `write_output` reports when instrumentation is not disabled.
    pub instrumented_registry: Arc<InstrumentedObjectStoreRegistry>,
}
/// Builds the summary text printed after a query's results.
///
/// The first line reports `row_count`. If `maxrows` is a limit smaller than
/// `row_count`, the line also notes how many rows were actually displayed.
/// The second line reports the seconds elapsed since `query_start_time`,
/// with three decimals. The returned text ends with a newline.
fn get_execution_details_formatted(
    row_count: usize,
    maxrows: MaxRows,
    query_start_time: Instant,
) -> String {
    // Only a limit that actually cut rows off deserves a note.
    let displayed_limit = match maxrows {
        MaxRows::Limited(limit) if limit < row_count => Some(limit),
        _ => None,
    };
    let truncation_note = displayed_limit
        .map(|limit| format!("(First {limit} displayed. Use --maxrows to adjust)"))
        .unwrap_or_default();
    let elapsed_secs = query_start_time.elapsed().as_secs_f64();
    format!(
        "{row_count} row(s) fetched. {truncation_note}\nElapsed {elapsed_secs:.3} seconds.\n"
    )
}
impl PrintOptions {
    /// Prints already-collected `batches` (with a header) to stdout, then the
    /// execution summary via `write_output`.
    ///
    /// `self.maxrows` is forwarded to the formatter. The summary mentions
    /// truncation only for [`PrintFormat::Table`]; for other formats it is
    /// computed as if unlimited. `row_count` is reported as supplied by the
    /// caller; it is not recomputed from `batches`.
    ///
    /// # Errors
    ///
    /// Propagates formatter errors and I/O errors from writing to stdout.
    pub fn print_batches(
        &self,
        schema: SchemaRef,
        batches: &[RecordBatch],
        query_start_time: Instant,
        row_count: usize,
        format_options: &FormatOptions,
    ) -> Result<()> {
        let stdout = std::io::stdout();
        let mut writer = stdout.lock();
        self.format.print_batches(
            &mut writer,
            schema,
            batches,
            self.maxrows,
            true,
            format_options,
        )?;
        let summary_limit = if self.format == PrintFormat::Table {
            self.maxrows
        } else {
            MaxRows::Unlimited
        };
        let formatted_exec_details =
            get_execution_details_formatted(row_count, summary_limit, query_start_time);
        self.write_output(&mut writer, formatted_exec_details)
    }

    /// Writes batches from `stream` to stdout as they arrive, then writes the
    /// execution summary via `write_output`.
    ///
    /// The header is written only with the first batch. Every row is printed
    /// (`MaxRows::Unlimited`), regardless of `self.maxrows`.
    ///
    /// # Errors
    ///
    /// Returns an error for [`PrintFormat::Table`], which this path does not
    /// implement. Otherwise propagates stream, formatter and stdout I/O errors.
    pub async fn print_stream(
        &self,
        mut stream: Pin<Box<dyn RecordBatchStream>>,
        query_start_time: Instant,
        format_options: &FormatOptions,
    ) -> Result<()> {
        if self.format == PrintFormat::Table {
            return Err(DataFusionError::External(
                "PrintFormat::Table is not implemented".to_string().into(),
            ));
        }
        let mut row_count = 0_usize;
        let mut with_header = true;
        while let Some(maybe_batch) = stream.next().await {
            let batch = maybe_batch?;
            row_count += batch.num_rows();
            // Lock stdout only for this write. Holding a `StdoutLock` across
            // the `.await` above would block every other thread's stdout
            // writes while the next batch is produced, and would make this
            // future `!Send`.
            let mut writer = std::io::stdout().lock();
            self.format.print_batches(
                &mut writer,
                batch.schema(),
                &[batch],
                MaxRows::Unlimited,
                with_header,
                format_options,
            )?;
            with_header = false;
        }
        let formatted_exec_details = get_execution_details_formatted(
            row_count,
            MaxRows::Unlimited,
            query_start_time,
        );
        self.write_output(&mut std::io::stdout().lock(), formatted_exec_details)
    }

    /// Writes the execution summary and, when enabled, object-store profiling
    /// output to `writer`. Writes nothing when `self.quiet` is set.
    ///
    /// When the registry's instrument mode is not `Disabled`, a profiling
    /// header is written. Then, for each store with requests returned by
    /// `take_requests()` (presumably draining them — confirm in the
    /// instrumented module), the following are written:
    /// - the store itself;
    /// - in `Trace` mode, each request followed by a blank line;
    /// - a `Summaries:` line and the aggregated [`RequestSummaries`].
    ///
    /// # Errors
    ///
    /// Propagates I/O errors from `writer`.
    fn write_output<W: io::Write>(
        &self,
        writer: &mut W,
        formatted_exec_details: String,
    ) -> Result<()> {
        if !self.quiet {
            writeln!(writer, "{formatted_exec_details}")?;
            let instrument_mode = self.instrumented_registry.instrument_mode();
            if instrument_mode != InstrumentedObjectStoreMode::Disabled {
                writeln!(writer, "{OBJECT_STORE_PROFILING_HEADER}")?;
                for store in self.instrumented_registry.stores() {
                    let requests = store.take_requests();
                    if !requests.is_empty() {
                        writeln!(writer, "{store}")?;
                        if instrument_mode == InstrumentedObjectStoreMode::Trace {
                            for req in requests.iter() {
                                writeln!(writer, "{req}")?;
                            }
                            writeln!(writer)?;
                        }
                        writeln!(writer, "Summaries:")?;
                        let summaries = RequestSummaries::new(&requests);
                        writeln!(writer, "{summaries}")?;
                    }
                }
            }
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use datafusion::error::Result;

    use super::*;

    /// Decodes the bytes written so far as UTF-8, panicking on invalid data.
    fn as_text(buffer: &[u8]) -> String {
        String::from_utf8(buffer.to_vec()).expect("Expected successful String conversion")
    }

    #[test]
    fn write_output() -> Result<()> {
        let registry = Arc::new(InstrumentedObjectStoreRegistry::new());
        let mut options = PrintOptions {
            format: PrintFormat::Automatic,
            quiet: true,
            maxrows: MaxRows::Unlimited,
            color: true,
            instrumented_registry: Arc::clone(&registry),
        };
        let details = String::from("Formatted Exec Output");
        let mut sink: Vec<u8> = Vec::new();

        // Quiet mode suppresses everything.
        options.write_output(&mut sink, details.clone())?;
        assert!(sink.is_empty());

        // Non-quiet mode echoes the execution details.
        options.quiet = false;
        options.write_output(&mut sink, details.clone())?;
        assert!(as_text(&sink).contains(&details));
        sink.clear();

        // Trace mode additionally prints the profiling header.
        options
            .instrumented_registry
            .set_instrument_mode(InstrumentedObjectStoreMode::Trace);
        options.write_output(&mut sink, details.clone())?;
        let text = as_text(&sink);
        assert!(text.contains(&details));
        assert!(text.contains(OBJECT_STORE_PROFILING_HEADER));
        Ok(())
    }
}