use super::CustomMetricValue;
use chrono::{DateTime, Utc};
use datafusion_common::{
human_readable_count, human_readable_duration, human_readable_size, instant::Instant,
};
use parking_lot::Mutex;
use std::{
borrow::{Borrow, Cow},
fmt::{Debug, Display},
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
time::Duration,
};
#[derive(Debug, Clone)]
pub struct Count {
value: Arc<AtomicUsize>,
}
impl PartialEq for Count {
fn eq(&self, other: &Self) -> bool {
self.value().eq(&other.value())
}
}
impl Display for Count {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", human_readable_count(self.value()))
}
}
impl Default for Count {
fn default() -> Self {
Self::new()
}
}
impl Count {
pub fn new() -> Self {
Self {
value: Arc::new(AtomicUsize::new(0)),
}
}
pub fn add(&self, n: usize) {
self.value.fetch_add(n, Ordering::Relaxed);
}
pub fn value(&self) -> usize {
self.value.load(Ordering::Relaxed)
}
}
#[derive(Debug, Clone)]
pub struct Gauge {
value: Arc<AtomicUsize>,
}
impl PartialEq for Gauge {
fn eq(&self, other: &Self) -> bool {
self.value().eq(&other.value())
}
}
impl Display for Gauge {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", self.value())
}
}
impl Default for Gauge {
fn default() -> Self {
Self::new()
}
}
impl Gauge {
pub fn new() -> Self {
Self {
value: Arc::new(AtomicUsize::new(0)),
}
}
pub fn add(&self, n: usize) {
self.value.fetch_add(n, Ordering::Relaxed);
}
pub fn sub(&self, n: usize) {
self.value.fetch_sub(n, Ordering::Relaxed);
}
pub fn set_max(&self, n: usize) {
self.value.fetch_max(n, Ordering::Relaxed);
}
pub fn set(&self, n: usize) -> usize {
self.value.swap(n, Ordering::Relaxed)
}
pub fn value(&self) -> usize {
self.value.load(Ordering::Relaxed)
}
}
#[derive(Debug, Clone)]
pub struct Time {
nanos: Arc<AtomicUsize>,
}
impl Default for Time {
fn default() -> Self {
Self::new()
}
}
impl PartialEq for Time {
fn eq(&self, other: &Self) -> bool {
self.value().eq(&other.value())
}
}
impl Display for Time {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", human_readable_duration(self.value() as u64))
}
}
impl Time {
pub fn new() -> Self {
Self {
nanos: Arc::new(AtomicUsize::new(0)),
}
}
pub fn add_elapsed(&self, start: Instant) {
self.add_duration(start.elapsed());
}
pub fn add_duration(&self, duration: Duration) {
let more_nanos = duration.as_nanos() as usize;
self.nanos.fetch_add(more_nanos.max(1), Ordering::Relaxed);
}
pub fn add(&self, other: &Time) {
self.add_duration(Duration::from_nanos(other.value() as u64))
}
pub fn timer(&self) -> ScopedTimerGuard<'_> {
ScopedTimerGuard {
inner: self,
start: Some(Instant::now()),
}
}
pub fn value(&self) -> usize {
self.nanos.load(Ordering::Relaxed)
}
pub fn timer_with(&self, now: Instant) -> ScopedTimerGuard<'_> {
ScopedTimerGuard {
inner: self,
start: Some(now),
}
}
}
#[derive(Debug, Clone)]
pub struct Timestamp {
timestamp: Arc<Mutex<Option<DateTime<Utc>>>>,
}
impl Default for Timestamp {
fn default() -> Self {
Self::new()
}
}
impl Timestamp {
pub fn new() -> Self {
Self {
timestamp: Arc::new(Mutex::new(None)),
}
}
pub fn record(&self) {
self.set(Utc::now())
}
pub fn set(&self, now: DateTime<Utc>) {
*self.timestamp.lock() = Some(now);
}
pub fn value(&self) -> Option<DateTime<Utc>> {
*self.timestamp.lock()
}
pub fn update_to_min(&self, other: &Timestamp) {
let min = match (self.value(), other.value()) {
(None, None) => None,
(Some(v), None) => Some(v),
(None, Some(v)) => Some(v),
(Some(v1), Some(v2)) => Some(if v1 < v2 { v1 } else { v2 }),
};
*self.timestamp.lock() = min;
}
pub fn update_to_max(&self, other: &Timestamp) {
let max = match (self.value(), other.value()) {
(None, None) => None,
(Some(v), None) => Some(v),
(None, Some(v)) => Some(v),
(Some(v1), Some(v2)) => Some(if v1 < v2 { v2 } else { v1 }),
};
*self.timestamp.lock() = max;
}
}
impl PartialEq for Timestamp {
fn eq(&self, other: &Self) -> bool {
self.value().eq(&other.value())
}
}
impl Display for Timestamp {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self.value() {
None => write!(f, "NONE"),
Some(v) => {
write!(f, "{v}")
}
}
}
}
pub struct ScopedTimerGuard<'a> {
inner: &'a Time,
start: Option<Instant>,
}
impl ScopedTimerGuard<'_> {
pub fn stop(&mut self) {
if let Some(start) = self.start.take() {
self.inner.add_elapsed(start)
}
}
pub fn restart(&mut self) {
self.start = Some(Instant::now())
}
pub fn done(mut self) {
self.stop()
}
pub fn stop_with(&mut self, end_time: Instant) {
if let Some(start) = self.start.take() {
let elapsed = end_time - start;
self.inner.add_duration(elapsed)
}
}
pub fn done_with(mut self, end_time: Instant) {
self.stop_with(end_time)
}
}
impl Drop for ScopedTimerGuard<'_> {
fn drop(&mut self) {
self.stop()
}
}
#[derive(Debug, Clone)]
pub struct PruningMetrics {
pruned: Arc<AtomicUsize>,
matched: Arc<AtomicUsize>,
fully_matched: Arc<AtomicUsize>,
}
impl Display for PruningMetrics {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
let matched = self.matched.load(Ordering::Relaxed);
let total = self.pruned.load(Ordering::Relaxed) + matched;
let fully_matched = self.fully_matched.load(Ordering::Relaxed);
if fully_matched != 0 {
write!(
f,
"{} total → {} matched -> {} fully matched",
human_readable_count(total),
human_readable_count(matched),
human_readable_count(fully_matched)
)
} else {
write!(
f,
"{} total → {} matched",
human_readable_count(total),
human_readable_count(matched)
)
}
}
}
impl Default for PruningMetrics {
fn default() -> Self {
Self::new()
}
}
impl PruningMetrics {
pub fn new() -> Self {
Self {
pruned: Arc::new(AtomicUsize::new(0)),
matched: Arc::new(AtomicUsize::new(0)),
fully_matched: Arc::new(AtomicUsize::new(0)),
}
}
pub fn add_pruned(&self, n: usize) {
self.pruned.fetch_add(n, Ordering::Relaxed);
}
pub fn add_matched(&self, n: usize) {
self.matched.fetch_add(n, Ordering::Relaxed);
}
pub fn add_fully_matched(&self, n: usize) {
self.fully_matched.fetch_add(n, Ordering::Relaxed);
}
pub fn subtract_matched(&self, n: usize) {
self.matched.fetch_sub(n, Ordering::Relaxed);
}
pub fn pruned(&self) -> usize {
self.pruned.load(Ordering::Relaxed)
}
pub fn matched(&self) -> usize {
self.matched.load(Ordering::Relaxed)
}
pub fn fully_matched(&self) -> usize {
self.fully_matched.load(Ordering::Relaxed)
}
}
#[derive(Debug, Clone, Default)]
pub struct RatioMetrics {
part: Arc<AtomicUsize>,
total: Arc<AtomicUsize>,
merge_strategy: RatioMergeStrategy,
}
#[derive(Debug, Clone, Default)]
pub enum RatioMergeStrategy {
#[default]
AddPartAddTotal,
AddPartSetTotal,
SetPartAddTotal,
}
impl RatioMetrics {
pub fn new() -> Self {
Self {
part: Arc::new(AtomicUsize::new(0)),
total: Arc::new(AtomicUsize::new(0)),
merge_strategy: RatioMergeStrategy::AddPartAddTotal,
}
}
pub fn with_merge_strategy(mut self, merge_strategy: RatioMergeStrategy) -> Self {
self.merge_strategy = merge_strategy;
self
}
pub fn add_part(&self, n: usize) {
self.part.fetch_add(n, Ordering::Relaxed);
}
pub fn add_total(&self, n: usize) {
self.total.fetch_add(n, Ordering::Relaxed);
}
pub fn set_part(&self, n: usize) {
self.part.store(n, Ordering::Relaxed);
}
pub fn set_total(&self, n: usize) {
self.total.store(n, Ordering::Relaxed);
}
pub fn merge(&self, other: &Self) {
match self.merge_strategy {
RatioMergeStrategy::AddPartAddTotal => {
self.add_part(other.part());
self.add_total(other.total());
}
RatioMergeStrategy::AddPartSetTotal => {
self.add_part(other.part());
self.set_total(other.total());
}
RatioMergeStrategy::SetPartAddTotal => {
self.set_part(other.part());
self.add_total(other.total());
}
}
}
pub fn part(&self) -> usize {
self.part.load(Ordering::Relaxed)
}
pub fn total(&self) -> usize {
self.total.load(Ordering::Relaxed)
}
}
impl PartialEq for RatioMetrics {
fn eq(&self, other: &Self) -> bool {
self.part() == other.part() && self.total() == other.total()
}
}
fn fmt_significant(mut x: f64, digits: usize) -> String {
if x == 0.0 {
return "0".to_string();
}
let exp = x.abs().log10().floor(); let scale = 10f64.powf(-(exp - (digits as f64 - 1.0)));
x = (x * scale).round() / scale; format!("{x}")
}
impl Display for RatioMetrics {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let part = self.part();
let total = self.total();
if total == 0 {
if part == 0 {
write!(f, "N/A (0/0)")
} else {
write!(f, "N/A ({}/0)", human_readable_count(part))
}
} else {
let percentage = (part as f64 / total as f64) * 100.0;
write!(
f,
"{}% ({}/{})",
fmt_significant(percentage, 2),
human_readable_count(part),
human_readable_count(total)
)
}
}
}
#[derive(Debug, Clone)]
pub enum MetricValue {
OutputRows(Count),
ElapsedCompute(Time),
SpillCount(Count),
SpilledBytes(Count),
OutputBytes(Count),
OutputBatches(Count),
SpilledRows(Count),
CurrentMemoryUsage(Gauge),
Count {
name: Cow<'static, str>,
count: Count,
},
Gauge {
name: Cow<'static, str>,
gauge: Gauge,
},
Time {
name: Cow<'static, str>,
time: Time,
},
StartTimestamp(Timestamp),
EndTimestamp(Timestamp),
PruningMetrics {
name: Cow<'static, str>,
pruning_metrics: PruningMetrics,
},
Ratio {
name: Cow<'static, str>,
ratio_metrics: RatioMetrics,
},
Custom {
name: Cow<'static, str>,
value: Arc<dyn CustomMetricValue>,
},
}
impl PartialEq for MetricValue {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(MetricValue::OutputRows(count), MetricValue::OutputRows(other)) => {
count == other
}
(MetricValue::ElapsedCompute(time), MetricValue::ElapsedCompute(other)) => {
time == other
}
(MetricValue::SpillCount(count), MetricValue::SpillCount(other)) => {
count == other
}
(MetricValue::SpilledBytes(count), MetricValue::SpilledBytes(other)) => {
count == other
}
(MetricValue::OutputBytes(count), MetricValue::OutputBytes(other)) => {
count == other
}
(MetricValue::OutputBatches(count), MetricValue::OutputBatches(other)) => {
count == other
}
(MetricValue::SpilledRows(count), MetricValue::SpilledRows(other)) => {
count == other
}
(
MetricValue::CurrentMemoryUsage(gauge),
MetricValue::CurrentMemoryUsage(other),
) => gauge == other,
(
MetricValue::Count { name, count },
MetricValue::Count {
name: other_name,
count: other_count,
},
) => name == other_name && count == other_count,
(
MetricValue::Gauge { name, gauge },
MetricValue::Gauge {
name: other_name,
gauge: other_gauge,
},
) => name == other_name && gauge == other_gauge,
(
MetricValue::Time { name, time },
MetricValue::Time {
name: other_name,
time: other_time,
},
) => name == other_name && time == other_time,
(
MetricValue::StartTimestamp(timestamp),
MetricValue::StartTimestamp(other),
) => timestamp == other,
(MetricValue::EndTimestamp(timestamp), MetricValue::EndTimestamp(other)) => {
timestamp == other
}
(
MetricValue::PruningMetrics {
name,
pruning_metrics,
},
MetricValue::PruningMetrics {
name: other_name,
pruning_metrics: other_pruning_metrics,
},
) => {
name == other_name
&& pruning_metrics.pruned() == other_pruning_metrics.pruned()
&& pruning_metrics.matched() == other_pruning_metrics.matched()
}
(
MetricValue::Ratio {
name,
ratio_metrics,
},
MetricValue::Ratio {
name: other_name,
ratio_metrics: other_ratio_metrics,
},
) => name == other_name && ratio_metrics == other_ratio_metrics,
(
MetricValue::Custom { name, value },
MetricValue::Custom {
name: other_name,
value: other_value,
},
) => name == other_name && value.is_eq(other_value),
_ => false,
}
}
}
impl MetricValue {
pub fn name(&self) -> &str {
match self {
Self::OutputRows(_) => "output_rows",
Self::SpillCount(_) => "spill_count",
Self::SpilledBytes(_) => "spilled_bytes",
Self::OutputBytes(_) => "output_bytes",
Self::OutputBatches(_) => "output_batches",
Self::SpilledRows(_) => "spilled_rows",
Self::CurrentMemoryUsage(_) => "mem_used",
Self::ElapsedCompute(_) => "elapsed_compute",
Self::Count { name, .. } => name.borrow(),
Self::Gauge { name, .. } => name.borrow(),
Self::Time { name, .. } => name.borrow(),
Self::StartTimestamp(_) => "start_timestamp",
Self::EndTimestamp(_) => "end_timestamp",
Self::PruningMetrics { name, .. } => name.borrow(),
Self::Ratio { name, .. } => name.borrow(),
Self::Custom { name, .. } => name.borrow(),
}
}
pub fn as_usize(&self) -> usize {
match self {
Self::OutputRows(count) => count.value(),
Self::SpillCount(count) => count.value(),
Self::SpilledBytes(bytes) => bytes.value(),
Self::OutputBytes(bytes) => bytes.value(),
Self::OutputBatches(count) => count.value(),
Self::SpilledRows(count) => count.value(),
Self::CurrentMemoryUsage(used) => used.value(),
Self::ElapsedCompute(time) => time.value(),
Self::Count { count, .. } => count.value(),
Self::Gauge { gauge, .. } => gauge.value(),
Self::Time { time, .. } => time.value(),
Self::StartTimestamp(timestamp) => timestamp
.value()
.and_then(|ts| ts.timestamp_nanos_opt())
.map(|nanos| nanos as usize)
.unwrap_or(0),
Self::EndTimestamp(timestamp) => timestamp
.value()
.and_then(|ts| ts.timestamp_nanos_opt())
.map(|nanos| nanos as usize)
.unwrap_or(0),
Self::PruningMetrics { .. } => 0,
Self::Ratio { .. } => 0,
Self::Custom { value, .. } => value.as_usize(),
}
}
pub fn new_empty(&self) -> Self {
match self {
Self::OutputRows(_) => Self::OutputRows(Count::new()),
Self::SpillCount(_) => Self::SpillCount(Count::new()),
Self::SpilledBytes(_) => Self::SpilledBytes(Count::new()),
Self::OutputBytes(_) => Self::OutputBytes(Count::new()),
Self::OutputBatches(_) => Self::OutputBatches(Count::new()),
Self::SpilledRows(_) => Self::SpilledRows(Count::new()),
Self::CurrentMemoryUsage(_) => Self::CurrentMemoryUsage(Gauge::new()),
Self::ElapsedCompute(_) => Self::ElapsedCompute(Time::new()),
Self::Count { name, .. } => Self::Count {
name: name.clone(),
count: Count::new(),
},
Self::Gauge { name, .. } => Self::Gauge {
name: name.clone(),
gauge: Gauge::new(),
},
Self::Time { name, .. } => Self::Time {
name: name.clone(),
time: Time::new(),
},
Self::StartTimestamp(_) => Self::StartTimestamp(Timestamp::new()),
Self::EndTimestamp(_) => Self::EndTimestamp(Timestamp::new()),
Self::PruningMetrics { name, .. } => Self::PruningMetrics {
name: name.clone(),
pruning_metrics: PruningMetrics::new(),
},
Self::Ratio {
name,
ratio_metrics,
} => {
let merge_strategy = ratio_metrics.merge_strategy.clone();
Self::Ratio {
name: name.clone(),
ratio_metrics: RatioMetrics::new()
.with_merge_strategy(merge_strategy),
}
}
Self::Custom { name, value } => Self::Custom {
name: name.clone(),
value: value.new_empty(),
},
}
}
pub fn aggregate(&mut self, other: &Self) {
match (self, other) {
(Self::OutputRows(count), Self::OutputRows(other_count))
| (Self::SpillCount(count), Self::SpillCount(other_count))
| (Self::SpilledBytes(count), Self::SpilledBytes(other_count))
| (Self::OutputBytes(count), Self::OutputBytes(other_count))
| (Self::OutputBatches(count), Self::OutputBatches(other_count))
| (Self::SpilledRows(count), Self::SpilledRows(other_count))
| (
Self::Count { count, .. },
Self::Count {
count: other_count, ..
},
) => count.add(other_count.value()),
(Self::CurrentMemoryUsage(gauge), Self::CurrentMemoryUsage(other_gauge))
| (
Self::Gauge { gauge, .. },
Self::Gauge {
gauge: other_gauge, ..
},
) => gauge.add(other_gauge.value()),
(Self::ElapsedCompute(time), Self::ElapsedCompute(other_time))
| (
Self::Time { time, .. },
Self::Time {
time: other_time, ..
},
) => time.add(other_time),
(Self::StartTimestamp(timestamp), Self::StartTimestamp(other_timestamp)) => {
timestamp.update_to_min(other_timestamp);
}
(Self::EndTimestamp(timestamp), Self::EndTimestamp(other_timestamp)) => {
timestamp.update_to_max(other_timestamp);
}
(
Self::PruningMetrics {
pruning_metrics, ..
},
Self::PruningMetrics {
pruning_metrics: other_pruning_metrics,
..
},
) => {
let pruned = other_pruning_metrics.pruned.load(Ordering::Relaxed);
let matched = other_pruning_metrics.matched.load(Ordering::Relaxed);
let fully_matched =
other_pruning_metrics.fully_matched.load(Ordering::Relaxed);
pruning_metrics.add_pruned(pruned);
pruning_metrics.add_matched(matched);
pruning_metrics.add_fully_matched(fully_matched);
}
(
Self::Ratio { ratio_metrics, .. },
Self::Ratio {
ratio_metrics: other_ratio_metrics,
..
},
) => {
ratio_metrics.merge(other_ratio_metrics);
}
(
Self::Custom { value, .. },
Self::Custom {
value: other_value, ..
},
) => {
value.aggregate(Arc::clone(other_value));
}
m @ (_, _) => {
panic!(
"Mismatched metric types. Can not aggregate {:?} with value {:?}",
m.0, m.1
)
}
}
}
pub fn display_sort_key(&self) -> u8 {
match self {
Self::OutputRows(_) => 0,
Self::ElapsedCompute(_) => 1,
Self::OutputBytes(_) => 2,
Self::OutputBatches(_) => 3,
Self::PruningMetrics { name, .. } => match name.as_ref() {
"files_ranges_pruned_statistics" => 4,
"row_groups_pruned_statistics" => 5,
"row_groups_pruned_bloom_filter" => 6,
"page_index_pages_pruned" => 7,
"page_index_rows_pruned" => 8,
_ => 9,
},
Self::SpillCount(_) => 10,
Self::SpilledBytes(_) => 11,
Self::SpilledRows(_) => 12,
Self::CurrentMemoryUsage(_) => 13,
Self::Count { .. } => 14,
Self::Gauge { .. } => 15,
Self::Time { .. } => 16,
Self::Ratio { .. } => 17,
Self::StartTimestamp(_) => 18, Self::EndTimestamp(_) => 19,
Self::Custom { .. } => 20,
}
}
pub fn is_timestamp(&self) -> bool {
matches!(self, Self::StartTimestamp(_) | Self::EndTimestamp(_))
}
}
impl Display for MetricValue {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Self::OutputRows(count)
| Self::OutputBatches(count)
| Self::SpillCount(count)
| Self::SpilledRows(count)
| Self::Count { count, .. } => {
write!(f, "{count}")
}
Self::SpilledBytes(count) | Self::OutputBytes(count) => {
let readable_count = human_readable_size(count.value());
write!(f, "{readable_count}")
}
Self::CurrentMemoryUsage(gauge) => {
let readable_size = human_readable_size(gauge.value());
write!(f, "{readable_size}")
}
Self::Gauge { gauge, .. } => {
write!(f, "{}", human_readable_count(gauge.value()))
}
Self::ElapsedCompute(time) | Self::Time { time, .. } => {
if time.value() > 0 {
write!(f, "{time}")
} else {
write!(f, "NOT RECORDED")
}
}
Self::StartTimestamp(timestamp) | Self::EndTimestamp(timestamp) => {
write!(f, "{timestamp}")
}
Self::PruningMetrics {
pruning_metrics, ..
} => {
write!(f, "{pruning_metrics}")
}
Self::Ratio { ratio_metrics, .. } => write!(f, "{ratio_metrics}"),
Self::Custom { name, value } => {
write!(f, "name:{name} {value}")
}
}
}
}
#[cfg(test)]
mod tests {
use std::any::Any;
use chrono::TimeZone;
use datafusion_common::units::MB;
use super::*;
#[derive(Debug, Default)]
pub struct CustomCounter {
count: AtomicUsize,
}
impl Display for CustomCounter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "count: {}", self.count.load(Ordering::Relaxed))
}
}
impl CustomMetricValue for CustomCounter {
fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
Arc::new(CustomCounter::default())
}
fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
let other = other.as_any().downcast_ref::<Self>().unwrap();
self.count
.fetch_add(other.count.load(Ordering::Relaxed), Ordering::Relaxed);
}
fn as_any(&self) -> &dyn Any {
self
}
fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
let Some(other) = other.as_any().downcast_ref::<Self>() else {
return false;
};
self.count.load(Ordering::Relaxed) == other.count.load(Ordering::Relaxed)
}
}
fn new_custom_counter(name: &'static str, value: usize) -> MetricValue {
let custom_counter = CustomCounter::default();
custom_counter.count.fetch_add(value, Ordering::Relaxed);
MetricValue::Custom {
name: Cow::Borrowed(name),
value: Arc::new(custom_counter),
}
}
#[test]
fn test_custom_metric_with_mismatching_names() {
let mut custom_val = new_custom_counter("Hi", 1);
let other_custom_val = new_custom_counter("Hello", 1);
assert!(other_custom_val != custom_val);
custom_val.aggregate(&other_custom_val);
let expected_val = new_custom_counter("Hi", 2);
assert!(expected_val == custom_val);
}
#[test]
fn test_custom_metric() {
let mut custom_val = new_custom_counter("hi", 11);
let other_custom_val = new_custom_counter("hi", 20);
custom_val.aggregate(&other_custom_val);
assert!(custom_val != other_custom_val);
if let MetricValue::Custom { value, .. } = custom_val {
let counter = value
.as_any()
.downcast_ref::<CustomCounter>()
.expect("Expected CustomCounter");
assert_eq!(counter.count.load(Ordering::Relaxed), 31);
} else {
panic!("Unexpected value");
}
}
#[test]
fn test_display_output_rows() {
let count = Count::new();
let values = vec![
MetricValue::OutputRows(count.clone()),
MetricValue::Count {
name: "my_counter".into(),
count: count.clone(),
},
];
for value in &values {
assert_eq!("0", value.to_string(), "value {value:?}");
}
count.add(42);
for value in &values {
assert_eq!("42", value.to_string(), "value {value:?}");
}
}
#[test]
fn test_display_spilled_bytes() {
let count = Count::new();
let spilled_byte = MetricValue::SpilledBytes(count.clone());
assert_eq!("0.0 B", spilled_byte.to_string());
count.add((100 * MB) as usize);
assert_eq!("100.0 MB", spilled_byte.to_string());
count.add((0.5 * MB as f64) as usize);
assert_eq!("100.5 MB", spilled_byte.to_string());
}
#[test]
fn test_display_time() {
let time = Time::new();
let values = vec![
MetricValue::ElapsedCompute(time.clone()),
MetricValue::Time {
name: "my_time".into(),
time: time.clone(),
},
];
for value in &values {
assert_eq!("NOT RECORDED", value.to_string(), "value {value:?}");
}
time.add_duration(Duration::from_nanos(1042));
for value in &values {
assert_eq!("1.04µs", value.to_string(), "value {value:?}");
}
}
#[test]
fn test_display_ratio() {
let ratio_metrics = RatioMetrics::new();
let ratio = MetricValue::Ratio {
name: Cow::Borrowed("ratio_metric"),
ratio_metrics: ratio_metrics.clone(),
};
assert_eq!("N/A (0/0)", ratio.to_string());
ratio_metrics.add_part(10);
assert_eq!("N/A (10/0)", ratio.to_string());
ratio_metrics.add_total(40);
assert_eq!("25% (10/40)", ratio.to_string());
let tiny_ratio_metrics = RatioMetrics::new();
let tiny_ratio = MetricValue::Ratio {
name: Cow::Borrowed("tiny_ratio_metric"),
ratio_metrics: tiny_ratio_metrics.clone(),
};
tiny_ratio_metrics.add_part(1);
tiny_ratio_metrics.add_total(3000);
assert_eq!("0.033% (1/3.00 K)", tiny_ratio.to_string());
}
#[test]
fn test_ratio_set_methods() {
let ratio_metrics = RatioMetrics::new();
ratio_metrics.set_part(10);
ratio_metrics.set_part(10);
ratio_metrics.set_total(40);
ratio_metrics.set_total(40);
assert_eq!("25% (10/40)", ratio_metrics.to_string());
let ratio_metrics = RatioMetrics::new();
ratio_metrics.set_part(10);
ratio_metrics.set_part(30);
ratio_metrics.set_total(40);
ratio_metrics.set_total(50);
assert_eq!("60% (30/50)", ratio_metrics.to_string());
}
#[test]
fn test_ratio_merge_strategy() {
let ratio_metrics1 =
RatioMetrics::new().with_merge_strategy(RatioMergeStrategy::AddPartSetTotal);
ratio_metrics1.set_part(10);
ratio_metrics1.set_total(40);
assert_eq!("25% (10/40)", ratio_metrics1.to_string());
let ratio_metrics2 =
RatioMetrics::new().with_merge_strategy(RatioMergeStrategy::AddPartSetTotal);
ratio_metrics2.set_part(20);
ratio_metrics2.set_total(40);
assert_eq!("50% (20/40)", ratio_metrics2.to_string());
ratio_metrics1.merge(&ratio_metrics2);
assert_eq!("75% (30/40)", ratio_metrics1.to_string());
let ratio_metrics1 =
RatioMetrics::new().with_merge_strategy(RatioMergeStrategy::SetPartAddTotal);
ratio_metrics1.set_part(20);
ratio_metrics1.set_total(50);
let ratio_metrics2 = RatioMetrics::new();
ratio_metrics2.set_part(20);
ratio_metrics2.set_total(50);
ratio_metrics1.merge(&ratio_metrics2);
assert_eq!("20% (20/100)", ratio_metrics1.to_string());
let ratio_metrics1 = RatioMetrics::new();
ratio_metrics1.set_part(20);
ratio_metrics1.set_total(50);
let ratio_metrics2 = RatioMetrics::new();
ratio_metrics2.set_part(20);
ratio_metrics2.set_total(50);
ratio_metrics1.merge(&ratio_metrics2);
assert_eq!("40% (40/100)", ratio_metrics1.to_string());
}
#[test]
fn test_display_timestamp() {
let timestamp = Timestamp::new();
let values = vec![
MetricValue::StartTimestamp(timestamp.clone()),
MetricValue::EndTimestamp(timestamp.clone()),
];
for value in &values {
assert_eq!("NONE", value.to_string(), "value {value:?}");
}
timestamp.set(Utc.timestamp_nanos(1431648000000000));
for value in &values {
assert_eq!(
"1970-01-17 13:40:48 UTC",
value.to_string(),
"value {value:?}"
);
}
}
#[test]
fn test_timer_with_custom_instant() {
let time = Time::new();
let start_time = Instant::now();
std::thread::sleep(Duration::from_millis(1));
let mut timer = time.timer_with(start_time);
std::thread::sleep(Duration::from_millis(1));
timer.stop();
assert!(
time.value() >= 2_000_000,
"Expected at least 2ms, got {} ns",
time.value()
);
}
#[test]
fn test_stop_with_custom_endpoint() {
let time = Time::new();
let start = Instant::now();
let mut timer = time.timer_with(start);
let end = start + Duration::from_millis(10);
timer.stop_with(end);
let recorded = time.value();
assert!(
(10_000_000..=10_100_000).contains(&recorded),
"Expected ~10ms, got {recorded} ns"
);
timer.stop_with(end);
assert_eq!(
recorded,
time.value(),
"Time should not change after second stop"
);
}
#[test]
fn test_done_with_custom_endpoint() {
let time = Time::new();
let start = Instant::now();
{
let timer = time.timer_with(start);
let end = start + Duration::from_millis(5);
timer.done_with(end);
}
let recorded = time.value();
assert!(
(5_000_000..=5_100_000).contains(&recorded),
"Expected ~5ms, got {recorded} ns",
);
{
let timer2 = time.timer_with(start);
let end2 = start + Duration::from_millis(5);
timer2.done_with(end2);
}
let new_recorded = time.value();
assert!(
(10_000_000..=10_100_000).contains(&new_recorded),
"Expected ~10ms total, got {new_recorded} ns",
);
}
#[test]
fn test_human_readable_metric_formatting() {
let small_count = Count::new();
small_count.add(42);
assert_eq!(
MetricValue::OutputRows(small_count.clone()).to_string(),
"42"
);
let thousand_count = Count::new();
thousand_count.add(10_100);
assert_eq!(
MetricValue::OutputRows(thousand_count.clone()).to_string(),
"10.10 K"
);
let million_count = Count::new();
million_count.add(1_532_000);
assert_eq!(
MetricValue::SpilledRows(million_count.clone()).to_string(),
"1.53 M"
);
let billion_count = Count::new();
billion_count.add(2_500_000_000);
assert_eq!(
MetricValue::OutputBatches(billion_count.clone()).to_string(),
"2.50 B"
);
let micros_time = Time::new();
micros_time.add_duration(Duration::from_nanos(1_234));
assert_eq!(
MetricValue::ElapsedCompute(micros_time.clone()).to_string(),
"1.23µs"
);
let millis_time = Time::new();
millis_time.add_duration(Duration::from_nanos(11_295_377));
assert_eq!(
MetricValue::ElapsedCompute(millis_time.clone()).to_string(),
"11.30ms"
);
let seconds_time = Time::new();
seconds_time.add_duration(Duration::from_nanos(1_234_567_890));
assert_eq!(
MetricValue::ElapsedCompute(seconds_time.clone()).to_string(),
"1.23s"
);
let mem_gauge = Gauge::new();
mem_gauge.add(100 * MB as usize);
assert_eq!(
MetricValue::CurrentMemoryUsage(mem_gauge.clone()).to_string(),
"100.0 MB"
);
let custom_gauge = Gauge::new();
custom_gauge.add(50_000);
assert_eq!(
MetricValue::Gauge {
name: "custom".into(),
gauge: custom_gauge.clone()
}
.to_string(),
"50.00 K"
);
let pruning = PruningMetrics::new();
pruning.add_matched(500_000);
pruning.add_pruned(500_000);
assert_eq!(
MetricValue::PruningMetrics {
name: "test_pruning".into(),
pruning_metrics: pruning.clone()
}
.to_string(),
"1.00 M total → 500.0 K matched"
);
let ratio = RatioMetrics::new();
ratio.add_part(250_000);
ratio.add_total(1_000_000);
assert_eq!(
MetricValue::Ratio {
name: "test_ratio".into(),
ratio_metrics: ratio.clone()
}
.to_string(),
"25% (250.0 K/1.00 M)"
);
}
}