use regex::RegexSet;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::str::FromStr;
use tokio::fs::File;
use tokio::io::{AsyncWriteExt, BufWriter};
use crate::metrics::{SwanlingErrorMetric, SwanlingRequestMetric, SwanlingTaskMetric};
use crate::swanling::SwanlingDebug;
use crate::{SwanlingConfiguration, SwanlingDefaults, SwanlingError};
/// Optional join handle of the spawned logger task.
///
/// `setup_loggers` returns `None` here when no logger task is started (manager
/// mode, or no log file configured); otherwise the handle resolves to the
/// `Result` returned by `logger_main`.
pub(crate) type SwanlingLoggerJoinHandle =
Option<tokio::task::JoinHandle<std::result::Result<(), SwanlingError>>>;
/// Optional sending half of the channel feeding the logger task.
///
/// Messages are wrapped in an `Option`: `logger_main` stops its receive loop
/// when it receives `None`.
pub(crate) type SwanlingLoggerTx = Option<flume::Sender<Option<SwanlingLog>>>;
/// A message sent to the logger task.
///
/// Each variant wraps one kind of record; `logger_main` formats the record and
/// writes it to the log file matching the variant.
#[derive(Debug, Deserialize, Serialize)]
pub enum SwanlingLog {
/// A debug record, written to the debug log.
Debug(SwanlingDebug),
/// An error metric, written to the error log.
Error(SwanlingErrorMetric),
/// A request metric, written to the request log.
Request(SwanlingRequestMetric),
/// A task metric, written to the task log.
Task(SwanlingTaskMetric),
}
/// Output format used when writing a log file.
///
/// Can be parsed case-insensitively from `csv`, `json` (or `jsn`) and `raw`
/// through the `FromStr` implementation.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum SwanlingLogFormat {
/// One comma-separated row per message, built by `prepare_csv`.
Csv,
/// The message serialized with `serde_json`.
Json,
/// The message's `Debug` (`{:?}`) representation.
Raw,
}
impl FromStr for SwanlingLogFormat {
    type Err = SwanlingError;

    /// Parses a log format name, ignoring case.
    ///
    /// Accepts `csv`, `json` (or the abbreviation `jsn`) and `raw`.
    ///
    /// # Errors
    ///
    /// Returns [`SwanlingError::InvalidOption`] for any other input.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // One anchored, case-insensitive pattern per variant, in variant order.
        let patterns = RegexSet::new(&[r"(?i)^csv$", r"(?i)^(json|jsn)$", r"(?i)^raw$"])
            .expect("failed to compile log_format RegexSet");

        // The patterns are mutually exclusive, so the lowest matching index
        // identifies the variant.
        match patterns.matches(s).iter().next() {
            Some(0) => Ok(SwanlingLogFormat::Csv),
            Some(1) => Ok(SwanlingLogFormat::Json),
            Some(2) => Ok(SwanlingLogFormat::Raw),
            _ => Err(SwanlingError::InvalidOption {
                option: format!("SwanlingLogFormat::{:?}", s),
                value: s.to_string(),
                detail: "Invalid log_format, expected: csv, json, or raw".to_string(),
            }),
        }
    }
}
/// Returns the CSV header row for the debug log.
fn debug_csv_header() -> String {
    ["tag", "request", "header", "body"].join(",")
}
/// Returns the CSV header row for the error log.
fn error_csv_header() -> String {
    // Column order matches the row produced for an error metric.
    const COLUMNS: [&str; 10] = [
        "elapsed",
        "method",
        "name",
        "url",
        "final_url",
        "redirected",
        "response_time",
        "status_code",
        "user",
        "error",
    ];
    COLUMNS.join(",")
}
/// Returns the CSV header row for the request log.
fn requests_csv_header() -> String {
    // Column order matches the row produced for a request metric.
    const COLUMNS: [&str; 14] = [
        "elapsed",
        "method",
        "name",
        "url",
        "final_url",
        "redirected",
        "response_time",
        "status_code",
        "success",
        "update",
        "user",
        "error",
        "coordinated_omission_elapsed",
        "user_cadence",
    ];
    COLUMNS.join(",")
}
/// Returns the CSV header row for the task log.
fn tasks_csv_header() -> String {
    // Column order matches the row produced for a task metric.
    const COLUMNS: [&str; 7] = [
        "elapsed",
        "taskset_index",
        "task_index",
        "name",
        "run_time",
        "success",
        "user",
    ];
    COLUMNS.join(",")
}
/// Turns a log message of type `T` into the text written to a log file.
///
/// Implemented by `SwanlingConfiguration` once per message type, so the
/// configured log format of that message type selects the rendering.
pub(crate) trait SwanlingLogger<T> {
/// Consumes `message` and returns it rendered as a single log entry
/// (without a trailing newline).
fn format_message(&self, message: T) -> String;
/// Returns `message` rendered as one CSV row (without a trailing newline).
fn prepare_csv(&self, message: &T) -> String;
}
impl SwanlingLogger<SwanlingDebug> for SwanlingConfiguration {
    /// Renders a debug record according to `debug_format`.
    ///
    /// # Panics
    ///
    /// Panics if `debug_format` is `None`; callers are expected to have a
    /// format configured before any debug record is formatted.
    fn format_message(&self, message: SwanlingDebug) -> String {
        match self.debug_format.as_ref() {
            Some(SwanlingLogFormat::Json) => json!(message).to_string(),
            Some(SwanlingLogFormat::Raw) => format!("{:?}", message),
            Some(SwanlingLogFormat::Csv) => self.prepare_csv(&message),
            None => unreachable!(),
        }
    }

    /// Renders a debug record as one CSV row.
    ///
    /// Every column is wrapped in double quotes; all but `tag` use the
    /// `Debug` representation of the field.
    fn prepare_csv(&self, debug: &SwanlingDebug) -> String {
        let tag = &debug.tag;
        let request = &debug.request;
        let header = &debug.header;
        let body = &debug.body;
        format!(
            "\"{}\",\"{:?}\",\"{:?}\",\"{:?}\"",
            tag, request, header, body
        )
    }
}
impl SwanlingLogger<SwanlingErrorMetric> for SwanlingConfiguration {
    /// Renders an error metric according to `error_format`.
    ///
    /// # Panics
    ///
    /// Panics if `error_format` is `None`; callers are expected to have a
    /// format configured before any error metric is formatted.
    fn format_message(&self, message: SwanlingErrorMetric) -> String {
        match self.error_format.as_ref() {
            Some(SwanlingLogFormat::Json) => json!(message).to_string(),
            Some(SwanlingLogFormat::Raw) => format!("{:?}", message),
            Some(SwanlingLogFormat::Csv) => self.prepare_csv(&message),
            None => unreachable!(),
        }
    }

    /// Renders an error metric as one CSV row.
    ///
    /// The `name`, `url`, `final_url` and `error` columns are wrapped in
    /// double quotes; the remaining columns are written bare.
    fn prepare_csv(&self, request: &SwanlingErrorMetric) -> String {
        let metric = request;
        format!(
            "{},{},\"{}\",\"{}\",\"{}\",{},{},{},{},\"{}\"",
            metric.elapsed,
            metric.method,
            metric.name,
            metric.url,
            metric.final_url,
            metric.redirected,
            metric.response_time,
            metric.status_code,
            metric.user,
            metric.error,
        )
    }
}
impl SwanlingLogger<SwanlingRequestMetric> for SwanlingConfiguration {
    /// Renders a request metric according to `request_format`.
    ///
    /// # Panics
    ///
    /// Panics if `request_format` is `None`; callers are expected to have a
    /// format configured before any request metric is formatted.
    fn format_message(&self, message: SwanlingRequestMetric) -> String {
        match self.request_format.as_ref() {
            Some(SwanlingLogFormat::Json) => json!(message).to_string(),
            Some(SwanlingLogFormat::Raw) => format!("{:?}", message),
            Some(SwanlingLogFormat::Csv) => self.prepare_csv(&message),
            None => unreachable!(),
        }
    }

    /// Renders a request metric as one CSV row.
    ///
    /// The free-text columns (`name`, `url`, `final_url` and `error`) are
    /// wrapped in double quotes so that a comma inside them does not shift the
    /// following columns; this matches how the error log quotes the same
    /// fields. The remaining columns are written bare.
    fn prepare_csv(&self, request: &SwanlingRequestMetric) -> String {
        format!(
            "{},{},\"{}\",\"{}\",\"{}\",{},{},{},{},{},{},\"{}\",{},{}",
            request.elapsed,
            request.method,
            request.name,
            request.url,
            request.final_url,
            request.redirected,
            request.response_time,
            request.status_code,
            request.success,
            request.update,
            request.user,
            request.error,
            request.coordinated_omission_elapsed,
            request.user_cadence,
        )
    }
}
impl SwanlingLogger<SwanlingTaskMetric> for SwanlingConfiguration {
    /// Renders a task metric according to `task_format`.
    ///
    /// # Panics
    ///
    /// Panics if `task_format` is `None`; callers are expected to have a
    /// format configured before any task metric is formatted.
    fn format_message(&self, message: SwanlingTaskMetric) -> String {
        match self.task_format.as_ref() {
            Some(SwanlingLogFormat::Json) => json!(message).to_string(),
            Some(SwanlingLogFormat::Raw) => format!("{:?}", message),
            Some(SwanlingLogFormat::Csv) => self.prepare_csv(&message),
            None => unreachable!(),
        }
    }

    /// Renders a task metric as one CSV row.
    ///
    /// Only the `name` column is wrapped in double quotes.
    fn prepare_csv(&self, request: &SwanlingTaskMetric) -> String {
        let metric = request;
        format!(
            "{},{},{},\"{}\",{},{},{}",
            metric.elapsed,
            metric.taskset_index,
            metric.task_index,
            metric.name,
            metric.run_time,
            metric.success,
            metric.user,
        )
    }
}
impl SwanlingConfiguration {
/// Fills in log file paths that were not set explicitly from `defaults`.
///
/// Does nothing in manager mode. For each of the debug, error, request and
/// task logs, an empty configured path is replaced by the default path when
/// one is present; explicitly configured paths are never overwritten.
pub(crate) fn configure_loggers(&mut self, defaults: &SwanlingDefaults) {
    if self.manager {
        return;
    }

    // Pair every configurable path with its optional default.
    let fallbacks = [
        (&mut self.debug_log, &defaults.debug_log),
        (&mut self.error_log, &defaults.error_log),
        (&mut self.request_log, &defaults.request_log),
        (&mut self.task_log, &defaults.task_log),
    ];
    for (configured, fallback) in fallbacks {
        if configured.is_empty() {
            if let Some(default_path) = fallback {
                *configured = default_path.clone();
            }
        }
    }
}
/// Applies logger defaults and, if any log file is configured, spawns the
/// logger task.
///
/// Returns `(None, None)` in manager mode or when no log file path is set.
/// Otherwise returns the join handle of the spawned task together with the
/// sender used to deliver messages to it.
pub(crate) async fn setup_loggers(
    &mut self,
    defaults: &SwanlingDefaults,
) -> Result<(SwanlingLoggerJoinHandle, SwanlingLoggerTx), SwanlingError> {
    if self.manager {
        return Ok((None, None));
    }

    self.configure_loggers(defaults);

    let any_log_configured = !self.debug_log.is_empty()
        || !self.request_log.is_empty()
        || !self.task_log.is_empty()
        || !self.error_log.is_empty();
    if !any_log_configured {
        return Ok((None, None));
    }

    // Unbounded channel shared by all senders; the logger task owns the receiver.
    let (all_threads_logger_tx, logger_rx) = flume::unbounded::<Option<SwanlingLog>>();

    // The task gets its own copy of the configuration, so it does not borrow `self`.
    let configuration = self.clone();
    let logger_handle = tokio::spawn(configuration.logger_main(logger_rx));

    Ok((Some(logger_handle), Some(all_threads_logger_tx)))
}
/// Creates the log file at `log_file_path` and wraps it in a buffered writer.
///
/// `log_file_type` is only used in log output, and `buffer_capacity` is the
/// writer's buffer size in bytes. Returns `None` when the path is empty or
/// the file cannot be created; a creation failure is reported with `error!`.
async fn open_log_file(
    &self,
    log_file_path: &str,
    log_file_type: &str,
    buffer_capacity: usize,
) -> std::option::Option<tokio::io::BufWriter<tokio::fs::File>> {
    // An empty path means this log is disabled.
    if log_file_path.is_empty() {
        return None;
    }

    match File::create(log_file_path).await {
        Err(e) => {
            error!(
                "failed to create {} ({}): {}",
                log_file_type, log_file_path, e
            );
            None
        }
        Ok(file) => {
            info!("writing {} to: {}", log_file_type, log_file_path);
            Some(BufWriter::with_capacity(buffer_capacity, file))
        }
    }
}
/// Appends `formatted_message` plus a trailing newline to `log_file`.
///
/// Writing is best-effort: a failure is reported with `warn!` and the
/// function still returns `Ok(())`, so a broken log file never aborts the
/// logger task.
///
/// The whole line is written with `write_all`; a plain `write` may accept
/// only part of the buffer, which would silently truncate a log entry.
async fn write_to_log_file(
    &self,
    log_file: &mut tokio::io::BufWriter<tokio::fs::File>,
    formatted_message: String,
) -> Result<(), ()> {
    // Reuse the owned message as the line buffer instead of re-formatting it.
    let mut line = formatted_message;
    line.push('\n');

    if let Err(e) = log_file.write_all(line.as_bytes()).await {
        // This helper serves every log file, so the message must not name a
        // specific one.
        warn!("failed to write to log file: {}", e);
    }
    Ok(())
}
pub(crate) async fn logger_main(
self: SwanlingConfiguration,
receiver: flume::Receiver<Option<SwanlingLog>>,
) -> Result<(), SwanlingError> {
let mut debug_log = self
.open_log_file(
&self.debug_log,
"debug file",
if self.no_debug_body {
64 * 1024
} else {
8 * 1024 * 1024
},
)
.await;
if self.debug_format == Some(SwanlingLogFormat::Csv) {
if let Some(log_file) = debug_log.as_mut() {
let _ = self.write_to_log_file(log_file, debug_csv_header()).await;
}
}
let mut error_log = self
.open_log_file(&self.error_log, "error log", 64 * 1024)
.await;
if self.error_format == Some(SwanlingLogFormat::Csv) {
if let Some(log_file) = error_log.as_mut() {
let _ = self.write_to_log_file(log_file, error_csv_header()).await;
}
}
let mut request_log = self
.open_log_file(&self.request_log, "request log", 64 * 1024)
.await;
if self.request_format == Some(SwanlingLogFormat::Csv) {
if let Some(log_file) = request_log.as_mut() {
let _ = self
.write_to_log_file(log_file, requests_csv_header())
.await;
}
}
let mut task_log = self
.open_log_file(&self.task_log, "task log", 64 * 1024)
.await;
if self.task_format == Some(SwanlingLogFormat::Csv) {
if let Some(log_file) = task_log.as_mut() {
let _ = self.write_to_log_file(log_file, tasks_csv_header()).await;
}
}
while let Ok(received_message) = receiver.recv_async().await {
if let Some(message) = received_message {
let formatted_message;
if let Some(log_file) = match message {
SwanlingLog::Debug(debug_message) => {
formatted_message = self.format_message(debug_message).to_string();
debug_log.as_mut()
}
SwanlingLog::Error(error_message) => {
formatted_message = self.format_message(error_message).to_string();
error_log.as_mut()
}
SwanlingLog::Request(request_message) => {
formatted_message = self.format_message(request_message).to_string();
request_log.as_mut()
}
SwanlingLog::Task(task_message) => {
formatted_message = self.format_message(task_message).to_string();
task_log.as_mut()
}
} {
let _ = self.write_to_log_file(log_file, formatted_message).await;
}
} else {
break;
}
}
if let Some(debug_log_file) = debug_log.as_mut() {
info!("flushing debug_log: {}", &self.debug_log);
let _ = debug_log_file.flush().await;
};
if let Some(requests_log_file) = request_log.as_mut() {
info!("flushing request_log: {}", &self.request_log);
let _ = requests_log_file.flush().await;
}
if let Some(tasks_log_file) = task_log.as_mut() {
info!("flushing task_log: {}", &self.task_log);
let _ = tasks_log_file.flush().await;
}
if let Some(error_log_file) = error_log.as_mut() {
info!("flushing error_log: {}", &self.error_log);
let _ = error_log_file.flush().await;
};
Ok(())
}
}