#[cfg(any(test, feature = "fuzzing"))]
use std::path::Path;
#[cfg(any(test, feature = "fuzzing"))]
use std::time::Duration;
#[cfg(any(test, feature = "fuzzing"))]
use tokio::fs::File;
#[cfg(any(test, feature = "fuzzing"))]
use tokio::io::{AsyncBufReadExt, BufReader};
#[cfg(any(test, feature = "fuzzing"))]
use crate::error::{AppError, AppResult, MetricsError};
#[cfg(any(test, feature = "fuzzing"))]
use super::super::{LatencyHistogram, MetricRecord, MetricsRange, MetricsSummary};
#[cfg(any(test, feature = "fuzzing"))]
use super::LogResult;
#[cfg(any(test, feature = "fuzzing"))]
/// Reads a CSV metrics log and aggregates it into a `LogResult`.
///
/// Each line is expected to have the comma-separated fields
/// `elapsed_ms,latency_ms,status_code,timed_out,transport_error,response_bytes,in_flight_ops`.
/// Lines whose first three fields fail to parse are skipped; the trailing
/// flag/size fields are tolerant and default to `false`/`0`.
///
/// * `expected_status_code` — status counted as success (when neither the
///   timeout nor the transport-error flag is set).
/// * `metrics_range` — optional range of elapsed *seconds* (post-warmup)
///   used to filter which records are collected.
/// * `metrics_max` — maximum number of `MetricRecord`s to collect; `0`
///   disables collection. Overflow sets `metrics_truncated`.
/// * `warmup` — records with `elapsed_ms` below this cutoff are discarded
///   entirely; surviving records have their elapsed time rebased to the
///   end of the warmup window.
///
/// # Errors
/// Returns `AppError::metrics` on I/O failure (open/read) or if histogram
/// construction/recording fails.
pub async fn read_metrics_log(
    log_path: &Path,
    expected_status_code: u16,
    metrics_range: &Option<MetricsRange>,
    metrics_max: usize,
    warmup: Option<Duration>,
) -> AppResult<LogResult> {
    // Next CSV field as a 0/1 flag; missing or unparsable fields are `false`.
    fn flag_field<'a>(parts: &mut impl Iterator<Item = &'a str>) -> bool {
        parts
            .next()
            .and_then(|value| value.parse::<u8>().ok())
            .is_some_and(|value| value != 0)
    }

    // Next CSV field as a u64, defaulting to 0 when missing or unparsable.
    fn u64_field<'a>(parts: &mut impl Iterator<Item = &'a str>) -> u64 {
        parts
            .next()
            .and_then(|value| value.parse::<u64>().ok())
            .unwrap_or(0)
    }

    // Integer average of `sum_ms` over `count` samples, saturating at
    // `u64::MAX`; 0 when there are no samples. Division is safe: `count`
    // is checked before dividing.
    fn average_ms(sum_ms: u128, count: u64) -> u64 {
        if count == 0 {
            0
        } else {
            u64::try_from(sum_ms / u128::from(count)).unwrap_or(u64::MAX)
        }
    }

    // Warmup cutoff in milliseconds; saturates rather than overflowing on
    // absurdly long durations.
    let warmup_ms = warmup
        .map(|duration| u64::try_from(duration.as_millis()).unwrap_or(u64::MAX))
        .unwrap_or(0);
    let file = File::open(log_path).await.map_err(|err| {
        AppError::metrics(MetricsError::Io {
            context: "open metrics log",
            source: err,
        })
    })?;
    let mut reader = BufReader::new(file);
    // Reused line buffer — avoids one allocation per line.
    let mut line = String::new();
    let mut records = Vec::new();
    let mut metrics_truncated = false;
    let collect_records = metrics_max > 0;
    let mut histogram = LatencyHistogram::new()?;
    let mut success_histogram = LatencyHistogram::new()?;
    let mut total_requests: u64 = 0;
    let mut successful_requests: u64 = 0;
    let mut timeout_requests: u64 = 0;
    // Sums kept in u128 so even u64::MAX latencies cannot overflow the sum.
    let mut latency_sum_ms: u128 = 0;
    let mut success_latency_sum_ms: u128 = 0;
    let mut min_latency_ms: u64 = u64::MAX;
    let mut max_latency_ms: u64 = 0;
    let mut success_min_latency_ms: u64 = u64::MAX;
    let mut success_max_latency_ms: u64 = 0;
    let mut transport_errors: u64 = 0;
    let mut non_expected_status: u64 = 0;
    let mut max_elapsed_ms: u64 = 0;
    loop {
        line.clear();
        let bytes = reader.read_line(&mut line).await.map_err(|err| {
            AppError::metrics(MetricsError::Io {
                context: "read metrics log",
                source: err,
            })
        })?;
        if bytes == 0 {
            // EOF.
            break;
        }
        let trimmed = line.trim_end();
        let mut parts = trimmed.split(',');
        // The first three fields are mandatory; a malformed line is skipped
        // rather than failing the whole read.
        let elapsed_ms_raw = match parts.next().and_then(|value| value.parse::<u64>().ok()) {
            Some(value) => value,
            None => continue,
        };
        if elapsed_ms_raw < warmup_ms {
            // Inside the warmup window — excluded from all statistics.
            continue;
        }
        // Rebase elapsed time to the end of the warmup window.
        let elapsed_ms = elapsed_ms_raw.saturating_sub(warmup_ms);
        let latency_ms = match parts.next().and_then(|value| value.parse::<u64>().ok()) {
            Some(value) => value,
            None => continue,
        };
        let status_code = match parts.next().and_then(|value| value.parse::<u16>().ok()) {
            Some(value) => value,
            None => continue,
        };
        let timed_out = flag_field(&mut parts);
        let transport_error = flag_field(&mut parts);
        let response_bytes = u64_field(&mut parts);
        let in_flight_ops = u64_field(&mut parts);
        total_requests = total_requests.saturating_add(1);
        // Success requires the expected status AND no timeout/transport error.
        if status_code == expected_status_code && !timed_out && !transport_error {
            successful_requests = successful_requests.saturating_add(1);
            success_latency_sum_ms = success_latency_sum_ms.saturating_add(u128::from(latency_ms));
            if latency_ms < success_min_latency_ms {
                success_min_latency_ms = latency_ms;
            }
            if latency_ms > success_max_latency_ms {
                success_max_latency_ms = latency_ms;
            }
            success_histogram.record(latency_ms)?;
        }
        // Failure classification: timeout takes precedence over transport
        // error, which takes precedence over an unexpected status.
        if timed_out {
            timeout_requests = timeout_requests.saturating_add(1);
        } else if transport_error {
            transport_errors = transport_errors.saturating_add(1);
        } else if status_code != expected_status_code {
            non_expected_status = non_expected_status.saturating_add(1);
        }
        latency_sum_ms = latency_sum_ms.saturating_add(u128::from(latency_ms));
        if latency_ms < min_latency_ms {
            min_latency_ms = latency_ms;
        }
        if latency_ms > max_latency_ms {
            max_latency_ms = latency_ms;
        }
        if elapsed_ms > max_elapsed_ms {
            max_elapsed_ms = elapsed_ms;
        }
        histogram.record(latency_ms)?;
        if collect_records {
            let seconds_elapsed = elapsed_ms / 1000;
            // No range configured means every record is eligible.
            let in_range = match metrics_range {
                Some(MetricsRange(range)) => range.contains(&seconds_elapsed),
                None => true,
            };
            if in_range {
                if records.len() < metrics_max {
                    records.push(MetricRecord {
                        elapsed_ms,
                        latency_ms,
                        status_code,
                        timed_out,
                        transport_error,
                        response_bytes,
                        in_flight_ops,
                    });
                } else {
                    // Cap reached: note the truncation but keep scanning so
                    // the summary still covers the whole log.
                    metrics_truncated = true;
                }
            }
        }
    }
    // Duration is derived from the last (largest) post-warmup elapsed time.
    let duration = Duration::from_millis(max_elapsed_ms);
    let avg_latency_ms = average_ms(latency_sum_ms, total_requests);
    // Minima were seeded with u64::MAX; report 0 when no sample was seen.
    let min_latency_ms = if total_requests > 0 {
        min_latency_ms
    } else {
        0
    };
    let success_avg_latency_ms = average_ms(success_latency_sum_ms, successful_requests);
    let success_min_latency_ms = if successful_requests > 0 {
        success_min_latency_ms
    } else {
        0
    };
    let success_max_latency_ms = if successful_requests > 0 {
        success_max_latency_ms
    } else {
        0
    };
    let error_requests = total_requests.saturating_sub(successful_requests);
    Ok(LogResult {
        records,
        summary: MetricsSummary {
            duration,
            total_requests,
            successful_requests,
            error_requests,
            timeout_requests,
            transport_errors,
            non_expected_status,
            min_latency_ms,
            max_latency_ms,
            avg_latency_ms,
            success_min_latency_ms,
            success_max_latency_ms,
            success_avg_latency_ms,
        },
        metrics_truncated,
        latency_sum_ms,
        success_latency_sum_ms,
        histogram,
        success_histogram,
    })
}