Skip to main content

lifeloop/telemetry/
mod.rs

1//! Lifecycle telemetry readers.
2//!
3//! Lifeloop owns lifecycle-relevant telemetry parsing for harness adapters.
4//! Clients (CCD, RLM, and other lifecycle clients) consume neutral
5//! [`PressureObservation`] snapshots instead of writing per-harness log
6//! readers themselves.
7//!
8//! # Boundary (issue #5)
9//!
10//! This module owns:
11//! * parsing harness-native session/telemetry artifacts into a neutral
12//!   [`PressureObservation`] (adapter id, observed time, model name, token
13//!   counts, context window, compaction signal),
14//! * the env-var resolution rules used to locate those artifacts and to
15//!   override their parsed values (with `LIFELOOP_*` aliases winning over
16//!   the compatibility `CCD_*` inputs),
17//! * a neutral telemetry [`TelemetryError`] type whose variants name the
18//!   lifecycle failure classes (telemetry_unavailable, hook_protocol_error,
19//!   internal_error).
20//!
21//! This module does **not** own:
22//! * receipt emission (callers translate [`TelemetryError`] into a
23//!   [`crate::LifecycleReceipt`]),
24//! * the placement/payload pipeline (issue #4 owns asset rendering),
25//! * adapter manifest registration (issue #6 owns the manifest registry —
26//!   this module merely reports `support` states the registry can attach
27//!   to a `context_pressure` claim),
28//! * the hook protocol command strings (issue #3),
29//! * lifecycle routing (issue #7),
30//! * any client-side state, prompt semantics, or continuity vocabulary.
31//!   Specifically: no memory, recall, promotion, compaction policy, radar,
32//!   or governance reasoning. Lifeloop reports the *signal*; clients
33//!   decide what it means.
34//!
35//! # CCD compatibility
36//!
37//! Existing `CCD_*` env vars (e.g. `CCD_CLAUDE_HOME`,
38//! `CCD_CONTEXT_WINDOW_TOKENS`, `CCD_HOST_MODEL`) are honored as
39//! compatibility inputs through `lifeloop.v0.x`. Each has a
40//! `LIFELOOP_*` alias; when both are set, the `LIFELOOP_*` value wins
41//! and a single bounded warning is recorded per resolved key per
42//! process. Removal criteria are tracked in
43//! `docs/tombstones/lifeloop.v0.md`.
44
45use std::path::{Path, PathBuf};
46use std::sync::Mutex;
47use std::time::{SystemTime, UNIX_EPOCH};
48
49use serde::{Deserialize, Serialize};
50
51pub mod claude;
52pub mod codex;
53pub mod gemini;
54pub mod host;
55pub mod opencode;
56
57// ============================================================================
58// Neutral observation types
59// ============================================================================
60
/// A neutral lifecycle-pressure observation extracted from harness
/// telemetry. Carries everything a `context.pressure_observed` event or a
/// receipt `telemetry_summary` needs, with no harness-private fields.
///
/// Wire-stable field names; `Option`s are `skip_serializing_if` so absent
/// signals stay absent on the wire (callers must not rely on `null`
/// vs missing for these — they are not required-nullable).
///
/// `deny_unknown_fields` makes deserialization reject payloads that carry
/// keys this struct does not declare.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct PressureObservation {
    /// Stable adapter id (matches `AdapterManifest::adapter_id`, e.g.
    /// `"claude"`, `"codex"`, `"gemini"`, `"opencode"`).
    pub adapter_id: String,
    /// Adapter version when discoverable from the telemetry source.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub adapter_version: Option<String>,
    /// Seconds since UNIX epoch at which this observation was sourced
    /// (typically the underlying log/session-file mtime).
    pub observed_at_epoch_s: u64,
    /// Model identifier when surfaced by the telemetry source or env.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model_name: Option<String>,
    /// Most recent prompt-side token count (window-relative).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_tokens: Option<u64>,
    /// Adapter-reported context window in tokens.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub context_window_tokens: Option<u64>,
    /// Percentage of the context window consumed (0..=100), when both
    /// numerator and denominator are available.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub context_used_pct: Option<u8>,
    /// `Some(true)` if the adapter signaled a compaction/compression event
    /// in the observed session window. `None` means "no signal" (not
    /// "definitely false").
    #[serde(skip_serializing_if = "Option::is_none")]
    pub compaction_signal: Option<bool>,
    /// Granular usage breakdown, when the source provides it. Omitted
    /// from the serialized form when [`TokenUsage::is_empty`] holds, and
    /// filled with `TokenUsage::default()` when absent on input.
    #[serde(skip_serializing_if = "TokenUsage::is_empty", default)]
    pub usage: TokenUsage,
}
102
/// Per-side token usage breakdown. The numeric counters default to zero
/// and `blended_total_tokens` defaults to `None`, so the type stays
/// format-agnostic for `Deserialize`: any subset of fields may be present.
///
/// Zero counters and a `None` blended total are skipped when serializing.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct TokenUsage {
    /// Input token count; skipped on the wire when zero.
    #[serde(default, skip_serializing_if = "is_zero_u64")]
    pub input_tokens: u64,
    /// Output token count; skipped on the wire when zero.
    #[serde(default, skip_serializing_if = "is_zero_u64")]
    pub output_tokens: u64,
    /// Cache-creation input token count; skipped on the wire when zero.
    #[serde(default, skip_serializing_if = "is_zero_u64")]
    pub cache_creation_input_tokens: u64,
    /// Cache-read input token count; skipped on the wire when zero.
    #[serde(default, skip_serializing_if = "is_zero_u64")]
    pub cache_read_input_tokens: u64,
    /// Blended total, when the source reports one; skipped on the wire
    /// when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub blended_total_tokens: Option<u64>,
}
119
120impl TokenUsage {
121    pub fn is_empty(&self) -> bool {
122        self.input_tokens == 0
123            && self.output_tokens == 0
124            && self.cache_creation_input_tokens == 0
125            && self.cache_read_input_tokens == 0
126            && self.blended_total_tokens.unwrap_or(0) == 0
127    }
128}
129
/// `skip_serializing_if` predicate for the `u64` counters on
/// [`TokenUsage`]: `true` exactly when the counter is zero. Takes a
/// reference because that is the shape serde passes to such predicates.
fn is_zero_u64(value: &u64) -> bool {
    matches!(*value, 0)
}
133
134// ============================================================================
135// Errors
136// ============================================================================
137
/// Telemetry-side errors. The variants name the failure classes the spec
/// uses for telemetry-derived receipts:
///
/// * [`TelemetryError::Unavailable`] → `telemetry_unavailable`
/// * [`TelemetryError::HookProtocol`] → `hook_protocol_error`
/// * [`TelemetryError::Internal`] → `internal_error` (the only one of the
///   three currently named in `docs/specs/lifecycle-contract/body.md`).
///
/// The first two are *pending* as `LifecycleReceipt::failure_class`
/// variants — they are added to the receipt enum in a follow-up issue
/// once the spec body names them. Until then, callers wishing to emit a
/// failed receipt for these conditions should use `internal_error` and
/// attach the precise class via `warnings`.
///
/// Every variant carries a free-form message string that is appended to
/// the failure class when the error is formatted with `Display`.
#[derive(Debug)]
pub enum TelemetryError {
    /// The telemetry source (file, directory, db) was missing, empty, or
    /// stale. Distinct from `internal_error` because it's an expected,
    /// observable absence rather than a Lifeloop bug.
    Unavailable(String),
    /// The shape of a parsed telemetry artifact violated the adapter's
    /// hook protocol contract (e.g. expected JSONL but lines were not
    /// objects). Distinct from `Unavailable` because something *was*
    /// there and it was wrong.
    HookProtocol(String),
    /// Lifeloop failed unexpectedly while parsing telemetry.
    Internal(String),
}
165
166impl TelemetryError {
167    /// Stable wire string for the failure class this error maps to.
168    pub fn failure_class(&self) -> &'static str {
169        match self {
170            Self::Unavailable(_) => "telemetry_unavailable",
171            Self::HookProtocol(_) => "hook_protocol_error",
172            Self::Internal(_) => "internal_error",
173        }
174    }
175}
176
177impl std::fmt::Display for TelemetryError {
178    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
179        match self {
180            Self::Unavailable(msg) => write!(f, "telemetry_unavailable: {msg}"),
181            Self::HookProtocol(msg) => write!(f, "hook_protocol_error: {msg}"),
182            Self::Internal(msg) => write!(f, "internal_error: {msg}"),
183        }
184    }
185}
186
// Marker impl: no `source()` override, so the error exposes no underlying
// cause; the message string inside each variant is all that is retained.
impl std::error::Error for TelemetryError {}
188
189impl From<std::io::Error> for TelemetryError {
190    fn from(error: std::io::Error) -> Self {
191        match error.kind() {
192            std::io::ErrorKind::NotFound => Self::Unavailable(error.to_string()),
193            _ => Self::Internal(error.to_string()),
194        }
195    }
196}
197
/// Shorthand for a `Result` whose error type is [`TelemetryError`].
pub type TelemetryResult<T> = Result<T, TelemetryError>;
199
200// ============================================================================
201// Env var resolution: LIFELOOP_* wins over CCD_*, with bounded warning.
202// ============================================================================
203
/// A single CCD→Lifeloop env-var alias.
///
/// Each adapter declares the keys it cares about as a `&'static [EnvAlias]`.
/// When both the `lifeloop` and `ccd_compat` keys are set in the process
/// environment, the `lifeloop` value wins and [`EnvWarningSink`] records a
/// single bounded warning per resolved key per process.
#[derive(Debug, Clone, Copy)]
pub struct EnvAlias {
    /// Name of the preferred `LIFELOOP_*` environment variable.
    pub lifeloop: &'static str,
    /// Name of the compatibility `CCD_*` environment variable that the
    /// `lifeloop` key shadows when both are set.
    pub ccd_compat: &'static str,
}
215
/// Sink for env-precedence warnings.
///
/// One global instance ([`env_warning_sink`]) deduplicates warnings by the
/// `lifeloop` key name so a process that resolves the same alias many
/// times only emits one warning. Tests can inspect [`drain`] to assert
/// precedence behavior.
///
/// [`drain`]: EnvWarningSink::drain
#[derive(Debug, Default)]
pub struct EnvWarningSink {
    // Dedupe set and pending queue live behind one mutex so they are
    // always updated together.
    inner: Mutex<EnvWarningInner>,
}
228
/// Mutex-protected state of an [`EnvWarningSink`].
#[derive(Debug, Default)]
struct EnvWarningInner {
    /// `lifeloop` key names that have already produced a warning. Not
    /// cleared by `drain`, which is what keeps warnings once-per-key.
    seen: Vec<String>,
    /// Warnings recorded since the last `drain`, in insertion order.
    queued: Vec<EnvPrecedenceWarning>,
}
234
/// Bounded warning emitted once per resolved alias when both
/// `LIFELOOP_*` and `CCD_*` are set. The `lifeloop` value wins; the
/// `ccd_compat` value is shadowed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EnvPrecedenceWarning {
    /// Name of the `LIFELOOP_*` variable whose value was used.
    pub lifeloop_key: &'static str,
    /// Name of the `CCD_*` variable whose value was shadowed.
    pub ccd_compat_key: &'static str,
}
243
244impl EnvWarningSink {
245    fn note(&self, alias: EnvAlias) {
246        let mut inner = self.inner.lock().expect("env warning sink poisoned");
247        if inner.seen.iter().any(|k| k == alias.lifeloop) {
248            return;
249        }
250        inner.seen.push(alias.lifeloop.to_string());
251        inner.queued.push(EnvPrecedenceWarning {
252            lifeloop_key: alias.lifeloop,
253            ccd_compat_key: alias.ccd_compat,
254        });
255    }
256
257    /// Drain queued warnings (FIFO). Each warning is yielded at most
258    /// once: a key that has already been drained will not appear again
259    /// in a later call, even if its alias resolves repeatedly.
260    pub fn drain(&self) -> Vec<EnvPrecedenceWarning> {
261        let mut inner = self.inner.lock().expect("env warning sink poisoned");
262        std::mem::take(&mut inner.queued)
263    }
264
265    /// Test-only: forget all dedupe state. Production code never calls
266    /// this; the sink is intended to live for the process lifetime.
267    #[doc(hidden)]
268    pub fn reset_for_tests(&self) {
269        let mut inner = self.inner.lock().expect("env warning sink poisoned");
270        inner.seen.clear();
271        inner.queued.clear();
272    }
273}
274
275/// Process-wide warning sink used by [`resolve_env_string`] and
276/// [`resolve_env_u64`]. Callers can `drain` to surface these warnings on
277/// their preferred channel.
278pub fn env_warning_sink() -> &'static EnvWarningSink {
279    use std::sync::OnceLock;
280    static SINK: OnceLock<EnvWarningSink> = OnceLock::new();
281    SINK.get_or_init(EnvWarningSink::default)
282}
283
284/// Resolve an env-var string through the alias list. The first alias
285/// whose `lifeloop` key is set wins; otherwise the first alias whose
286/// `ccd_compat` key is set wins. Whenever an alias has *both* sides set,
287/// a precedence warning is recorded (once per `lifeloop` key per
288/// process) regardless of which alias actually carried the resolution.
289pub fn resolve_env_string(aliases: &[EnvAlias]) -> Option<String> {
290    resolve_env_string_with(aliases, &|name| std::env::var(name).ok())
291}
292
293/// Like [`resolve_env_string`] but reads through a closure (for tests
294/// that don't want to mutate process env).
295pub fn resolve_env_string_with(
296    aliases: &[EnvAlias],
297    read: &dyn Fn(&str) -> Option<String>,
298) -> Option<String> {
299    let mut chosen: Option<String> = None;
300
301    for alias in aliases {
302        let lifeloop_value = read(alias.lifeloop)
303            .map(|v| v.trim().to_owned())
304            .filter(|v| !v.is_empty());
305        let ccd_value = read(alias.ccd_compat)
306            .map(|v| v.trim().to_owned())
307            .filter(|v| !v.is_empty());
308
309        if lifeloop_value.is_some() && ccd_value.is_some() {
310            env_warning_sink().note(*alias);
311        }
312
313        if chosen.is_none() {
314            chosen = lifeloop_value.or(ccd_value);
315        }
316    }
317
318    chosen
319}
320
321/// Resolve an env alias as `u64`, parsing the string form.
322pub fn resolve_env_u64(aliases: &[EnvAlias]) -> Option<u64> {
323    resolve_env_string(aliases).and_then(|v| v.parse().ok())
324}
325
/// `Lifeloop`/`CCD` general context-window fallback. Adapter readers
/// should consult their adapter-specific alias first, then fall back to
/// this. The general fallback exists so users with custom or local
/// setups don't have to set a runtime-prefixed env var.
///
/// Resolved by [`general_context_window`].
pub const GENERAL_CONTEXT_WINDOW_ALIASES: &[EnvAlias] = &[EnvAlias {
    lifeloop: "LIFELOOP_CONTEXT_WINDOW_TOKENS",
    ccd_compat: "CCD_CONTEXT_WINDOW_TOKENS",
}];
334
/// Generic host-model fallback used by every adapter when a more
/// specific alias does not resolve.
///
/// Resolved by [`general_host_model`].
pub const GENERAL_HOST_MODEL_ALIASES: &[EnvAlias] = &[EnvAlias {
    lifeloop: "LIFELOOP_HOST_MODEL",
    ccd_compat: "CCD_HOST_MODEL",
}];
341
/// Resolve the general context-window size from
/// [`GENERAL_CONTEXT_WINDOW_ALIASES`]. Returns `None` when neither
/// variable is set or the resolved value does not parse as a `u64`.
pub fn general_context_window() -> Option<u64> {
    resolve_env_u64(GENERAL_CONTEXT_WINDOW_ALIASES)
}
345
/// Resolve the general host-model name from
/// [`GENERAL_HOST_MODEL_ALIASES`]. Returns `None` when neither variable
/// is set to a non-blank value.
pub fn general_host_model() -> Option<String> {
    resolve_env_string(GENERAL_HOST_MODEL_ALIASES)
}
349
350// ============================================================================
351// Filesystem helpers (shared by per-adapter readers)
352// ============================================================================
353
354/// Seconds since UNIX epoch for the file's mtime, or `None` if the file
355/// is missing.
356pub fn file_mtime_epoch_s(path: &Path) -> TelemetryResult<Option<u64>> {
357    let metadata = match std::fs::metadata(path) {
358        Ok(m) => m,
359        Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(None),
360        Err(error) => return Err(TelemetryError::Internal(error.to_string())),
361    };
362    let modified = metadata
363        .modified()
364        .map_err(|e| TelemetryError::Internal(e.to_string()))?;
365    let epoch_s = modified
366        .duration_since(UNIX_EPOCH)
367        .map_err(|e| TelemetryError::Internal(format!("mtime before UNIX_EPOCH: {e}")))?
368        .as_secs();
369    Ok(Some(epoch_s))
370}
371
/// Default recency window (seconds) for considering a telemetry artifact
/// fresh enough to drive a `context.pressure_observed` event. Mirrors the
/// 30-minute threshold the CCD readers used.
///
/// [`is_recent`] treats an age exactly equal to this value as still
/// recent (the comparison is inclusive).
pub const RECENT_ACTIVITY_SECS: u64 = 30 * 60;
376
377pub fn now_epoch_s() -> TelemetryResult<u64> {
378    SystemTime::now()
379        .duration_since(UNIX_EPOCH)
380        .map(|d| d.as_secs())
381        .map_err(|e| TelemetryError::Internal(format!("system clock before UNIX_EPOCH: {e}")))
382}
383
384pub fn is_recent(epoch_s: u64) -> TelemetryResult<bool> {
385    Ok(now_epoch_s()?.saturating_sub(epoch_s) <= RECENT_ACTIVITY_SECS)
386}
387
388pub fn home_dir() -> TelemetryResult<PathBuf> {
389    match std::env::var_os("HOME") {
390        Some(home) => Ok(PathBuf::from(home)),
391        None => Err(TelemetryError::Unavailable(
392            "HOME environment variable is not set".into(),
393        )),
394    }
395}
396
/// Percentage of the context window consumed by `total_tokens`, rounded
/// down and clamped to `0..=100`.
///
/// Returns `None` when the window is unknown (`None`) or zero, since no
/// meaningful ratio exists in either case.
pub fn compute_pct(total_tokens: u64, context_window: Option<u64>) -> Option<u8> {
    let window = context_window.filter(|&cw| cw != 0)?;
    // Widen to u128 before scaling: `total_tokens * 100` can exceed
    // `u64::MAX`, and saturating there would understate the ratio for
    // very large inputs.
    let pct = u128::from(total_tokens) * 100 / u128::from(window);
    // The clamp guarantees the value fits in a u8; the fallback is
    // unreachable and only avoids a lossy `as` cast.
    Some(u8::try_from(pct.min(100)).unwrap_or(100))
}
404
405// ============================================================================
406// JSON probing helpers (shared by per-adapter readers)
407// ============================================================================
408
409/// Recursively search a JSON value for the first matching string by
410/// candidate key names. Descends into objects and arrays depth-first.
411pub fn string_key(value: &serde_json::Value, keys: &[&str]) -> Option<String> {
412    match value {
413        serde_json::Value::Object(map) => {
414            for key in keys {
415                if let Some(serde_json::Value::String(found)) = map.get(*key) {
416                    return Some(found.clone());
417                }
418            }
419            for child in map.values() {
420                if let Some(found) = string_key(child, keys) {
421                    return Some(found);
422                }
423            }
424            None
425        }
426        serde_json::Value::Array(items) => items.iter().find_map(|i| string_key(i, keys)),
427        _ => None,
428    }
429}
430
431/// Recursively search a JSON value for the first matching numeric value
432/// by candidate key names. Accepts both JSON numbers and stringified
433/// integers.
434pub fn number_key(value: &serde_json::Value, keys: &[&str]) -> Option<u64> {
435    match value {
436        serde_json::Value::Object(map) => {
437            for key in keys {
438                if let Some(found) = map.get(*key)
439                    && let Some(number) = as_u64(found)
440                {
441                    return Some(number);
442                }
443            }
444            for child in map.values() {
445                if let Some(found) = number_key(child, keys) {
446                    return Some(found);
447                }
448            }
449            None
450        }
451        serde_json::Value::Array(items) => items.iter().find_map(|i| number_key(i, keys)),
452        _ => None,
453    }
454}
455
456/// Coerce a JSON value to u64, accepting both numbers and parseable
457/// strings.
458pub fn as_u64(value: &serde_json::Value) -> Option<u64> {
459    match value {
460        serde_json::Value::Number(number) => number.as_u64(),
461        serde_json::Value::String(text) => text.parse().ok(),
462        _ => None,
463    }
464}
465
466// ============================================================================
467// Tests
468// ============================================================================
469
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::sync::{Mutex, MutexGuard};

    // Tests below that touch the global `env_warning_sink` must serialize:
    // cargo test runs unit tests in parallel by default and the sink's
    // reset/drain pair races otherwise. A process-local mutex keeps the
    // test set dependency-free.
    static ENV_SINK_LOCK: Mutex<()> = Mutex::new(());

    /// Take the serialization lock and hand back a freshly reset sink.
    ///
    /// The lock guards no data, so a poisoned lock (left behind by an
    /// earlier test that panicked while holding it) is safe to recover.
    /// Recovering keeps one failing sink test from cascading into
    /// unrelated `PoisonError` failures in the others.
    fn exclusive_fresh_sink() -> MutexGuard<'static, ()> {
        let guard = ENV_SINK_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        env_warning_sink().reset_for_tests();
        guard
    }

    #[test]
    fn as_u64_accepts_numbers_and_strings() {
        assert_eq!(as_u64(&json!(42)), Some(42));
        assert_eq!(as_u64(&json!("1024")), Some(1024));
        assert_eq!(as_u64(&json!(-1)), None);
        assert_eq!(as_u64(&json!(1.5)), None);
        assert_eq!(as_u64(&json!("hello")), None);
    }

    #[test]
    fn string_key_descends() {
        let v = json!({"outer": {"inner": {"target": "found"}}});
        assert_eq!(string_key(&v, &["target"]), Some("found".into()));
    }

    #[test]
    fn number_key_descends() {
        let v = json!({"usage": {"prompt_tokens": 200}});
        assert_eq!(number_key(&v, &["prompt_tokens"]), Some(200));
    }

    #[test]
    fn telemetry_error_failure_classes_are_stable() {
        assert_eq!(
            TelemetryError::Unavailable("x".into()).failure_class(),
            "telemetry_unavailable"
        );
        assert_eq!(
            TelemetryError::HookProtocol("x".into()).failure_class(),
            "hook_protocol_error"
        );
        assert_eq!(
            TelemetryError::Internal("x".into()).failure_class(),
            "internal_error"
        );
    }

    #[test]
    fn pressure_observation_serializes_minimally() {
        let obs = PressureObservation {
            adapter_id: "claude".into(),
            adapter_version: None,
            observed_at_epoch_s: 100,
            model_name: None,
            total_tokens: Some(500),
            context_window_tokens: Some(1000),
            context_used_pct: Some(50),
            compaction_signal: None,
            usage: TokenUsage::default(),
        };
        let json = serde_json::to_value(&obs).unwrap();
        assert_eq!(json["adapter_id"], "claude");
        assert_eq!(json["observed_at_epoch_s"], 100);
        assert_eq!(json["total_tokens"], 500);
        // Absent optionals and an empty usage breakdown must be omitted,
        // not emitted as `null`.
        assert!(json.get("adapter_version").is_none());
        assert!(json.get("compaction_signal").is_none());
        assert!(json.get("usage").is_none());
    }

    #[test]
    fn resolve_env_string_with_lifeloop_winning() {
        let _g = exclusive_fresh_sink();
        let aliases = &[EnvAlias {
            lifeloop: "LIFELOOP_TEST_X",
            ccd_compat: "CCD_TEST_X",
        }];
        let read = |name: &str| -> Option<String> {
            match name {
                "LIFELOOP_TEST_X" => Some("ll".into()),
                "CCD_TEST_X" => Some("ccd".into()),
                _ => None,
            }
        };
        assert_eq!(resolve_env_string_with(aliases, &read), Some("ll".into()));
        let warnings = env_warning_sink().drain();
        assert_eq!(warnings.len(), 1);
        assert_eq!(warnings[0].lifeloop_key, "LIFELOOP_TEST_X");
        assert_eq!(warnings[0].ccd_compat_key, "CCD_TEST_X");
    }

    #[test]
    fn resolve_env_string_falls_back_to_ccd() {
        let _g = exclusive_fresh_sink();
        let aliases = &[EnvAlias {
            lifeloop: "LIFELOOP_TEST_Y",
            ccd_compat: "CCD_TEST_Y",
        }];
        let read = |name: &str| -> Option<String> {
            match name {
                "CCD_TEST_Y" => Some("ccd-only".into()),
                _ => None,
            }
        };
        assert_eq!(
            resolve_env_string_with(aliases, &read),
            Some("ccd-only".into())
        );
        // Only one side was set, so nothing was shadowed and no warning
        // should have been recorded.
        assert!(env_warning_sink().drain().is_empty());
    }

    #[test]
    fn warning_is_bounded_to_one_per_key() {
        let _g = exclusive_fresh_sink();
        let aliases = &[EnvAlias {
            lifeloop: "LIFELOOP_TEST_Z",
            ccd_compat: "CCD_TEST_Z",
        }];
        let read = |name: &str| -> Option<String> {
            match name {
                "LIFELOOP_TEST_Z" => Some("ll".into()),
                "CCD_TEST_Z" => Some("ccd".into()),
                _ => None,
            }
        };
        for _ in 0..5 {
            let _ = resolve_env_string_with(aliases, &read);
        }
        let warnings = env_warning_sink().drain();
        assert_eq!(warnings.len(), 1);
        // After drain, the seen-set still holds the key, so subsequent
        // resolutions must NOT requeue it.
        for _ in 0..3 {
            let _ = resolve_env_string_with(aliases, &read);
        }
        assert!(env_warning_sink().drain().is_empty());
    }
}