Skip to main content

nemo_flow_adaptive/
context_helpers.rs

1// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Context helpers for reading scope metadata on the intercept hot path.
5//!
6//! These functions read from the NeMo Flow scope stack (via [`current_scope_stack`])
7//! to extract information needed by the LLM request intercept:
8//!
9//! - [`extract_scope_path`]: collects function names from the scope stack for trie lookup
//! - [`read_manual_latency_sensitivity`]: walks all scopes for manual `latency_sensitivity` annotations
11//! - [`resolve_agent_id`]: returns the first Agent scope name from the scope stack
12//!
//! All functions are safe to call from sync contexts (intercepts are sync closures).
//! The read helpers acquire a read lock on the scope stack, which is always fast;
//! [`set_latency_sensitivity`] acquires the write lock instead.
15//!
16//! # Metadata Convention
17//!
//! Manual latency sensitivity is stored in scope metadata under the JSON path
//! `/nemo_flow_adaptive/latency_sensitivity` as a non-negative integer.
20
21use nemo_flow::api::runtime::current_scope_stack;
22use nemo_flow::api::scope::ScopeType;
23use uuid::Uuid;
24
/// Metadata key path for manual latency sensitivity annotation.
///
/// This is a JSON Pointer string: the helpers in this module pass it to
/// `pointer(..)` on a scope's JSON metadata to locate the
/// `latency_sensitivity` entry nested under the `nemo_flow_adaptive` object.
pub const LATENCY_SENSITIVITY_POINTER: &str = "/nemo_flow_adaptive/latency_sensitivity";
27
/// Session-local scope identity used to coordinate warm-first cohorts.
///
/// Values of this type are produced by
/// [`resolve_shared_parent_scope_identity`] from the current scope stack.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SharedParentScopeIdentity {
    /// UUID of the root scope for the current execution tree.
    pub root_uuid: Uuid,
    /// UUID of the parent scope shared by sibling fan-out work.
    ///
    /// When the top scope has no parent UUID, the resolver stores
    /// `root_uuid` here as well.
    pub shared_parent_uuid: Uuid,
}
36
37/// Extracts the current function call path from the NeMo Flow scope stack.
38///
39/// Walks all scopes from root to top, skipping the root scope (index 0),
40/// and collects names of Agent and Function scopes. This path is used
41/// for prediction trie lookup.
42///
43/// # Returns
44/// A vector of scope names from the current Agent and Function scope path.
45/// Returns an empty vector when the scope stack cannot be read safely.
46///
47/// # Notes
48/// The implicit root scope is always skipped.
49pub fn extract_scope_path() -> Vec<String> {
50    let stack_handle = current_scope_stack();
51    let stack = match stack_handle.read() {
52        Ok(s) => s,
53        Err(_) => return vec![],
54    };
55    stack
56        .scopes()
57        .iter()
58        .skip(1) // skip root
59        .filter(|s| matches!(s.scope_type, ScopeType::Agent | ScopeType::Function))
60        .map(|s| s.name.clone())
61        .collect()
62}
63
64/// Reads the maximum manual latency sensitivity from all scopes in the current scope stack.
65///
66/// Walks all scopes and checks metadata for `/nemo_flow_adaptive/latency_sensitivity`.
67/// Uses max-merge semantics: if multiple scopes have annotations, the highest wins.
68///
69/// # Returns
70/// The highest manual latency sensitivity annotation visible on the current
71/// scope stack, or `None` when no annotation exists.
72///
73/// # Notes
74/// Returns `None` when the scope stack cannot be read safely.
75pub fn read_manual_latency_sensitivity() -> Option<u32> {
76    let stack_handle = current_scope_stack();
77    let stack = match stack_handle.read() {
78        Ok(s) => s,
79        Err(_) => return None,
80    };
81    let mut max_val: Option<u32> = None;
82    for scope in stack.scopes() {
83        if let Some(ref meta) = scope.metadata
84            && let Some(val) = meta
85                .pointer(LATENCY_SENSITIVITY_POINTER)
86                .and_then(|v| v.as_u64())
87        {
88            let val = val as u32;
89            max_val = Some(max_val.map_or(val, |prev: u32| prev.max(val)));
90        }
91    }
92    max_val
93}
94
95/// Sets latency sensitivity on the current (top) scope using max-merge semantics.
96///
97/// If the current scope already has a latency_sensitivity value, the new value
98/// is only applied if it is greater than the existing one.
99///
100/// # Parameters
101/// - `value`: New non-negative latency sensitivity hint (`>= 0`) for the
102///   current top scope.
103///
104/// # Returns
105/// `Ok(())` when the current scope metadata has been updated or left unchanged.
106///
107/// # Errors
108/// Returns an error string when the scope stack lock is poisoned.
109///
110/// # Notes
111/// Existing non-negative latency sensitivity values are updated using
112/// max-merge semantics.
113pub fn set_latency_sensitivity(value: u32) -> std::result::Result<(), String> {
114    let stack_handle = current_scope_stack();
115    let mut stack = stack_handle
116        .write()
117        .map_err(|e| format!("scope stack lock poisoned: {e}"))?;
118    let scope = stack.top_mut();
119
120    let existing = scope
121        .metadata
122        .as_ref()
123        .and_then(|m| m.pointer(LATENCY_SENSITIVITY_POINTER))
124        .and_then(|v| v.as_u64())
125        .map(|v| v as u32);
126
127    let effective = match existing {
128        Some(prev) if prev >= value => return Ok(()),
129        _ => value,
130    };
131
132    let meta = scope.metadata.get_or_insert_with(|| serde_json::json!({}));
133    if let Some(obj) = meta.as_object_mut() {
134        let nemo_flow_adaptive = obj
135            .entry("nemo_flow_adaptive")
136            .or_insert_with(|| serde_json::json!({}));
137        if let Some(np_obj) = nemo_flow_adaptive.as_object_mut() {
138            np_obj.insert(
139                "latency_sensitivity".to_string(),
140                serde_json::json!(effective),
141            );
142        }
143    }
144    Ok(())
145}
146
147/// Resolves the agent ID from the current scope stack.
148///
149/// Walks all scopes from root to top, skipping the implicit root scope
150/// (index 0, name="root"), and returns the name of the first Agent-typed scope.
151///
152/// # Returns
153/// The first Agent scope name found on the current stack, or `None` when no
154/// Agent scope is active.
155///
156/// # Notes
157/// Returns `None` when the scope stack cannot be read safely.
158pub fn resolve_agent_id() -> Option<String> {
159    let stack_handle = current_scope_stack();
160    let stack = match stack_handle.read() {
161        Ok(s) => s,
162        Err(_) => return None,
163    };
164    stack
165        .scopes()
166        .iter()
167        .skip(1) // skip implicit root
168        .find(|s| matches!(s.scope_type, ScopeType::Agent))
169        .map(|s| s.name.clone())
170}
171
172/// Resolves the session-local identity used by warm-first cohort coordination.
173///
174/// The shared parent must come from the parent scope, not the current scope's
175/// own UUID, so siblings under the same fan-out coordinate with one another.
176/// Returns `None` if the scope stack cannot be read.
177pub fn resolve_shared_parent_scope_identity() -> Option<SharedParentScopeIdentity> {
178    let stack_handle = current_scope_stack();
179    let stack = match stack_handle.read() {
180        Ok(s) => s,
181        Err(_) => return None,
182    };
183
184    let root_uuid = stack.root_uuid();
185    let shared_parent_uuid = stack.top().parent_uuid.unwrap_or(root_uuid);
186
187    Some(SharedParentScopeIdentity {
188        root_uuid,
189        shared_parent_uuid,
190    })
191}
192
193#[cfg(test)]
194#[path = "../tests/unit/context_helpers_tests.rs"]
195mod tests;