use std::collections::HashMap;
use std::path::PathBuf;
use std::process::ExitCode;
use std::sync::Arc;
use agent_client_protocol_schema::{ContentBlock, SessionId, StopReason, TextContent, ToolCallId};
use defect_agent::event::AgentEvent;
use defect_agent::policy::PolicyDecision;
use defect_agent::session::{AgentCore, TurnError};
use futures::{FutureExt, StreamExt};
use tokio::io::{AsyncWriteExt, Stdout};
use crate::args::OutputFormat;
use crate::session_open::open_session;
pub async fn run(
agent: Arc<dyn AgentCore>,
cwd: PathBuf,
message: String,
format: OutputFormat,
resume: Option<SessionId>,
track_denied: bool,
goal: Option<Arc<defect_agent::session::GoalState>>,
) -> anyhow::Result<ExitCode> {
let prompt = resolve_prompt(message).await?;
let mut out = tokio::io::stdout();
let session = open_session(&agent, &cwd, resume).await?;
let mut events = session.subscribe();
let mut sink = EventSink::new(format, track_denied);
let prompt_blocks = vec![ContentBlock::Text(TextContent::new(prompt))];
let turn = session.run_turn(prompt_blocks);
tokio::pin!(turn);
let result = loop {
tokio::select! {
biased;
ev = events.next() => {
if let Some(ev) = ev {
sink.emit(&mut out, ev).await?;
}
}
r = &mut turn => break r,
}
};
while let Some(Some(ev)) = events.next().now_or_never() {
sink.emit(&mut out, ev).await?;
}
let goal_unreached = goal.as_ref().is_some_and(|g| !g.is_reached());
if goal_unreached {
tracing::warn!(
"goal not reached: the agent stopped (or ran out of turns) without calling `goal_done`"
);
}
let outcome = ExitOutcome::from(&result, sink.denied, goal_unreached);
sink.finish(&mut out, &result, &outcome).await?;
out.flush().await?;
Ok(outcome.code())
}
async fn resolve_prompt(message: String) -> anyhow::Result<String> {
use std::io::IsTerminal;
let from_stdin = message == "-" || (message.is_empty() && !std::io::stdin().is_terminal());
if from_stdin {
use tokio::io::AsyncReadExt;
let mut buf = String::new();
tokio::io::stdin().read_to_string(&mut buf).await?;
Ok(buf)
} else {
Ok(message)
}
}
enum ExitOutcome {
Success,
Denied,
Cancelled,
MaxTokens,
Refusal,
Error,
GoalUnreached,
}
impl ExitOutcome {
fn from(result: &Result<StopReason, TurnError>, denied: bool, goal_unreached: bool) -> Self {
match result {
Err(_) => Self::Error,
Ok(StopReason::Refusal) => Self::Refusal,
Ok(StopReason::MaxTokens) | Ok(StopReason::MaxTurnRequests) => Self::MaxTokens,
Ok(StopReason::Cancelled) => Self::Cancelled,
Ok(_) if denied => Self::Denied,
Ok(_) if goal_unreached => Self::GoalUnreached,
Ok(_) => Self::Success,
}
}
fn raw(&self) -> u8 {
match self {
Self::Success => 0,
Self::Error => 1,
Self::MaxTokens => 2,
Self::Refusal => 3,
Self::Denied => 4,
Self::Cancelled => 5,
Self::GoalUnreached => 6,
}
}
fn code(&self) -> ExitCode {
ExitCode::from(self.raw())
}
}
struct EventSink {
format: OutputFormat,
track_denied: bool,
denied: bool,
tool_names: HashMap<ToolCallId, String>,
mid_line: bool,
in_thought: bool,
}
impl EventSink {
fn new(format: OutputFormat, track_denied: bool) -> Self {
Self {
format,
track_denied,
denied: false,
tool_names: HashMap::new(),
mid_line: false,
in_thought: false,
}
}
async fn emit(&mut self, out: &mut Stdout, event: AgentEvent) -> anyhow::Result<()> {
if let AgentEvent::ToolCallStarted { id, name, fields } = &event {
let label = fields.title.clone().unwrap_or_else(|| name.clone());
self.tool_names.insert(id.clone(), label);
}
if self.track_denied
&& let AgentEvent::PolicyDecision {
id,
decision: PolicyDecision::Deny,
} = &event
{
self.denied = true;
let tool = self
.tool_names
.get(id)
.map(String::as_str)
.unwrap_or("<unknown>");
tracing::warn!(
tool = %tool,
"tool denied: no operator present to approve (non-interactive)"
);
}
match self.format {
OutputFormat::Json => self.emit_json(out, &event).await,
OutputFormat::Text => self.emit_text(out, &event).await,
OutputFormat::Quiet => Ok(()),
}
}
async fn emit_json(&self, out: &mut Stdout, event: &AgentEvent) -> anyhow::Result<()> {
let line = serde_json::to_string(event)?;
write_raw(out, &line).await?;
write_raw(out, "\n").await
}
async fn emit_text(&mut self, out: &mut Stdout, event: &AgentEvent) -> anyhow::Result<()> {
match event {
AgentEvent::LlmCallStarted { .. } | AgentEvent::TurnEnded { .. } => {
self.end_thought(out).await?;
self.break_line(out).await?;
}
AgentEvent::AssistantText { content } => {
if let Some(text) = block_text(content)
&& !text.is_empty()
{
self.end_thought(out).await?;
write_raw(out, &text).await?;
out.flush().await?;
self.mid_line = !text.ends_with('\n');
}
}
AgentEvent::AssistantThought { content } => {
if let Some(text) = block_text(content)
&& !text.is_empty()
{
if !self.in_thought {
self.break_line(out).await?;
write(out, "[thinking] ").await?;
self.in_thought = true;
}
write_raw(out, &text).await?;
out.flush().await?;
self.mid_line = !text.ends_with('\n');
}
}
AgentEvent::ToolCallStarted { name, fields, .. } => {
self.end_thought(out).await?;
self.break_line(out).await?;
let title = fields.title.clone().unwrap_or_else(|| name.clone());
write(out, &format!("[tool] {title}\n")).await?;
out.flush().await?;
}
_ => {}
}
Ok(())
}
async fn end_thought(&mut self, out: &mut Stdout) -> anyhow::Result<()> {
if self.in_thought {
self.in_thought = false;
self.break_line(out).await?;
}
Ok(())
}
async fn break_line(&mut self, out: &mut Stdout) -> anyhow::Result<()> {
if self.mid_line {
write_raw(out, "\n").await?;
out.flush().await?;
self.mid_line = false;
}
Ok(())
}
async fn finish(
&self,
out: &mut Stdout,
result: &Result<StopReason, TurnError>,
outcome: &ExitOutcome,
) -> anyhow::Result<()> {
if let Err(e) = result {
tracing::error!(error = %e, "turn error");
}
match self.format {
OutputFormat::Text => {
if self.mid_line {
write_raw(out, "\n").await?;
}
}
OutputFormat::Json => {
let summary = serde_json::json!({
"type": "oneshot_result",
"stop_reason": result.as_ref().ok().map(|r| format!("{r:?}")),
"error": result.as_ref().err().map(|e| e.to_string()),
"denied": self.denied,
"exit_code": outcome.raw(),
});
write_raw(out, &summary.to_string()).await?;
write_raw(out, "\n").await?;
}
OutputFormat::Quiet => {}
}
Ok(())
}
}
fn block_text(block: &ContentBlock) -> Option<String> {
match block {
ContentBlock::Text(t) => Some(t.text.clone()),
_ => None,
}
}
async fn write<W>(out: &mut W, s: &str) -> anyhow::Result<()>
where
W: AsyncWriteExt + Unpin,
{
out.write_all(s.as_bytes()).await?;
Ok(())
}
async fn write_raw<W>(out: &mut W, s: &str) -> anyhow::Result<()>
where
W: AsyncWriteExt + Unpin,
{
write(out, s).await
}