use std::collections::HashMap;
use crate::ids::{CommandId, NodeSiteId};
use crate::framework::rtt_tracker::ChainContext;
#[derive(Clone, Copy, Debug)]
pub struct InFlightSend {
pub started_at_ns: u64,
pub expires_at_ns: u64,
pub target_site: NodeSiteId,
pub chain: Option<ChainContext>,
pub parked_op: Option<crate::ids::CommandId>,
}
#[derive(Clone, Copy, Debug)]
pub struct RoundTripSample {
pub target_site: NodeSiteId,
pub chain: Option<ChainContext>,
pub elapsed_ns: u64,
}
#[derive(Default)]
pub struct RequestTracker {
next_token: u64,
in_flight: HashMap<u64, InFlightSend>,
}
impl RequestTracker {
pub fn new() -> Self {
Self::default()
}
pub fn mint_token(&mut self) -> CommandId {
let token = self.next_token;
self.next_token = self.next_token.saturating_add(1);
CommandId::from(token)
}
pub fn register_in_flight(
&mut self,
wire_req_id: u64,
started_at_ns: u64,
target_site: NodeSiteId,
chain: Option<ChainContext>,
ttl_ns: std::num::NonZeroU64,
parked_op: Option<crate::ids::CommandId>,
) {
let expires_at_ns = started_at_ns.saturating_add(ttl_ns.get());
self.in_flight.insert(
wire_req_id,
InFlightSend {
started_at_ns,
expires_at_ns,
target_site,
chain,
parked_op,
},
);
}
pub fn observe_response(&mut self, wire_req_id: u64, now_ns: u64) -> Option<RoundTripSample> {
let entry = self.in_flight.remove(&wire_req_id)?;
let elapsed_ns = now_ns.saturating_sub(entry.started_at_ns);
Some(RoundTripSample {
target_site: entry.target_site,
chain: entry.chain,
elapsed_ns,
})
}
pub fn peek(&self, wire_req_id: u64) -> Option<&InFlightSend> {
self.in_flight.get(&wire_req_id)
}
pub fn in_flight_count(&self) -> usize {
self.in_flight.len()
}
pub fn drain_stale(&mut self, now_ns: u64) -> Vec<(u64, InFlightSend)> {
let drained: Vec<(u64, InFlightSend)> = self
.in_flight
.iter()
.filter(|(_, entry)| entry.expires_at_ns > 0 && entry.expires_at_ns <= now_ns)
.map(|(k, v)| (*k, *v))
.collect();
for (k, _) in &drained {
self.in_flight.remove(k);
}
drained
}
}