nemo_flow_adaptive/context_helpers.rs
1// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Context helpers for reading scope metadata on the intercept hot path.
5//!
6//! These functions read from the NeMo Flow scope stack (via [`current_scope_stack`])
7//! to extract information needed by the LLM request intercept:
8//!
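//! - [`extract_scope_path`]: collects function names from the scope stack for trie lookup
//! - [`read_manual_latency_sensitivity`]: walks all scopes for manual `latency_sensitivity` annotations
//! - [`set_latency_sensitivity`]: records a manual `latency_sensitivity` annotation on the top scope
//! - [`resolve_agent_id`]: returns the first Agent scope name from the scope stack
//! - [`resolve_shared_parent_scope_identity`]: returns the root and shared-parent scope UUIDs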
12//!
//! All functions are safe to call from sync contexts (intercepts are sync closures).
//! The read helpers acquire a read lock on the scope stack, which is always fast;
//! [`set_latency_sensitivity`] acquires a write lock because it mutates scope metadata.
15//!
16//! # Metadata Convention
17//!
//! Manual latency sensitivity is stored in scope metadata under the JSON path
//! `/nemo_flow_adaptive/latency_sensitivity` as a non-negative integer.
20
21use nemo_flow::api::runtime::current_scope_stack;
22use nemo_flow::api::scope::ScopeType;
23use uuid::Uuid;
24
/// JSON pointer into scope metadata at which the manual latency sensitivity
/// annotation is stored.
pub const LATENCY_SENSITIVITY_POINTER: &str = "/nemo_flow_adaptive/latency_sensitivity";
27
28/// Session-local scope identity used to coordinate warm-first cohorts.
29#[derive(Debug, Clone, Copy, PartialEq, Eq)]
30pub struct SharedParentScopeIdentity {
31 /// UUID of the root scope for the current execution tree.
32 pub root_uuid: Uuid,
33 /// UUID of the parent scope shared by sibling fan-out work.
34 pub shared_parent_uuid: Uuid,
35}
36
37/// Extracts the current function call path from the NeMo Flow scope stack.
38///
39/// Walks all scopes from root to top, skipping the root scope (index 0),
40/// and collects names of Agent and Function scopes. This path is used
41/// for prediction trie lookup.
42///
43/// # Returns
44/// A vector of scope names from the current Agent and Function scope path.
45/// Returns an empty vector when the scope stack cannot be read safely.
46///
47/// # Notes
48/// The implicit root scope is always skipped.
49pub fn extract_scope_path() -> Vec<String> {
50 let stack_handle = current_scope_stack();
51 let stack = match stack_handle.read() {
52 Ok(s) => s,
53 Err(_) => return vec![],
54 };
55 stack
56 .scopes()
57 .iter()
58 .skip(1) // skip root
59 .filter(|s| matches!(s.scope_type, ScopeType::Agent | ScopeType::Function))
60 .map(|s| s.name.clone())
61 .collect()
62}
63
64/// Reads the maximum manual latency sensitivity from all scopes in the current scope stack.
65///
66/// Walks all scopes and checks metadata for `/nemo_flow_adaptive/latency_sensitivity`.
67/// Uses max-merge semantics: if multiple scopes have annotations, the highest wins.
68///
69/// # Returns
70/// The highest manual latency sensitivity annotation visible on the current
71/// scope stack, or `None` when no annotation exists.
72///
73/// # Notes
74/// Returns `None` when the scope stack cannot be read safely.
75pub fn read_manual_latency_sensitivity() -> Option<u32> {
76 let stack_handle = current_scope_stack();
77 let stack = match stack_handle.read() {
78 Ok(s) => s,
79 Err(_) => return None,
80 };
81 let mut max_val: Option<u32> = None;
82 for scope in stack.scopes() {
83 if let Some(ref meta) = scope.metadata
84 && let Some(val) = meta
85 .pointer(LATENCY_SENSITIVITY_POINTER)
86 .and_then(|v| v.as_u64())
87 {
88 let val = val as u32;
89 max_val = Some(max_val.map_or(val, |prev: u32| prev.max(val)));
90 }
91 }
92 max_val
93}
94
95/// Sets latency sensitivity on the current (top) scope using max-merge semantics.
96///
97/// If the current scope already has a latency_sensitivity value, the new value
98/// is only applied if it is greater than the existing one.
99///
100/// # Parameters
101/// - `value`: New non-negative latency sensitivity hint (`>= 0`) for the
102/// current top scope.
103///
104/// # Returns
105/// `Ok(())` when the current scope metadata has been updated or left unchanged.
106///
107/// # Errors
108/// Returns an error string when the scope stack lock is poisoned.
109///
110/// # Notes
111/// Existing non-negative latency sensitivity values are updated using
112/// max-merge semantics.
113pub fn set_latency_sensitivity(value: u32) -> std::result::Result<(), String> {
114 let stack_handle = current_scope_stack();
115 let mut stack = stack_handle
116 .write()
117 .map_err(|e| format!("scope stack lock poisoned: {e}"))?;
118 let scope = stack.top_mut();
119
120 let existing = scope
121 .metadata
122 .as_ref()
123 .and_then(|m| m.pointer(LATENCY_SENSITIVITY_POINTER))
124 .and_then(|v| v.as_u64())
125 .map(|v| v as u32);
126
127 let effective = match existing {
128 Some(prev) if prev >= value => return Ok(()),
129 _ => value,
130 };
131
132 let meta = scope.metadata.get_or_insert_with(|| serde_json::json!({}));
133 if let Some(obj) = meta.as_object_mut() {
134 let nemo_flow_adaptive = obj
135 .entry("nemo_flow_adaptive")
136 .or_insert_with(|| serde_json::json!({}));
137 if let Some(np_obj) = nemo_flow_adaptive.as_object_mut() {
138 np_obj.insert(
139 "latency_sensitivity".to_string(),
140 serde_json::json!(effective),
141 );
142 }
143 }
144 Ok(())
145}
146
147/// Resolves the agent ID from the current scope stack.
148///
149/// Walks all scopes from root to top, skipping the implicit root scope
150/// (index 0, name="root"), and returns the name of the first Agent-typed scope.
151///
152/// # Returns
153/// The first Agent scope name found on the current stack, or `None` when no
154/// Agent scope is active.
155///
156/// # Notes
157/// Returns `None` when the scope stack cannot be read safely.
158pub fn resolve_agent_id() -> Option<String> {
159 let stack_handle = current_scope_stack();
160 let stack = match stack_handle.read() {
161 Ok(s) => s,
162 Err(_) => return None,
163 };
164 stack
165 .scopes()
166 .iter()
167 .skip(1) // skip implicit root
168 .find(|s| matches!(s.scope_type, ScopeType::Agent))
169 .map(|s| s.name.clone())
170}
171
172/// Resolves the session-local identity used by warm-first cohort coordination.
173///
174/// The shared parent must come from the parent scope, not the current scope's
175/// own UUID, so siblings under the same fan-out coordinate with one another.
176/// Returns `None` if the scope stack cannot be read.
177pub fn resolve_shared_parent_scope_identity() -> Option<SharedParentScopeIdentity> {
178 let stack_handle = current_scope_stack();
179 let stack = match stack_handle.read() {
180 Ok(s) => s,
181 Err(_) => return None,
182 };
183
184 let root_uuid = stack.root_uuid();
185 let shared_parent_uuid = stack.top().parent_uuid.unwrap_or(root_uuid);
186
187 Some(SharedParentScopeIdentity {
188 root_uuid,
189 shared_parent_uuid,
190 })
191}
192
193#[cfg(test)]
194#[path = "../tests/unit/context_helpers_tests.rs"]
195mod tests;