Skip to main content

agent_engine/runtime/
telemetry.rs

1//! Structured per-request API telemetry.
2//!
3//! Opt-in via the `telemetry` config key (`off` | `basic` | `full`).
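//! Writes one JSON record per API call to `~/.cache/synaps/api-log.jsonl`
//! (mode 0600, O_NOFOLLOW — same hardening as the legacy usage log). Once the
//! file grows past 50 MiB it is rotated to `api-log.jsonl.1`, replacing any
//! previous rotation, so at most one old generation is kept.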
6//!
//! `basic` records timing, token usage (including the cache hit rate) and
//! request-shape context. `full` additionally records rate-limit headers and
//! cache-diagnostics results when available.
9//!
10//! Writes are best-effort: a broken log path must never break the request
11//! pipeline. All errors are silently dropped (matching `log_usage`).
12
13use serde::Serialize;
14use std::time::{Duration, SystemTime, UNIX_EPOCH};
15
/// How much per-request telemetry to log, as chosen by the `telemetry`
/// config key. Telemetry is disabled unless explicitly configured.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TelemetryLevel {
    /// Nothing is logged.
    #[default]
    Off,
    /// One record per API call (timing, usage, request context).
    Basic,
    /// Everything in `Basic`, plus rate-limit headroom (see [`RateLimitRecord`]).
    Full,
}

impl TelemetryLevel {
    /// Interprets a config value, ignoring case and surrounding whitespace.
    ///
    /// `"full"` maps to [`TelemetryLevel::Full`]. `"basic"` and the boolean-ish
    /// aliases `"on"`, `"1"` and `"true"` map to [`TelemetryLevel::Basic`].
    /// Anything else (including `"off"`, `""` and typos) maps to
    /// [`TelemetryLevel::Off`].
    pub fn from_str_key(s: &str) -> Self {
        let key = s.trim().to_ascii_lowercase();
        if key == "full" {
            Self::Full
        } else if matches!(key.as_str(), "basic" | "on" | "1" | "true") {
            Self::Basic
        } else {
            Self::Off
        }
    }

    /// `true` for every level other than [`TelemetryLevel::Off`].
    pub fn enabled(&self) -> bool {
        *self != Self::Off
    }

    /// Canonical config spelling of the level: `"off"`, `"basic"` or `"full"`.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Full => "full",
            Self::Basic => "basic",
            Self::Off => "off",
        }
    }
}
46
/// Token usage for one API call, including the cache-creation TTL breakdown.
///
/// `hit_pct` is derived from `input`, `cache_read` and `cache_write` by
/// [`UsageRecord::compute_hit_pct`]. The TTL breakdown fields are omitted from
/// the JSON output when they are `None`.
#[derive(Debug, Clone, Default, Serialize)]
pub struct UsageRecord {
    /// Input token count. NOTE(review): presumably the uncached
    /// `usage.input_tokens` (cache reads/writes are counted separately below)
    /// — confirm at the populating call site.
    pub input: u64,
    /// Output token count.
    pub output: u64,
    /// Cache-read token count.
    pub cache_read: u64,
    /// Cache-write token count. NOTE(review): assumed to be the total across
    /// both TTLs below — confirm at the populating call site.
    pub cache_write: u64,
    /// Cache writes with 5-minute TTL (from `usage.cache_creation.ephemeral_5m_input_tokens`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cache_write_5m: Option<u64>,
    /// Cache writes with 1-hour TTL (from `usage.cache_creation.ephemeral_1h_input_tokens`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cache_write_1h: Option<u64>,
    /// Cache hit percentage: cache_read / (input + cache_read + cache_write) * 100,
    /// rounded to one decimal place; `0.0` when all three counters are zero.
    pub hit_pct: f64,
}
63
64impl UsageRecord {
65    pub fn compute_hit_pct(&mut self) {
66        let total = self.input + self.cache_read + self.cache_write;
67        self.hit_pct = if total > 0 {
68            (self.cache_read as f64 / total as f64 * 1000.0).round() / 10.0
69        } else {
70            0.0
71        };
72    }
73}
74
/// Rate-limit headroom captured from `anthropic-ratelimit-*` response headers.
/// Only recorded at `full` level.
///
/// Built by [`ratelimit_from_headers`]. Each field is `None` when its header is
/// absent or unparseable, and `None` fields are omitted from the JSON output.
#[derive(Debug, Clone, Default, Serialize)]
pub struct RateLimitRecord {
    /// From `anthropic-ratelimit-requests-limit`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub requests_limit: Option<u64>,
    /// From `anthropic-ratelimit-requests-remaining`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub requests_remaining: Option<u64>,
    /// From `anthropic-ratelimit-tokens-limit`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tokens_limit: Option<u64>,
    /// From `anthropic-ratelimit-tokens-remaining`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tokens_remaining: Option<u64>,
    /// From `anthropic-ratelimit-input-tokens-remaining`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub input_tokens_remaining: Option<u64>,
    /// From `anthropic-ratelimit-output-tokens-remaining`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub output_tokens_remaining: Option<u64>,
    /// RFC 3339 timestamp when the most restrictive token limit resets.
    /// Stored verbatim from `anthropic-ratelimit-tokens-reset` (not parsed).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tokens_reset: Option<String>,
}
95
96impl RateLimitRecord {
97    pub fn is_empty(&self) -> bool {
98        self.requests_limit.is_none()
99            && self.requests_remaining.is_none()
100            && self.tokens_limit.is_none()
101            && self.tokens_remaining.is_none()
102            && self.input_tokens_remaining.is_none()
103            && self.output_tokens_remaining.is_none()
104            && self.tokens_reset.is_none()
105    }
106}
107
/// Cache-diagnostics result (beta `cache-diagnosis-2026-04-07`).
/// Only present when the user opted in via `cache_diagnostics = true`.
#[derive(Debug, Clone, Serialize)]
pub struct CacheDiagRecord {
    /// `cache_miss_reason.type` — e.g. "system_changed", "tools_changed".
    pub miss_reason: String,
    /// `cache_missed_input_tokens` — estimated tokens lost after divergence.
    /// Omitted from the JSON output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub missed_tokens: Option<u64>,
}
118
/// Request-shape context: what we sent, for correlating cache behavior.
#[derive(Debug, Clone, Default, Serialize)]
pub struct ContextRecord {
    /// Message count of the request. NOTE(review): presumably the length of the
    /// `messages` array sent — confirm at the populating call site.
    pub messages: usize,
    /// Tool-definition count of the request. NOTE(review): assumed — confirm.
    pub tools: usize,
    /// Size of the system prompt in bytes. NOTE(review): assumed to be the
    /// UTF-8 length of the text sent — confirm.
    pub system_bytes: usize,
    /// Indices of user messages carrying a conversational cache_control marker.
    pub breakpoints: Vec<usize>,
}
128
/// One JSONL record per API call.
///
/// Serialized to a single JSON line by [`write_record`]. `None` optional
/// fields are omitted from the output rather than written as `null`.
#[derive(Debug, Clone, Default, Serialize)]
pub struct TelemetryRecord {
    /// Unix epoch milliseconds at request completion.
    pub ts: u64,
    /// Value of the `request-id` response header (see [`request_id_from_headers`]).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub request_id: Option<String>,
    /// Anthropic message id (`msg_...`) from message_start.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub msg_id: Option<String>,
    /// Model name the request was sent to.
    pub model: String,
    /// 1-based attempt number that succeeded (1 = no retries).
    pub attempt: u32,
    /// Milliseconds from request send to first SSE byte.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttft_ms: Option<u64>,
    /// Milliseconds from request send to stream close.
    pub total_ms: u64,
    /// Stop reason reported for the response, when one was received.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub stop_reason: Option<String>,
    /// Token counts and cache hit rate for this call.
    pub usage: UsageRecord,
    /// Rate-limit headroom; only recorded at `full` level.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ratelimit: Option<RateLimitRecord>,
    /// Cache-diagnostics result; only present when cache diagnostics are enabled.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cache_diag: Option<CacheDiagRecord>,
    /// Shape of the request that was sent.
    pub context: ContextRecord,
}
156
157impl TelemetryRecord {
158    pub fn now_ms() -> u64 {
159        SystemTime::now()
160            .duration_since(UNIX_EPOCH)
161            .map(|d| d.as_millis() as u64)
162            .unwrap_or(0)
163    }
164}
165
166/// Parse a `u64` rate-limit header value.
167fn header_u64(headers: &reqwest::header::HeaderMap, name: &str) -> Option<u64> {
168    headers.get(name)?.to_str().ok()?.parse().ok()
169}
170
171/// Parse a string rate-limit header value.
172fn header_string(headers: &reqwest::header::HeaderMap, name: &str) -> Option<String> {
173    Some(headers.get(name)?.to_str().ok()?.to_string())
174}
175
176/// Extract rate-limit headroom from response headers.
177pub fn ratelimit_from_headers(headers: &reqwest::header::HeaderMap) -> RateLimitRecord {
178    RateLimitRecord {
179        requests_limit: header_u64(headers, "anthropic-ratelimit-requests-limit"),
180        requests_remaining: header_u64(headers, "anthropic-ratelimit-requests-remaining"),
181        tokens_limit: header_u64(headers, "anthropic-ratelimit-tokens-limit"),
182        tokens_remaining: header_u64(headers, "anthropic-ratelimit-tokens-remaining"),
183        input_tokens_remaining: header_u64(headers, "anthropic-ratelimit-input-tokens-remaining"),
184        output_tokens_remaining: header_u64(headers, "anthropic-ratelimit-output-tokens-remaining"),
185        tokens_reset: header_string(headers, "anthropic-ratelimit-tokens-reset"),
186    }
187}
188
189/// Extract the `request-id` response header.
190pub fn request_id_from_headers(headers: &reqwest::header::HeaderMap) -> Option<String> {
191    header_string(headers, "request-id")
192}
193
194/// Maximum delay we'll honour from any rate-limit header. A pathological reset
195/// timestamp far in the future would otherwise stall the turn indefinitely.
196pub const RETRY_DELAY_CAP: Duration = Duration::from_secs(60);
197
198/// Compute how long to wait before the next retry, consulting response headers.
199///
200/// Priority order:
201///   1. `retry-after` (integer seconds, or an HTTP-date — integer is what
202///      Anthropic sends in practice, HTTP-date is handled as a best-effort
203///      fallback).
204///   2. `anthropic-ratelimit-tokens-reset` / `anthropic-ratelimit-requests-reset`
205///      (RFC 3339 UTC timestamp) — take the *minimum* of whichever is present.
206///   3. Classic exponential back-off: `1s * 2^(attempt-1)` (1 s, 2 s, 4 s …).
207///
208/// The returned duration is capped at [`RETRY_DELAY_CAP`] so a far-future
209/// reset timestamp never hangs the turn forever. `attempt` is 1-based (first
210/// retry = 1).
211///
212/// Returns `(delay, from_header)` — the bool is `true` when a header was
213/// used (callers can emit a more informative notice message).
214pub fn retry_delay_from_headers(
215    headers: &reqwest::header::HeaderMap,
216    attempt: u32,
217) -> (Duration, bool) {
218    let now_secs = SystemTime::now()
219        .duration_since(UNIX_EPOCH)
220        .map(|d| d.as_secs())
221        .unwrap_or(0);
222
223    // ── 1. retry-after (integer seconds preferred) ──────────────────────────
224    if let Some(ra) = header_string(headers, "retry-after") {
225        let ra = ra.trim();
226        // Integer form: "30"
227        if let Ok(secs) = ra.parse::<u64>() {
228            let d = Duration::from_secs(secs).min(RETRY_DELAY_CAP);
229            return (d, true);
230        }
231        // HTTP-date form: "Wed, 11 Jun 2025 01:46:00 GMT" — parse via chrono.
232        if let Ok(dt) = chrono::DateTime::parse_from_rfc2822(ra) {
233            let reset_secs = dt.timestamp().max(0) as u64;
234            let wait = reset_secs.saturating_sub(now_secs);
235            let d = Duration::from_secs(wait).min(RETRY_DELAY_CAP);
236            return (d, true);
237        }
238    }
239
240    // ── 2. anthropic-ratelimit-*-reset (RFC 3339) ───────────────────────────
241    let mut min_wait: Option<u64> = None;
242    for name in &[
243        "anthropic-ratelimit-tokens-reset",
244        "anthropic-ratelimit-requests-reset",
245    ] {
246        if let Some(ts) = header_string(headers, name) {
247            if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(ts.trim()) {
248                let reset_secs = dt.timestamp().max(0) as u64;
249                let wait = reset_secs.saturating_sub(now_secs);
250                min_wait = Some(min_wait.map_or(wait, |prev| prev.min(wait)));
251            }
252        }
253    }
254    if let Some(wait) = min_wait {
255        let d = Duration::from_secs(wait).min(RETRY_DELAY_CAP);
256        return (d, true);
257    }
258
259    // ── 3. Exponential back-off fallback ────────────────────────────────────
260    let d = Duration::from_millis(1000 * 2u64.pow(attempt.saturating_sub(1)));
261    (d, false)
262}
263
/// Default telemetry log path: `$HOME/.cache/synaps/api-log.jsonl`.
///
/// Returns `None` when `HOME` is unset or empty, which makes [`write_record`]
/// skip the write. An empty `HOME` would otherwise resolve to the *relative*
/// path `.cache/synaps/api-log.jsonl`, silently scattering logs into whatever
/// directory the process happens to run in.
///
/// `var_os` is used so that a home directory whose path is not valid UTF-8 is
/// still honoured.
fn default_log_path() -> Option<std::path::PathBuf> {
    let home = std::env::var_os("HOME").filter(|home| !home.is_empty())?;
    Some(std::path::PathBuf::from(home).join(".cache/synaps/api-log.jsonl"))
}
269
270/// Append a record to the telemetry log. Best-effort — all errors are
271/// silently dropped so a broken log path never breaks the request pipeline.
272///
273/// File is created 0600 with O_NOFOLLOW (CWE-59 hardening, matching
274/// `HelperMethods::log_usage`).
275pub fn write_record(record: &TelemetryRecord) {
276    let Some(path) = default_log_path() else { return };
277    let Ok(line) = serde_json::to_string(record) else { return };
278
279    if let Some(parent) = path.parent() {
280        let _ = std::fs::create_dir_all(parent);
281    }
282
283    // Size-capped rotation: at >50MB, rename to <path>.1 (clobbering any old
284    // .1) before appending. One generation is enough — this is a diagnostic
285    // log, not an audit trail. Errors silently dropped (best-effort contract).
286    const MAX_BYTES: u64 = 50 * 1024 * 1024;
287    if let Ok(meta) = std::fs::metadata(&path) {
288        if meta.len() > MAX_BYTES {
289            let mut rotated = path.as_os_str().to_owned();
290            rotated.push(".1");
291            let _ = std::fs::rename(&path, std::path::PathBuf::from(rotated));
292        }
293    }
294
295    use std::os::unix::fs::OpenOptionsExt;
296
297    let result = std::fs::OpenOptions::new()
298        .create(true)
299        .append(true)
300        .mode(0o600)
301        .custom_flags(libc::O_NOFOLLOW)
302        .open(&path);
303    if let Ok(mut f) = result {
304        use std::io::Write;
305        let _ = writeln!(f, "{}", line);
306    }
307}
308
#[cfg(test)]
mod tests {
    // Unit tests for level parsing, cache hit-rate math, record serialization
    // and rate-limit header parsing.
    use super::*;

    // Canonical names (case-insensitive), the `true` alias for `basic`, and
    // the fallback to `Off` for unknown or empty input.
    #[test]
    fn level_parses_known_values() {
        assert_eq!(TelemetryLevel::from_str_key("off"), TelemetryLevel::Off);
        assert_eq!(TelemetryLevel::from_str_key("basic"), TelemetryLevel::Basic);
        assert_eq!(TelemetryLevel::from_str_key("full"), TelemetryLevel::Full);
        assert_eq!(TelemetryLevel::from_str_key("FULL"), TelemetryLevel::Full);
        assert_eq!(TelemetryLevel::from_str_key("true"), TelemetryLevel::Basic);
        assert_eq!(TelemetryLevel::from_str_key("garbage"), TelemetryLevel::Off);
        assert_eq!(TelemetryLevel::from_str_key(""), TelemetryLevel::Off);
    }

    // Only `Off` disables telemetry.
    #[test]
    fn level_enabled() {
        assert!(!TelemetryLevel::Off.enabled());
        assert!(TelemetryLevel::Basic.enabled());
        assert!(TelemetryLevel::Full.enabled());
    }

    // 800 / (100 + 800 + 100) is exactly 80%.
    #[test]
    fn hit_pct_computation() {
        let mut u = UsageRecord {
            input: 100,
            cache_read: 800,
            cache_write: 100,
            ..Default::default()
        };
        u.compute_hit_pct();
        assert_eq!(u.hit_pct, 80.0);
    }

    // All-zero counters must yield 0.0, not NaN from 0/0.
    #[test]
    fn hit_pct_zero_total() {
        let mut u = UsageRecord::default();
        u.compute_hit_pct();
        assert_eq!(u.hit_pct, 0.0);
    }

    // 2 / 3 = 66.666…% must be rounded to one decimal place.
    #[test]
    fn hit_pct_rounds_to_one_decimal() {
        let mut u = UsageRecord {
            input: 1,
            cache_read: 2,
            cache_write: 0,
            ..Default::default()
        };
        u.compute_hit_pct();
        assert_eq!(u.hit_pct, 66.7);
    }

    // `None` optional fields are omitted from the JSON rather than written as `null`.
    #[test]
    fn record_serializes_skipping_none_fields() {
        let record = TelemetryRecord {
            ts: 1,
            model: "claude-sonnet-4-6".to_string(),
            attempt: 1,
            total_ms: 100,
            ..Default::default()
        };
        let json = serde_json::to_string(&record).unwrap();
        assert!(!json.contains("request_id"));
        assert!(!json.contains("ratelimit"));
        assert!(!json.contains("cache_diag"));
        assert!(json.contains("\"model\":\"claude-sonnet-4-6\""));
    }

    // A default record is empty; setting any single field makes it non-empty.
    #[test]
    fn ratelimit_empty_detection() {
        assert!(RateLimitRecord::default().is_empty());
        let r = RateLimitRecord {
            requests_remaining: Some(10),
            ..Default::default()
        };
        assert!(!r.is_empty());
    }

    // Present numeric and string headers are captured; absent ones stay `None`.
    #[test]
    fn ratelimit_parses_headers() {
        let mut headers = reqwest::header::HeaderMap::new();
        headers.insert("anthropic-ratelimit-requests-limit", "5000".parse().unwrap());
        headers.insert("anthropic-ratelimit-requests-remaining", "4900".parse().unwrap());
        headers.insert("anthropic-ratelimit-tokens-reset", "2026-06-11T01:46:00Z".parse().unwrap());
        let r = ratelimit_from_headers(&headers);
        assert_eq!(r.requests_limit, Some(5000));
        assert_eq!(r.requests_remaining, Some(4900));
        assert_eq!(r.tokens_reset.as_deref(), Some("2026-06-11T01:46:00Z"));
        assert_eq!(r.tokens_limit, None);
    }

    // A non-numeric counter value yields `None` instead of an error.
    #[test]
    fn ratelimit_ignores_malformed_values() {
        let mut headers = reqwest::header::HeaderMap::new();
        headers.insert("anthropic-ratelimit-requests-limit", "not-a-number".parse().unwrap());
        let r = ratelimit_from_headers(&headers);
        assert_eq!(r.requests_limit, None);
    }
}
409
#[cfg(test)]
mod retry_delay_tests {
    // Tests for `retry_delay_from_headers`: header priority, capping, and the
    // exponential fallback. Reset timestamps are built relative to "now", so
    // assertions on header-derived delays allow a small tolerance window.
    use super::*;
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    // An integer `retry-after` below the cap is honoured verbatim.
    #[test]
    fn integer_retry_after() {
        let mut h = reqwest::header::HeaderMap::new();
        h.insert("retry-after", "45".parse().unwrap());
        let (d, from_hdr) = retry_delay_from_headers(&h, 1);
        assert_eq!(d, Duration::from_secs(45));
        assert!(from_hdr);
    }

    // An integer `retry-after` above RETRY_DELAY_CAP is clamped to the cap.
    #[test]
    fn integer_retry_after_capped() {
        let mut h = reqwest::header::HeaderMap::new();
        h.insert("retry-after", "300".parse().unwrap()); // 5 min — beyond cap
        let (d, from_hdr) = retry_delay_from_headers(&h, 1);
        assert_eq!(d, RETRY_DELAY_CAP);
        assert!(from_hdr);
    }

    // A reset ~30 s out yields ~30 s. The ±2 s window absorbs whole-second
    // truncation and test scheduling jitter.
    #[test]
    fn rfc3339_reset_future() {
        let future_secs = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs()
            + 30;
        let dt = chrono::DateTime::<chrono::Utc>::from_timestamp(future_secs as i64, 0).unwrap();
        let ts = dt.to_rfc3339();

        let mut h = reqwest::header::HeaderMap::new();
        h.insert("anthropic-ratelimit-tokens-reset", ts.parse().unwrap());
        let (d, from_hdr) = retry_delay_from_headers(&h, 1);
        assert!(d.as_secs() >= 28 && d.as_secs() <= 32, "unexpected delay: {:?}", d);
        assert!(from_hdr);
    }

    // A reset 10 minutes out is clamped to RETRY_DELAY_CAP.
    #[test]
    fn rfc3339_reset_beyond_cap() {
        let future_secs = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs()
            + 600;
        let dt = chrono::DateTime::<chrono::Utc>::from_timestamp(future_secs as i64, 0).unwrap();
        let ts = dt.to_rfc3339();

        let mut h = reqwest::header::HeaderMap::new();
        h.insert("anthropic-ratelimit-tokens-reset", ts.parse().unwrap());
        let (d, from_hdr) = retry_delay_from_headers(&h, 1);
        assert_eq!(d, RETRY_DELAY_CAP);
        assert!(from_hdr);
    }

    // With no headers, the back-off doubles from 1 s and is reported as not
    // header-derived.
    #[test]
    fn no_headers_exponential_fallback() {
        let h = reqwest::header::HeaderMap::new();
        let (d1, hdr1) = retry_delay_from_headers(&h, 1);
        let (d2, hdr2) = retry_delay_from_headers(&h, 2);
        let (d3, hdr3) = retry_delay_from_headers(&h, 3);
        assert_eq!(d1, Duration::from_secs(1));
        assert_eq!(d2, Duration::from_secs(2));
        assert_eq!(d3, Duration::from_secs(4));
        assert!(!hdr1 && !hdr2 && !hdr3);
    }

    // `retry-after` wins over a rate-limit reset header when both are present.
    #[test]
    fn prefers_retry_after_over_rfc3339() {
        let future_secs = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs()
            + 30;
        let dt = chrono::DateTime::<chrono::Utc>::from_timestamp(future_secs as i64, 0).unwrap();
        let ts = dt.to_rfc3339();

        let mut h = reqwest::header::HeaderMap::new();
        h.insert("retry-after", "10".parse().unwrap());
        h.insert("anthropic-ratelimit-tokens-reset", ts.parse().unwrap());
        let (d, from_hdr) = retry_delay_from_headers(&h, 1);
        assert_eq!(d, Duration::from_secs(10));
        assert!(from_hdr);
    }

    // When both reset headers are present, the sooner one determines the delay.
    #[test]
    fn min_of_multiple_ratelimit_reset_headers() {
        // tokens-reset is sooner; requests-reset is later — should pick tokens.
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs();
        let tokens_dt = chrono::DateTime::<chrono::Utc>::from_timestamp((now + 15) as i64, 0).unwrap();
        let requests_dt = chrono::DateTime::<chrono::Utc>::from_timestamp((now + 45) as i64, 0).unwrap();

        let mut h = reqwest::header::HeaderMap::new();
        h.insert("anthropic-ratelimit-tokens-reset", tokens_dt.to_rfc3339().parse().unwrap());
        h.insert("anthropic-ratelimit-requests-reset", requests_dt.to_rfc3339().parse().unwrap());
        let (d, from_hdr) = retry_delay_from_headers(&h, 1);
        assert!(d.as_secs() >= 13 && d.as_secs() <= 17, "should be ~15s, got {:?}", d);
        assert!(from_hdr);
    }
}