1use std::collections::HashMap;
7use std::sync::{Arc, RwLock};
8
9use crate::acg::canonicalize::sha256_hex;
10use crate::acg::ir_builder::build_prompt_ir;
11use crate::acg::prompt_ir::PromptIR;
12use crate::acg::{CacheRequestFacts, CapabilityRegistry};
13use chrono::{DateTime, Utc};
14use nemo_flow::codec::request::AnnotatedLlmRequest;
15
16use crate::acg_profile::derive_acg_learning_key;
17use crate::types::cache::HotCache;
18
/// Minimum cacheable token count assumed for Anthropic models when the capability registry
/// has no per-model `min_cacheable_tokens` value.
const DEFAULT_ANTHROPIC_MIN_TOKENS: u32 = 1_024;
/// Minimum cacheable token count reported for `"openai"` requests.
const OPENAI_MIN_TOKENS: u32 = 1_024;
/// Retention window reported for `"anthropic"` requests: 300 seconds (five minutes).
const ANTHROPIC_RETENTION_WINDOW_SECS: f64 = 5.0 * 60.0;
/// Number of digest characters kept when abbreviating a content hash for diagnostics.
const HASH_PREFIX_LEN: usize = 12;

/// Key for the last time a stable prefix was seen: `(agent_id, provider, prefix_hash)`.
type StablePrefixKey = (String, String, String);
/// Fingerprint of a stable prefix, one `(span_id, sequence_index, short_hash)` entry per block.
type StablePrefixExemplar = Vec<(String, u32, String)>;
/// Key identifying one agent's traffic to one provider: `(agent_id, provider)`.
type AgentProviderKey = (String, String);
27
28struct CacheFactsBuildInput<'a> {
29 agent_id: &'a str,
30 provider: &'a str,
31 model: Option<&'a str>,
32 prompt_ir: &'a PromptIR,
33 hot_cache: &'a HotCache,
34 profile_key: &'a str,
35 now: DateTime<Utc>,
36}
37
38#[derive(Debug, Default)]
40pub struct CacheDiagnosticsTracker {
41 pub last_seen_by_prefix: HashMap<StablePrefixKey, DateTime<Utc>>,
43 pub last_exemplar_by_agent: HashMap<AgentProviderKey, StablePrefixExemplar>,
45}
46
47#[must_use]
49pub fn build_cache_request_facts(
50 agent_id: &str,
51 provider: &str,
52 annotated_request: &AnnotatedLlmRequest,
53 hot_cache: &Arc<RwLock<HotCache>>,
54 tracker: &Arc<RwLock<CacheDiagnosticsTracker>>,
55) -> Option<CacheRequestFacts> {
56 let prompt_ir = build_prompt_ir(annotated_request).ok()?;
57 let hot_cache = hot_cache.read().ok()?;
58 let mut tracker = tracker.write().ok()?;
59 let profile_key = derive_acg_learning_key(agent_id, annotated_request);
60
61 Some(build_cache_request_facts_from_prompt_ir(
62 CacheFactsBuildInput {
63 agent_id,
64 provider,
65 model: annotated_request.model.as_deref(),
66 prompt_ir: &prompt_ir,
67 hot_cache: &hot_cache,
68 profile_key: &profile_key,
69 now: Utc::now(),
70 },
71 &mut tracker,
72 ))
73}
74
75fn build_cache_request_facts_from_prompt_ir(
76 input: CacheFactsBuildInput<'_>,
77 tracker: &mut CacheDiagnosticsTracker,
78) -> CacheRequestFacts {
79 let CacheFactsBuildInput {
80 agent_id,
81 provider,
82 model,
83 prompt_ir,
84 hot_cache,
85 profile_key,
86 now,
87 } = input;
88
89 let Some(stability) = hot_cache
90 .acg_profiles
91 .get(profile_key)
92 .or(hot_cache.acg_stability.as_ref())
93 else {
94 return CacheRequestFacts {
95 provider: provider.to_string(),
96 stable_prefix_length: 0,
97 stable_prefix_tokens: None,
98 required_min_tokens: None,
99 first_mismatch_span_id: None,
100 first_mismatch_sequence_index: None,
101 expected_hash_prefix: None,
102 actual_hash_prefix: None,
103 retention_window_secs: None,
104 observed_gap_secs: None,
105 missing_facts: vec!["acg_stability_unavailable".to_string()],
106 };
107 };
108
109 let stable_prefix_length = stability.stable_prefix_length;
110 let stable_blocks = prompt_ir
111 .blocks
112 .iter()
113 .take(stable_prefix_length)
114 .collect::<Vec<_>>();
115 let mut missing_facts = Vec::new();
116
117 let stable_prefix_tokens = if stable_blocks.len() < stable_prefix_length {
118 missing_facts.push("stable_prefix_tokens_unavailable".to_string());
119 None
120 } else {
121 stable_blocks
122 .iter()
123 .try_fold(0_u32, |acc, block| {
124 block
125 .token_metadata
126 .as_ref()
127 .and_then(|meta| acc.checked_add(meta.token_count))
128 })
129 .or_else(|| {
130 if stable_blocks
131 .iter()
132 .any(|block| block.token_metadata.is_none())
133 {
134 missing_facts.push("stable_prefix_tokens_unavailable".to_string());
135 }
136 None
137 })
138 };
139
140 let current_exemplar = stable_blocks
141 .iter()
142 .map(|block| {
143 (
144 block.span_id.0.clone(),
145 block.sequence_index,
146 short_hash_prefix(&block.content),
147 )
148 })
149 .collect::<Vec<_>>();
150 let current_prefix_hash =
151 if stable_prefix_length == 0 || stable_blocks.len() < stable_prefix_length {
152 None
153 } else {
154 Some(prefix_hash(
155 stable_blocks.iter().map(|block| block.content.as_str()),
156 ))
157 };
158
159 let agent_provider_key = (agent_id.to_string(), provider.to_string());
160 let first_mismatch = tracker
161 .last_exemplar_by_agent
162 .get(&agent_provider_key)
163 .and_then(|previous| first_mismatch(previous, ¤t_exemplar));
164
165 let (
166 first_mismatch_span_id,
167 first_mismatch_sequence_index,
168 expected_hash_prefix,
169 actual_hash_prefix,
170 ) = if let Some((span_id, sequence_index, expected_hash_prefix, actual_hash_prefix)) =
171 first_mismatch
172 {
173 (
174 Some(span_id),
175 Some(sequence_index),
176 Some(expected_hash_prefix),
177 Some(actual_hash_prefix),
178 )
179 } else {
180 (None, None, None, None)
181 };
182
183 let required_min_tokens = match provider {
184 "anthropic" => Some(resolve_anthropic_min_tokens(model)),
185 "openai" => Some(OPENAI_MIN_TOKENS),
186 _ => None,
187 };
188 let retention_window_secs = if provider == "anthropic" {
189 Some(ANTHROPIC_RETENTION_WINDOW_SECS)
190 } else {
191 None
192 };
193
194 let observed_gap_secs = current_prefix_hash.as_ref().and_then(|stable_prefix_hash| {
195 tracker
196 .last_seen_by_prefix
197 .get(&(
198 agent_id.to_string(),
199 provider.to_string(),
200 stable_prefix_hash.clone(),
201 ))
202 .map(|last_seen| {
203 (now.signed_duration_since(*last_seen).num_milliseconds() as f64 / 1000.0).max(0.0)
204 })
205 });
206
207 if let Some(stable_prefix_hash) = current_prefix_hash {
208 tracker.last_seen_by_prefix.insert(
209 (
210 agent_id.to_string(),
211 provider.to_string(),
212 stable_prefix_hash,
213 ),
214 now,
215 );
216 }
217 tracker
218 .last_exemplar_by_agent
219 .insert(agent_provider_key, current_exemplar);
220
221 CacheRequestFacts {
222 provider: provider.to_string(),
223 stable_prefix_length,
224 stable_prefix_tokens,
225 required_min_tokens,
226 first_mismatch_span_id,
227 first_mismatch_sequence_index,
228 expected_hash_prefix,
229 actual_hash_prefix,
230 retention_window_secs,
231 observed_gap_secs,
232 missing_facts,
233 }
234}
235
236fn resolve_anthropic_min_tokens(model: Option<&str>) -> u32 {
237 let registry = CapabilityRegistry::with_defaults();
238 model
239 .and_then(|model| {
240 registry.get_backend("anthropic").and_then(|backend| {
241 backend
242 .model_families
243 .get(model)
244 .or_else(|| {
245 backend
246 .model_families
247 .iter()
248 .filter(|(family, _)| model.starts_with(family.as_str()))
249 .max_by_key(|(family, _)| family.len())
250 .map(|(_, caps)| caps)
251 })
252 .and_then(|family| family.min_cacheable_tokens)
253 })
254 })
255 .unwrap_or(DEFAULT_ANTHROPIC_MIN_TOKENS)
256}
257
/// Finds the first block at which `current` diverges from `previous`.
///
/// Only the overlapping part of the two exemplars is compared, because the pairs are walked
/// with `zip`. A change in length alone is therefore not reported.
///
/// On a mismatch, returns `(actual_span_id, actual_sequence_index, expected_hash,
/// actual_hash)`, where "expected" comes from `previous` and "actual" from `current`.
fn first_mismatch(
    previous: &[(String, u32, String)],
    current: &[(String, u32, String)],
) -> Option<(String, u32, String, String)> {
    for (expected, actual) in previous.iter().zip(current) {
        // Tuple equality compares span id, sequence index and hash together.
        if expected == actual {
            continue;
        }
        let (_, _, expected_hash) = expected;
        let (span_id, sequence_index, actual_hash) = actual;
        return Some((
            span_id.clone(),
            *sequence_index,
            expected_hash.clone(),
            actual_hash.clone(),
        ));
    }
    None
}
283
284fn prefix_hash<'a>(stable_contents: impl Iterator<Item = &'a str>) -> String {
285 let joined = stable_contents
286 .map(sha256_hex)
287 .collect::<Vec<_>>()
288 .join("|");
289 sha256_hex(&joined)
290}
291
292fn short_hash_prefix(content: &str) -> String {
293 let full_hash = sha256_hex(content);
294 format!(
295 "sha256:{}",
296 &full_hash["sha256:".len()..][..HASH_PREFIX_LEN]
297 )
298}
299
// Unit tests for this module are kept in a separate file; `#[path]` points the `tests`
// module at it, and `#[cfg(test)]` compiles it only for test builds.
#[cfg(test)]
#[path = "../tests/unit/cache_diagnostics_tests.rs"]
mod tests;