Skip to main content

nemo_flow_adaptive/
adaptive_hints_intercept.rs

1// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
//! AdaptiveHintsIntercept: opt-in LLM request intercept that injects AgentHints
//! derived from the HotCache prediction trie.
//!
//! This module provides [`AdaptiveHintsIntercept`], which builds [`AgentHints`] from
//! the prediction trie in [`HotCache`] and injects them into each LLM request, both
//! as a request header and under the `nvext.agent_hints` field of a JSON-object body.
//! The intercept is opt-in and synchronously transforms the [`LlmRequest`] before it
//! reaches the callable.
11
12use std::sync::atomic::{AtomicU32, Ordering};
13use std::sync::{Arc, RwLock};
14
15use nemo_flow::api::llm::LlmRequest;
16use nemo_flow::api::runtime::LlmRequestInterceptFn;
17use nemo_flow::codec::request::AnnotatedLlmRequest;
18
19use crate::context_helpers::{
20    extract_scope_path, read_manual_latency_sensitivity, resolve_agent_id,
21};
22use crate::intercepts::AGENT_HINTS_HEADER_KEY;
23use crate::trie::builder::SensitivityConfig;
24use crate::trie::lookup::PredictionTrieLookup;
25use crate::types::cache::HotCache;
26use crate::types::metadata::AgentHints;
27
28/// Builds [`AgentHints`] from a trie prediction and optional default hints.
29///
30/// Falls back to `default_hints` if no prediction is available.
31/// Sets `prefix_id` to `"{agent_id}-d{scope_depth}"` per architecture doc.
32pub(crate) fn build_agent_hints(
33    prediction: Option<&crate::trie::data_models::LlmCallPrediction>,
34    default_hints: &Option<AgentHints>,
35    agent_id: &str,
36    call_index: u32,
37    scope_depth: usize,
38) -> Option<AgentHints> {
39    if let Some(pred) = prediction {
40        let scale = SensitivityConfig::default().sensitivity_scale;
41        let ls = pred.latency_sensitivity.unwrap_or(1);
42        Some(AgentHints {
43            osl: pred.output_tokens.p90.round() as u32,
44            iat: pred.interarrival_ms.mean.round() as u32,
45            priority: (scale as i32 - ls as i32).max(0),
46            latency_sensitivity: ls as f64,
47            prefix_id: format!("{agent_id}-d{scope_depth}"),
48            total_requests: pred.remaining_calls.mean.round() as u32 + call_index,
49        })
50    } else {
51        default_hints.clone()
52    }
53}
54
55fn apply_manual_latency_override(
56    hints: Option<AgentHints>,
57    manual_ls: Option<u32>,
58    effective_agent_id: &str,
59    scope_depth: usize,
60) -> Option<AgentHints> {
61    match (hints, manual_ls) {
62        (Some(mut hints), Some(manual)) => {
63            let manual_f = manual as f64;
64            if manual_f > hints.latency_sensitivity {
65                let scale = SensitivityConfig::default().sensitivity_scale;
66                hints.latency_sensitivity = manual_f;
67                hints.priority = (scale as i32 - manual_f.round() as i32).max(0);
68            }
69            Some(hints)
70        }
71        (Some(hints), None) => Some(hints),
72        (None, Some(manual)) => Some(manual_agent_hints(manual, effective_agent_id, scope_depth)),
73        (None, None) => None,
74    }
75}
76
77fn manual_agent_hints(manual: u32, effective_agent_id: &str, scope_depth: usize) -> AgentHints {
78    let scale = SensitivityConfig::default().sensitivity_scale;
79    AgentHints {
80        osl: 0,
81        iat: 0,
82        priority: (scale as i32 - manual as i32).max(0),
83        latency_sensitivity: manual as f64,
84        prefix_id: format!("{effective_agent_id}-d{scope_depth}"),
85        total_requests: 0,
86    }
87}
88
89fn inject_agent_hints(request: &mut LlmRequest, hints: &AgentHints) {
90    let Ok(serialized_hints) = serde_json::to_value(hints) else {
91        return;
92    };
93
94    if let Some(body) = request.content.as_object_mut() {
95        if !body.contains_key("nvext") {
96            body.insert(
97                "nvext".to_string(),
98                serde_json::Value::Object(serde_json::Map::new()),
99            );
100        }
101        if let Some(nvext) = body
102            .get_mut("nvext")
103            .and_then(|value| value.as_object_mut())
104        {
105            nvext.insert("agent_hints".to_string(), serialized_hints.clone());
106        }
107    }
108
109    request
110        .headers
111        .insert(AGENT_HINTS_HEADER_KEY.to_string(), serialized_hints);
112}
113
114/// Opt-in LLM request intercept that injects [`AgentHints`] into request
115/// headers from the prediction trie in [`HotCache`].
116///
117/// Constructed via [`AdaptiveHintsIntercept::new`] and converted to an
118/// [`LlmRequestInterceptFn`] via [`AdaptiveHintsIntercept::into_request_fn`] for
119/// registration with the NeMo Flow runtime.
120pub struct AdaptiveHintsIntercept {
121    hot_cache: Arc<RwLock<HotCache>>,
122    agent_id: String,
123    call_counter: AtomicU32,
124}
125
126impl AdaptiveHintsIntercept {
127    /// Creates a new `AdaptiveHintsIntercept`.
128    pub fn new(hot_cache: Arc<RwLock<HotCache>>, agent_id: String) -> Self {
129        Self {
130            hot_cache,
131            agent_id,
132            call_counter: AtomicU32::new(1),
133        }
134    }
135
136    fn effective_agent_id(&self) -> String {
137        resolve_agent_id().unwrap_or_else(|| self.agent_id.clone())
138    }
139
140    fn load_hints(
141        &self,
142        scope_path: &[String],
143        effective_agent_id: &str,
144        call_index: u32,
145        scope_depth: usize,
146    ) -> Option<AgentHints> {
147        let Ok(cache_guard) = self.hot_cache.read() else {
148            return None;
149        };
150
151        if let Some(ref trie) = cache_guard.trie {
152            let lookup = PredictionTrieLookup::new(trie);
153            let prediction = lookup.find(scope_path, call_index);
154            build_agent_hints(
155                prediction,
156                &cache_guard.agent_hints_default,
157                effective_agent_id,
158                call_index,
159                scope_depth,
160            )
161        } else {
162            cache_guard.agent_hints_default.clone()
163        }
164    }
165
166    /// Converts this intercept into an [`LlmRequestInterceptFn`] suitable for
167    /// registration with [`register_llm_request_intercept`].
168    ///
169    /// The returned closure reads the HotCache trie, builds AgentHints,
170    /// injects them into the request headers and body, and returns the
171    /// transformed request.
172    pub fn into_request_fn(self) -> LlmRequestInterceptFn {
173        let this = Arc::new(self);
174        Box::new(
175            move |_name: &str, mut request: LlmRequest, annotated: Option<AnnotatedLlmRequest>| {
176                let scope_path = extract_scope_path();
177                let manual_ls = read_manual_latency_sensitivity();
178                let scope_depth = scope_path.len();
179                let call_index = this.call_counter.fetch_add(1, Ordering::Relaxed);
180
181                let effective_agent_id = this.effective_agent_id();
182                let cached_hints =
183                    this.load_hints(&scope_path, &effective_agent_id, call_index, scope_depth);
184                let final_hints = apply_manual_latency_override(
185                    cached_hints,
186                    manual_ls,
187                    &effective_agent_id,
188                    scope_depth,
189                );
190
191                if let Some(hints) = final_hints {
192                    inject_agent_hints(&mut request, &hints);
193                }
194
195                Ok((request, annotated))
196            },
197        )
198    }
199}
200
201#[cfg(test)]
202#[path = "../tests/unit/adaptive_hints_intercept_tests.rs"]
203mod tests;