use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
/// Stage label tracked by [`Meter`] and reported in [`MeterSnapshot`].
///
/// The default is [`Phase::Idle`]. Every variant has a fixed `u8` encoding
/// (see `as_u8` / `from_u8`) so the value can be kept in an `AtomicU8`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Phase {
    /// Default phase; encoded as `0`.
    #[default]
    Idle,
    /// Encoded as `1`.
    Hashing,
    /// Encoded as `2`.
    Transfer,
}

impl Phase {
    /// Encodes the phase as a `u8`: `Idle` is 0, `Hashing` is 1, `Transfer` is 2.
    const fn as_u8(self) -> u8 {
        match self {
            Self::Idle => 0,
            Self::Hashing => 1,
            Self::Transfer => 2,
        }
    }

    /// Decodes a `u8` produced by `as_u8`.
    ///
    /// Any value other than 1 or 2 (including 0) decodes to `Idle`, so the
    /// conversion never fails.
    const fn from_u8(value: u8) -> Self {
        match value {
            2 => Self::Transfer,
            1 => Self::Hashing,
            _ => Self::Idle,
        }
    }
}
/// Plain-value copy of every [`Meter`] field, as returned by [`Meter::snapshot`].
///
/// Each field holds whatever the same-named atomic in the meter contained at
/// the moment it was loaded.
#[derive(Debug, Clone, Copy, Default)]
pub struct MeterSnapshot {
    /// Running sum of the amounts passed to `Meter::add_in`.
    pub bytes_in: u64,
    /// Running sum of the amounts passed to `Meter::add_out`.
    pub bytes_out: u64,
    /// Number of `Meter::object_finished` calls.
    pub objects_done: u64,
    /// Last value stored by `Meter::set_total`.
    pub objects_total: u64,
    /// Running sum of the amounts passed to `Meter::add_skipped`.
    pub objects_skipped: u64,
    /// Gauge raised by `Meter::object_started` and lowered (never below zero)
    /// by `Meter::object_finished`.
    pub in_flight: u64,
    /// Last phase stored by `Meter::set_phase`.
    pub phase: Phase,
    /// Last value stored by `Meter::set_current_limit`.
    pub current_limit: u64,
    /// Last value stored by `Meter::set_target_rate`.
    pub target_rate: u64,
}
/// Collection of atomic counters and gauges.
///
/// Every field is an atomic, which is why all methods on the meter take
/// `&self`. The derived `Default` starts every field at zero; a zero `phase`
/// byte decodes to `Phase::Idle`.
#[derive(Default, Debug)]
pub struct Meter {
    /// Accumulated by `add_in`.
    bytes_in: AtomicU64,
    /// Accumulated by `add_out`.
    bytes_out: AtomicU64,
    /// Incremented once per `object_finished` call.
    objects_done: AtomicU64,
    /// Overwritten by `set_total`.
    objects_total: AtomicU64,
    /// Accumulated by `add_skipped`.
    objects_skipped: AtomicU64,
    /// Raised by `object_started`, lowered (saturating at zero) by `object_finished`.
    in_flight: AtomicU64,
    /// `u8` encoding of the current `Phase`.
    phase: AtomicU8,
    /// Overwritten by `set_current_limit`.
    current_limit: AtomicU64,
    /// Overwritten by `set_target_rate`.
    target_rate: AtomicU64,
}
impl Meter {
    /// Creates a meter with every counter at zero and the phase at `Phase::Idle`.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Adds `n` to the `bytes_in` counter.
    pub fn add_in(&self, n: u64) {
        self.bytes_in.fetch_add(n, Ordering::Relaxed);
    }

    /// Adds `n` to the `bytes_out` counter.
    pub fn add_out(&self, n: u64) {
        self.bytes_out.fetch_add(n, Ordering::Relaxed);
    }

    /// Raises the in-flight gauge by one.
    pub fn object_started(&self) {
        self.in_flight.fetch_add(1, Ordering::Relaxed);
    }

    /// Lowers the in-flight gauge by one (stopping at zero) and counts one
    /// more finished object.
    ///
    /// `objects_done` is incremented on every call, including calls made
    /// while the gauge is already zero.
    pub fn object_finished(&self) {
        // `checked_sub` returns `None` at zero, which makes `fetch_update`
        // leave the gauge untouched instead of wrapping around. The `Err` it
        // reports in that case is intentionally ignored.
        let _ = self
            .in_flight
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |gauge| {
                gauge.checked_sub(1)
            });
        self.objects_done.fetch_add(1, Ordering::Relaxed);
    }

    /// Overwrites the `objects_total` value with `n`.
    pub fn set_total(&self, n: u64) {
        self.objects_total.store(n, Ordering::Relaxed);
    }

    /// Adds `n` to the `objects_skipped` counter.
    pub fn add_skipped(&self, n: u64) {
        self.objects_skipped.fetch_add(n, Ordering::Relaxed);
    }

    /// Overwrites the `current_limit` value with `n`.
    pub fn set_current_limit(&self, n: u64) {
        self.current_limit.store(n, Ordering::Relaxed);
    }

    /// Overwrites the `target_rate` value with `n`.
    pub fn set_target_rate(&self, n: u64) {
        self.target_rate.store(n, Ordering::Relaxed);
    }

    /// Records `p` as the current phase.
    pub fn set_phase(&self, p: Phase) {
        let encoded = p.as_u8();
        self.phase.store(encoded, Ordering::Relaxed);
    }

    /// Returns the most recently stored phase.
    #[must_use]
    pub fn phase(&self) -> Phase {
        let raw = self.phase.load(Ordering::Relaxed);
        Phase::from_u8(raw)
    }

    /// Copies every field into a [`MeterSnapshot`].
    ///
    /// Each field is loaded on its own with relaxed ordering, so the result
    /// is not one atomic view of the meter: updates made by other threads may
    /// land between two loads.
    #[must_use]
    pub fn snapshot(&self) -> MeterSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        MeterSnapshot {
            bytes_in: read(&self.bytes_in),
            bytes_out: read(&self.bytes_out),
            objects_done: read(&self.objects_done),
            objects_total: read(&self.objects_total),
            objects_skipped: read(&self.objects_skipped),
            in_flight: read(&self.in_flight),
            phase: self.phase(),
            current_limit: read(&self.current_limit),
            target_rate: read(&self.target_rate),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh meter reads as all-zero/idle; additive counters accumulate,
    /// setters overwrite, and the snapshot reflects every change.
    #[test]
    fn progress_meter_counters_and_snapshot() {
        let meter = Meter::new();

        let fresh = meter.snapshot();
        assert_eq!(fresh.bytes_in, 0);
        assert_eq!(fresh.bytes_out, 0);
        assert_eq!(fresh.objects_done, 0);
        assert_eq!(fresh.objects_total, 0);
        assert_eq!(fresh.objects_skipped, 0);
        assert_eq!(fresh.in_flight, 0);
        assert_eq!(fresh.phase, Phase::Idle);

        meter.add_in(100);
        meter.add_in(23);
        meter.add_out(7);
        meter.set_total(10);
        meter.add_skipped(2);
        meter.add_skipped(1);
        meter.set_phase(Phase::Hashing);
        meter.object_started();
        meter.object_started();
        meter.object_finished();

        let updated = meter.snapshot();
        assert_eq!(updated.bytes_in, 123);
        assert_eq!(updated.bytes_out, 7);
        assert_eq!(updated.objects_done, 1);
        assert_eq!(updated.objects_total, 10);
        assert_eq!(updated.objects_skipped, 3);
        assert_eq!(updated.in_flight, 1);
        assert_eq!(updated.phase, Phase::Hashing);

        // Every phase must survive the store/load round trip, both through
        // the direct getter and through a snapshot.
        for expected in [Phase::Idle, Phase::Hashing, Phase::Transfer] {
            meter.set_phase(expected);
            assert_eq!(meter.phase(), expected);
            assert_eq!(meter.snapshot().phase, expected);
        }
    }

    /// The in-flight gauge follows starts minus finishes and stops at zero,
    /// while `objects_done` keeps counting every finish.
    #[test]
    fn progress_meter_in_flight_gauge() {
        let meter = Meter::new();
        for _ in 0..3 {
            meter.object_started();
        }
        for _ in 0..2 {
            meter.object_finished();
        }

        let partial = meter.snapshot();
        assert_eq!(partial.in_flight, 1, "3 started - 2 finished == 1 in flight");
        assert_eq!(partial.objects_done, 2, "2 finished == 2 done");

        // One more finish than there were starts: the gauge must not wrap.
        for _ in 0..2 {
            meter.object_finished();
        }

        let drained = meter.snapshot();
        assert_eq!(drained.in_flight, 0, "gauge saturates at 0, no underflow");
        assert_eq!(drained.objects_done, 4);
    }

    /// `current_limit` and `target_rate` start at zero and are overwritten
    /// independently of each other.
    #[test]
    fn resources_meter_adaptive_fields_round_trip() {
        let meter = Meter::new();

        let fresh = meter.snapshot();
        assert_eq!(fresh.current_limit, 0);
        assert_eq!(fresh.target_rate, 0);

        meter.set_current_limit(5_000_000);
        meter.set_target_rate(8_000_000);
        let configured = meter.snapshot();
        assert_eq!(configured.current_limit, 5_000_000);
        assert_eq!(configured.target_rate, 8_000_000);

        meter.set_current_limit(0);
        let cleared = meter.snapshot();
        assert_eq!(cleared.current_limit, 0);
        assert_eq!(cleared.target_rate, 8_000_000, "target unchanged");
    }
}