1use std::path::PathBuf;
11use std::time::Duration;
12
13use chrono::{Duration as ChronoDuration, Utc};
14
15use bamboo_llm::Config;
16use bamboo_subagent::discovery::Fabric;
17use bamboo_subagent::executor::{ChildExecutor, EchoExecutor};
18use bamboo_subagent::fleet::spawn_worker;
19use bamboo_subagent::proto::{AgentRecord, ChildFrame, ParentFrame, RunSpec, TerminalStatus};
20use bamboo_subagent::provision::{
21 ChildIdentity, ExecutorSpec, ModelRefSpec, ProvisionSpec, ScopedCredential,
22};
23use bamboo_subagent::transport::{ChildClient, WsServer};
24
25use crate::subagent_worker::BambooRuntimeExecutor;
26
/// Returns the directory used as the default actor fabric: a
/// `bamboo-subagents` folder inside the platform temp directory.
pub fn default_fabric_dir() -> PathBuf {
    let mut dir = std::env::temp_dir();
    dir.push("bamboo-subagents");
    dir
}
31
/// Arguments for [`run`]: spawn a one-shot actor, send it a prompt and
/// stream its output.
#[derive(Debug, Clone)]
pub struct ActorRunArgs {
    /// Task text sent to the actor as the run assignment.
    pub prompt: String,
    /// Model override, written as `provider:model` or as a bare model name
    /// (the configured provider is then used). `None` falls back to the
    /// configured defaults.
    pub model: Option<String>,
    /// Role recorded in the actor's identity.
    pub role: String,
    /// Workspace path placed in the provision spec; the current directory is
    /// used when this is `None`.
    pub workspace: Option<PathBuf>,
    /// Data directory the configuration is loaded from; resolved
    /// automatically when `None`.
    pub data_dir: Option<PathBuf>,
    /// Select the echo executor instead of the bamboo runtime (no model is
    /// required in that case).
    pub echo: bool,
    /// Print every streamed event as raw JSON instead of formatted output.
    pub raw: bool,
}
42
/// Arguments for [`serve`]: run a long-lived service agent in this process.
#[derive(Debug, Clone)]
pub struct ActorServeArgs {
    /// Role announced for the agent and recorded in its identity.
    pub role: String,
    /// Agent id to announce. When `None`, an id is generated from the role
    /// followed by the first eight characters of a fresh UUID.
    pub id: Option<String>,
    /// Model override, written as `provider:model` or as a bare model name
    /// (the configured provider is then used). `None` falls back to the
    /// configured defaults.
    pub model: Option<String>,
    /// Workspace path placed in the provision spec; the current directory is
    /// used when this is `None`.
    pub workspace: Option<PathBuf>,
    /// Data directory the configuration is loaded from; resolved
    /// automatically when `None`.
    pub data_dir: Option<PathBuf>,
    /// Serve with the echo executor instead of the bamboo runtime.
    pub echo: bool,
}
52
/// Arguments for [`call`]: send a prompt to an already-running actor.
#[derive(Debug, Clone)]
pub struct ActorCallArgs {
    /// Agent id to call, or a role name that is matched against live actors
    /// when no agent resolves under that id.
    pub agent: String,
    /// Task text sent to the actor as the run assignment.
    pub prompt: String,
    /// Print every streamed event as raw JSON instead of formatted output.
    pub raw: bool,
}
59
60pub async fn run(args: ActorRunArgs) -> Result<(), String> {
65 let child_id = format!("cli-{}", uuid::Uuid::new_v4());
66 let spec = prepare_spec(
67 &child_id,
68 &args.role,
69 &args.model,
70 &args.workspace,
71 &args.data_dir,
72 args.echo,
73 )?;
74
75 let worker_bin =
76 std::env::current_exe().map_err(|e| format!("cannot locate own executable: {e}"))?;
77 eprintln!(
78 "▶ spawning actor {child_id} (model: {}, executor: {})",
79 describe_model(&spec),
80 if args.echo { "echo" } else { "bamboo_runtime" },
81 );
82
83 let spawned = spawn_worker(
84 &worker_bin,
85 &["subagent-worker".to_string()],
86 &spec,
87 Duration::from_secs(30),
88 )
89 .await
90 .map_err(|e| format!("spawn/register failed: {e}"))?;
91 eprintln!(
92 "✔ actor registered (pid {}, endpoint {})",
93 spawned.record.pid, spawned.record.endpoint
94 );
95
96 let exit = connect_and_stream(&spawned.record.endpoint, &args.prompt, args.raw).await;
97 spawned.kill().await;
98 exit
99}
100
101pub async fn serve(args: ActorServeArgs) -> Result<(), String> {
106 let agent_id = args
107 .id
108 .clone()
109 .unwrap_or_else(|| format!("{}-{}", args.role, &uuid::Uuid::new_v4().to_string()[..8]));
110 let spec = prepare_spec(
111 &agent_id,
112 &args.role,
113 &args.model,
114 &args.workspace,
115 &args.data_dir,
116 args.echo,
117 )?;
118
119 let executor: std::sync::Arc<dyn ChildExecutor> = if args.echo {
120 std::sync::Arc::new(EchoExecutor)
121 } else {
122 std::sync::Arc::new(BambooRuntimeExecutor::build(&spec).await?)
123 };
124
125 let server = WsServer::bind_loopback()
126 .await
127 .map_err(|e| format!("bind: {e}"))?;
128 let endpoint = server.ws_endpoint();
129
130 let fab = std::sync::Arc::new(Fabric::at(&spec.fabric_dir));
131 let _ = fab.gc().await; let record = AgentRecord {
133 agent_id: agent_id.clone(),
134 role: args.role.clone(),
135 labels: Vec::new(),
136 endpoint: endpoint.clone(),
137 pid: std::process::id(),
138 version: env!("CARGO_PKG_VERSION").to_string(),
139 started_at: Utc::now(),
140 lease_expires_at: Utc::now() + ChronoDuration::seconds(60),
141 };
142 fab.publish(&record)
143 .await
144 .map_err(|e| format!("announce: {e}"))?;
145
146 let renew_fab = fab.clone();
148 let mut renew_record = record.clone();
149 let renew = tokio::spawn(async move {
150 let mut tick = tokio::time::interval(Duration::from_secs(20));
151 tick.tick().await;
152 loop {
153 tick.tick().await;
154 renew_record.lease_expires_at = Utc::now() + ChronoDuration::seconds(60);
155 if renew_fab.publish(&renew_record).await.is_err() {
156 break;
157 }
158 }
159 });
160
161 eprintln!(
162 "✔ service agent '{agent_id}' (role: {}) announced at {endpoint}",
163 args.role
164 );
165 eprintln!(" serving until Ctrl-C — call it with: bamboo actor call {agent_id} \"<task>\"");
166
167 let result = tokio::select! {
169 r = server.serve(executor) => r.map_err(|e| format!("serve: {e}")),
170 _ = tokio::signal::ctrl_c() => Ok(()),
171 };
172 renew.abort();
173 let _ = fab.withdraw(&agent_id).await;
174 eprintln!("⏹ service agent '{agent_id}' withdrawn");
175 result
176}
177
178pub async fn list() -> Result<(), String> {
183 let fab = Fabric::at(default_fabric_dir());
184 let _ = fab.gc().await;
185 let records = fab.discover().await.map_err(|e| format!("discover: {e}"))?;
186 if records.is_empty() {
187 println!(
188 "no live actors (fabric: {})",
189 default_fabric_dir().display()
190 );
191 return Ok(());
192 }
193 println!("{:<28} {:<12} {:<8} ENDPOINT", "AGENT", "ROLE", "PID");
194 for r in records {
195 println!(
196 "{:<28} {:<12} {:<8} {}",
197 r.agent_id, r.role, r.pid, r.endpoint
198 );
199 }
200 Ok(())
201}
202
203pub async fn call(args: ActorCallArgs) -> Result<(), String> {
208 let fab = Fabric::at(default_fabric_dir());
209 let record = match fab
210 .resolve(&args.agent)
211 .await
212 .map_err(|e| format!("resolve: {e}"))?
213 {
214 Some(r) => r,
215 None => {
216 fab.discover()
218 .await
219 .map_err(|e| format!("discover: {e}"))?
220 .into_iter()
221 .find(|r| r.role == args.agent)
222 .ok_or_else(|| {
223 format!(
224 "no live actor with id or role '{}'; see `bamboo actor list`",
225 args.agent
226 )
227 })?
228 }
229 };
230 eprintln!(
231 "▶ calling {} (role: {}, endpoint {})",
232 record.agent_id, record.role, record.endpoint
233 );
234 connect_and_stream(&record.endpoint, &args.prompt, args.raw).await
235}
236
237async fn connect_and_stream(endpoint: &str, prompt: &str, raw: bool) -> Result<(), String> {
244 let mut client = ChildClient::connect(endpoint)
245 .await
246 .map_err(|e| format!("connect failed: {e}"))?;
247 client
248 .send(ParentFrame::Run(RunSpec {
249 assignment: prompt.to_string(),
250 reasoning_effort: None,
251 messages: Vec::new(),
252 }))
253 .await
254 .map_err(|e| format!("dispatch failed: {e}"))?;
255
256 let (cancel_tx, mut cancel_rx) = tokio::sync::mpsc::channel::<()>(1);
257 tokio::spawn(async move {
258 if tokio::signal::ctrl_c().await.is_ok() {
259 let _ = cancel_tx.send(()).await;
260 }
261 });
262
263 let mut exit: Result<(), String> = Ok(());
264 let mut streamed_tokens = false;
265 loop {
266 tokio::select! {
267 _ = cancel_rx.recv() => {
268 eprintln!("\n⏹ cancelling…");
269 let _ = client.send(ParentFrame::Cancel).await;
270 }
271 frame = client.next_frame() => {
272 match frame {
273 Ok(Some(ChildFrame::Event { event })) => {
274 if event["type"] == "token" {
275 streamed_tokens = true;
276 }
277 print_event(&event, raw);
278 }
279 Ok(Some(ChildFrame::ApprovalRequest { .. })) => {
280 }
283 Ok(Some(ChildFrame::Terminal { status, result, error, .. })) => {
284 println!();
285 match status {
286 TerminalStatus::Completed => {
287 eprintln!("✔ completed");
288 if !streamed_tokens {
289 if let Some(r) = result {
290 println!("{r}");
291 }
292 }
293 }
294 TerminalStatus::Cancelled => eprintln!("⏹ cancelled"),
295 TerminalStatus::Suspended => eprintln!("⏸ suspended (waiting on sub-agents)"),
296 TerminalStatus::Error => {
297 exit = Err(error.unwrap_or_else(|| "actor errored".into()));
298 }
299 }
300 break;
301 }
302 Ok(None) => {
303 exit = Err("connection closed before terminal".into());
304 break;
305 }
306 Err(e) => {
307 exit = Err(format!("transport error: {e}"));
308 break;
309 }
310 }
311 }
312 }
313 }
314
315 let _ = client.close().await;
316 exit
317}
318
319fn prepare_spec(
321 child_id: &str,
322 role: &str,
323 model_arg: &Option<String>,
324 workspace: &Option<PathBuf>,
325 data_dir: &Option<PathBuf>,
326 echo: bool,
327) -> Result<ProvisionSpec, String> {
328 let data_dir = data_dir
329 .clone()
330 .unwrap_or_else(bamboo_config::paths::resolve_bamboo_dir);
331 let config = Config::from_data_dir(Some(data_dir.clone()));
333 let credentials =
334 bamboo_engine::external_agents::runtime::extract_provider_credentials(&config);
335
336 let model = resolve_model(model_arg, &config)?;
337 if !echo && model.is_none() {
338 return Err(
339 "no model resolved: pass --model provider:model or configure defaults.sub_agent/chat"
340 .to_string(),
341 );
342 }
343
344 let mut spec = ProvisionSpec::new(
345 ChildIdentity {
346 child_id: child_id.to_string(),
347 parent_id: None,
348 project_key: None,
349 role: role.to_string(),
350 depth: 0,
351 },
352 if echo {
353 ExecutorSpec::Echo
354 } else {
355 ExecutorSpec::BambooRuntime
356 },
357 default_fabric_dir().to_string_lossy().into_owned(),
358 );
359 spec.workspace = workspace
360 .clone()
361 .or_else(|| std::env::current_dir().ok())
362 .map(|w| w.to_string_lossy().into_owned());
363 spec.model = model.clone();
364 if let Some(m) = &model {
365 if let Some(cred) = pick_credential(&credentials, &m.provider) {
366 spec.secrets.provider_credentials.push(cred);
367 } else if !echo {
368 return Err(format!(
369 "no credential found for provider '{}' in {}",
370 m.provider,
371 data_dir.display()
372 ));
373 }
374 }
375 Ok(spec)
376}
377
378fn describe_model(spec: &ProvisionSpec) -> String {
379 spec.model
380 .as_ref()
381 .map(|m| format!("{}:{}", m.provider, m.model))
382 .unwrap_or_else(|| "-".into())
383}
384
385fn print_event(event: &serde_json::Value, raw: bool) {
386 use std::io::Write;
387 if raw {
388 println!("{event}");
389 return;
390 }
391 match event["type"].as_str().unwrap_or("") {
392 "token" => {
393 print!("{}", event["content"].as_str().unwrap_or(""));
394 let _ = std::io::stdout().flush();
395 }
396 "reasoning_token" => { }
397 "tool_start" => {
398 eprintln!("\n⚙ {}", event["tool_name"].as_str().unwrap_or("tool"));
399 }
400 "tool_complete" => eprintln!("✔ tool done"),
401 "tool_error" => eprintln!("✘ tool error: {}", event["error"].as_str().unwrap_or("")),
402 "error" => eprintln!("✘ {}", event["message"].as_str().unwrap_or("")),
403 _ => {}
404 }
405}
406
407fn resolve_model(
410 explicit: &Option<String>,
411 config: &Config,
412) -> Result<Option<ModelRefSpec>, String> {
413 if let Some(spec) = explicit {
414 let spec = spec.trim();
415 if let Some((p, m)) = spec.split_once(':') {
416 if p.trim().is_empty() || m.trim().is_empty() {
417 return Err(format!("--model '{spec}' must be provider:model"));
418 }
419 return Ok(Some(ModelRefSpec {
420 provider: p.trim().into(),
421 model: m.trim().into(),
422 }));
423 }
424 return Ok(Some(ModelRefSpec {
425 provider: config.provider.clone(),
426 model: spec.into(),
427 }));
428 }
429 if let Some(defaults) = &config.defaults {
430 let pick = defaults.sub_agent.as_ref().or(Some(&defaults.chat));
431 if let Some(r) = pick {
432 return Ok(Some(ModelRefSpec {
433 provider: r.provider.clone(),
434 model: r.model.clone(),
435 }));
436 }
437 }
438 Ok(None)
439}
440
441fn pick_credential(creds: &[ScopedCredential], provider: &str) -> Option<ScopedCredential> {
442 creds.iter().find(|c| c.provider == provider).cloned()
443}