lifeloop-cli 0.1.0

Provider-neutral lifecycle abstraction and normalizer for AI harnesses
Documentation
//! Codex CLI lifecycle telemetry reader.
//!
//! Parses Codex session JSONL emitting `event_msg` / `token_count`
//! payloads and extracts the lifecycle kernel: latest prompt-token
//! count, model name, and the natively reported context window.

use std::fs;
use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};

use serde::Deserialize;
use serde_json::Value;

use super::{
    EnvAlias, PressureObservation, TelemetryError, TelemetryResult, TokenUsage, compute_pct,
    file_mtime_epoch_s, general_context_window, general_host_model, home_dir, resolve_env_string,
    string_key,
};

const ADAPTER_ID: &str = "codex";

const CODEX_HOME_ALIASES: &[EnvAlias] = &[EnvAlias {
    lifeloop: "LIFELOOP_CODEX_HOME",
    ccd_compat: "CODEX_HOME",
}];

const CODEX_THREAD_ID_ALIASES: &[EnvAlias] = &[EnvAlias {
    lifeloop: "LIFELOOP_CODEX_THREAD_ID",
    ccd_compat: "CODEX_THREAD_ID",
}];

const CODEX_MODEL_ALIASES: &[EnvAlias] = &[EnvAlias {
    lifeloop: "LIFELOOP_CODEX_MODEL",
    ccd_compat: "CCD_CODEX_MODEL",
}];

/// Probe the active Codex session log for the configured thread id.
/// Returns `Ok(None)` when no thread id is configured or the session
/// log has no `token_count` events yet.
pub fn current() -> TelemetryResult<Option<PressureObservation>> {
    let Some(thread_id) = resolve_env_string(CODEX_THREAD_ID_ALIASES) else {
        return Ok(None);
    };

    let codex_home = codex_home()?;
    let Some(session_path) = find_session_log(&codex_home.join("sessions"), &thread_id)? else {
        return Ok(None);
    };

    let observed_at_epoch_s = match file_mtime_epoch_s(&session_path)? {
        Some(epoch_s) => epoch_s,
        None => return Ok(None),
    };

    let bytes = fs::read(&session_path).map_err(TelemetryError::from)?;
    parse_session_log(&bytes, observed_at_epoch_s)
}

/// Parse a Codex session log byte slice. Public for tests and for
/// callers that read the file themselves.
pub fn parse_session_log(
    bytes: &[u8],
    observed_at_epoch_s: u64,
) -> TelemetryResult<Option<PressureObservation>> {
    let reader = BufReader::new(bytes);
    let mut latest: Option<PressureObservation> = None;
    let mut latest_model = resolve_env_string(CODEX_MODEL_ALIASES).or_else(general_host_model);

    for line in reader.lines() {
        let line = line.map_err(TelemetryError::from)?;
        let value: Value = match serde_json::from_str(&line) {
            Ok(v) => v,
            Err(_) => continue,
        };
        if let Some(model_name) = string_key(
            &value,
            &[
                "model",
                "model_name",
                "modelName",
                "model_slug",
                "modelSlug",
            ],
        ) {
            latest_model = Some(model_name);
        }
        let entry: SessionEntry = match serde_json::from_value(value) {
            Ok(e) => e,
            Err(_) => continue,
        };
        if entry.entry_type != "event_msg" {
            continue;
        }
        let Some(payload) = entry.payload else {
            continue;
        };
        if payload.payload_type != "token_count" {
            continue;
        }

        let native_window = match payload.info.model_context_window {
            0 => None,
            value => Some(value),
        };
        let context_window = native_window.or_else(general_context_window);

        let total_tokens = payload
            .info
            .last_token_usage
            .as_ref()
            .map(|u| u.total_tokens)
            .unwrap_or(payload.info.total_token_usage.total_tokens);

        latest = Some(PressureObservation {
            adapter_id: ADAPTER_ID.into(),
            adapter_version: None,
            observed_at_epoch_s,
            model_name: latest_model.clone(),
            total_tokens: Some(total_tokens),
            context_window_tokens: context_window,
            context_used_pct: compute_pct(total_tokens, context_window),
            compaction_signal: None,
            usage: TokenUsage {
                blended_total_tokens: Some(payload.info.total_token_usage.total_tokens),
                ..TokenUsage::default()
            },
        });
    }

    Ok(latest)
}

fn codex_home() -> TelemetryResult<PathBuf> {
    if let Some(path) = resolve_env_string(CODEX_HOME_ALIASES) {
        return Ok(PathBuf::from(path));
    }
    Ok(home_dir()?.join(".codex"))
}

fn find_session_log(root: &Path, thread_id: &str) -> TelemetryResult<Option<PathBuf>> {
    if !root.is_dir() {
        return Ok(None);
    }
    let mut stack = vec![root.to_path_buf()];
    while let Some(dir) = stack.pop() {
        for entry in fs::read_dir(&dir).map_err(TelemetryError::from)? {
            let entry = entry.map_err(TelemetryError::from)?;
            let path = entry.path();
            if path.is_dir() {
                stack.push(path);
                continue;
            }
            let matches = path
                .file_name()
                .and_then(|n| n.to_str())
                .map(|n| n.contains(thread_id) && n.ends_with(".jsonl"))
                .unwrap_or(false);
            if matches {
                return Ok(Some(path));
            }
        }
    }
    Ok(None)
}

#[derive(Deserialize)]
struct SessionEntry {
    #[serde(rename = "type")]
    entry_type: String,
    payload: Option<TokenCountPayload>,
}

#[derive(Deserialize)]
struct TokenCountPayload {
    #[serde(rename = "type")]
    payload_type: String,
    info: TokenCountInfo,
}

#[derive(Deserialize)]
struct TokenCountInfo {
    total_token_usage: TotalTokenUsage,
    last_token_usage: Option<TotalTokenUsage>,
    #[serde(default)]
    model_context_window: u64,
}

#[derive(Deserialize)]
struct TotalTokenUsage {
    total_tokens: u64,
}