Skip to main content

kaizen/shell/
observe.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! `kaizen observe` process wrapper: daemon proxy/session env.
3
4use crate::ipc::{ObservedSession, ProxyEndpoint};
5use crate::shell::cli::workspace_path;
6use anyhow::{Context, Result};
7use std::path::Path;
8use std::process::Command;
9use uuid::Uuid;
10
11pub fn cmd_observe(workspace: Option<&Path>, agent: &str, argv: &[String]) -> Result<()> {
12    let ws = workspace_path(workspace)?;
13    let observed = begin_observed_session(&ws, agent);
14    let status = observed_command(argv)?
15        .current_dir(&ws)
16        .envs(observed_env(agent, &observed))
17        .status()
18        .with_context(|| format!("run observed command: {}", argv[0]))?;
19    if !status.success() {
20        std::process::exit(status.code().unwrap_or(1));
21    }
22    Ok(())
23}
24
25fn observed_command(argv: &[String]) -> Result<Command> {
26    let (program, args) = argv.split_first().context("missing observed command")?;
27    let mut cmd = Command::new(program);
28    cmd.args(args);
29    Ok(cmd)
30}
31
32fn begin_observed_session(workspace: &Path, agent: &str) -> ObservedSession {
33    let workspace = workspace.to_string_lossy().to_string();
34    match crate::daemon::begin_observed_session_blocking(workspace, agent.into()) {
35        Ok(session) => session,
36        Err(err) => {
37            eprintln!(
38                "kaizen observe: daemon capture unavailable ({err:#}); running without proxy"
39            );
40            ObservedSession {
41                session: format!("observe-{}", Uuid::now_v7()),
42                proxies: Vec::new(),
43            }
44        }
45    }
46}
47
48fn observed_env(agent: &str, observed: &ObservedSession) -> Vec<(String, String)> {
49    let mut env = session_env(&observed.session);
50    for endpoint in &observed.proxies {
51        env.extend(observed_session_env(agent, endpoint, &observed.session));
52    }
53    dedupe_env(env)
54}
55
56fn session_env(session: &str) -> Vec<(String, String)> {
57    vec![
58        ("KAIZEN_SESSION_KEY".into(), session.into()),
59        ("X_KAIZEN_SESSION".into(), session.into()),
60    ]
61}
62
63/// Env vars for one daemon proxy endpoint.
64pub fn observed_session_env(
65    agent: &str,
66    endpoint: &ProxyEndpoint,
67    session: &str,
68) -> Vec<(String, String)> {
69    let mut env = session_env(session);
70    if endpoint_applies(agent, endpoint, "openai")
71        && let Some(base) = &endpoint.v1_base_url
72    {
73        env.push(("OPENAI_BASE_URL".into(), base.clone()));
74    }
75    if endpoint_applies(agent, endpoint, "anthropic") {
76        env.push(("ANTHROPIC_BASE_URL".into(), endpoint.base_url.clone()));
77    }
78    env
79}
80
81fn endpoint_applies(agent: &str, endpoint: &ProxyEndpoint, provider: &str) -> bool {
82    if endpoint.provider != provider {
83        return false;
84    }
85    match agent.to_ascii_lowercase().as_str() {
86        "codex" => provider == "openai",
87        "claude" => provider == "anthropic",
88        _ => true,
89    }
90}
91
92fn dedupe_env(env: Vec<(String, String)>) -> Vec<(String, String)> {
93    env.into_iter()
94        .fold(
95            std::collections::BTreeMap::new(),
96            |mut acc, (key, value)| {
97                acc.insert(key, value);
98                acc
99            },
100        )
101        .into_iter()
102        .collect()
103}