Skip to main content

tj_core/classifier/
agent_sdk.rs

1//! Claude CLI ("agent SDK") classifier backend.
2//!
3//! Runs the locally-installed, already-authenticated `claude` binary in
4//! non-interactive print mode, pinned to Haiku, to classify a chunk *without*
5//! an `ANTHROPIC_API_KEY`. This resurrects the v0.7.x `cli` backend that was
6//! removed in v0.8.0 — but honestly: since **2026-06-15** a headless
7//! `claude -p` run draws from the separate **Agent SDK** monthly credit pool
8//! (~$20 Pro / $100 Max 5x / $200 Max 20x, at API rates), not the interactive
9//! Pro/Max pool. Classification is Haiku-class and tiny (a few hundred tokens
10//! per chunk), so the credit lasts a long time — but it is not strictly free.
11//!
12//! The command execution is abstracted behind [`CommandRunner`] so the parsing
13//! path is unit-testable with a fake; the suite never shells out to `claude`.
14
15use super::{Classifier, ClassifyInput, ClassifyOutput};
16use anyhow::{anyhow, Context};
17use std::process::Command;
18
/// Model alias used when `TJ_AGENT_SDK_MODEL` does not override it.
///
/// `claude --model` accepts this short alias and resolves it to the current
/// dated id (`claude-haiku-4-5-20251001`).
pub const DEFAULT_MODEL: &str = "claude-haiku-4-5";
23
/// Recursion marker set in the environment of every classifier `claude -p`
/// subprocess.
///
/// The subprocess is a full Claude Code instance, so at startup it re-runs the
/// user's SessionStart hooks — `task-journal ingest-hook` among them. Without
/// a guard that hook would spawn another classifier `claude -p`, which would
/// run the hook again, and so on: an unbounded fork bomb. `ingest-hook` looks
/// for this variable and no-ops when it is set, which breaks the cycle.
///
/// The CLI guard and the worker's `env_remove` both refer to this constant so
/// that the side that sets the marker and the side that checks it cannot
/// drift apart. That drift is the original bug: the guard tested
/// `TJ_IN_CLASSIFIER`, but no spawn site ever set it.
pub const IN_CLASSIFIER_ENV: &str = "TJ_IN_CLASSIFIER";
34
35/// "Run the classifier command and hand back its raw stdout." The production
36/// impl shells out to `claude`; tests inject a fake returning canned JSON.
37pub trait CommandRunner: Send + Sync {
38    /// Run the classification for `prompt` against `model`, returning the raw
39    /// stdout (the `--output-format json` wrapper) on success.
40    fn run(&self, model: &str, prompt: &str) -> anyhow::Result<String>;
41}
42
43/// Build the base `claude` invocation shared by both runners: print mode, the
44/// pinned model, the JSON envelope, an isolated MCP config, and — critically —
45/// the [`IN_CLASSIFIER_ENV`] recursion marker. The argv runner appends the
46/// prompt as a positional arg; the stdin runner feeds it on stdin. Extracted so
47/// a unit test can assert the marker is present without spawning `claude` (the
48/// missing marker is exactly what let the fork bomb through before).
49fn base_claude_command(model: &str) -> Command {
50    let mut cmd = Command::new("claude");
51    cmd.arg("-p")
52        .arg("--model")
53        .arg(model)
54        .arg("--output-format")
55        .arg("json")
56        .arg("--strict-mcp-config")
57        .env(IN_CLASSIFIER_ENV, "1");
58    cmd
59}
60
/// The production [`CommandRunner`]: runs the local `claude` binary in print
/// mode with the requested model and asks for the JSON envelope.
///
/// The invocation uses `--strict-mcp-config` so that the project's own MCP
/// servers — this journal included — are not loaded into the classification
/// subprocess.
pub struct ClaudeBinaryRunner;
66
67/// Build the error for a non-zero `claude -p` exit. With `--output-format
68/// json` claude reports the real cause (invalid model, usage limit, auth) as
69/// JSON on **stdout**, not stderr — so surface both, capped, or the user just
70/// sees a bare "exit status 1".
71fn claude_exit_error(
72    status: std::process::ExitStatus,
73    stdout: &[u8],
74    stderr: &[u8],
75) -> anyhow::Error {
76    let cap = |b: &[u8]| {
77        let s = String::from_utf8_lossy(b);
78        let s = s.trim().to_string();
79        if s.chars().count() > 600 {
80            format!("{}…", s.chars().take(600).collect::<String>())
81        } else {
82            s
83        }
84    };
85    let out = cap(stdout);
86    let err = cap(stderr);
87    let detail = match (out.is_empty(), err.is_empty()) {
88        (true, true) => "(no output)".to_string(),
89        (false, true) => out,
90        (true, false) => err,
91        (false, false) => format!("{err} | stdout: {out}"),
92    };
93    anyhow!("`claude -p` exited with {status}: {detail}")
94}
95
/// Per-call wall-clock ceiling for a `claude -p` invocation.
///
/// A spawned full Claude Code instance normally answers in seconds; this
/// limit exists so a wedged one gets killed instead of hanging a multi-chunk
/// enrich (and with it the whole `complete`).
///
/// The value is read from `TJ_CLAUDE_TIMEOUT_SECS` as a whole number of
/// seconds. Surrounding whitespace is ignored, so a value such as `"30\n"`
/// is honored rather than silently discarded. An unset, unparsable or zero
/// value falls back to the 90-second default.
fn claude_timeout() -> std::time::Duration {
    const DEFAULT_SECS: u64 = 90;
    let secs = std::env::var("TJ_CLAUDE_TIMEOUT_SECS")
        .ok()
        .and_then(|raw| raw.trim().parse::<u64>().ok())
        // Zero would kill every call on the first poll; treat it as "unset".
        .filter(|&n| n > 0)
        .unwrap_or(DEFAULT_SECS);
    std::time::Duration::from_secs(secs)
}
108
109/// Wait for `child` up to `timeout`, draining stdout/stderr concurrently so a
110/// full pipe can't deadlock the wait. On timeout the child is killed and an
111/// error returned; otherwise the captured output is handed back.
112fn wait_with_timeout(
113    mut child: std::process::Child,
114    timeout: std::time::Duration,
115) -> anyhow::Result<std::process::Output> {
116    use std::io::Read;
117    let mut out_pipe = child.stdout.take();
118    let mut err_pipe = child.stderr.take();
119    let so = std::thread::spawn(move || {
120        let mut b = Vec::new();
121        if let Some(p) = out_pipe.as_mut() {
122            let _ = p.read_to_end(&mut b);
123        }
124        b
125    });
126    let se = std::thread::spawn(move || {
127        let mut b = Vec::new();
128        if let Some(p) = err_pipe.as_mut() {
129            let _ = p.read_to_end(&mut b);
130        }
131        b
132    });
133    let start = std::time::Instant::now();
134    let status = loop {
135        if let Some(status) = child.try_wait()? {
136            break status;
137        }
138        if start.elapsed() >= timeout {
139            let _ = child.kill();
140            let _ = child.wait();
141            anyhow::bail!("`claude -p` timed out after {}s", timeout.as_secs());
142        }
143        std::thread::sleep(std::time::Duration::from_millis(150));
144    };
145    Ok(std::process::Output {
146        status,
147        stdout: so.join().unwrap_or_default(),
148        stderr: se.join().unwrap_or_default(),
149    })
150}
151
152impl CommandRunner for ClaudeBinaryRunner {
153    fn run(&self, model: &str, prompt: &str) -> anyhow::Result<String> {
154        let child = base_claude_command(model)
155            .arg(prompt)
156            .stdout(std::process::Stdio::piped())
157            .stderr(std::process::Stdio::piped())
158            .spawn()
159            .context("failed to spawn `claude` (is Claude Code installed and on PATH?)")?;
160        let output = wait_with_timeout(child, claude_timeout())?;
161        if !output.status.success() {
162            return Err(claude_exit_error(
163                output.status,
164                &output.stdout,
165                &output.stderr,
166            ));
167        }
168        Ok(String::from_utf8_lossy(&output.stdout).into_owned())
169    }
170}
171
172/// Like [`ClaudeBinaryRunner`] but feeds the prompt on **stdin** instead of as
173/// an argv argument. Use for large prompts (e.g. a whole session transcript in
174/// dream backfill) that would otherwise blow the per-argument size limit
175/// (`E2BIG`, ~128 KiB on Linux). `claude -p` with no positional prompt reads
176/// the prompt from stdin.
177pub struct ClaudeBinaryStdinRunner;
178
179impl CommandRunner for ClaudeBinaryStdinRunner {
180    fn run(&self, model: &str, prompt: &str) -> anyhow::Result<String> {
181        use std::io::Write;
182        use std::process::Stdio;
183        let mut child = base_claude_command(model)
184            .stdin(Stdio::piped())
185            .stdout(Stdio::piped())
186            .stderr(Stdio::piped())
187            .spawn()
188            .context("failed to spawn `claude` (is Claude Code installed and on PATH?)")?;
189        // Write the prompt, then drop the handle to close stdin so `claude`
190        // sees EOF and starts working.
191        child
192            .stdin
193            .take()
194            .context("claude stdin was not captured")?
195            .write_all(prompt.as_bytes())
196            .context("failed to write prompt to claude stdin")?;
197        let output = wait_with_timeout(child, claude_timeout())?;
198        if !output.status.success() {
199            return Err(claude_exit_error(
200                output.status,
201                &output.stdout,
202                &output.stderr,
203            ));
204        }
205        Ok(String::from_utf8_lossy(&output.stdout).into_owned())
206    }
207}
208
209pub struct ClaudeCliClassifier {
210    model: String,
211    runner: Box<dyn CommandRunner>,
212}
213
214impl ClaudeCliClassifier {
215    /// Build from environment. Returns `None` unless a `claude` binary is on
216    /// PATH (probed with `claude --version`) — the caller then falls through to
217    /// the next backend. Model comes from `TJ_AGENT_SDK_MODEL`, else Haiku.
218    pub fn from_env() -> Option<Self> {
219        if !claude_on_path() {
220            return None;
221        }
222        let model = std::env::var("TJ_AGENT_SDK_MODEL").unwrap_or_else(|_| DEFAULT_MODEL.into());
223        Some(Self {
224            model,
225            runner: Box::new(ClaudeBinaryRunner),
226        })
227    }
228
229    /// Test/dev constructor: inject a fake runner and an explicit model so the
230    /// parse path can be exercised without a live `claude` login.
231    pub fn with_runner(model: impl Into<String>, runner: Box<dyn CommandRunner>) -> Self {
232        Self {
233            model: model.into(),
234            runner,
235        }
236    }
237}
238
239/// The JSON wrapper emitted by `claude --output-format json`. We read the error
240/// flag, the `result` string (the model's verdict text), and the usage/cost so
241/// callers can report what a call actually consumed.
242#[derive(serde::Deserialize)]
243struct CliEnvelope {
244    #[serde(default)]
245    is_error: bool,
246    #[serde(default)]
247    result: Option<String>,
248    #[serde(default)]
249    subtype: Option<String>,
250    #[serde(default)]
251    usage: Option<EnvelopeUsage>,
252    #[serde(default)]
253    total_cost_usd: Option<f64>,
254}
255
256#[derive(serde::Deserialize, Default)]
257struct EnvelopeUsage {
258    #[serde(default)]
259    input_tokens: u64,
260    #[serde(default)]
261    output_tokens: u64,
262    #[serde(default)]
263    cache_creation_input_tokens: u64,
264    #[serde(default)]
265    cache_read_input_tokens: u64,
266}
267
268impl Classifier for ClaudeCliClassifier {
269    fn classify(&self, input: &ClassifyInput) -> anyhow::Result<ClassifyOutput> {
270        let prompt = crate::classifier::prompt::build(input);
271        let verdict = run_claude_json(self.runner.as_ref(), &self.model, &prompt)?;
272        super::parse_verdict(&verdict)
273    }
274}
275
276/// Run `prompt` through the claude CLI (via `runner`) and return the model's
277/// reply text — the `result` field of the `--output-format json` envelope.
278/// Shared by the classifier and the dream agent-sdk backend so the envelope
279/// handling lives in one place.
280pub fn run_claude_json(
281    runner: &dyn CommandRunner,
282    model: &str,
283    prompt: &str,
284) -> anyhow::Result<String> {
285    run_claude_json_usage(runner, model, prompt).map(|(text, _)| text)
286}
287
288/// Like [`run_claude_json`] but also returns the envelope's reported token
289/// usage and cost (zeros when the envelope omits them).
290pub fn run_claude_json_usage(
291    runner: &dyn CommandRunner,
292    model: &str,
293    prompt: &str,
294) -> anyhow::Result<(String, crate::llm::LlmUsage)> {
295    let stdout = runner.run(model, prompt)?;
296    let envelope: CliEnvelope = serde_json::from_str(stdout.trim()).with_context(|| {
297        format!(
298            "claude --output-format json wrapper parse failed; got: {}",
299            stdout.trim()
300        )
301    })?;
302    if envelope.is_error {
303        return Err(anyhow!(
304            "claude reported an error (subtype={})",
305            envelope.subtype.as_deref().unwrap_or("unknown")
306        ));
307    }
308    let u = envelope.usage.unwrap_or_default();
309    let usage = crate::llm::LlmUsage {
310        // Count cache reads/writes as input so the total reflects real context.
311        input_tokens: u.input_tokens + u.cache_creation_input_tokens + u.cache_read_input_tokens,
312        output_tokens: u.output_tokens,
313        cost_usd: envelope.total_cost_usd,
314    };
315    let result = envelope
316        .result
317        .ok_or_else(|| anyhow!("claude json wrapper had no `result` field"))?;
318    Ok((result, usage))
319}
320
/// Report whether a `claude` binary can be found on PATH and executed.
///
/// The probe runs `claude --version`, which is cheap and needs no network.
/// It is deliberately tolerant: a failure to spawn, or a non-zero exit, is
/// read as "not available".
pub fn claude_on_path() -> bool {
    match Command::new("claude").arg("--version").output() {
        Ok(probe) => probe.status.success(),
        Err(_) => false,
    }
}
330
#[cfg(test)]
mod tests {
    use super::*;
    use crate::classifier::{decide_status, CONFIDENCE_THRESHOLD};
    use crate::event::{EventStatus, EventType};
    use std::sync::{Arc, Mutex};

    /// Runner double: always answers with the same canned stdout and records
    /// which model it was called with, so a test can check the pin.
    struct FakeRunner {
        canned: String,
        seen_model: Mutex<Option<String>>,
    }

    impl FakeRunner {
        fn new(canned: impl Into<String>) -> Self {
            Self {
                canned: canned.into(),
                seen_model: Mutex::new(None),
            }
        }
    }

    impl CommandRunner for FakeRunner {
        fn run(&self, model: &str, _prompt: &str) -> anyhow::Result<String> {
            let mut seen = self.seen_model.lock().unwrap();
            *seen = Some(model.to_string());
            Ok(self.canned.clone())
        }
    }

    /// Forwards to a shared [`FakeRunner`], letting a test keep its own `Arc`
    /// handle for inspection after the runner has been boxed away.
    struct ArcRunner(Arc<FakeRunner>);

    impl CommandRunner for ArcRunner {
        fn run(&self, model: &str, prompt: &str) -> anyhow::Result<String> {
            self.0.run(model, prompt)
        }
    }

    /// A fixed chunk to classify; its content is irrelevant to the fakes.
    fn input() -> ClassifyInput {
        ClassifyInput {
            text: "We adopted Rust for the journal core.".into(),
            author_hint: "assistant".into(),
            recent_tasks: vec![],
        }
    }

    /// Wrap `result_json` in a successful CLI envelope.
    fn envelope(result_json: &str) -> String {
        let wrapper = serde_json::json!({
            "type": "result",
            "subtype": "success",
            "is_error": false,
            "result": result_json,
        });
        wrapper.to_string()
    }

    /// Classifier on the default model whose runner prints `stdout`.
    fn classifier_printing(stdout: String) -> ClaudeCliClassifier {
        ClaudeCliClassifier::with_runner(DEFAULT_MODEL, Box::new(FakeRunner::new(stdout)))
    }

    #[test]
    fn base_command_carries_recursion_marker() {
        use std::ffi::OsStr;
        // The tj-cli ingest-hook guard short-circuits on this exact variable;
        // if the constant and the spawn site ever drift, the fork bomb returns.
        assert_eq!(IN_CLASSIFIER_ENV, "TJ_IN_CLASSIFIER");
        let cmd = base_claude_command("claude-haiku-4-5");
        let has_marker = cmd.get_envs().any(|(key, value)| {
            key == OsStr::new(IN_CLASSIFIER_ENV) && value == Some(OsStr::new("1"))
        });
        assert!(
            has_marker,
            "every spawned `claude -p` must set {IN_CLASSIFIER_ENV}=1 to break ingest-hook recursion"
        );
    }

    #[test]
    fn parses_canned_verdict_into_classify_output() {
        let verdict = r#"{"event_type":"decision","task_id_guess":"tj-x","confidence":0.93,"evidence_strength":null,"suggested_text":"Adopt Rust."}"#;
        let out = classifier_printing(envelope(verdict))
            .classify(&input())
            .unwrap();
        assert_eq!(out.event_type, EventType::Decision);
        assert_eq!(out.task_id_guess.as_deref(), Some("tj-x"));
        assert!((out.confidence - 0.93).abs() < 1e-6);
        // 0.93 >= 0.85 → confirmed.
        assert_eq!(decide_status(out.confidence), EventStatus::Confirmed);
    }

    #[test]
    fn pins_the_configured_model() {
        let verdict = r#"{"event_type":"finding","task_id_guess":null,"confidence":0.9,"evidence_strength":null,"suggested_text":"x"}"#;
        let shared = Arc::new(FakeRunner::new(envelope(verdict)));
        let classifier = ClaudeCliClassifier::with_runner(
            "claude-haiku-4-5",
            Box::new(ArcRunner(Arc::clone(&shared))),
        );
        let _ = classifier.classify(&input()).unwrap();
        assert_eq!(
            shared.seen_model.lock().unwrap().as_deref(),
            Some("claude-haiku-4-5"),
            "classifier must pin the model it was constructed with"
        );
    }

    #[test]
    fn decide_status_at_the_0_85_threshold() {
        assert_eq!(CONFIDENCE_THRESHOLD, 0.85);
        let cases = [
            (0.85_f64, EventStatus::Confirmed),
            (0.84_f64, EventStatus::Suggested),
        ];
        for (conf, expect) in cases {
            let verdict = format!(
                r#"{{"event_type":"evidence","task_id_guess":null,"confidence":{conf},"evidence_strength":"strong","suggested_text":"t"}}"#
            );
            let out = classifier_printing(envelope(&verdict))
                .classify(&input())
                .unwrap();
            assert!((out.confidence - conf).abs() < 1e-6);
            assert_eq!(decide_status(out.confidence), expect);
        }
    }

    #[test]
    fn tolerates_code_fence_wrapped_verdict() {
        let verdict = "```json\n{\"event_type\":\"rejection\",\"task_id_guess\":null,\"confidence\":0.88,\"evidence_strength\":null,\"suggested_text\":\"won't work\"}\n```";
        let out = classifier_printing(envelope(verdict))
            .classify(&input())
            .unwrap();
        assert_eq!(out.event_type, EventType::Rejection);
    }

    #[test]
    fn errors_when_claude_reports_is_error() {
        let canned = serde_json::json!({
            "type": "result",
            "subtype": "error_during_execution",
            "is_error": true,
            "result": null,
        })
        .to_string();
        let err = classifier_printing(canned).classify(&input()).unwrap_err();
        assert!(err.to_string().contains("error"), "got: {err}");
    }
}