1use chrono::{DateTime, Utc};
13use nemo_flow::codec::response::Usage;
14use serde::{Deserialize, Serialize};
15use uuid::Uuid;
16
17use super::types::AgentIdentity;
18
19#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
25pub struct CacheRequestFacts {
26 pub provider: String,
28 pub stable_prefix_length: usize,
30 #[serde(skip_serializing_if = "Option::is_none")]
32 #[serde(default)]
33 pub stable_prefix_tokens: Option<u32>,
34 #[serde(skip_serializing_if = "Option::is_none")]
36 #[serde(default)]
37 pub required_min_tokens: Option<u32>,
38 #[serde(skip_serializing_if = "Option::is_none")]
40 #[serde(default)]
41 pub first_mismatch_span_id: Option<String>,
42 #[serde(skip_serializing_if = "Option::is_none")]
44 #[serde(default)]
45 pub first_mismatch_sequence_index: Option<u32>,
46 #[serde(skip_serializing_if = "Option::is_none")]
48 #[serde(default)]
49 pub expected_hash_prefix: Option<String>,
50 #[serde(skip_serializing_if = "Option::is_none")]
52 #[serde(default)]
53 pub actual_hash_prefix: Option<String>,
54 #[serde(skip_serializing_if = "Option::is_none")]
56 #[serde(default)]
57 pub retention_window_secs: Option<f64>,
58 #[serde(skip_serializing_if = "Option::is_none")]
60 #[serde(default)]
61 pub observed_gap_secs: Option<f64>,
62 #[serde(default)]
64 pub missing_facts: Vec<String>,
65}
66
67#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
69pub struct CacheMissDiagnosis {
70 pub summary: String,
72 pub recommendation: String,
74 pub evidence: CacheMissEvidence,
76}
77
78#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
80#[serde(tag = "kind", rename_all = "snake_case")]
81pub enum CacheMissEvidence {
82 PrefixMismatch {
84 first_mismatch_span_id: String,
86 sequence_index: u32,
88 expected_hash_prefix: String,
90 actual_hash_prefix: String,
92 },
93 BelowMinimumThreshold {
95 observed_prefix_tokens: u32,
97 required_min_tokens: u32,
99 estimation_source: String,
101 },
102 RetentionExpired {
104 observed_gap_secs: f64,
106 retention_window_secs: f64,
108 provider_semantics: String,
110 },
111 Unknown {
113 missing_facts: Vec<String>,
115 },
116}
117
118#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
129#[serde(tag = "reason", rename_all = "snake_case")]
130pub enum CacheMissReason {
131 PrefixMismatch,
133 BelowMinimumThreshold,
135 RetentionExpired,
137 RoutingMismatch,
139 Evicted,
141 UnsupportedFeature,
143 ColdStart,
145 Unknown,
147 Other {
149 description: String,
151 },
152}
153
154#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
162pub struct CacheHitRate {
163 pub hit_rate: f64,
165 pub sample_count: u32,
167 pub window_duration_secs: f64,
169}
170
171#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
177pub enum CacheTelemetryProvider {
178 Anthropic,
180 OpenAI,
182}
183
184impl CacheTelemetryProvider {
185 #[must_use]
187 pub fn as_str(self) -> &'static str {
188 match self {
189 Self::Anthropic => "anthropic",
190 Self::OpenAI => "openai",
191 }
192 }
193}
194
195#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
201pub struct CacheTelemetryEvent {
202 pub request_id: Uuid,
204 pub agent_identity: AgentIdentity,
206 pub cache_read_tokens: u64,
208 pub cache_creation_tokens: u64,
210 pub total_prompt_tokens: u64,
212 pub hit_rate: f64,
214 #[serde(skip_serializing_if = "Option::is_none")]
216 #[serde(default)]
217 pub miss_reason: Option<CacheMissReason>,
218 #[serde(skip_serializing_if = "Option::is_none")]
220 #[serde(default)]
221 pub miss_diagnosis: Option<CacheMissDiagnosis>,
222 pub provider: String,
224 pub timestamp: DateTime<Utc>,
226}
227
228impl CacheTelemetryEvent {
229 pub fn compute_hit_rate(cache_read_tokens: u64, total_prompt_tokens: u64) -> f64 {
232 if total_prompt_tokens == 0 {
233 0.0
234 } else {
235 cache_read_tokens as f64 / total_prompt_tokens as f64
236 }
237 }
238
239 #[must_use]
244 pub fn from_usage(
245 request_id: Uuid,
246 agent_identity: AgentIdentity,
247 provider: CacheTelemetryProvider,
248 usage: &Usage,
249 timestamp: DateTime<Utc>,
250 request_facts: Option<&CacheRequestFacts>,
251 ) -> Option<Self> {
252 let prompt_tokens = usage.prompt_tokens?;
253 let cache_read_tokens = usage.cache_read_tokens.unwrap_or(0);
254
255 let (cache_creation_tokens, total_prompt_tokens) = match provider {
256 CacheTelemetryProvider::Anthropic => {
257 let cache_creation_tokens = usage.cache_write_tokens.unwrap_or(0);
258 let total_prompt_tokens = prompt_tokens + cache_read_tokens + cache_creation_tokens;
259 (cache_creation_tokens, total_prompt_tokens)
260 }
261 CacheTelemetryProvider::OpenAI => (0, prompt_tokens),
262 };
263
264 let (miss_reason, miss_diagnosis) = if cache_read_tokens > 0 {
265 (None, None)
266 } else if matches!(provider, CacheTelemetryProvider::Anthropic) && cache_creation_tokens > 0
267 {
268 (Some(CacheMissReason::ColdStart), None)
269 } else {
270 classify_cache_miss(provider, request_facts)
271 };
272
273 Some(Self {
274 request_id,
275 agent_identity,
276 cache_read_tokens,
277 cache_creation_tokens,
278 total_prompt_tokens,
279 hit_rate: Self::compute_hit_rate(cache_read_tokens, total_prompt_tokens),
280 miss_reason,
281 miss_diagnosis,
282 provider: provider.as_str().to_string(),
283 timestamp,
284 })
285 }
286}
287
288fn classify_cache_miss(
289 provider: CacheTelemetryProvider,
290 request_facts: Option<&CacheRequestFacts>,
291) -> (Option<CacheMissReason>, Option<CacheMissDiagnosis>) {
292 if let Some(diagnosis) = prefix_mismatch_diagnosis(request_facts) {
293 return (Some(CacheMissReason::PrefixMismatch), Some(diagnosis));
294 }
295
296 if let Some(diagnosis) = below_minimum_threshold_diagnosis(request_facts) {
297 return (
298 Some(CacheMissReason::BelowMinimumThreshold),
299 Some(diagnosis),
300 );
301 }
302
303 if let Some(diagnosis) = retention_expired_diagnosis(provider, request_facts) {
304 return (Some(CacheMissReason::RetentionExpired), Some(diagnosis));
305 }
306
307 (
308 Some(CacheMissReason::Unknown),
309 Some(unknown_diagnosis(request_facts)),
310 )
311}
312
313fn prefix_mismatch_diagnosis(
314 request_facts: Option<&CacheRequestFacts>,
315) -> Option<CacheMissDiagnosis> {
316 let facts = request_facts?;
317 let span_id = facts.first_mismatch_span_id.as_ref()?;
318 let sequence_index = facts.first_mismatch_sequence_index?;
319 let expected_hash_prefix = facts.expected_hash_prefix.as_ref()?;
320 let actual_hash_prefix = facts.actual_hash_prefix.as_ref()?;
321
322 Some(CacheMissDiagnosis {
323 summary: format!(
324 "Stable prefix diverged at span {} before cache reuse.",
325 span_id
326 ),
327 recommendation: "Move or extract the mismatching block after the stable prefix."
328 .to_string(),
329 evidence: CacheMissEvidence::PrefixMismatch {
330 first_mismatch_span_id: span_id.clone(),
331 sequence_index,
332 expected_hash_prefix: canonicalize_hash_prefix(expected_hash_prefix),
333 actual_hash_prefix: canonicalize_hash_prefix(actual_hash_prefix),
334 },
335 })
336}
337
338fn below_minimum_threshold_diagnosis(
339 request_facts: Option<&CacheRequestFacts>,
340) -> Option<CacheMissDiagnosis> {
341 let facts = request_facts?;
342 let observed_prefix_tokens = facts.stable_prefix_tokens?;
343 let required_min_tokens = facts.required_min_tokens?;
344 if observed_prefix_tokens >= required_min_tokens {
345 return None;
346 }
347
348 Some(CacheMissDiagnosis {
349 summary: format!(
350 "Stable prefix has {observed_prefix_tokens} tokens, below the {required_min_tokens}-token cache minimum."
351 ),
352 recommendation:
353 "Increase the cacheable prefix above the provider minimum or stop expecting a hit."
354 .to_string(),
355 evidence: CacheMissEvidence::BelowMinimumThreshold {
356 observed_prefix_tokens,
357 required_min_tokens,
358 estimation_source: "prompt_ir_token_metadata".to_string(),
359 },
360 })
361}
362
363fn retention_expired_diagnosis(
364 provider: CacheTelemetryProvider,
365 request_facts: Option<&CacheRequestFacts>,
366) -> Option<CacheMissDiagnosis> {
367 if !matches!(provider, CacheTelemetryProvider::Anthropic) {
368 return None;
369 }
370
371 let facts = request_facts?;
372 let observed_gap_secs = facts.observed_gap_secs?;
373 let retention_window_secs = facts.retention_window_secs?;
374 if observed_gap_secs <= retention_window_secs {
375 return None;
376 }
377
378 Some(CacheMissDiagnosis {
379 summary: format!(
380 "Stable prefix reuse arrived {:.1}s after the {:.1}s retention window.",
381 observed_gap_secs, retention_window_secs
382 ),
383 recommendation:
384 "Reuse the stable prefix inside the active retention window or accept a cold rebuild."
385 .to_string(),
386 evidence: CacheMissEvidence::RetentionExpired {
387 observed_gap_secs,
388 retention_window_secs,
389 provider_semantics:
390 "anthropic prompt caching reuses prefixes inside the active retention window"
391 .to_string(),
392 },
393 })
394}
395
396fn unknown_diagnosis(request_facts: Option<&CacheRequestFacts>) -> CacheMissDiagnosis {
397 let missing_facts = request_facts.map_or_else(
398 || vec!["request_facts_unavailable".to_string()],
399 |facts| facts.missing_facts.clone(),
400 );
401
402 CacheMissDiagnosis {
403 summary: "Cache miss could not be classified from the available request facts.".to_string(),
404 recommendation: "Capture request facts earlier or keep the miss classified as unknown."
405 .to_string(),
406 evidence: CacheMissEvidence::Unknown { missing_facts },
407 }
408}
409
/// Normalizes a hash prefix to the form `sha256:` followed by at most 12 characters.
///
/// A single leading `sha256:` is stripped if present, the first 12 characters of what remains
/// are kept, and `sha256:` is prepended again. The input is not checked for being hexadecimal,
/// and shorter inputs are kept as they are rather than padded.
fn canonicalize_hash_prefix(value: &str) -> String {
    const PREFIX: &str = "sha256:";
    const HEX_LEN: usize = 12;

    let digest = match value.strip_prefix(PREFIX) {
        Some(rest) => rest,
        None => value,
    };

    let mut canonical = String::with_capacity(PREFIX.len() + HEX_LEN);
    canonical.push_str(PREFIX);
    // Truncate by characters, not bytes, so multi-byte input can never be split mid-character.
    canonical.extend(digest.chars().take(HEX_LEN));
    canonical
}
423
// Unit tests for this module live in a separate file: `#[path]` points the `tests` module at
// it, and `#[cfg(test)]` compiles it only for test builds.
#[cfg(test)]
#[path = "../../tests/unit/acg/telemetry_tests.rs"]
mod tests;