use std::{
borrow::{Borrow, Cow},
fmt::Display,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::{Duration, Instant},
};
use parking_lot::Mutex;
use chrono::{DateTime, Utc};
/// A counter that is only ever added to.
///
/// Cloning a `Count` shares the underlying atomic, so every clone observes (and contributes
/// to) the same total.
#[derive(Debug, Clone)]
pub struct Count {
    /// Shared between clones.
    value: std::sync::Arc<AtomicUsize>,
}

impl Count {
    /// Creates a counter starting at zero.
    pub fn new() -> Self {
        let value = Arc::new(AtomicUsize::default());
        Self { value }
    }

    /// Adds `n` to the counter (wraps on overflow, per `AtomicUsize::fetch_add`).
    pub fn add(&self, n: usize) {
        let counter = &self.value;
        counter.fetch_add(n, Ordering::Relaxed);
    }

    /// Returns the current total.
    pub fn value(&self) -> usize {
        let counter = &self.value;
        counter.load(Ordering::Relaxed)
    }
}

impl Default for Count {
    fn default() -> Self {
        Count::new()
    }
}

impl PartialEq for Count {
    /// Counts compare equal when their current totals are equal.
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (self.value(), other.value());
        lhs == rhs
    }
}

impl Display for Count {
    /// Writes the current total as a plain integer.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let current = self.value();
        write!(f, "{current}")
    }
}
/// A value that can be increased, decreased, or overwritten.
///
/// Cloning a `Gauge` shares the underlying atomic, so every clone observes the same value.
#[derive(Debug, Clone)]
pub struct Gauge {
    /// Shared between clones.
    value: std::sync::Arc<AtomicUsize>,
}

impl Gauge {
    /// Creates a gauge holding zero.
    pub fn new() -> Self {
        let value = Arc::new(AtomicUsize::default());
        Self { value }
    }

    /// Increases the gauge by `n` (wraps on overflow, per `AtomicUsize::fetch_add`).
    pub fn add(&self, n: usize) {
        let cell = &self.value;
        cell.fetch_add(n, Ordering::Relaxed);
    }

    /// Decreases the gauge by `n`.
    ///
    /// NOTE: wraps around on underflow (`AtomicUsize::fetch_sub` semantics), so callers must
    /// not subtract more than the gauge currently holds.
    pub fn sub(&self, n: usize) {
        let cell = &self.value;
        cell.fetch_sub(n, Ordering::Relaxed);
    }

    /// Overwrites the gauge with `n` and returns the value it held before.
    pub fn set(&self, n: usize) -> usize {
        let cell = &self.value;
        cell.swap(n, Ordering::Relaxed)
    }

    /// Returns the current value.
    pub fn value(&self) -> usize {
        let cell = &self.value;
        cell.load(Ordering::Relaxed)
    }
}

impl Default for Gauge {
    fn default() -> Self {
        Gauge::new()
    }
}

impl PartialEq for Gauge {
    /// Gauges compare equal when their current values are equal.
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (self.value(), other.value());
        lhs == rhs
    }
}

impl Display for Gauge {
    /// Writes the current value as a plain integer.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let current = self.value();
        write!(f, "{current}")
    }
}
/// Accumulated elapsed time, stored as a total number of nanoseconds.
///
/// Cloning a `Time` shares the underlying atomic, so every clone observes the same total.
#[derive(Debug, Clone)]
pub struct Time {
    /// Total recorded nanoseconds, shared between clones.
    nanos: Arc<AtomicUsize>,
}

impl Default for Time {
    fn default() -> Self {
        Self::new()
    }
}

impl PartialEq for Time {
    /// Times compare equal when their current totals are equal.
    fn eq(&self, other: &Self) -> bool {
        self.value().eq(&other.value())
    }
}

impl Display for Time {
    /// Formats the total using `Duration`'s `Debug` representation (e.g. `1.042µs`).
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let duration = std::time::Duration::from_nanos(self.value() as u64);
        write!(f, "{duration:?}")
    }
}

impl Time {
    /// Creates a time with a total of zero.
    pub fn new() -> Self {
        Self {
            nanos: Arc::new(AtomicUsize::new(0)),
        }
    }

    /// Adds the time elapsed since `start` to the total.
    pub fn add_elapsed(&self, start: Instant) {
        self.add_duration(start.elapsed());
    }

    /// Adds `duration` to the total.
    ///
    /// At least 1ns is always added, so any recorded measurement leaves the total non-zero.
    /// (A zero total is what `MetricValue`'s `Display` shows as "NOT RECORDED".)
    /// A duration whose nanosecond count does not fit in a `usize` is saturated to
    /// `usize::MAX` instead of being truncated. The accumulator itself still wraps on
    /// overflow (`fetch_add`).
    pub fn add_duration(&self, duration: Duration) {
        let more_nanos = duration.as_nanos().min(usize::MAX as u128) as usize;
        self.nanos.fetch_add(more_nanos.max(1), Ordering::Relaxed);
    }

    /// Adds the total of `other` to this time.
    ///
    /// Unlike [`Time::add_duration`], this adds `other`'s value exactly. Merging a time that
    /// was never recorded (total 0) leaves `self` unchanged instead of adding a spurious 1ns.
    pub fn add(&self, other: &Time) {
        self.nanos.fetch_add(other.value(), Ordering::Relaxed);
    }

    /// Starts a [`ScopedTimerGuard`] that adds the elapsed time to `self` when it is stopped
    /// or dropped.
    pub fn timer(&self) -> ScopedTimerGuard<'_> {
        ScopedTimerGuard {
            inner: self,
            start: Some(Instant::now()),
        }
    }

    /// Returns the total recorded time in nanoseconds.
    pub fn value(&self) -> usize {
        self.nanos.load(Ordering::Relaxed)
    }
}
#[derive(Debug, Clone)]
pub struct Timestamp {
timestamp: Arc<Mutex<Option<DateTime<Utc>>>>,
}
impl Default for Timestamp {
fn default() -> Self {
Self::new()
}
}
impl Timestamp {
pub fn new() -> Self {
Self {
timestamp: Arc::new(Mutex::new(None)),
}
}
pub fn record(&self) {
self.set(Utc::now())
}
pub fn set(&self, now: DateTime<Utc>) {
*self.timestamp.lock() = Some(now);
}
pub fn value(&self) -> Option<DateTime<Utc>> {
*self.timestamp.lock()
}
pub fn update_to_min(&self, other: &Timestamp) {
let min = match (self.value(), other.value()) {
(None, None) => None,
(Some(v), None) => Some(v),
(None, Some(v)) => Some(v),
(Some(v1), Some(v2)) => Some(if v1 < v2 { v1 } else { v2 }),
};
*self.timestamp.lock() = min;
}
pub fn update_to_max(&self, other: &Timestamp) {
let max = match (self.value(), other.value()) {
(None, None) => None,
(Some(v), None) => Some(v),
(None, Some(v)) => Some(v),
(Some(v1), Some(v2)) => Some(if v1 < v2 { v2 } else { v1 }),
};
*self.timestamp.lock() = max;
}
}
impl PartialEq for Timestamp {
fn eq(&self, other: &Self) -> bool {
self.value().eq(&other.value())
}
}
impl Display for Timestamp {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self.value() {
None => write!(f, "NONE"),
Some(v) => {
write!(f, "{v}")
}
}
}
}
/// Guard that adds the time elapsed since its start to a [`Time`].
///
/// The time is added when the guard is stopped, finished with `done`, or dropped.
pub struct ScopedTimerGuard<'a> {
    /// Destination for the measured time.
    inner: &'a Time,
    /// `Some(start)` while running; `None` once the elapsed time has been recorded.
    start: Option<Instant>,
}

impl<'a> ScopedTimerGuard<'a> {
    /// Records the time elapsed since the (re)start and stops the timer.
    ///
    /// Calling it again without `restart` records nothing.
    pub fn stop(&mut self) {
        let pending = self.start.take();
        if let Some(started_at) = pending {
            self.inner.add_elapsed(started_at);
        }
    }

    /// Starts timing again from now. Any running, unrecorded interval is discarded.
    pub fn restart(&mut self) {
        let now = Instant::now();
        self.start = Some(now);
    }

    /// Consumes the guard. Dropping it records any running interval.
    pub fn done(self) {
        drop(self);
    }
}

impl<'a> Drop for ScopedTimerGuard<'a> {
    /// Records any interval that is still running.
    fn drop(&mut self) {
        self.stop();
    }
}
/// The value of a single metric, tagged with what it measures.
///
/// The payload types (`Count`, `Gauge`, `Time`, `Timestamp`) share their storage between
/// clones, so a cloned `MetricValue` observes updates made through the original.
#[derive(Debug, Clone, PartialEq)]
pub enum MetricValue {
    /// Count reported under the name `output_rows`.
    OutputRows(Count),
    /// Time reported under the name `elapsed_compute`.
    ElapsedCompute(Time),
    /// Count reported under the name `spill_count`.
    SpillCount(Count),
    /// Count reported under the name `spilled_bytes`.
    SpilledBytes(Count),
    /// Gauge reported under the name `mem_used`.
    CurrentMemoryUsage(Gauge),
    /// Counter with a caller-supplied name.
    Count { name: Cow<'static, str>, count: Count },
    /// Gauge with a caller-supplied name.
    Gauge { name: Cow<'static, str>, gauge: Gauge },
    /// Time with a caller-supplied name.
    Time { name: Cow<'static, str>, time: Time },
    /// Timestamp reported as `start_timestamp`; aggregation keeps the earliest value.
    StartTimestamp(Timestamp),
    /// Timestamp reported as `end_timestamp`; aggregation keeps the latest value.
    EndTimestamp(Timestamp),
}
impl MetricValue {
    /// Returns the name under which this metric is reported.
    ///
    /// Built-in variants use fixed names (e.g. `"output_rows"`). The generic `Count`,
    /// `Gauge` and `Time` variants return their caller-supplied `name`.
    pub fn name(&self) -> &str {
        match self {
            Self::OutputRows(_) => "output_rows",
            Self::SpillCount(_) => "spill_count",
            Self::SpilledBytes(_) => "spilled_bytes",
            Self::CurrentMemoryUsage(_) => "mem_used",
            Self::ElapsedCompute(_) => "elapsed_compute",
            Self::Count { name, .. } => name.borrow(),
            Self::Gauge { name, .. } => name.borrow(),
            Self::Time { name, .. } => name.borrow(),
            Self::StartTimestamp(_) => "start_timestamp",
            Self::EndTimestamp(_) => "end_timestamp",
        }
    }

    /// Returns the metric's current value as a `usize`.
    ///
    /// Counts and gauges return their value, and times return their total in nanoseconds.
    /// Timestamps return nanoseconds since the Unix epoch, or 0 when unset. A timestamp before
    /// the epoch (negative nanoseconds) is clamped to 0 instead of wrapping to a huge value.
    pub fn as_usize(&self) -> usize {
        match self {
            Self::OutputRows(count)
            | Self::SpillCount(count)
            | Self::SpilledBytes(count)
            | Self::Count { count, .. } => count.value(),
            Self::CurrentMemoryUsage(gauge) | Self::Gauge { gauge, .. } => gauge.value(),
            Self::ElapsedCompute(time) | Self::Time { time, .. } => time.value(),
            Self::StartTimestamp(timestamp) | Self::EndTimestamp(timestamp) => timestamp
                .value()
                // `timestamp_nanos` is signed; clamp pre-epoch values rather than letting
                // the `as` cast wrap them around.
                .map(|ts| ts.timestamp_nanos().max(0) as usize)
                .unwrap_or(0),
        }
    }

    /// Returns a new metric of the same variant (and, for generic variants, the same name)
    /// whose value is zero or unset. The new metric shares no storage with `self`.
    pub fn new_empty(&self) -> Self {
        match self {
            Self::OutputRows(_) => Self::OutputRows(Count::new()),
            Self::SpillCount(_) => Self::SpillCount(Count::new()),
            Self::SpilledBytes(_) => Self::SpilledBytes(Count::new()),
            Self::CurrentMemoryUsage(_) => Self::CurrentMemoryUsage(Gauge::new()),
            Self::ElapsedCompute(_) => Self::ElapsedCompute(Time::new()),
            Self::Count { name, .. } => Self::Count {
                name: name.clone(),
                count: Count::new(),
            },
            Self::Gauge { name, .. } => Self::Gauge {
                name: name.clone(),
                gauge: Gauge::new(),
            },
            Self::Time { name, .. } => Self::Time {
                name: name.clone(),
                time: Time::new(),
            },
            Self::StartTimestamp(_) => Self::StartTimestamp(Timestamp::new()),
            Self::EndTimestamp(_) => Self::EndTimestamp(Timestamp::new()),
        }
    }

    /// Merges `other` into `self` in place.
    ///
    /// Counts, gauges and times are summed. `StartTimestamp` keeps the earlier of the two
    /// values and `EndTimestamp` the later. For the generic `Count`/`Gauge`/`Time` variants,
    /// only the values are combined: `self`'s name is kept and `other`'s name is not checked.
    ///
    /// # Panics
    ///
    /// Panics if `self` and `other` are different variants.
    pub fn aggregate(&mut self, other: &Self) {
        match (self, other) {
            (Self::OutputRows(count), Self::OutputRows(other_count))
            | (Self::SpillCount(count), Self::SpillCount(other_count))
            | (Self::SpilledBytes(count), Self::SpilledBytes(other_count))
            | (
                Self::Count { count, .. },
                Self::Count {
                    count: other_count, ..
                },
            ) => count.add(other_count.value()),
            (Self::CurrentMemoryUsage(gauge), Self::CurrentMemoryUsage(other_gauge))
            | (
                Self::Gauge { gauge, .. },
                Self::Gauge {
                    gauge: other_gauge, ..
                },
            ) => gauge.add(other_gauge.value()),
            (Self::ElapsedCompute(time), Self::ElapsedCompute(other_time))
            | (
                Self::Time { time, .. },
                Self::Time {
                    time: other_time, ..
                },
            ) => time.add(other_time),
            (Self::StartTimestamp(timestamp), Self::StartTimestamp(other_timestamp)) => {
                timestamp.update_to_min(other_timestamp);
            }
            (Self::EndTimestamp(timestamp), Self::EndTimestamp(other_timestamp)) => {
                timestamp.update_to_max(other_timestamp);
            }
            (this, that) => {
                panic!(
                    "Mismatched metric types. Can not aggregate {:?} with value {:?}",
                    this, that
                )
            }
        }
    }

    /// Returns a key for ordering metrics for display.
    ///
    /// Built-in metrics come first, then the generic named ones, then timestamps.
    pub fn display_sort_key(&self) -> u8 {
        match self {
            Self::OutputRows(_) => 0,
            Self::ElapsedCompute(_) => 1,
            Self::SpillCount(_) => 2,
            Self::SpilledBytes(_) => 3,
            Self::CurrentMemoryUsage(_) => 4,
            Self::Count { .. } => 5,
            Self::Gauge { .. } => 6,
            Self::Time { .. } => 7,
            Self::StartTimestamp(_) => 8,
            Self::EndTimestamp(_) => 9,
        }
    }

    /// Returns `true` for the `StartTimestamp` and `EndTimestamp` variants.
    pub fn is_timestamp(&self) -> bool {
        matches!(self, Self::StartTimestamp(_) | Self::EndTimestamp(_))
    }
}
impl std::fmt::Display for MetricValue {
    /// Writes only the metric's value, not its name.
    ///
    /// A time whose total is zero is written as `NOT RECORDED`. Every other case delegates
    /// to the payload's own `Display` implementation.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            Self::ElapsedCompute(time) | Self::Time { time, .. } => match time.value() {
                0 => f.write_str("NOT RECORDED"),
                _ => std::fmt::Display::fmt(time, f),
            },
            Self::CurrentMemoryUsage(gauge) | Self::Gauge { gauge, .. } => {
                std::fmt::Display::fmt(gauge, f)
            }
            Self::StartTimestamp(ts) | Self::EndTimestamp(ts) => std::fmt::Display::fmt(ts, f),
            Self::OutputRows(count)
            | Self::SpillCount(count)
            | Self::SpilledBytes(count)
            | Self::Count { count, .. } => std::fmt::Display::fmt(count, f),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;

    /// Asserts that every metric in `values` displays as `expected`.
    fn assert_displays_as(values: &[MetricValue], expected: &str) {
        values
            .iter()
            .for_each(|value| assert_eq!(expected, value.to_string(), "value {value:?}"));
    }

    #[test]
    fn test_display_output_rows() {
        let count = Count::new();
        let values = [
            MetricValue::OutputRows(count.clone()),
            MetricValue::Count {
                name: "my_counter".into(),
                count: count.clone(),
            },
        ];
        assert_displays_as(&values, "0");
        count.add(42);
        assert_displays_as(&values, "42");
    }

    #[test]
    fn test_display_time() {
        let time = Time::new();
        let values = [
            MetricValue::ElapsedCompute(time.clone()),
            MetricValue::Time {
                name: "my_time".into(),
                time: time.clone(),
            },
        ];
        assert_displays_as(&values, "NOT RECORDED");
        time.add_duration(Duration::from_nanos(1042));
        assert_displays_as(&values, "1.042µs");
    }

    #[test]
    fn test_display_timestamp() {
        let timestamp = Timestamp::new();
        let values = [
            MetricValue::StartTimestamp(timestamp.clone()),
            MetricValue::EndTimestamp(timestamp.clone()),
        ];
        assert_displays_as(&values, "NONE");
        // 1_431_648_000_000_000 ns after the epoch is 1970-01-17 13:40:48 UTC.
        timestamp.set(Utc.timestamp_nanos(1_431_648_000_000_000));
        assert_displays_as(&values, "1970-01-17 13:40:48 UTC");
    }
}