Skip to main content

bb_runtime/framework/
request_tracker.rs

1//! `RequestTracker` - mint correlation tokens for `CorrelateTag` and
2//! track in-flight wire requests.
3//!
//! A token minter with an `in_flight` map keyed by `wire_req_id`.
5//! `register_in_flight` records the dispatch-time clock + target
6//! site + (optional) chain context; `observe_response` pops the
7//! entry on the matching response landing and surfaces the data the
8//! [`crate::framework::rtt_tracker::RttTracker`] needs to update its
9//! EMAs.
10
11use std::collections::HashMap;
12
13use crate::ids::{CommandId, NodeSiteId};
14
15use crate::framework::rtt_tracker::ChainContext;
16
17/// Bookkeeping for a wire round-trip in flight.
18#[derive(Clone, Copy, Debug)]
19pub struct InFlightSend {
20    /// Engine-clock timestamp when the send was dispatched.
21    pub started_at_ns: u64,
22    /// Engine-clock timestamp at which this entry should be evicted
23    /// as timed-out. Zero means "no TTL — never evict on age".
24    pub expires_at_ns: u64,
25    /// Destination logical site, derived from the resolved PeerId.
26    pub target_site: NodeSiteId,
27    /// Optional compiler-stamped chain context.
28    pub chain: Option<ChainContext>,
29    /// `CommandId` of the originator's local op parked waiting for
30    /// the chain's response. When `drain_stale` evicts on TTL, the
31    /// engine routes a `WireTimeout` failure through this CommandId
32    /// so the parked continuation unsticks. `None` for entries
33    /// registered outside an async-suspension context (e.g.
34    /// fire-and-forget Sends).
35    pub parked_op: Option<crate::ids::CommandId>,
36}
37
38/// Round-trip sample surfaced by [`RequestTracker::observe_response`].
39#[derive(Clone, Copy, Debug)]
40pub struct RoundTripSample {
41    /// Destination site for the round trip.
42    pub target_site: NodeSiteId,
43    /// Chain context if one was recorded at dispatch.
44    pub chain: Option<ChainContext>,
45    /// Elapsed wall-clock time, nanoseconds.
46    pub elapsed_ns: u64,
47}
48
49/// Monotonically-increasing token minter + in-flight tracker.
50#[derive(Default)]
51pub struct RequestTracker {
52    next_token: u64,
53    in_flight: HashMap<u64, InFlightSend>,
54}
55
56impl RequestTracker {
57    /// Construct fresh with token counter at 0.
58    pub fn new() -> Self {
59        Self::default()
60    }
61
62    /// Mint a fresh correlation token (used by `CorrelateTag` +
63    /// `wire::Send` to identify the round trip).
64    pub fn mint_token(&mut self) -> CommandId {
65        let token = self.next_token;
66        self.next_token = self.next_token.saturating_add(1);
67        CommandId::from(token)
68    }
69
70    /// Record an outbound wire round-trip's dispatch-time
71    /// bookkeeping. `wire_req_id` is the token returned by
72    /// [`Self::mint_token`]; reused on the response side to match.
73    /// `ttl_ns` is the maximum age the entry will survive before
74    /// the tracker considers it stale (typed as
75    /// `NonZeroU64` so callers cannot accidentally register an
76    /// entry that never expires, which was the only failure mode
77    /// the `RequestTracker.in_flight` map had as an unbounded
78    /// resource). `parked_op` is the CommandId of the originator's
79    /// local op parked waiting for the chain response; `None` for
80    /// entries registered outside an async-suspension context.
81    pub fn register_in_flight(
82        &mut self,
83        wire_req_id: u64,
84        started_at_ns: u64,
85        target_site: NodeSiteId,
86        chain: Option<ChainContext>,
87        ttl_ns: std::num::NonZeroU64,
88        parked_op: Option<crate::ids::CommandId>,
89    ) {
90        let expires_at_ns = started_at_ns.saturating_add(ttl_ns.get());
91        self.in_flight.insert(
92            wire_req_id,
93            InFlightSend {
94                started_at_ns,
95                expires_at_ns,
96                target_site,
97                chain,
98                parked_op,
99            },
100        );
101    }
102
103    /// Pop the in-flight entry for a landing response and surface
104    /// the elapsed-time sample the RTT tracker should observe.
105    /// Returns `None` when no matching in-flight entry exists (the
106    /// response is unsolicited, duplicate, or fired before the
107    /// tracker was populated).
108    pub fn observe_response(&mut self, wire_req_id: u64, now_ns: u64) -> Option<RoundTripSample> {
109        let entry = self.in_flight.remove(&wire_req_id)?;
110        let elapsed_ns = now_ns.saturating_sub(entry.started_at_ns);
111        Some(RoundTripSample {
112            target_site: entry.target_site,
113            chain: entry.chain,
114            elapsed_ns,
115        })
116    }
117
118    /// Read-only peek at an in-flight entry without consuming it.
119    pub fn peek(&self, wire_req_id: u64) -> Option<&InFlightSend> {
120        self.in_flight.get(&wire_req_id)
121    }
122
123    /// Count of currently in-flight round trips.
124    pub fn in_flight_count(&self) -> usize {
125        self.in_flight.len()
126    }
127
128    /// Drain in-flight entries whose per-entry `expires_at_ns` has
129    /// passed (entries with `expires_at_ns == 0` never expire on
130    /// age). Returns the dropped entries so the engine can emit
131    /// `EngineStep::WireTimeout` and fail any parked CommandId.
132    pub fn drain_stale(&mut self, now_ns: u64) -> Vec<(u64, InFlightSend)> {
133        let drained: Vec<(u64, InFlightSend)> = self
134            .in_flight
135            .iter()
136            .filter(|(_, entry)| entry.expires_at_ns > 0 && entry.expires_at_ns <= now_ns)
137            .map(|(k, v)| (*k, *v))
138            .collect();
139        for (k, _) in &drained {
140            self.in_flight.remove(k);
141        }
142        drained
143    }
144}
145