use std::collections::VecDeque;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use serde::{Deserialize, Serialize};
use crate::types::JobState;
/// One job outcome recorded as a data point for [`BatchAnalytics`].
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct JobSample {
    /// Grouping label; the per-category reports key on this value.
    pub category: String,
    /// Job state carried by this sample.
    pub state: JobState,
    /// How long the job ran, in seconds.
    pub duration_secs: f64,
    /// Completion time as whole seconds since the Unix epoch; window queries
    /// filter on this field.
    pub completed_at: u64,
    /// Priority value; [`JobSample::now`] initialises it to `1`.
    // NOTE(review): nothing in this file reads `priority` — confirm its
    // meaning against the producers of these samples.
    pub priority: u8,
    /// Bytes produced by the job; [`JobSample::now`] initialises it to `0`.
    pub output_bytes: u64,
}
impl JobSample {
#[must_use]
pub fn now(category: impl Into<String>, state: JobState, duration_secs: f64) -> Self {
let completed_at = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_secs();
Self {
category: category.into(),
state,
duration_secs,
completed_at,
priority: 1,
output_bytes: 0,
}
}
#[must_use]
pub fn is_success(&self) -> bool {
matches!(self.state, JobState::Completed)
}
#[must_use]
pub fn is_failure(&self) -> bool {
matches!(self.state, JobState::Failed)
}
}
/// Look-back period for an analytics query, measured back from "now".
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub enum Window {
    /// The most recent 60 seconds.
    LastMinute,
    /// The most recent 3 600 seconds.
    LastHour,
    /// The most recent 86 400 seconds.
    LastDay,
    /// A caller-chosen width, in seconds.
    Custom(u64),
}
impl Window {
    /// Width of the window in seconds.
    #[must_use]
    pub fn secs(self) -> u64 {
        const MINUTE: u64 = 60;
        const HOUR: u64 = 60 * MINUTE;
        const DAY: u64 = 24 * HOUR;

        match self {
            Self::Custom(width) => width,
            Self::LastDay => DAY,
            Self::LastHour => HOUR,
            Self::LastMinute => MINUTE,
        }
    }
}
/// Aggregate statistics over the samples that fall inside one window.
///
/// The `Default` value (every field zero) is what an empty window reports.
#[derive(Clone, Debug, Default)]
pub struct WindowMetrics {
    /// Number of samples in the window, regardless of state.
    pub total_jobs: usize,
    /// Samples whose state is `Completed`.
    pub completed: usize,
    /// Samples whose state is `Failed`.
    pub failed: usize,
    /// Samples whose state is `Cancelled`.
    pub cancelled: usize,
    /// Arithmetic mean of `duration_secs` over all samples.
    pub mean_duration_secs: f64,
    /// Median duration (linearly interpolated).
    pub p50_duration_secs: f64,
    /// 95th-percentile duration (linearly interpolated).
    pub p95_duration_secs: f64,
    /// 99th-percentile duration (linearly interpolated).
    pub p99_duration_secs: f64,
    /// Largest duration observed in the window.
    pub max_duration_secs: f64,
    /// `failed / total_jobs`.
    pub failure_rate: f64,
    /// Sum of `output_bytes` over all samples.
    pub total_output_bytes: u64,
    /// `completed` divided by the window width in seconds.
    pub throughput_per_sec: f64,
}
/// Result of comparing throughput in the current window with the window
/// immediately before it.
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub enum TrendDirection {
    /// Current throughput is at least 1.05x the previous window's (or the
    /// previous window completed nothing and the current one did).
    Improving,
    /// The ratio lies strictly between 0.95 and 1.05.
    Stable,
    /// Current throughput is at most 0.95x the previous window's.
    Degrading,
    /// There is no usable baseline to compare against.
    Unknown,
}
/// SLA compliance summary for one window.
#[derive(Clone, Debug)]
pub struct SlaReport {
    /// Samples that completed successfully within `threshold_secs`.
    pub within_sla: usize,
    /// Every other sample in the window.
    pub breached_sla: usize,
    /// `within_sla` as a fraction of all samples; `1.0` for an empty window.
    pub compliance_rate: f64,
    /// The duration threshold, in seconds, the report was evaluated against.
    pub threshold_secs: f64,
}
/// Tunables for [`BatchAnalytics`].
#[derive(Clone, Debug)]
pub struct AnalyticsConfig {
    /// Maximum number of samples retained; the oldest are evicted first.
    pub max_samples: usize,
    /// Optional SLA duration target in seconds.
    // NOTE(review): no code in this file reads this field — presumably it is
    // consumed by callers; confirm.
    pub sla_target_secs: Option<f64>,
}
impl Default for AnalyticsConfig {
    /// Retains up to 100 000 samples and sets no SLA target.
    fn default() -> Self {
        const DEFAULT_MAX_SAMPLES: usize = 100_000;

        Self {
            sla_target_secs: None,
            max_samples: DEFAULT_MAX_SAMPLES,
        }
    }
}
/// Bounded in-memory store of [`JobSample`]s with windowed reporting.
///
/// All methods take `&self`; the sample buffer sits behind a mutex.
pub struct BatchAnalytics {
    /// Retained samples in insertion order: new samples are pushed at the
    /// back, evictions pop from the front.
    samples: parking_lot::Mutex<VecDeque<JobSample>>,
    /// Settings supplied at construction time.
    config: AnalyticsConfig,
}
impl BatchAnalytics {
    /// Creates an empty store governed by `config`.
    #[must_use]
    pub fn new(config: AnalyticsConfig) -> Self {
        Self {
            samples: parking_lot::Mutex::new(VecDeque::new()),
            config,
        }
    }

    /// Appends one sample, then evicts from the front until at most
    /// `config.max_samples` remain.
    pub fn ingest(&self, sample: JobSample) {
        let mut ring = self.samples.lock();
        ring.push_back(sample);
        Self::evict_overflow(&mut ring, self.config.max_samples);
    }

    /// Appends every sample produced by `samples` under a single lock
    /// acquisition, then evicts from the front until at most
    /// `config.max_samples` remain.
    pub fn ingest_batch(&self, samples: impl IntoIterator<Item = JobSample>) {
        let mut ring = self.samples.lock();
        ring.extend(samples);
        Self::evict_overflow(&mut ring, self.config.max_samples);
    }

    /// Pops from the front of `ring` until it holds at most `max_samples`.
    fn evict_overflow(ring: &mut VecDeque<JobSample>, max_samples: usize) {
        while ring.len() > max_samples {
            ring.pop_front();
        }
    }

    /// Current wall-clock time as whole seconds since the Unix epoch, or `0`
    /// when the system clock reads earlier than the epoch.
    fn now_secs() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or(Duration::ZERO)
            .as_secs()
    }

    /// Inclusive lower bound of `window`, measured back from the current
    /// clock reading and saturating at `0`.
    fn window_start(window: Window) -> u64 {
        Self::now_secs().saturating_sub(window.secs())
    }

    /// Clones every retained sample with `completed_at >= now - window`.
    ///
    /// There is no upper bound, so samples stamped in the future are included.
    fn samples_in_window(&self, window: Window) -> Vec<JobSample> {
        let since = Self::window_start(window);
        let ring = self.samples.lock();
        ring.iter()
            .filter(|s| s.completed_at >= since)
            .cloned()
            .collect()
    }

    /// Completed jobs per second over a window `window_secs` wide; `0.0` for
    /// a zero-width window.
    fn throughput(completed: usize, window_secs: u64) -> f64 {
        if window_secs == 0 {
            0.0
        } else {
            completed as f64 / window_secs as f64
        }
    }

    /// Computes aggregate statistics for the samples inside `window`.
    ///
    /// The samples are snapshotted first so that sorting happens after the
    /// lock has been released. An empty window yields
    /// `WindowMetrics::default()`.
    #[must_use]
    pub fn metrics(&self, window: Window) -> WindowMetrics {
        let samples = self.samples_in_window(window);
        Self::compute_metrics(&samples, window.secs())
    }

    /// Reduces `samples` to a [`WindowMetrics`]; `window_secs` is the
    /// denominator for throughput.
    fn compute_metrics(samples: &[JobSample], window_secs: u64) -> WindowMetrics {
        if samples.is_empty() {
            return WindowMetrics::default();
        }
        let total_jobs = samples.len();

        let mut completed = 0_usize;
        let mut failed = 0_usize;
        let mut cancelled = 0_usize;
        let mut total_output_bytes = 0_u64;
        let mut durations: Vec<f64> = Vec::with_capacity(total_jobs);
        for s in samples {
            completed += usize::from(s.is_success());
            failed += usize::from(s.is_failure());
            cancelled += usize::from(matches!(s.state, JobState::Cancelled));
            total_output_bytes += s.output_bytes;
            durations.push(s.duration_secs);
        }

        // `total_cmp` is a total order even when a duration is NaN. A
        // `partial_cmp`-based comparator is not, and the slice sort functions
        // are allowed to panic or leave the data unordered in that case.
        durations.sort_unstable_by(f64::total_cmp);

        let mean_duration_secs = durations.iter().sum::<f64>() / total_jobs as f64;
        WindowMetrics {
            total_jobs,
            completed,
            failed,
            cancelled,
            mean_duration_secs,
            p50_duration_secs: percentile(&durations, 0.50),
            p95_duration_secs: percentile(&durations, 0.95),
            p99_duration_secs: percentile(&durations, 0.99),
            max_duration_secs: durations.last().copied().unwrap_or(0.0),
            // `total_jobs` is non-zero here thanks to the early return above.
            failure_rate: failed as f64 / total_jobs as f64,
            total_output_bytes,
            throughput_per_sec: Self::throughput(completed, window_secs),
        }
    }

    /// Reports how many samples in `window` met the SLA.
    ///
    /// A sample is within SLA only if it completed successfully with
    /// `duration_secs <= threshold_secs`; every other sample in the window
    /// counts as breached. An empty window reports a compliance rate of
    /// `1.0`. The threshold is always the `threshold_secs` argument;
    /// `config.sla_target_secs` is not consulted.
    #[must_use]
    pub fn sla_report(&self, window: Window, threshold_secs: f64) -> SlaReport {
        let since = Self::window_start(window);
        // Count in place under the lock; nothing here needs owned samples.
        let (total, within_sla) = {
            let ring = self.samples.lock();
            ring.iter()
                .filter(|s| s.completed_at >= since)
                .fold((0_usize, 0_usize), |(total, within), s| {
                    let met = s.is_success() && s.duration_secs <= threshold_secs;
                    (total + 1, within + usize::from(met))
                })
        };

        let compliance_rate = if total == 0 {
            1.0
        } else {
            within_sla as f64 / total as f64
        };
        SlaReport {
            within_sla,
            breached_sla: total - within_sla,
            compliance_rate,
            threshold_secs,
        }
    }

    /// Compares throughput in the current window (`completed_at >= now - w`)
    /// with the window immediately before it (`now - 2w <= completed_at <
    /// now - w`).
    ///
    /// Returns [`TrendDirection::Unknown`] when the previous window holds no
    /// samples, or when neither window completed any job.
    #[must_use]
    pub fn throughput_trend(&self, window: Window) -> TrendDirection {
        let width = window.secs();
        // Both windows are classified against ONE clock reading in ONE locked
        // pass, so a sample on the boundary can never be counted in both
        // windows or in neither.
        let boundary = Self::now_secs().saturating_sub(width);
        let prev_start = boundary.saturating_sub(width);

        let mut curr_completed = 0_usize;
        let mut prev_total = 0_usize;
        let mut prev_completed = 0_usize;
        {
            let ring = self.samples.lock();
            for s in ring.iter() {
                if s.completed_at >= boundary {
                    curr_completed += usize::from(s.is_success());
                } else if s.completed_at >= prev_start {
                    prev_total += 1;
                    prev_completed += usize::from(s.is_success());
                }
            }
        }

        if prev_total == 0 {
            return TrendDirection::Unknown;
        }
        let curr_tp = Self::throughput(curr_completed, width);
        let prev_tp = Self::throughput(prev_completed, width);
        if prev_tp == 0.0 {
            // No baseline to form a ratio against.
            return if curr_tp > 0.0 {
                TrendDirection::Improving
            } else {
                TrendDirection::Unknown
            };
        }

        let ratio = curr_tp / prev_tp;
        if ratio >= 1.05 {
            TrendDirection::Improving
        } else if ratio <= 0.95 {
            TrendDirection::Degrading
        } else {
            TrendDirection::Stable
        }
    }

    /// Per-category tallies for `window`, as `category -> (failed, total)`.
    #[must_use]
    pub fn failure_breakdown(
        &self,
        window: Window,
    ) -> std::collections::HashMap<String, (usize, usize)> {
        use std::collections::HashMap;

        let since = Self::window_start(window);
        let ring = self.samples.lock();
        // Tally on borrowed names so a `String` is allocated once per
        // distinct category rather than once per sample.
        let mut tallies: HashMap<&str, (usize, usize)> = HashMap::new();
        for s in ring.iter().filter(|s| s.completed_at >= since) {
            let entry = tallies.entry(s.category.as_str()).or_insert((0, 0));
            entry.0 += usize::from(s.is_failure());
            entry.1 += 1;
        }
        tallies
            .into_iter()
            .map(|(name, tally)| (name.to_owned(), tally))
            .collect()
    }

    /// The `n` categories with the most samples in `window`, largest first.
    ///
    /// Categories with equal counts are ordered by name, so the result is
    /// deterministic.
    #[must_use]
    pub fn top_categories(&self, window: Window, n: usize) -> Vec<(String, usize)> {
        use std::collections::HashMap;

        let since = Self::window_start(window);
        let ring = self.samples.lock();
        let mut counts: HashMap<&str, usize> = HashMap::new();
        for s in ring.iter().filter(|s| s.completed_at >= since) {
            *counts.entry(s.category.as_str()).or_insert(0) += 1;
        }

        let mut ranked: Vec<(&str, usize)> = counts.into_iter().collect();
        // Hash-map iteration order is arbitrary; the name tie-break keeps
        // equal counts from being reordered between calls.
        ranked.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(b.0)));
        ranked.truncate(n);
        ranked
            .into_iter()
            .map(|(name, count)| (name.to_owned(), count))
            .collect()
    }

    /// Number of samples currently retained, across all windows.
    #[must_use]
    pub fn sample_count(&self) -> usize {
        self.samples.lock().len()
    }

    /// Discards every retained sample.
    pub fn clear(&self) {
        self.samples.lock().clear();
    }
}
/// Returns the `p`-quantile of `sorted`, linearly interpolating between the
/// two nearest ranks.
///
/// * `sorted` — values in ascending order; this function does not sort.
/// * `p` — fraction of the distribution (`0.5` is the median). Values outside
///   `[0.0, 1.0]` are clamped to the nearest endpoint; a NaN `p` selects the
///   first element.
///
/// Returns `0.0` for an empty slice.
fn percentile(sorted: &[f64], p: f64) -> f64 {
    if sorted.is_empty() {
        return 0.0;
    }
    let last = sorted.len() - 1;

    // Clamping keeps the fractional rank inside `0..=last`, so a fraction
    // above 1.0 picks the maximum instead of indexing past the slice.
    let rank = p.clamp(0.0, 1.0) * last as f64;
    let lo = rank.floor() as usize;
    let hi = (rank.ceil() as usize).min(last);

    // On an exact rank, return the element itself: interpolating would
    // compute `0.0 * (x - x)`, which is NaN when `x` is infinite.
    if lo == hi {
        return sorted[lo];
    }
    let frac = rank - lo as f64;
    sorted[lo] + frac * (sorted[hi] - sorted[lo])
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::JobState;

    /// Tolerance for comparisons that should be exact up to rounding.
    const EPS: f64 = 1e-9;

    /// Builds a sample in the `"test"` category, stamped with the current time.
    fn make_sample(state: JobState, duration: f64) -> JobSample {
        JobSample::now("test", state, duration)
    }

    /// Like [`make_sample`], but filed under `category`.
    fn sample_in(category: &str, state: JobState, duration: f64) -> JobSample {
        let mut sample = make_sample(state, duration);
        sample.category = category.into();
        sample
    }

    /// An empty store using the default configuration.
    fn default_analytics() -> BatchAnalytics {
        BatchAnalytics::new(AnalyticsConfig::default())
    }

    // --- percentile -------------------------------------------------------

    #[test]
    fn test_percentile_single_element() {
        let got = percentile(&[5.0], 0.5);
        assert!((got - 5.0).abs() < EPS);
    }

    #[test]
    fn test_percentile_sorted_range() {
        let data: Vec<f64> = (1..=100_u32).map(f64::from).collect();

        let p50 = percentile(&data, 0.50);
        assert!((49.0..=51.0).contains(&p50), "p50={p50}");

        let p99 = percentile(&data, 0.99);
        assert!((97.0..=100.0).contains(&p99), "p99={p99}");
    }

    #[test]
    fn test_percentile_empty_returns_zero() {
        assert_eq!(percentile(&[], 0.5), 0.0);
    }

    // --- ingestion --------------------------------------------------------

    #[test]
    fn test_ingest_and_sample_count() {
        let analytics = default_analytics();
        analytics.ingest(make_sample(JobState::Completed, 1.0));
        analytics.ingest(make_sample(JobState::Failed, 0.5));
        assert_eq!(analytics.sample_count(), 2);
    }

    #[test]
    fn test_ingest_batch() {
        let analytics = default_analytics();
        let batch: Vec<JobSample> =
            std::iter::repeat_with(|| make_sample(JobState::Completed, 2.0))
                .take(10)
                .collect();
        analytics.ingest_batch(batch);
        assert_eq!(analytics.sample_count(), 10);
    }

    #[test]
    fn test_ring_buffer_max_capacity() {
        let analytics = BatchAnalytics::new(AnalyticsConfig {
            max_samples: 3,
            sla_target_secs: None,
        });
        for _ in 0..10 {
            analytics.ingest(make_sample(JobState::Completed, 1.0));
        }
        assert_eq!(analytics.sample_count(), 3);
    }

    // --- metrics ----------------------------------------------------------

    #[test]
    fn test_metrics_all_successful() {
        let analytics = default_analytics();
        analytics.ingest(make_sample(JobState::Completed, 10.0));
        analytics.ingest(make_sample(JobState::Completed, 20.0));

        let m = analytics.metrics(Window::LastHour);
        assert_eq!(m.total_jobs, 2);
        assert_eq!(m.completed, 2);
        assert_eq!(m.failed, 0);
        assert!(m.failure_rate.abs() < EPS);
    }

    #[test]
    fn test_metrics_failure_rate() {
        let analytics = default_analytics();
        analytics.ingest(make_sample(JobState::Completed, 5.0));
        analytics.ingest(make_sample(JobState::Failed, 1.0));

        let m = analytics.metrics(Window::LastHour);
        assert!((m.failure_rate - 0.5).abs() < 1e-4);
    }

    #[test]
    fn test_metrics_empty_window() {
        let m = default_analytics().metrics(Window::LastMinute);
        assert_eq!(m.total_jobs, 0);
        assert!(m.failure_rate.abs() < EPS);
    }

    #[test]
    fn test_metrics_duration_percentiles() {
        let analytics = default_analytics();
        for secs in (1..=100_u32).map(f64::from) {
            analytics.ingest(make_sample(JobState::Completed, secs));
        }

        let m = analytics.metrics(Window::LastHour);
        assert!((49.0..=51.0).contains(&m.p50_duration_secs));
        assert!((99.0..=101.0).contains(&m.max_duration_secs));
    }

    // --- SLA --------------------------------------------------------------

    #[test]
    fn test_sla_all_within() {
        let analytics = default_analytics();
        analytics.ingest(make_sample(JobState::Completed, 5.0));
        analytics.ingest(make_sample(JobState::Completed, 3.0));

        let report = analytics.sla_report(Window::LastHour, 10.0);
        assert_eq!(report.within_sla, 2);
        assert!((report.compliance_rate - 1.0).abs() < EPS);
    }

    #[test]
    fn test_sla_some_breached() {
        let analytics = default_analytics();
        // One job inside the threshold, one too slow, one failed outright.
        analytics.ingest(make_sample(JobState::Completed, 5.0));
        analytics.ingest(make_sample(JobState::Completed, 15.0));
        analytics.ingest(make_sample(JobState::Failed, 1.0));

        let report = analytics.sla_report(Window::LastHour, 10.0);
        assert_eq!(report.within_sla, 1);
        assert_eq!(report.breached_sla, 2);
        assert!((report.compliance_rate - 1.0 / 3.0).abs() < 1e-4);
    }

    #[test]
    fn test_sla_empty_window_full_compliance() {
        let report = default_analytics().sla_report(Window::LastHour, 30.0);
        assert!((report.compliance_rate - 1.0).abs() < EPS);
    }

    // --- per-category reports ---------------------------------------------

    #[test]
    fn test_failure_breakdown_per_category() {
        let analytics = default_analytics();
        analytics.ingest(sample_in("encode", JobState::Completed, 5.0));
        analytics.ingest(sample_in("encode", JobState::Failed, 1.0));
        analytics.ingest(sample_in("ingest", JobState::Failed, 2.0));

        let breakdown = analytics.failure_breakdown(Window::LastHour);

        let (enc_fail, enc_total) = breakdown["encode"];
        assert_eq!(enc_total, 2);
        assert_eq!(enc_fail, 1);

        let (ing_fail, ing_total) = breakdown["ingest"];
        assert_eq!(ing_total, 1);
        assert_eq!(ing_fail, 1);
    }

    #[test]
    fn test_top_categories_sorted_descending() {
        let analytics = default_analytics();
        for _ in 0..5 {
            analytics.ingest(sample_in("encode", JobState::Completed, 1.0));
        }
        for _ in 0..2 {
            analytics.ingest(sample_in("ingest", JobState::Completed, 1.0));
        }

        let top = analytics.top_categories(Window::LastHour, 2);
        assert_eq!(top[0].0, "encode");
        assert_eq!(top[0].1, 5);
        assert_eq!(top[1].0, "ingest");
    }

    // --- trend, clear, helpers --------------------------------------------

    #[test]
    fn test_trend_unknown_when_no_prior_data() {
        let analytics = default_analytics();
        analytics.ingest(make_sample(JobState::Completed, 1.0));

        let trend = analytics.throughput_trend(Window::LastMinute);
        assert_eq!(trend, TrendDirection::Unknown);
    }

    #[test]
    fn test_clear_empties_buffer() {
        let analytics = default_analytics();
        analytics.ingest(make_sample(JobState::Completed, 1.0));
        analytics.clear();

        assert_eq!(analytics.sample_count(), 0);
        assert_eq!(analytics.metrics(Window::LastHour).total_jobs, 0);
    }

    #[test]
    fn test_job_sample_is_success_failure() {
        let s_ok = make_sample(JobState::Completed, 1.0);
        assert!(s_ok.is_success());
        assert!(!s_ok.is_failure());

        let s_fail = make_sample(JobState::Failed, 0.5);
        assert!(s_fail.is_failure());

        let s_cancel = make_sample(JobState::Cancelled, 0.1);
        assert!(!s_cancel.is_success());
        assert!(!s_cancel.is_failure());
    }
}