use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
/// A minimal duration accumulator backed by two atomic counters.
///
/// Only the number of recorded samples and their combined length in whole
/// microseconds are kept; there are no buckets or percentiles. The two values
/// are updated by separate atomic operations, so a reader running concurrently
/// with [`SimpleHistogram::record`] may observe one updated before the other.
#[derive(Debug, Default)]
pub struct SimpleHistogram {
    /// Number of samples recorded so far.
    count: AtomicUsize,
    /// Total of all recorded samples in whole microseconds, saturating at `usize::MAX`.
    sum_micros: AtomicUsize,
}

impl SimpleHistogram {
    /// Records one sample.
    ///
    /// The sample count is incremented by one and `duration`, expressed in
    /// whole microseconds, is added to the running total. A duration whose
    /// microsecond value does not fit in `usize` is treated as `usize::MAX`,
    /// and the running total saturates at `usize::MAX` rather than wrapping.
    pub fn record(&self, duration: Duration) {
        self.count.fetch_add(1, Ordering::Relaxed);

        let micros: usize = duration.as_micros().try_into().unwrap_or(usize::MAX);

        // `fetch_add` wraps on overflow, which would turn a clamped `usize::MAX`
        // sample into "total - 1". `fetch_update` retries until the saturating
        // sum has been stored; the closure always returns `Some`, so the result
        // is always `Ok` and carries nothing we need.
        let _ = self
            .sum_micros
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |total| {
                Some(total.saturating_add(micros))
            });
    }

    /// Returns the number of samples recorded so far.
    pub fn get_count(&self) -> usize {
        self.count.load(Ordering::Relaxed)
    }

    /// Returns the accumulated duration of all samples, in whole microseconds.
    pub fn get_sum_micros(&self) -> usize {
        self.sum_micros.load(Ordering::Relaxed)
    }
}
/// Shared handles to the scheduler's metric counters and duration accumulators.
///
/// Every field is wrapped in an [`Arc`], so cloning this struct produces another
/// set of handles to the same underlying values rather than independent copies.
/// This block only creates the values and reads them in [`SchedulerMetrics::snapshot`];
/// whatever updates them lives outside it.
///
/// NOTE(review): judging by the names, fields ending in `_current` look like
/// gauges and the rest like cumulative counters — confirm against the code
/// that writes them.
#[derive(Debug, Clone)]
pub struct SchedulerMetrics {
    pub jobs_submitted: Arc<AtomicUsize>,
    pub jobs_executed_success: Arc<AtomicUsize>,
    pub jobs_executed_fail: Arc<AtomicUsize>,
    pub jobs_panicked: Arc<AtomicUsize>,
    pub jobs_retried: Arc<AtomicUsize>,
    pub jobs_lineage_cancelled: Arc<AtomicUsize>,
    pub jobs_instance_discarded_cancelled: Arc<AtomicUsize>,
    pub jobs_permanently_failed: Arc<AtomicUsize>,
    pub staging_submitted_total: Arc<AtomicUsize>,
    pub staging_rejected_full: Arc<AtomicUsize>,
    pub job_queue_scheduled_current: Arc<AtomicUsize>,
    pub job_staging_buffer_current: Arc<AtomicUsize>,
    pub workers_active_current: Arc<AtomicUsize>,
    pub job_execution_duration: Arc<SimpleHistogram>,
    pub job_queue_wait_duration: Arc<SimpleHistogram>,
}

impl SchedulerMetrics {
    /// Creates a metrics set in which every counter is zero and both duration
    /// accumulators are in their default state.
    pub fn new() -> Self {
        // Each call allocates a fresh, independent value.
        let counter = || Arc::new(AtomicUsize::new(0));
        let histogram = || Arc::new(SimpleHistogram::default());

        Self {
            jobs_submitted: counter(),
            jobs_executed_success: counter(),
            jobs_executed_fail: counter(),
            jobs_panicked: counter(),
            jobs_retried: counter(),
            jobs_lineage_cancelled: counter(),
            jobs_instance_discarded_cancelled: counter(),
            jobs_permanently_failed: counter(),
            staging_submitted_total: counter(),
            staging_rejected_full: counter(),
            job_queue_scheduled_current: counter(),
            job_staging_buffer_current: counter(),
            workers_active_current: counter(),
            job_execution_duration: histogram(),
            job_queue_wait_duration: histogram(),
        }
    }

    /// Copies the current value of every metric into a [`MetricsSnapshot`].
    ///
    /// Each value is read with its own `Relaxed` load, one after another, so the
    /// result is not a single atomic view across all metrics.
    pub fn snapshot(&self) -> MetricsSnapshot {
        /// Reads one counter with relaxed ordering.
        fn read(counter: &AtomicUsize) -> usize {
            counter.load(Ordering::Relaxed)
        }

        let execution = &self.job_execution_duration;
        let queue_wait = &self.job_queue_wait_duration;

        MetricsSnapshot {
            jobs_submitted: read(&self.jobs_submitted),
            jobs_executed_success: read(&self.jobs_executed_success),
            jobs_executed_fail: read(&self.jobs_executed_fail),
            jobs_panicked: read(&self.jobs_panicked),
            jobs_retried: read(&self.jobs_retried),
            jobs_lineage_cancelled: read(&self.jobs_lineage_cancelled),
            jobs_instance_discarded_cancelled: read(&self.jobs_instance_discarded_cancelled),
            jobs_permanently_failed: read(&self.jobs_permanently_failed),
            staging_submitted_total: read(&self.staging_submitted_total),
            staging_rejected_full: read(&self.staging_rejected_full),
            job_queue_scheduled_current: read(&self.job_queue_scheduled_current),
            job_staging_buffer_current: read(&self.job_staging_buffer_current),
            workers_active_current: read(&self.workers_active_current),
            job_execution_duration_count: execution.get_count(),
            job_execution_duration_sum_micros: execution.get_sum_micros(),
            job_queue_wait_duration_count: queue_wait.get_count(),
            job_queue_wait_duration_sum_micros: queue_wait.get_sum_micros(),
        }
    }
}
/// Plain-value copy of the scheduler metrics, as produced by
/// `SchedulerMetrics::snapshot`.
///
/// All fields are ordinary `usize` values, so a snapshot can be cloned and
/// compared freely. With the `serde` feature enabled it also derives
/// `Serialize` and `Deserialize`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct MetricsSnapshot {
    pub jobs_submitted: usize,
    pub jobs_executed_success: usize,
    pub jobs_executed_fail: usize,
    pub jobs_panicked: usize,
    pub jobs_retried: usize,
    pub jobs_lineage_cancelled: usize,
    pub jobs_instance_discarded_cancelled: usize,
    pub jobs_permanently_failed: usize,
    pub staging_submitted_total: usize,
    pub staging_rejected_full: usize,
    pub job_queue_scheduled_current: usize,
    pub job_staging_buffer_current: usize,
    pub workers_active_current: usize,
    /// Number of samples behind `job_execution_duration_sum_micros`.
    pub job_execution_duration_count: usize,
    /// Accumulated execution duration, in microseconds.
    pub job_execution_duration_sum_micros: usize,
    /// Number of samples behind `job_queue_wait_duration_sum_micros`.
    pub job_queue_wait_duration_count: usize,
    /// Accumulated queue-wait duration, in microseconds.
    pub job_queue_wait_duration_sum_micros: usize,
}

impl MetricsSnapshot {
    /// Divides an accumulated microsecond total by its sample count.
    ///
    /// Returns `None` when `count` is zero, since no mean exists.
    fn mean_micros(sum_micros: usize, count: usize) -> Option<f64> {
        match count {
            0 => None,
            samples => Some(sum_micros as f64 / samples as f64),
        }
    }

    /// Mean execution duration in (fractional) microseconds, or `None` when no
    /// execution samples were recorded.
    pub fn mean_execution_duration_micros(&self) -> Option<f64> {
        Self::mean_micros(
            self.job_execution_duration_sum_micros,
            self.job_execution_duration_count,
        )
    }

    /// Mean execution duration as a [`Duration`], or `None` when no execution
    /// samples were recorded.
    ///
    /// The fractional part of the microsecond mean is discarded.
    pub fn mean_execution_duration(&self) -> Option<Duration> {
        let mean = self.mean_execution_duration_micros()?;
        // Float-to-integer `as` drops the fraction and saturates at the u64 range.
        Some(Duration::from_micros(mean as u64))
    }

    /// Mean queue-wait duration in (fractional) microseconds, or `None` when no
    /// queue-wait samples were recorded.
    pub fn mean_queue_wait_duration_micros(&self) -> Option<f64> {
        Self::mean_micros(
            self.job_queue_wait_duration_sum_micros,
            self.job_queue_wait_duration_count,
        )
    }

    /// Mean queue-wait duration as a [`Duration`], or `None` when no queue-wait
    /// samples were recorded.
    ///
    /// The fractional part of the microsecond mean is discarded.
    pub fn mean_queue_wait_duration(&self) -> Option<Duration> {
        let mean = self.mean_queue_wait_duration_micros()?;
        // Float-to-integer `as` drops the fraction and saturates at the u64 range.
        Some(Duration::from_micros(mean as u64))
    }
}