Skip to main content

dot/
headless.rs

1use std::sync::Arc;
2
3use anyhow::{Context, Result};
4use tokio::sync::mpsc;
5
6use crate::agent::{Agent, AgentEvent, AgentProfile, TodoStatus};
7use crate::command::CommandRegistry;
8use crate::config::Config;
9use crate::db::Db;
10use crate::extension::HookRegistry;
11use crate::memory::MemoryStore;
12use crate::provider::Provider;
13use crate::tools::ToolRegistry;
14
/// How headless mode renders agent output.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum OutputFormat {
    /// Human-readable: progress on stderr, the final answer on stdout.
    Text,
    /// A single pretty-printed JSON document per turn, printed when the turn ends.
    Json,
    /// One compact JSON object per line on stdout, emitted as events arrive.
    StreamJson,
}

impl OutputFormat {
    /// Maps a format name to its variant.
    ///
    /// Recognises exactly `"json"` and `"stream-json"` (case-sensitive); every
    /// other input, including the empty string, falls back to [`OutputFormat::Text`].
    pub fn parse(s: &str) -> Self {
        if s == "json" {
            return Self::Json;
        }
        if s == "stream-json" {
            return Self::StreamJson;
        }
        Self::Text
    }
}
31
32pub struct HeadlessOptions {
33    pub prompt: String,
34    pub format: OutputFormat,
35    pub no_tools: bool,
36    pub resume_id: Option<String>,
37    pub interactive: bool,
38}
39
40struct TurnResult {
41    text: String,
42    tool_calls: Vec<serde_json::Value>,
43    session_id: String,
44}
45
46#[allow(clippy::too_many_arguments)]
47pub async fn run(
48    config: Config,
49    providers: Vec<Box<dyn Provider>>,
50    db: Db,
51    memory: Option<Arc<MemoryStore>>,
52    tools: ToolRegistry,
53    profiles: Vec<AgentProfile>,
54    cwd: String,
55    skill_names: Vec<(String, String)>,
56    hooks: HookRegistry,
57    commands: CommandRegistry,
58    opts: HeadlessOptions,
59) -> Result<()> {
60    let _ = skill_names;
61    let agents_context = crate::context::AgentsContext::load(&cwd, &config.context);
62    let (bg_tx, bg_rx) = mpsc::unbounded_channel();
63    let mut agent = Agent::new(
64        providers,
65        db,
66        &config,
67        memory,
68        tools,
69        profiles,
70        cwd,
71        agents_context,
72        hooks,
73        commands,
74    )?;
75    agent.set_background_tx(bg_tx);
76
77    if let Some(ref id) = opts.resume_id {
78        let conv = agent
79            .get_session(id)
80            .with_context(|| format!("resuming session {id}"))?;
81        agent.resume_conversation(&conv)?;
82    }
83
84    // Emit session info at start for programmatic consumers
85    let session_id = agent.conversation_id().to_string();
86    if opts.format == OutputFormat::StreamJson {
87        let obj = serde_json::json!({
88            "type": "session_start",
89            "session_id": session_id,
90        });
91        println!("{obj}");
92    }
93
94    // Single-turn: send the prompt and exit
95    if !opts.interactive {
96        let result = run_turn(&mut agent, &opts.prompt, &opts, bg_rx).await?;
97        emit_turn_end(&result, &opts);
98        return Ok(());
99    }
100
101    // Multi-turn interactive mode
102    // First turn uses the provided prompt (if non-empty)
103    let mut bg_rx = bg_rx;
104    if !opts.prompt.is_empty() {
105        let (result, new_bg_rx) = run_turn_multi(&mut agent, &opts.prompt, &opts, bg_rx).await?;
106        bg_rx = new_bg_rx;
107        emit_turn_end(&result, &opts);
108    }
109
110    // Read subsequent prompts from stdin line by line
111    let stdin = tokio::io::stdin();
112    let reader = tokio::io::BufReader::new(stdin);
113    use tokio::io::AsyncBufReadExt;
114    let mut lines = reader.lines();
115
116    loop {
117        // Signal readiness for next prompt
118        if opts.format == OutputFormat::StreamJson {
119            let obj = serde_json::json!({"type": "ready"});
120            println!("{obj}");
121        } else if opts.format == OutputFormat::Text {
122            eprint!("> ");
123        }
124
125        let line = match lines.next_line().await {
126            Ok(Some(line)) => line,
127            Ok(None) => break, // EOF
128            Err(e) => {
129                eprintln!("[error] reading stdin: {e}");
130                break;
131            }
132        };
133
134        let prompt = line.trim().to_string();
135        if prompt.is_empty() {
136            continue;
137        }
138        if prompt == "/quit" || prompt == "/exit" {
139            break;
140        }
141
142        let (result, new_bg_rx) = run_turn_multi(&mut agent, &prompt, &opts, bg_rx).await?;
143        bg_rx = new_bg_rx;
144        emit_turn_end(&result, &opts);
145    }
146
147    // Emit session end
148    let session_id = agent.conversation_id().to_string();
149    let title = agent.conversation_title();
150    if opts.format == OutputFormat::StreamJson {
151        let obj = serde_json::json!({
152            "type": "session_end",
153            "session_id": session_id,
154            "title": title,
155        });
156        println!("{obj}");
157    } else if opts.format == OutputFormat::Text
158        && let Some(ref t) = title
159    {
160        eprintln!("\n[session] {t} ({session_id})");
161    }
162
163    agent.cleanup_if_empty();
164    Ok(())
165}
166
167/// Run a single turn for single-shot mode (consumes bg_rx).
168async fn run_turn(
169    agent: &mut Agent,
170    prompt: &str,
171    opts: &HeadlessOptions,
172    mut bg_rx: mpsc::UnboundedReceiver<AgentEvent>,
173) -> Result<TurnResult> {
174    let session_id = agent.conversation_id().to_string();
175    let (tx, mut rx) = mpsc::unbounded_channel();
176    let future = agent.send_message(prompt, tx);
177
178    let mut text = String::new();
179    let mut tool_calls: Vec<serde_json::Value> = Vec::new();
180
181    tokio::pin!(future);
182
183    loop {
184        tokio::select! {
185            biased;
186            result = &mut future => {
187                result.context("agent send_message failed")?;
188                // Drain remaining
189                while let Ok(ev) = rx.try_recv() {
190                    handle_event(&ev, opts, &mut text, &mut tool_calls);
191                }
192                while let Ok(ev) = bg_rx.try_recv() {
193                    handle_event(&ev, opts, &mut text, &mut tool_calls);
194                }
195                break;
196            }
197            Some(ev) = rx.recv() => {
198                handle_event(&ev, opts, &mut text, &mut tool_calls);
199            }
200            Some(ev) = bg_rx.recv() => {
201                handle_event(&ev, opts, &mut text, &mut tool_calls);
202            }
203        }
204    }
205
206    Ok(TurnResult {
207        text,
208        tool_calls,
209        session_id,
210    })
211}
212
213/// Run a single turn for multi-turn mode (returns bg_rx back for reuse).
214async fn run_turn_multi(
215    agent: &mut Agent,
216    prompt: &str,
217    opts: &HeadlessOptions,
218    mut bg_rx: mpsc::UnboundedReceiver<AgentEvent>,
219) -> Result<(TurnResult, mpsc::UnboundedReceiver<AgentEvent>)> {
220    let session_id = agent.conversation_id().to_string();
221    let (tx, mut rx) = mpsc::unbounded_channel();
222    let future = agent.send_message(prompt, tx);
223
224    let mut text = String::new();
225    let mut tool_calls: Vec<serde_json::Value> = Vec::new();
226
227    tokio::pin!(future);
228
229    loop {
230        tokio::select! {
231            biased;
232            result = &mut future => {
233                result.context("agent send_message failed")?;
234                while let Ok(ev) = rx.try_recv() {
235                    handle_event(&ev, opts, &mut text, &mut tool_calls);
236                }
237                while let Ok(ev) = bg_rx.try_recv() {
238                    handle_event(&ev, opts, &mut text, &mut tool_calls);
239                }
240                break;
241            }
242            Some(ev) = rx.recv() => {
243                handle_event(&ev, opts, &mut text, &mut tool_calls);
244            }
245            Some(ev) = bg_rx.recv() => {
246                handle_event(&ev, opts, &mut text, &mut tool_calls);
247            }
248        }
249    }
250
251    let result = TurnResult {
252        text,
253        tool_calls,
254        session_id,
255    };
256    Ok((result, bg_rx))
257}
258
259fn emit_turn_end(result: &TurnResult, opts: &HeadlessOptions) {
260    if opts.format == OutputFormat::Json {
261        let output = serde_json::json!({
262            "session_id": result.session_id,
263            "text": result.text,
264            "tool_calls": result.tool_calls,
265        });
266        println!(
267            "{}",
268            serde_json::to_string_pretty(&output).unwrap_or_default()
269        );
270    } else if opts.format == OutputFormat::StreamJson {
271        let obj = serde_json::json!({
272            "type": "turn_complete",
273            "session_id": result.session_id,
274            "text": result.text,
275        });
276        println!("{obj}");
277    }
278    // Text mode: final text already printed via TextComplete handler
279}
280
281fn handle_event(
282    ev: &AgentEvent,
283    opts: &HeadlessOptions,
284    final_text: &mut String,
285    tool_outputs: &mut Vec<serde_json::Value>,
286) {
287    match ev {
288        AgentEvent::TextDelta(text) => {
289            if opts.format == OutputFormat::Text {
290                eprint!("{text}");
291            } else if opts.format == OutputFormat::StreamJson {
292                let obj = serde_json::json!({"type": "text_delta", "text": text});
293                println!("{obj}");
294            }
295        }
296        AgentEvent::TextComplete(text) => {
297            *final_text = text.clone();
298            if opts.format == OutputFormat::Text {
299                eprintln!();
300                println!("{text}");
301            } else if opts.format == OutputFormat::StreamJson {
302                let obj = serde_json::json!({"type": "text_complete", "text": text});
303                println!("{obj}");
304            }
305        }
306        AgentEvent::ThinkingDelta(text) => {
307            if opts.format == OutputFormat::StreamJson {
308                let obj = serde_json::json!({"type": "thinking_delta", "text": text});
309                println!("{obj}");
310            }
311        }
312        AgentEvent::ToolCallStart { id, name } => {
313            if !opts.no_tools {
314                if opts.format == OutputFormat::Text {
315                    eprintln!("[tool] {name} ({id})");
316                } else if opts.format == OutputFormat::StreamJson {
317                    let obj = serde_json::json!({"type": "tool_start", "id": id, "name": name});
318                    println!("{obj}");
319                }
320            }
321        }
322        AgentEvent::ToolCallExecuting { id, name, input } => {
323            if !opts.no_tools && opts.format == OutputFormat::StreamJson {
324                let obj = serde_json::json!({"type": "tool_executing", "id": id, "name": name, "input": input});
325                println!("{obj}");
326            }
327        }
328        AgentEvent::ToolCallResult {
329            id,
330            name,
331            output,
332            is_error,
333        } => {
334            if !opts.no_tools {
335                if opts.format == OutputFormat::Text {
336                    let prefix = if *is_error { "[error]" } else { "[result]" };
337                    eprintln!("{prefix} {name}: {}", truncate(output, 500));
338                } else if opts.format == OutputFormat::StreamJson {
339                    let obj = serde_json::json!({
340                        "type": "tool_result",
341                        "id": id,
342                        "name": name,
343                        "output": output,
344                        "is_error": is_error,
345                    });
346                    println!("{obj}");
347                }
348            }
349            tool_outputs.push(serde_json::json!({
350                "id": id,
351                "name": name,
352                "output": output,
353                "is_error": is_error,
354            }));
355        }
356        AgentEvent::Question {
357            id,
358            question,
359            options,
360            responder: _,
361        } => {
362            // In headless mode, questions get auto-answered.
363            // The responder is consumed by the agent loop — we emit the event for observability.
364            if opts.format == OutputFormat::Text {
365                eprintln!("[question] {question}");
366                if !options.is_empty() {
367                    for (i, opt) in options.iter().enumerate() {
368                        eprintln!("  {}: {opt}", i + 1);
369                    }
370                }
371            } else if opts.format == OutputFormat::StreamJson {
372                let obj = serde_json::json!({
373                    "type": "question",
374                    "id": id,
375                    "question": question,
376                    "options": options,
377                });
378                println!("{obj}");
379            }
380        }
381        AgentEvent::PermissionRequest {
382            tool_name,
383            input_summary,
384            responder: _,
385        } => {
386            if opts.format == OutputFormat::Text {
387                eprintln!("[permission] {tool_name}: {input_summary}");
388            } else if opts.format == OutputFormat::StreamJson {
389                let obj = serde_json::json!({
390                    "type": "permission_request",
391                    "tool_name": tool_name,
392                    "input_summary": input_summary,
393                });
394                println!("{obj}");
395            }
396        }
397        AgentEvent::TodoUpdate(items) => {
398            if opts.format == OutputFormat::StreamJson {
399                let todos: Vec<serde_json::Value> = items
400                    .iter()
401                    .map(|t| {
402                        serde_json::json!({
403                            "content": t.content,
404                            "status": match t.status {
405                                TodoStatus::Pending => "pending",
406                                TodoStatus::InProgress => "in_progress",
407                                TodoStatus::Completed => "completed",
408                            }
409                        })
410                    })
411                    .collect();
412                let obj = serde_json::json!({"type": "todo_update", "todos": todos});
413                println!("{obj}");
414            } else if opts.format == OutputFormat::Text {
415                eprintln!("[todos]");
416                for t in items {
417                    let icon = match t.status {
418                        TodoStatus::Pending => "○",
419                        TodoStatus::InProgress => "◑",
420                        TodoStatus::Completed => "●",
421                    };
422                    eprintln!("  {icon} {}", t.content);
423                }
424            }
425        }
426        AgentEvent::Done { usage } => {
427            if opts.format == OutputFormat::StreamJson {
428                let obj = serde_json::json!({
429                    "type": "done",
430                    "usage": {
431                        "input_tokens": usage.input_tokens,
432                        "output_tokens": usage.output_tokens,
433                        "cache_read_tokens": usage.cache_read_tokens,
434                        "cache_write_tokens": usage.cache_write_tokens,
435                    }
436                });
437                println!("{obj}");
438            }
439        }
440        AgentEvent::Error(msg) => {
441            if opts.format == OutputFormat::Text {
442                eprintln!("[error] {msg}");
443            } else if opts.format == OutputFormat::StreamJson {
444                let obj = serde_json::json!({"type": "error", "message": msg});
445                println!("{obj}");
446            }
447        }
448        AgentEvent::Compacting => {
449            if opts.format == OutputFormat::StreamJson {
450                let obj = serde_json::json!({"type": "compacting"});
451                println!("{obj}");
452            } else if opts.format == OutputFormat::Text {
453                eprintln!("[compacting conversation...]");
454            }
455        }
456        AgentEvent::Compacted { messages_removed } => {
457            if opts.format == OutputFormat::StreamJson {
458                let obj =
459                    serde_json::json!({"type": "compacted", "messages_removed": messages_removed});
460                println!("{obj}");
461            }
462        }
463        AgentEvent::SubagentStart {
464            id,
465            description,
466            background,
467        } => {
468            if opts.format == OutputFormat::StreamJson {
469                let obj = serde_json::json!({"type": "subagent_start", "id": id, "description": description, "background": background});
470                println!("{obj}");
471            } else if opts.format == OutputFormat::Text {
472                eprintln!("[subagent] {description} ({id})");
473            }
474        }
475        AgentEvent::SubagentDelta { id, text } => {
476            if opts.format == OutputFormat::StreamJson {
477                let obj = serde_json::json!({"type": "subagent_delta", "id": id, "text": text});
478                println!("{obj}");
479            }
480        }
481        AgentEvent::SubagentToolStart {
482            id,
483            tool_name,
484            detail,
485        } => {
486            if opts.format == OutputFormat::StreamJson {
487                let obj = serde_json::json!({"type": "subagent_tool_start", "id": id, "tool_name": tool_name, "detail": detail});
488                println!("{obj}");
489            }
490        }
491        AgentEvent::SubagentToolComplete { id, tool_name } => {
492            if opts.format == OutputFormat::StreamJson {
493                let obj = serde_json::json!({"type": "subagent_tool_complete", "id": id, "tool_name": tool_name});
494                println!("{obj}");
495            }
496        }
497        AgentEvent::SubagentComplete { id, output } => {
498            if opts.format == OutputFormat::StreamJson {
499                let obj =
500                    serde_json::json!({"type": "subagent_complete", "id": id, "output": output});
501                println!("{obj}");
502            }
503        }
504        AgentEvent::SubagentBackgroundDone {
505            id,
506            description,
507            output,
508        } => {
509            if opts.format == OutputFormat::StreamJson {
510                let obj = serde_json::json!({"type": "subagent_background_done", "id": id, "description": description, "output": output});
511                println!("{obj}");
512            } else if opts.format == OutputFormat::Text {
513                eprintln!("[subagent done] {description}");
514            }
515        }
516        AgentEvent::TitleGenerated(title) => {
517            if opts.format == OutputFormat::StreamJson {
518                let obj = serde_json::json!({"type": "title_generated", "title": title});
519                println!("{obj}");
520            }
521        }
522        AgentEvent::MemoryExtracted {
523            added,
524            updated,
525            deleted,
526        } => {
527            if opts.format == OutputFormat::StreamJson {
528                let obj = serde_json::json!({"type": "memory_extracted", "added": added, "updated": updated, "deleted": deleted});
529                println!("{obj}");
530            }
531        }
532        AgentEvent::ToolCallInputDelta(_) => {
533            // Not useful in headless — tool input is streamed to the model, not the user
534        }
535    }
536}
537
/// Returns a prefix of `s` that is at most `max` bytes long.
///
/// If `max` falls inside a multi-byte UTF-8 character, the cut moves back to
/// the start of that character, so the result is always valid UTF-8 and the
/// function never panics. Strings of `max` bytes or fewer are returned whole.
fn truncate(s: &str, max: usize) -> &str {
    if s.len() <= max {
        return s;
    }
    // Slicing at a non-boundary byte index panics, so back off to the nearest
    // boundary. Index 0 is always a boundary, which bounds the loop.
    let mut end = max;
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    &s[..end]
}