bb_runtime/framework/request_tracker.rs
1//! `RequestTracker` - mint correlation tokens for `CorrelateTag` and
2//! track in-flight wire requests.
3//!
4//! 7 token minter with an `in_flight` map keyed by `wire_req_id`.
5//! `register_in_flight` records the dispatch-time clock + target
6//! site + (optional) chain context; `observe_response` pops the
7//! entry on the matching response landing and surfaces the data the
8//! [`crate::framework::rtt_tracker::RttTracker`] needs to update its
9//! EMAs.
10
11use std::collections::HashMap;
12
13use crate::ids::{CommandId, NodeSiteId};
14
15use crate::framework::rtt_tracker::ChainContext;
16
17/// Bookkeeping for a wire round-trip in flight.
18#[derive(Clone, Copy, Debug)]
19pub struct InFlightSend {
20 /// Engine-clock timestamp when the send was dispatched.
21 pub started_at_ns: u64,
22 /// Engine-clock timestamp at which this entry should be evicted
23 /// as timed-out. Zero means "no TTL — never evict on age".
24 pub expires_at_ns: u64,
25 /// Destination logical site, derived from the resolved PeerId.
26 pub target_site: NodeSiteId,
27 /// Optional compiler-stamped chain context.
28 pub chain: Option<ChainContext>,
29 /// `CommandId` of the originator's local op parked waiting for
30 /// the chain's response. When `drain_stale` evicts on TTL, the
31 /// engine routes a `WireTimeout` failure through this CommandId
32 /// so the parked continuation unsticks. `None` for entries
33 /// registered outside an async-suspension context (e.g.
34 /// fire-and-forget Sends).
35 pub parked_op: Option<crate::ids::CommandId>,
36}
37
38/// Round-trip sample surfaced by [`RequestTracker::observe_response`].
39#[derive(Clone, Copy, Debug)]
40pub struct RoundTripSample {
41 /// Destination site for the round trip.
42 pub target_site: NodeSiteId,
43 /// Chain context if one was recorded at dispatch.
44 pub chain: Option<ChainContext>,
45 /// Elapsed wall-clock time, nanoseconds.
46 pub elapsed_ns: u64,
47}
48
49/// Monotonically-increasing token minter + in-flight tracker.
50#[derive(Default)]
51pub struct RequestTracker {
52 next_token: u64,
53 in_flight: HashMap<u64, InFlightSend>,
54}
55
56impl RequestTracker {
57 /// Construct fresh with token counter at 0.
58 pub fn new() -> Self {
59 Self::default()
60 }
61
62 /// Mint a fresh correlation token (used by `CorrelateTag` +
63 /// `wire::Send` to identify the round trip).
64 pub fn mint_token(&mut self) -> CommandId {
65 let token = self.next_token;
66 self.next_token = self.next_token.saturating_add(1);
67 CommandId::from(token)
68 }
69
70 /// Record an outbound wire round-trip's dispatch-time
71 /// bookkeeping. `wire_req_id` is the token returned by
72 /// [`Self::mint_token`]; reused on the response side to match.
73 /// `ttl_ns` is the maximum age the entry will survive before
74 /// the tracker considers it stale (typed as
75 /// `NonZeroU64` so callers cannot accidentally register an
76 /// entry that never expires, which was the only failure mode
77 /// the `RequestTracker.in_flight` map had as an unbounded
78 /// resource). `parked_op` is the CommandId of the originator's
79 /// local op parked waiting for the chain response; `None` for
80 /// entries registered outside an async-suspension context.
81 pub fn register_in_flight(
82 &mut self,
83 wire_req_id: u64,
84 started_at_ns: u64,
85 target_site: NodeSiteId,
86 chain: Option<ChainContext>,
87 ttl_ns: std::num::NonZeroU64,
88 parked_op: Option<crate::ids::CommandId>,
89 ) {
90 let expires_at_ns = started_at_ns.saturating_add(ttl_ns.get());
91 self.in_flight.insert(
92 wire_req_id,
93 InFlightSend {
94 started_at_ns,
95 expires_at_ns,
96 target_site,
97 chain,
98 parked_op,
99 },
100 );
101 }
102
103 /// Pop the in-flight entry for a landing response and surface
104 /// the elapsed-time sample the RTT tracker should observe.
105 /// Returns `None` when no matching in-flight entry exists (the
106 /// response is unsolicited, duplicate, or fired before the
107 /// tracker was populated).
108 pub fn observe_response(&mut self, wire_req_id: u64, now_ns: u64) -> Option<RoundTripSample> {
109 let entry = self.in_flight.remove(&wire_req_id)?;
110 let elapsed_ns = now_ns.saturating_sub(entry.started_at_ns);
111 Some(RoundTripSample {
112 target_site: entry.target_site,
113 chain: entry.chain,
114 elapsed_ns,
115 })
116 }
117
118 /// Read-only peek at an in-flight entry without consuming it.
119 pub fn peek(&self, wire_req_id: u64) -> Option<&InFlightSend> {
120 self.in_flight.get(&wire_req_id)
121 }
122
123 /// Count of currently in-flight round trips.
124 pub fn in_flight_count(&self) -> usize {
125 self.in_flight.len()
126 }
127
128 /// Drain in-flight entries whose per-entry `expires_at_ns` has
129 /// passed (entries with `expires_at_ns == 0` never expire on
130 /// age). Returns the dropped entries so the engine can emit
131 /// `EngineStep::WireTimeout` and fail any parked CommandId.
132 pub fn drain_stale(&mut self, now_ns: u64) -> Vec<(u64, InFlightSend)> {
133 let drained: Vec<(u64, InFlightSend)> = self
134 .in_flight
135 .iter()
136 .filter(|(_, entry)| entry.expires_at_ns > 0 && entry.expires_at_ns <= now_ns)
137 .map(|(k, v)| (*k, *v))
138 .collect();
139 for (k, _) in &drained {
140 self.in_flight.remove(k);
141 }
142 drained
143 }
144}
145