use console::{style, StyledObject, Term};
use eyre::WrapErr;
use indexmap::IndexMap;
use itertools::Itertools;
use std::{
collections::HashMap,
sync::{LazyLock, Mutex},
};
use tokio::sync::broadcast;
use tracing::*;
use crate::{
get_tanu_config, http,
runner::{self, Event, EventBody, Test},
CaptureHttpMode, ModuleName, ProjectName, TestName,
};
/// Identifies one of the built-in reporter flavours.
///
/// The `strum` derives give the enum a string form in `snake_case`
/// (`null`, `list`, `table`) for both parsing and display.
/// NOTE(review): the variants presumably map to `NullReporter`, `ListReporter`
/// and `TableReporter`; the mapping itself lives outside this section — confirm.
#[derive(Debug, Clone, Default, strum::EnumString, strum::Display)]
#[strum(serialize_all = "snake_case")]
pub enum ReporterType {
/// Reporter that produces no output.
Null,
/// Line-per-test reporter; this is the `Default` variant.
#[default]
List,
/// Reporter that renders results as a table.
Table,
}
async fn run<R: Reporter + Send + ?Sized>(reporter: &mut R) -> eyre::Result<()> {
let rx_result = runner::subscribe();
runner::wait_reporter_barrier().await;
let mut rx = rx_result?;
loop {
let res = match rx.recv().await {
Ok(Event {
project,
module,
test,
body: EventBody::Start,
}) => reporter.on_start(project, module, test).await,
Ok(Event {
project,
module,
test,
body: EventBody::Check(check),
}) => reporter.on_check(project, module, test, check).await,
Ok(Event {
project,
module,
test,
body: EventBody::Call(log),
}) => reporter.on_call(project, module, test, log).await,
Ok(Event {
project,
module,
test: test_name,
body: EventBody::Retry(test),
}) => reporter.on_retry(project, module, test_name, test).await,
Ok(Event {
project,
module,
test: test_name,
body: EventBody::End(test),
}) => reporter.on_end(project, module, test_name, test).await,
Ok(Event {
project: _,
module: _,
test: _,
body: EventBody::Summary(summary),
}) => reporter.on_summary(summary).await,
Err(broadcast::error::RecvError::Closed) => {
debug!("runner channel has been closed");
break;
}
Err(broadcast::error::RecvError::Lagged(_)) => {
debug!("runner channel recv error");
continue;
}
};
if let Err(e) = res {
warn!("reporter error: {e:#}");
}
}
Ok(())
}
/// Receiver of test-run events.
///
/// Every hook has a default implementation that does nothing and returns
/// `Ok(())`, so an implementor only overrides the events it cares about.
/// The module-level `run` function calls these hooks, one per received event.
#[async_trait::async_trait]
pub trait Reporter {
/// Consumes runner events until the channel closes.
///
/// The default implementation delegates to the module-level `run` function.
async fn run(&mut self) -> eyre::Result<()> {
run(self).await
}
/// Called for an `EventBody::Start` event of the identified test.
async fn on_start(
&mut self,
_project: String,
_module: String,
_test_name: String,
) -> eyre::Result<()> {
Ok(())
}
/// Called for an `EventBody::Check` event, carrying the boxed check.
async fn on_check(
&mut self,
_project: String,
_module: String,
_test_name: String,
_check: Box<runner::Check>,
) -> eyre::Result<()> {
Ok(())
}
/// Called for an `EventBody::Call` event, carrying the call log.
async fn on_call(
&mut self,
_project: String,
_module: String,
_test_name: String,
_log: runner::CallLog,
) -> eyre::Result<()> {
Ok(())
}
/// Called for an `EventBody::Retry` event, carrying the `Test` payload of that event.
async fn on_retry(
&mut self,
_project: String,
_module: String,
_test_name: String,
_test: Test,
) -> eyre::Result<()> {
Ok(())
}
/// Called for an `EventBody::End` event, carrying the `Test` payload of that event.
async fn on_end(
&mut self,
_project: String,
_module: String,
_test_name: String,
_test: Test,
) -> eyre::Result<()> {
Ok(())
}
/// Called for an `EventBody::Summary` event; the event's test identifiers are not passed on.
async fn on_summary(&mut self, _summary: runner::TestSummary) -> eyre::Result<()> {
Ok(())
}
}
/// Reporter that overrides nothing: every hook is the trait's default no-op,
/// so events are received and discarded without producing output.
pub struct NullReporter;
#[async_trait::async_trait]
impl Reporter for NullReporter {}
// Per-test state kept by `ListReporter` between the start and end events of a test.
// `vec_box` is allowed because the logs are pushed as the boxes they arrive in
// (see `ListReporter::on_call`), rather than being unboxed first.
#[allow(clippy::vec_box)]
#[derive(Default, Debug)]
struct Buffer {
// Display number of the test; `None` until first needed, then assigned via
// `generate_test_number` and reused.
test_number: Option<usize>,
// HTTP calls captured for the test so far.
http_logs: Vec<Box<http::Log>>,
// gRPC calls captured for the test so far (only with the `grpc` feature).
#[cfg(feature = "grpc")]
grpc_logs: Vec<Box<crate::grpc::Log>>,
}
/// Returns the next value of a process-wide, 1-based test counter.
///
/// Each call increments the shared counter and returns the new value, so the
/// first call yields `1`.
///
/// # Panics
///
/// Panics if the counter's mutex has been poisoned.
fn generate_test_number() -> usize {
    static COUNTER: LazyLock<Mutex<usize>> = LazyLock::new(|| Mutex::new(0));

    let mut guard = COUNTER.lock().unwrap();
    let next = *guard + 1;
    *guard = next;
    next
}
/// Reporter that writes one line per test to the terminal, optionally followed
/// by the calls captured for that test.
pub struct ListReporter {
// Terminal that all output is written to.
terminal: Term,
// State of tests that have started but not yet ended, keyed by
// (project, module, test) name.
buffer: IndexMap<(ProjectName, ModuleName, TestName), Buffer>,
// Controls whether captured calls are buffered and when they are printed.
capture_http: CaptureHttpMode,
}
impl ListReporter {
    /// Creates a reporter that writes to stdout, starts with no buffered tests
    /// and uses `capture_http` to decide which captured calls get printed.
    pub fn new(capture_http: CaptureHttpMode) -> ListReporter {
        let terminal = Term::stdout();
        let buffer = IndexMap::new();
        Self {
            terminal,
            buffer,
            capture_http,
        }
    }
}
#[async_trait::async_trait]
impl Reporter for ListReporter {
    /// Registers an empty buffer for the test that is starting.
    ///
    /// An entry already stored under the same key is replaced.
    async fn on_start(
        &mut self,
        project_name: String,
        module_name: String,
        test_name: String,
    ) -> eyre::Result<()> {
        let key = (project_name, module_name, test_name);
        self.buffer.insert(key, Buffer::default());
        Ok(())
    }

    /// Stores a captured call in the test's buffer unless capturing is off.
    ///
    /// # Errors
    ///
    /// Fails when capturing is enabled and no buffer exists for the test.
    async fn on_call(
        &mut self,
        project_name: String,
        module_name: String,
        test_name: String,
        log: runner::CallLog,
    ) -> eyre::Result<()> {
        // With capturing off nothing is ever printed, so nothing is buffered.
        if matches!(self.capture_http, CaptureHttpMode::Off) {
            return Ok(());
        }

        let key = (project_name, module_name, test_name);
        let Some(entry) = self.buffer.get_mut(&key) else {
            let test_name = &key.2;
            return Err(eyre::eyre!(
                "test case \"{test_name}\" not found in the buffer"
            ));
        };
        match log {
            runner::CallLog::Http(http_log) => entry.http_logs.push(http_log),
            #[cfg(feature = "grpc")]
            runner::CallLog::Grpc(grpc_log) => entry.grpc_logs.push(grpc_log),
        }
        Ok(())
    }

    /// Reports a retry: prints the error carried by the event's test result (if
    /// any), followed by the calls captured so far when the capture mode allows it.
    ///
    /// The buffered logs are emptied in every case. The test number is assigned
    /// on first use and kept in the buffer.
    ///
    /// # Errors
    ///
    /// Fails when no buffer exists for the test or when writing to the terminal fails.
    async fn on_retry(
        &mut self,
        project_name: String,
        module_name: String,
        test_name: String,
        test: Test,
    ) -> eyre::Result<()> {
        let should_print = match self.capture_http {
            CaptureHttpMode::Off => false,
            CaptureHttpMode::OnFailure => test.result.is_err(),
            CaptureHttpMode::All => true,
        };

        let key = (project_name, module_name, test_name);
        let Some(entry) = self.buffer.get_mut(&key) else {
            let test_name = &key.2;
            return Err(eyre::eyre!(
                "test case \"{test_name}\" not found in the buffer"
            ));
        };
        let test_number = *entry.test_number.get_or_insert_with(generate_test_number);
        // Move the logs out so the buffer is empty again, whether or not they get printed.
        let http_logs = std::mem::take(&mut entry.http_logs);
        #[cfg(feature = "grpc")]
        let grpc_logs = std::mem::take(&mut entry.grpc_logs);
        let (project_name, module_name, test_name) = key;

        if let Err(e) = test.result {
            let line = format!(
                "{status} {test_number} {project} {path}: {retry_message}\n{error}",
                status = symbol_error(),
                test_number = style(test_number).dim(),
                project = style_project(&project_name),
                path = style_module_path(&module_name, &test_name),
                retry_message = style("retrying...").blue(),
                error = style(format!("{e:#}")).dim(),
            );
            self.terminal.write_line(&line)?;
        }

        if !should_print {
            return Ok(());
        }
        for log in &http_logs {
            write_http_log(&self.terminal, log)?;
        }
        #[cfg(feature = "grpc")]
        for log in &grpc_logs {
            write_grpc_log(&self.terminal, log)?;
        }
        Ok(())
    }

    /// Prints the final result line of a test and, when the capture mode allows
    /// it, the calls still buffered for it. The test's buffer is removed.
    ///
    /// # Errors
    ///
    /// Fails when no buffer exists for the test or when writing to the terminal fails.
    async fn on_end(
        &mut self,
        project_name: String,
        module_name: String,
        test_name: String,
        test: Test,
    ) -> eyre::Result<()> {
        let key = (project_name, module_name, test_name);
        let Some(mut entry) = self.buffer.swap_remove(&key) else {
            let test_name = &key.2;
            return Err(eyre::eyre!(
                "test case \"{test_name}\" not found in the buffer"
            ));
        };
        let (project_name, ..) = key;

        let should_print = match self.capture_http {
            CaptureHttpMode::Off => false,
            CaptureHttpMode::OnFailure => test.result.is_err(),
            CaptureHttpMode::All => true,
        };
        let status = symbol_test_result(&test);
        let Test {
            result,
            info,
            request_time,
            started_at: _,
            ended_at: _,
            worker_id: _,
        } = test;

        let number = *entry.test_number.get_or_insert_with(generate_test_number);
        let test_number = style(number).dim();
        let request_time = style(format!("({request_time:.2?})")).dim();
        let project = style_project(&project_name);
        // The path is built from the test's own metadata, not from the event identifiers.
        let path = style_module_path(&info.module, &info.name);

        let line = match result {
            Ok(_) => format!("{status} {test_number} {project} {path} {request_time}"),
            Err(e) => format!(
                "{status} {test_number} {project} {path} {request_time}:\n{error}",
                error = style(format!("{e:#}")).red()
            ),
        };
        self.terminal.write_line(&line)?;

        if !should_print {
            return Ok(());
        }
        for log in &entry.http_logs {
            write_http_log(&self.terminal, log)?;
        }
        #[cfg(feature = "grpc")]
        for log in &entry.grpc_logs {
            write_grpc_log(&self.terminal, log)?;
        }
        Ok(())
    }

    /// Prints the run totals (passed / failed / total, plus skipped when non-zero)
    /// and the elapsed time, preceded by an empty line.
    ///
    /// # Errors
    ///
    /// Fails when writing to the terminal fails.
    async fn on_summary(&mut self, summary: runner::TestSummary) -> eyre::Result<()> {
        let runner::TestSummary {
            total_tests,
            passed_tests,
            failed_tests,
            skipped_tests,
            total_time,
            test_prep_time,
        } = summary;

        // Failures are only highlighted in red when there is at least one.
        let (failed_count, failed_label) = if failed_tests > 0 {
            (style(failed_tests).red().bold(), style("failed").red())
        } else {
            (style(failed_tests).bold(), style("failed"))
        };

        self.terminal.write_line("")?;

        let mut summary_line = format!(
            "{}: {} {}, {} {}, {} {}",
            style("Tests").bold(),
            style(passed_tests).green().bold(),
            style("passed").green(),
            failed_count,
            failed_label,
            style(total_tests).bold(),
            style("total").dim()
        );
        if skipped_tests > 0 {
            let skipped_part = format!(
                ", {} {}",
                style(skipped_tests).yellow().bold(),
                style("skipped").yellow()
            );
            summary_line.push_str(&skipped_part);
        }
        self.terminal.write_line(&summary_line)?;

        let time_line = format!(
            "{}: {} ({}: {})",
            style("Time").bold(),
            style(format!("{total_time:.2?}")).cyan(),
            style("prep").dim(),
            style(format!("{test_prep_time:.2?}")).dim()
        );
        self.terminal.write_line(&time_line)?;
        Ok(())
    }
}
/// Writes `s` to `term` as a single dimmed line.
///
/// # Errors
///
/// Returns the terminal's write error wrapped with additional context.
fn write(term: &Term, s: impl AsRef<str>) -> eyre::Result<()> {
    let dimmed = style(s.as_ref()).dim().to_string();
    term.write_line(&dimmed)
        .wrap_err("failed to write character on terminal")
}
/// Picks the coloured status symbol for a test: the success mark when the
/// result is `Ok`, the error mark otherwise.
fn symbol_test_result(test: &Test) -> StyledObject<&'static str> {
    if test.result.is_ok() {
        symbol_success()
    } else {
        symbol_error()
    }
}
/// Green check mark used for passing tests.
fn symbol_success() -> StyledObject<&'static str> {
    let check_mark: &'static str = "✓";
    style(check_mark).green()
}
/// Red cross mark used for failing tests and retry notices.
fn symbol_error() -> StyledObject<&'static str> {
    let cross_mark: &'static str = "✘";
    style(cross_mark).red()
}
/// Emoji form of a test outcome: green circle for `Ok`, red circle for `Err`.
fn emoji_symbol_test_result(test: &Test) -> char {
    if test.result.is_ok() {
        '🟢'
    } else {
        '🔴'
    }
}
/// Colours an HTTP method name by verb.
///
/// The verb is matched case-insensitively (via upper-casing), while the
/// returned text keeps the caller's original spelling. Unrecognised methods
/// are returned unstyled.
fn style_http_method(method: &str) -> StyledObject<&str> {
    let normalized = method.to_uppercase();
    let plain = style(method);
    match normalized.as_str() {
        "GET" => plain.green(),
        "POST" => plain.yellow(),
        "PUT" => plain.blue(),
        "DELETE" => plain.red(),
        "PATCH" => plain.magenta(),
        "HEAD" => plain.cyan(),
        "OPTIONS" => plain.white(),
        _ => plain,
    }
}
/// Formats an HTTP status as `"<code> <reason>"` and colours it by class.
///
/// Colours: 1xx cyan, 2xx green, 3xx yellow, 4xx red, 5xx bold red; anything
/// else is left unstyled. When the status has no canonical reason phrase
/// (non-standard codes), only the numeric code is shown, without a trailing space.
fn style_status_code(status: http::StatusCode) -> StyledObject<String> {
    let code = status.as_u16();
    let text = match status.canonical_reason() {
        Some(reason) => format!("{code} {reason}"),
        None => code.to_string(),
    };
    let plain = style(text);
    match code {
        100..=199 => plain.cyan(),
        200..=299 => plain.green(),
        300..=399 => plain.yellow(),
        400..=499 => plain.red(),
        500..=599 => plain.red().bold(),
        _ => plain,
    }
}
/// Renders a gRPC status code by its `Debug` name and colours it by severity:
/// `Ok` green, the first group below yellow, the second red, the third bold red.
#[cfg(feature = "grpc")]
fn style_grpc_status(code: tonic::Code) -> StyledObject<String> {
    use tonic::Code;

    let plain = style(format!("{code:?}"));
    match code {
        Code::Ok => plain.green(),
        Code::Cancelled
        | Code::Unknown
        | Code::DeadlineExceeded
        | Code::ResourceExhausted
        | Code::Aborted
        | Code::Unavailable => plain.yellow(),
        Code::InvalidArgument
        | Code::NotFound
        | Code::AlreadyExists
        | Code::PermissionDenied
        | Code::FailedPrecondition
        | Code::OutOfRange
        | Code::Unauthenticated => plain.red(),
        Code::Unimplemented | Code::Internal | Code::DataLoss => plain.red().bold(),
    }
}
/// Wraps a project name in square brackets and styles it bold magenta.
fn style_project(name: &str) -> StyledObject<String> {
    let label = format!("[{name}]");
    style(label).magenta().bold()
}
/// Builds the `module::test` path shown for a test, with the module in cyan
/// and the test name in bold blue.
fn style_module_path(module: &str, test: &str) -> String {
    let module_part = style(module).cyan();
    let test_part = style(test).blue().bold();
    format!("{}::{}", module_part, test_part)
}
/// Prints one captured HTTP exchange: the request line and headers, then the
/// response status, headers and body.
///
/// Every header entry is printed, so a header that carries several values
/// appears once per value. A header value that cannot be rendered as text is
/// shown as `<binary>` instead of aborting the report.
///
/// # Errors
///
/// Fails when writing to the terminal fails.
fn write_http_log(terminal: &Term, log: &http::Log) -> eyre::Result<()> {
    terminal.write_line(&format!(
        " {} {} {}",
        style("=>").cyan(),
        style_http_method(log.request.method.as_ref()),
        style(&log.request.url.to_string()).underlined()
    ))?;
    terminal.write_line(&format!(
        " {} {}",
        style(">").cyan(),
        style("request:").cyan()
    ))?;
    terminal.write_line(&format!(
        " {} {}",
        style(">").cyan(),
        style("headers:").dim()
    ))?;
    for (name, value) in log.request.headers.iter() {
        // `to_str` rejects values that are not visible ASCII; a debug log must not
        // panic on them, so fall back to a placeholder.
        let text = value.to_str().unwrap_or("<binary>");
        terminal.write_line(&format!(
            " {} {}: {}",
            style(">").cyan(),
            style(name.as_str()).bold(),
            style(text).dim()
        ))?;
    }
    terminal.write_line(&format!(
        " {} {}",
        style("<").yellow(),
        style("response:").yellow()
    ))?;
    terminal.write_line(&format!(
        " {} {} {}",
        style("<").yellow(),
        style("status:").dim(),
        style_status_code(log.response.status)
    ))?;
    terminal.write_line(&format!(
        " {} {}",
        style("<").yellow(),
        style("headers:").dim()
    ))?;
    for (name, value) in log.response.headers.iter() {
        // Same fallback as for request headers: never panic on a non-text value.
        let text = value.to_str().unwrap_or("<binary>");
        terminal.write_line(&format!(
            " {} {}: {}",
            style("<").yellow(),
            style(name.as_str()).bold(),
            style(text).dim()
        ))?;
    }
    terminal.write_line(&format!(
        " {} {} {}",
        style("<").yellow(),
        style("body:").dim(),
        style(&log.response.body).dim()
    ))?;
    Ok(())
}
/// Prints one captured gRPC exchange: method, request metadata and message
/// size, then response status, metadata, message size and status message.
///
/// Message and status-message lines are only printed when non-empty. Binary
/// metadata values are shown as their encoded byte count; ASCII values that
/// cannot be rendered as text are shown as `<binary>`.
///
/// # Errors
///
/// Fails when writing to the terminal fails.
#[cfg(feature = "grpc")]
fn write_grpc_log(terminal: &Term, log: &crate::grpc::Log) -> eyre::Result<()> {
    // Turns one metadata entry into printable (key, value) text.
    fn render_entry(entry: tonic::metadata::KeyAndValueRef<'_>) -> (String, String) {
        match entry {
            tonic::metadata::KeyAndValueRef::Ascii(k, v) => {
                let value = v.to_str().unwrap_or("<binary>");
                (k.as_str().to_string(), value.to_string())
            }
            tonic::metadata::KeyAndValueRef::Binary(k, v) => {
                let size = v.as_encoded_bytes().len();
                (k.as_str().to_string(), format!("<binary: {} bytes>", size))
            }
        }
    }

    let request = &log.request;
    let response = &log.response;

    // Request section.
    terminal.write_line(&format!(
        " {} {} {}",
        style("=>").magenta(),
        style("gRPC").magenta().bold(),
        style(&request.method).underlined()
    ))?;
    terminal.write_line(&format!(
        " {} {}",
        style(">").magenta(),
        style("request:").magenta()
    ))?;
    terminal.write_line(&format!(
        " {} {}",
        style(">").magenta(),
        style("metadata:").dim()
    ))?;
    for entry in request.metadata.iter() {
        let (key, value) = render_entry(entry);
        terminal.write_line(&format!(
            " {} {}: {}",
            style(">").magenta(),
            style(key).bold(),
            style(value).dim()
        ))?;
    }
    if !request.message.is_empty() {
        let size = format!("{} bytes", request.message.len());
        terminal.write_line(&format!(
            " {} {} {}",
            style(">").magenta(),
            style("message:").dim(),
            style(size).dim()
        ))?;
    }

    // Response section.
    terminal.write_line(&format!(
        " {} {} {}",
        style("<").yellow(),
        style("response:").yellow(),
        style_grpc_status(response.status_code)
    ))?;
    terminal.write_line(&format!(
        " {} {}",
        style("<").yellow(),
        style("metadata:").dim()
    ))?;
    for entry in response.metadata.iter() {
        let (key, value) = render_entry(entry);
        terminal.write_line(&format!(
            " {} {}: {}",
            style("<").yellow(),
            style(key).bold(),
            style(value).dim()
        ))?;
    }
    if !response.message.is_empty() {
        let size = format!("{} bytes", response.message.len());
        terminal.write_line(&format!(
            " {} {} {}",
            style("<").yellow(),
            style("message:").dim(),
            style(size).dim()
        ))?;
    }
    if !response.status_message.is_empty() {
        terminal.write_line(&format!(
            " {} {} {}",
            style("<").yellow(),
            style("status_message:").dim(),
            style(&response.status_message).dim()
        ))?;
    }
    Ok(())
}
/// Reporter that collects the outcome of every finished test and renders
/// them as one table after the event stream has ended.
// NOTE(review): `capture_http` is stored but not read by any code in this
// section, which is presumably why `dead_code` is allowed — confirm.
#[allow(clippy::vec_box, dead_code)]
pub struct TableReporter {
// Terminal that all output is written to.
terminal: Term,
// Final `Test` of each finished test, keyed by (project, module, test) name.
buffer: HashMap<(ProjectName, ModuleName, TestName), Test>,
// Capture mode handed to the constructor.
capture_http: CaptureHttpMode,
}
impl TableReporter {
    /// Creates a reporter that writes to stdout and starts with no collected
    /// results; `capture_http` is stored as given.
    pub fn new(capture_http: CaptureHttpMode) -> TableReporter {
        let terminal = Term::stdout();
        let buffer = HashMap::new();
        Self {
            terminal,
            buffer,
            capture_http,
        }
    }
}
#[async_trait::async_trait]
impl Reporter for TableReporter {
async fn run(&mut self) -> eyre::Result<()> {
run(self).await?;
let project_order: Vec<_> = get_tanu_config().projects.iter().map(|p| &p.name).collect();
let mut builder = tabled::builder::Builder::default();
builder.push_record(["Project", "Module", "Test", "Result"]);
self.buffer
.drain()
.sorted_by(|(a, _), (b, _)| {
let project_order_a = project_order
.iter()
.position(|&p| *p == a.0)
.unwrap_or(usize::MAX);
let project_order_b = project_order
.iter()
.position(|&p| *p == b.0)
.unwrap_or(usize::MAX);
project_order_a
.cmp(&project_order_b)
.then(a.1.cmp(&b.1))
.then(a.2.cmp(&b.2))
})
.for_each(|((p, m, t), test)| {
builder.push_record([p, m, t, emoji_symbol_test_result(&test).to_string()])
});
let mut table = builder.build();
table.with(tabled::settings::Style::modern()).with(
tabled::settings::Modify::new(tabled::settings::object::Columns::single(3))
.with(tabled::settings::Alignment::center()),
);
write(&self.terminal, format!("{table}")).wrap_err("failed to write table on terminal")?;
Ok(())
}
async fn on_end(
&mut self,
project_name: String,
module_name: String,
test_name: String,
test: Test,
) -> eyre::Result<()> {
self.buffer
.insert((project_name, module_name, test_name), test);
Ok(())
}
async fn on_summary(&mut self, summary: runner::TestSummary) -> eyre::Result<()> {
let runner::TestSummary {
total_tests,
passed_tests,
failed_tests,
skipped_tests,
total_time,
test_prep_time,
} = summary;
self.terminal.write_line("")?;
let mut summary_line = format!(
"{}: {} {}, {} {}, {} {}",
style("Tests").bold(),
style(passed_tests).green().bold(),
style("passed").green(),
if failed_tests > 0 {
style(failed_tests).red().bold()
} else {
style(failed_tests).bold()
},
if failed_tests > 0 {
style("failed").red()
} else {
style("failed")
},
style(total_tests).bold(),
style("total").dim()
);
if skipped_tests > 0 {
summary_line.push_str(&format!(
", {} {}",
style(skipped_tests).yellow().bold(),
style("skipped").yellow()
));
}
self.terminal.write_line(&summary_line)?;
self.terminal.write_line(&format!(
"{}: {} ({}: {})",
style("Time").bold(),
style(format!("{total_time:.2?}")).cyan(),
style("prep").dim(),
style(format!("{test_prep_time:.2?}")).dim()
))?;
Ok(())
}
}