// bamboo_subagent/fleet.rs

//! Parent-side fleet helpers: spawn an actor subprocess, provision it over stdin, discover it.
//!
//! Bootstrap protocol:
//! 1. spawn the worker binary with **no per-child arguments** — only fixed subcommand/flag
//!    arguments — so nothing secret is added to its argv or env,
//! 2. write one [`ProvisionSpec`] JSON document to its stdin and close the pipe,
//! 3. poll the Tier-1 fabric until the worker self-registers under `identity.child_id`.
//!
//! The real engine adapter (`SubprocessChildRunner`) builds on these primitives.

use std::path::Path;
use std::process::Stdio;
use std::time::Duration;

use tokio::io::AsyncWriteExt;
use tokio::process::{Child, ChildStdin, Command};
use tokio::time::{sleep, timeout_at, Instant};

use crate::discovery::Fabric;
use crate::proto::AgentRecord;
use crate::provision::ProvisionSpec;
use crate::transport::{TransportError, TransportResult};

23/// A spawned actor subprocess plus its discovered record. Killed on drop (`kill_on_drop`).
24pub struct SpawnedChild {
25    pub record: AgentRecord,
26    process: Child,
27}
28
29impl SpawnedChild {
30    /// Terminate the child process.
31    pub async fn kill(mut self) {
32        let _ = self.process.kill().await;
33    }
34
35    pub fn pid(&self) -> Option<u32> {
36        self.process.id()
37    }
38}
39
40/// Spawn `worker_bin worker_args…`, provision it with `spec` over stdin, then poll the fabric
41/// until the worker self-registers (or `wait` elapses). On timeout the process is killed.
42///
43/// `worker_args` carries fixed subcommand/flag arguments (e.g. `["subagent-worker"]` for the
44/// main `bamboo` binary) — never per-child data, which all rides in the spec.
45pub async fn spawn_worker(
46    worker_bin: &Path,
47    worker_args: &[String],
48    spec: &ProvisionSpec,
49    wait: Duration,
50) -> TransportResult<SpawnedChild> {
51    let fabric_dir = Path::new(&spec.fabric_dir);
52    tokio::fs::create_dir_all(fabric_dir).await.ok();
53
54    let spec_json = spec
55        .to_json()
56        .map_err(|e| TransportError::Protocol(format!("provision spec encode: {e}")))?;
57
58    let mut cmd = Command::new(worker_bin);
59    cmd.args(worker_args);
60    cmd.stdin(Stdio::piped());
61    cmd.kill_on_drop(true);
62    let mut process = cmd.spawn().map_err(TransportError::Io)?;
63
64    // Feed the spec, then close stdin so the worker sees EOF.
65    {
66        let mut stdin = process
67            .stdin
68            .take()
69            .ok_or_else(|| TransportError::Protocol("worker stdin unavailable".to_string()))?;
70        stdin
71            .write_all(spec_json.as_bytes())
72            .await
73            .map_err(TransportError::Io)?;
74        stdin.shutdown().await.map_err(TransportError::Io)?;
75    }
76
77    let child_id = spec.identity.child_id.clone();
78    let fab = Fabric::at(fabric_dir);
79    let deadline = Instant::now() + wait;
80    loop {
81        if let Ok(Some(record)) = fab.resolve(&child_id).await {
82            return Ok(SpawnedChild { record, process });
83        }
84        // bail early if the worker died before registering
85        if let Ok(Some(status)) = process.try_wait() {
86            return Err(TransportError::Protocol(format!(
87                "worker exited before registering: {status}"
88            )));
89        }
90        if Instant::now() >= deadline {
91            let _ = process.kill().await;
92            return Err(TransportError::Protocol(format!(
93                "worker '{child_id}' did not register within {wait:?}"
94            )));
95        }
96        sleep(Duration::from_millis(20)).await;
97    }
98}