defect_cli/oneshot.rs
//! Single-turn unattended mode — `defect --message <prompt>`.
2//!
3//! Purpose: in CI / scripts, "run one prompt, produce a result, exit by
4//! success/failure". On **equal footing** with the interactive REPL and the
5//! ACP server; the three only share the [`AgentCore`] kernel:
6//!
7//! - Does **not** reuse the REPL's [`crate::repl`] rendering (that stack carries
8//! ANSI / line editing, bound to the `repl` feature's crossterm/owo-colors).
9//! This module ships its own minimal, ANSI-free event projection, so under
10//! `--no-default-features --features oneshot` it builds a slim CI binary with
11//! no TUI dependencies.
//! - Connects in-process directly to `AgentCore` (like the REPL), bypassing the
//!   ACP wire protocol — CI runs its own agent and does not need ACP's
//!   cross-process generality.
15//!
16//! ## Output contract: stdout = agent content, stderr = framework logs
17//!
//! **All agent content** (assistant body / thinking / tool calls) goes to
//! **stdout** as a single stream in event order. Framework-level diagnostics
//! (denial warnings, turn errors, unreached goals) go through `tracing`, which
//! `defect_obs::init_tracing` routes to **stderr**. So `2>/dev/null` cleanly
//! filters out framework noise while preserving the agent's complete work
//! record, and the two kinds of output never interleave on the same stream.
25//!
26//! ## Exit codes (CI's lifeline for judging success/failure)
27//!
28//! Priority high to low: `TurnError`(1) > `Refusal`(3) > `MaxTokens`(2) >
29//! `MaxTurnRequests`(7) > `Cancelled`(5) > unattended denial (`denied`, 4) >
30//! goal unreached (6) > `EndTurn`(0). `MaxTokens` (a single response truncated
31//! by the output limit) and `MaxTurnRequests` (the per-turn call budget
32//! exhausted) are distinct conditions and carry distinct codes. See
33//! `ExitOutcome`.
34//!
35//! ## Non-interactive permissions
36//!
37//! The caller (`bin/cli.rs`) is responsible for wrapping the session's policy in
38//! [`defect_agent::policy::NonInteractivePolicy`], so that `Ask` degrades to
//! `Deny` and it does not hang waiting for input in a TTY-less environment.
//! When the caller opts in (the `track_denied` argument of `run`), this module
//! watches the event stream for `PolicyDecision::Deny`. On each one it logs a
//! warning via `tracing` (→ stderr) and sets the `denied` flag, so even if the
//! turn ends normally with `EndTurn` the process exits with a non-zero code.
//! This is deliberately fail-loud: it tells CI "an operation was denied, so
//! this result is not trustworthy".
45
46use std::collections::HashMap;
47use std::path::PathBuf;
48use std::process::ExitCode;
49use std::sync::Arc;
50
51use agent_client_protocol_schema::{ContentBlock, SessionId, StopReason, TextContent, ToolCallId};
52use defect_agent::event::AgentEvent;
53use defect_agent::policy::PolicyDecision;
54use defect_agent::session::{AgentCore, TurnError};
55use futures::{FutureExt, StreamExt};
56use tokio::io::{AsyncWriteExt, Stdout};
57
58use crate::args::OutputFormat;
59use crate::session_open::{LocalSessionOpts, open_local_session};
60
61/// Runs a single-turn prompt and returns the process exit code.
62///
63/// Only when `track_denied = true` (the caller wrapped `NonInteractivePolicy`)
64/// is `PolicyDecision::Deny` treated as an "unattended gap" that affects the
65/// exit code —— modes like `deny-all`, where the user knowingly expects
66/// denials, should pass `false` to avoid spuriously reporting non-zero.
67///
68/// # Errors
69///
70/// Session open failure, stdin read failure, stdout/stderr write failure.
71/// When `goal` is `Some` (`--goal` mode): if the goal is unreached after the
72/// turn ends (turns exhausted without ever calling `goal_done`), exits with the
73/// Exhausted code —— prevents CI from mistaking "ran out of turns without
74/// reaching the goal" for success.
75#[allow(clippy::too_many_arguments)]
76pub async fn run(
77 agent: Arc<dyn AgentCore>,
78 cwd: PathBuf,
79 message: String,
80 format: OutputFormat,
81 resume: Option<SessionId>,
82 track_denied: bool,
83 goal: Option<Arc<defect_agent::session::GoalState>>,
84 shell_output_max_bytes: usize,
85) -> anyhow::Result<ExitCode> {
86 let prompt = resolve_prompt(message).await?;
87
88 let mut out = tokio::io::stdout();
89
90 let session = open_local_session(
91 &agent,
92 &cwd,
93 LocalSessionOpts {
94 resume,
95 shell_output_max_bytes,
96 },
97 )
98 .await?;
99
100 // Subscribe once outside the loop, draining across this turn (including the
101 // driver's autonomous turn continuations) —— isomorphic to the interactive
102 // REPL / ACP event pump. The turn future only yields the final StopReason;
103 // this turn's content is pushed through the event stream.
104 let mut events = session.subscribe();
105 let mut sink = EventSink::new(format, track_denied);
106
107 let prompt_blocks = vec![ContentBlock::Text(TextContent::new(prompt))];
108 let turn = session.run_turn(prompt_blocks);
109 tokio::pin!(turn);
110
111 let result = loop {
112 tokio::select! {
113 // biased + drain events first: the turn future and the event stream
114 // may become ready at the same time (turn finishes fast, trailing
115 // events already in the buffer). Poll events first to avoid dropping
116 // any rendering.
117 biased;
118 ev = events.next() => {
119 if let Some(ev) = ev {
120 sink.emit(&mut out, ev).await?;
121 }
122 }
123 r = &mut turn => break r,
124 }
125 };
126
127 // Turn has ended, but the buffer may still hold just-sent, not-yet-polled
128 // trailing events —— drain everything that is immediately ready.
129 while let Some(Some(ev)) = events.next().now_or_never() {
130 sink.emit(&mut out, ev).await?;
131 }
132
133 // goal mode: turn ended normally but the goal is unreached (turns exhausted
134 // without ever calling goal_done) → Exhausted.
135 let goal_unreached = goal.as_ref().is_some_and(|g| !g.is_reached());
136 if goal_unreached {
137 tracing::warn!(
138 "goal not reached: the agent stopped (or ran out of turns) without calling `goal_done`"
139 );
140 }
141 let outcome = ExitOutcome::from(&result, sink.denied, goal_unreached);
142 sink.finish(&mut out, &result, &outcome).await?;
143 out.flush().await?;
144 Ok(outcome.code())
145}
146
147/// Resolves the prompt source: `-`, or read from stdin when stdin is piped;
148/// otherwise use the literal value.
149async fn resolve_prompt(message: String) -> anyhow::Result<String> {
150 use std::io::IsTerminal;
151
152 let from_stdin = message == "-" || (message.is_empty() && !std::io::stdin().is_terminal());
153 if from_stdin {
154 use tokio::io::AsyncReadExt;
155 let mut buf = String::new();
156 tokio::io::stdin().read_to_string(&mut buf).await?;
157 Ok(buf)
158 } else {
159 Ok(message)
160 }
161}
162
163/// Reduction from the turn result to the process exit code.
164enum ExitOutcome {
165 Success,
166 Denied,
167 Cancelled,
168 /// A single LLM response was truncated by the output `max_tokens` limit.
169 MaxTokens,
170 /// The per-turn LLM-call budget (`request_limit`) was exhausted.
171 MaxRequests,
172 Refusal,
173 Error,
174 /// goal mode: turn ended normally but the goal is unreached (turns exhausted
175 /// / model gave up).
176 GoalUnreached,
177}
178
179impl ExitOutcome {
180 fn from(result: &Result<StopReason, TurnError>, denied: bool, goal_unreached: bool) -> Self {
181 match result {
182 Err(_) => Self::Error,
183 Ok(StopReason::Refusal) => Self::Refusal,
184 Ok(StopReason::MaxTokens) => Self::MaxTokens,
185 Ok(StopReason::MaxTurnRequests) => Self::MaxRequests,
186 Ok(StopReason::Cancelled) => Self::Cancelled,
187 // EndTurn (and any future success-class terminal states): denied >
188 // goal unreached > success.
189 Ok(_) if denied => Self::Denied,
190 Ok(_) if goal_unreached => Self::GoalUnreached,
191 Ok(_) => Self::Success,
192 }
193 }
194
195 /// Numeric exit code (0 = success).
196 fn raw(&self) -> u8 {
197 match self {
198 Self::Success => 0,
199 Self::Error => 1,
200 Self::MaxTokens => 2,
201 Self::Refusal => 3,
202 Self::Denied => 4,
203 Self::Cancelled => 5,
204 Self::GoalUnreached => 6,
205 Self::MaxRequests => 7,
206 }
207 }
208
209 fn code(&self) -> ExitCode {
210 ExitCode::from(self.raw())
211 }
212}
213
214/// Event projector: writes the [`AgentEvent`] stream to stdout/stderr according
215/// to the [`OutputFormat`].
216struct EventSink {
217 format: OutputFormat,
218 track_denied: bool,
219 /// Whether an unattended denial has occurred.
220 denied: bool,
221 /// `ToolCallId → tool name`, used to report which tool was involved on a
222 /// `PolicyDecision::Deny`.
223 tool_names: HashMap<ToolCallId, String>,
224 /// In text format: whether stdout is still mid-line (the last thing written
225 /// was not a `\n`). In goal mode it inserts newlines between multi-turn
226 /// assistant outputs, avoiding "previous tail + next head" running together
227 /// on one line.
228 mid_line: bool,
229 /// In text format: whether we are currently inside a thinking block. A
230 /// thinking block's multiple deltas share one `[thinking] ` prefix (printed
231 /// only on the first delta), so consecutive deltas merge into one block.
232 in_thought: bool,
233}
234
235impl EventSink {
236 fn new(format: OutputFormat, track_denied: bool) -> Self {
237 Self {
238 format,
239 track_denied,
240 denied: false,
241 tool_names: HashMap::new(),
242 mid_line: false,
243 in_thought: false,
244 }
245 }
246
247 async fn emit(&mut self, out: &mut Stdout, event: AgentEvent) -> anyhow::Result<()> {
248 // Record tool names (needed for every format, used for denial reports).
249 if let AgentEvent::ToolCallStarted { id, name, fields } = &event {
250 let label = fields.title.clone().unwrap_or_else(|| name.clone());
251 self.tool_names.insert(id.clone(), label);
252 }
253
254 // Unattended denial: framework-level diagnostic via tracing (→ stderr) +
255 // set the flag (fail loud).
256 if self.track_denied
257 && let AgentEvent::PolicyDecision {
258 id,
259 decision: PolicyDecision::Deny,
260 } = &event
261 {
262 self.denied = true;
263 let tool = self
264 .tool_names
265 .get(id)
266 .map(String::as_str)
267 .unwrap_or("<unknown>");
268 tracing::warn!(
269 tool = %tool,
270 "tool denied: no operator present to approve (non-interactive)"
271 );
272 }
273
274 match self.format {
275 OutputFormat::Json => self.emit_json(out, &event).await,
276 OutputFormat::Text => self.emit_text(out, &event).await,
277 OutputFormat::Quiet => Ok(()),
278 }
279 }
280
281 /// NDJSON: one line per event. `AgentEvent` already derives Serialize
282 /// (`LlmCallStarted.request` is `#[serde(skip)]`, so it never enters JSON).
283 async fn emit_json(&self, out: &mut Stdout, event: &AgentEvent) -> anyhow::Result<()> {
284 let line = serde_json::to_string(event)?;
285 write_raw(out, &line).await?;
286 write_raw(out, "\n").await
287 }
288
289 /// Plain text: **all agent content** (body / thinking / tools) goes to
290 /// stdout as a single stream in event order —— framework logs (tracing) go
291 /// to stderr, the two never interfere.
292 ///
293 /// Boundary newlines: assistant body often has no trailing newline, while
294 /// what immediately follows may be a thinking / tool line or the next
295 /// generation. Before switching to a "non-body line" (thinking / tool) or
296 /// starting a new generation segment, if stdout is still mid-line insert a
297 /// `\n`, so each segment occupies whole lines and does not run together.
298 async fn emit_text(&mut self, out: &mut Stdout, event: &AgentEvent) -> anyhow::Result<()> {
299 match event {
300 // A new LLM generation starts: if the previous assistant body had no
301 // newline, insert one before starting the new segment.
302 AgentEvent::LlmCallStarted { .. } | AgentEvent::TurnEnded { .. } => {
303 self.end_thought(out).await?;
304 self.break_line(out).await?;
305 }
306 AgentEvent::AssistantText { content } => {
307 if let Some(text) = block_text(content)
308 && !text.is_empty()
309 {
310 self.end_thought(out).await?;
311 write_raw(out, &text).await?;
312 out.flush().await?;
313 self.mid_line = !text.ends_with('\n');
314 }
315 }
316 // A thinking block's multiple deltas share one `[thinking] ` prefix
317 // —— consecutive deltas merge, with each delta written raw.
318 AgentEvent::AssistantThought { content } => {
319 if let Some(text) = block_text(content)
320 && !text.is_empty()
321 {
322 if !self.in_thought {
323 self.break_line(out).await?;
324 write(out, "[thinking] ").await?;
325 self.in_thought = true;
326 }
327 write_raw(out, &text).await?;
328 out.flush().await?;
329 self.mid_line = !text.ends_with('\n');
330 }
331 }
332 AgentEvent::ToolCallStarted { name, fields, .. } => {
333 self.end_thought(out).await?;
334 self.break_line(out).await?;
335 let title = fields.title.clone().unwrap_or_else(|| name.clone());
336 write(out, &format!("[tool] {title}\n")).await?;
337 out.flush().await?;
338 }
339 _ => {}
340 }
341 Ok(())
342 }
343
344 /// Ends the current thinking block: if inside one, clears the flag and
345 /// inserts a `\n`. Called before switching to body / tool / a new generation
346 /// / turn end, so the thinking block occupies whole lines.
347 async fn end_thought(&mut self, out: &mut Stdout) -> anyhow::Result<()> {
348 if self.in_thought {
349 self.in_thought = false;
350 self.break_line(out).await?;
351 }
352 Ok(())
353 }
354
355 /// If stdout is still mid-line, insert a `\n` to close it and flush. Used
356 /// before a "non-body line" (thinking / tool) or a new generation segment,
357 /// so the previous assistant body occupies its own whole line —— even within
358 /// a single stream, content is separated by line.
359 async fn break_line(&mut self, out: &mut Stdout) -> anyhow::Result<()> {
360 if self.mid_line {
361 write_raw(out, "\n").await?;
362 out.flush().await?;
363 self.mid_line = false;
364 }
365 Ok(())
366 }
367
368 /// Wrap-up output after the turn ends. Framework-level diagnostics (turn
369 /// errors) go through tracing (→ stderr).
370 async fn finish(
371 &self,
372 out: &mut Stdout,
373 result: &Result<StopReason, TurnError>,
374 outcome: &ExitOutcome,
375 ) -> anyhow::Result<()> {
376 if let Err(e) = result {
377 tracing::error!(error = %e, "turn error");
378 }
379 match self.format {
380 OutputFormat::Text => {
381 // When the streamed assistant body has no trailing newline,
382 // insert one to avoid running into the following shell prompt;
383 // if already at line start (mid_line=false) skip it, to not emit
384 // an extra blank line.
385 if self.mid_line {
386 write_raw(out, "\n").await?;
387 }
388 }
389 OutputFormat::Json => {
390 // Final summary line: terminal state + exit-code semantics.
391 let summary = serde_json::json!({
392 "type": "oneshot_result",
393 "stop_reason": result.as_ref().ok().map(|r| format!("{r:?}")),
394 "error": result.as_ref().err().map(|e| e.to_string()),
395 "denied": self.denied,
396 "exit_code": outcome.raw(),
397 });
398 write_raw(out, &summary.to_string()).await?;
399 write_raw(out, "\n").await?;
400 }
401 OutputFormat::Quiet => {}
402 }
403 Ok(())
404 }
405}
406
407/// Extracts text from a [`ContentBlock`]; non-text blocks return `None`.
408fn block_text(block: &ContentBlock) -> Option<String> {
409 match block {
410 ContentBlock::Text(t) => Some(t.text.clone()),
411 _ => None,
412 }
413}
414
415/// Writes a string to any async writer.
416async fn write<W>(out: &mut W, s: &str) -> anyhow::Result<()>
417where
418 W: AsyncWriteExt + Unpin,
419{
420 out.write_all(s.as_bytes()).await?;
421 Ok(())
422}
423
424/// Alias for `write`, semantically emphasizing "write as-is, with no
425/// decoration".
426async fn write_raw<W>(out: &mut W, s: &str) -> anyhow::Result<()>
427where
428 W: AsyncWriteExt + Unpin,
429{
430 write(out, s).await
431}