use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use chrono::Utc;
use tokio::sync::broadcast;
use tracing::warn;
use atomr_agents_coding_cli_core::{
CliRequest, CliResult, CliRunId, CliVendor, CodingCliEvent, FinishReason, ToolCallRecord,
};
use atomr_agents_coding_cli_isolator::{IsolationOpts, Isolator};
use crate::error::Result;
pub(crate) async fn run_one(
run_id: CliRunId,
vendor: Arc<dyn CliVendor>,
isolator: Arc<dyn Isolator>,
req: CliRequest,
event_tx: broadcast::Sender<CodingCliEvent>,
cancel: Arc<AtomicBool>,
) -> Result<CliResult> {
vendor
.materialize_config(&req.project, &req.workdir)
.await?;
let cmd = vendor.build_headless_command(&req, &req.workdir);
let _ = event_tx.send(CodingCliEvent::RunStarted {
run_id: run_id.clone(),
vendor: vendor.kind(),
model: req.model.clone(),
session_id: None,
});
let mut handle = isolator
.spawn(
cmd,
IsolationOpts {
capture_stdout: true,
capture_stderr: true,
grace: None,
},
)
.await?;
let mut stdout = handle
.take_stdout()
.expect("stdout requested via opts");
let stderr = handle.take_stderr();
let mut parser = vendor.new_parser();
let mut result = CliResult::new(run_id.clone(), vendor.kind());
let mut pending_tools: std::collections::HashMap<String, ToolCallRecord> =
std::collections::HashMap::new();
while let Some(chunk) = stdout.recv().await {
if cancel.load(Ordering::Relaxed) {
result.finish_reason = FinishReason::Cancelled;
let _ = handle.kill().await;
break;
}
for line in std::str::from_utf8(&chunk).unwrap_or("").lines() {
let events = match parser.parse_line(line) {
Ok(evs) => evs,
Err(e) => {
warn!(error = %e, "parser failed on line; emitting Note");
vec![CodingCliEvent::Note {
message: format!("parse error: {e}"),
fields: Default::default(),
}]
}
};
for ev in events {
accumulate(&mut result, &mut pending_tools, &ev);
if let CodingCliEvent::RunFinished { reason, .. } = &ev {
result.finish_reason = *reason;
}
let _ = event_tx.send(ev);
}
}
}
let trailing = parser.flush()?;
for ev in trailing {
accumulate(&mut result, &mut pending_tools, &ev);
let _ = event_tx.send(ev);
}
if let Some(mut stderr) = stderr {
let event_tx = event_tx.clone();
tokio::spawn(async move {
while let Some(chunk) = stderr.recv().await {
let text = String::from_utf8_lossy(&chunk).into_owned();
let _ = event_tx.send(CodingCliEvent::Note {
message: format!("stderr: {}", text.trim_end()),
fields: Default::default(),
});
}
});
}
let status = handle.wait().await?;
result.exit_code = status.code;
result.ended_at = Some(Utc::now());
for (id, mut rec) in pending_tools.into_iter() {
rec.error.get_or_insert("tool call did not finish".into());
rec.finished_at = Some(Utc::now());
result.tool_calls.push(rec);
let _ = event_tx.send(CodingCliEvent::Note {
message: format!("tool {} never reported a result", id),
fields: Default::default(),
});
}
if !status.success && result.finish_reason == FinishReason::Completed {
result.finish_reason = FinishReason::ProcessError;
}
let _ = event_tx.send(CodingCliEvent::RunFinished {
reason: result.finish_reason,
result_text: Some(result.final_text.clone()),
});
Ok(result)
}
fn accumulate(
result: &mut CliResult,
pending: &mut std::collections::HashMap<String, ToolCallRecord>,
ev: &CodingCliEvent,
) {
match ev {
CodingCliEvent::AssistantTextDelta { text } => {
result.final_text.push_str(text);
}
CodingCliEvent::ToolCallStarted {
tool_call_id,
name,
input,
} => {
pending.insert(
tool_call_id.clone(),
ToolCallRecord {
tool_call_id: tool_call_id.clone(),
name: name.clone(),
input: input.clone(),
output: None,
error: None,
started_at: Utc::now(),
finished_at: None,
},
);
}
CodingCliEvent::ToolCallFinished {
tool_call_id,
output,
error,
} => {
if let Some(mut rec) = pending.remove(tool_call_id) {
rec.output = output.clone();
rec.error = error.clone();
rec.finished_at = Some(Utc::now());
result.tool_calls.push(rec);
} else {
result.tool_calls.push(ToolCallRecord {
tool_call_id: tool_call_id.clone(),
name: String::new(),
input: serde_json::Value::Null,
output: output.clone(),
error: error.clone(),
started_at: Utc::now(),
finished_at: Some(Utc::now()),
});
}
}
CodingCliEvent::Usage {
input_tokens,
output_tokens,
cost_usd,
} => result.usage.add(*input_tokens, *output_tokens, *cost_usd),
CodingCliEvent::RunFinished { result_text, .. } => {
if let Some(t) = result_text {
if !t.is_empty() {
result.final_text = t.clone();
}
}
}
_ => {}
}
}