Skip to main content

nemo_flow_adaptive/
cache_diagnostics.rs

1// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Runtime-local cache miss request facts and diagnostics tracking.
5
6use std::collections::HashMap;
7use std::sync::{Arc, RwLock};
8
9use crate::acg::canonicalize::sha256_hex;
10use crate::acg::ir_builder::build_prompt_ir;
11use crate::acg::prompt_ir::PromptIR;
12use crate::acg::{CacheRequestFacts, CapabilityRegistry};
13use chrono::{DateTime, Utc};
14use nemo_flow::codec::request::AnnotatedLlmRequest;
15
16use crate::acg_profile::derive_acg_learning_key;
17use crate::types::cache::HotCache;
18
/// Fallback minimum token count for Anthropic, used when the capability registry
/// yields no `min_cacheable_tokens` for the requested model.
const DEFAULT_ANTHROPIC_MIN_TOKENS: u32 = 1_024;
/// Minimum token count reported as `required_min_tokens` for the `"openai"` provider.
const OPENAI_MIN_TOKENS: u32 = 1_024;
/// Retention window, in seconds, reported for the `"anthropic"` provider.
const ANTHROPIC_RETENTION_WINDOW_SECS: f64 = 300.0;
/// Number of digest characters kept after the `sha256:` tag in shortened block hashes.
const HASH_PREFIX_LEN: usize = 12;

/// `(agent_id, provider, stable_prefix_hash)` key for last-seen timestamps.
type StablePrefixKey = (String, String, String);
/// One `(span_id, sequence_index, short_content_hash)` entry per stable-prefix block.
type StablePrefixExemplar = Vec<(String, u32, String)>;
/// `(agent_id, provider)` key for the most recently stored exemplar.
type AgentProviderKey = (String, String);
27
28struct CacheFactsBuildInput<'a> {
29    agent_id: &'a str,
30    provider: &'a str,
31    model: Option<&'a str>,
32    prompt_ir: &'a PromptIR,
33    hot_cache: &'a HotCache,
34    profile_key: &'a str,
35    now: DateTime<Utc>,
36}
37
38/// Runtime-local miss diagnosis tracker.
39#[derive(Debug, Default)]
40pub struct CacheDiagnosticsTracker {
41    /// Last time a specific stable prefix hash was observed for an agent/provider pair.
42    pub last_seen_by_prefix: HashMap<StablePrefixKey, DateTime<Utc>>,
43    /// Last retained stable prefix exemplar for an agent/provider pair.
44    pub last_exemplar_by_agent: HashMap<AgentProviderKey, StablePrefixExemplar>,
45}
46
47/// Builds canonical request facts for cache miss diagnosis from the live runtime state.
48#[must_use]
49pub fn build_cache_request_facts(
50    agent_id: &str,
51    provider: &str,
52    annotated_request: &AnnotatedLlmRequest,
53    hot_cache: &Arc<RwLock<HotCache>>,
54    tracker: &Arc<RwLock<CacheDiagnosticsTracker>>,
55) -> Option<CacheRequestFacts> {
56    let prompt_ir = build_prompt_ir(annotated_request).ok()?;
57    let hot_cache = hot_cache.read().ok()?;
58    let mut tracker = tracker.write().ok()?;
59    let profile_key = derive_acg_learning_key(agent_id, annotated_request);
60
61    Some(build_cache_request_facts_from_prompt_ir(
62        CacheFactsBuildInput {
63            agent_id,
64            provider,
65            model: annotated_request.model.as_deref(),
66            prompt_ir: &prompt_ir,
67            hot_cache: &hot_cache,
68            profile_key: &profile_key,
69            now: Utc::now(),
70        },
71        &mut tracker,
72    ))
73}
74
75fn build_cache_request_facts_from_prompt_ir(
76    input: CacheFactsBuildInput<'_>,
77    tracker: &mut CacheDiagnosticsTracker,
78) -> CacheRequestFacts {
79    let CacheFactsBuildInput {
80        agent_id,
81        provider,
82        model,
83        prompt_ir,
84        hot_cache,
85        profile_key,
86        now,
87    } = input;
88
89    let Some(stability) = hot_cache
90        .acg_profiles
91        .get(profile_key)
92        .or(hot_cache.acg_stability.as_ref())
93    else {
94        return CacheRequestFacts {
95            provider: provider.to_string(),
96            stable_prefix_length: 0,
97            stable_prefix_tokens: None,
98            required_min_tokens: None,
99            first_mismatch_span_id: None,
100            first_mismatch_sequence_index: None,
101            expected_hash_prefix: None,
102            actual_hash_prefix: None,
103            retention_window_secs: None,
104            observed_gap_secs: None,
105            missing_facts: vec!["acg_stability_unavailable".to_string()],
106        };
107    };
108
109    let stable_prefix_length = stability.stable_prefix_length;
110    let stable_blocks = prompt_ir
111        .blocks
112        .iter()
113        .take(stable_prefix_length)
114        .collect::<Vec<_>>();
115    let mut missing_facts = Vec::new();
116
117    let stable_prefix_tokens = if stable_blocks.len() < stable_prefix_length {
118        missing_facts.push("stable_prefix_tokens_unavailable".to_string());
119        None
120    } else {
121        stable_blocks
122            .iter()
123            .try_fold(0_u32, |acc, block| {
124                block
125                    .token_metadata
126                    .as_ref()
127                    .and_then(|meta| acc.checked_add(meta.token_count))
128            })
129            .or_else(|| {
130                if stable_blocks
131                    .iter()
132                    .any(|block| block.token_metadata.is_none())
133                {
134                    missing_facts.push("stable_prefix_tokens_unavailable".to_string());
135                }
136                None
137            })
138    };
139
140    let current_exemplar = stable_blocks
141        .iter()
142        .map(|block| {
143            (
144                block.span_id.0.clone(),
145                block.sequence_index,
146                short_hash_prefix(&block.content),
147            )
148        })
149        .collect::<Vec<_>>();
150    let current_prefix_hash =
151        if stable_prefix_length == 0 || stable_blocks.len() < stable_prefix_length {
152            None
153        } else {
154            Some(prefix_hash(
155                stable_blocks.iter().map(|block| block.content.as_str()),
156            ))
157        };
158
159    let agent_provider_key = (agent_id.to_string(), provider.to_string());
160    let first_mismatch = tracker
161        .last_exemplar_by_agent
162        .get(&agent_provider_key)
163        .and_then(|previous| first_mismatch(previous, &current_exemplar));
164
165    let (
166        first_mismatch_span_id,
167        first_mismatch_sequence_index,
168        expected_hash_prefix,
169        actual_hash_prefix,
170    ) = if let Some((span_id, sequence_index, expected_hash_prefix, actual_hash_prefix)) =
171        first_mismatch
172    {
173        (
174            Some(span_id),
175            Some(sequence_index),
176            Some(expected_hash_prefix),
177            Some(actual_hash_prefix),
178        )
179    } else {
180        (None, None, None, None)
181    };
182
183    let required_min_tokens = match provider {
184        "anthropic" => Some(resolve_anthropic_min_tokens(model)),
185        "openai" => Some(OPENAI_MIN_TOKENS),
186        _ => None,
187    };
188    let retention_window_secs = if provider == "anthropic" {
189        Some(ANTHROPIC_RETENTION_WINDOW_SECS)
190    } else {
191        None
192    };
193
194    let observed_gap_secs = current_prefix_hash.as_ref().and_then(|stable_prefix_hash| {
195        tracker
196            .last_seen_by_prefix
197            .get(&(
198                agent_id.to_string(),
199                provider.to_string(),
200                stable_prefix_hash.clone(),
201            ))
202            .map(|last_seen| {
203                (now.signed_duration_since(*last_seen).num_milliseconds() as f64 / 1000.0).max(0.0)
204            })
205    });
206
207    if let Some(stable_prefix_hash) = current_prefix_hash {
208        tracker.last_seen_by_prefix.insert(
209            (
210                agent_id.to_string(),
211                provider.to_string(),
212                stable_prefix_hash,
213            ),
214            now,
215        );
216    }
217    tracker
218        .last_exemplar_by_agent
219        .insert(agent_provider_key, current_exemplar);
220
221    CacheRequestFacts {
222        provider: provider.to_string(),
223        stable_prefix_length,
224        stable_prefix_tokens,
225        required_min_tokens,
226        first_mismatch_span_id,
227        first_mismatch_sequence_index,
228        expected_hash_prefix,
229        actual_hash_prefix,
230        retention_window_secs,
231        observed_gap_secs,
232        missing_facts,
233    }
234}
235
236fn resolve_anthropic_min_tokens(model: Option<&str>) -> u32 {
237    let registry = CapabilityRegistry::with_defaults();
238    model
239        .and_then(|model| {
240            registry.get_backend("anthropic").and_then(|backend| {
241                backend
242                    .model_families
243                    .get(model)
244                    .or_else(|| {
245                        backend
246                            .model_families
247                            .iter()
248                            .filter(|(family, _)| model.starts_with(family.as_str()))
249                            .max_by_key(|(family, _)| family.len())
250                            .map(|(_, caps)| caps)
251                    })
252                    .and_then(|family| family.min_cacheable_tokens)
253            })
254        })
255        .unwrap_or(DEFAULT_ANTHROPIC_MIN_TOKENS)
256}
257
/// Finds the first position at which two exemplars disagree.
///
/// Entries are compared pairwise over the shorter of the two slices; a pair
/// disagrees when the span id, sequence index, or hash differs. The result is
/// `(current span id, current sequence index, previous hash, current hash)`.
/// Returns `None` when every compared pair matches, including when one slice is
/// merely longer than the other.
fn first_mismatch(
    previous: &[(String, u32, String)],
    current: &[(String, u32, String)],
) -> Option<(String, u32, String, String)> {
    for (expected, actual) in previous.iter().zip(current) {
        // Tuple inequality is true as soon as any component differs.
        if expected != actual {
            let (_, _, expected_hash) = expected;
            let (actual_span_id, actual_sequence_index, actual_hash) = actual;
            return Some((
                actual_span_id.clone(),
                *actual_sequence_index,
                expected_hash.clone(),
                actual_hash.clone(),
            ));
        }
    }
    None
}
283
284fn prefix_hash<'a>(stable_contents: impl Iterator<Item = &'a str>) -> String {
285    let joined = stable_contents
286        .map(sha256_hex)
287        .collect::<Vec<_>>()
288        .join("|");
289    sha256_hex(&joined)
290}
291
292fn short_hash_prefix(content: &str) -> String {
293    let full_hash = sha256_hex(content);
294    format!(
295        "sha256:{}",
296        &full_hash["sha256:".len()..][..HASH_PREFIX_LEN]
297    )
298}
299
300#[cfg(test)]
301#[path = "../tests/unit/cache_diagnostics_tests.rs"]
302mod tests;