use std::collections::VecDeque;
use std::sync::Arc;
use parking_lot::Mutex;
use tokio::sync::Notify;
const BUDGET_FRACTION: f64 = 1.0;
pub(crate) const MIN_BUDGET_PER_PARTITION: usize = 4 * 1024 * 1024 * 1024;
pub(crate) const MAX_BUDGET_PER_PARTITION: usize = 1024 * 1024 * 1024 * 1024;
const ENV_BUDGET_MIN: &str = "RERUN_PIPELINE_BUDGET_MIN";
const ENV_BUDGET_MAX: &str = "RERUN_PIPELINE_BUDGET_MAX";
const ENV_BUDGET_FRACTION: &str = "RERUN_PIPELINE_BUDGET_FRACTION";
const INITIAL_ESTIMATE_MULTIPLIER: f64 = 1.5;
const ESTIMATE_EMA_ALPHA: f64 = 0.2;
const MIN_ESTIMATE_MULTIPLIER: f64 = 1.0;
const MAX_ESTIMATE_MULTIPLIER: f64 = 3.0;
pub(crate) struct PipelineBudget {
budget: usize,
current: std::sync::atomic::AtomicUsize,
wait_queue: Mutex<VecDeque<Arc<Notify>>>,
estimate_multiplier: std::sync::atomic::AtomicU64,
peak_current: std::sync::atomic::AtomicUsize,
total_released_bytes: std::sync::atomic::AtomicUsize,
total_releases: std::sync::atomic::AtomicU64,
#[cfg(test)]
test_pause_hook: parking_lot::Mutex<Option<TestPauseHook>>,
}
#[cfg(test)]
#[derive(Clone)]
struct TestPauseHook {
arrived: Arc<Notify>,
resume: Arc<Notify>,
}
fn read_env_trimmed(key: &str) -> Option<String> {
match std::env::var(key) {
Ok(v) => {
let trimmed = v.trim();
if trimmed.is_empty() {
None
} else {
Some(trimmed.to_owned())
}
}
Err(std::env::VarError::NotPresent) => None,
Err(std::env::VarError::NotUnicode(_)) => {
re_log::error!("{key}: value is not valid Unicode; using default");
None
}
}
}
fn parse_bytes_or_default(key: &str, raw: &str, default_bytes: usize) -> usize {
let parsed = re_format::parse_bytes(raw).or_else(|| raw.parse::<i64>().ok());
match parsed {
Some(n) if n > 0 => n as usize,
Some(_) => {
re_log::error!(
"{key}={raw:?} must be > 0; falling back to default {}",
re_format::format_bytes(default_bytes as f64),
);
default_bytes
}
None => {
re_log::error!(
"{key}={raw:?} could not be parsed as a byte size (e.g. \"64MB\", \"1GiB\", \
or a bare integer number of bytes); falling back to default {}",
re_format::format_bytes(default_bytes as f64),
);
default_bytes
}
}
}
fn parse_fraction_or_default(key: &str, raw: &str, default: f64) -> f64 {
match raw.parse::<f64>() {
Ok(f) if f.is_finite() && f > 0.0 && f <= 1.0 => f,
Ok(f) => {
re_log::error!(
"{key}={raw:?} must be a finite value in (0.0, 1.0], got {f}; \
falling back to default {default}",
);
default
}
Err(err) => {
re_log::error!(
"{key}={raw:?} could not be parsed as a float ({err}); \
falling back to default {default}",
);
default
}
}
}
fn read_env_bytes(key: &str, default_bytes: usize) -> usize {
match read_env_trimmed(key) {
Some(raw) => parse_bytes_or_default(key, &raw, default_bytes),
None => default_bytes,
}
}
fn read_env_fraction(key: &str, default: f64) -> f64 {
match read_env_trimmed(key) {
Some(raw) => parse_fraction_or_default(key, &raw, default),
None => default,
}
}
impl PipelineBudget {
pub(crate) fn new(total_uncompressed_estimate: usize, num_partitions: usize) -> Self {
let fraction = read_env_fraction(ENV_BUDGET_FRACTION, BUDGET_FRACTION);
let mut min_per_partition = read_env_bytes(ENV_BUDGET_MIN, MIN_BUDGET_PER_PARTITION);
let mut max_per_partition = read_env_bytes(ENV_BUDGET_MAX, MAX_BUDGET_PER_PARTITION);
if min_per_partition > max_per_partition {
re_log::error!(
"{ENV_BUDGET_MIN} ({}) must not exceed {ENV_BUDGET_MAX} ({}); \
falling back to defaults for both.",
re_format::format_bytes(min_per_partition as f64),
re_format::format_bytes(max_per_partition as f64),
);
min_per_partition = MIN_BUDGET_PER_PARTITION;
max_per_partition = MAX_BUDGET_PER_PARTITION;
}
#[expect(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
let per_partition = ((total_uncompressed_estimate as f64 * fraction)
/ num_partitions.max(1) as f64) as usize;
let budget = per_partition.clamp(min_per_partition, max_per_partition) * num_partitions;
re_log::debug!("Pipeline budget: {}MB", budget / (1024 * 1024));
Self {
budget,
current: std::sync::atomic::AtomicUsize::new(0),
wait_queue: Mutex::new(VecDeque::new()),
estimate_multiplier: std::sync::atomic::AtomicU64::new(
INITIAL_ESTIMATE_MULTIPLIER.to_bits(),
),
peak_current: std::sync::atomic::AtomicUsize::new(0),
total_released_bytes: std::sync::atomic::AtomicUsize::new(0),
total_releases: std::sync::atomic::AtomicU64::new(0),
#[cfg(test)]
test_pause_hook: parking_lot::Mutex::new(None),
}
}
#[cfg(test)]
pub(crate) fn total_releases(&self) -> u64 {
self.total_releases
.load(std::sync::atomic::Ordering::Acquire)
}
fn current_multiplier(&self) -> f64 {
f64::from_bits(
self.estimate_multiplier
.load(std::sync::atomic::Ordering::Acquire),
)
}
#[cfg(test)]
fn set_multiplier(&self, multiplier: f64) {
self.estimate_multiplier
.store(multiplier.to_bits(), std::sync::atomic::Ordering::Release);
}
#[cfg(test)]
fn arm_pause_hook(&self) -> TestPauseHook {
let hook = TestPauseHook {
arrived: Arc::new(Notify::new()),
resume: Arc::new(Notify::new()),
};
*self.test_pause_hook.lock() = Some(hook.clone());
hook
}
fn record_actual_sample(&self, estimated: usize, actual: usize) {
use std::sync::atomic::Ordering::{AcqRel, Acquire};
if estimated == 0 {
return;
}
let observed = ((actual as f64) / (estimated as f64))
.clamp(MIN_ESTIMATE_MULTIPLIER, MAX_ESTIMATE_MULTIPLIER);
self.estimate_multiplier
.fetch_update(AcqRel, Acquire, |bits| {
let curr = f64::from_bits(bits);
let next = ESTIMATE_EMA_ALPHA * observed + (1.0 - ESTIMATE_EMA_ALPHA) * curr;
let next = next.clamp(MIN_ESTIMATE_MULTIPLIER, MAX_ESTIMATE_MULTIPLIER);
Some(next.to_bits())
})
.expect("closure always returns Some");
}
fn wake_next(&self) {
if let Some(notify) = self.wait_queue.lock().pop_front() {
notify.notify_one();
}
}
fn try_acquire(&self, reserved_bytes: usize) -> Option<usize> {
use std::sync::atomic::Ordering::{AcqRel, Acquire};
let mut cur = self.current.load(Acquire);
loop {
let next = cur + reserved_bytes;
if next > self.budget {
return None;
}
match self
.current
.compare_exchange_weak(cur, next, AcqRel, Acquire)
{
Ok(_) => {
self.peak_current.fetch_max(next, AcqRel);
return Some(next);
}
Err(actual) => cur = actual,
}
}
}
#[expect(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
pub(crate) async fn reserve(&self, estimated_bytes: usize) -> usize {
use std::sync::atomic::Ordering::AcqRel;
let reserved_bytes = ((estimated_bytes as f64) * self.current_multiplier()) as usize;
if reserved_bytes > self.budget {
re_log::warn!(
"Single fetch reservation ({}MB, raw estimate {}MB) exceeds entire \
pipeline budget ({}MB across all partitions) — allowing it through \
to avoid deadlock.",
reserved_bytes / (1024 * 1024),
estimated_bytes / (1024 * 1024),
self.budget / (1024 * 1024),
);
let new_cur = self.current.fetch_add(reserved_bytes, AcqRel) + reserved_bytes;
self.peak_current.fetch_max(new_cur, AcqRel);
return reserved_bytes;
}
let mut wait_count: u32 = 0;
loop {
if let Some(new_cur) = self.try_acquire(reserved_bytes) {
if new_cur < self.budget {
self.wake_next();
}
if wait_count > 0 {
re_log::debug!(
"Budget reserve succeeded after {wait_count} waits: \
reserved {}MB, current {}MB / {}MB",
reserved_bytes / (1024 * 1024),
new_cur / (1024 * 1024),
self.budget / (1024 * 1024),
);
}
return reserved_bytes;
}
let notify = Arc::new(Notify::new());
self.wait_queue.lock().push_back(Arc::clone(¬ify));
#[cfg(test)]
{
let hook = self.test_pause_hook.lock().clone();
if let Some(hook) = hook {
hook.arrived.notify_one();
hook.resume.notified().await;
}
}
if let Some(new_cur) = self.try_acquire(reserved_bytes) {
if new_cur < self.budget {
self.wake_next();
}
return reserved_bytes;
}
wait_count += 1;
if wait_count == 1 || wait_count.is_multiple_of(10) {
re_log::info!(
"Budget backpressure (wait #{wait_count}): want {}MB, \
current {}MB / {}MB budget",
reserved_bytes / (1024 * 1024),
self.current.load(std::sync::atomic::Ordering::Acquire) / (1024 * 1024),
self.budget / (1024 * 1024),
);
}
notify.notified().await;
}
}
pub(crate) fn adjust_reservation(&self, estimated: usize, reserved: usize, actual: usize) {
use std::sync::atomic::Ordering::{AcqRel, Acquire};
if actual > reserved {
let new_cur = self.current.fetch_add(actual - reserved, AcqRel) + (actual - reserved);
self.peak_current.fetch_max(new_cur, AcqRel);
} else if reserved > actual {
self.current
.fetch_update(AcqRel, Acquire, |current| {
Some(current.saturating_sub(reserved - actual))
})
.expect("closure always returns Some");
self.wake_next();
}
self.record_actual_sample(estimated, actual);
}
pub(crate) fn release(&self, bytes: usize) {
use std::sync::atomic::Ordering::{AcqRel, Acquire};
let prev = self
.current
.fetch_update(AcqRel, Acquire, |current| {
Some(current.saturating_sub(bytes))
})
.expect("closure always returns Some");
self.total_released_bytes.fetch_add(bytes, AcqRel);
self.total_releases.fetch_add(1, AcqRel);
re_log::debug!(
"Budget release: freed {}MB, {}MB → {}MB / {}MB",
bytes / (1024 * 1024),
prev / (1024 * 1024),
prev.saturating_sub(bytes) / (1024 * 1024),
self.budget / (1024 * 1024),
);
self.wake_next();
}
fn refund_reservation(&self, reserved: usize) {
use std::sync::atomic::Ordering::{AcqRel, Acquire};
if reserved == 0 {
return;
}
self.current
.fetch_update(AcqRel, Acquire, |current| {
Some(current.saturating_sub(reserved))
})
.expect("closure always returns Some");
self.wake_next();
}
pub(crate) async fn reserve_guarded(&self, estimated: usize) -> ReservationGuard<'_> {
let reserved = self.reserve(estimated).await;
ReservationGuard {
budget: self,
estimated,
reserved,
committed: false,
}
}
}
#[must_use = "ReservationGuard returns its bytes to the budget on drop; \
call .commit(actual) once the decoded size is known"]
pub(crate) struct ReservationGuard<'a> {
budget: &'a PipelineBudget,
estimated: usize,
reserved: usize,
committed: bool,
}
impl ReservationGuard<'_> {
pub(crate) fn commit(mut self, actual: usize) {
self.budget
.adjust_reservation(self.estimated, self.reserved, actual);
self.committed = true;
}
}
impl Drop for ReservationGuard<'_> {
fn drop(&mut self) {
if !self.committed {
self.budget.refund_reservation(self.reserved);
}
}
}
impl Drop for PipelineBudget {
fn drop(&mut self) {
use std::sync::atomic::Ordering::Acquire;
let n_releases = self.total_releases.load(Acquire);
if n_releases == 0 {
return;
}
const MB: usize = 1024 * 1024;
let peak = self.peak_current.load(Acquire);
let total_released = self.total_released_bytes.load(Acquire);
let pct = if self.budget > 0 {
#[expect(clippy::cast_precision_loss)]
let pct = peak as f64 / self.budget as f64 * 100.0;
pct
} else {
0.0
};
re_log::info!(
"PipelineBudget summary: peak={}MB / {}MB ({pct:.0}%), \
released_total={}MB across {n_releases} calls",
peak / MB,
self.budget / MB,
total_released / MB,
);
}
}
impl std::fmt::Debug for PipelineBudget {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PipelineBudget")
.field("budget", &self.budget)
.field(
"current",
&self.current.load(std::sync::atomic::Ordering::Relaxed),
)
.finish_non_exhaustive()
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
#[test]
fn test_budget_clamps_to_min() {
let budget = PipelineBudget::new(10 * 1024 * 1024, 1);
assert_eq!(budget.budget, MIN_BUDGET_PER_PARTITION);
}
#[test]
fn test_budget_clamps_to_max() {
let budget = PipelineBudget::new(8 * 1024 * 1024 * 1024 * 1024 * 1024, 1);
assert_eq!(budget.budget, MAX_BUDGET_PER_PARTITION);
}
#[test]
fn test_budget_scales_with_partitions() {
let budget = PipelineBudget::new(64 * 1024 * 1024 * 1024 * 1024 * 1024, 14);
assert_eq!(budget.budget, MAX_BUDGET_PER_PARTITION * 14);
}
#[test]
fn test_budget_small_data_many_partitions() {
let budget = PipelineBudget::new(100 * 1024 * 1024, 4);
assert_eq!(budget.budget, MIN_BUDGET_PER_PARTITION * 4);
}
#[tokio::test]
async fn test_reserve_blocks_when_budget_exhausted() {
let budget = Arc::new(PipelineBudget::new(0, 1)); budget.set_multiplier(1.0);
let half = budget.budget / 2;
budget.reserve(half).await;
assert_eq!(
budget.current.load(std::sync::atomic::Ordering::Acquire),
half
);
budget.reserve(half).await;
assert_eq!(
budget.current.load(std::sync::atomic::Ordering::Acquire),
half * 2
);
let budget_clone = Arc::clone(&budget);
let handle = tokio::spawn(async move {
budget_clone.reserve(half).await;
});
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
assert!(
!handle.is_finished(),
"reserve should block when budget is exhausted"
);
budget.release(half);
tokio::time::timeout(std::time::Duration::from_secs(1), handle)
.await
.expect("reserve should unblock after release")
.expect("task should not panic");
}
#[tokio::test]
async fn test_adjust_reservation_corrects_estimate() {
let budget = Arc::new(PipelineBudget::new(0, 1)); budget.set_multiplier(1.0);
let estimated = 1000;
let actual = 600;
let reserved = budget.reserve(estimated).await;
assert_eq!(reserved, estimated);
assert_eq!(
budget.current.load(std::sync::atomic::Ordering::Acquire),
estimated
);
budget.adjust_reservation(estimated, reserved, actual);
assert_eq!(
budget.current.load(std::sync::atomic::Ordering::Acquire),
actual
);
}
#[tokio::test]
async fn test_estimate_multiplier_adapts_over_time() {
let budget = Arc::new(PipelineBudget::new(0, 1));
assert_eq!(budget.current_multiplier(), INITIAL_ESTIMATE_MULTIPLIER);
let estimated = 1000;
let actual = 2000;
let reserved1 = budget.reserve(estimated).await;
assert_eq!(reserved1, 1500);
budget.adjust_reservation(estimated, reserved1, actual);
budget.release(actual);
assert!((budget.current_multiplier() - 1.6).abs() < 1e-9);
for _ in 0..40 {
let reserved = budget.reserve(estimated).await;
budget.adjust_reservation(estimated, reserved, actual);
budget.release(actual);
}
assert!(
(budget.current_multiplier() - 2.0).abs() < 0.01,
"multiplier should converge toward 2.0, got {}",
budget.current_multiplier()
);
}
#[tokio::test]
async fn test_estimate_multiplier_is_clamped() {
let budget = Arc::new(PipelineBudget::new(0, 1));
for _ in 0..100 {
budget.record_actual_sample(100, 1000); }
assert!(
(budget.current_multiplier() - MAX_ESTIMATE_MULTIPLIER).abs() < 1e-4,
"multiplier should converge to MAX, got {}",
budget.current_multiplier()
);
for _ in 0..100 {
budget.record_actual_sample(1000, 100); }
assert!(
(budget.current_multiplier() - MIN_ESTIMATE_MULTIPLIER).abs() < 1e-4,
"multiplier should converge to MIN, got {}",
budget.current_multiplier()
);
}
#[tokio::test]
async fn test_peak_current_tracks_high_water_mark() {
let budget = Arc::new(PipelineBudget::new(0, 1));
budget.set_multiplier(1.0);
let r1 = budget.reserve(10 * 1024 * 1024).await;
let r2 = budget.reserve(5 * 1024 * 1024).await;
let peak_before = budget
.peak_current
.load(std::sync::atomic::Ordering::Acquire);
assert_eq!(peak_before, r1 + r2);
budget.release(r1 + r2);
let r3 = budget.reserve(1024 * 1024).await;
let peak_after = budget
.peak_current
.load(std::sync::atomic::Ordering::Acquire);
assert_eq!(
peak_after, peak_before,
"peak should not regress after releases",
);
budget.release(r3);
}
#[tokio::test]
async fn test_reserve_no_thundering_herd_under_contention() {
let budget = Arc::new(PipelineBudget::new(0, 1));
budget.set_multiplier(1.0);
let full = budget.budget;
let n: usize = 32;
let per_task = full / n;
assert!(per_task > 0, "budget too small for {n}-way contention test");
let mut handles = Vec::with_capacity(n);
for _ in 0..n {
let budget = Arc::clone(&budget);
handles.push(tokio::spawn(async move { budget.reserve(per_task).await }));
}
for handle in handles {
tokio::time::timeout(std::time::Duration::from_secs(5), handle)
.await
.expect("reserve hung under contention")
.expect("task panicked");
}
assert_eq!(
budget.current.load(std::sync::atomic::Ordering::Acquire),
per_task * n,
);
}
#[tokio::test]
async fn test_reserve_no_lost_wakeup_in_wait_path() {
let budget = Arc::new(PipelineBudget::new(0, 1));
budget.set_multiplier(1.0);
let full = budget.budget;
budget.reserve(full).await;
let pause = budget.arm_pause_hook();
let reserver = {
let budget = Arc::clone(&budget);
tokio::spawn(async move { budget.reserve(1).await })
};
pause.arrived.notified().await;
*budget.test_pause_hook.lock() = None;
budget.release(full);
pause.resume.notify_one();
tokio::time::timeout(std::time::Duration::from_secs(1), reserver)
.await
.expect("reserve hung — lost-wakeup race in rollback→enqueue gap")
.expect("reserver task panicked");
}
#[tokio::test]
async fn test_reservation_guard_commit_records_actual() {
let budget = Arc::new(PipelineBudget::new(0, 1));
budget.set_multiplier(1.0);
let estimated = 1000;
let actual = 800;
let guard = budget.reserve_guarded(estimated).await;
guard.commit(actual);
assert_eq!(
budget.current.load(std::sync::atomic::Ordering::Acquire),
actual,
"commit should reduce current to the actual decoded size",
);
}
#[tokio::test]
async fn test_reservation_guard_drop_refunds_reservation() {
let budget = Arc::new(PipelineBudget::new(0, 1));
budget.set_multiplier(1.0);
let estimated = 1000;
let multiplier_before = f64::from_bits(
budget
.estimate_multiplier
.load(std::sync::atomic::Ordering::Acquire),
);
{
let _guard = budget.reserve_guarded(estimated).await;
assert_eq!(
budget.current.load(std::sync::atomic::Ordering::Acquire),
estimated,
);
}
assert_eq!(
budget.current.load(std::sync::atomic::Ordering::Acquire),
0,
"dropped guard should refund the entire reservation",
);
let multiplier_after = f64::from_bits(
budget
.estimate_multiplier
.load(std::sync::atomic::Ordering::Acquire),
);
assert!(
(multiplier_after - multiplier_before).abs() < 1e-9,
"guard drop must not fold a (estimated, 0) sample into the EMA \
(before={multiplier_before}, after={multiplier_after})",
);
}
#[tokio::test]
async fn test_reservation_guard_drop_wakes_waiter() {
let budget = Arc::new(PipelineBudget::new(0, 1));
budget.set_multiplier(1.0);
let full = budget.budget;
let blocking_guard = budget.reserve_guarded(full).await;
assert_eq!(
budget.current.load(std::sync::atomic::Ordering::Acquire),
full,
);
let budget_clone = Arc::clone(&budget);
let waiter = tokio::spawn(async move { budget_clone.reserve(1).await });
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
assert!(!waiter.is_finished(), "second reserve should be parked");
drop(blocking_guard);
tokio::time::timeout(std::time::Duration::from_secs(1), waiter)
.await
.expect("guard drop did not wake parked reserver")
.expect("waiter task panicked");
}
const DEFAULT_BYTES: usize = 64 * 1024 * 1024;
#[test]
fn test_parse_bytes_accepts_iec_suffix() {
assert_eq!(
parse_bytes_or_default("TEST", "128MiB", DEFAULT_BYTES),
128 * 1024 * 1024,
);
assert_eq!(
parse_bytes_or_default("TEST", "1GiB", DEFAULT_BYTES),
1024 * 1024 * 1024,
);
assert_eq!(
parse_bytes_or_default("TEST", "512KiB", DEFAULT_BYTES),
512 * 1024,
);
}
#[test]
fn test_parse_bytes_accepts_si_suffix() {
assert_eq!(
parse_bytes_or_default("TEST", "100MB", DEFAULT_BYTES),
100_000_000,
);
assert_eq!(
parse_bytes_or_default("TEST", "2GB", DEFAULT_BYTES),
2_000_000_000,
);
}
#[test]
fn test_parse_bytes_accepts_bare_integer_as_bytes() {
assert_eq!(
parse_bytes_or_default("TEST", "67108864", DEFAULT_BYTES),
64 * 1024 * 1024,
);
}
#[test]
fn test_parse_bytes_rejects_zero() {
assert_eq!(
parse_bytes_or_default("TEST", "0", DEFAULT_BYTES),
DEFAULT_BYTES,
);
}
#[test]
fn test_parse_bytes_rejects_negative() {
assert_eq!(
parse_bytes_or_default("TEST", "-1", DEFAULT_BYTES),
DEFAULT_BYTES,
);
assert_eq!(
parse_bytes_or_default("TEST", "-1MB", DEFAULT_BYTES),
DEFAULT_BYTES,
);
}
#[test]
fn test_parse_bytes_rejects_non_numeric() {
assert_eq!(
parse_bytes_or_default("TEST", "not-a-number", DEFAULT_BYTES),
DEFAULT_BYTES,
);
}
#[test]
fn test_parse_bytes_rejects_unknown_suffix() {
assert_eq!(
parse_bytes_or_default("TEST", "10Mb", DEFAULT_BYTES),
DEFAULT_BYTES,
);
}
#[test]
fn test_parse_fraction_accepts_valid_range() {
assert!((parse_fraction_or_default("TEST", "0.5", 0.25) - 0.5).abs() < 1e-12);
assert!((parse_fraction_or_default("TEST", "1.0", 0.25) - 1.0).abs() < 1e-12);
assert!((parse_fraction_or_default("TEST", "0.0001", 0.25) - 0.0001).abs() < 1e-12);
}
#[test]
fn test_parse_fraction_rejects_zero() {
assert!((parse_fraction_or_default("TEST", "0.0", 0.25) - 0.25).abs() < 1e-12);
}
#[test]
fn test_parse_fraction_rejects_above_one() {
assert!((parse_fraction_or_default("TEST", "1.5", 0.25) - 0.25).abs() < 1e-12);
}
#[test]
fn test_parse_fraction_rejects_negative() {
assert!((parse_fraction_or_default("TEST", "-0.5", 0.25) - 0.25).abs() < 1e-12);
}
#[test]
fn test_parse_fraction_rejects_nan_and_inf() {
assert!((parse_fraction_or_default("TEST", "NaN", 0.25) - 0.25).abs() < 1e-12);
assert!((parse_fraction_or_default("TEST", "inf", 0.25) - 0.25).abs() < 1e-12);
}
#[test]
fn test_parse_fraction_rejects_non_numeric() {
assert!((parse_fraction_or_default("TEST", "bogus", 0.25) - 0.25).abs() < 1e-12);
}
}