Skip to main content

bb_runtime/framework/
rtt_tracker.rs

//! Per-site round-trip-time (RTT) tracking for adaptive deadlines on
//! every wire round-trip.
//!
//! Sits alongside [`super::address_book::AddressBook`] in the
//! framework. Keyed by [`crate::ids::NodeSiteId`] so a single
//! physical peer hosting two logical sites (a fast ping handler +
//! an async GPU compute handler) keeps independent EMAs.
//!
//! ## Hierarchical fallback for `estimate_budget_ns`
//!
//! When the engine needs a deadline for a Send to a site, it walks
//! these tiers in order, stopping at the first warm hit:
//!
//! 1. Per-edge stats for `(site, chain_id, hop_index)` - exact
//!    match in this chain context.
//! 2. Per-site aggregate Jacobson - every round-trip to this site
//!    feeds this EMA regardless of context.
//! 3. Per-chain prior - every round-trip made in this chain's
//!    context, with any peer, feeds this EMA. It captures what is
//!    typical for the topology the chain represents and survives
//!    peer churn.
//! 4. Global prior - every round-trip in the runtime feeds this
//!    EMA with a small learning rate.
//! 5. Static `NodeConfig.per_hop_budget_ns` fallback.
//!
//! ## Reverse-path piggyback (consumed in Phase 3e-iii)
//!
//! On response landing, the runtime parses [`EdgeRttReport`] entries
//! that downstream sites attached. Each report becomes a
//! `reported_outgoing` entry on the reporting site's
//! [`RttTrackerEntry`], so multi-hop chain budgets can compose from
//! one address-book entry per direct neighbor.
31
32use std::collections::HashMap;
33
34use crate::ids::NodeSiteId;
35
/// Stable identifier for a chain: the FNV-1a hash of the
/// compiler-stamped, comma-separated `chain_targets` string.
///
/// Producer and consumer hash the same string the same way. Their
/// per-chain EMAs therefore line up across sites without either side
/// shipping the raw chain composition over the wire.
pub type ChainId = u64;

/// Derive a [`ChainId`] from the compiler's
/// `ai.bytesandbrains.wire.chain_targets` metadata value.
///
/// Uses 64-bit FNV-1a over the raw UTF-8 bytes. It does no
/// allocation and gives the same result on every run. An empty
/// string maps to the FNV offset basis.
pub fn chain_id_from_targets(chain_targets: &str) -> ChainId {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    // FNV-1a: xor the byte in first, then multiply (FNV-1 does the reverse).
    chain_targets
        .bytes()
        .fold(FNV_OFFSET_BASIS, |acc, byte| {
            (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
        })
}
54
55/// Per-(chain, hop) refinement key.
56#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
57pub struct EdgeKey {
58    /// Hash of the compiler-stamped `chain_targets` CSV.
59    pub chain_id: ChainId,
60    /// Zero-based hop position in the chain.
61    pub hop_index: u8,
62}
63
/// Jacobson/Karels RTT estimator: smoothed round-trip time (SRTT)
/// plus smoothed round-trip-time variation (RTTVAR), with a sample
/// counter. With the default weights (α = 1/8, β = 1/4) the update
/// rule mirrors RFC 6298 §2.
///
/// [`Self::budget_ns`] returns the recommended deadline,
/// `SRTT + 4·RTTVAR`. This is the Jacobson/Karels
/// retransmission-timeout formula adopted by RFC 6298 §2.3.
///
/// [`Self::is_warm`] gates the per-tier fallback, so cold EMAs (fewer
/// than three samples) fall through to coarser priors.
///
/// All arithmetic is integer and shift-based, so each weighted term is
/// truncated toward zero.
#[derive(Clone, Copy, Debug, Default)]
pub struct RttEma {
    /// Smoothed round-trip time, nanoseconds.
    pub srtt_ns: u64,
    /// Smoothed round-trip-time variation (mean deviation), nanoseconds.
    pub rttvar_ns: u64,
    /// Number of samples observed (saturating).
    pub sample_count: u64,
}

impl RttEma {
    /// Sample count at which the fallback hierarchy starts trusting
    /// this EMA.
    const WARM_SAMPLE_COUNT: u64 = 3;

    /// Observe a round-trip sample using Jacobson's weights
    /// (α = 1/8, β = 1/4).
    pub fn observe(&mut self, sample_ns: u64) {
        self.observe_with_alpha_beta(sample_ns, 3, 2);
    }

    /// Observe a round-trip sample with caller-chosen learning rates.
    ///
    /// `alpha_shift` is log2(1/α) and `beta_shift` is log2(1/β).
    /// `(3, 2)` matches Jacobson (α = 1/8, β = 1/4). Larger shifts such
    /// as `(6, 5)` make the EMA more conservative; the global prior
    /// uses this to dampen any one peer's bursty samples.
    ///
    /// A shift of 64 or more means a weight of zero: that estimate is
    /// left unchanged. Without this, the shift would overflow and panic.
    ///
    /// The first sample initialises `SRTT = sample` and
    /// `RTTVAR = sample / 2` (RFC 6298 §2.2), regardless of the shifts.
    pub fn observe_with_alpha_beta(&mut self, sample_ns: u64, alpha_shift: u8, beta_shift: u8) {
        if self.sample_count == 0 {
            self.srtt_ns = sample_ns;
            self.rttvar_ns = sample_ns >> 1;
        } else {
            // RFC 6298 §2.3 ordering: RTTVAR is updated against the *old*
            // SRTT before SRTT moves toward the sample.
            // RTTVAR ← (1 − β)·RTTVAR + β·|SRTT − sample|
            let delta = sample_ns.abs_diff(self.srtt_ns);
            self.rttvar_ns = Self::blend(self.rttvar_ns, delta, beta_shift);
            // SRTT ← (1 − α)·SRTT + α·sample
            self.srtt_ns = Self::blend(self.srtt_ns, sample_ns, alpha_shift);
        }
        self.sample_count = self.sample_count.saturating_add(1);
    }

    /// Integer EMA step: `(1 − w)·current + w·sample` with `w = 2^-shift`.
    ///
    /// The result never exceeds `max(current, sample)`, so the addition
    /// cannot overflow. `current - (current >> shift)` never underflows.
    /// A shift of 64 or more gives `w = 0`, which returns `current`.
    fn blend(current: u64, sample: u64, shift: u8) -> u64 {
        let scaled = |x: u64| x.checked_shr(u32::from(shift)).unwrap_or(0);
        current - scaled(current) + scaled(sample)
    }

    /// Recommended budget: `SRTT + 4·RTTVAR` per RFC 6298 §2.3,
    /// saturating at `u64::MAX`.
    pub fn budget_ns(&self) -> u64 {
        self.srtt_ns
            .saturating_add(self.rttvar_ns.saturating_mul(4))
    }

    /// `true` once at least three samples have been observed. This
    /// gates the fallback hierarchy so very fresh EMAs don't poison
    /// budgets.
    pub fn is_warm(&self) -> bool {
        self.sample_count >= Self::WARM_SAMPLE_COUNT
    }
}
130
/// φ-accrual failure detector for one direct chain neighbor.
///
/// A "heartbeat" is any wire round-trip with the peer. The detector
/// keeps a rolling window of heartbeat inter-arrival times. φ rises
/// the longer the peer stays silent relative to the mean of that
/// window.
///
/// This follows the φ-accrual scheme of Hayashibara et al., using an
/// exponential approximation of the inter-arrival distribution:
/// φ = elapsed / (mean · ln 10). The default suspicion threshold of 8
/// matches the conservative setting used by Cassandra and Akka.
#[derive(Clone, Debug)]
pub struct PhiAccrualState {
    /// Recent heartbeat inter-arrival times, nanoseconds, oldest first.
    ///
    /// This is a `VecDeque`, so evicting the oldest sample is an O(1)
    /// `pop_front` rather than an O(n) `Vec::remove(0)` memmove.
    /// With `history_capacity = 1000` and one push per heartbeat, that
    /// memmove dominated once φ-accrual ran on dozens of peers.
    pub inter_arrival_history: std::collections::VecDeque<u64>,
    /// Maximum number of inter-arrival samples retained. `0` disables
    /// history, and φ then stays at 0.0.
    pub history_capacity: usize,
    /// Suspicion threshold; φ > this → peer is suspect.
    pub threshold_phi: f64,
    /// Hard-down threshold; φ > this → peer is down.
    pub down_phi: f64,
}

impl Default for PhiAccrualState {
    fn default() -> Self {
        Self {
            inter_arrival_history: std::collections::VecDeque::new(),
            history_capacity: 1000,
            threshold_phi: 8.0,
            down_phi: 16.0,
        }
    }
}

impl PhiAccrualState {
    /// Record a heartbeat observed at `now_ns`. `last_seen_at_ns` is the
    /// time of the previous heartbeat.
    ///
    /// `last_seen_at_ns == 0` means "no previous heartbeat". Nothing is
    /// recorded in that case, because there is no interval yet.
    ///
    /// Otherwise the interval is computed, clamped to 0 if the clock
    /// went backwards. The oldest samples are evicted until there is
    /// room, and then the interval is appended. Eviction uses `>=`, so
    /// the window stays bounded in two cases: when `history_capacity`
    /// was lowered after samples were recorded, and when it is `0`, in
    /// which case no history is kept.
    pub fn record_heartbeat(&mut self, now_ns: u64, last_seen_at_ns: u64) {
        if last_seen_at_ns == 0 {
            return;
        }
        if self.history_capacity == 0 {
            self.inter_arrival_history.clear();
            return;
        }
        let delta = now_ns.saturating_sub(last_seen_at_ns);
        while self.inter_arrival_history.len() >= self.history_capacity {
            self.inter_arrival_history.pop_front();
        }
        self.inter_arrival_history.push_back(delta);
    }

    /// Compute the current suspicion level φ.
    ///
    /// Returns 0.0 in three cases:
    /// - no interval has been recorded yet, so the peer is assumed
    ///   alive on first contact;
    /// - `last_seen_at_ns == 0`;
    /// - every recorded interval is 0, so the mean is 0.
    pub fn phi(&self, now_ns: u64, last_seen_at_ns: u64) -> f64 {
        let samples = self.inter_arrival_history.len();
        if samples == 0 || last_seen_at_ns == 0 {
            return 0.0;
        }
        let sum: f64 = self.inter_arrival_history.iter().map(|&x| x as f64).sum();
        let mean = sum / samples as f64;
        if mean <= 0.0 {
            return 0.0;
        }
        let elapsed = now_ns.saturating_sub(last_seen_at_ns) as f64;
        // Exponential model: P_later(t) = e^(-t/mean), so
        // φ = -log10(P_later(elapsed)) = elapsed / (mean · ln 10).
        elapsed / (mean * std::f64::consts::LN_10)
    }

    /// `true` once φ exceeds [`Self::threshold_phi`].
    pub fn is_suspect(&self, now_ns: u64, last_seen_at_ns: u64) -> bool {
        self.phi(now_ns, last_seen_at_ns) > self.threshold_phi
    }

    /// `true` once φ exceeds [`Self::down_phi`] (hard fail).
    pub fn is_down(&self, now_ns: u64, last_seen_at_ns: u64) -> bool {
        self.phi(now_ns, last_seen_at_ns) > self.down_phi
    }
}
211
212/// One AddressBook-side entry per logical site we've ever observed.
213#[derive(Default)]
214pub struct RttTrackerEntry {
215    /// Aggregate Jacobson over ALL round-trips to this site, any
216    /// context. Fed by data plane, control plane, handshakes,
217    /// anything using `Engine::wire_send_tracked`.
218    pub site_stats: RttEma,
219
220    /// Per-(chain, hop) refinement specific to this site.
221    pub per_edge_stats: HashMap<EdgeKey, RttEma>,
222
223    /// Reverse-path piggyback: this site told us about ITS outgoing
224    /// edges in chains, indexed by (next-hop, chain_id). Lets a
225    /// chain originator compose a multi-hop budget from one entry
226    /// per direct neighbor.
227    pub reported_outgoing: HashMap<(NodeSiteId, ChainId), RttEma>,
228
229    /// φ-accrual per direct neighbor.
230    pub phi_accrual: PhiAccrualState,
231
232    /// Timestamp of the most recent wire round-trip with this site,
233    /// nanoseconds since the engine clock epoch.
234    pub last_seen_at_ns: u64,
235
236    /// Timestamp of the most recent EMA update.
237    pub last_updated_at_ns: u64,
238
239    /// -v - last φ state surfaced
240    /// by [`RttTracker::scan_phi`]. The scan emits a transition only
241    /// when the state changes so subscribers don't get a `Suspect`
242    /// event every poll cycle while the site stays silent.
243    pub last_phi_state: PhiState,
244}
245
246/// Runtime-owned RTT tracker.
247///
248/// Sits alongside [`super::address_book::AddressBook`] in the
249/// framework. Every wire round-trip the engine observes (any
250/// protocol, any chain context) feeds [`Self::observe_round_trip`];
251/// every outgoing send queries [`Self::estimate_budget_ns`] for
252/// its deadline.
253#[derive(Default)]
254pub struct RttTracker {
255    entries: HashMap<NodeSiteId, RttTrackerEntry>,
256    /// Per-chain aggregate. Survives peer churn - even if every
257    /// peer hosting chain X gets evicted, future peers joining the
258    /// chain pick up this prior as their first-guess budget.
259    chain_priors: HashMap<ChainId, RttEma>,
260    /// Final fallback before the static `NodeConfig` default.
261    global_prior: RttEma,
262}
263
264/// Optional chain context the engine threads to
265/// [`RttTracker::estimate_budget_ns`] and
266/// [`RttTracker::observe_round_trip`].
267#[derive(Clone, Copy, Debug)]
268pub struct ChainContext {
269    /// Hash of the compiler-stamped `chain_targets` CSV.
270    pub chain_id: ChainId,
271    /// Zero-based hop position in the chain.
272    pub hop_index: u8,
273}
274
275impl RttTracker {
276    /// Fresh, empty tracker.
277    pub fn new() -> Self {
278        Self::default()
279    }
280
281    /// Hierarchical fallback: per-edge → per-site → per-chain →
282    /// global → static. First warm tier wins.
283    pub fn estimate_budget_ns(
284        &self,
285        site: NodeSiteId,
286        chain: Option<ChainContext>,
287        static_default_ns: u64,
288    ) -> u64 {
289        // Tier 1: per-edge exact match in this chain context.
290        if let (Some(ctx), Some(entry)) = (chain, self.entries.get(&site)) {
291            let key = EdgeKey {
292                chain_id: ctx.chain_id,
293                hop_index: ctx.hop_index,
294            };
295            if let Some(ema) = entry.per_edge_stats.get(&key) {
296                if ema.is_warm() {
297                    return ema.budget_ns();
298                }
299            }
300        }
301        // Tier 2: per-site aggregate.
302        if let Some(entry) = self.entries.get(&site) {
303            if entry.site_stats.is_warm() {
304                return entry.site_stats.budget_ns();
305            }
306        }
307        // Tier 3: per-chain prior.
308        if let Some(ctx) = chain {
309            if let Some(prior) = self.chain_priors.get(&ctx.chain_id) {
310                if prior.is_warm() {
311                    return prior.budget_ns();
312                }
313            }
314        }
315        // Tier 4: global prior.
316        if self.global_prior.is_warm() {
317            return self.global_prior.budget_ns();
318        }
319        // Tier 5: static default.
320        static_default_ns
321    }
322
323    /// Feed a round-trip sample. Updates per-site Jacobson EMA
324    /// always; per-edge + per-chain when chain context is present;
325    /// global prior always (with a smaller learning rate).
326    pub fn observe_round_trip(
327        &mut self,
328        site: NodeSiteId,
329        chain: Option<ChainContext>,
330        elapsed_ns: u64,
331        now_ns: u64,
332    ) {
333        let entry = self.entries.entry(site).or_default();
334        // Per-site aggregate.
335        entry.site_stats.observe(elapsed_ns);
336        entry.last_updated_at_ns = now_ns;
337        // φ-accrual heartbeat
338        entry
339            .phi_accrual
340            .record_heartbeat(now_ns, entry.last_seen_at_ns);
341        entry.last_seen_at_ns = now_ns;
342
343        // Per-edge + per-chain refinement.
344        if let Some(ctx) = chain {
345            let key = EdgeKey {
346                chain_id: ctx.chain_id,
347                hop_index: ctx.hop_index,
348            };
349            entry
350                .per_edge_stats
351                .entry(key)
352                .or_default()
353                .observe(elapsed_ns);
354            self.chain_priors
355                .entry(ctx.chain_id)
356                .or_default()
357                .observe(elapsed_ns);
358        }
359
360        // Global prior - small learning rate so noisy samples don't
361        // dominate.
362        self.global_prior.observe_with_alpha_beta(elapsed_ns, 6, 5);
363    }
364
365    /// Ingest a reverse-path piggyback report - a downstream site
366    /// telling us about ITS outgoing edge to `next_hop` in chain
367    /// `chain_id`.
368    pub fn ingest_reported_outgoing(
369        &mut self,
370        from_site: NodeSiteId,
371        next_hop: NodeSiteId,
372        chain_id: ChainId,
373        srtt_ns: u64,
374        rttvar_ns: u64,
375        sample_count: u64,
376    ) {
377        let entry = self.entries.entry(from_site).or_default();
378        let report = entry
379            .reported_outgoing
380            .entry((next_hop, chain_id))
381            .or_default();
382        report.srtt_ns = srtt_ns;
383        report.rttvar_ns = rttvar_ns;
384        report.sample_count = sample_count;
385    }
386
387    /// Read-only access to a per-site entry. Returns `None` when no
388    /// round-trip with the site has been observed.
389    pub fn entry(&self, site: NodeSiteId) -> Option<&RttTrackerEntry> {
390        self.entries.get(&site)
391    }
392
393    /// Read-only access to the per-chain prior.
394    pub fn chain_prior(&self, chain_id: ChainId) -> Option<&RttEma> {
395        self.chain_priors.get(&chain_id)
396    }
397
398    /// Read-only access to the global prior.
399    pub fn global_prior(&self) -> &RttEma {
400        &self.global_prior
401    }
402
403    /// Snapshot of every site currently tracked.
404    pub fn tracked_sites(&self) -> impl Iterator<Item = NodeSiteId> + '_ {
405        self.entries.keys().copied()
406    }
407
408    /// -v - scan φ-accrual states
409    /// at the current engine clock and surface state transitions.
410    /// Returns one entry per tracked site whose suspicion level
411    /// changed since the last scan: `PhiTransition::Suspect`,
412    /// `Down`, or `Live` (after a previous `Suspect`/`Down` resolves).
413    ///
414    /// The tracker keeps a per-site `last_phi_state` ratchet so
415    /// repeat scans don't re-emit the same event every poll cycle.
416    pub fn scan_phi(&mut self, now_ns: u64) -> Vec<PhiTransition> {
417        let mut transitions = Vec::new();
418        for (&site, entry) in self.entries.iter_mut() {
419            let phi = entry.phi_accrual.phi(now_ns, entry.last_seen_at_ns);
420            let new_state = if entry.phi_accrual.is_down(now_ns, entry.last_seen_at_ns) {
421                PhiState::Down
422            } else if entry.phi_accrual.is_suspect(now_ns, entry.last_seen_at_ns) {
423                PhiState::Suspect
424            } else {
425                PhiState::Live
426            };
427            if new_state != entry.last_phi_state {
428                transitions.push(match new_state {
429                    PhiState::Live => PhiTransition::Live { site },
430                    PhiState::Suspect => PhiTransition::Suspect { site, phi },
431                    PhiState::Down => PhiTransition::Down { site, phi },
432                });
433                entry.last_phi_state = new_state;
434            }
435        }
436        transitions
437    }
438}
439
/// Health classification that [`RttTracker::scan_phi`] maintains for
/// each site's φ-accrual detector.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum PhiState {
    /// Healthy: φ has not crossed the suspect threshold. This is the
    /// initial state.
    #[default]
    Live,
    /// φ crossed the suspect threshold but not the hard-down one.
    Suspect,
    /// φ crossed the hard-down threshold.
    Down,
}
452
453/// State transitions surfaced by [`RttTracker::scan_phi`]. The
454/// engine maps each entry onto a bus
455/// [`crate::bus::InfraEvent::PeerSuspect`] /
456/// [`crate::bus::InfraEvent::PeerDown`] /
457/// [`crate::bus::InfraEvent::PeerLive`].
458#[derive(Clone, Copy, Debug)]
459pub enum PhiTransition {
460    /// Site recovered (φ collapsed below suspect threshold).
461    Live {
462        /// Per-Node site whose φ-accrual detector dropped back below the suspect threshold.
463        site: NodeSiteId,
464    },
465    /// Site crossed the suspect threshold.
466    Suspect {
467        /// Per-Node site whose φ-accrual detector crossed into the suspect band.
468        site: NodeSiteId,
469        /// φ value at the moment the suspect threshold was crossed.
470        phi: f64,
471    },
472    /// Site crossed the hard-down threshold.
473    Down {
474        /// Per-Node site whose φ-accrual detector crossed into the hard-down band.
475        site: NodeSiteId,
476        /// φ value at the moment the hard-down threshold was crossed.
477        phi: f64,
478    },
479}
480