Skip to main content

nemo_flow_adaptive/acg/
telemetry.rs

1// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Cache performance telemetry types for the Adaptive Cache Governor (ACG)
5//! system.
6//!
7//! These types normalize provider-specific cache metrics (Anthropic
8//! `cache_read_input_tokens`/`cache_creation_input_tokens`, OpenAI
9//! `cached_tokens`) into a common schema for uniform measurement.
10//! Populated by provider-specific normalization logic in Phase 9.
11
12use chrono::{DateTime, Utc};
13use nemo_flow::codec::response::Usage;
14use serde::{Deserialize, Serialize};
15use uuid::Uuid;
16
17use super::types::AgentIdentity;
18
19// ===================================================================
20// Cache miss diagnosis contract
21// ===================================================================
22
23/// Request-time facts used to classify a cache miss without leaking prompt text.
24#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
25pub struct CacheRequestFacts {
26    /// Canonical provider string associated with the request facts.
27    pub provider: String,
28    /// Number of stable prefix blocks observed in the request.
29    pub stable_prefix_length: usize,
30    /// Token count for the stable prefix when it can be measured safely.
31    #[serde(skip_serializing_if = "Option::is_none")]
32    #[serde(default)]
33    pub stable_prefix_tokens: Option<u32>,
34    /// Minimum provider threshold required for cache reuse.
35    #[serde(skip_serializing_if = "Option::is_none")]
36    #[serde(default)]
37    pub required_min_tokens: Option<u32>,
38    /// Span ID of the first stable block that mismatched the retained exemplar.
39    #[serde(skip_serializing_if = "Option::is_none")]
40    #[serde(default)]
41    pub first_mismatch_span_id: Option<String>,
42    /// Sequence index of the first mismatching stable block.
43    #[serde(skip_serializing_if = "Option::is_none")]
44    #[serde(default)]
45    pub first_mismatch_sequence_index: Option<u32>,
46    /// Expected short SHA-256 hash prefix for the first mismatching block.
47    #[serde(skip_serializing_if = "Option::is_none")]
48    #[serde(default)]
49    pub expected_hash_prefix: Option<String>,
50    /// Actual short SHA-256 hash prefix for the first mismatching block.
51    #[serde(skip_serializing_if = "Option::is_none")]
52    #[serde(default)]
53    pub actual_hash_prefix: Option<String>,
54    /// Active cache retention window in seconds when provider semantics expose one.
55    #[serde(skip_serializing_if = "Option::is_none")]
56    #[serde(default)]
57    pub retention_window_secs: Option<f64>,
58    /// Observed elapsed time since the same stable prefix was last seen.
59    #[serde(skip_serializing_if = "Option::is_none")]
60    #[serde(default)]
61    pub observed_gap_secs: Option<f64>,
62    /// Facts that were unavailable when the runtime attempted diagnosis.
63    #[serde(default)]
64    pub missing_facts: Vec<String>,
65}
66
/// Structured diagnosis for a cache miss.
///
/// Pairs a human-readable explanation and follow-up with the typed
/// [`CacheMissEvidence`] that justifies them. The text fields are plain
/// `String`s; their intended shape (single line, bounded length) is a
/// convention for producers and is not enforced by this type.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CacheMissDiagnosis {
    /// Single-line bounded explanation of the miss.
    pub summary: String,
    /// Single actionable follow-up for the caller.
    pub recommendation: String,
    /// Evidence supporting the diagnosis.
    pub evidence: CacheMissEvidence,
}
77
/// Typed evidence for a cache miss diagnosis.
///
/// Serialized as an internally tagged enum: the variant is carried in a
/// `"kind"` field using its snake_case name (for example
/// `"prefix_mismatch"`), alongside the variant's own fields.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum CacheMissEvidence {
    /// Stable prefix diverged from the retained exemplar.
    PrefixMismatch {
        /// Span ID of the first mismatching stable block.
        first_mismatch_span_id: String,
        /// Zero-based sequence index of the mismatching block.
        sequence_index: u32,
        /// Expected short SHA-256 hash prefix.
        expected_hash_prefix: String,
        /// Actual short SHA-256 hash prefix.
        actual_hash_prefix: String,
    },
    /// Stable prefix is too short for provider cache reuse.
    BelowMinimumThreshold {
        /// Observed stable prefix tokens.
        observed_prefix_tokens: u32,
        /// Required minimum tokens for cache reuse.
        required_min_tokens: u32,
        /// Source of the token estimate.
        estimation_source: String,
    },
    /// Stable prefix likely aged out of the provider retention window.
    RetentionExpired {
        /// Observed gap, in seconds, between requests with the same stable prefix.
        observed_gap_secs: f64,
        /// Provider retention window in seconds.
        retention_window_secs: f64,
        /// Human-readable provider semantics summary.
        provider_semantics: String,
    },
    /// Diagnosis could not be justified from the available facts.
    Unknown {
        /// List of facts that were unavailable at classification time.
        missing_facts: Vec<String>,
    },
}
117
118// ===================================================================
119// Cache miss reason taxonomy
120// ===================================================================
121
/// Reason why a cache miss occurred.
///
/// Covers eight named reasons (seven specific causes plus `Unknown`) and
/// an extensible `Other` variant.
/// Uses internally-tagged JSON representation (`"reason"` field) so
/// each variant serializes as `{"reason": "snake_case"}` and the
/// `Other` variant additionally carries a `description` field.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(tag = "reason", rename_all = "snake_case")]
pub enum CacheMissReason {
    /// Prompt prefix didn't match cached prefix.
    PrefixMismatch,
    /// Stable prefix shorter than provider minimum for caching.
    BelowMinimumThreshold,
    /// Cached prefix retention window elapsed.
    RetentionExpired,
    /// Request routed to different worker/pool.
    RoutingMismatch,
    /// Cache evicted due to capacity pressure.
    Evicted,
    /// Backend/model doesn't support caching.
    UnsupportedFeature,
    /// First request for this prefix (no prior cache entry).
    ColdStart,
    /// Reason could not be determined from provider response.
    Unknown,
    /// Extensible escape hatch for reasons not yet in the enum.
    Other {
        /// Human-readable description of the miss reason.
        description: String,
    },
}
153
154// ===================================================================
155// Cache hit rate (aggregated)
156// ===================================================================
157
/// Aggregated cache hit rate over a time window.
///
/// Used for dashboard metrics and trend analysis.
///
/// This is a plain data carrier: none of the documented ranges below are
/// validated by the type itself, so producers are responsible for them.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CacheHitRate {
    /// Hit rate, expected to lie in the range `[0.0, 1.0]`.
    pub hit_rate: f64,
    /// Number of requests in the measurement window.
    pub sample_count: u32,
    /// Duration of the measurement window in seconds.
    pub window_duration_secs: f64,
}
170
171// ===================================================================
172// Cache telemetry event (per-call)
173// ===================================================================
174
/// Cache telemetry provider identity for canonical `Usage` normalization.
///
/// NOTE(review): the serde derives carry no rename attribute, so the
/// derived representation presumably uses the variant identifiers
/// (`Anthropic`, `OpenAI`) rather than the lowercase strings returned by
/// `as_str` — confirm this difference is intended.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum CacheTelemetryProvider {
    /// Anthropic Messages cache telemetry semantics.
    Anthropic,
    /// OpenAI Chat/Responses cache telemetry semantics.
    OpenAI,
}
183
184impl CacheTelemetryProvider {
185    /// Returns the canonical provider string for serialized cache telemetry.
186    #[must_use]
187    pub fn as_str(self) -> &'static str {
188        match self {
189            Self::Anthropic => "anthropic",
190            Self::OpenAI => "openai",
191        }
192    }
193}
194
195/// Per-call cache telemetry event.
196///
197/// Captures provider-agnostic cache metrics for a single LLM request.
198/// The `agent_identity` field cross-references the Phase 3
199/// [`AgentIdentity`] type for per-agent grouping.
200#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
201pub struct CacheTelemetryEvent {
202    /// Request ID this telemetry pertains to.
203    pub request_id: Uuid,
204    /// Identity of the agent that issued the request.
205    pub agent_identity: AgentIdentity,
206    /// Number of tokens served from cache.
207    pub cache_read_tokens: u64,
208    /// Number of tokens written to cache.
209    pub cache_creation_tokens: u64,
210    /// Total prompt tokens (for hit rate calculation).
211    pub total_prompt_tokens: u64,
212    /// Computed cache hit rate `[0.0, 1.0]`.
213    pub hit_rate: f64,
214    /// Reason for cache miss, if applicable.
215    #[serde(skip_serializing_if = "Option::is_none")]
216    #[serde(default)]
217    pub miss_reason: Option<CacheMissReason>,
218    /// Structured miss diagnosis, when the miss can be justified safely.
219    #[serde(skip_serializing_if = "Option::is_none")]
220    #[serde(default)]
221    pub miss_diagnosis: Option<CacheMissDiagnosis>,
222    /// Provider name (e.g., "anthropic", "openai").
223    pub provider: String,
224    /// When this telemetry was recorded.
225    pub timestamp: DateTime<Utc>,
226}
227
228impl CacheTelemetryEvent {
229    /// Computes hit rate from token counts. Returns `0.0` if
230    /// `total_prompt_tokens` is zero to avoid division by zero.
231    pub fn compute_hit_rate(cache_read_tokens: u64, total_prompt_tokens: u64) -> f64 {
232        if total_prompt_tokens == 0 {
233            0.0
234        } else {
235            cache_read_tokens as f64 / total_prompt_tokens as f64
236        }
237    }
238
239    /// Builds a canonical cache telemetry event from normalized usage fields.
240    ///
241    /// Returns `None` when the normalized usage payload does not contain
242    /// `prompt_tokens`, because Phase 10 does not invent missing totals.
243    #[must_use]
244    pub fn from_usage(
245        request_id: Uuid,
246        agent_identity: AgentIdentity,
247        provider: CacheTelemetryProvider,
248        usage: &Usage,
249        timestamp: DateTime<Utc>,
250        request_facts: Option<&CacheRequestFacts>,
251    ) -> Option<Self> {
252        let prompt_tokens = usage.prompt_tokens?;
253        let cache_read_tokens = usage.cache_read_tokens.unwrap_or(0);
254
255        let (cache_creation_tokens, total_prompt_tokens) = match provider {
256            CacheTelemetryProvider::Anthropic => {
257                let cache_creation_tokens = usage.cache_write_tokens.unwrap_or(0);
258                let total_prompt_tokens = prompt_tokens + cache_read_tokens + cache_creation_tokens;
259                (cache_creation_tokens, total_prompt_tokens)
260            }
261            CacheTelemetryProvider::OpenAI => (0, prompt_tokens),
262        };
263
264        let (miss_reason, miss_diagnosis) = if cache_read_tokens > 0 {
265            (None, None)
266        } else if matches!(provider, CacheTelemetryProvider::Anthropic) && cache_creation_tokens > 0
267        {
268            (Some(CacheMissReason::ColdStart), None)
269        } else {
270            classify_cache_miss(provider, request_facts)
271        };
272
273        Some(Self {
274            request_id,
275            agent_identity,
276            cache_read_tokens,
277            cache_creation_tokens,
278            total_prompt_tokens,
279            hit_rate: Self::compute_hit_rate(cache_read_tokens, total_prompt_tokens),
280            miss_reason,
281            miss_diagnosis,
282            provider: provider.as_str().to_string(),
283            timestamp,
284        })
285    }
286}
287
288fn classify_cache_miss(
289    provider: CacheTelemetryProvider,
290    request_facts: Option<&CacheRequestFacts>,
291) -> (Option<CacheMissReason>, Option<CacheMissDiagnosis>) {
292    if let Some(diagnosis) = prefix_mismatch_diagnosis(request_facts) {
293        return (Some(CacheMissReason::PrefixMismatch), Some(diagnosis));
294    }
295
296    if let Some(diagnosis) = below_minimum_threshold_diagnosis(request_facts) {
297        return (
298            Some(CacheMissReason::BelowMinimumThreshold),
299            Some(diagnosis),
300        );
301    }
302
303    if let Some(diagnosis) = retention_expired_diagnosis(provider, request_facts) {
304        return (Some(CacheMissReason::RetentionExpired), Some(diagnosis));
305    }
306
307    (
308        Some(CacheMissReason::Unknown),
309        Some(unknown_diagnosis(request_facts)),
310    )
311}
312
313fn prefix_mismatch_diagnosis(
314    request_facts: Option<&CacheRequestFacts>,
315) -> Option<CacheMissDiagnosis> {
316    let facts = request_facts?;
317    let span_id = facts.first_mismatch_span_id.as_ref()?;
318    let sequence_index = facts.first_mismatch_sequence_index?;
319    let expected_hash_prefix = facts.expected_hash_prefix.as_ref()?;
320    let actual_hash_prefix = facts.actual_hash_prefix.as_ref()?;
321
322    Some(CacheMissDiagnosis {
323        summary: format!(
324            "Stable prefix diverged at span {} before cache reuse.",
325            span_id
326        ),
327        recommendation: "Move or extract the mismatching block after the stable prefix."
328            .to_string(),
329        evidence: CacheMissEvidence::PrefixMismatch {
330            first_mismatch_span_id: span_id.clone(),
331            sequence_index,
332            expected_hash_prefix: canonicalize_hash_prefix(expected_hash_prefix),
333            actual_hash_prefix: canonicalize_hash_prefix(actual_hash_prefix),
334        },
335    })
336}
337
338fn below_minimum_threshold_diagnosis(
339    request_facts: Option<&CacheRequestFacts>,
340) -> Option<CacheMissDiagnosis> {
341    let facts = request_facts?;
342    let observed_prefix_tokens = facts.stable_prefix_tokens?;
343    let required_min_tokens = facts.required_min_tokens?;
344    if observed_prefix_tokens >= required_min_tokens {
345        return None;
346    }
347
348    Some(CacheMissDiagnosis {
349        summary: format!(
350            "Stable prefix has {observed_prefix_tokens} tokens, below the {required_min_tokens}-token cache minimum."
351        ),
352        recommendation:
353            "Increase the cacheable prefix above the provider minimum or stop expecting a hit."
354                .to_string(),
355        evidence: CacheMissEvidence::BelowMinimumThreshold {
356            observed_prefix_tokens,
357            required_min_tokens,
358            estimation_source: "prompt_ir_token_metadata".to_string(),
359        },
360    })
361}
362
363fn retention_expired_diagnosis(
364    provider: CacheTelemetryProvider,
365    request_facts: Option<&CacheRequestFacts>,
366) -> Option<CacheMissDiagnosis> {
367    if !matches!(provider, CacheTelemetryProvider::Anthropic) {
368        return None;
369    }
370
371    let facts = request_facts?;
372    let observed_gap_secs = facts.observed_gap_secs?;
373    let retention_window_secs = facts.retention_window_secs?;
374    if observed_gap_secs <= retention_window_secs {
375        return None;
376    }
377
378    Some(CacheMissDiagnosis {
379        summary: format!(
380            "Stable prefix reuse arrived {:.1}s after the {:.1}s retention window.",
381            observed_gap_secs, retention_window_secs
382        ),
383        recommendation:
384            "Reuse the stable prefix inside the active retention window or accept a cold rebuild."
385                .to_string(),
386        evidence: CacheMissEvidence::RetentionExpired {
387            observed_gap_secs,
388            retention_window_secs,
389            provider_semantics:
390                "anthropic prompt caching reuses prefixes inside the active retention window"
391                    .to_string(),
392        },
393    })
394}
395
396fn unknown_diagnosis(request_facts: Option<&CacheRequestFacts>) -> CacheMissDiagnosis {
397    let missing_facts = request_facts.map_or_else(
398        || vec!["request_facts_unavailable".to_string()],
399        |facts| facts.missing_facts.clone(),
400    );
401
402    CacheMissDiagnosis {
403        summary: "Cache miss could not be classified from the available request facts.".to_string(),
404        recommendation: "Capture request facts earlier or keep the miss classified as unknown."
405            .to_string(),
406        evidence: CacheMissEvidence::Unknown { missing_facts },
407    }
408}
409
/// Normalizes a hash prefix to the form `sha256:` followed by at most
/// twelve characters of the digest.
///
/// A leading `sha256:` on the input is removed before truncation, so values
/// with and without the scheme produce the same output. The digest text is
/// truncated by character count and is not otherwise validated.
fn canonicalize_hash_prefix(value: &str) -> String {
    const PREFIX: &str = "sha256:";
    const HEX_LEN: usize = 12;

    let digest = value.strip_prefix(PREFIX).unwrap_or(value);

    // Byte offset just past the HEX_LEN-th character, or the whole digest
    // when it is shorter than that.
    let cut = digest
        .char_indices()
        .nth(HEX_LEN)
        .map_or(digest.len(), |(offset, _)| offset);

    let mut canonical = String::with_capacity(PREFIX.len() + cut);
    canonical.push_str(PREFIX);
    canonical.push_str(&digest[..cut]);
    canonical
}
423
// Unit tests for this module are kept out-of-line: `#[path]` points the
// `tests` module at an external file, and `#[cfg(test)]` compiles it only
// for test builds.
#[cfg(test)]
#[path = "../../tests/unit/acg/telemetry_tests.rs"]
mod tests;