1use std::sync::Arc;
2
3use anyhow::{Context, Result};
4use tokio::sync::mpsc;
5
6use crate::agent::{Agent, AgentEvent, AgentProfile, TodoStatus};
7use crate::command::CommandRegistry;
8use crate::config::Config;
9use crate::db::Db;
10use crate::extension::HookRegistry;
11use crate::memory::MemoryStore;
12use crate::provider::Provider;
13use crate::tools::ToolRegistry;
14
/// How a headless run reports progress and results.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum OutputFormat {
    /// Human-oriented output: progress on stderr, final text on stdout.
    Text,
    /// A single pretty-printed JSON document per turn.
    Json,
    /// One compact JSON object per event, each on its own stdout line.
    StreamJson,
}

impl OutputFormat {
    /// Maps a format name to its variant.
    ///
    /// Only the exact strings `"json"` and `"stream-json"` are recognised;
    /// every other input (including the empty string) yields [`Self::Text`].
    pub fn parse(s: &str) -> Self {
        if s == "json" {
            Self::Json
        } else if s == "stream-json" {
            Self::StreamJson
        } else {
            Self::Text
        }
    }
}
31
/// Options for one headless invocation of [`run`].
pub struct HeadlessOptions {
    /// Prompt sent as the first turn. In interactive mode an empty prompt
    /// skips that initial turn and goes straight to reading stdin.
    pub prompt: String,
    /// Output mode used for events and per-turn results.
    pub format: OutputFormat,
    /// When `true`, tool start/executing/result events are not printed
    /// (tool results are still collected for the per-turn summary).
    /// NOTE(review): nothing in this file disables tool execution itself —
    /// confirm where that is enforced.
    pub no_tools: bool,
    /// Id of an existing session to load and resume before the first turn.
    pub resume_id: Option<String>,
    /// When `true`, keep reading prompts from stdin (one per line) after the
    /// initial turn; when `false`, run exactly one turn and return.
    pub interactive: bool,
}
39
/// What was collected while driving a single agent turn.
struct TurnResult {
    /// Text of the most recent `TextComplete` event; empty if none arrived.
    text: String,
    /// One JSON object per `ToolCallResult` event, with the keys
    /// `id`, `name`, `output` and `is_error`.
    tool_calls: Vec<serde_json::Value>,
    /// Conversation id read from the agent before the message was sent.
    session_id: String,
}
45
46#[allow(clippy::too_many_arguments)]
47pub async fn run(
48 config: Config,
49 providers: Vec<Box<dyn Provider>>,
50 db: Db,
51 memory: Option<Arc<MemoryStore>>,
52 tools: ToolRegistry,
53 profiles: Vec<AgentProfile>,
54 cwd: String,
55 skill_names: Vec<(String, String)>,
56 hooks: HookRegistry,
57 commands: CommandRegistry,
58 opts: HeadlessOptions,
59) -> Result<()> {
60 let _ = skill_names;
61 let agents_context = crate::context::AgentsContext::load(&cwd, &config.context);
62 let (bg_tx, bg_rx) = mpsc::unbounded_channel();
63 let mut agent = Agent::new(
64 providers,
65 db,
66 &config,
67 memory,
68 tools,
69 profiles,
70 cwd,
71 agents_context,
72 hooks,
73 commands,
74 )?;
75 agent.set_background_tx(bg_tx);
76
77 if let Some(ref id) = opts.resume_id {
78 let conv = agent
79 .get_session(id)
80 .with_context(|| format!("resuming session {id}"))?;
81 agent.resume_conversation(&conv)?;
82 }
83
84 let session_id = agent.conversation_id().to_string();
86 if opts.format == OutputFormat::StreamJson {
87 let obj = serde_json::json!({
88 "type": "session_start",
89 "session_id": session_id,
90 });
91 println!("{obj}");
92 }
93
94 if !opts.interactive {
96 let result = run_turn(&mut agent, &opts.prompt, &opts, bg_rx).await?;
97 emit_turn_end(&result, &opts);
98 return Ok(());
99 }
100
101 let mut bg_rx = bg_rx;
104 if !opts.prompt.is_empty() {
105 let (result, new_bg_rx) = run_turn_multi(&mut agent, &opts.prompt, &opts, bg_rx).await?;
106 bg_rx = new_bg_rx;
107 emit_turn_end(&result, &opts);
108 }
109
110 let stdin = tokio::io::stdin();
112 let reader = tokio::io::BufReader::new(stdin);
113 use tokio::io::AsyncBufReadExt;
114 let mut lines = reader.lines();
115
116 loop {
117 if opts.format == OutputFormat::StreamJson {
119 let obj = serde_json::json!({"type": "ready"});
120 println!("{obj}");
121 } else if opts.format == OutputFormat::Text {
122 eprint!("> ");
123 }
124
125 let line = match lines.next_line().await {
126 Ok(Some(line)) => line,
127 Ok(None) => break, Err(e) => {
129 eprintln!("[error] reading stdin: {e}");
130 break;
131 }
132 };
133
134 let prompt = line.trim().to_string();
135 if prompt.is_empty() {
136 continue;
137 }
138 if prompt == "/quit" || prompt == "/exit" {
139 break;
140 }
141
142 let (result, new_bg_rx) = run_turn_multi(&mut agent, &prompt, &opts, bg_rx).await?;
143 bg_rx = new_bg_rx;
144 emit_turn_end(&result, &opts);
145 }
146
147 let session_id = agent.conversation_id().to_string();
149 let title = agent.conversation_title();
150 if opts.format == OutputFormat::StreamJson {
151 let obj = serde_json::json!({
152 "type": "session_end",
153 "session_id": session_id,
154 "title": title,
155 });
156 println!("{obj}");
157 } else if opts.format == OutputFormat::Text
158 && let Some(ref t) = title
159 {
160 eprintln!("\n[session] {t} ({session_id})");
161 }
162
163 agent.cleanup_if_empty();
164 Ok(())
165}
166
167async fn run_turn(
169 agent: &mut Agent,
170 prompt: &str,
171 opts: &HeadlessOptions,
172 mut bg_rx: mpsc::UnboundedReceiver<AgentEvent>,
173) -> Result<TurnResult> {
174 let session_id = agent.conversation_id().to_string();
175 let (tx, mut rx) = mpsc::unbounded_channel();
176 let future = agent.send_message(prompt, tx);
177
178 let mut text = String::new();
179 let mut tool_calls: Vec<serde_json::Value> = Vec::new();
180
181 tokio::pin!(future);
182
183 loop {
184 tokio::select! {
185 biased;
186 result = &mut future => {
187 result.context("agent send_message failed")?;
188 while let Ok(ev) = rx.try_recv() {
190 handle_event(&ev, opts, &mut text, &mut tool_calls);
191 }
192 while let Ok(ev) = bg_rx.try_recv() {
193 handle_event(&ev, opts, &mut text, &mut tool_calls);
194 }
195 break;
196 }
197 Some(ev) = rx.recv() => {
198 handle_event(&ev, opts, &mut text, &mut tool_calls);
199 }
200 Some(ev) = bg_rx.recv() => {
201 handle_event(&ev, opts, &mut text, &mut tool_calls);
202 }
203 }
204 }
205
206 Ok(TurnResult {
207 text,
208 tool_calls,
209 session_id,
210 })
211}
212
213async fn run_turn_multi(
215 agent: &mut Agent,
216 prompt: &str,
217 opts: &HeadlessOptions,
218 mut bg_rx: mpsc::UnboundedReceiver<AgentEvent>,
219) -> Result<(TurnResult, mpsc::UnboundedReceiver<AgentEvent>)> {
220 let session_id = agent.conversation_id().to_string();
221 let (tx, mut rx) = mpsc::unbounded_channel();
222 let future = agent.send_message(prompt, tx);
223
224 let mut text = String::new();
225 let mut tool_calls: Vec<serde_json::Value> = Vec::new();
226
227 tokio::pin!(future);
228
229 loop {
230 tokio::select! {
231 biased;
232 result = &mut future => {
233 result.context("agent send_message failed")?;
234 while let Ok(ev) = rx.try_recv() {
235 handle_event(&ev, opts, &mut text, &mut tool_calls);
236 }
237 while let Ok(ev) = bg_rx.try_recv() {
238 handle_event(&ev, opts, &mut text, &mut tool_calls);
239 }
240 break;
241 }
242 Some(ev) = rx.recv() => {
243 handle_event(&ev, opts, &mut text, &mut tool_calls);
244 }
245 Some(ev) = bg_rx.recv() => {
246 handle_event(&ev, opts, &mut text, &mut tool_calls);
247 }
248 }
249 }
250
251 let result = TurnResult {
252 text,
253 tool_calls,
254 session_id,
255 };
256 Ok((result, bg_rx))
257}
258
259fn emit_turn_end(result: &TurnResult, opts: &HeadlessOptions) {
260 if opts.format == OutputFormat::Json {
261 let output = serde_json::json!({
262 "session_id": result.session_id,
263 "text": result.text,
264 "tool_calls": result.tool_calls,
265 });
266 println!(
267 "{}",
268 serde_json::to_string_pretty(&output).unwrap_or_default()
269 );
270 } else if opts.format == OutputFormat::StreamJson {
271 let obj = serde_json::json!({
272 "type": "turn_complete",
273 "session_id": result.session_id,
274 "text": result.text,
275 });
276 println!("{obj}");
277 }
278 }
280
281fn handle_event(
282 ev: &AgentEvent,
283 opts: &HeadlessOptions,
284 final_text: &mut String,
285 tool_outputs: &mut Vec<serde_json::Value>,
286) {
287 match ev {
288 AgentEvent::TextDelta(text) => {
289 if opts.format == OutputFormat::Text {
290 eprint!("{text}");
291 } else if opts.format == OutputFormat::StreamJson {
292 let obj = serde_json::json!({"type": "text_delta", "text": text});
293 println!("{obj}");
294 }
295 }
296 AgentEvent::TextComplete(text) => {
297 *final_text = text.clone();
298 if opts.format == OutputFormat::Text {
299 eprintln!();
300 println!("{text}");
301 } else if opts.format == OutputFormat::StreamJson {
302 let obj = serde_json::json!({"type": "text_complete", "text": text});
303 println!("{obj}");
304 }
305 }
306 AgentEvent::ThinkingDelta(text) => {
307 if opts.format == OutputFormat::StreamJson {
308 let obj = serde_json::json!({"type": "thinking_delta", "text": text});
309 println!("{obj}");
310 }
311 }
312 AgentEvent::ToolCallStart { id, name } => {
313 if !opts.no_tools {
314 if opts.format == OutputFormat::Text {
315 eprintln!("[tool] {name} ({id})");
316 } else if opts.format == OutputFormat::StreamJson {
317 let obj = serde_json::json!({"type": "tool_start", "id": id, "name": name});
318 println!("{obj}");
319 }
320 }
321 }
322 AgentEvent::ToolCallExecuting { id, name, input } => {
323 if !opts.no_tools && opts.format == OutputFormat::StreamJson {
324 let obj = serde_json::json!({"type": "tool_executing", "id": id, "name": name, "input": input});
325 println!("{obj}");
326 }
327 }
328 AgentEvent::ToolCallResult {
329 id,
330 name,
331 output,
332 is_error,
333 } => {
334 if !opts.no_tools {
335 if opts.format == OutputFormat::Text {
336 let prefix = if *is_error { "[error]" } else { "[result]" };
337 eprintln!("{prefix} {name}: {}", truncate(output, 500));
338 } else if opts.format == OutputFormat::StreamJson {
339 let obj = serde_json::json!({
340 "type": "tool_result",
341 "id": id,
342 "name": name,
343 "output": output,
344 "is_error": is_error,
345 });
346 println!("{obj}");
347 }
348 }
349 tool_outputs.push(serde_json::json!({
350 "id": id,
351 "name": name,
352 "output": output,
353 "is_error": is_error,
354 }));
355 }
356 AgentEvent::Question {
357 id,
358 question,
359 options,
360 responder: _,
361 } => {
362 if opts.format == OutputFormat::Text {
365 eprintln!("[question] {question}");
366 if !options.is_empty() {
367 for (i, opt) in options.iter().enumerate() {
368 eprintln!(" {}: {opt}", i + 1);
369 }
370 }
371 } else if opts.format == OutputFormat::StreamJson {
372 let obj = serde_json::json!({
373 "type": "question",
374 "id": id,
375 "question": question,
376 "options": options,
377 });
378 println!("{obj}");
379 }
380 }
381 AgentEvent::PermissionRequest {
382 tool_name,
383 input_summary,
384 responder: _,
385 } => {
386 if opts.format == OutputFormat::Text {
387 eprintln!("[permission] {tool_name}: {input_summary}");
388 } else if opts.format == OutputFormat::StreamJson {
389 let obj = serde_json::json!({
390 "type": "permission_request",
391 "tool_name": tool_name,
392 "input_summary": input_summary,
393 });
394 println!("{obj}");
395 }
396 }
397 AgentEvent::TodoUpdate(items) => {
398 if opts.format == OutputFormat::StreamJson {
399 let todos: Vec<serde_json::Value> = items
400 .iter()
401 .map(|t| {
402 serde_json::json!({
403 "content": t.content,
404 "status": match t.status {
405 TodoStatus::Pending => "pending",
406 TodoStatus::InProgress => "in_progress",
407 TodoStatus::Completed => "completed",
408 }
409 })
410 })
411 .collect();
412 let obj = serde_json::json!({"type": "todo_update", "todos": todos});
413 println!("{obj}");
414 } else if opts.format == OutputFormat::Text {
415 eprintln!("[todos]");
416 for t in items {
417 let icon = match t.status {
418 TodoStatus::Pending => "○",
419 TodoStatus::InProgress => "◑",
420 TodoStatus::Completed => "●",
421 };
422 eprintln!(" {icon} {}", t.content);
423 }
424 }
425 }
426 AgentEvent::Done { usage } => {
427 if opts.format == OutputFormat::StreamJson {
428 let obj = serde_json::json!({
429 "type": "done",
430 "usage": {
431 "input_tokens": usage.input_tokens,
432 "output_tokens": usage.output_tokens,
433 "cache_read_tokens": usage.cache_read_tokens,
434 "cache_write_tokens": usage.cache_write_tokens,
435 }
436 });
437 println!("{obj}");
438 }
439 }
440 AgentEvent::Error(msg) => {
441 if opts.format == OutputFormat::Text {
442 eprintln!("[error] {msg}");
443 } else if opts.format == OutputFormat::StreamJson {
444 let obj = serde_json::json!({"type": "error", "message": msg});
445 println!("{obj}");
446 }
447 }
448 AgentEvent::Compacting => {
449 if opts.format == OutputFormat::StreamJson {
450 let obj = serde_json::json!({"type": "compacting"});
451 println!("{obj}");
452 } else if opts.format == OutputFormat::Text {
453 eprintln!("[compacting conversation...]");
454 }
455 }
456 AgentEvent::Compacted { messages_removed } => {
457 if opts.format == OutputFormat::StreamJson {
458 let obj =
459 serde_json::json!({"type": "compacted", "messages_removed": messages_removed});
460 println!("{obj}");
461 }
462 }
463 AgentEvent::SubagentStart {
464 id,
465 description,
466 background,
467 } => {
468 if opts.format == OutputFormat::StreamJson {
469 let obj = serde_json::json!({"type": "subagent_start", "id": id, "description": description, "background": background});
470 println!("{obj}");
471 } else if opts.format == OutputFormat::Text {
472 eprintln!("[subagent] {description} ({id})");
473 }
474 }
475 AgentEvent::SubagentDelta { id, text } => {
476 if opts.format == OutputFormat::StreamJson {
477 let obj = serde_json::json!({"type": "subagent_delta", "id": id, "text": text});
478 println!("{obj}");
479 }
480 }
481 AgentEvent::SubagentToolStart {
482 id,
483 tool_name,
484 detail,
485 } => {
486 if opts.format == OutputFormat::StreamJson {
487 let obj = serde_json::json!({"type": "subagent_tool_start", "id": id, "tool_name": tool_name, "detail": detail});
488 println!("{obj}");
489 }
490 }
491 AgentEvent::SubagentToolComplete { id, tool_name } => {
492 if opts.format == OutputFormat::StreamJson {
493 let obj = serde_json::json!({"type": "subagent_tool_complete", "id": id, "tool_name": tool_name});
494 println!("{obj}");
495 }
496 }
497 AgentEvent::SubagentComplete { id, output } => {
498 if opts.format == OutputFormat::StreamJson {
499 let obj =
500 serde_json::json!({"type": "subagent_complete", "id": id, "output": output});
501 println!("{obj}");
502 }
503 }
504 AgentEvent::SubagentBackgroundDone {
505 id,
506 description,
507 output,
508 } => {
509 if opts.format == OutputFormat::StreamJson {
510 let obj = serde_json::json!({"type": "subagent_background_done", "id": id, "description": description, "output": output});
511 println!("{obj}");
512 } else if opts.format == OutputFormat::Text {
513 eprintln!("[subagent done] {description}");
514 }
515 }
516 AgentEvent::TitleGenerated(title) => {
517 if opts.format == OutputFormat::StreamJson {
518 let obj = serde_json::json!({"type": "title_generated", "title": title});
519 println!("{obj}");
520 }
521 }
522 AgentEvent::MemoryExtracted {
523 added,
524 updated,
525 deleted,
526 } => {
527 if opts.format == OutputFormat::StreamJson {
528 let obj = serde_json::json!({"type": "memory_extracted", "added": added, "updated": updated, "deleted": deleted});
529 println!("{obj}");
530 }
531 }
532 AgentEvent::ToolCallInputDelta(_) => {
533 }
535 }
536}
537
/// Returns the longest prefix of `s` that is at most `max` bytes long and
/// ends on a UTF-8 character boundary.
///
/// If `s` already fits within `max` bytes it is returned unchanged. When
/// `max` falls inside a multi-byte character the cut moves back to the start
/// of that character, so the result may be shorter than `max` bytes but the
/// function never panics.
fn truncate(s: &str, max: usize) -> &str {
    if s.len() <= max {
        return s;
    }
    // Slicing at a non-boundary index panics, so back off to the nearest
    // boundary. Index 0 is always a boundary, which bounds the loop.
    let mut end = max;
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    &s[..end]
}