nemo_flow_adaptive/
adaptive_hints_intercept.rs1use std::sync::atomic::{AtomicU32, Ordering};
13use std::sync::{Arc, RwLock};
14
15use nemo_flow::api::llm::LlmRequest;
16use nemo_flow::api::runtime::LlmRequestInterceptFn;
17use nemo_flow::codec::request::AnnotatedLlmRequest;
18
19use crate::context_helpers::{
20 extract_scope_path, read_manual_latency_sensitivity, resolve_agent_id,
21};
22use crate::intercepts::AGENT_HINTS_HEADER_KEY;
23use crate::trie::builder::SensitivityConfig;
24use crate::trie::lookup::PredictionTrieLookup;
25use crate::types::cache::HotCache;
26use crate::types::metadata::AgentHints;
27
28pub(crate) fn build_agent_hints(
33 prediction: Option<&crate::trie::data_models::LlmCallPrediction>,
34 default_hints: &Option<AgentHints>,
35 agent_id: &str,
36 call_index: u32,
37 scope_depth: usize,
38) -> Option<AgentHints> {
39 if let Some(pred) = prediction {
40 let scale = SensitivityConfig::default().sensitivity_scale;
41 let ls = pred.latency_sensitivity.unwrap_or(1);
42 Some(AgentHints {
43 osl: pred.output_tokens.p90.round() as u32,
44 iat: pred.interarrival_ms.mean.round() as u32,
45 priority: (scale as i32 - ls as i32).max(0),
46 latency_sensitivity: ls as f64,
47 prefix_id: format!("{agent_id}-d{scope_depth}"),
48 total_requests: pred.remaining_calls.mean.round() as u32 + call_index,
49 })
50 } else {
51 default_hints.clone()
52 }
53}
54
55fn apply_manual_latency_override(
56 hints: Option<AgentHints>,
57 manual_ls: Option<u32>,
58 effective_agent_id: &str,
59 scope_depth: usize,
60) -> Option<AgentHints> {
61 match (hints, manual_ls) {
62 (Some(mut hints), Some(manual)) => {
63 let manual_f = manual as f64;
64 if manual_f > hints.latency_sensitivity {
65 let scale = SensitivityConfig::default().sensitivity_scale;
66 hints.latency_sensitivity = manual_f;
67 hints.priority = (scale as i32 - manual_f.round() as i32).max(0);
68 }
69 Some(hints)
70 }
71 (Some(hints), None) => Some(hints),
72 (None, Some(manual)) => Some(manual_agent_hints(manual, effective_agent_id, scope_depth)),
73 (None, None) => None,
74 }
75}
76
77fn manual_agent_hints(manual: u32, effective_agent_id: &str, scope_depth: usize) -> AgentHints {
78 let scale = SensitivityConfig::default().sensitivity_scale;
79 AgentHints {
80 osl: 0,
81 iat: 0,
82 priority: (scale as i32 - manual as i32).max(0),
83 latency_sensitivity: manual as f64,
84 prefix_id: format!("{effective_agent_id}-d{scope_depth}"),
85 total_requests: 0,
86 }
87}
88
89fn inject_agent_hints(request: &mut LlmRequest, hints: &AgentHints) {
90 let Ok(serialized_hints) = serde_json::to_value(hints) else {
91 return;
92 };
93
94 if let Some(body) = request.content.as_object_mut() {
95 if !body.contains_key("nvext") {
96 body.insert(
97 "nvext".to_string(),
98 serde_json::Value::Object(serde_json::Map::new()),
99 );
100 }
101 if let Some(nvext) = body
102 .get_mut("nvext")
103 .and_then(|value| value.as_object_mut())
104 {
105 nvext.insert("agent_hints".to_string(), serialized_hints.clone());
106 }
107 }
108
109 request
110 .headers
111 .insert(AGENT_HINTS_HEADER_KEY.to_string(), serialized_hints);
112}
113
114pub struct AdaptiveHintsIntercept {
121 hot_cache: Arc<RwLock<HotCache>>,
122 agent_id: String,
123 call_counter: AtomicU32,
124}
125
126impl AdaptiveHintsIntercept {
127 pub fn new(hot_cache: Arc<RwLock<HotCache>>, agent_id: String) -> Self {
129 Self {
130 hot_cache,
131 agent_id,
132 call_counter: AtomicU32::new(1),
133 }
134 }
135
136 fn effective_agent_id(&self) -> String {
137 resolve_agent_id().unwrap_or_else(|| self.agent_id.clone())
138 }
139
140 fn load_hints(
141 &self,
142 scope_path: &[String],
143 effective_agent_id: &str,
144 call_index: u32,
145 scope_depth: usize,
146 ) -> Option<AgentHints> {
147 let Ok(cache_guard) = self.hot_cache.read() else {
148 return None;
149 };
150
151 if let Some(ref trie) = cache_guard.trie {
152 let lookup = PredictionTrieLookup::new(trie);
153 let prediction = lookup.find(scope_path, call_index);
154 build_agent_hints(
155 prediction,
156 &cache_guard.agent_hints_default,
157 effective_agent_id,
158 call_index,
159 scope_depth,
160 )
161 } else {
162 cache_guard.agent_hints_default.clone()
163 }
164 }
165
166 pub fn into_request_fn(self) -> LlmRequestInterceptFn {
173 let this = Arc::new(self);
174 Box::new(
175 move |_name: &str, mut request: LlmRequest, annotated: Option<AnnotatedLlmRequest>| {
176 let scope_path = extract_scope_path();
177 let manual_ls = read_manual_latency_sensitivity();
178 let scope_depth = scope_path.len();
179 let call_index = this.call_counter.fetch_add(1, Ordering::Relaxed);
180
181 let effective_agent_id = this.effective_agent_id();
182 let cached_hints =
183 this.load_hints(&scope_path, &effective_agent_id, call_index, scope_depth);
184 let final_hints = apply_manual_latency_override(
185 cached_hints,
186 manual_ls,
187 &effective_agent_id,
188 scope_depth,
189 );
190
191 if let Some(hints) = final_hints {
192 inject_agent_hints(&mut request, &hints);
193 }
194
195 Ok((request, annotated))
196 },
197 )
198 }
199}
200
201#[cfg(test)]
202#[path = "../tests/unit/adaptive_hints_intercept_tests.rs"]
203mod tests;