Skip to main content

defect_cli/
oneshot.rs

//! Single-turn unattended mode — `defect --message <prompt>`.
//!
//! Purpose: in CI / scripts, "run one prompt, produce a result, exit with a
//! success/failure code". It stands on **equal footing** with the interactive
//! REPL and the ACP server; the three share only the [`AgentCore`] kernel:
//!
//! - It does **not** reuse the REPL's [`crate::repl`] rendering (that stack
//!   carries ANSI / line editing and is bound to the `repl` feature's
//!   crossterm/owo-colors dependencies). This module ships its own minimal,
//!   ANSI-free event projection, so `--no-default-features --features oneshot`
//!   builds a slim CI binary with no TUI dependencies.
//! - It connects in-process directly to `AgentCore` (like the REPL), bypassing
//!   the wire — CI runs its own agent and does not need ACP's cross-process
//!   generality.
//!
//! ## Output contract: stdout = agent content, stderr = framework logs
//!
//! **All agent content** (assistant body / thinking / tool calls) goes to
//! **stdout** as a single stream in event order; framework-level diagnostics
//! (denial warnings, turn errors, unreached goals) go through `tracing`, and
//! `tracing` is uniformly written to **stderr** by `defect_obs::init_tracing`.
//! So `2>/dev/null` cleanly filters out framework noise while preserving the
//! agent's complete work record; the two streams never share a cursor or run
//! together.
//!
//! ## Exit codes (CI's lifeline for judging success/failure)
//!
//! Priority, high to low: `TurnError` (1) > `Refusal` (3) > `MaxTokens` (2) >
//! `MaxTurnRequests` (7) > `Cancelled` (5) > unattended denial (`denied`, 4) >
//! goal unreached (6) > `EndTurn` (0). `MaxTokens` (a single response truncated
//! by the output limit) and `MaxTurnRequests` (the per-turn call budget
//! exhausted) are distinct conditions and carry distinct codes. See
//! `ExitOutcome`.
//!
//! ## Non-interactive permissions
//!
//! The caller (`bin/cli.rs`) is responsible for wrapping the session's policy in
//! [`defect_agent::policy::NonInteractivePolicy`], so that `Ask` degrades to
//! `Deny` and the process does not hang waiting for input in a TTY-less
//! environment. This module listens for `PolicyDecision::Deny` in the event
//! stream: once one occurs, it logs a warning via `tracing` (→ stderr) and sets
//! the `denied` flag, so even if the turn ends normally with `EndTurn` it exits
//! with a non-zero code — fail loud, letting CI know "an operation was denied,
//! this result is not trustworthy".
45
46use std::collections::HashMap;
47use std::path::PathBuf;
48use std::process::ExitCode;
49use std::sync::Arc;
50
51use agent_client_protocol_schema::{ContentBlock, SessionId, StopReason, TextContent, ToolCallId};
52use defect_agent::event::AgentEvent;
53use defect_agent::policy::PolicyDecision;
54use defect_agent::session::{AgentCore, TurnError};
55use futures::{FutureExt, StreamExt};
56use tokio::io::{AsyncWriteExt, Stdout};
57
58use crate::args::OutputFormat;
59use crate::session_open::{LocalSessionOpts, open_local_session};
60
61/// Runs a single-turn prompt and returns the process exit code.
62///
63/// Only when `track_denied = true` (the caller wrapped `NonInteractivePolicy`)
64/// is `PolicyDecision::Deny` treated as an "unattended gap" that affects the
65/// exit code —— modes like `deny-all`, where the user knowingly expects
66/// denials, should pass `false` to avoid spuriously reporting non-zero.
67///
68/// # Errors
69///
70/// Session open failure, stdin read failure, stdout/stderr write failure.
71/// When `goal` is `Some` (`--goal` mode): if the goal is unreached after the
72/// turn ends (turns exhausted without ever calling `goal_done`), exits with the
73/// Exhausted code —— prevents CI from mistaking "ran out of turns without
74/// reaching the goal" for success.
75#[allow(clippy::too_many_arguments)]
76pub async fn run(
77    agent: Arc<dyn AgentCore>,
78    cwd: PathBuf,
79    message: String,
80    format: OutputFormat,
81    resume: Option<SessionId>,
82    track_denied: bool,
83    goal: Option<Arc<defect_agent::session::GoalState>>,
84    shell_output_max_bytes: usize,
85) -> anyhow::Result<ExitCode> {
86    let prompt = resolve_prompt(message).await?;
87
88    let mut out = tokio::io::stdout();
89
90    let session = open_local_session(
91        &agent,
92        &cwd,
93        LocalSessionOpts {
94            resume,
95            shell_output_max_bytes,
96        },
97    )
98    .await?;
99
100    // Subscribe once outside the loop, draining across this turn (including the
101    // driver's autonomous turn continuations) —— isomorphic to the interactive
102    // REPL / ACP event pump. The turn future only yields the final StopReason;
103    // this turn's content is pushed through the event stream.
104    let mut events = session.subscribe();
105    let mut sink = EventSink::new(format, track_denied);
106
107    let prompt_blocks = vec![ContentBlock::Text(TextContent::new(prompt))];
108    let turn = session.run_turn(prompt_blocks);
109    tokio::pin!(turn);
110
111    let result = loop {
112        tokio::select! {
113            // biased + drain events first: the turn future and the event stream
114            // may become ready at the same time (turn finishes fast, trailing
115            // events already in the buffer). Poll events first to avoid dropping
116            // any rendering.
117            biased;
118            ev = events.next() => {
119                if let Some(ev) = ev {
120                    sink.emit(&mut out, ev).await?;
121                }
122            }
123            r = &mut turn => break r,
124        }
125    };
126
127    // Turn has ended, but the buffer may still hold just-sent, not-yet-polled
128    // trailing events —— drain everything that is immediately ready.
129    while let Some(Some(ev)) = events.next().now_or_never() {
130        sink.emit(&mut out, ev).await?;
131    }
132
133    // goal mode: turn ended normally but the goal is unreached (turns exhausted
134    // without ever calling goal_done) → Exhausted.
135    let goal_unreached = goal.as_ref().is_some_and(|g| !g.is_reached());
136    if goal_unreached {
137        tracing::warn!(
138            "goal not reached: the agent stopped (or ran out of turns) without calling `goal_done`"
139        );
140    }
141    let outcome = ExitOutcome::from(&result, sink.denied, goal_unreached);
142    sink.finish(&mut out, &result, &outcome).await?;
143    out.flush().await?;
144    Ok(outcome.code())
145}
146
147/// Resolves the prompt source: `-`, or read from stdin when stdin is piped;
148/// otherwise use the literal value.
149async fn resolve_prompt(message: String) -> anyhow::Result<String> {
150    use std::io::IsTerminal;
151
152    let from_stdin = message == "-" || (message.is_empty() && !std::io::stdin().is_terminal());
153    if from_stdin {
154        use tokio::io::AsyncReadExt;
155        let mut buf = String::new();
156        tokio::io::stdin().read_to_string(&mut buf).await?;
157        Ok(buf)
158    } else {
159        Ok(message)
160    }
161}
162
/// Reduction of a finished turn to the process exit code.
///
/// Each variant maps to a fixed numeric code (see `ExitOutcome::raw`); the
/// precedence between variants is decided by `ExitOutcome::from`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExitOutcome {
    /// The turn ended normally with nothing to report (exit code 0).
    Success,
    /// An unattended policy denial occurred during an otherwise normal turn
    /// (exit code 4).
    Denied,
    /// The turn was cancelled (exit code 5).
    Cancelled,
    /// A single LLM response was truncated by the output `max_tokens` limit
    /// (exit code 2).
    MaxTokens,
    /// The per-turn LLM-call budget (`request_limit`) was exhausted
    /// (exit code 7).
    MaxRequests,
    /// The turn stopped with `StopReason::Refusal` (exit code 3).
    Refusal,
    /// The turn failed with a `TurnError` (exit code 1).
    Error,
    /// goal mode: turn ended normally but the goal is unreached (turns
    /// exhausted / model gave up) (exit code 6).
    GoalUnreached,
}
178
179impl ExitOutcome {
180    fn from(result: &Result<StopReason, TurnError>, denied: bool, goal_unreached: bool) -> Self {
181        match result {
182            Err(_) => Self::Error,
183            Ok(StopReason::Refusal) => Self::Refusal,
184            Ok(StopReason::MaxTokens) => Self::MaxTokens,
185            Ok(StopReason::MaxTurnRequests) => Self::MaxRequests,
186            Ok(StopReason::Cancelled) => Self::Cancelled,
187            // EndTurn (and any future success-class terminal states): denied >
188            // goal unreached > success.
189            Ok(_) if denied => Self::Denied,
190            Ok(_) if goal_unreached => Self::GoalUnreached,
191            Ok(_) => Self::Success,
192        }
193    }
194
195    /// Numeric exit code (0 = success).
196    fn raw(&self) -> u8 {
197        match self {
198            Self::Success => 0,
199            Self::Error => 1,
200            Self::MaxTokens => 2,
201            Self::Refusal => 3,
202            Self::Denied => 4,
203            Self::Cancelled => 5,
204            Self::GoalUnreached => 6,
205            Self::MaxRequests => 7,
206        }
207    }
208
209    fn code(&self) -> ExitCode {
210        ExitCode::from(self.raw())
211    }
212}
213
214/// Event projector: writes the [`AgentEvent`] stream to stdout/stderr according
215/// to the [`OutputFormat`].
216struct EventSink {
217    format: OutputFormat,
218    track_denied: bool,
219    /// Whether an unattended denial has occurred.
220    denied: bool,
221    /// `ToolCallId → tool name`, used to report which tool was involved on a
222    /// `PolicyDecision::Deny`.
223    tool_names: HashMap<ToolCallId, String>,
224    /// In text format: whether stdout is still mid-line (the last thing written
225    /// was not a `\n`). In goal mode it inserts newlines between multi-turn
226    /// assistant outputs, avoiding "previous tail + next head" running together
227    /// on one line.
228    mid_line: bool,
229    /// In text format: whether we are currently inside a thinking block. A
230    /// thinking block's multiple deltas share one `[thinking] ` prefix (printed
231    /// only on the first delta), so consecutive deltas merge into one block.
232    in_thought: bool,
233}
234
235impl EventSink {
236    fn new(format: OutputFormat, track_denied: bool) -> Self {
237        Self {
238            format,
239            track_denied,
240            denied: false,
241            tool_names: HashMap::new(),
242            mid_line: false,
243            in_thought: false,
244        }
245    }
246
247    async fn emit(&mut self, out: &mut Stdout, event: AgentEvent) -> anyhow::Result<()> {
248        // Record tool names (needed for every format, used for denial reports).
249        if let AgentEvent::ToolCallStarted { id, name, fields } = &event {
250            let label = fields.title.clone().unwrap_or_else(|| name.clone());
251            self.tool_names.insert(id.clone(), label);
252        }
253
254        // Unattended denial: framework-level diagnostic via tracing (→ stderr) +
255        // set the flag (fail loud).
256        if self.track_denied
257            && let AgentEvent::PolicyDecision {
258                id,
259                decision: PolicyDecision::Deny,
260            } = &event
261        {
262            self.denied = true;
263            let tool = self
264                .tool_names
265                .get(id)
266                .map(String::as_str)
267                .unwrap_or("<unknown>");
268            tracing::warn!(
269                tool = %tool,
270                "tool denied: no operator present to approve (non-interactive)"
271            );
272        }
273
274        match self.format {
275            OutputFormat::Json => self.emit_json(out, &event).await,
276            OutputFormat::Text => self.emit_text(out, &event).await,
277            OutputFormat::Quiet => Ok(()),
278        }
279    }
280
281    /// NDJSON: one line per event. `AgentEvent` already derives Serialize
282    /// (`LlmCallStarted.request` is `#[serde(skip)]`, so it never enters JSON).
283    async fn emit_json(&self, out: &mut Stdout, event: &AgentEvent) -> anyhow::Result<()> {
284        let line = serde_json::to_string(event)?;
285        write_raw(out, &line).await?;
286        write_raw(out, "\n").await
287    }
288
289    /// Plain text: **all agent content** (body / thinking / tools) goes to
290    /// stdout as a single stream in event order —— framework logs (tracing) go
291    /// to stderr, the two never interfere.
292    ///
293    /// Boundary newlines: assistant body often has no trailing newline, while
294    /// what immediately follows may be a thinking / tool line or the next
295    /// generation. Before switching to a "non-body line" (thinking / tool) or
296    /// starting a new generation segment, if stdout is still mid-line insert a
297    /// `\n`, so each segment occupies whole lines and does not run together.
298    async fn emit_text(&mut self, out: &mut Stdout, event: &AgentEvent) -> anyhow::Result<()> {
299        match event {
300            // A new LLM generation starts: if the previous assistant body had no
301            // newline, insert one before starting the new segment.
302            AgentEvent::LlmCallStarted { .. } | AgentEvent::TurnEnded { .. } => {
303                self.end_thought(out).await?;
304                self.break_line(out).await?;
305            }
306            AgentEvent::AssistantText { content } => {
307                if let Some(text) = block_text(content)
308                    && !text.is_empty()
309                {
310                    self.end_thought(out).await?;
311                    write_raw(out, &text).await?;
312                    out.flush().await?;
313                    self.mid_line = !text.ends_with('\n');
314                }
315            }
316            // A thinking block's multiple deltas share one `[thinking] ` prefix
317            // —— consecutive deltas merge, with each delta written raw.
318            AgentEvent::AssistantThought { content } => {
319                if let Some(text) = block_text(content)
320                    && !text.is_empty()
321                {
322                    if !self.in_thought {
323                        self.break_line(out).await?;
324                        write(out, "[thinking] ").await?;
325                        self.in_thought = true;
326                    }
327                    write_raw(out, &text).await?;
328                    out.flush().await?;
329                    self.mid_line = !text.ends_with('\n');
330                }
331            }
332            AgentEvent::ToolCallStarted { name, fields, .. } => {
333                self.end_thought(out).await?;
334                self.break_line(out).await?;
335                let title = fields.title.clone().unwrap_or_else(|| name.clone());
336                write(out, &format!("[tool] {title}\n")).await?;
337                out.flush().await?;
338            }
339            _ => {}
340        }
341        Ok(())
342    }
343
344    /// Ends the current thinking block: if inside one, clears the flag and
345    /// inserts a `\n`. Called before switching to body / tool / a new generation
346    /// / turn end, so the thinking block occupies whole lines.
347    async fn end_thought(&mut self, out: &mut Stdout) -> anyhow::Result<()> {
348        if self.in_thought {
349            self.in_thought = false;
350            self.break_line(out).await?;
351        }
352        Ok(())
353    }
354
355    /// If stdout is still mid-line, insert a `\n` to close it and flush. Used
356    /// before a "non-body line" (thinking / tool) or a new generation segment,
357    /// so the previous assistant body occupies its own whole line —— even within
358    /// a single stream, content is separated by line.
359    async fn break_line(&mut self, out: &mut Stdout) -> anyhow::Result<()> {
360        if self.mid_line {
361            write_raw(out, "\n").await?;
362            out.flush().await?;
363            self.mid_line = false;
364        }
365        Ok(())
366    }
367
368    /// Wrap-up output after the turn ends. Framework-level diagnostics (turn
369    /// errors) go through tracing (→ stderr).
370    async fn finish(
371        &self,
372        out: &mut Stdout,
373        result: &Result<StopReason, TurnError>,
374        outcome: &ExitOutcome,
375    ) -> anyhow::Result<()> {
376        if let Err(e) = result {
377            tracing::error!(error = %e, "turn error");
378        }
379        match self.format {
380            OutputFormat::Text => {
381                // When the streamed assistant body has no trailing newline,
382                // insert one to avoid running into the following shell prompt;
383                // if already at line start (mid_line=false) skip it, to not emit
384                // an extra blank line.
385                if self.mid_line {
386                    write_raw(out, "\n").await?;
387                }
388            }
389            OutputFormat::Json => {
390                // Final summary line: terminal state + exit-code semantics.
391                let summary = serde_json::json!({
392                    "type": "oneshot_result",
393                    "stop_reason": result.as_ref().ok().map(|r| format!("{r:?}")),
394                    "error": result.as_ref().err().map(|e| e.to_string()),
395                    "denied": self.denied,
396                    "exit_code": outcome.raw(),
397                });
398                write_raw(out, &summary.to_string()).await?;
399                write_raw(out, "\n").await?;
400            }
401            OutputFormat::Quiet => {}
402        }
403        Ok(())
404    }
405}
406
407/// Extracts text from a [`ContentBlock`]; non-text blocks return `None`.
408fn block_text(block: &ContentBlock) -> Option<String> {
409    match block {
410        ContentBlock::Text(t) => Some(t.text.clone()),
411        _ => None,
412    }
413}
414
415/// Writes a string to any async writer.
416async fn write<W>(out: &mut W, s: &str) -> anyhow::Result<()>
417where
418    W: AsyncWriteExt + Unpin,
419{
420    out.write_all(s.as_bytes()).await?;
421    Ok(())
422}
423
424/// Alias for `write`, semantically emphasizing "write as-is, with no
425/// decoration".
426async fn write_raw<W>(out: &mut W, s: &str) -> anyhow::Result<()>
427where
428    W: AsyncWriteExt + Unpin,
429{
430    write(out, s).await
431}