use super::{PlanDag, PlanId, PlanNode};
use std::collections::BTreeMap;
use std::fmt;
/// One affine piece of a piecewise-linear curve.
///
/// For `t > start` the piece evaluates to `burst + rate * (t - start)`;
/// at or before `start` it is clamped to `burst`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Segment {
pub start: f64,
pub rate: f64,
pub burst: f64,
}
impl Segment {
    /// Builds a segment starting at `start` with slope `rate` and offset `burst`.
    #[must_use]
    #[inline]
    pub fn new(start: f64, rate: f64, burst: f64) -> Self {
        Self { start, rate, burst }
    }

    /// Evaluates the affine piece at time `t`, clamping to `burst` for
    /// `t <= start`.
    #[must_use]
    #[inline]
    pub fn eval_at(&self, t: f64) -> f64 {
        if t <= self.start {
            return self.burst;
        }
        // Fused multiply-add keeps the breakpoint arithmetic consistent with
        // the continuity checks elsewhere in this module.
        self.rate.mul_add(t - self.start, self.burst)
    }
}
/// A non-decreasing, continuous, piecewise-linear curve used as an arrival
/// or service curve in the network-calculus computations below.
///
/// Invariant (enforced by the constructors): `segments` is non-empty,
/// sorted by strictly increasing `start`, with adjacent segments agreeing
/// at each breakpoint (continuity within 1e-9).
#[derive(Debug, Clone, PartialEq)]
pub struct PiecewiseLinearCurve {
    segments: Vec<Segment>,
}
impl PiecewiseLinearCurve {
    /// The identically-zero curve.
    #[must_use]
    pub fn zero() -> Self {
        Self {
            segments: vec![Segment::new(0.0, 0.0, 0.0)],
        }
    }

    /// Validates and wraps an explicit segment list.
    ///
    /// Returns `None` when the list is empty, contains non-finite or
    /// negative values, is not strictly sorted by `start`, or is
    /// discontinuous at a breakpoint (tolerance 1e-9).
    #[must_use]
    pub fn from_segments(segments: Vec<Segment>) -> Option<Self> {
        if segments.is_empty() {
            return None;
        }
        let values_ok = segments.iter().all(|s| {
            s.start.is_finite()
                && s.rate.is_finite()
                && s.burst.is_finite()
                && s.start >= 0.0
                && s.rate >= 0.0
                && s.burst >= 0.0
        });
        if !values_ok {
            return None;
        }
        // Breakpoints must strictly increase, and each segment's burst must
        // equal the previous segment evaluated at that breakpoint.
        let shape_ok = segments.windows(2).all(|w| {
            let (prev, cur) = (&w[0], &w[1]);
            cur.start > prev.start
                && (prev.rate.mul_add(cur.start - prev.start, prev.burst) - cur.burst).abs()
                    <= 1e-9
        });
        if !shape_ok {
            return None;
        }
        Some(Self { segments })
    }

    /// Token-bucket-style curve: `burst + rate * t` for `t >= 0`.
    #[must_use]
    pub fn affine(rate: f64, burst: f64) -> Self {
        debug_assert!(rate >= 0.0 && burst >= 0.0);
        Self {
            segments: vec![Segment::new(0.0, rate, burst)],
        }
    }

    /// Rate-latency curve: zero until `latency`, then slope `rate`.
    #[must_use]
    pub fn rate_latency(rate: f64, latency: f64) -> Self {
        debug_assert!(rate >= 0.0 && latency >= 0.0);
        if latency.abs() < f64::EPSILON {
            // A degenerate latency collapses to a pure rate curve.
            return Self::affine(rate, 0.0);
        }
        Self {
            segments: vec![
                Segment::new(0.0, 0.0, 0.0),
                Segment::new(latency, rate, 0.0),
            ],
        }
    }

    /// Staircase curve, approximated with steep ramps of width `epsilon`
    /// so the result stays continuous (steps at multiples of `period`,
    /// each of height `step_size`).
    #[must_use]
    pub fn staircase(step_size: f64, period: f64, num_steps: usize) -> Self {
        debug_assert!(step_size > 0.0 && period > 0.0 && num_steps > 0);
        let mut segments = Vec::with_capacity(num_steps * 2);
        let epsilon = (period * 1e-6).max(f64::MIN_POSITIVE);
        let steep_rate = step_size / epsilon;
        #[allow(clippy::cast_precision_loss)]
        for i in 0..num_steps {
            let fi = i as f64;
            let t = fi * period;
            let base = fi * step_size;
            // One step = a steep ramp followed by a flat plateau. (The
            // original special-cased i == 0, but that branch produced the
            // exact same two segments as the general case at t = 0.)
            segments.push(Segment::new(t, steep_rate, base));
            segments.push(Segment::new(t + epsilon, 0.0, base + step_size));
        }
        Self { segments }
    }

    /// Evaluates the curve at time `t` (zero for `t < 0`).
    #[must_use]
    pub fn eval(&self, t: f64) -> f64 {
        if t < 0.0 {
            return 0.0;
        }
        // Index one past the last segment whose start is <= t.
        // `partition_point` expresses this directly and avoids the
        // NaN-fudging comparator a `binary_search_by` would need; it also
        // handles an empty segment list (idx == 0) for free.
        let idx = self.segments.partition_point(|s| s.start <= t);
        if idx == 0 {
            return 0.0;
        }
        self.segments[idx - 1].eval_at(t)
    }

    /// Number of affine pieces.
    #[must_use]
    #[inline]
    pub fn segment_count(&self) -> usize {
        self.segments.len()
    }

    /// Read-only view of the affine pieces.
    #[must_use]
    #[inline]
    pub fn segments(&self) -> &[Segment] {
        &self.segments
    }

    /// Long-run slope of the curve: the rate of its final segment.
    #[must_use]
    #[inline]
    pub fn asymptotic_rate(&self) -> f64 {
        self.segments.last().map_or(0.0, |s| s.rate)
    }
}
/// Min-plus convolution `(f ⊗ g)(t) = inf_{0<=s<=t} f(s) + g(t - s)`,
/// approximated by evaluating at candidate breakpoints plus a uniform grid
/// and rebuilding a curve from the sampled points.
#[must_use]
pub fn min_plus_convolution(
    f: &PiecewiseLinearCurve,
    g: &PiecewiseLinearCurve,
) -> PiecewiseLinearCurve {
    let f_breaks: Vec<f64> = f.segments.iter().map(|s| s.start).collect();
    let g_breaks: Vec<f64> = g.segments.iter().map(|s| s.start).collect();
    // Candidate times: pairwise breakpoint sums (where the infimum can
    // switch pieces), the breakpoints themselves, the origin, and a coarse
    // grid covering twice the last breakpoint.
    let mut sample_times: Vec<f64> = f_breaks
        .iter()
        .flat_map(|&fb| g_breaks.iter().map(move |&gb| fb + gb))
        .collect();
    sample_times.extend_from_slice(&f_breaks);
    sample_times.extend_from_slice(&g_breaks);
    sample_times.push(0.0);
    let last_break = f_breaks
        .last()
        .copied()
        .unwrap_or(0.0)
        .max(g_breaks.last().copied().unwrap_or(0.0));
    let t_max = last_break.mul_add(2.0, 1.0);
    let num_samples: u32 = 64;
    for i in 0..=num_samples {
        sample_times.push(t_max * f64::from(i) / f64::from(num_samples));
    }
    sample_times.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
    sample_times.dedup_by(|a, b| (*a - *b).abs() < 1e-12);
    sample_times.retain(|&t| t >= 0.0);
    if sample_times.is_empty() {
        return PiecewiseLinearCurve::zero();
    }
    let points: Vec<(f64, f64)> = sample_times
        .iter()
        .map(|&t| (t, convolution_at(f, g, t, &f_breaks, &g_breaks)))
        .collect();
    build_curve_from_points(&points)
}
/// Evaluates the min-plus convolution of `f` and `g` at a single time `t`
/// by checking every candidate split point where the infimum can occur:
/// the breakpoints of `f`, the shifts where `t - s` hits a breakpoint of
/// `g`, and the interval endpoints `s = 0` and `s = t`.
fn convolution_at(
    f: &PiecewiseLinearCurve,
    g: &PiecewiseLinearCurve,
    t: f64,
    f_breaks: &[f64],
    g_breaks: &[f64],
) -> f64 {
    let mut best = f64::INFINITY;
    let mut consider = |candidate: f64| {
        if candidate < best {
            best = candidate;
        }
    };
    // Splits at breakpoints of f.
    for &s in f_breaks {
        if s <= t + 1e-12 {
            consider(f.eval(s) + g.eval(t - s));
        }
    }
    // Splits where t - s lands on a breakpoint of g.
    for &gb in g_breaks {
        if gb <= t + 1e-12 {
            let s = t - gb;
            if s >= -1e-12 {
                consider(f.eval(s.max(0.0)) + g.eval(gb));
            }
        }
    }
    // Interval endpoints.
    consider(f.eval(0.0) + g.eval(t));
    consider(f.eval(t) + g.eval(0.0));
    best
}
/// Reconstructs a `PiecewiseLinearCurve` from sampled `(t, value)` points.
///
/// Each point becomes a segment whose rate is the (non-negative) slope to
/// the next point; the final point reuses the slope of the last sampled
/// span. Collinear consecutive segments are then merged so the result
/// stays compact. Empty input yields the zero curve.
fn build_curve_from_points(points: &[(f64, f64)]) -> PiecewiseLinearCurve {
    if points.is_empty() {
        return PiecewiseLinearCurve::zero();
    }
    // Non-negative slope between two sampled points; zero when the points
    // are numerically coincident in time. (Factored out: the original
    // duplicated this computation for the interior and final points.)
    fn slope(a: (f64, f64), b: (f64, f64)) -> f64 {
        let dt = b.0 - a.0;
        if dt > 1e-12 {
            ((b.1 - a.1) / dt).max(0.0)
        } else {
            0.0
        }
    }
    let mut segments = Vec::with_capacity(points.len());
    for (i, &(t, v)) in points.iter().enumerate() {
        // Flattened from the original's nested `else { if .. }` ladder
        // (clippy::collapsible_else_if).
        let rate = if i + 1 < points.len() {
            slope(points[i], points[i + 1])
        } else if i > 0 {
            // Last point: extrapolate with the slope of the final span.
            slope(points[i - 1], points[i])
        } else {
            0.0
        };
        segments.push(Segment::new(t, rate, v));
    }
    // Merge runs of collinear segments (same rate, continuous burst).
    let mut simplified: Vec<Segment> = Vec::with_capacity(segments.len());
    for seg in segments {
        if let Some(last) = simplified.last() {
            if (last.rate - seg.rate).abs() < 1e-9 {
                let expected_burst = last.rate.mul_add(seg.start - last.start, last.burst);
                if (expected_burst - seg.burst).abs() < 1e-9 {
                    continue;
                }
            }
        }
        simplified.push(seg);
    }
    if simplified.is_empty() {
        // Unreachable for non-empty input (the first segment is always
        // kept), but preserve the non-empty-curve invariant regardless.
        return PiecewiseLinearCurve::zero();
    }
    PiecewiseLinearCurve {
        segments: simplified,
    }
}
/// Horizontal deviation between arrival curve `alpha` and service curve
/// `beta` — the worst-case delay bound: for each sampled `t`, how long
/// until `beta` catches up with `alpha(t)`. Returns infinity when the
/// long-run arrival rate exceeds the service rate (unstable system).
#[must_use]
pub fn horizontal_deviation(alpha: &PiecewiseLinearCurve, beta: &PiecewiseLinearCurve) -> f64 {
    if alpha.asymptotic_rate() > beta.asymptotic_rate() + 1e-12 {
        return f64::INFINITY;
    }
    // Sample at every breakpoint of both curves, plus a uniform grid that
    // comfortably covers the transient region.
    let mut sample_times: Vec<f64> = alpha
        .segments()
        .iter()
        .chain(beta.segments().iter())
        .map(|s| s.start)
        .collect();
    let t_max = sample_times
        .iter()
        .copied()
        .fold(0.0_f64, f64::max)
        .mul_add(2.0, 10.0);
    let num_extra: u32 = 256;
    for i in 0..=num_extra {
        sample_times.push(t_max * f64::from(i) / f64::from(num_extra));
    }
    sample_times.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
    sample_times.dedup_by(|a, b| (*a - *b).abs() < 1e-12);
    let mut worst = 0.0_f64;
    for &t in &sample_times {
        let demand = alpha.eval(t);
        // Skip times where (numerically) nothing has arrived yet.
        if demand <= 1e-12 {
            continue;
        }
        worst = worst.max(find_delay_for_value(beta, t, demand));
    }
    worst
}
/// Smallest offset `d >= 0` with `curve(t + d) >= target` (within
/// tolerance), found by exponential bracketing followed by bisection.
/// Returns infinity when the curve can never reach `target`.
fn find_delay_for_value(curve: &PiecewiseLinearCurve, t: f64, target: f64) -> f64 {
    let reached = |d: f64| curve.eval(t + d) >= target - 1e-12;
    if reached(0.0) {
        return 0.0;
    }
    // Grow the upper bracket until the target becomes reachable, giving up
    // past 1e15 (effectively never).
    let mut hi = 1.0_f64;
    while !reached(hi) {
        hi *= 2.0;
        if hi > 1e15 {
            return f64::INFINITY;
        }
    }
    // 64 bisection steps pin `hi` down to the crossing point.
    let mut lo = 0.0_f64;
    for _ in 0..64 {
        let mid = f64::midpoint(lo, hi);
        if reached(mid) {
            hi = mid;
        } else {
            lo = mid;
        }
    }
    hi
}
/// Vertical deviation between arrival curve `alpha` and service curve
/// `beta` — the worst-case backlog bound `sup_t (alpha(t) - beta(t))`.
/// Returns infinity when the long-run arrival rate exceeds the service
/// rate (unstable system).
#[must_use]
pub fn vertical_deviation(alpha: &PiecewiseLinearCurve, beta: &PiecewiseLinearCurve) -> f64 {
    if alpha.asymptotic_rate() > beta.asymptotic_rate() + 1e-12 {
        return f64::INFINITY;
    }
    // Breakpoints of both curves plus a uniform grid over the transient.
    let mut sample_times: Vec<f64> = alpha
        .segments()
        .iter()
        .chain(beta.segments().iter())
        .map(|s| s.start)
        .collect();
    let t_max = sample_times
        .iter()
        .copied()
        .fold(0.0_f64, f64::max)
        .mul_add(2.0, 10.0);
    let num_extra: u32 = 256;
    for i in 0..=num_extra {
        sample_times.push(t_max * f64::from(i) / f64::from(num_extra));
    }
    sample_times.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
    sample_times.dedup_by(|a, b| (*a - *b).abs() < 1e-12);
    // Maximum vertical gap over the samples, floored at zero.
    sample_times
        .iter()
        .map(|&t| alpha.eval(t) - beta.eval(t))
        .fold(0.0_f64, f64::max)
}
/// Min-plus deconvolution `(f ⊘ g)(t) = sup_{s>=0} f(t + s) - g(s)`,
/// approximated over sampled `t` values and a candidate grid of shifts
/// `s`. Used to derive the arrival curve of a flow after it has crossed a
/// server.
#[must_use]
pub fn min_plus_deconvolution(
    f: &PiecewiseLinearCurve,
    g: &PiecewiseLinearCurve,
) -> PiecewiseLinearCurve {
    let f_breaks: Vec<f64> = f.segments.iter().map(|s| s.start).collect();
    let g_breaks: Vec<f64> = g.segments.iter().map(|s| s.start).collect();
    let t_max = f_breaks
        .last()
        .copied()
        .unwrap_or(0.0)
        .max(g_breaks.last().copied().unwrap_or(0.0))
        .mul_add(2.0, 1.0);
    // Output sample times: every breakpoint, the origin, and a grid.
    let mut sample_times: Vec<f64> = Vec::new();
    sample_times.extend_from_slice(&f_breaks);
    sample_times.extend_from_slice(&g_breaks);
    sample_times.push(0.0);
    let num_samples: u32 = 64;
    for i in 0..=num_samples {
        sample_times.push(t_max * f64::from(i) / f64::from(num_samples));
    }
    sample_times.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
    sample_times.dedup_by(|a, b| (*a - *b).abs() < 1e-12);
    sample_times.retain(|&t| t >= 0.0);
    // Candidate shifts where the supremum can occur: breakpoints of g plus
    // a grid out to twice g's last breakpoint (padded).
    let s_max = g_breaks.last().copied().unwrap_or(0.0).mul_add(2.0, 10.0);
    let mut candidate_s: Vec<f64> = Vec::new();
    candidate_s.extend_from_slice(&g_breaks);
    candidate_s.push(0.0);
    let s_samples: u32 = 64;
    for i in 0..=s_samples {
        candidate_s.push(s_max * f64::from(i) / f64::from(s_samples));
    }
    candidate_s.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
    candidate_s.dedup_by(|a, b| (*a - *b).abs() < 1e-12);
    let points: Vec<(f64, f64)> = sample_times
        .iter()
        .map(|&t| {
            let sup = candidate_s
                .iter()
                .map(|&s| f.eval(t + s) - g.eval(s))
                .fold(f64::NEG_INFINITY, f64::max);
            // Arrival curves are non-negative; clamp numerical noise.
            (t, sup.max(0.0))
        })
        .collect();
    build_curve_from_points(&points)
}
/// Arrival curve: an upper bound on cumulative traffic arriving over any
/// interval, wrapped around a [`PiecewiseLinearCurve`].
#[derive(Debug, Clone, PartialEq)]
pub struct ArrivalCurve(pub PiecewiseLinearCurve);
impl ArrivalCurve {
    /// Token-bucket arrival: at most `burst + rate * t` over any window.
    #[must_use]
    pub fn token_bucket(rate: f64, burst: f64) -> Self {
        Self(PiecewiseLinearCurve::affine(rate, burst))
    }

    /// Burst-free arrival at a fixed rate (a token bucket with zero burst).
    #[must_use]
    pub fn constant_rate(rate: f64) -> Self {
        Self::token_bucket(rate, 0.0)
    }

    /// Wraps an arbitrary already-constructed curve.
    #[must_use]
    pub fn from_curve(curve: PiecewiseLinearCurve) -> Self {
        Self(curve)
    }

    /// Maximum traffic that may arrive in `[0, t]`.
    #[must_use]
    #[inline]
    pub fn eval(&self, t: f64) -> f64 {
        self.0.eval(t)
    }

    /// Underlying piecewise-linear curve.
    #[must_use]
    #[inline]
    pub fn curve(&self) -> &PiecewiseLinearCurve {
        &self.0
    }

    /// Long-run arrival rate.
    #[must_use]
    #[inline]
    pub fn asymptotic_rate(&self) -> f64 {
        self.0.asymptotic_rate()
    }
}
/// Service curve: a lower bound on the service a server guarantees over
/// any interval, wrapped around a [`PiecewiseLinearCurve`].
#[derive(Debug, Clone, PartialEq)]
pub struct ServiceCurve(pub PiecewiseLinearCurve);
impl ServiceCurve {
    /// Rate-latency service: nothing until `latency`, then slope `rate`.
    #[must_use]
    pub fn rate_latency(rate: f64, latency: f64) -> Self {
        Self(PiecewiseLinearCurve::rate_latency(rate, latency))
    }

    /// Latency-free service at a fixed rate.
    #[must_use]
    pub fn constant_rate(rate: f64) -> Self {
        Self(PiecewiseLinearCurve::affine(rate, 0.0))
    }

    /// Wraps an arbitrary already-constructed curve.
    #[must_use]
    pub fn from_curve(curve: PiecewiseLinearCurve) -> Self {
        Self(curve)
    }

    /// Minimum service guaranteed over `[0, t]`.
    #[must_use]
    #[inline]
    pub fn eval(&self, t: f64) -> f64 {
        self.0.eval(t)
    }

    /// Underlying piecewise-linear curve.
    #[must_use]
    #[inline]
    pub fn curve(&self) -> &PiecewiseLinearCurve {
        &self.0
    }

    /// Long-run service rate.
    #[must_use]
    #[inline]
    pub fn asymptotic_rate(&self) -> f64 {
        self.0.asymptotic_rate()
    }

    /// Service of this server followed by `other`: the min-plus
    /// convolution of the two curves.
    #[must_use]
    pub fn sequential(&self, other: &Self) -> Self {
        Self(min_plus_convolution(&self.0, &other.0))
    }

    /// Scales capacity by `factor`, multiplying every rate and offset
    /// while keeping the breakpoints in place.
    #[must_use]
    pub fn scale(&self, factor: f64) -> Self {
        debug_assert!(factor > 0.0);
        let segments = self
            .0
            .segments
            .iter()
            .map(|s| Segment::new(s.start, s.rate * factor, s.burst * factor))
            .collect();
        Self(PiecewiseLinearCurve { segments })
    }
}
/// One node's contribution to an end-to-end latency bound (a provenance
/// entry explaining where delay came from).
#[derive(Debug, Clone)]
pub struct BoundContribution {
/// Node that produced this contribution.
pub node_id: PlanId,
/// Delay attributed to the node, in seconds.
pub delay: f64,
/// Human-readable explanation of how the delay arose.
pub description: String,
}
impl fmt::Display for BoundContribution {
    /// Renders as `node[<id>]: <delay>s — <description>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let delay = self.delay;
        let description = &self.description;
        write!(
            f,
            "node[{}]: {delay:.6}s — {description}",
            self.node_id.index()
        )
    }
}
/// End-to-end bound produced for the DAG root.
#[derive(Debug, Clone)]
pub struct LatencyBound {
/// Worst-case delay in seconds (may be infinite).
pub delay: f64,
/// Worst-case queued work (may be infinite).
pub backlog: f64,
/// Ratio of long-run arrival rate to service rate at the root.
pub utilization: f64,
/// Per-node contributions, sorted by descending delay.
pub provenance: Vec<BoundContribution>,
}
impl LatencyBound {
    /// A bound is usable only when the delay is finite and the system is
    /// not saturated (utilization strictly below 1).
    #[must_use]
    #[inline]
    pub fn is_stable(&self) -> bool {
        self.delay.is_finite() && self.utilization < 1.0
    }

    /// Largest single contributor (provenance is sorted by descending
    /// delay when the bound is built).
    #[must_use]
    pub fn bottleneck(&self) -> Option<&BoundContribution> {
        self.provenance.first()
    }

    /// One-line human-readable summary of the bound.
    #[must_use]
    pub fn summary(&self) -> String {
        if !self.is_stable() {
            return format!(
                "UNSTABLE: utilization={:.2}%, delay=INF",
                self.utilization * 100.0
            );
        }
        let bottleneck = self.bottleneck().map_or_else(String::new, |b| {
            format!(", bottleneck=node[{}]", b.node_id.index())
        });
        format!(
            "delay<={:.6}s, backlog<={:.2}, util={:.1}%{}",
            self.delay,
            self.backlog,
            self.utilization * 100.0,
            bottleneck
        )
    }
}
impl fmt::Display for LatencyBound {
    /// Multi-line report: headline numbers, then per-node provenance.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        writeln!(f, "Latency Bound Analysis")?;
        writeln!(f, " delay <= {:.6}s", self.delay)?;
        writeln!(f, " backlog <= {:.2} units", self.backlog)?;
        writeln!(f, " util = {:.1}%", self.utilization * 100.0)?;
        if self.provenance.is_empty() {
            return Ok(());
        }
        writeln!(f, " provenance:")?;
        for contrib in &self.provenance {
            writeln!(f, " {contrib}")?;
        }
        Ok(())
    }
}
/// The arrival/service curve pair annotated on a single DAG node.
#[derive(Debug, Clone)]
pub struct NodeCurves {
/// Traffic entering the node.
pub arrival: ArrivalCurve,
/// Service the node guarantees.
pub service: ServiceCurve,
}
impl NodeCurves {
    /// Pairs the traffic entering a node with the service it receives.
    #[must_use]
    pub fn new(arrival: ArrivalCurve, service: ServiceCurve) -> Self {
        Self { arrival, service }
    }

    /// Worst-case delay at this node (horizontal deviation).
    #[must_use]
    pub fn delay_bound(&self) -> f64 {
        horizontal_deviation(self.arrival.curve(), self.service.curve())
    }

    /// Worst-case backlog at this node (vertical deviation).
    #[must_use]
    pub fn backlog_bound(&self) -> f64 {
        vertical_deviation(self.arrival.curve(), self.service.curve())
    }

    /// Ratio of long-run arrival rate to service rate; infinite when the
    /// node offers (numerically) no service at all.
    #[must_use]
    pub fn utilization(&self) -> f64 {
        let service_rate = self.service.asymptotic_rate();
        if service_rate <= 1e-15 {
            f64::INFINITY
        } else {
            self.arrival.asymptotic_rate() / service_rate
        }
    }
}
/// Intermediate per-node analysis result propagated up the DAG.
#[derive(Debug, Clone)]
struct NodeLatency {
// Worst-case delay bound for this subtree, in seconds.
delay: f64,
// Worst-case backlog bound for this subtree.
backlog: f64,
// Arrival curve of the traffic leaving this subtree.
output_arrival: ArrivalCurve,
// Service curve this subtree effectively provides.
effective_service: ServiceCurve,
// Provenance entries explaining where the delay came from.
contributions: Vec<BoundContribution>,
}
/// Result of analyzing a whole DAG: per-node delay and backlog bounds
/// keyed by node index, plus the end-to-end bound when a root is set.
#[derive(Debug, Clone)]
pub struct LatencyAnalysis {
/// Delay bound per node index, in seconds.
pub node_delays: BTreeMap<usize, f64>,
/// Backlog bound per node index.
pub node_backlogs: BTreeMap<usize, f64>,
/// End-to-end bound at the DAG root, when one exists.
pub root_bound: Option<LatencyBound>,
}
impl LatencyAnalysis {
    /// Delay bound for node `id`, if the node was analyzed.
    #[must_use]
    pub fn delay_at(&self, id: PlanId) -> Option<f64> {
        self.node_delays.get(&id.index()).copied()
    }

    /// Backlog bound for node `id`, if the node was analyzed.
    #[must_use]
    pub fn backlog_at(&self, id: PlanId) -> Option<f64> {
        self.node_backlogs.get(&id.index()).copied()
    }

    /// End-to-end delay bound at the DAG root, when a root exists.
    #[must_use]
    pub fn end_to_end_delay(&self) -> Option<f64> {
        self.root_bound.as_ref().map(|b| b.delay)
    }

    /// One-line summary: node count plus the root bound, if any.
    #[must_use]
    pub fn summary(&self) -> String {
        match &self.root_bound {
            None => format!("{} nodes analyzed, no root bound", self.node_delays.len()),
            Some(bound) => format!(
                "{} nodes analyzed, e2e: {}",
                self.node_delays.len(),
                bound.summary()
            ),
        }
    }
}
impl fmt::Display for LatencyAnalysis {
    /// Per-node table followed by the end-to-end bound, when present.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        writeln!(f, "Latency Analysis ({} nodes)", self.node_delays.len())?;
        for (&idx, &delay) in &self.node_delays {
            // A node with no recorded backlog is reported as zero.
            let backlog = self.node_backlogs.get(&idx).map_or(0.0, |&b| b);
            writeln!(
                f,
                " node[{idx}]: delay<={delay:.6}s, backlog<={backlog:.2}"
            )?;
        }
        if let Some(bound) = &self.root_bound {
            writeln!(f, " --- end-to-end ---")?;
            write!(f, "{bound}")?;
        }
        Ok(())
    }
}
/// Computes network-calculus latency bounds over a plan DAG, using
/// per-node curve annotations with an optional fallback pair for
/// unannotated leaves.
#[derive(Debug, Clone)]
pub struct LatencyAnalyzer {
// Curve pair per node index.
annotations: BTreeMap<usize, NodeCurves>,
// Fallback arrival curve for unannotated leaves (applies only when the
// service default is also set).
default_arrival: Option<ArrivalCurve>,
// Fallback service curve for unannotated leaves.
default_service: Option<ServiceCurve>,
}
impl LatencyAnalyzer {
/// Creates an analyzer with no annotations and no defaults; unannotated
/// leaves will yield infinite (unknown) bounds.
#[must_use]
pub fn new() -> Self {
Self {
annotations: BTreeMap::new(),
default_arrival: None,
default_service: None,
}
}
/// Creates an analyzer whose unannotated leaves fall back to the given
/// arrival/service curve pair.
#[must_use]
pub fn with_defaults(arrival: ArrivalCurve, service: ServiceCurve) -> Self {
Self {
annotations: BTreeMap::new(),
default_arrival: Some(arrival),
default_service: Some(service),
}
}
/// Attaches (or replaces) the curve pair for node `id`.
pub fn annotate(&mut self, id: PlanId, curves: NodeCurves) {
self.annotations.insert(id.index(), curves);
}
/// Removes the annotation for node `id`, if any.
pub fn remove_annotation(&mut self, id: PlanId) {
self.annotations.remove(&id.index());
}
/// Computes delay/backlog bounds for every node of `dag` and, when a
/// root is set, an end-to-end [`LatencyBound`] with provenance sorted by
/// descending delay.
#[must_use]
pub fn analyze(&self, dag: &PlanDag) -> LatencyAnalysis {
let mut cache: BTreeMap<usize, NodeLatency> = BTreeMap::new();
let mut node_delays = BTreeMap::new();
let mut node_backlogs = BTreeMap::new();
// Visit every node; analyze_node memoizes shared subtrees via `cache`.
for idx in 0..dag.node_count() {
let id = PlanId::new(idx);
let result = self.analyze_node(dag, id, &mut cache);
node_delays.insert(idx, result.delay);
node_backlogs.insert(idx, result.backlog);
}
let root_bound = dag.root().map(|root_id| {
// The loop above already populated the cache; this fallback is
// purely defensive.
let root_result = cache
.get(&root_id.index())
.cloned()
.unwrap_or_else(|| NodeLatency {
delay: f64::INFINITY,
backlog: f64::INFINITY,
output_arrival: ArrivalCurve::constant_rate(0.0),
effective_service: ServiceCurve::constant_rate(0.0),
contributions: Vec::new(),
});
// NOTE(review): utilization is computed from the root's *output*
// arrival rate over its effective service rate — confirm this is
// intended rather than the input arrival rate.
let utilization = {
let svc_rate = root_result.effective_service.asymptotic_rate();
if svc_rate <= 1e-15 {
f64::INFINITY
} else {
root_result.output_arrival.asymptotic_rate() / svc_rate
}
};
// Sort contributions by descending delay so `provenance.first()`
// is the bottleneck (see LatencyBound::bottleneck).
let mut provenance = root_result.contributions;
provenance.sort_by(|a, b| {
b.delay
.partial_cmp(&a.delay)
.unwrap_or(std::cmp::Ordering::Equal)
});
LatencyBound {
delay: root_result.delay,
backlog: root_result.backlog,
utilization,
provenance,
}
});
LatencyAnalysis {
node_delays,
node_backlogs,
root_bound,
}
}
/// Recursive, memoized per-node analysis; dispatches on node kind.
fn analyze_node(
&self,
dag: &PlanDag,
id: PlanId,
cache: &mut BTreeMap<usize, NodeLatency>,
) -> NodeLatency {
if let Some(existing) = cache.get(&id.index()) {
return existing.clone();
}
let result = dag.node(id).map_or_else(
// Unknown id: pessimistic infinite bounds with an explanatory entry.
|| NodeLatency {
delay: f64::INFINITY,
backlog: f64::INFINITY,
output_arrival: ArrivalCurve::constant_rate(0.0),
effective_service: ServiceCurve::constant_rate(0.0),
contributions: vec![BoundContribution {
node_id: id,
delay: f64::INFINITY,
description: "missing node".to_string(),
}],
},
|node| match node.clone() {
PlanNode::Leaf { label } => self.analyze_leaf(id, &label),
PlanNode::Join { children } => self.analyze_join(dag, id, &children, cache),
PlanNode::Race { children } => self.analyze_race(dag, id, &children, cache),
PlanNode::Timeout { child, duration } => {
self.analyze_timeout(dag, id, child, duration, cache)
}
},
);
cache.insert(id.index(), result.clone());
result
}
/// Leaf: applies the network-calculus delay/backlog bounds from its
/// annotation (or the analyzer defaults). Unannotated leaves are
/// treated as unbounded.
fn analyze_leaf(&self, id: PlanId, label: &str) -> NodeLatency {
let curves = self.annotations.get(&id.index()).cloned().or_else(|| {
match (&self.default_arrival, &self.default_service) {
(Some(a), Some(s)) => Some(NodeCurves::new(a.clone(), s.clone())),
_ => None,
}
});
curves.map_or_else(
|| NodeLatency {
delay: f64::INFINITY,
backlog: f64::INFINITY,
output_arrival: ArrivalCurve::constant_rate(0.0),
effective_service: ServiceCurve::constant_rate(0.0),
contributions: vec![BoundContribution {
node_id: id,
delay: f64::INFINITY,
description: format!("leaf \"{label}\": no annotation"),
}],
},
|curves| {
let delay = curves.delay_bound();
let backlog = curves.backlog_bound();
let description = format!("leaf \"{label}\": delay={delay:.6}s");
NodeLatency {
delay,
backlog,
// Output arrival = arrival ⊘ service (min-plus
// deconvolution): the traffic shape after this server.
output_arrival: ArrivalCurve::from_curve(min_plus_deconvolution(
curves.arrival.curve(),
curves.service.curve(),
)),
effective_service: curves.service.clone(),
contributions: vec![BoundContribution {
node_id: id,
delay,
description,
}],
}
},
)
}
/// Join: completes when *all* children complete, so the delay and
/// backlog bounds are the maxima over the children.
fn analyze_join(
&self,
dag: &PlanDag,
id: PlanId,
children: &[PlanId],
cache: &mut BTreeMap<usize, NodeLatency>,
) -> NodeLatency {
if children.is_empty() {
// An empty join completes immediately and constrains nothing.
return NodeLatency {
delay: 0.0,
backlog: 0.0,
output_arrival: ArrivalCurve::constant_rate(0.0),
effective_service: ServiceCurve::constant_rate(f64::INFINITY),
contributions: vec![BoundContribution {
node_id: id,
delay: 0.0,
description: "empty join (trivial)".to_string(),
}],
};
}
let child_results: Vec<NodeLatency> = children
.iter()
.map(|c| self.analyze_node(dag, *c, cache))
.collect();
let delay = child_results
.iter()
.map(|r| r.delay)
.fold(0.0_f64, f64::max);
let backlog = child_results
.iter()
.map(|r| r.backlog)
.fold(0.0_f64, f64::max);
// The join is constrained by the slowest service among its children.
let min_rate_child = child_results.iter().enumerate().min_by(|(_, a), (_, b)| {
a.effective_service
.asymptotic_rate()
.partial_cmp(&b.effective_service.asymptotic_rate())
.unwrap_or(std::cmp::Ordering::Equal)
});
let effective_service = min_rate_child.map_or_else(
|| ServiceCurve::constant_rate(0.0),
|(_, r)| r.effective_service.clone(),
);
// NOTE(review): the combined output is approximated by the child with
// the highest output rate rather than a sum over children — confirm
// this shortcut is intended.
let max_rate_child = child_results.iter().enumerate().max_by(|(_, a), (_, b)| {
a.output_arrival
.asymptotic_rate()
.partial_cmp(&b.output_arrival.asymptotic_rate())
.unwrap_or(std::cmp::Ordering::Equal)
});
let output_arrival = max_rate_child.map_or_else(
|| ArrivalCurve::constant_rate(0.0),
|(_, r)| r.output_arrival.clone(),
);
// Provenance aggregates every child plus this join's own entry.
let mut contributions: Vec<BoundContribution> = child_results
.iter()
.flat_map(|r| r.contributions.iter().cloned())
.collect();
contributions.push(BoundContribution {
node_id: id,
delay,
description: format!("join of {} children: max delay={delay:.6}s", children.len()),
});
NodeLatency {
delay,
backlog,
output_arrival,
effective_service,
contributions,
}
}
/// Race: completes when the *first* child completes, so delay and
/// backlog are the minima; an empty race can never finish.
fn analyze_race(
&self,
dag: &PlanDag,
id: PlanId,
children: &[PlanId],
cache: &mut BTreeMap<usize, NodeLatency>,
) -> NodeLatency {
if children.is_empty() {
return NodeLatency {
delay: f64::INFINITY,
backlog: 0.0,
output_arrival: ArrivalCurve::constant_rate(0.0),
effective_service: ServiceCurve::constant_rate(0.0),
contributions: vec![BoundContribution {
node_id: id,
delay: f64::INFINITY,
description: "empty race (deadlock)".to_string(),
}],
};
}
let child_results: Vec<NodeLatency> = children
.iter()
.map(|c| self.analyze_node(dag, *c, cache))
.collect();
let delay = child_results
.iter()
.map(|r| r.delay)
.fold(f64::INFINITY, f64::min);
let backlog = child_results
.iter()
.map(|r| r.backlog)
.fold(f64::INFINITY, f64::min);
// The race benefits from the fastest service among its children.
let max_rate_child = child_results.iter().enumerate().max_by(|(_, a), (_, b)| {
a.effective_service
.asymptotic_rate()
.partial_cmp(&b.effective_service.asymptotic_rate())
.unwrap_or(std::cmp::Ordering::Equal)
});
let effective_service = max_rate_child.map_or_else(
|| ServiceCurve::constant_rate(0.0),
|(_, r)| r.effective_service.clone(),
);
// Output follows the child expected to finish first.
let min_delay_child = child_results.iter().enumerate().min_by(|(_, a), (_, b)| {
a.delay
.partial_cmp(&b.delay)
.unwrap_or(std::cmp::Ordering::Equal)
});
let output_arrival = min_delay_child.map_or_else(
|| ArrivalCurve::constant_rate(0.0),
|(_, r)| r.output_arrival.clone(),
);
// Only the winning child's provenance is kept; losers are cancelled.
let winner_idx = child_results
.iter()
.enumerate()
.min_by(|(_, a), (_, b)| {
a.delay
.partial_cmp(&b.delay)
.unwrap_or(std::cmp::Ordering::Equal)
})
.map_or(0, |(i, _)| i);
let mut contributions: Vec<BoundContribution> =
child_results[winner_idx].contributions.clone();
contributions.push(BoundContribution {
node_id: id,
delay,
description: format!(
"race of {} children: min delay={delay:.6}s (winner=child[{winner_idx}])",
children.len()
),
});
NodeLatency {
delay,
backlog,
output_arrival,
effective_service,
contributions,
}
}
/// Timeout: caps the child's delay at `duration`; when the child would
/// exceed the timeout, backlog is additionally capped by what can arrive
/// before the timeout fires.
fn analyze_timeout(
&self,
dag: &PlanDag,
id: PlanId,
child: PlanId,
duration: std::time::Duration,
cache: &mut BTreeMap<usize, NodeLatency>,
) -> NodeLatency {
let child_result = self.analyze_node(dag, child, cache);
let timeout_secs = duration.as_secs_f64();
let delay = child_result.delay.min(timeout_secs);
let backlog = if child_result.delay <= timeout_secs {
child_result.backlog
} else {
child_result
.backlog
.min(child_result.output_arrival.eval(timeout_secs))
};
let mut contributions = child_result.contributions;
contributions.push(BoundContribution {
node_id: id,
delay,
description: format!("timeout({timeout_secs:.6}s): capped delay={delay:.6}s"),
});
NodeLatency {
delay,
backlog,
output_arrival: child_result.output_arrival,
effective_service: child_result.effective_service,
contributions,
}
}
}
impl Default for LatencyAnalyzer {
/// Equivalent to [`LatencyAnalyzer::new`]: no annotations, no defaults.
fn default() -> Self {
Self::new()
}
}
/// Worst-case delay for traffic bounded by `arrival` crossing a server
/// guaranteeing `service`: the horizontal deviation of the two curves.
#[must_use]
pub fn delay_bound(arrival: &ArrivalCurve, service: &ServiceCurve) -> f64 {
horizontal_deviation(arrival.curve(), service.curve())
}
/// Worst-case backlog for traffic bounded by `arrival` crossing a server
/// guaranteeing `service`: the vertical deviation of the two curves.
#[must_use]
pub fn backlog_bound(arrival: &ArrivalCurve, service: &ServiceCurve) -> f64 {
vertical_deviation(arrival.curve(), service.curve())
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
// Loose tolerance for comparing numerically-derived bounds.
const EPS: f64 = 1e-3;
// Tolerant float equality that also treats two same-signed infinities as
// equal (so infinite bounds compare sensibly).
fn approx_eq(a: f64, b: f64) -> bool {
    if a.is_infinite() && b.is_infinite() {
        return a.signum() == b.signum();
    }
    (a - b).abs() < EPS
}
// The zero curve stays at zero for all times.
#[test]
fn zero_curve_evaluates_to_zero() {
let c = PiecewiseLinearCurve::zero();
assert!(approx_eq(c.eval(0.0), 0.0));
assert!(approx_eq(c.eval(1.0), 0.0));
assert!(approx_eq(c.eval(100.0), 0.0));
}
// affine(rate, burst) evaluates to burst + rate * t.
#[test]
fn affine_curve_evaluates_correctly() {
let c = PiecewiseLinearCurve::affine(5.0, 10.0);
assert!(approx_eq(c.eval(0.0), 10.0));
assert!(approx_eq(c.eval(1.0), 15.0));
assert!(approx_eq(c.eval(2.0), 20.0));
assert!(approx_eq(c.eval(10.0), 60.0));
}
// rate_latency is flat at zero up to the latency, then climbs at `rate`.
#[test]
fn rate_latency_curve_has_zero_before_latency() {
let c = PiecewiseLinearCurve::rate_latency(100.0, 0.5);
assert!(approx_eq(c.eval(0.0), 0.0));
assert!(approx_eq(c.eval(0.25), 0.0));
assert!(approx_eq(c.eval(0.5), 0.0));
assert!(approx_eq(c.eval(1.0), 50.0));
assert!(approx_eq(c.eval(1.5), 100.0));
}
// Breakpoints must strictly increase.
#[test]
fn from_segments_rejects_unsorted() {
let segments = vec![Segment::new(1.0, 1.0, 0.0), Segment::new(0.0, 1.0, 0.0)];
assert!(PiecewiseLinearCurve::from_segments(segments).is_none());
}
// Negative rates are invalid for cumulative curves.
#[test]
fn from_segments_rejects_negative_rate() {
let segments = vec![Segment::new(0.0, -1.0, 0.0)];
assert!(PiecewiseLinearCurve::from_segments(segments).is_none());
}
// Adjacent segments must agree at the breakpoint: here the first piece
// reaches 5.0 at t=5 but the second claims 10.0.
#[test]
fn from_segments_rejects_discontinuity() {
let segments = vec![
Segment::new(0.0, 1.0, 0.0),
Segment::new(5.0, 2.0, 10.0), ];
assert!(PiecewiseLinearCurve::from_segments(segments).is_none());
}
// A continuous, sorted, non-negative segment list is accepted and
// evaluates piecewise.
#[test]
fn from_segments_accepts_valid_continuous_curve() {
let segments = vec![Segment::new(0.0, 1.0, 0.0), Segment::new(1.0, 2.0, 1.0)];
let c = PiecewiseLinearCurve::from_segments(segments).unwrap();
assert!(approx_eq(c.eval(0.5), 0.5));
assert!(approx_eq(c.eval(1.0), 1.0));
assert!(approx_eq(c.eval(2.0), 3.0));
}
// Cumulative curves are zero for negative time.
#[test]
fn negative_time_evaluates_to_zero() {
let c = PiecewiseLinearCurve::affine(1.0, 5.0);
assert!(approx_eq(c.eval(-1.0), 0.0));
assert!(approx_eq(c.eval(-100.0), 0.0));
}
// Classic token-bucket over rate-latency result: delay = latency +
// burst / service_rate = 0.01 + 50/200 = 0.26.
#[test]
fn delay_bound_affine_token_bucket_rate_latency() {
let alpha = ArrivalCurve::token_bucket(100.0, 50.0);
let beta = ServiceCurve::rate_latency(200.0, 0.01);
let d = delay_bound(&alpha, &beta);
assert!(approx_eq(d, 0.26), "expected ~0.26, got {d}");
}
// Equal rates: only the burst queues, so delay = burst / rate = 10/5 = 2.
#[test]
fn delay_bound_equal_rates_burst_only() {
let alpha = ArrivalCurve::token_bucket(5.0, 10.0);
let beta = ServiceCurve::constant_rate(5.0);
let d = delay_bound(&alpha, &beta);
assert!(approx_eq(d, 2.0), "expected ~2.0, got {d}");
}
// Arrival rate above service rate: the delay bound diverges.
#[test]
fn delay_bound_unstable_system() {
let alpha = ArrivalCurve::constant_rate(200.0);
let beta = ServiceCurve::constant_rate(100.0);
let d = delay_bound(&alpha, &beta);
assert!(d.is_infinite(), "expected infinite, got {d}");
}
// No burst, no latency, spare capacity: essentially zero delay.
#[test]
fn delay_bound_zero_burst_zero_latency() {
let alpha = ArrivalCurve::constant_rate(50.0);
let beta = ServiceCurve::constant_rate(100.0);
let d = delay_bound(&alpha, &beta);
assert!(d < EPS, "expected ~0, got {d}");
}
// Max vertical gap: burst + rate * latency = 50 + 100 * 0.01 = 51.
#[test]
fn backlog_bound_token_bucket_rate_latency() {
let alpha = ArrivalCurve::token_bucket(100.0, 50.0);
let beta = ServiceCurve::rate_latency(200.0, 0.01);
let b = backlog_bound(&alpha, &beta);
assert!(approx_eq(b, 51.0), "expected ~51.0, got {b}");
}
// Unstable systems also have unbounded backlog.
#[test]
fn backlog_bound_unstable_system() {
let alpha = ArrivalCurve::constant_rate(200.0);
let beta = ServiceCurve::constant_rate(100.0);
let b = backlog_bound(&alpha, &beta);
assert!(b.is_infinite(), "expected infinite, got {b}");
}
// Convolving a pure-rate curve with itself keeps the same rate.
#[test]
fn convolution_identity() {
let f = PiecewiseLinearCurve::affine(10.0, 0.0);
let g = PiecewiseLinearCurve::affine(10.0, 0.0);
let conv = min_plus_convolution(&f, &g);
assert!(
approx_eq(conv.eval(1.0), 10.0),
"expected ~10.0, got {}",
conv.eval(1.0)
);
}
// Min-plus convolution is commutative: f ⊗ g == g ⊗ f.
#[test]
fn convolution_commutativity() {
let f = PiecewiseLinearCurve::affine(5.0, 10.0);
let g = PiecewiseLinearCurve::rate_latency(8.0, 0.5);
let fg = min_plus_convolution(&f, &g);
let gf = min_plus_convolution(&g, &f);
for &t in &[0.0, 0.5, 1.0, 2.0, 5.0, 10.0] {
assert!(
approx_eq(fg.eval(t), gf.eval(t)),
"commutativity failed at t={t}: fg={}, gf={}",
fg.eval(t),
gf.eval(t)
);
}
}
// Two rate-latency servers in sequence: latencies add (0.1 + 0.2 = 0.3)
// and the combined rate is the minimum of the two.
#[test]
fn convolution_rate_latency_sum() {
let beta1 = ServiceCurve::rate_latency(100.0, 0.1);
let beta2 = ServiceCurve::rate_latency(100.0, 0.2);
let combined = beta1.sequential(&beta2);
assert!(
approx_eq(combined.eval(0.3), 0.0),
"expected ~0.0 at t=0.3, got {}",
combined.eval(0.3)
);
assert!(
approx_eq(combined.eval(0.5), 20.0),
"expected ~20.0 at t=0.5, got {}",
combined.eval(0.5)
);
assert!(
approx_eq(combined.eval(1.0), 70.0),
"expected ~70.0 at t=1.0, got {}",
combined.eval(1.0)
);
}
// Min-plus convolution is associative: (f ⊗ g) ⊗ h == f ⊗ (g ⊗ h).
#[test]
fn convolution_associativity() {
let f = PiecewiseLinearCurve::rate_latency(10.0, 0.1);
let g = PiecewiseLinearCurve::rate_latency(15.0, 0.2);
let h = PiecewiseLinearCurve::rate_latency(20.0, 0.05);
let fg_h = min_plus_convolution(&min_plus_convolution(&f, &g), &h);
let f_gh = min_plus_convolution(&f, &min_plus_convolution(&g, &h));
for &t in &[0.0, 0.5, 1.0, 2.0, 5.0] {
assert!(
approx_eq(fg_h.eval(t), f_gh.eval(t)),
"associativity failed at t={t}: (fg)h={}, f(gh)={}",
fg_h.eval(t),
f_gh.eval(t)
);
}
}
// scale(3.0) triples the rate while keeping the latency breakpoint:
// eval(0.02) = 300 * 0.01 = 3, eval(0.11) = 300 * 0.1 = 30.
#[test]
fn service_curve_scaling() {
let beta = ServiceCurve::rate_latency(100.0, 0.01);
let scaled = beta.scale(3.0);
assert!(approx_eq(scaled.eval(0.01), 0.0));
assert!(approx_eq(scaled.eval(0.02), 3.0)); assert!(approx_eq(scaled.eval(0.11), 30.0)); }
#[test]
fn dag_join_takes_max_delay() {
let mut dag = PlanDag::new();
let a = dag.leaf("fast");
let b = dag.leaf("slow");
let joined = dag.join(vec![a, b]);
dag.set_root(joined);
let mut analyzer = LatencyAnalyzer::new();
analyzer.annotate(
a,
NodeCurves::new(
ArrivalCurve::token_bucket(100.0, 10.0),
ServiceCurve::rate_latency(200.0, 0.0),
),
);
analyzer.annotate(
b,
NodeCurves::new(
ArrivalCurve::token_bucket(100.0, 50.0),
ServiceCurve::rate_latency(200.0, 0.01),
),
);
let analysis = analyzer.analyze(&dag);
let root_delay = analysis.end_to_end_delay().unwrap();
let slow_delay = analysis.delay_at(b).unwrap();
assert!(
approx_eq(root_delay, slow_delay),
"join delay {root_delay} should equal slow delay {slow_delay}"
);
assert!(
(root_delay - 0.26).abs() < 0.05,
"expected join delay ~0.26, got {root_delay}"
);
}
#[test]
fn dag_join_empty_children_is_zero() {
let mut dag = PlanDag::new();
let joined = dag.join(vec![]);
dag.set_root(joined);
let analyzer = LatencyAnalyzer::new();
let analysis = analyzer.analyze(&dag);
assert!(approx_eq(analysis.end_to_end_delay().unwrap(), 0.0));
}
#[test]
fn dag_race_takes_min_delay() {
let mut dag = PlanDag::new();
let a = dag.leaf("fast");
let b = dag.leaf("slow");
let raced = dag.race(vec![a, b]);
dag.set_root(raced);
let mut analyzer = LatencyAnalyzer::new();
analyzer.annotate(
a,
NodeCurves::new(
ArrivalCurve::token_bucket(100.0, 10.0),
ServiceCurve::rate_latency(200.0, 0.0),
),
);
analyzer.annotate(
b,
NodeCurves::new(
ArrivalCurve::token_bucket(100.0, 50.0),
ServiceCurve::rate_latency(200.0, 0.01),
),
);
let analysis = analyzer.analyze(&dag);
let root_delay = analysis.end_to_end_delay().unwrap();
let fast_delay = analysis.delay_at(a).unwrap();
assert!(
approx_eq(root_delay, fast_delay),
"race delay {root_delay} should equal fast delay {fast_delay}"
);
assert!(
root_delay < 0.1,
"race delay should be small, got {root_delay}"
);
}
#[test]
fn dag_race_empty_children_is_infinite() {
let mut dag = PlanDag::new();
let raced = dag.race(vec![]);
dag.set_root(raced);
let analyzer = LatencyAnalyzer::new();
let analysis = analyzer.analyze(&dag);
assert!(analysis.end_to_end_delay().unwrap().is_infinite());
}
#[test]
fn dag_timeout_caps_delay() {
let mut dag = PlanDag::new();
let slow = dag.leaf("slow");
let timed = dag.timeout(slow, Duration::from_millis(100));
dag.set_root(timed);
let mut analyzer = LatencyAnalyzer::new();
analyzer.annotate(
slow,
NodeCurves::new(
ArrivalCurve::token_bucket(100.0, 100.0),
ServiceCurve::rate_latency(200.0, 0.5),
),
);
let analysis = analyzer.analyze(&dag);
let root_delay = analysis.end_to_end_delay().unwrap();
assert!(
root_delay <= 0.1 + EPS,
"timeout should cap delay to 0.1s, got {root_delay}"
);
}
#[test]
fn dag_timeout_passthrough_when_child_is_fast() {
    // A timeout far larger than the child's delay should not tighten the bound.
    let mut dag = PlanDag::new();
    let fast = dag.leaf("fast");
    let root = dag.timeout(fast, Duration::from_secs(10));
    dag.set_root(root);

    let fast_curves = NodeCurves::new(
        ArrivalCurve::token_bucket(100.0, 10.0),
        ServiceCurve::rate_latency(200.0, 0.0),
    );
    let mut analyzer = LatencyAnalyzer::new();
    analyzer.annotate(fast, fast_curves);

    let analysis = analyzer.analyze(&dag);
    let root_delay = analysis.end_to_end_delay().unwrap();
    let child_delay = analysis.delay_at(fast).unwrap();
    assert!(
        approx_eq(root_delay, child_delay),
        "with generous timeout, delay should match child: root={root_delay}, child={child_delay}"
    );
}
#[test]
fn nested_timeout_min_law() {
    // Stacked timeouts behave like the minimum of their budgets (200ms here).
    let mut dag = PlanDag::new();
    let leaf = dag.leaf("task");
    let inner = dag.timeout(leaf, Duration::from_millis(500));
    let outer = dag.timeout(inner, Duration::from_millis(200));
    dag.set_root(outer);

    let curves = NodeCurves::new(
        ArrivalCurve::token_bucket(100.0, 200.0),
        ServiceCurve::rate_latency(200.0, 1.0),
    );
    let mut analyzer = LatencyAnalyzer::new();
    analyzer.annotate(leaf, curves);

    let root_delay = analyzer.analyze(&dag).end_to_end_delay().unwrap();
    assert!(
        root_delay <= 0.2 + EPS,
        "nested timeouts should collapse to min: got {root_delay}"
    );
}
#[test]
fn complex_dag_join_race_timeout() {
    // join(race(a, b), timeout(c, 500ms)): the capped slow branch dominates.
    let mut dag = PlanDag::new();
    let a = dag.leaf("a");
    let b = dag.leaf("b");
    let c = dag.leaf("c");
    let race_ab = dag.race(vec![a, b]);
    let timeout_c = dag.timeout(c, Duration::from_millis(500));
    let root = dag.join(vec![race_ab, timeout_c]);
    dag.set_root(root);

    // Same 200/s service rate everywhere; only burst and latency differ.
    let mut analyzer = LatencyAnalyzer::new();
    for (node, burst, latency) in [(a, 10.0, 0.0), (b, 50.0, 0.01), (c, 200.0, 1.0)] {
        analyzer.annotate(
            node,
            NodeCurves::new(
                ArrivalCurve::token_bucket(100.0, burst),
                ServiceCurve::rate_latency(200.0, latency),
            ),
        );
    }

    let root_delay = analyzer.analyze(&dag).end_to_end_delay().unwrap();
    assert!(
        (root_delay - 0.5).abs() < 0.1,
        "expected root delay ~0.5, got {root_delay}"
    );
}
#[test]
fn default_curves_used_for_unannotated_leaves() {
    // Leaves without explicit curves fall back to the analyzer-wide defaults,
    // so every delay in the DAG stays finite.
    let mut dag = PlanDag::new();
    let annotated = dag.leaf("annotated");
    let unannotated = dag.leaf("unannotated");
    let root = dag.join(vec![annotated, unannotated]);
    dag.set_root(root);

    let mut analyzer = LatencyAnalyzer::with_defaults(
        ArrivalCurve::token_bucket(50.0, 20.0),
        ServiceCurve::rate_latency(100.0, 0.005),
    );
    let explicit = NodeCurves::new(
        ArrivalCurve::token_bucket(100.0, 10.0),
        ServiceCurve::rate_latency(200.0, 0.0),
    );
    analyzer.annotate(annotated, explicit);

    let analysis = analyzer.analyze(&dag);
    assert!(analysis.delay_at(annotated).unwrap().is_finite());
    assert!(analysis.delay_at(unannotated).unwrap().is_finite());
    assert!(analysis.end_to_end_delay().unwrap().is_finite());
}
#[test]
fn missing_annotation_no_defaults_gives_infinity() {
    // With neither an annotation nor defaults, the leaf's bound is +inf.
    let mut dag = PlanDag::new();
    let node = dag.leaf("no_annotation");
    dag.set_root(node);
    let analysis = LatencyAnalyzer::new().analyze(&dag);
    assert!(analysis.delay_at(node).unwrap().is_infinite());
}
#[test]
fn provenance_identifies_bottleneck() {
    // The slow leaf should surface as the dominant provenance contribution.
    let mut dag = PlanDag::new();
    let fast = dag.leaf("fast");
    let slow = dag.leaf("slow");
    let root = dag.join(vec![fast, slow]);
    dag.set_root(root);

    let fast_curves = NodeCurves::new(
        ArrivalCurve::constant_rate(10.0),
        ServiceCurve::constant_rate(100.0),
    );
    let slow_curves = NodeCurves::new(
        ArrivalCurve::token_bucket(100.0, 500.0),
        ServiceCurve::rate_latency(200.0, 0.1),
    );
    let mut analyzer = LatencyAnalyzer::new();
    analyzer.annotate(fast, fast_curves);
    analyzer.annotate(slow, slow_curves);

    let analysis = analyzer.analyze(&dag);
    let bound = analysis.root_bound.as_ref().unwrap();
    assert!(!bound.provenance.is_empty());
    let top = bound.bottleneck().unwrap();
    assert!(
        top.delay > 0.1,
        "bottleneck delay should be significant, got {}",
        top.delay
    );
}
#[test]
fn latency_bound_display_format() {
    // Display should render delay, backlog, utilization, and each provenance entry.
    let contribution = BoundContribution {
        node_id: PlanId::new(0),
        delay: 0.123_456,
        description: "test node".to_string(),
    };
    let bound = LatencyBound {
        delay: 0.123_456,
        backlog: 42.0,
        utilization: 0.75,
        provenance: vec![contribution],
    };
    let rendered = bound.to_string();
    for needle in ["0.123456s", "42.00", "75.0%", "test node"] {
        assert!(rendered.contains(needle));
    }
}
#[test]
fn latency_bound_summary_stable() {
    // A finite bound summarizes with the delay and the bottleneck node id.
    let provenance = vec![BoundContribution {
        node_id: PlanId::new(3),
        delay: 0.5,
        description: "slow".to_string(),
    }];
    let bound = LatencyBound {
        delay: 0.5,
        backlog: 100.0,
        utilization: 0.6,
        provenance,
    };
    let summary = bound.summary();
    assert!(summary.contains("delay<="));
    assert!(summary.contains("bottleneck=node[3]"));
}
#[test]
fn latency_bound_summary_unstable() {
    // Infinite delay/backlog marks the system as unstable in the summary.
    let bound = LatencyBound {
        delay: f64::INFINITY,
        backlog: f64::INFINITY,
        utilization: 1.5,
        provenance: Vec::new(),
    };
    assert!(bound.summary().contains("UNSTABLE"));
}
#[test]
fn node_curves_utilization() {
    // Utilization for constant-rate curves: 80 arriving / 100 served = 0.8.
    let arrival = ArrivalCurve::constant_rate(80.0);
    let service = ServiceCurve::constant_rate(100.0);
    let curves = NodeCurves::new(arrival, service);
    assert!(approx_eq(curves.utilization(), 0.8));
}
#[test]
fn node_curves_utilization_zero_service() {
    // A zero service rate yields an infinite utilization rather than a panic.
    let arrival = ArrivalCurve::constant_rate(80.0);
    let service = ServiceCurve::constant_rate(0.0);
    assert!(NodeCurves::new(arrival, service).utilization().is_infinite());
}
#[test]
fn deconvolution_token_bucket_rate_latency() {
    // Deconvolving the token bucket by a rate-latency curve should land
    // near 51 at the origin (the assertion allows a tolerance of 2).
    let alpha = PiecewiseLinearCurve::affine(100.0, 50.0);
    let beta = PiecewiseLinearCurve::rate_latency(200.0, 0.01);
    let val_0 = min_plus_deconvolution(&alpha, &beta).eval(0.0);
    assert!(
        (val_0 - 51.0).abs() < 2.0,
        "deconvolution at t=0: expected ~51, got {val_0}"
    );
}
#[test]
fn staircase_curve_step_values() {
    // Mid-step samples should sit near the first and second step heights.
    let curve = PiecewiseLinearCurve::staircase(10.0, 1.0, 3);
    let first = curve.eval(0.5);
    let second = curve.eval(1.5);
    assert!((9.0..=11.0).contains(&first));
    assert!((19.0..=21.0).contains(&second));
}
#[test]
fn race_join_distributivity_delay() {
    // Distributivity check: race(join(a,b), join(a,c)) should never bound
    // worse than join(a, race(b,c)) under identical per-leaf curves.
    let a_curves = NodeCurves::new(
        ArrivalCurve::token_bucket(100.0, 30.0),
        ServiceCurve::rate_latency(200.0, 0.005),
    );
    let b_curves = NodeCurves::new(
        ArrivalCurve::token_bucket(80.0, 50.0),
        ServiceCurve::rate_latency(150.0, 0.01),
    );
    let c_curves = NodeCurves::new(
        ArrivalCurve::token_bucket(60.0, 10.0),
        ServiceCurve::rate_latency(120.0, 0.002),
    );

    // LHS: race of two joins, with `a` duplicated into each join.
    let mut dag_lhs = PlanDag::new();
    let a1 = dag_lhs.leaf("a");
    let b = dag_lhs.leaf("b");
    let a2 = dag_lhs.leaf("a_copy");
    let c = dag_lhs.leaf("c");
    let join_ab = dag_lhs.join(vec![a1, b]);
    let join_ac = dag_lhs.join(vec![a2, c]);
    let lhs_root = dag_lhs.race(vec![join_ab, join_ac]);
    dag_lhs.set_root(lhs_root);

    // RHS: a single join over `a` and race(b, c).
    let mut dag_rhs = PlanDag::new();
    let a3 = dag_rhs.leaf("a");
    let b2 = dag_rhs.leaf("b");
    let c2 = dag_rhs.leaf("c");
    let race_bc = dag_rhs.race(vec![b2, c2]);
    let rhs_root = dag_rhs.join(vec![a3, race_bc]);
    dag_rhs.set_root(rhs_root);

    let mut analyzer_lhs = LatencyAnalyzer::new();
    analyzer_lhs.annotate(a1, a_curves.clone());
    analyzer_lhs.annotate(a2, a_curves.clone());
    analyzer_lhs.annotate(b, b_curves.clone());
    analyzer_lhs.annotate(c, c_curves.clone());

    let mut analyzer_rhs = LatencyAnalyzer::new();
    analyzer_rhs.annotate(a3, a_curves);
    analyzer_rhs.annotate(b2, b_curves);
    analyzer_rhs.annotate(c2, c_curves);

    let lhs_delay = analyzer_lhs.analyze(&dag_lhs).end_to_end_delay().unwrap();
    let rhs_delay = analyzer_rhs.analyze(&dag_rhs).end_to_end_delay().unwrap();
    assert!(
        lhs_delay <= rhs_delay + EPS,
        "distributivity: lhs={lhs_delay} should be <= rhs={rhs_delay}"
    );
}
#[test]
fn analysis_summary_format() {
    // The summary line should report how many nodes were analyzed.
    let mut dag = PlanDag::new();
    let node = dag.leaf("a");
    dag.set_root(node);

    let curves = NodeCurves::new(
        ArrivalCurve::constant_rate(10.0),
        ServiceCurve::constant_rate(20.0),
    );
    let mut analyzer = LatencyAnalyzer::new();
    analyzer.annotate(node, curves);

    let summary = analyzer.analyze(&dag).summary();
    assert!(summary.contains("nodes analyzed"));
}
#[test]
fn analysis_display_includes_all_nodes() {
    // Every node in the DAG (two leaves plus the join) appears in Display.
    let mut dag = PlanDag::new();
    let left = dag.leaf("a");
    let right = dag.leaf("b");
    let root = dag.join(vec![left, right]);
    dag.set_root(root);

    let analyzer = LatencyAnalyzer::with_defaults(
        ArrivalCurve::constant_rate(10.0),
        ServiceCurve::constant_rate(20.0),
    );
    let rendered = analyzer.analyze(&dag).to_string();
    for label in ["node[0]", "node[1]", "node[2]"] {
        assert!(rendered.contains(label));
    }
}
#[test]
fn single_leaf_dag_analysis() {
    // A one-leaf DAG's end-to-end bound is just that leaf's delay bound (~0.26s).
    let mut dag = PlanDag::new();
    let leaf = dag.leaf("single");
    dag.set_root(leaf);

    let curves = NodeCurves::new(
        ArrivalCurve::token_bucket(50.0, 25.0),
        ServiceCurve::rate_latency(100.0, 0.01),
    );
    let mut analyzer = LatencyAnalyzer::new();
    analyzer.annotate(leaf, curves);

    let delay = analyzer.analyze(&dag).end_to_end_delay().unwrap();
    assert!(
        (delay - 0.26).abs() < 0.05,
        "single leaf delay expected ~0.26, got {delay}"
    );
}
#[test]
fn dag_with_no_root_has_no_bound() {
    // A rootless DAG yields per-node delays but no end-to-end bound.
    let mut dag = PlanDag::new();
    let _orphan = dag.leaf("orphan");
    let analyzer = LatencyAnalyzer::with_defaults(
        ArrivalCurve::constant_rate(10.0),
        ServiceCurve::constant_rate(20.0),
    );
    let analysis = analyzer.analyze(&dag);
    assert!(analysis.root_bound.is_none());
    assert!(!analysis.node_delays.is_empty());
}
#[test]
fn deeply_nested_timeouts() {
    // Three stacked timeouts (50ms, then 200ms, then 100ms) reduce to the
    // tightest budget: 50ms.
    let mut dag = PlanDag::new();
    let leaf = dag.leaf("task");
    let mut wrapped = dag.timeout(leaf, Duration::from_millis(50));
    for budget_ms in [200, 100] {
        wrapped = dag.timeout(wrapped, Duration::from_millis(budget_ms));
    }
    dag.set_root(wrapped);

    let curves = NodeCurves::new(
        ArrivalCurve::token_bucket(100.0, 500.0),
        ServiceCurve::rate_latency(200.0, 5.0),
    );
    let mut analyzer = LatencyAnalyzer::new();
    analyzer.annotate(leaf, curves);

    let delay = analyzer.analyze(&dag).end_to_end_delay().unwrap();
    assert!(
        delay <= 0.05 + EPS,
        "deeply nested timeouts should collapse to min(50ms): got {delay}"
    );
}
#[test]
fn wide_join_many_children() {
    // A 20-way join is dominated by its slowest (largest-burst) child.
    let mut dag = PlanDag::new();
    let children: Vec<PlanId> = (0..20).map(|i| dag.leaf(format!("child_{i}"))).collect();
    let root = dag.join(children.clone());
    dag.set_root(root);

    let mut analyzer = LatencyAnalyzer::new();
    for (index, &child) in children.iter().enumerate() {
        let index = u32::try_from(index).expect("test uses small child index");
        // Bursts grow linearly: 10, 15, 20, ...
        let burst = 5.0_f64.mul_add(f64::from(index), 10.0);
        let curves = NodeCurves::new(
            ArrivalCurve::token_bucket(100.0, burst),
            ServiceCurve::rate_latency(200.0, 0.001),
        );
        analyzer.annotate(child, curves);
    }

    let delay = analyzer.analyze(&dag).end_to_end_delay().unwrap();
    assert!(delay.is_finite());
    assert!(
        delay > 0.4,
        "wide join should be dominated by slowest child, got {delay}"
    );
}
#[test]
fn wide_race_many_children() {
    // A 20-way race is dominated by its fastest (smallest-burst) child.
    let mut dag = PlanDag::new();
    let children: Vec<PlanId> = (0..20).map(|i| dag.leaf(format!("child_{i}"))).collect();
    let root = dag.race(children.clone());
    dag.set_root(root);

    let mut analyzer = LatencyAnalyzer::new();
    for (index, &child) in children.iter().enumerate() {
        let index = u32::try_from(index).expect("test uses small child index");
        // Bursts grow linearly: 10, 15, 20, ...
        let burst = 5.0_f64.mul_add(f64::from(index), 10.0);
        let curves = NodeCurves::new(
            ArrivalCurve::token_bucket(100.0, burst),
            ServiceCurve::rate_latency(200.0, 0.001),
        );
        analyzer.annotate(child, curves);
    }

    let delay = analyzer.analyze(&dag).end_to_end_delay().unwrap();
    assert!(delay.is_finite());
    assert!(
        delay < 0.15,
        "wide race should be dominated by fastest child, got {delay}"
    );
}
#[test]
fn delay_monotonic_in_burst() {
    // With a fixed service curve, larger arrival bursts never shrink the delay.
    let service = ServiceCurve::rate_latency(200.0, 0.01);
    let delays: Vec<f64> = [10.0, 50.0, 100.0]
        .iter()
        .map(|&burst| delay_bound(&ArrivalCurve::token_bucket(100.0, burst), &service))
        .collect();
    let (d1, d2, d3) = (delays[0], delays[1], delays[2]);
    assert!(d1 <= d2 + EPS, "d1={d1} should be <= d2={d2}");
    assert!(d2 <= d3 + EPS, "d2={d2} should be <= d3={d3}");
}
#[test]
fn delay_monotonic_in_latency() {
    // With a fixed arrival curve, a lazier service curve never shrinks the delay.
    let arrival = ArrivalCurve::token_bucket(100.0, 50.0);
    let delays: Vec<f64> = [0.0, 0.1, 0.5]
        .iter()
        .map(|&lat| delay_bound(&arrival, &ServiceCurve::rate_latency(200.0, lat)))
        .collect();
    let (d1, d2, d3) = (delays[0], delays[1], delays[2]);
    assert!(d1 <= d2 + EPS, "d1={d1} should be <= d2={d2}");
    assert!(d2 <= d3 + EPS, "d2={d2} should be <= d3={d3}");
}
#[test]
fn convolution_isotone() {
    // f (rate 5) lies below g (rate 10) pointwise, so convolving each with
    // the same h must preserve the ordering at every sample point.
    let f = PiecewiseLinearCurve::affine(5.0, 0.0);
    let g = PiecewiseLinearCurve::affine(10.0, 0.0);
    let h = PiecewiseLinearCurve::rate_latency(8.0, 0.1);
    let fh = min_plus_convolution(&f, &h);
    let gh = min_plus_convolution(&g, &h);
    let sample_points = [0.0, 0.5, 1.0, 2.0, 5.0];
    for t in sample_points {
        assert!(
            fh.eval(t) <= gh.eval(t) + EPS,
            "isotonicity failed at t={t}: fh={} > gh={}",
            fh.eval(t),
            gh.eval(t)
        );
    }
}
}