#[cfg(feature = "load")]
mod load_sensor_tests {
use celers_core::{BrokerStats, PoolStats, WorkerStats};
use oxirouter::{CelersLoadSensor, context::sensor::LoadSensor};
fn make_stats(
total: u64,
active: u32,
succeeded: u64,
failed: u64,
loadavg: Option<[f64; 3]>,
pool: Option<PoolStats>,
broker: Option<BrokerStats>,
) -> WorkerStats {
WorkerStats {
total_tasks: total,
active_tasks: active,
succeeded,
failed,
retried: 0,
uptime: 3600.0,
loadavg,
memory_usage: None,
pool,
broker,
clock: None,
}
}
#[test]
fn active_tasks_maps_to_pending_tasks() {
let sensor = CelersLoadSensor::new(|| make_stats(100, 7, 90, 3, None, None, None));
let ctx = sensor.sense().expect("sensor must return Some");
assert_eq!(ctx.pending_tasks, 7);
}
#[test]
fn loadavg_maps_to_global_load() {
let sensor = CelersLoadSensor::new(|| {
make_stats(
100,
5,
95,
0,
Some([0.5, 0.4, 0.3]), None,
None,
)
});
let ctx = sensor.sense().expect("sensor must return Some");
assert!(
(ctx.global_load - 0.5).abs() < 1e-6,
"global_load should be 0.5 (loadavg/1 concurrency)"
);
}
#[test]
fn loadavg_normalized_by_pool_concurrency() {
let pool = PoolStats {
pool_type: "threads".to_string(),
max_concurrency: 4,
pool_size: 4,
available: 2,
processes: alloc::vec![],
};
let sensor = CelersLoadSensor::new(move || {
make_stats(
100,
2,
98,
0,
Some([2.0, 1.5, 1.0]), Some(pool.clone()),
None,
)
});
let ctx = sensor.sense().expect("sensor must return Some");
assert!(
(ctx.global_load - 0.5).abs() < 1e-6,
"global_load should be 2/4 = 0.5"
);
}
#[test]
fn pool_available_maps_to_queue_depth() {
let pool = PoolStats {
pool_type: "prefork".to_string(),
max_concurrency: 8,
pool_size: 8,
available: 3,
processes: alloc::vec![1000, 1001, 1002],
};
let sensor =
CelersLoadSensor::new(move || make_stats(50, 5, 45, 0, None, Some(pool.clone()), None));
let ctx = sensor.sense().expect("sensor must return Some");
let queue_depth = ctx
.queue_depth
.get("__pool__")
.copied()
.expect("__pool__ key should exist");
assert_eq!(queue_depth, 5, "pool queue_depth should be busy count (5)");
}
#[test]
fn broker_connected_sets_circuit_closed() {
use oxirouter::CircuitState;
let broker = BrokerStats {
url: "amqp://localhost:5672".to_string(),
connected: true,
heartbeat: Some(60),
transport: "amqp".to_string(),
};
let sensor = CelersLoadSensor::new(move || {
make_stats(20, 1, 19, 0, None, None, Some(broker.clone()))
});
let ctx = sensor.sense().expect("sensor must return Some");
let state = ctx
.circuit_breakers
.get("__broker__")
.copied()
.expect("__broker__ circuit state should be set");
assert_eq!(state, CircuitState::Closed);
let health = ctx
.target_health
.get("__broker__")
.copied()
.expect("__broker__ health should be set");
assert!((health - 1.0).abs() < 1e-6);
}
#[test]
fn broker_disconnected_sets_circuit_open() {
use oxirouter::CircuitState;
let broker = BrokerStats {
url: "amqp://remote:5672".to_string(),
connected: false,
heartbeat: None,
transport: "amqp".to_string(),
};
let sensor =
CelersLoadSensor::new(move || make_stats(0, 0, 0, 0, None, None, Some(broker.clone())));
let ctx = sensor.sense().expect("sensor must return Some");
let state = ctx
.circuit_breakers
.get("__broker__")
.copied()
.expect("__broker__ circuit state should be set");
assert_eq!(state, CircuitState::Open);
}
#[test]
fn error_rate_computed_from_failed_over_total() {
let sensor = CelersLoadSensor::new(|| make_stats(100, 0, 90, 10, None, None, None));
let ctx = sensor.sense().expect("sensor must return Some");
let error_rate = ctx
.error_rates
.get("__global__")
.copied()
.expect("__global__ error rate should be set");
assert!((error_rate - 0.1).abs() < 1e-6, "error_rate should be 0.1");
}
#[test]
fn zero_total_tasks_no_error_rate() {
let sensor = CelersLoadSensor::new(|| make_stats(0, 0, 0, 0, None, None, None));
let ctx = sensor.sense().expect("sensor must return Some");
assert!(
ctx.error_rates.get("__global__").is_none(),
"should not insert error rate when total_tasks == 0"
);
}
#[test]
fn global_load_clamps_to_one() {
let sensor =
CelersLoadSensor::new(|| make_stats(10, 10, 0, 0, Some([5.0, 4.0, 3.0]), None, None));
let ctx = sensor.sense().expect("sensor must return Some");
assert!(
(ctx.global_load - 1.0).abs() < 1e-6,
"global_load must be clamped to 1.0"
);
}
}
extern crate alloc;