use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::Duration;
use tracing::info;
use crate::metrics::{MetricEvent, MetricId, ScanType};
/// Accumulator for the statistics gathered while a scan runs.
///
/// Every field is an atomic, so all updates go through `&self`. The values are
/// read back by `to_event` and `log`. `Default` yields an instance with every
/// field at zero (the `Default` value of each atomic integer type).
///
/// NOTE(review): the per-field descriptions below are inferred from the field
/// names and the methods in this file; the call sites are not visible here.
#[derive(Default)]
pub(crate) struct ScanMetrics {
    /// Incremented by one per `incr_add_files_seen` call.
    num_add_files_seen: AtomicU64,
    /// Incremented by one per `incr_active_add_files` call.
    num_active_add_files: AtomicU64,
    /// Incremented by one per `incr_remove_files_seen` call.
    num_remove_files_seen: AtomicU64,
    /// Incremented by one per `incr_non_file_actions` call.
    num_non_file_actions: AtomicU64,
    /// Running total of the values passed to `add_predicate_filtered`.
    num_predicate_filtered: AtomicU64,
    /// Largest value ever passed to `update_peak_hash_set_size`.
    peak_hash_set_size: AtomicUsize,
    /// Accumulated nanoseconds; reported in whole milliseconds.
    dedup_visitor_time_ns: AtomicU64,
    /// Accumulated nanoseconds; reported in whole milliseconds.
    predicate_eval_time_ns: AtomicU64,
}
impl ScanMetrics {
    /// Divisor used to turn the nanosecond totals into whole milliseconds.
    const NANOS_PER_MILLI: u64 = 1_000_000;

    /// Adds `by` to `counter` with relaxed ordering.
    fn bump(counter: &AtomicU64, by: u64) {
        counter.fetch_add(by, Ordering::Relaxed);
    }

    /// Increments `num_add_files_seen` by one.
    pub(crate) fn incr_add_files_seen(&self) {
        Self::bump(&self.num_add_files_seen, 1);
    }

    /// Increments `num_active_add_files` by one.
    pub(crate) fn incr_active_add_files(&self) {
        Self::bump(&self.num_active_add_files, 1);
    }

    /// Increments `num_remove_files_seen` by one.
    pub(crate) fn incr_remove_files_seen(&self) {
        Self::bump(&self.num_remove_files_seen, 1);
    }

    /// Increments `num_non_file_actions` by one.
    pub(crate) fn incr_non_file_actions(&self) {
        Self::bump(&self.num_non_file_actions, 1);
    }

    /// Adds `value` to `num_predicate_filtered`.
    pub(crate) fn add_predicate_filtered(&self, value: u64) {
        Self::bump(&self.num_predicate_filtered, value);
    }

    /// Raises `peak_hash_set_size` to `value` if `value` is larger than the
    /// currently stored maximum; otherwise leaves it unchanged.
    pub(crate) fn update_peak_hash_set_size(&self, value: usize) {
        self.peak_hash_set_size.fetch_max(value, Ordering::Relaxed);
    }

    /// Adds `duration_ns` nanoseconds to `dedup_visitor_time_ns`.
    pub(crate) fn add_dedup_visitor_time_ns(&self, duration_ns: u64) {
        Self::bump(&self.dedup_visitor_time_ns, duration_ns);
    }

    /// Adds `duration_ns` nanoseconds to `predicate_eval_time_ns`.
    pub(crate) fn add_predicate_eval_time_ns(&self, duration_ns: u64) {
        Self::bump(&self.predicate_eval_time_ns, duration_ns);
    }

    /// Stores zero into every `u64` counter and timer.
    ///
    /// `peak_hash_set_size` is not touched by this method.
    /// NOTE(review): whether the peak is meant to survive a reset is not
    /// visible from this file; confirm against the callers.
    pub(crate) fn reset_counters(&self) {
        let zeroed = [
            &self.num_add_files_seen,
            &self.num_active_add_files,
            &self.num_remove_files_seen,
            &self.num_non_file_actions,
            &self.num_predicate_filtered,
            &self.dedup_visitor_time_ns,
            &self.predicate_eval_time_ns,
        ];
        for counter in zeroed {
            counter.store(0, Ordering::Relaxed);
        }
    }

    /// Builds a `MetricEvent::ScanMetadataCompleted` from the current values.
    ///
    /// `operation_id`, `scan_type` and `total_duration` are passed through
    /// unchanged. The two timers are converted from nanoseconds to whole
    /// milliseconds by integer division, so any sub-millisecond remainder is
    /// dropped. Each field is loaded independently with relaxed ordering.
    pub(crate) fn to_event(
        &self,
        operation_id: MetricId,
        scan_type: ScanType,
        total_duration: Duration,
    ) -> MetricEvent {
        let relaxed = Ordering::Relaxed;

        let num_add_files_seen = self.num_add_files_seen.load(relaxed);
        let num_active_add_files = self.num_active_add_files.load(relaxed);
        let num_remove_files_seen = self.num_remove_files_seen.load(relaxed);
        let num_non_file_actions = self.num_non_file_actions.load(relaxed);
        let num_predicate_filtered = self.num_predicate_filtered.load(relaxed);
        let peak_hash_set_size = self.peak_hash_set_size.load(relaxed);
        let dedup_visitor_time_ms =
            self.dedup_visitor_time_ns.load(relaxed) / Self::NANOS_PER_MILLI;
        let predicate_eval_time_ms =
            self.predicate_eval_time_ns.load(relaxed) / Self::NANOS_PER_MILLI;

        MetricEvent::ScanMetadataCompleted {
            operation_id,
            scan_type,
            total_duration,
            num_add_files_seen,
            num_active_add_files,
            num_remove_files_seen,
            num_non_file_actions,
            num_predicate_filtered,
            peak_hash_set_size,
            dedup_visitor_time_ms,
            predicate_eval_time_ms,
        }
    }

    /// Emits an `info!` event carrying `message` plus the current value of
    /// every metric as a structured field.
    ///
    /// The timers are reported in whole milliseconds (integer division of the
    /// nanosecond totals).
    pub(crate) fn log(&self, message: impl AsRef<str>) {
        let relaxed = Ordering::Relaxed;

        let adds_seen = self.num_add_files_seen.load(relaxed);
        let adds_active = self.num_active_add_files.load(relaxed);
        let removes_seen = self.num_remove_files_seen.load(relaxed);
        let non_file = self.num_non_file_actions.load(relaxed);
        let filtered = self.num_predicate_filtered.load(relaxed);
        let peak = self.peak_hash_set_size.load(relaxed);
        let dedup_ms = self.dedup_visitor_time_ns.load(relaxed) / Self::NANOS_PER_MILLI;
        let predicate_ms = self.predicate_eval_time_ns.load(relaxed) / Self::NANOS_PER_MILLI;

        // Field names are spelled out explicitly so the emitted keys do not
        // depend on the names of the locals above.
        info!(
            add_files_seen = adds_seen,
            active_add_files = adds_active,
            remove_files_seen = removes_seen,
            non_file_actions = non_file,
            predicate_filtered = filtered,
            peak_hash_set_size = peak,
            dedup_visitor_time_ms = dedup_ms,
            predicate_eval_time_ms = predicate_ms,
            "{}",
            message.as_ref()
        );
    }
}