1use std::sync::Arc;
2
3use anyhow::{Context, Result};
4use tokio::sync::mpsc;
5
6use crate::agent::{Agent, AgentEvent, AgentProfile, TodoStatus};
7use crate::command::CommandRegistry;
8use crate::config::Config;
9use crate::db::Db;
10use crate::extension::HookRegistry;
11use crate::memory::MemoryStore;
12use crate::provider::Provider;
13use crate::tools::ToolRegistry;
14
15#[derive(Debug, Clone, Copy, PartialEq)]
16pub enum OutputFormat {
17 Text,
18 Json,
19 StreamJson,
20}
21
22impl OutputFormat {
23 pub fn parse(s: &str) -> Self {
24 match s {
25 "json" => Self::Json,
26 "stream-json" => Self::StreamJson,
27 _ => Self::Text,
28 }
29 }
30}
31
32pub struct HeadlessOptions {
33 pub prompt: String,
34 pub format: OutputFormat,
35 pub no_tools: bool,
36 pub resume_id: Option<String>,
37 pub interactive: bool,
38 pub task_id: Option<String>,
39}
40
41struct TurnResult {
42 text: String,
43 tool_calls: Vec<serde_json::Value>,
44 session_id: String,
45 error: Option<String>,
46}
47
48#[allow(clippy::too_many_arguments)]
49pub async fn run(
50 config: Config,
51 providers: Vec<Box<dyn Provider>>,
52 db: Db,
53 memory: Option<Arc<MemoryStore>>,
54 tools: ToolRegistry,
55 profiles: Vec<AgentProfile>,
56 cwd: String,
57 skill_names: Vec<(String, String)>,
58 hooks: HookRegistry,
59 commands: CommandRegistry,
60 opts: HeadlessOptions,
61) -> Result<()> {
62 let _ = skill_names;
63 let agents_context = crate::context::AgentsContext::load(&cwd, &config.context);
64 let (bg_tx, bg_rx) = mpsc::unbounded_channel();
65 let mut agent = Agent::new(
66 providers,
67 db,
68 &config,
69 memory,
70 tools,
71 profiles,
72 cwd,
73 agents_context,
74 hooks,
75 commands,
76 )?;
77 agent.set_background_tx(bg_tx);
78
79 if let Some(ref id) = opts.resume_id {
80 let conv = agent
81 .get_session(id)
82 .with_context(|| format!("resuming session {id}"))?;
83 agent.resume_conversation(&conv)?;
84 }
85
86 let session_id = agent.conversation_id().to_string();
88 if opts.format == OutputFormat::StreamJson {
89 let obj = serde_json::json!({
90 "type": "session_start",
91 "session_id": session_id,
92 });
93 println!("{obj}");
94 }
95
96 if !opts.interactive {
98 let result = run_turn(&mut agent, &opts.prompt, &opts, bg_rx).await?;
99 let success = result.error.is_none();
100 emit_turn_end(&result, &opts);
101
102 if let Some(ref task_id) = opts.task_id {
103 let db = crate::db::Db::open().ok();
104 if let Some(db) = db {
105 let status = if success { "completed" } else { "failed" };
106 let output = if let Some(ref e) = result.error {
107 e.clone()
108 } else {
109 result.text.clone()
110 };
111 let _ = db.complete_task(task_id, status, Some(&result.session_id), &output);
112 }
113 }
114
115 if !success {
116 std::process::exit(1);
117 }
118 return Ok(());
119 }
120
121 let mut bg_rx = bg_rx;
124 if !opts.prompt.is_empty() {
125 let (result, new_bg_rx) = run_turn_multi(&mut agent, &opts.prompt, &opts, bg_rx).await?;
126 bg_rx = new_bg_rx;
127 emit_turn_end(&result, &opts);
128 }
129
130 let stdin = tokio::io::stdin();
132 let reader = tokio::io::BufReader::new(stdin);
133 use tokio::io::AsyncBufReadExt;
134 let mut lines = reader.lines();
135
136 loop {
137 if opts.format == OutputFormat::StreamJson {
139 let obj = serde_json::json!({"type": "ready"});
140 println!("{obj}");
141 } else if opts.format == OutputFormat::Text {
142 eprint!("> ");
143 }
144
145 let line = match lines.next_line().await {
146 Ok(Some(line)) => line,
147 Ok(None) => break, Err(e) => {
149 eprintln!("[error] reading stdin: {e}");
150 break;
151 }
152 };
153
154 let prompt = line.trim().to_string();
155 if prompt.is_empty() {
156 continue;
157 }
158 if prompt == "/quit" || prompt == "/exit" {
159 break;
160 }
161
162 let (result, new_bg_rx) = run_turn_multi(&mut agent, &prompt, &opts, bg_rx).await?;
163 bg_rx = new_bg_rx;
164 emit_turn_end(&result, &opts);
165 }
166
167 let session_id = agent.conversation_id().to_string();
169 let title = agent.conversation_title();
170 if opts.format == OutputFormat::StreamJson {
171 let obj = serde_json::json!({
172 "type": "session_end",
173 "session_id": session_id,
174 "title": title,
175 });
176 println!("{obj}");
177 } else if opts.format == OutputFormat::Text
178 && let Some(ref t) = title
179 {
180 eprintln!("\n[session] {t} ({session_id})");
181 }
182
183 agent.cleanup_if_empty();
184 Ok(())
185}
186
187async fn run_turn(
189 agent: &mut Agent,
190 prompt: &str,
191 opts: &HeadlessOptions,
192 mut bg_rx: mpsc::UnboundedReceiver<AgentEvent>,
193) -> Result<TurnResult> {
194 let session_id = agent.conversation_id().to_string();
195 let (tx, mut rx) = mpsc::unbounded_channel();
196 let future = agent.send_message(prompt, tx);
197
198 let mut text = String::new();
199 let mut tool_calls: Vec<serde_json::Value> = Vec::new();
200 let mut error: Option<String> = None;
201
202 tokio::pin!(future);
203
204 loop {
205 tokio::select! {
206 biased;
207 result = &mut future => {
208 if let Err(e) = result {
209 error = Some(e.to_string());
210 }
211 while let Ok(ev) = rx.try_recv() {
213 handle_event(&ev, opts, &mut text, &mut tool_calls);
214 }
215 while let Ok(ev) = bg_rx.try_recv() {
216 handle_event(&ev, opts, &mut text, &mut tool_calls);
217 }
218 break;
219 }
220 Some(ev) = rx.recv() => {
221 handle_event(&ev, opts, &mut text, &mut tool_calls);
222 }
223 Some(ev) = bg_rx.recv() => {
224 handle_event(&ev, opts, &mut text, &mut tool_calls);
225 }
226 }
227 }
228
229 Ok(TurnResult {
230 text,
231 tool_calls,
232 session_id,
233 error,
234 })
235}
236
237async fn run_turn_multi(
239 agent: &mut Agent,
240 prompt: &str,
241 opts: &HeadlessOptions,
242 mut bg_rx: mpsc::UnboundedReceiver<AgentEvent>,
243) -> Result<(TurnResult, mpsc::UnboundedReceiver<AgentEvent>)> {
244 let session_id = agent.conversation_id().to_string();
245 let (tx, mut rx) = mpsc::unbounded_channel();
246 let future = agent.send_message(prompt, tx);
247
248 let mut text = String::new();
249 let mut tool_calls: Vec<serde_json::Value> = Vec::new();
250 let mut error: Option<String> = None;
251
252 tokio::pin!(future);
253
254 loop {
255 tokio::select! {
256 biased;
257 result = &mut future => {
258 if let Err(e) = result {
259 error = Some(e.to_string());
260 }
261 while let Ok(ev) = rx.try_recv() {
262 handle_event(&ev, opts, &mut text, &mut tool_calls);
263 }
264 while let Ok(ev) = bg_rx.try_recv() {
265 handle_event(&ev, opts, &mut text, &mut tool_calls);
266 }
267 break;
268 }
269 Some(ev) = rx.recv() => {
270 handle_event(&ev, opts, &mut text, &mut tool_calls);
271 }
272 Some(ev) = bg_rx.recv() => {
273 handle_event(&ev, opts, &mut text, &mut tool_calls);
274 }
275 }
276 }
277
278 let result = TurnResult {
279 text,
280 tool_calls,
281 session_id,
282 error,
283 };
284 Ok((result, bg_rx))
285}
286
287fn emit_turn_end(result: &TurnResult, opts: &HeadlessOptions) {
288 let success = result.error.is_none();
289 if opts.format == OutputFormat::Json {
290 let mut output = serde_json::json!({
291 "success": success,
292 "session_id": result.session_id,
293 "text": result.text,
294 "tool_calls": result.tool_calls,
295 });
296 if let Some(ref e) = result.error {
297 output["error"] = serde_json::json!(e);
298 }
299 println!(
300 "{}",
301 serde_json::to_string_pretty(&output).unwrap_or_default()
302 );
303 } else if opts.format == OutputFormat::StreamJson {
304 let mut obj = serde_json::json!({
305 "type": "turn_complete",
306 "success": success,
307 "session_id": result.session_id,
308 "text": result.text,
309 });
310 if let Some(ref e) = result.error {
311 obj["error"] = serde_json::json!(e);
312 }
313 println!("{obj}");
314 } else if let Some(ref e) = result.error {
315 eprintln!("[error] {e}");
316 }
317}
318
319fn handle_event(
320 ev: &AgentEvent,
321 opts: &HeadlessOptions,
322 final_text: &mut String,
323 tool_outputs: &mut Vec<serde_json::Value>,
324) {
325 match ev {
326 AgentEvent::TextDelta(text) => {
327 if opts.format == OutputFormat::Text {
328 eprint!("{text}");
329 } else if opts.format == OutputFormat::StreamJson {
330 let obj = serde_json::json!({"type": "text_delta", "text": text});
331 println!("{obj}");
332 }
333 }
334 AgentEvent::TextComplete(text) => {
335 *final_text = text.clone();
336 if opts.format == OutputFormat::Text {
337 eprintln!();
338 println!("{text}");
339 } else if opts.format == OutputFormat::StreamJson {
340 let obj = serde_json::json!({"type": "text_complete", "text": text});
341 println!("{obj}");
342 }
343 }
344 AgentEvent::ThinkingDelta(text) => {
345 if opts.format == OutputFormat::StreamJson {
346 let obj = serde_json::json!({"type": "thinking_delta", "text": text});
347 println!("{obj}");
348 }
349 }
350 AgentEvent::ToolCallStart { id, name } => {
351 if !opts.no_tools {
352 if opts.format == OutputFormat::Text {
353 eprintln!("[tool] {name} ({id})");
354 } else if opts.format == OutputFormat::StreamJson {
355 let obj = serde_json::json!({"type": "tool_start", "id": id, "name": name});
356 println!("{obj}");
357 }
358 }
359 }
360 AgentEvent::ToolCallExecuting { id, name, input } => {
361 if !opts.no_tools && opts.format == OutputFormat::StreamJson {
362 let obj = serde_json::json!({"type": "tool_executing", "id": id, "name": name, "input": input});
363 println!("{obj}");
364 }
365 }
366 AgentEvent::ToolCallResult {
367 id,
368 name,
369 output,
370 is_error,
371 } => {
372 if !opts.no_tools {
373 if opts.format == OutputFormat::Text {
374 let prefix = if *is_error { "[error]" } else { "[result]" };
375 eprintln!("{prefix} {name}: {}", truncate(output, 500));
376 } else if opts.format == OutputFormat::StreamJson {
377 let obj = serde_json::json!({
378 "type": "tool_result",
379 "id": id,
380 "name": name,
381 "output": output,
382 "is_error": is_error,
383 });
384 println!("{obj}");
385 }
386 }
387 tool_outputs.push(serde_json::json!({
388 "id": id,
389 "name": name,
390 "output": output,
391 "is_error": is_error,
392 }));
393 }
394 AgentEvent::Question {
395 id,
396 question,
397 options,
398 responder: _,
399 } => {
400 if opts.format == OutputFormat::Text {
403 eprintln!("[question] {question}");
404 if !options.is_empty() {
405 for (i, opt) in options.iter().enumerate() {
406 eprintln!(" {}: {opt}", i + 1);
407 }
408 }
409 } else if opts.format == OutputFormat::StreamJson {
410 let obj = serde_json::json!({
411 "type": "question",
412 "id": id,
413 "question": question,
414 "options": options,
415 });
416 println!("{obj}");
417 }
418 }
419 AgentEvent::PermissionRequest {
420 tool_name,
421 input_summary,
422 responder: _,
423 } => {
424 if opts.format == OutputFormat::Text {
425 eprintln!("[permission] {tool_name}: {input_summary}");
426 } else if opts.format == OutputFormat::StreamJson {
427 let obj = serde_json::json!({
428 "type": "permission_request",
429 "tool_name": tool_name,
430 "input_summary": input_summary,
431 });
432 println!("{obj}");
433 }
434 }
435 AgentEvent::TodoUpdate(items) => {
436 if opts.format == OutputFormat::StreamJson {
437 let todos: Vec<serde_json::Value> = items
438 .iter()
439 .map(|t| {
440 serde_json::json!({
441 "content": t.content,
442 "status": match t.status {
443 TodoStatus::Pending => "pending",
444 TodoStatus::InProgress => "in_progress",
445 TodoStatus::Completed => "completed",
446 }
447 })
448 })
449 .collect();
450 let obj = serde_json::json!({"type": "todo_update", "todos": todos});
451 println!("{obj}");
452 } else if opts.format == OutputFormat::Text {
453 eprintln!("[todos]");
454 for t in items {
455 let icon = match t.status {
456 TodoStatus::Pending => "○",
457 TodoStatus::InProgress => "◑",
458 TodoStatus::Completed => "●",
459 };
460 eprintln!(" {icon} {}", t.content);
461 }
462 }
463 }
464 AgentEvent::Done { usage } => {
465 if opts.format == OutputFormat::StreamJson {
466 let obj = serde_json::json!({
467 "type": "done",
468 "usage": {
469 "input_tokens": usage.input_tokens,
470 "output_tokens": usage.output_tokens,
471 "cache_read_tokens": usage.cache_read_tokens,
472 "cache_write_tokens": usage.cache_write_tokens,
473 }
474 });
475 println!("{obj}");
476 }
477 }
478 AgentEvent::Error(msg) => {
479 if opts.format == OutputFormat::Text {
480 eprintln!("[error] {msg}");
481 } else if opts.format == OutputFormat::StreamJson {
482 let obj = serde_json::json!({"type": "error", "message": msg});
483 println!("{obj}");
484 }
485 }
486 AgentEvent::Compacting => {
487 if opts.format == OutputFormat::StreamJson {
488 let obj = serde_json::json!({"type": "compacting"});
489 println!("{obj}");
490 } else if opts.format == OutputFormat::Text {
491 eprintln!("[compacting conversation...]");
492 }
493 }
494 AgentEvent::Compacted { messages_removed } => {
495 if opts.format == OutputFormat::StreamJson {
496 let obj =
497 serde_json::json!({"type": "compacted", "messages_removed": messages_removed});
498 println!("{obj}");
499 }
500 }
501 AgentEvent::SubagentStart {
502 id,
503 description,
504 background,
505 } => {
506 if opts.format == OutputFormat::StreamJson {
507 let obj = serde_json::json!({"type": "subagent_start", "id": id, "description": description, "background": background});
508 println!("{obj}");
509 } else if opts.format == OutputFormat::Text {
510 eprintln!("[subagent] {description} ({id})");
511 }
512 }
513 AgentEvent::SubagentDelta { id, text } => {
514 if opts.format == OutputFormat::StreamJson {
515 let obj = serde_json::json!({"type": "subagent_delta", "id": id, "text": text});
516 println!("{obj}");
517 }
518 }
519 AgentEvent::SubagentToolStart {
520 id,
521 tool_name,
522 detail,
523 } => {
524 if opts.format == OutputFormat::StreamJson {
525 let obj = serde_json::json!({"type": "subagent_tool_start", "id": id, "tool_name": tool_name, "detail": detail});
526 println!("{obj}");
527 }
528 }
529 AgentEvent::SubagentToolComplete { id, tool_name } => {
530 if opts.format == OutputFormat::StreamJson {
531 let obj = serde_json::json!({"type": "subagent_tool_complete", "id": id, "tool_name": tool_name});
532 println!("{obj}");
533 }
534 }
535 AgentEvent::SubagentComplete { id, output } => {
536 if opts.format == OutputFormat::StreamJson {
537 let obj =
538 serde_json::json!({"type": "subagent_complete", "id": id, "output": output});
539 println!("{obj}");
540 }
541 }
542 AgentEvent::SubagentBackgroundDone {
543 id,
544 description,
545 output,
546 } => {
547 if opts.format == OutputFormat::StreamJson {
548 let obj = serde_json::json!({"type": "subagent_background_done", "id": id, "description": description, "output": output});
549 println!("{obj}");
550 } else if opts.format == OutputFormat::Text {
551 eprintln!("[subagent done] {description}");
552 }
553 }
554 AgentEvent::TitleGenerated(title) => {
555 if opts.format == OutputFormat::StreamJson {
556 let obj = serde_json::json!({"type": "title_generated", "title": title});
557 println!("{obj}");
558 }
559 }
560 AgentEvent::MemoryExtracted {
561 added,
562 updated,
563 deleted,
564 } => {
565 if opts.format == OutputFormat::StreamJson {
566 let obj = serde_json::json!({"type": "memory_extracted", "added": added, "updated": updated, "deleted": deleted});
567 println!("{obj}");
568 }
569 }
570 AgentEvent::ToolCallInputDelta(_) => {
571 }
573 AgentEvent::AsideDelta(_) | AgentEvent::AsideDone | AgentEvent::AsideError(_) => {
574 }
576 }
577}
578
579fn truncate(s: &str, max: usize) -> &str {
580 if s.len() <= max { s } else { &s[..max] }
581}