bb_runtime/framework/rtt_tracker.rs
//! RTT tracker: Jacobson/Karels round-trip-time EMAs that supply
//! adaptive deadlines on every wire round-trip.
2//!
3//! Sits alongside [`super::address_book::AddressBook`] in the
4//! framework. Keyed by [`crate::ids::NodeSiteId`] so a single
5//! physical peer hosting two logical sites (a fast ping handler +
6//! an async GPU compute handler) keeps independent EMAs.
7//!
8//! ## Hierarchical fallback for `estimate_budget_ns`
9//!
10//! When the engine needs a deadline for a Send to a site, it walks
11//! these tiers in order, stopping at the first warm hit:
12//!
13//! 1. Per-edge stats for `(site, chain_id, hop_index)` - exact
14//! match in this chain context.
15//! 2. Per-site aggregate Jacobson - every round-trip to this site
16//! feeds this EMA regardless of context.
17//! 3. Per-chain prior - refines the global "what's typical for the
18//! kind of topology this chain represents" based on any peer
19//! that's carried chain traffic before.
20//! 4. Global prior - every round-trip in the runtime feeds this
21//! EMA with a small learning rate.
22//! 5. Static `NodeConfig.per_hop_budget_ns` fallback.
23//!
24//! ## Reverse-path piggyback (consumed in Phase 3e-iii)
25//!
26//! On response landing, the runtime parses [`EdgeRttReport`] entries
27//! that downstream sites attached. Each report becomes a
28//! `reported_outgoing` entry on the caller's per-site
29//! [`RttTrackerEntry`] so multi-hop chain budgets can compose from
30//! one address-book entry per direct neighbor.
31
32use std::collections::HashMap;
33
34use crate::ids::NodeSiteId;
35
/// Stable identifier for a chain: the hash of the compiler-stamped,
/// comma-separated `chain_targets` string. Because the value is a
/// pure function of that string, producer and consumer arrive at the
/// same id independently, so cross-site EMAs line up without the raw
/// chain composition ever being exchanged.
pub type ChainId = u64;

/// Derive the [`ChainId`] for the compiler's
/// `ai.bytesandbrains.wire.chain_targets` metadata value.
///
/// The hash is 64-bit FNV-1a over the string's raw bytes: each byte
/// is XOR-ed into the accumulator, which is then multiplied by the
/// FNV prime (wrapping). It performs no allocation and yields the
/// same value on every run. The empty string hashes to the FNV
/// offset basis.
pub fn chain_id_from_targets(chain_targets: &str) -> ChainId {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    chain_targets.bytes().fold(FNV_OFFSET_BASIS, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    })
}
54
55/// Per-(chain, hop) refinement key.
56#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
57pub struct EdgeKey {
58 /// Hash of the compiler-stamped `chain_targets` CSV.
59 pub chain_id: ChainId,
60 /// Zero-based hop position in the chain.
61 pub hop_index: u8,
62}
63
/// Jacobson/Karels RTT estimator: a smoothed round-trip time (SRTT)
/// plus a smoothed mean deviation (RTTVAR), with a running sample
/// count. The default weights follow RFC 6298 §2 (α = 1/8, β = 1/4).
///
/// [`Self::budget_ns`] derives a deadline as `SRTT + 4·RTTVAR`, the
/// retransmission-timeout formula of RFC 6298 §2.3 with K = 4.
/// [`Self::is_warm`] gates the tiered fallback so that cold EMAs
/// (fewer than three samples) fall through to coarser priors.
///
/// All arithmetic is integer-only; the weights are expressed as
/// right shifts, so each update truncates toward zero.
#[derive(Clone, Copy, Debug, Default)]
pub struct RttEma {
    /// Smoothed round-trip time, nanoseconds.
    pub srtt_ns: u64,
    /// Smoothed round-trip-time deviation, nanoseconds.
    pub rttvar_ns: u64,
    /// Count of samples observed (saturates at `u64::MAX`).
    pub sample_count: u64,
}

impl RttEma {
    /// Observe a round-trip sample using the standard Jacobson
    /// weights α = 1/8 and β = 1/4.
    pub fn observe(&mut self, sample_ns: u64) {
        self.observe_with_alpha_beta(sample_ns, 3, 2);
    }

    /// Observe a round-trip sample with caller-chosen weights.
    ///
    /// `alpha_shift` = log2(1/α) weights the SRTT update and
    /// `beta_shift` = log2(1/β) weights the RTTVAR update. `(3, 2)`
    /// is the Jacobson recommendation (α = 1/8, β = 1/4); larger
    /// shifts such as `(6, 5)` make the EMA move more slowly, which
    /// the global prior uses to dampen any one peer's bursty samples.
    ///
    /// The very first sample initializes `SRTT = sample` and
    /// `RTTVAR = sample / 2` (RFC 6298 §2.2) regardless of the
    /// shifts. A shift of 64 or more is treated as a weight of zero:
    /// the corresponding term is left unchanged instead of
    /// overflowing the shift.
    pub fn observe_with_alpha_beta(&mut self, sample_ns: u64, alpha_shift: u8, beta_shift: u8) {
        if self.sample_count == 0 {
            self.srtt_ns = sample_ns;
            self.rttvar_ns = sample_ns >> 1;
        } else {
            // RTTVAR must be updated first: it uses the SRTT from
            // *before* this sample is folded in.
            // RTTVAR ← (1 − β)·RTTVAR + β·|SRTT − sample|
            let deviation = sample_ns.abs_diff(self.srtt_ns);
            self.rttvar_ns = Self::blend(self.rttvar_ns, deviation, beta_shift);
            // SRTT ← (1 − α)·SRTT + α·sample
            self.srtt_ns = Self::blend(self.srtt_ns, sample_ns, alpha_shift);
        }
        self.sample_count = self.sample_count.saturating_add(1);
    }

    /// Integer EMA step: `current − current/2^shift + sample/2^shift`.
    ///
    /// `checked_shr` turns an out-of-range shift (≥ 64) into a zero
    /// contribution rather than a debug panic / masked release shift.
    /// The sum cannot overflow: the result never exceeds
    /// `max(current, sample)`.
    fn blend(current: u64, sample: u64, shift: u8) -> u64 {
        let shift = u32::from(shift);
        let decay = current.checked_shr(shift).unwrap_or(0);
        let gain = sample.checked_shr(shift).unwrap_or(0);
        current - decay + gain
    }

    /// Recommended budget: `SRTT + 4·RTTVAR` per RFC 6298 §2.3,
    /// saturating at `u64::MAX` instead of overflowing.
    pub fn budget_ns(&self) -> u64 {
        self.srtt_ns
            .saturating_add(self.rttvar_ns.saturating_mul(4))
    }

    /// "Warm" once at least three samples have been observed - gates
    /// the fallback hierarchy so very fresh EMAs don't poison budgets.
    pub fn is_warm(&self) -> bool {
        self.sample_count >= 3
    }
}
130
/// φ-accrual failure detector state for one direct chain neighbor.
///
/// Every recorded heartbeat contributes the time since the previous
/// one to a rolling history. The suspicion level φ rises the longer
/// the peer stays silent relative to the mean of that history.
///
/// φ is computed with an exponential approximation of the
/// inter-arrival distribution (see [`Self::phi`]). The default
/// thresholds are 8 (suspect) and 16 (down).
#[derive(Clone, Debug)]
pub struct PhiAccrualState {
    /// Recent inter-arrival times of heartbeats, nanoseconds,
    /// oldest first.
    ///
    /// A `VecDeque` so that evicting the oldest sample at capacity
    /// is an O(1) `pop_front` rather than an O(n) `Vec::remove(0)`.
    pub inter_arrival_history: std::collections::VecDeque<u64>,
    /// Maximum number of samples kept in the rolling history.
    /// `0` disables history collection entirely.
    pub history_capacity: usize,
    /// Suspicion threshold; φ > this → peer is suspect.
    pub threshold_phi: f64,
    /// Hard-down threshold; φ > this → peer is down.
    pub down_phi: f64,
}

impl Default for PhiAccrualState {
    /// Empty history, capacity 1000, suspect at φ > 8, down at φ > 16.
    fn default() -> Self {
        Self {
            inter_arrival_history: std::collections::VecDeque::new(),
            history_capacity: 1000,
            threshold_phi: 8.0,
            down_phi: 16.0,
        }
    }
}

impl PhiAccrualState {
    /// Record a heartbeat at `now_ns` given the prior heartbeat at
    /// `last_seen_at_ns`.
    ///
    /// `last_seen_at_ns == 0` means "no prior heartbeat"; the call is
    /// then a no-op because there is no interval to measure. Otherwise
    /// the interval (clamped to zero if the clock went backwards) is
    /// appended to the history, evicting the oldest samples so the
    /// history never exceeds [`Self::history_capacity`] - including
    /// when the capacity was lowered after samples were collected.
    pub fn record_heartbeat(&mut self, now_ns: u64, last_seen_at_ns: u64) {
        if last_seen_at_ns == 0 {
            return;
        }
        if self.history_capacity == 0 {
            // Nothing may be retained; also drop anything collected
            // before the capacity was set to zero.
            self.inter_arrival_history.clear();
            return;
        }
        let delta = now_ns.saturating_sub(last_seen_at_ns);
        // `>=` rather than `==`: the capacity is a public field and
        // may have shrunk below the current length, in which case an
        // equality check would never fire again and the history would
        // grow without bound.
        while self.inter_arrival_history.len() >= self.history_capacity {
            self.inter_arrival_history.pop_front();
        }
        self.inter_arrival_history.push_back(delta);
    }

    /// Compute the current suspicion level φ.
    ///
    /// Returns `0.0` when there is no history, when no heartbeat has
    /// been seen (`last_seen_at_ns == 0`), or when the mean
    /// inter-arrival time is zero - in all of these the peer is
    /// treated as alive.
    ///
    /// Otherwise, with `elapsed = now_ns − last_seen_at_ns` and the
    /// exponential model `P_later(t) = exp(−t / mean)`:
    /// `φ = −log10(P_later(elapsed)) = elapsed / (mean · ln 10)`.
    pub fn phi(&self, now_ns: u64, last_seen_at_ns: u64) -> f64 {
        if self.inter_arrival_history.is_empty() || last_seen_at_ns == 0 {
            return 0.0;
        }
        let elapsed = now_ns.saturating_sub(last_seen_at_ns) as f64;
        let sum: f64 = self.inter_arrival_history.iter().map(|&x| x as f64).sum();
        let mean = sum / self.inter_arrival_history.len() as f64;
        if mean <= 0.0 {
            return 0.0;
        }
        elapsed / (mean * std::f64::consts::LN_10)
    }

    /// `true` once φ exceeds [`Self::threshold_phi`].
    pub fn is_suspect(&self, now_ns: u64, last_seen_at_ns: u64) -> bool {
        self.phi(now_ns, last_seen_at_ns) > self.threshold_phi
    }

    /// `true` once φ exceeds [`Self::down_phi`] (hard fail).
    pub fn is_down(&self, now_ns: u64, last_seen_at_ns: u64) -> bool {
        self.phi(now_ns, last_seen_at_ns) > self.down_phi
    }
}
211
212/// One AddressBook-side entry per logical site we've ever observed.
213#[derive(Default)]
214pub struct RttTrackerEntry {
215 /// Aggregate Jacobson over ALL round-trips to this site, any
216 /// context. Fed by data plane, control plane, handshakes,
217 /// anything using `Engine::wire_send_tracked`.
218 pub site_stats: RttEma,
219
220 /// Per-(chain, hop) refinement specific to this site.
221 pub per_edge_stats: HashMap<EdgeKey, RttEma>,
222
223 /// Reverse-path piggyback: this site told us about ITS outgoing
224 /// edges in chains, indexed by (next-hop, chain_id). Lets a
225 /// chain originator compose a multi-hop budget from one entry
226 /// per direct neighbor.
227 pub reported_outgoing: HashMap<(NodeSiteId, ChainId), RttEma>,
228
229 /// φ-accrual per direct neighbor.
230 pub phi_accrual: PhiAccrualState,
231
232 /// Timestamp of the most recent wire round-trip with this site,
233 /// nanoseconds since the engine clock epoch.
234 pub last_seen_at_ns: u64,
235
236 /// Timestamp of the most recent EMA update.
237 pub last_updated_at_ns: u64,
238
239 /// -v - last φ state surfaced
240 /// by [`RttTracker::scan_phi`]. The scan emits a transition only
241 /// when the state changes so subscribers don't get a `Suspect`
242 /// event every poll cycle while the site stays silent.
243 pub last_phi_state: PhiState,
244}
245
246/// Runtime-owned RTT tracker.
247///
248/// Sits alongside [`super::address_book::AddressBook`] in the
249/// framework. Every wire round-trip the engine observes (any
250/// protocol, any chain context) feeds [`Self::observe_round_trip`];
251/// every outgoing send queries [`Self::estimate_budget_ns`] for
252/// its deadline.
253#[derive(Default)]
254pub struct RttTracker {
255 entries: HashMap<NodeSiteId, RttTrackerEntry>,
256 /// Per-chain aggregate. Survives peer churn - even if every
257 /// peer hosting chain X gets evicted, future peers joining the
258 /// chain pick up this prior as their first-guess budget.
259 chain_priors: HashMap<ChainId, RttEma>,
260 /// Final fallback before the static `NodeConfig` default.
261 global_prior: RttEma,
262}
263
264/// Optional chain context the engine threads to
265/// [`RttTracker::estimate_budget_ns`] and
266/// [`RttTracker::observe_round_trip`].
267#[derive(Clone, Copy, Debug)]
268pub struct ChainContext {
269 /// Hash of the compiler-stamped `chain_targets` CSV.
270 pub chain_id: ChainId,
271 /// Zero-based hop position in the chain.
272 pub hop_index: u8,
273}
274
275impl RttTracker {
276 /// Fresh, empty tracker.
277 pub fn new() -> Self {
278 Self::default()
279 }
280
281 /// Hierarchical fallback: per-edge → per-site → per-chain →
282 /// global → static. First warm tier wins.
283 pub fn estimate_budget_ns(
284 &self,
285 site: NodeSiteId,
286 chain: Option<ChainContext>,
287 static_default_ns: u64,
288 ) -> u64 {
289 // Tier 1: per-edge exact match in this chain context.
290 if let (Some(ctx), Some(entry)) = (chain, self.entries.get(&site)) {
291 let key = EdgeKey {
292 chain_id: ctx.chain_id,
293 hop_index: ctx.hop_index,
294 };
295 if let Some(ema) = entry.per_edge_stats.get(&key) {
296 if ema.is_warm() {
297 return ema.budget_ns();
298 }
299 }
300 }
301 // Tier 2: per-site aggregate.
302 if let Some(entry) = self.entries.get(&site) {
303 if entry.site_stats.is_warm() {
304 return entry.site_stats.budget_ns();
305 }
306 }
307 // Tier 3: per-chain prior.
308 if let Some(ctx) = chain {
309 if let Some(prior) = self.chain_priors.get(&ctx.chain_id) {
310 if prior.is_warm() {
311 return prior.budget_ns();
312 }
313 }
314 }
315 // Tier 4: global prior.
316 if self.global_prior.is_warm() {
317 return self.global_prior.budget_ns();
318 }
319 // Tier 5: static default.
320 static_default_ns
321 }
322
323 /// Feed a round-trip sample. Updates per-site Jacobson EMA
324 /// always; per-edge + per-chain when chain context is present;
325 /// global prior always (with a smaller learning rate).
326 pub fn observe_round_trip(
327 &mut self,
328 site: NodeSiteId,
329 chain: Option<ChainContext>,
330 elapsed_ns: u64,
331 now_ns: u64,
332 ) {
333 let entry = self.entries.entry(site).or_default();
334 // Per-site aggregate.
335 entry.site_stats.observe(elapsed_ns);
336 entry.last_updated_at_ns = now_ns;
337 // φ-accrual heartbeat
338 entry
339 .phi_accrual
340 .record_heartbeat(now_ns, entry.last_seen_at_ns);
341 entry.last_seen_at_ns = now_ns;
342
343 // Per-edge + per-chain refinement.
344 if let Some(ctx) = chain {
345 let key = EdgeKey {
346 chain_id: ctx.chain_id,
347 hop_index: ctx.hop_index,
348 };
349 entry
350 .per_edge_stats
351 .entry(key)
352 .or_default()
353 .observe(elapsed_ns);
354 self.chain_priors
355 .entry(ctx.chain_id)
356 .or_default()
357 .observe(elapsed_ns);
358 }
359
360 // Global prior - small learning rate so noisy samples don't
361 // dominate.
362 self.global_prior.observe_with_alpha_beta(elapsed_ns, 6, 5);
363 }
364
365 /// Ingest a reverse-path piggyback report - a downstream site
366 /// telling us about ITS outgoing edge to `next_hop` in chain
367 /// `chain_id`.
368 pub fn ingest_reported_outgoing(
369 &mut self,
370 from_site: NodeSiteId,
371 next_hop: NodeSiteId,
372 chain_id: ChainId,
373 srtt_ns: u64,
374 rttvar_ns: u64,
375 sample_count: u64,
376 ) {
377 let entry = self.entries.entry(from_site).or_default();
378 let report = entry
379 .reported_outgoing
380 .entry((next_hop, chain_id))
381 .or_default();
382 report.srtt_ns = srtt_ns;
383 report.rttvar_ns = rttvar_ns;
384 report.sample_count = sample_count;
385 }
386
387 /// Read-only access to a per-site entry. Returns `None` when no
388 /// round-trip with the site has been observed.
389 pub fn entry(&self, site: NodeSiteId) -> Option<&RttTrackerEntry> {
390 self.entries.get(&site)
391 }
392
393 /// Read-only access to the per-chain prior.
394 pub fn chain_prior(&self, chain_id: ChainId) -> Option<&RttEma> {
395 self.chain_priors.get(&chain_id)
396 }
397
398 /// Read-only access to the global prior.
399 pub fn global_prior(&self) -> &RttEma {
400 &self.global_prior
401 }
402
403 /// Snapshot of every site currently tracked.
404 pub fn tracked_sites(&self) -> impl Iterator<Item = NodeSiteId> + '_ {
405 self.entries.keys().copied()
406 }
407
408 /// -v - scan φ-accrual states
409 /// at the current engine clock and surface state transitions.
410 /// Returns one entry per tracked site whose suspicion level
411 /// changed since the last scan: `PhiTransition::Suspect`,
412 /// `Down`, or `Live` (after a previous `Suspect`/`Down` resolves).
413 ///
414 /// The tracker keeps a per-site `last_phi_state` ratchet so
415 /// repeat scans don't re-emit the same event every poll cycle.
416 pub fn scan_phi(&mut self, now_ns: u64) -> Vec<PhiTransition> {
417 let mut transitions = Vec::new();
418 for (&site, entry) in self.entries.iter_mut() {
419 let phi = entry.phi_accrual.phi(now_ns, entry.last_seen_at_ns);
420 let new_state = if entry.phi_accrual.is_down(now_ns, entry.last_seen_at_ns) {
421 PhiState::Down
422 } else if entry.phi_accrual.is_suspect(now_ns, entry.last_seen_at_ns) {
423 PhiState::Suspect
424 } else {
425 PhiState::Live
426 };
427 if new_state != entry.last_phi_state {
428 transitions.push(match new_state {
429 PhiState::Live => PhiTransition::Live { site },
430 PhiState::Suspect => PhiTransition::Suspect { site, phi },
431 PhiState::Down => PhiTransition::Down { site, phi },
432 });
433 entry.last_phi_state = new_state;
434 }
435 }
436 transitions
437 }
438}
439
/// The discrete health state that [`RttTracker::scan_phi`] tracks
/// for each per-site φ-accrual detector.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PhiState {
    /// Peer is healthy. This is the default state.
    Live,
    /// φ crossed the suspect threshold.
    Suspect,
    /// φ crossed the hard-down threshold.
    Down,
}

impl Default for PhiState {
    /// A site starts out [`PhiState::Live`].
    fn default() -> Self {
        Self::Live
    }
}
452
453/// State transitions surfaced by [`RttTracker::scan_phi`]. The
454/// engine maps each entry onto a bus
455/// [`crate::bus::InfraEvent::PeerSuspect`] /
456/// [`crate::bus::InfraEvent::PeerDown`] /
457/// [`crate::bus::InfraEvent::PeerLive`].
458#[derive(Clone, Copy, Debug)]
459pub enum PhiTransition {
460 /// Site recovered (φ collapsed below suspect threshold).
461 Live {
462 /// Per-Node site whose φ-accrual detector dropped back below the suspect threshold.
463 site: NodeSiteId,
464 },
465 /// Site crossed the suspect threshold.
466 Suspect {
467 /// Per-Node site whose φ-accrual detector crossed into the suspect band.
468 site: NodeSiteId,
469 /// φ value at the moment the suspect threshold was crossed.
470 phi: f64,
471 },
472 /// Site crossed the hard-down threshold.
473 Down {
474 /// Per-Node site whose φ-accrual detector crossed into the hard-down band.
475 site: NodeSiteId,
476 /// φ value at the moment the hard-down threshold was crossed.
477 phi: f64,
478 },
479}
480