Skip to main content

dot/
headless.rs

1use std::sync::Arc;
2
3use anyhow::{Context, Result};
4use tokio::sync::mpsc;
5
6use crate::agent::{Agent, AgentEvent, AgentProfile, TodoStatus};
7use crate::command::CommandRegistry;
8use crate::config::Config;
9use crate::db::Db;
10use crate::extension::HookRegistry;
11use crate::memory::MemoryStore;
12use crate::provider::Provider;
13use crate::tools::ToolRegistry;
14
/// Rendering mode for a headless run.
///
/// The mode decides where agent events are written and in what shape; see the
/// per-variant notes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutputFormat {
    /// Human-oriented output: progress and status lines on stderr, the
    /// completed reply text on stdout.
    Text,
    /// One pretty-printed JSON summary per turn on stdout; individual events
    /// are not printed.
    Json,
    /// One compact JSON object per event, each on its own stdout line.
    StreamJson,
}

impl OutputFormat {
    /// Parses a format name, falling back to [`OutputFormat::Text`] for
    /// anything unrecognised.
    ///
    /// Recognised names are `json` and `stream-json`. Matching ignores ASCII
    /// case and surrounding whitespace, so `JSON` or `"stream-json\n"` select
    /// the intended mode instead of silently degrading to text.
    pub fn parse(s: &str) -> Self {
        let name = s.trim();
        if name.eq_ignore_ascii_case("json") {
            Self::Json
        } else if name.eq_ignore_ascii_case("stream-json") {
            Self::StreamJson
        } else {
            Self::Text
        }
    }
}
31
32pub struct HeadlessOptions {
33    pub prompt: String,
34    pub format: OutputFormat,
35    pub no_tools: bool,
36    pub resume_id: Option<String>,
37    pub interactive: bool,
38    pub task_id: Option<String>,
39}
40
41struct TurnResult {
42    text: String,
43    tool_calls: Vec<serde_json::Value>,
44    session_id: String,
45    error: Option<String>,
46}
47
48#[allow(clippy::too_many_arguments)]
49pub async fn run(
50    config: Config,
51    providers: Vec<Box<dyn Provider>>,
52    db: Db,
53    memory: Option<Arc<MemoryStore>>,
54    tools: ToolRegistry,
55    profiles: Vec<AgentProfile>,
56    cwd: String,
57    skill_names: Vec<(String, String)>,
58    hooks: HookRegistry,
59    commands: CommandRegistry,
60    opts: HeadlessOptions,
61) -> Result<()> {
62    let _ = skill_names;
63    let agents_context = crate::context::AgentsContext::load(&cwd, &config.context);
64    let (bg_tx, bg_rx) = mpsc::unbounded_channel();
65    let mut agent = Agent::new(
66        providers,
67        db,
68        &config,
69        memory,
70        tools,
71        profiles,
72        cwd,
73        agents_context,
74        hooks,
75        commands,
76    )?;
77    agent.set_background_tx(bg_tx);
78
79    if let Some(ref id) = opts.resume_id {
80        let conv = agent
81            .get_session(id)
82            .with_context(|| format!("resuming session {id}"))?;
83        agent.resume_conversation(&conv)?;
84    }
85
86    // Emit session info at start for programmatic consumers
87    let session_id = agent.conversation_id().to_string();
88    if opts.format == OutputFormat::StreamJson {
89        let obj = serde_json::json!({
90            "type": "session_start",
91            "session_id": session_id,
92        });
93        println!("{obj}");
94    }
95
96    // Single-turn: send the prompt and exit
97    if !opts.interactive {
98        let result = run_turn(&mut agent, &opts.prompt, &opts, bg_rx).await?;
99        let success = result.error.is_none();
100        emit_turn_end(&result, &opts);
101
102        if let Some(ref task_id) = opts.task_id {
103            let db = crate::db::Db::open().ok();
104            if let Some(db) = db {
105                let status = if success { "completed" } else { "failed" };
106                let output = if let Some(ref e) = result.error {
107                    e.clone()
108                } else {
109                    result.text.clone()
110                };
111                let _ = db.complete_task(task_id, status, Some(&result.session_id), &output);
112            }
113        }
114
115        if !success {
116            std::process::exit(1);
117        }
118        return Ok(());
119    }
120
121    // Multi-turn interactive mode
122    // First turn uses the provided prompt (if non-empty)
123    let mut bg_rx = bg_rx;
124    if !opts.prompt.is_empty() {
125        let (result, new_bg_rx) = run_turn_multi(&mut agent, &opts.prompt, &opts, bg_rx).await?;
126        bg_rx = new_bg_rx;
127        emit_turn_end(&result, &opts);
128    }
129
130    // Read subsequent prompts from stdin line by line
131    let stdin = tokio::io::stdin();
132    let reader = tokio::io::BufReader::new(stdin);
133    use tokio::io::AsyncBufReadExt;
134    let mut lines = reader.lines();
135
136    loop {
137        // Signal readiness for next prompt
138        if opts.format == OutputFormat::StreamJson {
139            let obj = serde_json::json!({"type": "ready"});
140            println!("{obj}");
141        } else if opts.format == OutputFormat::Text {
142            eprint!("> ");
143        }
144
145        let line = match lines.next_line().await {
146            Ok(Some(line)) => line,
147            Ok(None) => break, // EOF
148            Err(e) => {
149                eprintln!("[error] reading stdin: {e}");
150                break;
151            }
152        };
153
154        let prompt = line.trim().to_string();
155        if prompt.is_empty() {
156            continue;
157        }
158        if prompt == "/quit" || prompt == "/exit" {
159            break;
160        }
161
162        let (result, new_bg_rx) = run_turn_multi(&mut agent, &prompt, &opts, bg_rx).await?;
163        bg_rx = new_bg_rx;
164        emit_turn_end(&result, &opts);
165    }
166
167    // Emit session end
168    let session_id = agent.conversation_id().to_string();
169    let title = agent.conversation_title();
170    if opts.format == OutputFormat::StreamJson {
171        let obj = serde_json::json!({
172            "type": "session_end",
173            "session_id": session_id,
174            "title": title,
175        });
176        println!("{obj}");
177    } else if opts.format == OutputFormat::Text
178        && let Some(ref t) = title
179    {
180        eprintln!("\n[session] {t} ({session_id})");
181    }
182
183    agent.cleanup_if_empty();
184    Ok(())
185}
186
187/// Run a single turn for single-shot mode (consumes bg_rx).
188async fn run_turn(
189    agent: &mut Agent,
190    prompt: &str,
191    opts: &HeadlessOptions,
192    mut bg_rx: mpsc::UnboundedReceiver<AgentEvent>,
193) -> Result<TurnResult> {
194    let session_id = agent.conversation_id().to_string();
195    let (tx, mut rx) = mpsc::unbounded_channel();
196    let future = agent.send_message(prompt, tx);
197
198    let mut text = String::new();
199    let mut tool_calls: Vec<serde_json::Value> = Vec::new();
200    let mut error: Option<String> = None;
201
202    tokio::pin!(future);
203
204    loop {
205        tokio::select! {
206            biased;
207            result = &mut future => {
208                if let Err(e) = result {
209                    error = Some(e.to_string());
210                }
211                // Drain remaining
212                while let Ok(ev) = rx.try_recv() {
213                    handle_event(&ev, opts, &mut text, &mut tool_calls);
214                }
215                while let Ok(ev) = bg_rx.try_recv() {
216                    handle_event(&ev, opts, &mut text, &mut tool_calls);
217                }
218                break;
219            }
220            Some(ev) = rx.recv() => {
221                handle_event(&ev, opts, &mut text, &mut tool_calls);
222            }
223            Some(ev) = bg_rx.recv() => {
224                handle_event(&ev, opts, &mut text, &mut tool_calls);
225            }
226        }
227    }
228
229    Ok(TurnResult {
230        text,
231        tool_calls,
232        session_id,
233        error,
234    })
235}
236
237/// Run a single turn for multi-turn mode (returns bg_rx back for reuse).
238async fn run_turn_multi(
239    agent: &mut Agent,
240    prompt: &str,
241    opts: &HeadlessOptions,
242    mut bg_rx: mpsc::UnboundedReceiver<AgentEvent>,
243) -> Result<(TurnResult, mpsc::UnboundedReceiver<AgentEvent>)> {
244    let session_id = agent.conversation_id().to_string();
245    let (tx, mut rx) = mpsc::unbounded_channel();
246    let future = agent.send_message(prompt, tx);
247
248    let mut text = String::new();
249    let mut tool_calls: Vec<serde_json::Value> = Vec::new();
250    let mut error: Option<String> = None;
251
252    tokio::pin!(future);
253
254    loop {
255        tokio::select! {
256            biased;
257            result = &mut future => {
258                if let Err(e) = result {
259                    error = Some(e.to_string());
260                }
261                while let Ok(ev) = rx.try_recv() {
262                    handle_event(&ev, opts, &mut text, &mut tool_calls);
263                }
264                while let Ok(ev) = bg_rx.try_recv() {
265                    handle_event(&ev, opts, &mut text, &mut tool_calls);
266                }
267                break;
268            }
269            Some(ev) = rx.recv() => {
270                handle_event(&ev, opts, &mut text, &mut tool_calls);
271            }
272            Some(ev) = bg_rx.recv() => {
273                handle_event(&ev, opts, &mut text, &mut tool_calls);
274            }
275        }
276    }
277
278    let result = TurnResult {
279        text,
280        tool_calls,
281        session_id,
282        error,
283    };
284    Ok((result, bg_rx))
285}
286
287fn emit_turn_end(result: &TurnResult, opts: &HeadlessOptions) {
288    let success = result.error.is_none();
289    if opts.format == OutputFormat::Json {
290        let mut output = serde_json::json!({
291            "success": success,
292            "session_id": result.session_id,
293            "text": result.text,
294            "tool_calls": result.tool_calls,
295        });
296        if let Some(ref e) = result.error {
297            output["error"] = serde_json::json!(e);
298        }
299        println!(
300            "{}",
301            serde_json::to_string_pretty(&output).unwrap_or_default()
302        );
303    } else if opts.format == OutputFormat::StreamJson {
304        let mut obj = serde_json::json!({
305            "type": "turn_complete",
306            "success": success,
307            "session_id": result.session_id,
308            "text": result.text,
309        });
310        if let Some(ref e) = result.error {
311            obj["error"] = serde_json::json!(e);
312        }
313        println!("{obj}");
314    } else if let Some(ref e) = result.error {
315        eprintln!("[error] {e}");
316    }
317}
318
319fn handle_event(
320    ev: &AgentEvent,
321    opts: &HeadlessOptions,
322    final_text: &mut String,
323    tool_outputs: &mut Vec<serde_json::Value>,
324) {
325    match ev {
326        AgentEvent::TextDelta(text) => {
327            if opts.format == OutputFormat::Text {
328                eprint!("{text}");
329            } else if opts.format == OutputFormat::StreamJson {
330                let obj = serde_json::json!({"type": "text_delta", "text": text});
331                println!("{obj}");
332            }
333        }
334        AgentEvent::TextComplete(text) => {
335            *final_text = text.clone();
336            if opts.format == OutputFormat::Text {
337                eprintln!();
338                println!("{text}");
339            } else if opts.format == OutputFormat::StreamJson {
340                let obj = serde_json::json!({"type": "text_complete", "text": text});
341                println!("{obj}");
342            }
343        }
344        AgentEvent::ThinkingDelta(text) => {
345            if opts.format == OutputFormat::StreamJson {
346                let obj = serde_json::json!({"type": "thinking_delta", "text": text});
347                println!("{obj}");
348            }
349        }
350        AgentEvent::ToolCallStart { id, name } => {
351            if !opts.no_tools {
352                if opts.format == OutputFormat::Text {
353                    eprintln!("[tool] {name} ({id})");
354                } else if opts.format == OutputFormat::StreamJson {
355                    let obj = serde_json::json!({"type": "tool_start", "id": id, "name": name});
356                    println!("{obj}");
357                }
358            }
359        }
360        AgentEvent::ToolCallExecuting { id, name, input } => {
361            if !opts.no_tools && opts.format == OutputFormat::StreamJson {
362                let obj = serde_json::json!({"type": "tool_executing", "id": id, "name": name, "input": input});
363                println!("{obj}");
364            }
365        }
366        AgentEvent::ToolCallResult {
367            id,
368            name,
369            output,
370            is_error,
371        } => {
372            if !opts.no_tools {
373                if opts.format == OutputFormat::Text {
374                    let prefix = if *is_error { "[error]" } else { "[result]" };
375                    eprintln!("{prefix} {name}: {}", truncate(output, 500));
376                } else if opts.format == OutputFormat::StreamJson {
377                    let obj = serde_json::json!({
378                        "type": "tool_result",
379                        "id": id,
380                        "name": name,
381                        "output": output,
382                        "is_error": is_error,
383                    });
384                    println!("{obj}");
385                }
386            }
387            tool_outputs.push(serde_json::json!({
388                "id": id,
389                "name": name,
390                "output": output,
391                "is_error": is_error,
392            }));
393        }
394        AgentEvent::Question {
395            id,
396            question,
397            options,
398            responder: _,
399        } => {
400            // In headless mode, questions get auto-answered.
401            // The responder is consumed by the agent loop — we emit the event for observability.
402            if opts.format == OutputFormat::Text {
403                eprintln!("[question] {question}");
404                if !options.is_empty() {
405                    for (i, opt) in options.iter().enumerate() {
406                        eprintln!("  {}: {opt}", i + 1);
407                    }
408                }
409            } else if opts.format == OutputFormat::StreamJson {
410                let obj = serde_json::json!({
411                    "type": "question",
412                    "id": id,
413                    "question": question,
414                    "options": options,
415                });
416                println!("{obj}");
417            }
418        }
419        AgentEvent::PermissionRequest {
420            tool_name,
421            input_summary,
422            responder: _,
423        } => {
424            if opts.format == OutputFormat::Text {
425                eprintln!("[permission] {tool_name}: {input_summary}");
426            } else if opts.format == OutputFormat::StreamJson {
427                let obj = serde_json::json!({
428                    "type": "permission_request",
429                    "tool_name": tool_name,
430                    "input_summary": input_summary,
431                });
432                println!("{obj}");
433            }
434        }
435        AgentEvent::TodoUpdate(items) => {
436            if opts.format == OutputFormat::StreamJson {
437                let todos: Vec<serde_json::Value> = items
438                    .iter()
439                    .map(|t| {
440                        serde_json::json!({
441                            "content": t.content,
442                            "status": match t.status {
443                                TodoStatus::Pending => "pending",
444                                TodoStatus::InProgress => "in_progress",
445                                TodoStatus::Completed => "completed",
446                            }
447                        })
448                    })
449                    .collect();
450                let obj = serde_json::json!({"type": "todo_update", "todos": todos});
451                println!("{obj}");
452            } else if opts.format == OutputFormat::Text {
453                eprintln!("[todos]");
454                for t in items {
455                    let icon = match t.status {
456                        TodoStatus::Pending => "○",
457                        TodoStatus::InProgress => "◑",
458                        TodoStatus::Completed => "●",
459                    };
460                    eprintln!("  {icon} {}", t.content);
461                }
462            }
463        }
464        AgentEvent::Done { usage } => {
465            if opts.format == OutputFormat::StreamJson {
466                let obj = serde_json::json!({
467                    "type": "done",
468                    "usage": {
469                        "input_tokens": usage.input_tokens,
470                        "output_tokens": usage.output_tokens,
471                        "cache_read_tokens": usage.cache_read_tokens,
472                        "cache_write_tokens": usage.cache_write_tokens,
473                    }
474                });
475                println!("{obj}");
476            }
477        }
478        AgentEvent::Error(msg) => {
479            if opts.format == OutputFormat::Text {
480                eprintln!("[error] {msg}");
481            } else if opts.format == OutputFormat::StreamJson {
482                let obj = serde_json::json!({"type": "error", "message": msg});
483                println!("{obj}");
484            }
485        }
486        AgentEvent::Compacting => {
487            if opts.format == OutputFormat::StreamJson {
488                let obj = serde_json::json!({"type": "compacting"});
489                println!("{obj}");
490            } else if opts.format == OutputFormat::Text {
491                eprintln!("[compacting conversation...]");
492            }
493        }
494        AgentEvent::Compacted { messages_removed } => {
495            if opts.format == OutputFormat::StreamJson {
496                let obj =
497                    serde_json::json!({"type": "compacted", "messages_removed": messages_removed});
498                println!("{obj}");
499            }
500        }
501        AgentEvent::SubagentStart {
502            id,
503            description,
504            background,
505        } => {
506            if opts.format == OutputFormat::StreamJson {
507                let obj = serde_json::json!({"type": "subagent_start", "id": id, "description": description, "background": background});
508                println!("{obj}");
509            } else if opts.format == OutputFormat::Text {
510                eprintln!("[subagent] {description} ({id})");
511            }
512        }
513        AgentEvent::SubagentDelta { id, text } => {
514            if opts.format == OutputFormat::StreamJson {
515                let obj = serde_json::json!({"type": "subagent_delta", "id": id, "text": text});
516                println!("{obj}");
517            }
518        }
519        AgentEvent::SubagentToolStart {
520            id,
521            tool_name,
522            detail,
523        } => {
524            if opts.format == OutputFormat::StreamJson {
525                let obj = serde_json::json!({"type": "subagent_tool_start", "id": id, "tool_name": tool_name, "detail": detail});
526                println!("{obj}");
527            }
528        }
529        AgentEvent::SubagentToolComplete { id, tool_name } => {
530            if opts.format == OutputFormat::StreamJson {
531                let obj = serde_json::json!({"type": "subagent_tool_complete", "id": id, "tool_name": tool_name});
532                println!("{obj}");
533            }
534        }
535        AgentEvent::SubagentComplete { id, output } => {
536            if opts.format == OutputFormat::StreamJson {
537                let obj =
538                    serde_json::json!({"type": "subagent_complete", "id": id, "output": output});
539                println!("{obj}");
540            }
541        }
542        AgentEvent::SubagentBackgroundDone {
543            id,
544            description,
545            output,
546        } => {
547            if opts.format == OutputFormat::StreamJson {
548                let obj = serde_json::json!({"type": "subagent_background_done", "id": id, "description": description, "output": output});
549                println!("{obj}");
550            } else if opts.format == OutputFormat::Text {
551                eprintln!("[subagent done] {description}");
552            }
553        }
554        AgentEvent::TitleGenerated(title) => {
555            if opts.format == OutputFormat::StreamJson {
556                let obj = serde_json::json!({"type": "title_generated", "title": title});
557                println!("{obj}");
558            }
559        }
560        AgentEvent::MemoryExtracted {
561            added,
562            updated,
563            deleted,
564        } => {
565            if opts.format == OutputFormat::StreamJson {
566                let obj = serde_json::json!({"type": "memory_extracted", "added": added, "updated": updated, "deleted": deleted});
567                println!("{obj}");
568            }
569        }
570        AgentEvent::ToolCallInputDelta(_) => {
571            // Not useful in headless — tool input is streamed to the model, not the user
572        }
573        AgentEvent::AsideDelta(_) | AgentEvent::AsideDone | AgentEvent::AsideError(_) => {
574            // Aside is TUI-only
575        }
576    }
577}
578
/// Returns the longest prefix of `s` that is at most `max` bytes long and ends
/// on a UTF-8 character boundary.
///
/// Slicing at a raw byte offset panics when the offset lands inside a
/// multi-byte character, so the cut point is moved back to the nearest
/// boundary. The result may therefore be slightly shorter than `max` bytes.
fn truncate(s: &str, max: usize) -> &str {
    if s.len() <= max {
        return s;
    }
    let mut end = max;
    // Offset 0 is always a boundary, so this loop terminates.
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    &s[..end]
}