use std::net::SocketAddr;
use std::path::PathBuf;
use std::time::Duration;
use chrono::{Duration as ChronoDuration, Utc};
use bamboo_llm::Config;
use bamboo_subagent::discovery::Fabric;
use bamboo_subagent::executor::{ChildExecutor, EchoExecutor};
use bamboo_subagent::fleet::spawn_worker;
use bamboo_subagent::proto::{AgentRecord, ChildFrame, ParentFrame, RunSpec, TerminalStatus};
use bamboo_subagent::provision::{
ChildIdentity, ExecutorSpec, ModelRefSpec, ProvisionSpec, ScopedCredential,
};
use bamboo_subagent::transport::{ChildClient, WsServer};
use crate::subagent_worker::BambooRuntimeExecutor;
/// Returns the default fabric directory: a `bamboo-subagents` folder directly
/// under the platform temporary directory.
pub fn default_fabric_dir() -> PathBuf {
    let mut dir = std::env::temp_dir();
    dir.push("bamboo-subagents");
    dir
}
/// Arguments for [`run`]: spawn a one-shot actor, send it a single prompt and
/// stream the outcome.
#[derive(Debug, Clone)]
pub struct ActorRunArgs {
    /// Task text; sent to the actor as the `assignment` of the run request.
    pub prompt: String,
    /// Optional model override, written as `provider:model` or a bare model
    /// name. When absent, the configured defaults are consulted.
    pub model: Option<String>,
    /// Role recorded in the actor's identity.
    pub role: String,
    /// Workspace directory for the actor; falls back to the current working
    /// directory when not given.
    pub workspace: Option<PathBuf>,
    /// Data directory the configuration is loaded from; falls back to the
    /// resolved bamboo directory when not given.
    pub data_dir: Option<PathBuf>,
    /// Use the echo executor instead of the bamboo runtime executor.
    pub echo: bool,
    /// Print every received event verbatim instead of the condensed rendering.
    pub raw: bool,
}
/// Arguments for [`serve`]: run a long-lived service agent in this process and
/// announce it on the fabric.
pub struct ActorServeArgs {
/// Role published in the agent record; also used as the prefix of a generated id.
pub role: String,
/// Explicit agent id. When absent, `serve` generates `<role>-<8 chars of a v4 UUID>`.
pub id: Option<String>,
/// Optional model override, written as `provider:model` or a bare model name.
pub model: Option<String>,
/// Workspace directory; falls back to the current working directory when not given.
pub workspace: Option<PathBuf>,
/// Data directory the configuration is loaded from.
pub data_dir: Option<PathBuf>,
/// Use the echo executor instead of the bamboo runtime executor.
pub echo: bool,
/// Address to listen on. With `tls` and no address, `serve` binds 0.0.0.0:8443;
/// without `tls` and no address, it uses the loopback bind.
pub bind: Option<SocketAddr>,
/// Serve over TLS; requires both `cert_file` and `key_file`.
pub tls: bool,
/// Certificate file handed to the TLS bind.
pub cert_file: Option<PathBuf>,
/// Private-key file handed to the TLS bind.
pub key_file: Option<PathBuf>,
/// Token handed to the TLS bind or the explicit-address bind. `serve` refuses
/// it on a non-loopback plaintext bind.
/// NOTE(review): no `Debug` derive here, presumably so the token cannot end up
/// in debug output — confirm before adding one.
pub token: Option<String>,
}
/// Arguments for [`call`]: send one prompt to an already-announced actor.
#[derive(Debug, Clone)]
pub struct ActorCallArgs {
    /// Agent id to resolve; if no agent has that id, it is matched against the
    /// role of the discovered agents instead.
    pub agent: String,
    /// Task text; sent to the actor as the `assignment` of the run request.
    pub prompt: String,
    /// Print every received event verbatim instead of the condensed rendering.
    pub raw: bool,
}
pub async fn run(args: ActorRunArgs) -> Result<(), String> {
let child_id = format!("cli-{}", uuid::Uuid::new_v4());
let spec = prepare_spec(
&child_id,
&args.role,
&args.model,
&args.workspace,
&args.data_dir,
args.echo,
)?;
let worker_bin =
std::env::current_exe().map_err(|e| format!("cannot locate own executable: {e}"))?;
eprintln!(
"▶ spawning actor {child_id} (model: {}, executor: {})",
describe_model(&spec),
if args.echo { "echo" } else { "bamboo_runtime" },
);
let spawned = spawn_worker(
&worker_bin,
&["subagent-worker".to_string()],
&spec,
Duration::from_secs(30),
)
.await
.map_err(|e| format!("spawn/register failed: {e}"))?;
eprintln!(
"✔ actor registered (pid {}, endpoint {})",
spawned.record.pid, spawned.record.endpoint
);
let exit = connect_and_stream(&spawned.record.endpoint, &args.prompt, args.raw).await;
spawned.kill().await;
exit
}
/// Runs a long-lived service agent inside this process.
///
/// Steps: prepare the provision spec, build the executor (echo or bamboo
/// runtime), bind a WebSocket server (TLS, explicit address, or loopback),
/// publish an agent record on the fabric, keep its lease renewed in a
/// background task, and serve until the server stops or Ctrl-C is received.
/// On exit the renewal task is aborted and the record is withdrawn.
///
/// # Errors
/// Returns a message when the spec or executor cannot be built, `--tls` is
/// given without both `--cert-file` and `--key-file`, a token is combined with
/// a non-loopback plaintext bind, binding fails, the initial announcement
/// fails, or the server itself returns an error.
pub async fn serve(args: ActorServeArgs) -> Result<(), String> {
    // Lease length written into every published record, and the refresh period.
    const LEASE_SECS: i64 = 60;
    const RENEW_EVERY: Duration = Duration::from_secs(20);

    let agent_id = args
        .id
        .clone()
        .unwrap_or_else(|| format!("{}-{}", args.role, &uuid::Uuid::new_v4().to_string()[..8]));
    let spec = prepare_spec(
        &agent_id,
        &args.role,
        &args.model,
        &args.workspace,
        &args.data_dir,
        args.echo,
    )?;
    let executor: std::sync::Arc<dyn ChildExecutor> = if args.echo {
        std::sync::Arc::new(EchoExecutor)
    } else {
        std::sync::Arc::new(BambooRuntimeExecutor::build(&spec).await?)
    };

    let server = if args.tls {
        let (cert, key) = match (&args.cert_file, &args.key_file) {
            (Some(c), Some(k)) => (c, k),
            _ => return Err("--tls requires both --cert-file and --key-file".to_string()),
        };
        // With TLS and no explicit address, listen on all IPv4 interfaces, port 8443.
        let bind_addr = args
            .bind
            .unwrap_or_else(|| (std::net::Ipv4Addr::UNSPECIFIED, 8443).into());
        WsServer::bind_tls(bind_addr, cert, key, args.token.clone())
            .await
            .map_err(|e| format!("bind_tls: {e}"))?
    } else if let Some(bind_addr) = args.bind {
        // A token over plaintext is only tolerated on loopback.
        if args.token.is_some() && !bind_addr.ip().is_loopback() {
            return Err(format!(
                "refusing --token on a non-loopback plaintext bind ({bind_addr}): the token would \
                 be sent in cleartext. Use --tls (with --cert-file/--key-file) for a public bind."
            ));
        }
        WsServer::bind_with_token(bind_addr, args.token.clone())
            .await
            .map_err(|e| format!("bind: {e}"))?
    } else {
        // NOTE(review): `args.token` is not passed on this path, so a token given
        // without --bind/--tls appears to be ignored — confirm this is intended.
        WsServer::bind_loopback()
            .await
            .map_err(|e| format!("bind: {e}"))?
    };
    let endpoint = server.ws_endpoint();

    let fab = std::sync::Arc::new(Fabric::at(&spec.fabric_dir));
    // Best-effort housekeeping before announcing; a failure here is ignored.
    let _ = fab.gc().await;
    let record = AgentRecord {
        agent_id: agent_id.clone(),
        role: args.role.clone(),
        labels: Vec::new(),
        endpoint: endpoint.clone(),
        pid: std::process::id(),
        version: env!("CARGO_PKG_VERSION").to_string(),
        started_at: Utc::now(),
        lease_expires_at: Utc::now() + ChronoDuration::seconds(LEASE_SECS),
    };
    fab.publish(&record)
        .await
        .map_err(|e| format!("announce: {e}"))?;

    let renew_fab = fab.clone();
    let mut renew_record = record.clone();
    let renew = tokio::spawn(async move {
        let mut tick = tokio::time::interval(RENEW_EVERY);
        // The first tick of an interval completes immediately; consume it so the
        // first renewal happens one full period after the initial publish.
        tick.tick().await;
        loop {
            tick.tick().await;
            renew_record.lease_expires_at = Utc::now() + ChronoDuration::seconds(LEASE_SECS);
            // Report a failed renewal and try again on the next tick. Giving up
            // after one failure would let the lease lapse while the agent is
            // still serving.
            if let Err(e) = renew_fab.publish(&renew_record).await {
                eprintln!(
                    "⚠ lease renewal for '{}' failed: {e}; retrying",
                    renew_record.agent_id
                );
            }
        }
    });

    eprintln!(
        "✔ service agent '{agent_id}' (role: {}) announced at {endpoint}",
        args.role
    );
    eprintln!("  serving until Ctrl-C — call it with: bamboo actor call {agent_id} \"<task>\"");

    // Whichever finishes first ends the service: the server stopping or Ctrl-C.
    let result = tokio::select! {
        r = server.serve(executor) => r.map_err(|e| format!("serve: {e}")),
        _ = tokio::signal::ctrl_c() => Ok(()),
    };

    renew.abort();
    // Withdrawal is best-effort; its outcome is deliberately ignored.
    let _ = fab.withdraw(&agent_id).await;
    eprintln!("⏹ service agent '{agent_id}' withdrawn");
    result
}
/// Prints a table of the agents currently discoverable on the default fabric.
///
/// Writes either a "no live actors" line naming the fabric directory, or a
/// header row followed by one row per record (agent id, role, pid, endpoint).
///
/// # Errors
/// Returns a message when discovery fails.
pub async fn list() -> Result<(), String> {
    let fab = Fabric::at(default_fabric_dir());
    // Best-effort housekeeping before listing; a failure here is ignored.
    let _ = fab.gc().await;

    let records = match fab.discover().await {
        Ok(found) => found,
        Err(e) => return Err(format!("discover: {e}")),
    };

    if records.is_empty() {
        println!(
            "no live actors (fabric: {})",
            default_fabric_dir().display()
        );
    } else {
        println!("{:<28} {:<12} {:<8} ENDPOINT", "AGENT", "ROLE", "PID");
        for entry in records {
            println!(
                "{:<28} {:<12} {:<8} {}",
                entry.agent_id, entry.role, entry.pid, entry.endpoint
            );
        }
    }
    Ok(())
}
/// Sends one prompt to an announced actor and streams the outcome.
///
/// `args.agent` is first resolved as an agent id. If nothing matches, the
/// first discovered record whose role equals it is used.
///
/// # Errors
/// Returns a message when resolution or discovery fails, when no record
/// matches by id or role, or when the streamed run ends in an error.
pub async fn call(args: ActorCallArgs) -> Result<(), String> {
    let fab = Fabric::at(default_fabric_dir());

    let by_id = fab
        .resolve(&args.agent)
        .await
        .map_err(|e| format!("resolve: {e}"))?;

    let record = if let Some(found) = by_id {
        found
    } else {
        // No agent with that id: fall back to matching on role.
        let live = fab
            .discover()
            .await
            .map_err(|e| format!("discover: {e}"))?;
        match live.into_iter().find(|candidate| candidate.role == args.agent) {
            Some(found) => found,
            None => {
                return Err(format!(
                    "no live actor with id or role '{}'; see `bamboo actor list`",
                    args.agent
                ));
            }
        }
    };

    eprintln!(
        "▶ calling {} (role: {}, endpoint {})",
        record.agent_id, record.role, record.endpoint
    );
    connect_and_stream(&record.endpoint, &args.prompt, args.raw).await
}
/// Connects to an actor endpoint, dispatches `prompt` as a run request, and
/// renders frames until a terminal frame arrives.
///
/// * `endpoint` — address handed to `ChildClient::connect`.
/// * `prompt` — sent as the `assignment` of the run request.
/// * `raw` — forwarded to `print_event` to select verbatim event output.
///
/// A Ctrl-C during streaming sends a single `Cancel` frame. The loop then keeps
/// reading until the actor reports its terminal status.
///
/// # Errors
/// Returns a message when connecting or dispatching fails, the connection
/// closes before a terminal frame, the transport errors, or the terminal
/// status is `Error`.
async fn connect_and_stream(endpoint: &str, prompt: &str, raw: bool) -> Result<(), String> {
    let mut client = ChildClient::connect(endpoint)
        .await
        .map_err(|e| format!("connect failed: {e}"))?;
    client
        .send(ParentFrame::Run(RunSpec {
            assignment: prompt.to_string(),
            reasoning_effort: None,
            messages: Vec::new(),
        }))
        .await
        .map_err(|e| format!("dispatch failed: {e}"))?;

    // Forward the first Ctrl-C into the select loop through a channel.
    let (cancel_tx, mut cancel_rx) = tokio::sync::mpsc::channel::<()>(1);
    let ctrl_c_watcher = tokio::spawn(async move {
        if tokio::signal::ctrl_c().await.is_ok() {
            let _ = cancel_tx.send(()).await;
        }
    });

    let mut exit: Result<(), String> = Ok(());
    let mut streamed_tokens = false;
    // The watcher sends at most once and then drops the sender. After that,
    // `recv()` resolves to `None` immediately on every call, so the branch must
    // be disarmed after its first resolution. Otherwise the loop would spin and
    // re-send `Cancel` until the terminal frame arrives.
    let mut cancel_armed = true;
    loop {
        tokio::select! {
            signal = cancel_rx.recv(), if cancel_armed => {
                cancel_armed = false;
                // `None` means the watcher ended without a Ctrl-C (installing
                // the handler failed); that must not trigger a cancel.
                if signal.is_some() {
                    eprintln!("\n⏹ cancelling…");
                    let _ = client.send(ParentFrame::Cancel).await;
                }
            }
            frame = client.next_frame() => {
                match frame {
                    Ok(Some(ChildFrame::Event { event })) => {
                        if event["type"] == "token" {
                            streamed_tokens = true;
                        }
                        print_event(&event, raw);
                    }
                    Ok(Some(ChildFrame::ApprovalRequest { .. })) => {
                        // NOTE(review): approval requests are dropped without a
                        // reply; presumably the actor handles the missing
                        // answer itself — confirm.
                    }
                    Ok(Some(ChildFrame::Terminal { status, result, error, .. })) => {
                        // End the streamed-token line before the status output.
                        println!();
                        match status {
                            TerminalStatus::Completed => {
                                eprintln!("✔ completed");
                                // Print the final result only when nothing was
                                // streamed, so the answer is not shown twice.
                                if !streamed_tokens {
                                    if let Some(r) = result {
                                        println!("{r}");
                                    }
                                }
                            }
                            TerminalStatus::Cancelled => eprintln!("⏹ cancelled"),
                            TerminalStatus::Suspended => eprintln!("⏸ suspended (waiting on sub-agents)"),
                            TerminalStatus::Error => {
                                exit = Err(error.unwrap_or_else(|| "actor errored".into()));
                            }
                        }
                        break;
                    }
                    Ok(None) => {
                        exit = Err("connection closed before terminal".into());
                        break;
                    }
                    Err(e) => {
                        exit = Err(format!("transport error: {e}"));
                        break;
                    }
                }
            }
        }
    }

    // Stop the Ctrl-C watcher so it does not outlive this exchange.
    ctrl_c_watcher.abort();
    let _ = client.close().await;
    exit
}
/// Builds the `ProvisionSpec` for an actor from CLI-level inputs.
///
/// Loads the configuration from `data_dir` (or the resolved bamboo directory),
/// resolves the model, fills in the workspace (explicit, else the current
/// directory if it can be read) and attaches the credential of the model's
/// provider.
///
/// # Errors
/// Returns a message when the model argument is malformed, and — unless `echo`
/// is set — when no model can be resolved or no credential exists for the
/// resolved provider.
fn prepare_spec(
    child_id: &str,
    role: &str,
    model_arg: &Option<String>,
    workspace: &Option<PathBuf>,
    data_dir: &Option<PathBuf>,
    echo: bool,
) -> Result<ProvisionSpec, String> {
    let data_dir = match data_dir {
        Some(dir) => dir.clone(),
        None => bamboo_config::paths::resolve_bamboo_dir(),
    };
    let config = Config::from_data_dir(Some(data_dir.clone()));
    let credentials =
        bamboo_engine::external_agents::runtime::extract_provider_credentials(&config);

    let model = resolve_model(model_arg, &config)?;
    // The echo executor can run without a model; the runtime executor cannot.
    if model.is_none() && !echo {
        return Err(
            "no model resolved: pass --model provider:model or configure defaults.sub_agent/chat"
                .to_string(),
        );
    }

    let identity = ChildIdentity {
        child_id: child_id.to_string(),
        parent_id: None,
        project_key: None,
        role: role.to_string(),
        depth: 0,
    };
    let executor = if echo {
        ExecutorSpec::Echo
    } else {
        ExecutorSpec::BambooRuntime
    };
    let fabric_dir = default_fabric_dir().to_string_lossy().into_owned();
    let mut spec = ProvisionSpec::new(identity, executor, fabric_dir);

    let workspace_dir = match workspace {
        Some(dir) => Some(dir.clone()),
        None => std::env::current_dir().ok(),
    };
    spec.workspace = workspace_dir.map(|dir| dir.to_string_lossy().into_owned());

    if let Some(model_ref) = model.as_ref() {
        match pick_credential(&credentials, &model_ref.provider) {
            Some(cred) => spec.secrets.provider_credentials.push(cred),
            // Echo mode tolerates a missing credential.
            None if echo => {}
            None => {
                return Err(format!(
                    "no credential found for provider '{}' in {}",
                    model_ref.provider,
                    data_dir.display()
                ));
            }
        }
    }
    spec.model = model;
    Ok(spec)
}
/// Renders the spec's model as `provider:model`, or `-` when no model is set.
fn describe_model(spec: &ProvisionSpec) -> String {
    match &spec.model {
        Some(model_ref) => format!("{}:{}", model_ref.provider, model_ref.model),
        None => String::from("-"),
    }
}
fn print_event(event: &serde_json::Value, raw: bool) {
use std::io::Write;
if raw {
println!("{event}");
return;
}
match event["type"].as_str().unwrap_or("") {
"token" => {
print!("{}", event["content"].as_str().unwrap_or(""));
let _ = std::io::stdout().flush();
}
"reasoning_token" => { }
"tool_start" => {
eprintln!("\n⚙ {}", event["tool_name"].as_str().unwrap_or("tool"));
}
"tool_complete" => eprintln!("✔ tool done"),
"tool_error" => eprintln!("✘ tool error: {}", event["error"].as_str().unwrap_or("")),
"error" => eprintln!("✘ {}", event["message"].as_str().unwrap_or("")),
_ => {}
}
}
/// Decides which model an actor should use.
///
/// * `explicit` — the `--model` value, if any. `provider:model` selects both
///   parts; a value without `:` is taken as a model name under
///   `config.provider`. Surrounding whitespace is trimmed.
/// * `config` — source of the fallback: `defaults.sub_agent` when set,
///   otherwise `defaults.chat`.
///
/// Returns `Ok(None)` when no explicit model is given and the configuration has
/// no defaults.
///
/// # Errors
/// Returns a message when the explicit value is empty (after trimming) or when
/// either side of a `provider:model` pair is empty.
fn resolve_model(
    explicit: &Option<String>,
    config: &Config,
) -> Result<Option<ModelRefSpec>, String> {
    if let Some(raw) = explicit.as_deref() {
        let spec = raw.trim();
        // Reject blank input here rather than producing a model reference with
        // an empty model name that only fails later.
        if spec.is_empty() {
            return Err("--model must not be empty (expected provider:model or model)".to_string());
        }
        let model_ref = match spec.split_once(':') {
            Some((provider, model)) => {
                let (provider, model) = (provider.trim(), model.trim());
                if provider.is_empty() || model.is_empty() {
                    return Err(format!("--model '{spec}' must be provider:model"));
                }
                ModelRefSpec {
                    provider: provider.into(),
                    model: model.into(),
                }
            }
            // Bare model name: pair it with the configured provider.
            None => ModelRefSpec {
                provider: config.provider.clone(),
                model: spec.into(),
            },
        };
        return Ok(Some(model_ref));
    }

    Ok(config.defaults.as_ref().map(|defaults| {
        // Prefer the dedicated sub-agent default; fall back to the chat default.
        let chosen = defaults.sub_agent.as_ref().unwrap_or(&defaults.chat);
        ModelRefSpec {
            provider: chosen.provider.clone(),
            model: chosen.model.clone(),
        }
    }))
}
/// Returns a copy of the first credential whose provider equals `provider`,
/// or `None` when there is no such credential.
fn pick_credential(creds: &[ScopedCredential], provider: &str) -> Option<ScopedCredential> {
    for candidate in creds {
        if candidate.provider == provider {
            return Some(candidate.clone());
        }
    }
    None
}