defect_cli/oneshot.rs
1//! Single-turn unattended mode —— `defect --message <prompt>`.
2//!
3//! Purpose: in CI / scripts, "run one prompt, produce a result, exit by
4//! success/failure". On **equal footing** with the interactive REPL and the
5//! ACP server; the three only share the [`AgentCore`] kernel:
6//!
7//! - Does **not** reuse the REPL's [`crate::repl`] rendering (that stack carries
8//! ANSI / line editing, bound to the `repl` feature's crossterm/owo-colors).
9//! This module ships its own minimal, ANSI-free event projection, so under
10//! `--no-default-features --features oneshot` it builds a slim CI binary with
11//! no TUI dependencies.
12//! - Connects in-process directly to `AgentCore` (like the REPL), bypassing the
13//! wire —— CI runs its own agent and does not need ACP's cross-process
14//! generality.
15//!
16//! ## Output contract: stdout = agent content, stderr = framework logs
17//!
18//! **All agent content** (assistant body / thinking / tool calls) goes to
19//! **stdout** as a single stream in event order; framework-level diagnostics
20//! (denial warnings, turn errors, unreached goals) go through `tracing` —— and
21//! `tracing` is uniformly written to **stderr** by `defect_obs::init_tracing`.
22//! So `2>/dev/null` cleanly filters out framework noise while preserving the
23//! agent's complete work record; the two streams no longer share a cursor or
24//! run together.
25//!
26//! ## Exit codes (CI's lifeline for judging success/failure)
27//!
//! Priority high to low: `TurnError` > `Refusal` > `MaxTokens`/`MaxTurnRequests`
//! > `Cancelled` > unattended denial (`denied`) > goal unreached (`--goal` mode)
//! > `EndTurn` (0). See `ExitOutcome`.
30//!
31//! ## Non-interactive permissions
32//!
33//! The caller (`bin/cli.rs`) is responsible for wrapping the session's policy in
34//! [`defect_agent::policy::NonInteractivePolicy`], so that `Ask` degrades to
35//! `Deny` and it does not hang waiting for input in a TTY-less environment. This
36//! module listens for `PolicyDecision::Deny` in the event stream: once one
37//! occurs, it logs a warning via `tracing` (→ stderr) and sets the `denied`
38//! flag, so even if the turn ends normally with `EndTurn` it exits with a
39//! non-zero code —— fail loud, letting CI know "an operation was denied, this
40//! result is not trustworthy".
41
42use std::collections::HashMap;
43use std::path::PathBuf;
44use std::process::ExitCode;
45use std::sync::Arc;
46
47use agent_client_protocol_schema::{ContentBlock, SessionId, StopReason, TextContent, ToolCallId};
48use defect_agent::event::AgentEvent;
49use defect_agent::policy::PolicyDecision;
50use defect_agent::session::{AgentCore, TurnError};
51use futures::{FutureExt, StreamExt};
52use tokio::io::{AsyncWriteExt, Stdout};
53
54use crate::args::OutputFormat;
55use crate::session_open::open_session;
56
57/// Runs a single-turn prompt and returns the process exit code.
58///
59/// Only when `track_denied = true` (the caller wrapped `NonInteractivePolicy`)
60/// is `PolicyDecision::Deny` treated as an "unattended gap" that affects the
61/// exit code —— modes like `deny-all`, where the user knowingly expects
62/// denials, should pass `false` to avoid spuriously reporting non-zero.
63///
64/// # Errors
65///
66/// Session open failure, stdin read failure, stdout/stderr write failure.
67/// When `goal` is `Some` (`--goal` mode): if the goal is unreached after the
68/// turn ends (turns exhausted without ever calling `goal_done`), exits with the
69/// Exhausted code —— prevents CI from mistaking "ran out of turns without
70/// reaching the goal" for success.
71pub async fn run(
72 agent: Arc<dyn AgentCore>,
73 cwd: PathBuf,
74 message: String,
75 format: OutputFormat,
76 resume: Option<SessionId>,
77 track_denied: bool,
78 goal: Option<Arc<defect_agent::session::GoalState>>,
79) -> anyhow::Result<ExitCode> {
80 let prompt = resolve_prompt(message).await?;
81
82 let mut out = tokio::io::stdout();
83
84 let session = open_session(&agent, &cwd, resume).await?;
85
86 // Subscribe once outside the loop, draining across this turn (including the
87 // driver's autonomous turn continuations) —— isomorphic to the interactive
88 // REPL / ACP event pump. The turn future only yields the final StopReason;
89 // this turn's content is pushed through the event stream.
90 let mut events = session.subscribe();
91 let mut sink = EventSink::new(format, track_denied);
92
93 let prompt_blocks = vec![ContentBlock::Text(TextContent::new(prompt))];
94 let turn = session.run_turn(prompt_blocks);
95 tokio::pin!(turn);
96
97 let result = loop {
98 tokio::select! {
99 // biased + drain events first: the turn future and the event stream
100 // may become ready at the same time (turn finishes fast, trailing
101 // events already in the buffer). Poll events first to avoid dropping
102 // any rendering.
103 biased;
104 ev = events.next() => {
105 if let Some(ev) = ev {
106 sink.emit(&mut out, ev).await?;
107 }
108 }
109 r = &mut turn => break r,
110 }
111 };
112
113 // Turn has ended, but the buffer may still hold just-sent, not-yet-polled
114 // trailing events —— drain everything that is immediately ready.
115 while let Some(Some(ev)) = events.next().now_or_never() {
116 sink.emit(&mut out, ev).await?;
117 }
118
119 // goal mode: turn ended normally but the goal is unreached (turns exhausted
120 // without ever calling goal_done) → Exhausted.
121 let goal_unreached = goal.as_ref().is_some_and(|g| !g.is_reached());
122 if goal_unreached {
123 tracing::warn!(
124 "goal not reached: the agent stopped (or ran out of turns) without calling `goal_done`"
125 );
126 }
127 let outcome = ExitOutcome::from(&result, sink.denied, goal_unreached);
128 sink.finish(&mut out, &result, &outcome).await?;
129 out.flush().await?;
130 Ok(outcome.code())
131}
132
133/// Resolves the prompt source: `-`, or read from stdin when stdin is piped;
134/// otherwise use the literal value.
135async fn resolve_prompt(message: String) -> anyhow::Result<String> {
136 use std::io::IsTerminal;
137
138 let from_stdin = message == "-" || (message.is_empty() && !std::io::stdin().is_terminal());
139 if from_stdin {
140 use tokio::io::AsyncReadExt;
141 let mut buf = String::new();
142 tokio::io::stdin().read_to_string(&mut buf).await?;
143 Ok(buf)
144 } else {
145 Ok(message)
146 }
147}
148
/// Reduction from the turn result to the process exit code.
///
/// A plain fieldless enum, so it derives the cheap value traits: `Debug` for
/// diagnostics, `Copy` for free passing, and `PartialEq`/`Eq` so outcomes can
/// be compared directly (e.g. in tests).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExitOutcome {
    /// The turn ended normally with nothing to report.
    Success,
    /// The turn ended normally, but a tracked policy denial occurred.
    Denied,
    /// The turn was cancelled.
    Cancelled,
    /// The turn stopped on a token or turn-request limit.
    MaxTokens,
    /// The model refused.
    Refusal,
    /// The turn itself failed.
    Error,
    /// Goal mode: the turn ended normally but the goal is unreached (turns
    /// exhausted / model gave up).
    GoalUnreached,
}
161
162impl ExitOutcome {
163 fn from(result: &Result<StopReason, TurnError>, denied: bool, goal_unreached: bool) -> Self {
164 match result {
165 Err(_) => Self::Error,
166 Ok(StopReason::Refusal) => Self::Refusal,
167 Ok(StopReason::MaxTokens) | Ok(StopReason::MaxTurnRequests) => Self::MaxTokens,
168 Ok(StopReason::Cancelled) => Self::Cancelled,
169 // EndTurn (and any future success-class terminal states): denied >
170 // goal unreached > success.
171 Ok(_) if denied => Self::Denied,
172 Ok(_) if goal_unreached => Self::GoalUnreached,
173 Ok(_) => Self::Success,
174 }
175 }
176
177 /// Numeric exit code (0 = success).
178 fn raw(&self) -> u8 {
179 match self {
180 Self::Success => 0,
181 Self::Error => 1,
182 Self::MaxTokens => 2,
183 Self::Refusal => 3,
184 Self::Denied => 4,
185 Self::Cancelled => 5,
186 Self::GoalUnreached => 6,
187 }
188 }
189
190 fn code(&self) -> ExitCode {
191 ExitCode::from(self.raw())
192 }
193}
194
195/// Event projector: writes the [`AgentEvent`] stream to stdout/stderr according
196/// to the [`OutputFormat`].
197struct EventSink {
198 format: OutputFormat,
199 track_denied: bool,
200 /// Whether an unattended denial has occurred.
201 denied: bool,
202 /// `ToolCallId → tool name`, used to report which tool was involved on a
203 /// `PolicyDecision::Deny`.
204 tool_names: HashMap<ToolCallId, String>,
205 /// In text format: whether stdout is still mid-line (the last thing written
206 /// was not a `\n`). In goal mode it inserts newlines between multi-turn
207 /// assistant outputs, avoiding "previous tail + next head" running together
208 /// on one line.
209 mid_line: bool,
210 /// In text format: whether we are currently inside a thinking block. A
211 /// thinking block's multiple deltas share one `[thinking] ` prefix (printed
212 /// only on the first delta), so consecutive deltas merge into one block.
213 in_thought: bool,
214}
215
216impl EventSink {
217 fn new(format: OutputFormat, track_denied: bool) -> Self {
218 Self {
219 format,
220 track_denied,
221 denied: false,
222 tool_names: HashMap::new(),
223 mid_line: false,
224 in_thought: false,
225 }
226 }
227
228 async fn emit(&mut self, out: &mut Stdout, event: AgentEvent) -> anyhow::Result<()> {
229 // Record tool names (needed for every format, used for denial reports).
230 if let AgentEvent::ToolCallStarted { id, name, fields } = &event {
231 let label = fields.title.clone().unwrap_or_else(|| name.clone());
232 self.tool_names.insert(id.clone(), label);
233 }
234
235 // Unattended denial: framework-level diagnostic via tracing (→ stderr) +
236 // set the flag (fail loud).
237 if self.track_denied
238 && let AgentEvent::PolicyDecision {
239 id,
240 decision: PolicyDecision::Deny,
241 } = &event
242 {
243 self.denied = true;
244 let tool = self
245 .tool_names
246 .get(id)
247 .map(String::as_str)
248 .unwrap_or("<unknown>");
249 tracing::warn!(
250 tool = %tool,
251 "tool denied: no operator present to approve (non-interactive)"
252 );
253 }
254
255 match self.format {
256 OutputFormat::Json => self.emit_json(out, &event).await,
257 OutputFormat::Text => self.emit_text(out, &event).await,
258 OutputFormat::Quiet => Ok(()),
259 }
260 }
261
262 /// NDJSON: one line per event. `AgentEvent` already derives Serialize
263 /// (`LlmCallStarted.request` is `#[serde(skip)]`, so it never enters JSON).
264 async fn emit_json(&self, out: &mut Stdout, event: &AgentEvent) -> anyhow::Result<()> {
265 let line = serde_json::to_string(event)?;
266 write_raw(out, &line).await?;
267 write_raw(out, "\n").await
268 }
269
270 /// Plain text: **all agent content** (body / thinking / tools) goes to
271 /// stdout as a single stream in event order —— framework logs (tracing) go
272 /// to stderr, the two never interfere.
273 ///
274 /// Boundary newlines: assistant body often has no trailing newline, while
275 /// what immediately follows may be a thinking / tool line or the next
276 /// generation. Before switching to a "non-body line" (thinking / tool) or
277 /// starting a new generation segment, if stdout is still mid-line insert a
278 /// `\n`, so each segment occupies whole lines and does not run together.
279 async fn emit_text(&mut self, out: &mut Stdout, event: &AgentEvent) -> anyhow::Result<()> {
280 match event {
281 // A new LLM generation starts: if the previous assistant body had no
282 // newline, insert one before starting the new segment.
283 AgentEvent::LlmCallStarted { .. } | AgentEvent::TurnEnded { .. } => {
284 self.end_thought(out).await?;
285 self.break_line(out).await?;
286 }
287 AgentEvent::AssistantText { content } => {
288 if let Some(text) = block_text(content)
289 && !text.is_empty()
290 {
291 self.end_thought(out).await?;
292 write_raw(out, &text).await?;
293 out.flush().await?;
294 self.mid_line = !text.ends_with('\n');
295 }
296 }
297 // A thinking block's multiple deltas share one `[thinking] ` prefix
298 // —— consecutive deltas merge, with each delta written raw.
299 AgentEvent::AssistantThought { content } => {
300 if let Some(text) = block_text(content)
301 && !text.is_empty()
302 {
303 if !self.in_thought {
304 self.break_line(out).await?;
305 write(out, "[thinking] ").await?;
306 self.in_thought = true;
307 }
308 write_raw(out, &text).await?;
309 out.flush().await?;
310 self.mid_line = !text.ends_with('\n');
311 }
312 }
313 AgentEvent::ToolCallStarted { name, fields, .. } => {
314 self.end_thought(out).await?;
315 self.break_line(out).await?;
316 let title = fields.title.clone().unwrap_or_else(|| name.clone());
317 write(out, &format!("[tool] {title}\n")).await?;
318 out.flush().await?;
319 }
320 _ => {}
321 }
322 Ok(())
323 }
324
325 /// Ends the current thinking block: if inside one, clears the flag and
326 /// inserts a `\n`. Called before switching to body / tool / a new generation
327 /// / turn end, so the thinking block occupies whole lines.
328 async fn end_thought(&mut self, out: &mut Stdout) -> anyhow::Result<()> {
329 if self.in_thought {
330 self.in_thought = false;
331 self.break_line(out).await?;
332 }
333 Ok(())
334 }
335
336 /// If stdout is still mid-line, insert a `\n` to close it and flush. Used
337 /// before a "non-body line" (thinking / tool) or a new generation segment,
338 /// so the previous assistant body occupies its own whole line —— even within
339 /// a single stream, content is separated by line.
340 async fn break_line(&mut self, out: &mut Stdout) -> anyhow::Result<()> {
341 if self.mid_line {
342 write_raw(out, "\n").await?;
343 out.flush().await?;
344 self.mid_line = false;
345 }
346 Ok(())
347 }
348
349 /// Wrap-up output after the turn ends. Framework-level diagnostics (turn
350 /// errors) go through tracing (→ stderr).
351 async fn finish(
352 &self,
353 out: &mut Stdout,
354 result: &Result<StopReason, TurnError>,
355 outcome: &ExitOutcome,
356 ) -> anyhow::Result<()> {
357 if let Err(e) = result {
358 tracing::error!(error = %e, "turn error");
359 }
360 match self.format {
361 OutputFormat::Text => {
362 // When the streamed assistant body has no trailing newline,
363 // insert one to avoid running into the following shell prompt;
364 // if already at line start (mid_line=false) skip it, to not emit
365 // an extra blank line.
366 if self.mid_line {
367 write_raw(out, "\n").await?;
368 }
369 }
370 OutputFormat::Json => {
371 // Final summary line: terminal state + exit-code semantics.
372 let summary = serde_json::json!({
373 "type": "oneshot_result",
374 "stop_reason": result.as_ref().ok().map(|r| format!("{r:?}")),
375 "error": result.as_ref().err().map(|e| e.to_string()),
376 "denied": self.denied,
377 "exit_code": outcome.raw(),
378 });
379 write_raw(out, &summary.to_string()).await?;
380 write_raw(out, "\n").await?;
381 }
382 OutputFormat::Quiet => {}
383 }
384 Ok(())
385 }
386}
387
388/// Extracts text from a [`ContentBlock`]; non-text blocks return `None`.
389fn block_text(block: &ContentBlock) -> Option<String> {
390 match block {
391 ContentBlock::Text(t) => Some(t.text.clone()),
392 _ => None,
393 }
394}
395
396/// Writes a string to any async writer.
397async fn write<W>(out: &mut W, s: &str) -> anyhow::Result<()>
398where
399 W: AsyncWriteExt + Unpin,
400{
401 out.write_all(s.as_bytes()).await?;
402 Ok(())
403}
404
405/// Alias for `write`, semantically emphasizing "write as-is, with no
406/// decoration".
407async fn write_raw<W>(out: &mut W, s: &str) -> anyhow::Result<()>
408where
409 W: AsyncWriteExt + Unpin,
410{
411 write(out, s).await
412}