Skip to main content

defect_cli/
oneshot.rs

1//! Single-turn unattended mode —— `defect --message <prompt>`.
2//!
3//! Purpose: in CI / scripts, "run one prompt, produce a result, exit by
4//! success/failure". On **equal footing** with the interactive REPL and the
5//! ACP server; the three only share the [`AgentCore`] kernel:
6//!
7//! - Does **not** reuse the REPL's [`crate::repl`] rendering (that stack carries
8//!   ANSI / line editing, bound to the `repl` feature's crossterm/owo-colors).
9//!   This module ships its own minimal, ANSI-free event projection, so under
10//!   `--no-default-features --features oneshot` it builds a slim CI binary with
11//!   no TUI dependencies.
12//! - Connects in-process directly to `AgentCore` (like the REPL), bypassing the
13//!   wire —— CI runs its own agent and does not need ACP's cross-process
14//!   generality.
15//!
16//! ## Output contract: stdout = agent content, stderr = framework logs
17//!
18//! **All agent content** (assistant body / thinking / tool calls) goes to
19//! **stdout** as a single stream in event order; framework-level diagnostics
20//! (denial warnings, turn errors, unreached goals) go through `tracing` —— and
21//! `tracing` is uniformly written to **stderr** by `defect_obs::init_tracing`.
22//! So `2>/dev/null` cleanly filters out framework noise while preserving the
23//! agent's complete work record; the two streams no longer share a cursor or
24//! run together.
25//!
26//! ## Exit codes (CI's lifeline for judging success/failure)
27//!
28//! Priority high to low: `TurnError` > `Refusal` > `MaxTokens`/`MaxTurnRequests`
29//! > `Cancelled` > unattended denial (`denied`) > `EndTurn`(0). See `ExitOutcome`.
30//!
31//! ## Non-interactive permissions
32//!
33//! The caller (`bin/cli.rs`) is responsible for wrapping the session's policy in
34//! [`defect_agent::policy::NonInteractivePolicy`], so that `Ask` degrades to
35//! `Deny` and it does not hang waiting for input in a TTY-less environment. This
36//! module listens for `PolicyDecision::Deny` in the event stream: once one
37//! occurs, it logs a warning via `tracing` (→ stderr) and sets the `denied`
38//! flag, so even if the turn ends normally with `EndTurn` it exits with a
39//! non-zero code —— fail loud, letting CI know "an operation was denied, this
40//! result is not trustworthy".
41
42use std::collections::HashMap;
43use std::path::PathBuf;
44use std::process::ExitCode;
45use std::sync::Arc;
46
47use agent_client_protocol_schema::{ContentBlock, SessionId, StopReason, TextContent, ToolCallId};
48use defect_agent::event::AgentEvent;
49use defect_agent::policy::PolicyDecision;
50use defect_agent::session::{AgentCore, TurnError};
51use futures::{FutureExt, StreamExt};
52use tokio::io::{AsyncWriteExt, Stdout};
53
54use crate::args::OutputFormat;
55use crate::session_open::open_session;
56
57/// Runs a single-turn prompt and returns the process exit code.
58///
59/// Only when `track_denied = true` (the caller wrapped `NonInteractivePolicy`)
60/// is `PolicyDecision::Deny` treated as an "unattended gap" that affects the
61/// exit code —— modes like `deny-all`, where the user knowingly expects
62/// denials, should pass `false` to avoid spuriously reporting non-zero.
63///
64/// # Errors
65///
66/// Session open failure, stdin read failure, stdout/stderr write failure.
67/// When `goal` is `Some` (`--goal` mode): if the goal is unreached after the
68/// turn ends (turns exhausted without ever calling `goal_done`), exits with the
69/// Exhausted code —— prevents CI from mistaking "ran out of turns without
70/// reaching the goal" for success.
71pub async fn run(
72    agent: Arc<dyn AgentCore>,
73    cwd: PathBuf,
74    message: String,
75    format: OutputFormat,
76    resume: Option<SessionId>,
77    track_denied: bool,
78    goal: Option<Arc<defect_agent::session::GoalState>>,
79) -> anyhow::Result<ExitCode> {
80    let prompt = resolve_prompt(message).await?;
81
82    let mut out = tokio::io::stdout();
83
84    let session = open_session(&agent, &cwd, resume).await?;
85
86    // Subscribe once outside the loop, draining across this turn (including the
87    // driver's autonomous turn continuations) —— isomorphic to the interactive
88    // REPL / ACP event pump. The turn future only yields the final StopReason;
89    // this turn's content is pushed through the event stream.
90    let mut events = session.subscribe();
91    let mut sink = EventSink::new(format, track_denied);
92
93    let prompt_blocks = vec![ContentBlock::Text(TextContent::new(prompt))];
94    let turn = session.run_turn(prompt_blocks);
95    tokio::pin!(turn);
96
97    let result = loop {
98        tokio::select! {
99            // biased + drain events first: the turn future and the event stream
100            // may become ready at the same time (turn finishes fast, trailing
101            // events already in the buffer). Poll events first to avoid dropping
102            // any rendering.
103            biased;
104            ev = events.next() => {
105                if let Some(ev) = ev {
106                    sink.emit(&mut out, ev).await?;
107                }
108            }
109            r = &mut turn => break r,
110        }
111    };
112
113    // Turn has ended, but the buffer may still hold just-sent, not-yet-polled
114    // trailing events —— drain everything that is immediately ready.
115    while let Some(Some(ev)) = events.next().now_or_never() {
116        sink.emit(&mut out, ev).await?;
117    }
118
119    // goal mode: turn ended normally but the goal is unreached (turns exhausted
120    // without ever calling goal_done) → Exhausted.
121    let goal_unreached = goal.as_ref().is_some_and(|g| !g.is_reached());
122    if goal_unreached {
123        tracing::warn!(
124            "goal not reached: the agent stopped (or ran out of turns) without calling `goal_done`"
125        );
126    }
127    let outcome = ExitOutcome::from(&result, sink.denied, goal_unreached);
128    sink.finish(&mut out, &result, &outcome).await?;
129    out.flush().await?;
130    Ok(outcome.code())
131}
132
133/// Resolves the prompt source: `-`, or read from stdin when stdin is piped;
134/// otherwise use the literal value.
135async fn resolve_prompt(message: String) -> anyhow::Result<String> {
136    use std::io::IsTerminal;
137
138    let from_stdin = message == "-" || (message.is_empty() && !std::io::stdin().is_terminal());
139    if from_stdin {
140        use tokio::io::AsyncReadExt;
141        let mut buf = String::new();
142        tokio::io::stdin().read_to_string(&mut buf).await?;
143        Ok(buf)
144    } else {
145        Ok(message)
146    }
147}
148
149/// Reduction from the turn result to the process exit code.
150enum ExitOutcome {
151    Success,
152    Denied,
153    Cancelled,
154    MaxTokens,
155    Refusal,
156    Error,
157    /// goal mode: turn ended normally but the goal is unreached (turns exhausted
158    /// / model gave up).
159    GoalUnreached,
160}
161
162impl ExitOutcome {
163    fn from(result: &Result<StopReason, TurnError>, denied: bool, goal_unreached: bool) -> Self {
164        match result {
165            Err(_) => Self::Error,
166            Ok(StopReason::Refusal) => Self::Refusal,
167            Ok(StopReason::MaxTokens) | Ok(StopReason::MaxTurnRequests) => Self::MaxTokens,
168            Ok(StopReason::Cancelled) => Self::Cancelled,
169            // EndTurn (and any future success-class terminal states): denied >
170            // goal unreached > success.
171            Ok(_) if denied => Self::Denied,
172            Ok(_) if goal_unreached => Self::GoalUnreached,
173            Ok(_) => Self::Success,
174        }
175    }
176
177    /// Numeric exit code (0 = success).
178    fn raw(&self) -> u8 {
179        match self {
180            Self::Success => 0,
181            Self::Error => 1,
182            Self::MaxTokens => 2,
183            Self::Refusal => 3,
184            Self::Denied => 4,
185            Self::Cancelled => 5,
186            Self::GoalUnreached => 6,
187        }
188    }
189
190    fn code(&self) -> ExitCode {
191        ExitCode::from(self.raw())
192    }
193}
194
195/// Event projector: writes the [`AgentEvent`] stream to stdout/stderr according
196/// to the [`OutputFormat`].
197struct EventSink {
198    format: OutputFormat,
199    track_denied: bool,
200    /// Whether an unattended denial has occurred.
201    denied: bool,
202    /// `ToolCallId → tool name`, used to report which tool was involved on a
203    /// `PolicyDecision::Deny`.
204    tool_names: HashMap<ToolCallId, String>,
205    /// In text format: whether stdout is still mid-line (the last thing written
206    /// was not a `\n`). In goal mode it inserts newlines between multi-turn
207    /// assistant outputs, avoiding "previous tail + next head" running together
208    /// on one line.
209    mid_line: bool,
210    /// In text format: whether we are currently inside a thinking block. A
211    /// thinking block's multiple deltas share one `[thinking] ` prefix (printed
212    /// only on the first delta), so consecutive deltas merge into one block.
213    in_thought: bool,
214}
215
216impl EventSink {
217    fn new(format: OutputFormat, track_denied: bool) -> Self {
218        Self {
219            format,
220            track_denied,
221            denied: false,
222            tool_names: HashMap::new(),
223            mid_line: false,
224            in_thought: false,
225        }
226    }
227
228    async fn emit(&mut self, out: &mut Stdout, event: AgentEvent) -> anyhow::Result<()> {
229        // Record tool names (needed for every format, used for denial reports).
230        if let AgentEvent::ToolCallStarted { id, name, fields } = &event {
231            let label = fields.title.clone().unwrap_or_else(|| name.clone());
232            self.tool_names.insert(id.clone(), label);
233        }
234
235        // Unattended denial: framework-level diagnostic via tracing (→ stderr) +
236        // set the flag (fail loud).
237        if self.track_denied
238            && let AgentEvent::PolicyDecision {
239                id,
240                decision: PolicyDecision::Deny,
241            } = &event
242        {
243            self.denied = true;
244            let tool = self
245                .tool_names
246                .get(id)
247                .map(String::as_str)
248                .unwrap_or("<unknown>");
249            tracing::warn!(
250                tool = %tool,
251                "tool denied: no operator present to approve (non-interactive)"
252            );
253        }
254
255        match self.format {
256            OutputFormat::Json => self.emit_json(out, &event).await,
257            OutputFormat::Text => self.emit_text(out, &event).await,
258            OutputFormat::Quiet => Ok(()),
259        }
260    }
261
262    /// NDJSON: one line per event. `AgentEvent` already derives Serialize
263    /// (`LlmCallStarted.request` is `#[serde(skip)]`, so it never enters JSON).
264    async fn emit_json(&self, out: &mut Stdout, event: &AgentEvent) -> anyhow::Result<()> {
265        let line = serde_json::to_string(event)?;
266        write_raw(out, &line).await?;
267        write_raw(out, "\n").await
268    }
269
270    /// Plain text: **all agent content** (body / thinking / tools) goes to
271    /// stdout as a single stream in event order —— framework logs (tracing) go
272    /// to stderr, the two never interfere.
273    ///
274    /// Boundary newlines: assistant body often has no trailing newline, while
275    /// what immediately follows may be a thinking / tool line or the next
276    /// generation. Before switching to a "non-body line" (thinking / tool) or
277    /// starting a new generation segment, if stdout is still mid-line insert a
278    /// `\n`, so each segment occupies whole lines and does not run together.
279    async fn emit_text(&mut self, out: &mut Stdout, event: &AgentEvent) -> anyhow::Result<()> {
280        match event {
281            // A new LLM generation starts: if the previous assistant body had no
282            // newline, insert one before starting the new segment.
283            AgentEvent::LlmCallStarted { .. } | AgentEvent::TurnEnded { .. } => {
284                self.end_thought(out).await?;
285                self.break_line(out).await?;
286            }
287            AgentEvent::AssistantText { content } => {
288                if let Some(text) = block_text(content)
289                    && !text.is_empty()
290                {
291                    self.end_thought(out).await?;
292                    write_raw(out, &text).await?;
293                    out.flush().await?;
294                    self.mid_line = !text.ends_with('\n');
295                }
296            }
297            // A thinking block's multiple deltas share one `[thinking] ` prefix
298            // —— consecutive deltas merge, with each delta written raw.
299            AgentEvent::AssistantThought { content } => {
300                if let Some(text) = block_text(content)
301                    && !text.is_empty()
302                {
303                    if !self.in_thought {
304                        self.break_line(out).await?;
305                        write(out, "[thinking] ").await?;
306                        self.in_thought = true;
307                    }
308                    write_raw(out, &text).await?;
309                    out.flush().await?;
310                    self.mid_line = !text.ends_with('\n');
311                }
312            }
313            AgentEvent::ToolCallStarted { name, fields, .. } => {
314                self.end_thought(out).await?;
315                self.break_line(out).await?;
316                let title = fields.title.clone().unwrap_or_else(|| name.clone());
317                write(out, &format!("[tool] {title}\n")).await?;
318                out.flush().await?;
319            }
320            _ => {}
321        }
322        Ok(())
323    }
324
325    /// Ends the current thinking block: if inside one, clears the flag and
326    /// inserts a `\n`. Called before switching to body / tool / a new generation
327    /// / turn end, so the thinking block occupies whole lines.
328    async fn end_thought(&mut self, out: &mut Stdout) -> anyhow::Result<()> {
329        if self.in_thought {
330            self.in_thought = false;
331            self.break_line(out).await?;
332        }
333        Ok(())
334    }
335
336    /// If stdout is still mid-line, insert a `\n` to close it and flush. Used
337    /// before a "non-body line" (thinking / tool) or a new generation segment,
338    /// so the previous assistant body occupies its own whole line —— even within
339    /// a single stream, content is separated by line.
340    async fn break_line(&mut self, out: &mut Stdout) -> anyhow::Result<()> {
341        if self.mid_line {
342            write_raw(out, "\n").await?;
343            out.flush().await?;
344            self.mid_line = false;
345        }
346        Ok(())
347    }
348
349    /// Wrap-up output after the turn ends. Framework-level diagnostics (turn
350    /// errors) go through tracing (→ stderr).
351    async fn finish(
352        &self,
353        out: &mut Stdout,
354        result: &Result<StopReason, TurnError>,
355        outcome: &ExitOutcome,
356    ) -> anyhow::Result<()> {
357        if let Err(e) = result {
358            tracing::error!(error = %e, "turn error");
359        }
360        match self.format {
361            OutputFormat::Text => {
362                // When the streamed assistant body has no trailing newline,
363                // insert one to avoid running into the following shell prompt;
364                // if already at line start (mid_line=false) skip it, to not emit
365                // an extra blank line.
366                if self.mid_line {
367                    write_raw(out, "\n").await?;
368                }
369            }
370            OutputFormat::Json => {
371                // Final summary line: terminal state + exit-code semantics.
372                let summary = serde_json::json!({
373                    "type": "oneshot_result",
374                    "stop_reason": result.as_ref().ok().map(|r| format!("{r:?}")),
375                    "error": result.as_ref().err().map(|e| e.to_string()),
376                    "denied": self.denied,
377                    "exit_code": outcome.raw(),
378                });
379                write_raw(out, &summary.to_string()).await?;
380                write_raw(out, "\n").await?;
381            }
382            OutputFormat::Quiet => {}
383        }
384        Ok(())
385    }
386}
387
388/// Extracts text from a [`ContentBlock`]; non-text blocks return `None`.
389fn block_text(block: &ContentBlock) -> Option<String> {
390    match block {
391        ContentBlock::Text(t) => Some(t.text.clone()),
392        _ => None,
393    }
394}
395
396/// Writes a string to any async writer.
397async fn write<W>(out: &mut W, s: &str) -> anyhow::Result<()>
398where
399    W: AsyncWriteExt + Unpin,
400{
401    out.write_all(s.as_bytes()).await?;
402    Ok(())
403}
404
405/// Alias for `write`, semantically emphasizing "write as-is, with no
406/// decoration".
407async fn write_raw<W>(out: &mut W, s: &str) -> anyhow::Result<()>
408where
409    W: AsyncWriteExt + Unpin,
410{
411    write(out, s).await
412}