Skip to main content

bamboo_agent/
actor_cli.rs

1//! `bamboo actor …` / `bamboo -p` — drive actors from the terminal.
2//!
3//! - `run`:   spawn an owned one-shot actor, give it a task, stream the output.
4//! - `serve`: become a long-running Tier-1 **service agent** — announce into the
5//!   discovery fabric and serve calls forever (stateless RPC: one isolated
6//!   session per call, design §8).
7//! - `list`:  show live fabric records (who is discoverable right now).
8//! - `call`:  discover a service agent by id or role and send it a task.
9
10use std::path::PathBuf;
11use std::time::Duration;
12
13use chrono::{Duration as ChronoDuration, Utc};
14
15use bamboo_llm::Config;
16use bamboo_subagent::discovery::Fabric;
17use bamboo_subagent::executor::{ChildExecutor, EchoExecutor};
18use bamboo_subagent::fleet::spawn_worker;
19use bamboo_subagent::proto::{AgentRecord, ChildFrame, ParentFrame, RunSpec, TerminalStatus};
20use bamboo_subagent::provision::{
21    ChildIdentity, ExecutorSpec, ModelRefSpec, ProvisionSpec, ScopedCredential,
22};
23use bamboo_subagent::transport::{ChildClient, WsServer};
24
25use crate::subagent_worker::BambooRuntimeExecutor;
26
/// Returns the fabric directory that all local actors share by default.
///
/// The path is the `bamboo-subagents` entry inside the platform temporary
/// directory reported by [`std::env::temp_dir`].
pub fn default_fabric_dir() -> PathBuf {
    let mut dir = std::env::temp_dir();
    dir.push("bamboo-subagents");
    dir
}
31
/// Arguments for the `run` subcommand: spawn a one-shot actor and stream a
/// single task through it.
#[derive(Debug, Clone)]
pub struct ActorRunArgs {
    /// Task text dispatched to the actor as its assignment.
    pub prompt: String,
    /// Optional model override, either `provider:model` or a bare model name
    /// resolved against the configured provider.
    pub model: Option<String>,
    /// Role recorded in the actor's identity.
    pub role: String,
    /// Workspace directory; the current directory is used when absent.
    pub workspace: Option<PathBuf>,
    /// Data directory the configuration is loaded from; the resolved bamboo
    /// directory is used when absent.
    pub data_dir: Option<PathBuf>,
    /// Use the echo executor instead of the bamboo runtime (no model needed).
    pub echo: bool,
    /// Print raw event JSON instead of pretty streaming.
    pub raw: bool,
}
42
/// Arguments for the `serve` subcommand: run this process as a long-lived
/// service agent.
#[derive(Debug, Clone)]
pub struct ActorServeArgs {
    /// Role announced in the agent's fabric record.
    pub role: String,
    /// Stable agent id; defaults to `<role>-<short-uuid>`.
    pub id: Option<String>,
    /// Optional model override, either `provider:model` or a bare model name
    /// resolved against the configured provider.
    pub model: Option<String>,
    /// Workspace directory; the current directory is used when absent.
    pub workspace: Option<PathBuf>,
    /// Data directory the configuration is loaded from; the resolved bamboo
    /// directory is used when absent.
    pub data_dir: Option<PathBuf>,
    /// Use the echo executor instead of the bamboo runtime (no model needed).
    pub echo: bool,
}
52
/// Arguments for the `call` subcommand: send one task to an already running
/// service agent.
#[derive(Debug, Clone)]
pub struct ActorCallArgs {
    /// Agent id (exact) or role (first live match) to call.
    pub agent: String,
    /// Task text dispatched to the agent as its assignment.
    pub prompt: String,
    /// Print raw event JSON instead of pretty streaming.
    pub raw: bool,
}
59
60// ---------------------------------------------------------------------------
61// run — spawn an owned one-shot actor
62// ---------------------------------------------------------------------------
63
64pub async fn run(args: ActorRunArgs) -> Result<(), String> {
65    let child_id = format!("cli-{}", uuid::Uuid::new_v4());
66    let spec = prepare_spec(
67        &child_id,
68        &args.role,
69        &args.model,
70        &args.workspace,
71        &args.data_dir,
72        args.echo,
73    )?;
74
75    let worker_bin =
76        std::env::current_exe().map_err(|e| format!("cannot locate own executable: {e}"))?;
77    eprintln!(
78        "▶ spawning actor {child_id} (model: {}, executor: {})",
79        describe_model(&spec),
80        if args.echo { "echo" } else { "bamboo_runtime" },
81    );
82
83    let spawned = spawn_worker(
84        &worker_bin,
85        &["subagent-worker".to_string()],
86        &spec,
87        Duration::from_secs(30),
88    )
89    .await
90    .map_err(|e| format!("spawn/register failed: {e}"))?;
91    eprintln!(
92        "✔ actor registered (pid {}, endpoint {})",
93        spawned.record.pid, spawned.record.endpoint
94    );
95
96    let exit = connect_and_stream(&spawned.record.endpoint, &args.prompt, args.raw).await;
97    spawned.kill().await;
98    exit
99}
100
101// ---------------------------------------------------------------------------
102// serve — long-running Tier-1 service agent
103// ---------------------------------------------------------------------------
104
105pub async fn serve(args: ActorServeArgs) -> Result<(), String> {
106    let agent_id = args
107        .id
108        .clone()
109        .unwrap_or_else(|| format!("{}-{}", args.role, &uuid::Uuid::new_v4().to_string()[..8]));
110    let spec = prepare_spec(
111        &agent_id,
112        &args.role,
113        &args.model,
114        &args.workspace,
115        &args.data_dir,
116        args.echo,
117    )?;
118
119    let executor: std::sync::Arc<dyn ChildExecutor> = if args.echo {
120        std::sync::Arc::new(EchoExecutor)
121    } else {
122        std::sync::Arc::new(BambooRuntimeExecutor::build(&spec).await?)
123    };
124
125    let server = WsServer::bind_loopback()
126        .await
127        .map_err(|e| format!("bind: {e}"))?;
128    let endpoint = server.ws_endpoint();
129
130    let fab = std::sync::Arc::new(Fabric::at(&spec.fabric_dir));
131    let _ = fab.gc().await; // housekeeping: drop expired records
132    let record = AgentRecord {
133        agent_id: agent_id.clone(),
134        role: args.role.clone(),
135        labels: Vec::new(),
136        endpoint: endpoint.clone(),
137        pid: std::process::id(),
138        version: env!("CARGO_PKG_VERSION").to_string(),
139        started_at: Utc::now(),
140        lease_expires_at: Utc::now() + ChronoDuration::seconds(60),
141    };
142    fab.publish(&record)
143        .await
144        .map_err(|e| format!("announce: {e}"))?;
145
146    // Lease renewal while serving.
147    let renew_fab = fab.clone();
148    let mut renew_record = record.clone();
149    let renew = tokio::spawn(async move {
150        let mut tick = tokio::time::interval(Duration::from_secs(20));
151        tick.tick().await;
152        loop {
153            tick.tick().await;
154            renew_record.lease_expires_at = Utc::now() + ChronoDuration::seconds(60);
155            if renew_fab.publish(&renew_record).await.is_err() {
156                break;
157            }
158        }
159    });
160
161    eprintln!(
162        "✔ service agent '{agent_id}' (role: {}) announced at {endpoint}",
163        args.role
164    );
165    eprintln!("  serving until Ctrl-C — call it with: bamboo actor call {agent_id} \"<task>\"");
166
167    // Serve forever; Ctrl-C withdraws the record and exits cleanly.
168    let result = tokio::select! {
169        r = server.serve(executor) => r.map_err(|e| format!("serve: {e}")),
170        _ = tokio::signal::ctrl_c() => Ok(()),
171    };
172    renew.abort();
173    let _ = fab.withdraw(&agent_id).await;
174    eprintln!("⏹ service agent '{agent_id}' withdrawn");
175    result
176}
177
178// ---------------------------------------------------------------------------
179// list — discoverable actors right now
180// ---------------------------------------------------------------------------
181
182pub async fn list() -> Result<(), String> {
183    let fab = Fabric::at(default_fabric_dir());
184    let _ = fab.gc().await;
185    let records = fab.discover().await.map_err(|e| format!("discover: {e}"))?;
186    if records.is_empty() {
187        println!(
188            "no live actors (fabric: {})",
189            default_fabric_dir().display()
190        );
191        return Ok(());
192    }
193    println!("{:<28} {:<12} {:<8} ENDPOINT", "AGENT", "ROLE", "PID");
194    for r in records {
195        println!(
196            "{:<28} {:<12} {:<8} {}",
197            r.agent_id, r.role, r.pid, r.endpoint
198        );
199    }
200    Ok(())
201}
202
203// ---------------------------------------------------------------------------
204// call — discover + invoke a service agent
205// ---------------------------------------------------------------------------
206
207pub async fn call(args: ActorCallArgs) -> Result<(), String> {
208    let fab = Fabric::at(default_fabric_dir());
209    let record = match fab
210        .resolve(&args.agent)
211        .await
212        .map_err(|e| format!("resolve: {e}"))?
213    {
214        Some(r) => r,
215        None => {
216            // Fall back to role match: first live agent with this role.
217            fab.discover()
218                .await
219                .map_err(|e| format!("discover: {e}"))?
220                .into_iter()
221                .find(|r| r.role == args.agent)
222                .ok_or_else(|| {
223                    format!(
224                        "no live actor with id or role '{}'; see `bamboo actor list`",
225                        args.agent
226                    )
227                })?
228        }
229    };
230    eprintln!(
231        "▶ calling {} (role: {}, endpoint {})",
232        record.agent_id, record.role, record.endpoint
233    );
234    connect_and_stream(&record.endpoint, &args.prompt, args.raw).await
235}
236
237// ---------------------------------------------------------------------------
238// shared plumbing
239// ---------------------------------------------------------------------------
240
241/// Connect to an actor endpoint, dispatch a run, stream until terminal.
242/// Ctrl-C sends the out-of-band cancel.
243async fn connect_and_stream(endpoint: &str, prompt: &str, raw: bool) -> Result<(), String> {
244    let mut client = ChildClient::connect(endpoint)
245        .await
246        .map_err(|e| format!("connect failed: {e}"))?;
247    client
248        .send(ParentFrame::Run(RunSpec {
249            assignment: prompt.to_string(),
250            reasoning_effort: None,
251            messages: Vec::new(),
252        }))
253        .await
254        .map_err(|e| format!("dispatch failed: {e}"))?;
255
256    let (cancel_tx, mut cancel_rx) = tokio::sync::mpsc::channel::<()>(1);
257    tokio::spawn(async move {
258        if tokio::signal::ctrl_c().await.is_ok() {
259            let _ = cancel_tx.send(()).await;
260        }
261    });
262
263    let mut exit: Result<(), String> = Ok(());
264    let mut streamed_tokens = false;
265    loop {
266        tokio::select! {
267            _ = cancel_rx.recv() => {
268                eprintln!("\n⏹ cancelling…");
269                let _ = client.send(ParentFrame::Cancel).await;
270            }
271            frame = client.next_frame() => {
272                match frame {
273                    Ok(Some(ChildFrame::Event { event })) => {
274                        if event["type"] == "token" {
275                            streamed_tokens = true;
276                        }
277                        print_event(&event, raw);
278                    }
279                    Ok(Some(ChildFrame::ApprovalRequest { .. })) => {
280                        // This CLI does not route gated-tool approvals; ignore.
281                        // (The production host in actor_adapter answers these.)
282                    }
283                    Ok(Some(ChildFrame::Terminal { status, result, error, .. })) => {
284                        println!();
285                        match status {
286                            TerminalStatus::Completed => {
287                                eprintln!("✔ completed");
288                                if !streamed_tokens {
289                                    if let Some(r) = result {
290                                        println!("{r}");
291                                    }
292                                }
293                            }
294                            TerminalStatus::Cancelled => eprintln!("⏹ cancelled"),
295                            TerminalStatus::Suspended => eprintln!("⏸ suspended (waiting on sub-agents)"),
296                            TerminalStatus::Error => {
297                                exit = Err(error.unwrap_or_else(|| "actor errored".into()));
298                            }
299                        }
300                        break;
301                    }
302                    Ok(None) => {
303                        exit = Err("connection closed before terminal".into());
304                        break;
305                    }
306                    Err(e) => {
307                        exit = Err(format!("transport error: {e}"));
308                        break;
309                    }
310                }
311            }
312        }
313    }
314
315    let _ = client.close().await;
316    exit
317}
318
319/// Resolve config + model + credential into a ProvisionSpec for a local actor.
320fn prepare_spec(
321    child_id: &str,
322    role: &str,
323    model_arg: &Option<String>,
324    workspace: &Option<PathBuf>,
325    data_dir: &Option<PathBuf>,
326    echo: bool,
327) -> Result<ProvisionSpec, String> {
328    let data_dir = data_dir
329        .clone()
330        .unwrap_or_else(bamboo_config::paths::resolve_bamboo_dir);
331    // Loads config.json and hydrates encrypted api keys into memory.
332    let config = Config::from_data_dir(Some(data_dir.clone()));
333    let credentials =
334        bamboo_engine::external_agents::runtime::extract_provider_credentials(&config);
335
336    let model = resolve_model(model_arg, &config)?;
337    if !echo && model.is_none() {
338        return Err(
339            "no model resolved: pass --model provider:model or configure defaults.sub_agent/chat"
340                .to_string(),
341        );
342    }
343
344    let mut spec = ProvisionSpec::new(
345        ChildIdentity {
346            child_id: child_id.to_string(),
347            parent_id: None,
348            project_key: None,
349            role: role.to_string(),
350            depth: 0,
351        },
352        if echo {
353            ExecutorSpec::Echo
354        } else {
355            ExecutorSpec::BambooRuntime
356        },
357        default_fabric_dir().to_string_lossy().into_owned(),
358    );
359    spec.workspace = workspace
360        .clone()
361        .or_else(|| std::env::current_dir().ok())
362        .map(|w| w.to_string_lossy().into_owned());
363    spec.model = model.clone();
364    if let Some(m) = &model {
365        if let Some(cred) = pick_credential(&credentials, &m.provider) {
366            spec.secrets.provider_credentials.push(cred);
367        } else if !echo {
368            return Err(format!(
369                "no credential found for provider '{}' in {}",
370                m.provider,
371                data_dir.display()
372            ));
373        }
374    }
375    Ok(spec)
376}
377
378fn describe_model(spec: &ProvisionSpec) -> String {
379    spec.model
380        .as_ref()
381        .map(|m| format!("{}:{}", m.provider, m.model))
382        .unwrap_or_else(|| "-".into())
383}
384
385fn print_event(event: &serde_json::Value, raw: bool) {
386    use std::io::Write;
387    if raw {
388        println!("{event}");
389        return;
390    }
391    match event["type"].as_str().unwrap_or("") {
392        "token" => {
393            print!("{}", event["content"].as_str().unwrap_or(""));
394            let _ = std::io::stdout().flush();
395        }
396        "reasoning_token" => { /* keep terse */ }
397        "tool_start" => {
398            eprintln!("\n⚙ {}", event["tool_name"].as_str().unwrap_or("tool"));
399        }
400        "tool_complete" => eprintln!("✔ tool done"),
401        "tool_error" => eprintln!("✘ tool error: {}", event["error"].as_str().unwrap_or("")),
402        "error" => eprintln!("✘ {}", event["message"].as_str().unwrap_or("")),
403        _ => {}
404    }
405}
406
407/// `--model provider:model` (or bare model on the default provider) >
408/// `defaults.sub_agent` > `defaults.chat`.
409fn resolve_model(
410    explicit: &Option<String>,
411    config: &Config,
412) -> Result<Option<ModelRefSpec>, String> {
413    if let Some(spec) = explicit {
414        let spec = spec.trim();
415        if let Some((p, m)) = spec.split_once(':') {
416            if p.trim().is_empty() || m.trim().is_empty() {
417                return Err(format!("--model '{spec}' must be provider:model"));
418            }
419            return Ok(Some(ModelRefSpec {
420                provider: p.trim().into(),
421                model: m.trim().into(),
422            }));
423        }
424        return Ok(Some(ModelRefSpec {
425            provider: config.provider.clone(),
426            model: spec.into(),
427        }));
428    }
429    if let Some(defaults) = &config.defaults {
430        let pick = defaults.sub_agent.as_ref().or(Some(&defaults.chat));
431        if let Some(r) = pick {
432            return Ok(Some(ModelRefSpec {
433                provider: r.provider.clone(),
434                model: r.model.clone(),
435            }));
436        }
437    }
438    Ok(None)
439}
440
441fn pick_credential(creds: &[ScopedCredential], provider: &str) -> Option<ScopedCredential> {
442    creds.iter().find(|c| c.provider == provider).cloned()
443}