Skip to main content

harn_vm/llm/api/
telemetry.rs

1//! Per-call provider telemetry envelope.
2//!
3//! Local runtimes (Ollama in particular) report enough server-side timing
4//! information to diagnose slow runs without scraping daemon logs: cold load
5//! vs steady state, prefill vs generation, and tokens-per-second ratios. The
6//! Anthropic and OpenAI hosted APIs don't expose comparable metrics, but the
7//! local runtimes Harn cares most about (Ollama, llama.cpp, MLX, vLLM) all
8//! ship at least a partial subset. This module normalizes whatever the
9//! provider reports into one stable envelope and represents missing fields
10//! as `None` so downstream evals can distinguish "not reported" from "zero".
11//!
12//! Conventions:
13//! - Durations are milliseconds (Ollama reports nanoseconds; we convert).
14//! - Token counts are signed `i64` to match the rest of `LlmResult`.
15//! - `source` identifies which wire format the values were lifted from so
16//!   eval scripts can route on it without re-deriving provider names.
17
18use std::collections::BTreeMap;
19use std::rc::Rc;
20
21use crate::value::VmValue;
22
23/// Wire-format identifiers for `ProviderTelemetry::source`. Keep these in
24/// sync with the matching strings in `docs/src/observability/*` and the
25/// Burin eval aggregator.
26pub mod source {
27    /// Ollama `/api/chat` NDJSON stream — full timing breakdown.
28    pub const OLLAMA_CHAT: &str = "ollama_chat";
29    /// Ollama `/api/generate` (raw) — full timing breakdown.
30    pub const OLLAMA_GENERATE: &str = "ollama_generate";
31    /// OpenAI-style `usage` block (prompt/completion tokens, optional cache
32    /// details). No server-side timings unless the runtime extends the
33    /// schema.
34    pub const OPENAI_USAGE: &str = "openai_usage";
35    /// llama.cpp `usage.timings` extension (`prompt_ms`, `predicted_ms`,
36    /// `predicted_n`, `prompt_n`, ...). Preserved verbatim from the
37    /// non-OpenAI subset.
38    pub const LLAMACPP_TIMINGS: &str = "llamacpp_timings";
39    /// Anthropic Messages API — usage counts only; no timings.
40    pub const ANTHROPIC_USAGE: &str = "anthropic_usage";
41    /// Provider responded but we did not capture anything beyond what
42    /// already lives on `LlmResult` (e.g. mock / fake providers, or a
43    /// stream that finished without a usage frame).
44    pub const UNKNOWN: &str = "unknown";
45}
46
47pub(crate) fn elapsed_ms(started: std::time::Instant) -> u64 {
48    started.elapsed().as_millis().min(u128::from(u64::MAX)) as u64
49}
50
51/// Provider-side timing and runtime accounting captured per LLM call.
52///
53/// All fields default to `None` / empty. Producers fill in what they can
54/// extract and leave the rest absent; consumers must treat missing fields as
55/// "not reported by this provider", not "zero".
56#[derive(Clone, Debug, Default, PartialEq, serde::Serialize, serde::Deserialize)]
57pub struct ProviderTelemetry {
58    /// Wire format the values came from (`ollama_chat`, `openai_usage`, ...).
59    /// See [`source`] for the canonical strings. Empty when no telemetry was
60    /// captured.
61    #[serde(default, skip_serializing_if = "String::is_empty")]
62    pub source: String,
63    /// Total server-side wall clock (Ollama `total_duration`).
64    #[serde(skip_serializing_if = "Option::is_none")]
65    pub server_total_ms: Option<u64>,
66    /// Time the server spent loading/warming the model (Ollama
67    /// `load_duration`). Useful for detecting cold-start latency.
68    #[serde(skip_serializing_if = "Option::is_none")]
69    pub server_load_ms: Option<u64>,
70    /// Prompt-prefill time (Ollama `prompt_eval_duration`). Anything else
71    /// would be marketing — this is the field evals key on for prefill
72    /// regression detection.
73    #[serde(skip_serializing_if = "Option::is_none")]
74    pub server_prompt_eval_ms: Option<u64>,
75    /// Generation/decode time (Ollama `eval_duration`).
76    #[serde(skip_serializing_if = "Option::is_none")]
77    pub server_generation_ms: Option<u64>,
78    /// Tokens the server reports it prefilled (Ollama `prompt_eval_count`).
79    /// Distinct from `LlmResult::input_tokens` because hosted providers
80    /// frequently bill different token boundaries than the on-device
81    /// tokenizer reports.
82    #[serde(skip_serializing_if = "Option::is_none")]
83    pub server_prompt_tokens: Option<i64>,
84    /// Tokens the server reports it generated (Ollama `eval_count`).
85    #[serde(skip_serializing_if = "Option::is_none")]
86    pub server_output_tokens: Option<i64>,
87    /// Client-side wall clock around the HTTP request. Includes network and
88    /// streaming latency the server-side counters omit. Recorded for every
89    /// call regardless of provider.
90    #[serde(skip_serializing_if = "Option::is_none")]
91    pub client_wall_ms: Option<u64>,
92    /// Context window the model was loaded with (where the runtime
93    /// reports it; `/api/ps` for Ollama).
94    #[serde(skip_serializing_if = "Option::is_none")]
95    pub runtime_context_length: Option<u64>,
96    /// Exact model id the server resolved. May differ from
97    /// `LlmResult::model` when an alias / digest is rewritten upstream.
98    #[serde(skip_serializing_if = "Option::is_none")]
99    pub runtime_loaded_model: Option<String>,
100    /// Total resident bytes for the loaded model (Ollama `/api/ps.size`).
101    #[serde(skip_serializing_if = "Option::is_none")]
102    pub runtime_memory_bytes: Option<u64>,
103    /// VRAM bytes for the loaded model (Ollama `/api/ps.size_vram`).
104    #[serde(skip_serializing_if = "Option::is_none")]
105    pub runtime_memory_vram_bytes: Option<u64>,
106    /// When the server will unload the model (Ollama `/api/ps.expires_at`).
107    #[serde(skip_serializing_if = "Option::is_none")]
108    pub runtime_keep_alive_until: Option<String>,
109    /// Provider-supplied request id (`x-request-id` / `request_id`).
110    #[serde(skip_serializing_if = "Option::is_none")]
111    pub request_id: Option<String>,
112}
113
114impl ProviderTelemetry {
115    pub fn new(source: &str) -> Self {
116        Self {
117            source: source.to_string(),
118            ..Self::default()
119        }
120    }
121
122    /// Returns `true` when no meaningful telemetry was captured. A bare
123    /// `client_wall_ms` is still "meaningful" — it lets evals reason about
124    /// per-call latency even for providers that report nothing else.
125    pub fn is_empty(&self) -> bool {
126        let Self {
127            source,
128            server_total_ms,
129            server_load_ms,
130            server_prompt_eval_ms,
131            server_generation_ms,
132            server_prompt_tokens,
133            server_output_tokens,
134            client_wall_ms,
135            runtime_context_length,
136            runtime_loaded_model,
137            runtime_memory_bytes,
138            runtime_memory_vram_bytes,
139            runtime_keep_alive_until,
140            request_id,
141        } = self;
142        source.is_empty()
143            && server_total_ms.is_none()
144            && server_load_ms.is_none()
145            && server_prompt_eval_ms.is_none()
146            && server_generation_ms.is_none()
147            && server_prompt_tokens.is_none()
148            && server_output_tokens.is_none()
149            && client_wall_ms.is_none()
150            && runtime_context_length.is_none()
151            && runtime_loaded_model.is_none()
152            && runtime_memory_bytes.is_none()
153            && runtime_memory_vram_bytes.is_none()
154            && runtime_keep_alive_until.is_none()
155            && request_id.is_none()
156    }
157
158    /// Convert nanoseconds (Ollama's reporting unit) to milliseconds with
159    /// integer rounding. Centralized so every Ollama timing field uses the
160    /// same conversion and zero-vs-None semantics line up.
161    pub fn ns_to_ms(ns: u64) -> u64 {
162        // Use floor division (the conversion is approximate by design); when
163        // the upstream reports 0 ns we want a 0 ms entry rather than None,
164        // so callers should pass through `Some(ns_to_ms(0))` consciously.
165        ns / 1_000_000
166    }
167
168    /// Extract Ollama-shape telemetry from a `done=true` chat or generate
169    /// frame. Returns a populated [`ProviderTelemetry`] whose `source` is
170    /// the caller-provided wire identifier.
171    pub fn from_ollama_done(frame: &serde_json::Value, source: &str) -> Self {
172        let mut telemetry = Self::new(source);
173        telemetry.server_total_ms = ns_field(frame, "total_duration");
174        telemetry.server_load_ms = ns_field(frame, "load_duration");
175        telemetry.server_prompt_eval_ms = ns_field(frame, "prompt_eval_duration");
176        telemetry.server_generation_ms = ns_field(frame, "eval_duration");
177        telemetry.server_prompt_tokens = frame
178            .get("prompt_eval_count")
179            .and_then(serde_json::Value::as_i64);
180        telemetry.server_output_tokens =
181            frame.get("eval_count").and_then(serde_json::Value::as_i64);
182        if let Some(model) = frame.get("model").and_then(serde_json::Value::as_str) {
183            telemetry.runtime_loaded_model = Some(model.to_string());
184        }
185        telemetry
186    }
187
188    /// Extract OpenAI-shape `usage` telemetry. Most OpenAI-compatible local
189    /// runtimes only report counts; llama.cpp's `usage.timings` extension is
190    /// folded in here as well so a single envelope captures both shapes.
191    pub fn from_openai_usage(usage: &serde_json::Value, request_id: Option<&str>) -> Self {
192        let mut telemetry = Self::new(source::OPENAI_USAGE);
193        telemetry.server_prompt_tokens = usage
194            .get("prompt_tokens")
195            .and_then(serde_json::Value::as_i64);
196        telemetry.server_output_tokens = usage
197            .get("completion_tokens")
198            .and_then(serde_json::Value::as_i64);
199        if let Some(timings) = usage.get("timings").filter(|value| value.is_object()) {
200            telemetry.source = source::LLAMACPP_TIMINGS.to_string();
201            telemetry.server_prompt_eval_ms = ms_or_round(timings.get("prompt_ms"));
202            telemetry.server_generation_ms = ms_or_round(timings.get("predicted_ms"));
203            // llama.cpp also reports `prompt_n` / `predicted_n` here when
204            // its usage breakdown is enabled; prefer them over the
205            // legacy top-level counts so prefill cache hits surface
206            // correctly.
207            if let Some(prefill) = timings.get("prompt_n").and_then(serde_json::Value::as_i64) {
208                telemetry.server_prompt_tokens = Some(prefill);
209            }
210            if let Some(predicted) = timings
211                .get("predicted_n")
212                .and_then(serde_json::Value::as_i64)
213            {
214                telemetry.server_output_tokens = Some(predicted);
215            }
216            let total = telemetry
217                .server_prompt_eval_ms
218                .unwrap_or(0)
219                .saturating_add(telemetry.server_generation_ms.unwrap_or(0));
220            if total > 0 {
221                telemetry.server_total_ms = Some(total);
222            }
223        }
224        if let Some(request_id) = request_id.filter(|value| !value.is_empty()) {
225            telemetry.request_id = Some(request_id.to_string());
226        }
227        telemetry
228    }
229
230    /// Extract Anthropic-shape `usage` telemetry. Anthropic only reports
231    /// input/output (and cache) counts — preserving the request id is the
232    /// most useful incremental signal beyond what `LlmResult` already
233    /// carries.
234    pub fn from_anthropic_usage(usage: &serde_json::Value, request_id: Option<&str>) -> Self {
235        let mut telemetry = Self::new(source::ANTHROPIC_USAGE);
236        telemetry.server_prompt_tokens = usage
237            .get("input_tokens")
238            .and_then(serde_json::Value::as_i64);
239        telemetry.server_output_tokens = usage
240            .get("output_tokens")
241            .and_then(serde_json::Value::as_i64);
242        if let Some(request_id) = request_id.filter(|value| !value.is_empty()) {
243            telemetry.request_id = Some(request_id.to_string());
244        }
245        telemetry
246    }
247
248    /// Merge a `/api/ps` snapshot of a loaded Ollama model into this
249    /// telemetry envelope. Only fills in fields that were not already
250    /// populated so a per-call extraction keeps precedence.
251    pub fn merge_ollama_ps(&mut self, ps: &OllamaPsModel) {
252        if self.runtime_loaded_model.is_none() {
253            self.runtime_loaded_model = ps.name.clone();
254        }
255        if self.runtime_context_length.is_none() {
256            self.runtime_context_length = ps.context_length;
257        }
258        if self.runtime_memory_bytes.is_none() {
259            self.runtime_memory_bytes = ps.size_bytes;
260        }
261        if self.runtime_memory_vram_bytes.is_none() {
262            self.runtime_memory_vram_bytes = ps.size_vram_bytes;
263        }
264        if self.runtime_keep_alive_until.is_none() {
265            self.runtime_keep_alive_until = ps.expires_at.clone();
266        }
267    }
268
269    /// Render this envelope into the dictionary shape `llm_call` returns.
270    /// Returns `None` if the envelope is empty so callers can omit the key
271    /// entirely.
272    pub fn as_vm_dict(&self) -> Option<VmValue> {
273        if self.is_empty() {
274            return None;
275        }
276        let mut dict: BTreeMap<String, VmValue> = BTreeMap::new();
277        if !self.source.is_empty() {
278            dict.insert(
279                "source".to_string(),
280                VmValue::String(Rc::from(self.source.as_str())),
281            );
282        }
283        insert_opt_u64(&mut dict, "server_total_ms", self.server_total_ms);
284        insert_opt_u64(&mut dict, "server_load_ms", self.server_load_ms);
285        insert_opt_u64(
286            &mut dict,
287            "server_prompt_eval_ms",
288            self.server_prompt_eval_ms,
289        );
290        insert_opt_u64(&mut dict, "server_generation_ms", self.server_generation_ms);
291        insert_opt_i64(&mut dict, "server_prompt_tokens", self.server_prompt_tokens);
292        insert_opt_i64(&mut dict, "server_output_tokens", self.server_output_tokens);
293        insert_opt_u64(&mut dict, "client_wall_ms", self.client_wall_ms);
294        insert_opt_u64(
295            &mut dict,
296            "runtime_context_length",
297            self.runtime_context_length,
298        );
299        if let Some(ref model) = self.runtime_loaded_model {
300            dict.insert(
301                "runtime_loaded_model".to_string(),
302                VmValue::String(Rc::from(model.as_str())),
303            );
304        }
305        insert_opt_u64(&mut dict, "runtime_memory_bytes", self.runtime_memory_bytes);
306        insert_opt_u64(
307            &mut dict,
308            "runtime_memory_vram_bytes",
309            self.runtime_memory_vram_bytes,
310        );
311        if let Some(ref expires) = self.runtime_keep_alive_until {
312            dict.insert(
313                "runtime_keep_alive_until".to_string(),
314                VmValue::String(Rc::from(expires.as_str())),
315            );
316        }
317        if let Some(ref request_id) = self.request_id {
318            dict.insert(
319                "request_id".to_string(),
320                VmValue::String(Rc::from(request_id.as_str())),
321            );
322        }
323        Some(VmValue::Dict(Rc::new(dict)))
324    }
325}
326
327/// Loaded-model snapshot from Ollama's `/api/ps`. Shared with the CLI's
328/// `harn local` family so we don't duplicate the response shape.
329#[derive(Clone, Debug, Default, PartialEq)]
330pub struct OllamaPsModel {
331    pub name: Option<String>,
332    pub size_bytes: Option<u64>,
333    pub size_vram_bytes: Option<u64>,
334    pub expires_at: Option<String>,
335    pub context_length: Option<u64>,
336}
337
338impl OllamaPsModel {
339    /// Decode one `/api/ps` `models[]` entry. Returns `None` when the entry
340    /// has no usable identifier (an old daemon may emit completely empty
341    /// rows under load).
342    pub fn from_ps_entry(entry: &serde_json::Value) -> Option<Self> {
343        let name = entry
344            .get("name")
345            .and_then(serde_json::Value::as_str)
346            .or_else(|| entry.get("model").and_then(serde_json::Value::as_str))
347            .map(str::to_string);
348        let context_length = entry
349            .get("context_length")
350            .and_then(serde_json::Value::as_u64)
351            .or_else(|| {
352                entry
353                    .get("details")
354                    .and_then(|details| details.get("context_length"))
355                    .and_then(serde_json::Value::as_u64)
356            });
357        Some(Self {
358            name,
359            size_bytes: entry.get("size").and_then(serde_json::Value::as_u64),
360            size_vram_bytes: entry.get("size_vram").and_then(serde_json::Value::as_u64),
361            expires_at: entry
362                .get("expires_at")
363                .and_then(serde_json::Value::as_str)
364                .map(str::to_string),
365            context_length,
366        })
367    }
368}
369
370fn ns_field(frame: &serde_json::Value, key: &str) -> Option<u64> {
371    frame
372        .get(key)
373        .and_then(serde_json::Value::as_u64)
374        .map(ProviderTelemetry::ns_to_ms)
375}
376
377fn ms_or_round(value: Option<&serde_json::Value>) -> Option<u64> {
378    let value = value?;
379    if let Some(n) = value.as_u64() {
380        return Some(n);
381    }
382    value.as_f64().map(|n| n.round().max(0.0) as u64)
383}
384
385fn insert_opt_u64(dict: &mut BTreeMap<String, VmValue>, key: &str, value: Option<u64>) {
386    if let Some(value) = value {
387        dict.insert(key.to_string(), VmValue::Int(value as i64));
388    }
389}
390
391fn insert_opt_i64(dict: &mut BTreeMap<String, VmValue>, key: &str, value: Option<i64>) {
392    if let Some(value) = value {
393        dict.insert(key.to_string(), VmValue::Int(value));
394    }
395}
396
397#[cfg(test)]
398mod tests {
399    use super::*;
400
401    #[test]
402    fn ollama_done_frame_extracts_full_breakdown() {
403        let frame = serde_json::json!({
404            "model": "qwen3.6:35b-a3b-coding-nvfp4",
405            "total_duration": 7_400_000_000u64,
406            "load_duration": 400_000_000u64,
407            "prompt_eval_duration": 1_200_000_000u64,
408            "eval_duration": 5_800_000_000u64,
409            "prompt_eval_count": 1024,
410            "eval_count": 64
411        });
412
413        let telemetry = ProviderTelemetry::from_ollama_done(&frame, source::OLLAMA_CHAT);
414
415        assert_eq!(telemetry.source, source::OLLAMA_CHAT);
416        assert_eq!(telemetry.server_total_ms, Some(7400));
417        assert_eq!(telemetry.server_load_ms, Some(400));
418        assert_eq!(telemetry.server_prompt_eval_ms, Some(1200));
419        assert_eq!(telemetry.server_generation_ms, Some(5800));
420        assert_eq!(telemetry.server_prompt_tokens, Some(1024));
421        assert_eq!(telemetry.server_output_tokens, Some(64));
422        assert_eq!(
423            telemetry.runtime_loaded_model.as_deref(),
424            Some("qwen3.6:35b-a3b-coding-nvfp4")
425        );
426        assert!(!telemetry.is_empty());
427    }
428
429    #[test]
430    fn ollama_done_frame_leaves_missing_fields_as_none() {
431        let frame = serde_json::json!({
432            "model": "qwen3.6:35b-a3b-coding-nvfp4",
433            // Older Ollama builds omit duration fields on early frames.
434        });
435
436        let telemetry = ProviderTelemetry::from_ollama_done(&frame, source::OLLAMA_CHAT);
437
438        assert_eq!(telemetry.server_total_ms, None);
439        assert_eq!(telemetry.server_load_ms, None);
440        assert_eq!(telemetry.server_prompt_eval_ms, None);
441        assert_eq!(telemetry.server_generation_ms, None);
442        assert_eq!(telemetry.server_prompt_tokens, None);
443        assert_eq!(telemetry.server_output_tokens, None);
444    }
445
446    #[test]
447    fn openai_usage_partial_extracts_counts_only() {
448        let usage = serde_json::json!({
449            "prompt_tokens": 200,
450            "completion_tokens": 50
451        });
452
453        let telemetry = ProviderTelemetry::from_openai_usage(&usage, Some("req-abc"));
454
455        assert_eq!(telemetry.source, source::OPENAI_USAGE);
456        assert_eq!(telemetry.server_prompt_tokens, Some(200));
457        assert_eq!(telemetry.server_output_tokens, Some(50));
458        assert_eq!(telemetry.server_prompt_eval_ms, None);
459        assert_eq!(telemetry.request_id.as_deref(), Some("req-abc"));
460    }
461
462    #[test]
463    fn llamacpp_timings_promotes_source_and_fills_durations() {
464        let usage = serde_json::json!({
465            "prompt_tokens": 220,
466            "completion_tokens": 17,
467            "timings": {
468                "prompt_n": 200,
469                "prompt_ms": 145.4,
470                "predicted_n": 17,
471                "predicted_ms": 89.1,
472            }
473        });
474
475        let telemetry = ProviderTelemetry::from_openai_usage(&usage, None);
476
477        assert_eq!(telemetry.source, source::LLAMACPP_TIMINGS);
478        assert_eq!(telemetry.server_prompt_eval_ms, Some(145));
479        assert_eq!(telemetry.server_generation_ms, Some(89));
480        assert_eq!(telemetry.server_total_ms, Some(234));
481        assert_eq!(telemetry.server_prompt_tokens, Some(200));
482        assert_eq!(telemetry.server_output_tokens, Some(17));
483        assert!(!telemetry.is_empty());
484    }
485
486    #[test]
487    fn ps_entry_pulls_context_length_from_top_level_or_details() {
488        let entry = serde_json::json!({
489            "name": "qwen3.6:35b-a3b-coding-nvfp4",
490            "size": 4_700_000_000u64,
491            "size_vram": 4_500_000_000u64,
492            "expires_at": "2026-05-14T10:30:00Z",
493            "context_length": 32768
494        });
495        let model = OllamaPsModel::from_ps_entry(&entry).expect("ps entry parses");
496        assert_eq!(model.context_length, Some(32768));
497
498        let entry_nested = serde_json::json!({
499            "name": "qwen3.6:35b-a3b-coding-nvfp4",
500            "details": {"context_length": 16384}
501        });
502        let nested = OllamaPsModel::from_ps_entry(&entry_nested).expect("ps entry parses");
503        assert_eq!(nested.context_length, Some(16384));
504    }
505
506    #[test]
507    fn merge_ollama_ps_preserves_call_level_values() {
508        let mut telemetry = ProviderTelemetry::new(source::OLLAMA_CHAT);
509        telemetry.runtime_loaded_model = Some("real-model".to_string());
510        let ps = OllamaPsModel {
511            name: Some("alias-model".to_string()),
512            size_bytes: Some(1),
513            size_vram_bytes: Some(2),
514            expires_at: Some("forever".to_string()),
515            context_length: Some(8192),
516        };
517        telemetry.merge_ollama_ps(&ps);
518        assert_eq!(
519            telemetry.runtime_loaded_model.as_deref(),
520            Some("real-model")
521        );
522        assert_eq!(telemetry.runtime_memory_bytes, Some(1));
523        assert_eq!(telemetry.runtime_memory_vram_bytes, Some(2));
524        assert_eq!(
525            telemetry.runtime_keep_alive_until.as_deref(),
526            Some("forever")
527        );
528        assert_eq!(telemetry.runtime_context_length, Some(8192));
529    }
530
531    #[test]
532    fn as_vm_dict_returns_none_when_empty() {
533        let telemetry = ProviderTelemetry::default();
534        assert!(telemetry.is_empty());
535        assert!(telemetry.as_vm_dict().is_none());
536    }
537
538    #[test]
539    fn as_vm_dict_serializes_all_present_fields() {
540        let telemetry = ProviderTelemetry {
541            source: source::OLLAMA_CHAT.to_string(),
542            server_total_ms: Some(100),
543            client_wall_ms: Some(120),
544            runtime_loaded_model: Some("qwen".to_string()),
545            ..Default::default()
546        };
547        let value = telemetry.as_vm_dict().expect("dict present");
548        let dict = value.as_dict().expect("dict body");
549        assert_eq!(
550            dict.get("source").map(VmValue::display).as_deref(),
551            Some(source::OLLAMA_CHAT)
552        );
553        assert_eq!(
554            dict.get("server_total_ms").and_then(|v| match v {
555                VmValue::Int(n) => Some(*n),
556                _ => None,
557            }),
558            Some(100)
559        );
560        assert_eq!(
561            dict.get("client_wall_ms").and_then(|v| match v {
562                VmValue::Int(n) => Some(*n),
563                _ => None,
564            }),
565            Some(120)
566        );
567    }
568}