use std::time::Duration;
pub const DEFAULT_EMIT_THRESHOLD_RATIO: f32 = 2.0;
pub const DEFAULT_DECAY_HALF_LIFE_SECS: u64 = 30 * 60;
pub const MIN_EMIT_THRESHOLD_RATIO: f32 = 1.01;
pub const MAX_EMIT_THRESHOLD_RATIO: f32 = 10.0;
pub const DEFAULT_NORMALIZATION_REFERENCE_RATE: f32 = 1000.0;
pub const MIN_NORMALIZATION_REFERENCE_RATE: f32 = 1.5;
#[derive(Debug, Clone)]
pub struct DataGravityPolicy {
pub enabled: bool,
pub emit_threshold_ratio: f32,
pub decay_half_life: Duration,
pub normalization_reference_rate: f32,
}
impl Default for DataGravityPolicy {
fn default() -> Self {
Self {
enabled: true,
emit_threshold_ratio: DEFAULT_EMIT_THRESHOLD_RATIO,
decay_half_life: Duration::from_secs(DEFAULT_DECAY_HALF_LIFE_SECS),
normalization_reference_rate: DEFAULT_NORMALIZATION_REFERENCE_RATE,
}
}
}
impl DataGravityPolicy {
pub fn new() -> Self {
Self::default()
}
pub fn with_enabled(mut self, enabled: bool) -> Self {
self.enabled = enabled;
self
}
pub fn with_emit_threshold_ratio(mut self, ratio: f32) -> Self {
self.emit_threshold_ratio = ratio;
self
}
pub fn with_decay_half_life(mut self, half_life: Duration) -> Self {
self.decay_half_life = half_life;
self
}
pub fn with_normalization_reference_rate(mut self, reference: f32) -> Self {
self.normalization_reference_rate = reference;
self
}
pub fn normalize_rate_for_wire(&self, rate: f64) -> f64 {
if !rate.is_finite() || rate <= 0.0 {
return 0.0;
}
let reference = self
.normalization_reference_rate
.max(MIN_NORMALIZATION_REFERENCE_RATE) as f64;
let denom = reference.ln_1p();
if denom <= 0.0 {
return 0.0;
}
(rate.ln_1p() / denom).clamp(0.0, 1.0)
}
pub fn validate(&self) -> Result<(), DataGravityPolicyError> {
if !self.emit_threshold_ratio.is_finite()
|| self.emit_threshold_ratio < MIN_EMIT_THRESHOLD_RATIO
|| self.emit_threshold_ratio > MAX_EMIT_THRESHOLD_RATIO
{
return Err(DataGravityPolicyError::EmitThresholdOutOfRange {
got: self.emit_threshold_ratio,
min: MIN_EMIT_THRESHOLD_RATIO,
max: MAX_EMIT_THRESHOLD_RATIO,
});
}
if self.decay_half_life.is_zero() {
return Err(DataGravityPolicyError::DecayHalfLifeZero);
}
if !self.normalization_reference_rate.is_finite()
|| self.normalization_reference_rate < MIN_NORMALIZATION_REFERENCE_RATE
{
return Err(DataGravityPolicyError::NormalizationReferenceTooLow {
got: self.normalization_reference_rate,
min: MIN_NORMALIZATION_REFERENCE_RATE,
});
}
Ok(())
}
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum EmissionDecision {
Suppress,
Emit {
rate: f64,
},
Withdraw,
}
pub fn should_emit_heat(
current_rate: f64,
last_emitted: Option<f64>,
policy: &DataGravityPolicy,
) -> EmissionDecision {
if !policy.enabled {
return EmissionDecision::Suppress;
}
if !current_rate.is_finite() || current_rate < 0.0 {
return EmissionDecision::Suppress;
}
let ratio = policy.emit_threshold_ratio as f64;
match (last_emitted, current_rate) {
(None, r) if r > 0.0 => EmissionDecision::Emit { rate: r },
(None, _) => EmissionDecision::Suppress,
(Some(prev), 0.0) if prev > 0.0 => EmissionDecision::Withdraw,
(Some(_), 0.0) => EmissionDecision::Suppress,
(Some(prev), r) => {
let prev_safe = prev.is_normal() && prev > f64::EPSILON;
let r_safe = r.is_normal() && r > f64::EPSILON;
let bootstrap = !prev_safe || !r_safe;
let crossed_threshold = !bootstrap && ((r / prev) >= ratio || (prev / r) >= ratio);
if bootstrap || crossed_threshold {
EmissionDecision::Emit { rate: r }
} else {
EmissionDecision::Suppress
}
}
}
}
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum DataGravityPolicyError {
#[error("data-gravity emit_threshold_ratio {got} outside [{min}, {max}] or non-finite")]
EmitThresholdOutOfRange {
got: f32,
min: f32,
max: f32,
},
#[error("data-gravity decay_half_life must be non-zero")]
DecayHalfLifeZero,
#[error("data-gravity requires greedy to be enabled first")]
GreedyNotEnabled,
#[error("data-gravity normalization_reference_rate {got} below floor {min} or non-finite")]
NormalizationReferenceTooLow {
got: f32,
min: f32,
},
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn default_policy_validates() {
DataGravityPolicy::default()
.validate()
.expect("defaults must validate");
}
#[test]
fn emit_threshold_below_min_rejected() {
let p = DataGravityPolicy::default().with_emit_threshold_ratio(1.0);
let err = p.validate().expect_err("ratio 1.0 must reject");
assert!(matches!(
err,
DataGravityPolicyError::EmitThresholdOutOfRange { .. }
));
}
#[test]
fn emit_threshold_above_max_rejected() {
let p = DataGravityPolicy::default().with_emit_threshold_ratio(20.0);
let err = p.validate().expect_err("ratio 20.0 must reject");
assert!(matches!(
err,
DataGravityPolicyError::EmitThresholdOutOfRange { .. }
));
}
#[test]
fn emit_threshold_nan_rejected() {
let p = DataGravityPolicy::default().with_emit_threshold_ratio(f32::NAN);
let err = p.validate().expect_err("NaN ratio must reject");
assert!(matches!(
err,
DataGravityPolicyError::EmitThresholdOutOfRange { .. }
));
}
#[test]
fn decay_half_life_zero_rejected() {
let p = DataGravityPolicy::default().with_decay_half_life(Duration::ZERO);
let err = p.validate().expect_err("zero half-life must reject");
assert!(matches!(err, DataGravityPolicyError::DecayHalfLifeZero));
}
#[test]
fn normalization_reference_too_low_rejected() {
let p = DataGravityPolicy::default().with_normalization_reference_rate(1.0);
let err = p.validate().expect_err("reference 1.0 must reject");
assert!(matches!(
err,
DataGravityPolicyError::NormalizationReferenceTooLow { .. }
));
}
#[test]
fn should_emit_heat_handles_subnormal_prev() {
let p = policy();
let decision = should_emit_heat(1.0, Some(f64::MIN_POSITIVE), &p);
match decision {
EmissionDecision::Emit { rate } => assert!((rate - 1.0).abs() < 1e-9),
other => panic!("subnormal prev must route to Emit, got {:?}", other),
}
let decision = should_emit_heat(f64::MIN_POSITIVE, Some(1.0), &p);
assert!(matches!(decision, EmissionDecision::Emit { .. }));
let decision = should_emit_heat(1.0, Some(f64::EPSILON), &p);
assert!(matches!(decision, EmissionDecision::Emit { .. }));
}
#[test]
fn normalize_rate_for_wire_log_scale_has_dynamic_range() {
let p = DataGravityPolicy::default();
let v_1 = p.normalize_rate_for_wire(1.0);
let v_10 = p.normalize_rate_for_wire(10.0);
let v_100 = p.normalize_rate_for_wire(100.0);
let v_1000 = p.normalize_rate_for_wire(1000.0);
assert!(v_1 < v_10);
assert!(v_10 < v_100);
assert!(v_100 < v_1000);
assert!(v_1000 >= 0.99);
assert!(v_100 - v_10 > 0.20);
assert_eq!(p.normalize_rate_for_wire(0.0), 0.0);
assert_eq!(p.normalize_rate_for_wire(-5.0), 0.0);
assert_eq!(p.normalize_rate_for_wire(f64::NAN), 0.0);
assert_eq!(p.normalize_rate_for_wire(1.0e9), 1.0);
}
fn policy() -> DataGravityPolicy {
DataGravityPolicy::default()
}
#[test]
fn disabled_policy_always_suppresses() {
let p = policy().with_enabled(false);
assert_eq!(
should_emit_heat(10.0, Some(1.0), &p),
EmissionDecision::Suppress
);
}
#[test]
fn first_emission_fires_when_heat_present() {
let p = policy();
match should_emit_heat(5.0, None, &p) {
EmissionDecision::Emit { rate } => assert_eq!(rate, 5.0),
other => panic!("expected Emit, got {other:?}"),
}
}
#[test]
fn first_emission_suppressed_with_zero_rate() {
let p = policy();
assert_eq!(should_emit_heat(0.0, None, &p), EmissionDecision::Suppress);
}
#[test]
fn doubled_rate_emits() {
let p = policy();
match should_emit_heat(20.0, Some(10.0), &p) {
EmissionDecision::Emit { rate } => assert_eq!(rate, 20.0),
other => panic!("expected Emit, got {other:?}"),
}
}
#[test]
fn halved_rate_emits() {
let p = policy();
match should_emit_heat(5.0, Some(10.0), &p) {
EmissionDecision::Emit { rate } => assert_eq!(rate, 5.0),
other => panic!("expected Emit, got {other:?}"),
}
}
#[test]
fn sub_threshold_change_suppresses() {
let p = policy();
assert_eq!(
should_emit_heat(15.0, Some(10.0), &p),
EmissionDecision::Suppress
);
}
#[test]
fn decay_to_zero_emits_withdrawal() {
let p = policy();
assert_eq!(
should_emit_heat(0.0, Some(5.0), &p),
EmissionDecision::Withdraw
);
}
#[test]
fn already_withdrawn_suppresses() {
let p = policy();
assert_eq!(
should_emit_heat(0.0, Some(0.0), &p),
EmissionDecision::Suppress
);
}
#[test]
fn negative_rate_suppresses_defensively() {
let p = policy();
assert_eq!(
should_emit_heat(-1.0, Some(1.0), &p),
EmissionDecision::Suppress
);
}
#[test]
fn non_finite_rate_suppresses_defensively() {
let p = policy();
assert_eq!(
should_emit_heat(f64::NAN, Some(1.0), &p),
EmissionDecision::Suppress
);
assert_eq!(
should_emit_heat(f64::INFINITY, Some(1.0), &p),
EmissionDecision::Suppress
);
}
#[test]
fn higher_threshold_suppresses_doubled_rate() {
let p = policy().with_emit_threshold_ratio(3.0);
assert_eq!(
should_emit_heat(20.0, Some(10.0), &p),
EmissionDecision::Suppress
);
match should_emit_heat(30.0, Some(10.0), &p) {
EmissionDecision::Emit { rate } => assert_eq!(rate, 30.0),
other => panic!("expected Emit, got {other:?}"),
}
}
}