use std::path::PathBuf;
use std::time::{Duration, Instant};
use actix_web::web;
use bamboo_agent_core::storage::Storage as _;
use bamboo_agent_core::{Message, Role, Session, SessionKind};
use bamboo_server::app_state::AppState;
use bamboo_server::handlers::agent::execute::{handler as execute_handler, ExecuteRequest};
pub struct HeadlessArgs {
pub prompt: String,
pub session: Option<String>,
pub model: Option<String>,
pub workspace: Option<PathBuf>,
pub data_dir: PathBuf,
pub stream_json: bool,
}
pub async fn run(args: HeadlessArgs) -> Result<(), String> {
let state = AppState::new(args.data_dir.clone())
.await
.map_err(|e| format!("boot app state: {e}"))?;
let session_id = match &args.session {
Some(id) => {
let existing = state
.storage
.load_session(id)
.await
.map_err(|e| format!("load session {id}: {e}"))?
.ok_or_else(|| format!("session '{id}' not found"))?;
if existing.kind != SessionKind::Root {
return Err(format!("session '{id}' is not a root session"));
}
id.clone()
}
None => {
let mut title: String = args.prompt.chars().take(48).collect();
if title.len() < args.prompt.len() {
title.push('…');
}
let mut session = Session::new(uuid::Uuid::new_v4().to_string(), String::new());
session.title = title;
session.workspace = args
.workspace
.clone()
.or_else(|| std::env::current_dir().ok())
.map(|w| w.to_string_lossy().into_owned());
let id = session.id.clone();
state
.storage
.save_session(&session)
.await
.map_err(|e| format!("save session: {e}"))?;
state
.session_store
.save_session(&session)
.await
.map_err(|e| format!("index session: {e}"))?;
id
}
};
{
let mut session = state
.storage
.load_session(&session_id)
.await
.map_err(|e| format!("load session: {e}"))?
.ok_or_else(|| "session vanished".to_string())?;
session.add_message(Message::user(args.prompt.clone()));
state
.storage
.save_session(&session)
.await
.map_err(|e| format!("save session: {e}"))?;
let _ = state.session_store.save_session(&session).await;
}
let sender = state.get_session_event_sender(&session_id).await;
let mut events = sender.subscribe();
let model_ref = parse_model_ref(&args.model)?;
if args.stream_json {
println!(
"{}",
serde_json::json!({
"type": "session_started",
"session_id": session_id,
"resumed": args.session.is_some(),
})
);
} else {
eprintln!("▶ session {session_id}");
}
let data = web::Data::new(state);
let response = execute_handler(
data.clone(),
web::Path::from(session_id.clone()),
web::Json(ExecuteRequest {
model: None,
provider: None,
model_ref,
skill_mode: None,
reasoning_effort: None,
client_sync: None,
}),
)
.await;
if !response.status().is_success() {
return Err(format!("execute rejected: HTTP {}", response.status()));
}
let mut exit: Result<(), String> = Ok(());
let mut streamed_tokens = false;
let mut saw_terminal = false;
let mut last_event = Instant::now();
let mut poll = tokio::time::interval(Duration::from_millis(400));
let started = Instant::now();
loop {
tokio::select! {
_ = tokio::signal::ctrl_c() => {
eprintln!("\n⏹ cancelling…");
if let Some(runner) = data.agent_runners.read().await.get(&session_id) {
runner.cancel_token.cancel();
}
}
ev = events.recv() => {
match ev {
Ok(event) => {
last_event = Instant::now();
if let Ok(value) = serde_json::to_value(&event) {
if args.stream_json {
println!("{value}");
} else {
print_server_event(&value, &mut streamed_tokens);
}
match value["type"].as_str().unwrap_or("") {
"complete" | "cancelled" => saw_terminal = true,
"error" => {
saw_terminal = true;
exit = Err(value["message"]
.as_str()
.unwrap_or("agent errored")
.to_string());
}
_ => {}
}
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
_ = poll.tick() => {
if started.elapsed() < Duration::from_secs(2) {
continue;
}
if saw_terminal
&& last_event.elapsed() > Duration::from_millis(1200)
&& tree_quiescent(&data, &session_id).await
{
break;
}
}
}
}
let final_reply = data
.storage
.load_session(&session_id)
.await
.ok()
.flatten()
.and_then(|session| {
session
.messages
.iter()
.rev()
.find(|m| matches!(m.role, Role::Assistant))
.map(|m| m.content.clone())
});
if args.stream_json {
println!(
"{}",
serde_json::json!({
"type": "result",
"session_id": session_id,
"status": if exit.is_ok() { "finished" } else { "error" },
"result": final_reply,
"error": exit.as_ref().err(),
})
);
} else {
println!();
if !streamed_tokens {
if let Some(reply) = &final_reply {
println!("{reply}");
}
}
match &exit {
Ok(()) => eprintln!("✔ finished"),
Err(e) => eprintln!("✘ {e}"),
}
eprintln!("session: {session_id}");
eprintln!("continue with: bamboo -p \"<next message>\" -s {session_id}");
}
exit
}
async fn tree_quiescent(state: &AppState, session_id: &str) -> bool {
use bamboo_server::app_state::AgentStatus;
{
let runners = state.agent_runners.read().await;
let busy = runners
.values()
.any(|r| matches!(r.status, AgentStatus::Running | AgentStatus::Pending));
if busy {
return false;
}
}
match state.storage.load_session(session_id).await {
Ok(Some(session)) => session
.agent_runtime_state
.as_ref()
.map(|s| s.waiting_for_children.is_none())
.unwrap_or(true),
_ => true,
}
}
fn parse_model_ref(
model: &Option<String>,
) -> Result<Option<bamboo_domain::ProviderModelRef>, String> {
let Some(spec) = model else { return Ok(None) };
let spec = spec.trim();
let Some((p, m)) = spec.split_once(':') else {
return Err(format!(
"-m '{spec}' must be 'provider:model' in server mode (see config defaults otherwise)"
));
};
if p.trim().is_empty() || m.trim().is_empty() {
return Err(format!("-m '{spec}' must be provider:model"));
}
Ok(Some(bamboo_domain::ProviderModelRef::new(
p.trim(),
m.trim(),
)))
}
fn print_server_event(value: &serde_json::Value, streamed_tokens: &mut bool) {
use std::io::Write;
match value["type"].as_str().unwrap_or("") {
"token" => {
*streamed_tokens = true;
print!("{}", value["content"].as_str().unwrap_or(""));
let _ = std::io::stdout().flush();
}
"tool_start" => eprintln!("\n⚙ {}", value["tool_name"].as_str().unwrap_or("tool")),
"tool_complete" => eprintln!("✔ tool done"),
"tool_error" => eprintln!("✘ tool error: {}", value["error"].as_str().unwrap_or("")),
"sub_agent_started" => {
eprintln!(
"\n┌ actor {} started",
value["child_session_id"].as_str().unwrap_or("?")
);
}
"sub_agent_completed" => {
eprintln!(
"└ actor {} {}",
value["child_session_id"].as_str().unwrap_or("?"),
value["status"].as_str().unwrap_or("done")
);
}
"sub_agent_event" => {
let inner = &value["event"];
match inner["type"].as_str().unwrap_or("") {
"token" => {
print!("{}", inner["content"].as_str().unwrap_or(""));
let _ = std::io::stdout().flush();
}
"tool_start" => {
eprintln!("\n│ ⚙ {}", inner["tool_name"].as_str().unwrap_or("tool"))
}
"tool_complete" => eprintln!("│ ✔ tool done"),
_ => {}
}
}
"error" => eprintln!("\n✘ {}", value["message"].as_str().unwrap_or("")),
_ => {}
}
}