use crate::model::{Error, QueriesSortBy, QueryLog, QueryLogExtended, QueryLogTotal};
use std::collections::HashMap;
use tokio::sync::mpsc::Receiver;
/// Accumulator that folds streamed log records into aggregate views.
///
/// One `Analyzer` is created per public entry point below; each entry point
/// only populates the field it needs.
struct Analyzer {
// Running sums of the per-query impact counters (see `merge_query_total`).
total_queries: QueryLogTotal,
// Per-query aggregates, keyed by `QueryLog::normalized_query_hash`
// so repeated occurrences of the same normalized query are merged.
queries: HashMap<u64, QueryLog>,
// Single merged extended record; `None` until the first record arrives.
query_extended: Option<QueryLogExtended>,
// Per-error aggregates, keyed by `Error::code`.
errors: HashMap<i32, Error>,
}
/// Drains `receiver`, merges logs that share a normalized query hash, and
/// returns at most `limit` entries ordered by the impact metric selected
/// through `sort_by` (highest impact first).
pub async fn top_queries(
    receiver: Receiver<QueryLog>,
    limit: usize,
    sort_by: QueriesSortBy,
) -> Vec<QueryLog> {
    let mut aggregator = Analyzer::new();
    aggregator.collect_logs(receiver).await;
    aggregator.top_queries(limit, sort_by)
}
/// Drains `receiver` and folds every extended record into a single merged
/// `QueryLogExtended`; returns `None` when the channel yields no records.
pub async fn extended_query(receiver: Receiver<QueryLogExtended>) -> Option<QueryLogExtended> {
    let mut aggregator = Analyzer::new();
    aggregator.collect_logs_extended(receiver).await;
    aggregator.query_extended
}
/// Drains `receiver` and returns the sum of all received totals.
///
/// Yields `QueryLogTotal::default()` when the channel produces no records.
pub async fn total_queries(receiver: Receiver<QueryLogTotal>) -> QueryLogTotal {
    let mut analyzer = Analyzer::new();
    analyzer.collect_logs_total(receiver).await;
    // `analyzer` is owned and dropped right after, so move the aggregate out
    // instead of cloning it (the `.clone()` here was redundant).
    analyzer.total_queries
}
/// Drains `receiver`, merges errors that share a code, and returns at most
/// `limit` entries ordered by descending occurrence count.
pub async fn top_errors(receiver: Receiver<Error>, limit: usize) -> Vec<Error> {
    let mut aggregator = Analyzer::new();
    aggregator.collect_errors(receiver).await;
    aggregator.top_errors(limit)
}
impl Analyzer {
fn new() -> Self {
Self {
total_queries: QueryLogTotal::default(),
queries: HashMap::new(),
query_extended: None,
errors: HashMap::new(),
}
}
fn merge_query_total(&mut self, log: QueryLogTotal) {
self.total_queries.queries_count += log.queries_count;
self.total_queries.io_impact += log.io_impact;
self.total_queries.cpu_impact += log.cpu_impact;
self.total_queries.memory_impact += log.memory_impact;
self.total_queries.time_impact += log.time_impact;
self.total_queries.network_impact += log.network_impact;
self.total_queries.total_impact += log.total_impact;
}
fn merge_query(&mut self, log: QueryLog) {
self.queries
.entry(log.normalized_query_hash)
.and_modify(|existing| {
existing.io_impact += log.io_impact;
existing.cpu_impact += log.cpu_impact;
existing.memory_impact += log.memory_impact;
existing.time_impact += log.time_impact;
existing.network_impact += log.network_impact;
existing.total_impact += log.total_impact;
})
.or_insert(log);
}
fn merge_query_extended(&mut self, log: QueryLogExtended) {
match &mut self.query_extended {
Some(existing) => {
existing.total_query_duration_ms += log.total_query_duration_ms;
existing.total_read_rows += log.total_read_rows;
existing.total_read_bytes += log.total_read_bytes;
existing.total_memory_usage += log.total_memory_usage;
existing.total_user_time_us += log.total_user_time_us;
existing.total_system_time_us += log.total_system_time_us;
existing.total_network_send_bytes += log.total_network_send_bytes;
existing.total_network_receive_bytes += log.total_network_receive_bytes;
existing.max_event_time = existing.max_event_time.max(log.max_event_time);
existing.min_event_time = existing.min_event_time.min(log.min_event_time);
merge_string_vecs(&mut existing.users, &log.users);
merge_string_vecs(&mut existing.databases, &log.databases);
merge_string_vecs(&mut existing.tables, &log.tables);
}
None => {
self.query_extended = Some(log);
}
}
}
fn merge_error(&mut self, err: Error) {
self.errors
.entry(err.code)
.and_modify(|existing| {
existing.count += err.count;
if err.last_error_time > existing.last_error_time {
existing.last_error_time = err.last_error_time;
}
})
.or_insert(err);
}
async fn collect_logs(&mut self, mut rx: Receiver<QueryLog>) {
while let Some(log) = rx.recv().await {
self.merge_query(log);
}
}
async fn collect_logs_total(&mut self, mut rx: Receiver<QueryLogTotal>) {
while let Some(log) = rx.recv().await {
self.merge_query_total(log);
}
}
async fn collect_logs_extended(&mut self, mut rx: Receiver<QueryLogExtended>) {
while let Some(log) = rx.recv().await {
self.merge_query_extended(log);
}
}
async fn collect_errors(&mut self, mut rx: Receiver<Error>) {
while let Some(err) = rx.recv().await {
self.merge_error(err);
}
}
fn top_queries(&self, limit: usize, sort_by: QueriesSortBy) -> Vec<QueryLog> {
let mut top_queries: Vec<_> = self.queries.values().cloned().collect();
top_queries.sort_by_key(|q| {
std::cmp::Reverse(match sort_by {
QueriesSortBy::TotalImpact => q.total_impact,
QueriesSortBy::IOImpact => q.io_impact,
QueriesSortBy::CPUImpact => q.cpu_impact,
QueriesSortBy::MemoryImpact => q.memory_impact,
QueriesSortBy::TimeImpact => q.time_impact,
QueriesSortBy::NetworkImpact => q.network_impact,
})
});
top_queries.truncate(limit);
top_queries
}
fn top_errors(&self, limit: usize) -> Vec<Error> {
let mut top_errors: Vec<Error> = self.errors.values().cloned().collect();
top_errors.sort_by_key(|e| (std::cmp::Reverse(e.count), e.code));
top_errors.truncate(limit);
top_errors
}
}
/// Unions `source` into `target`: appends clones of every source string,
/// then sorts the whole vector and drops adjacent duplicates, leaving
/// `target` sorted and duplicate-free.
fn merge_string_vecs(target: &mut Vec<String>, source: &[String]) {
    target.extend(source.iter().cloned());
    target.sort_unstable();
    target.dedup();
}