Skip to main content

objectiveai_sdk/cli/command/command_executor/
binary.rs

1use std::path::PathBuf;
2use std::pin::Pin;
3
4use futures::{Stream, StreamExt};
5use tokio::io::{AsyncBufReadExt, BufReader};
6use tokio::process::Command;
7
8use crate::cli::command::{AgentArguments, CommandExecutor, CommandRequest, CommandResponse};
9
/// Executor that spawns the `objectiveai` cli binary on disk, feeds it
/// the argv from `request.into_command()`, and streams each stdout
/// JSONL line back as either a typed `T` or a structured
/// [`crate::cli::Error`].
///
/// The binary is resolved at `execute` time (not at construction).
/// Resolution order:
///
/// - [`BinaryExecutor::from_path`] — returns the explicit path verbatim
///   (used by tests pointing at an out-of-tree build).
/// - [`BinaryExecutor::new`] with `Some(objectiveai_dir)` —
///   `<objectiveai_dir>/bin/objectiveai{.exe}`.
/// - [`BinaryExecutor::new`] with `None` —
///   `<home>/.objectiveai/bin/objectiveai{.exe}`.
pub struct BinaryExecutor {
    /// The layout root (`OBJECTIVEAI_DIR`); the binary inside is
    /// always `bin/objectiveai{.exe}`. `None` falls back to the
    /// home-dir default (`~/.objectiveai`).
    objectiveai_dir: Option<PathBuf>,
    /// When set, used verbatim and the `objectiveai_dir` lookup is
    /// skipped entirely. Lets callers (notably tests) point the
    /// executor at any binary by path without enforcing the
    /// `<objectiveai_dir>/bin/objectiveai{.exe}` layout.
    explicit_path: Option<PathBuf>,
    /// Extra environment variables to set on the spawned child, as
    /// `(key, value)` pairs applied in insertion order. Stacks on top
    /// of the parent's environment (the child inherits the rest —
    /// these entries only add or override). Set via [`Self::env`].
    extra_env: Vec<(String, String)>,
    /// When `true`, the spawned child is sent a kill signal whenever
    /// the [`tokio::process::Child`] held in the stream state is
    /// dropped. Defaults to `false` (the SDK's long-standing
    /// behaviour: drop-without-kill lets the child finish its work
    /// orphaned). Set via [`Self::kill_on_drop`]. Test wrappers that
    /// want to abort hung children (e.g. cli-test
    /// `HangPreventingBinaryCommandExecutor`) flip this on.
    kill_on_drop: bool,
    /// When `true`, spawn the child detached from the parent's console
    /// / process group so it survives the parent process exiting.
    /// Unix already does this automatically when the parent dies
    /// (kernel re-parents to init). Windows needs explicit
    /// `DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP` creation flags;
    /// otherwise the child's inherited console closes with the parent
    /// and the child gets a `CTRL_CLOSE_EVENT`. Defaults to `false`.
    /// Set via [`Self::detach`].
    detach: bool,
    /// One-shot lockfile claim to hand off to the next spawned child
    /// ([`crate::lockfile::LockClaim::prepare_transfer`] before the
    /// spawn, [`crate::lockfile::LockClaim::transfer`] after) — the
    /// child becomes the sole owner and the lock lives until the
    /// child exits. Consumed by the first `execute`; interior
    /// mutability because [`CommandExecutor::execute`] takes `&self`.
    /// Set via [`Self::transfer_lock`].
    #[cfg(feature = "lockfile")]
    transfer_lock: std::sync::Mutex<Option<crate::lockfile::LockClaim>>,
}
63
64impl BinaryExecutor {
65    pub fn new(objectiveai_dir: Option<impl Into<PathBuf>>) -> Self {
66        Self {
67            objectiveai_dir: objectiveai_dir.map(Into::into),
68            explicit_path: None,
69            extra_env: Vec::new(),
70            kill_on_drop: false,
71            detach: false,
72            #[cfg(feature = "lockfile")]
73            transfer_lock: std::sync::Mutex::new(None),
74        }
75    }
76
77    /// Construct an executor that spawns the binary at `binary`
78    /// directly, regardless of file name. Skips the `objectiveai`
79    /// name lookup so tests can target a `target/debug/objectiveai-cli`
80    /// build without renaming or symlinking.
81    pub fn from_path(binary: impl Into<PathBuf>) -> Self {
82        Self {
83            objectiveai_dir: None,
84            explicit_path: Some(binary.into()),
85            extra_env: Vec::new(),
86            kill_on_drop: false,
87            detach: false,
88            #[cfg(feature = "lockfile")]
89            transfer_lock: std::sync::Mutex::new(None),
90        }
91    }
92
93    /// Hand `claim` off to the next spawned child: ownership of the
94    /// lock transfers into the child process, which keeps it until it
95    /// exits (the parent retains nothing). One-shot — consumed by the
96    /// first `execute`. On spawn failure the claim is released; on
97    /// transfer failure the child is killed best-effort, the claim is
98    /// released, and `execute` returns [`Error::LockTransfer`] —
99    /// either way the lock slot is retryable afterwards.
100    #[cfg(feature = "lockfile")]
101    pub fn transfer_lock(mut self, claim: crate::lockfile::LockClaim) -> Self {
102        self.transfer_lock = std::sync::Mutex::new(Some(claim));
103        self
104    }
105
106    /// Set an environment variable on every child the executor spawns.
107    /// Stacks on top of the parent's env; intended for tests that need
108    /// to pin a per-instance `OBJECTIVEAI_DIR`/`OBJECTIVEAI_STATE` or `OBJECTIVEAI_ADDRESS`
109    /// without racing other parallel tests via `std::env::set_var`.
110    pub fn env(
111        mut self,
112        key: impl Into<String>,
113        value: impl Into<String>,
114    ) -> Self {
115        self.extra_env.push((key.into(), value.into()));
116        self
117    }
118
119    /// When `true`, the spawned [`tokio::process::Child`] held in the
120    /// stream state is sent a kill signal when the stream is dropped.
121    /// Defaults to `false`. Wrappers that detect hangs (e.g. the
122    /// cli-test `HangPreventingBinaryCommandExecutor`) toggle this on
123    /// so dropping the inner stream tears the cli child down.
124    pub fn kill_on_drop(mut self, on: bool) -> Self {
125        self.kill_on_drop = on;
126        self
127    }
128
129    /// When `true`, the child is spawned detached from the parent's
130    /// console / process group so it survives parent exit. Pairs
131    /// naturally with `kill_on_drop = false` (the default): drop the
132    /// stream + exit the parent, the child keeps running orphaned.
133    ///
134    /// - **Windows**: sets `DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP`
135    ///   on the `CreateProcessW` call. `DETACHED_PROCESS` is what
136    ///   actually makes orphan survival work — without it the child
137    ///   inherits the parent's console and receives `CTRL_CLOSE_EVENT`
138    ///   when the parent's console window closes. The process-group
139    ///   flag is belt-and-suspenders signal isolation.
140    /// - **Unix**: no-op. The kernel automatically re-parents the
141    ///   child to init when the parent exits.
142    ///
143    /// Defaults to `false`.
144    pub fn detach(mut self, on: bool) -> Self {
145        self.detach = on;
146        self
147    }
148
149    fn binary_path(&self) -> Result<PathBuf, Error> {
150        if let Some(p) = &self.explicit_path {
151            return Ok(p.clone());
152        }
153        let dir = match &self.objectiveai_dir {
154            Some(d) => d.clone(),
155            None => dirs::home_dir()
156                .ok_or(Error::NoHomeDir)?
157                .join(".objectiveai"),
158        };
159        let name = if cfg!(windows) { "objectiveai.exe" } else { "objectiveai" };
160        Ok(dir.join("bin").join(name))
161    }
162}
163
164#[derive(Debug, thiserror::Error)]
165pub enum Error {
166    /// `dirs::home_dir()` returned `None` and no `objectiveai_dir` was set.
167    #[error("no home directory and no objectiveai_dir set")]
168    NoHomeDir,
169    /// Failed to spawn the binary at the resolved path.
170    #[error("failed to spawn cli binary: {0}")]
171    Spawn(std::io::Error),
172    /// Child stdout was unexpectedly absent after spawn.
173    #[error("cli binary child has no stdout handle")]
174    NoStdout,
175    /// Reading stdout line failed.
176    #[error("read cli binary stdout: {0}")]
177    Io(std::io::Error),
178    /// Stdout produced a line that didn't deserialize as either the
179    /// structured `cli::Error` or `T`.
180    #[error("decode cli binary stdout line: {0}")]
181    Json(serde_json::Error),
182    /// Structured error emitted by the cli binary on stdout.
183    #[error("{0}")]
184    Cli(crate::cli::Error),
185    /// `execute_one` was called but the stream produced no items.
186    #[error("cli binary stream produced no items")]
187    Empty,
188    /// Transferring the lockfile claim into the spawned child failed.
189    /// The child was killed best-effort and the claim released.
190    #[cfg(feature = "lockfile")]
191    #[error("transfer lockfile claim into cli binary child: {0}")]
192    LockTransfer(std::io::Error),
193}
194
195/// Per-line untagged decode. `Err` is listed first so serde tries it
196/// before `Ok` — `cli::Error`'s `type:"error"` constant short-circuits
197/// every non-error wire shape, then `Ok(T)` is the fallthrough.
198#[derive(serde::Deserialize)]
199#[serde(untagged)]
200enum Line<T> {
201    Err(crate::cli::Error),
202    Ok(T),
203}
204
205impl<T> From<Line<T>> for Result<T, Error> {
206    fn from(line: Line<T>) -> Self {
207        match line {
208            Line::Err(e) => Err(Error::Cli(e)),
209            Line::Ok(t) => Ok(t),
210        }
211    }
212}
213
214impl CommandExecutor for BinaryExecutor {
215    type Error = Error;
216    type Stream<T>
217        = Pin<Box<dyn Stream<Item = Result<T, Error>> + Send>>
218    where
219        T: Send + 'static;
220
221    async fn execute<R, T>(
222        &self,
223        request: R,
224        agent_arguments: Option<&AgentArguments>,
225    ) -> Result<Self::Stream<T>, Error>
226    where
227        R: CommandRequest + Send,
228        T: CommandResponse + serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
229    {
230        let argv = request.into_command();
231        let binary = self.binary_path()?;
232
233        let mut command = Command::new(&binary);
234        command
235            .args(&argv)
236            .stdin(std::process::Stdio::null())
237            .stdout(std::process::Stdio::piped())
238            .stderr(std::process::Stdio::inherit())
239            .kill_on_drop(self.kill_on_drop);
240        for (k, v) in &self.extra_env {
241            command.env(k, v);
242        }
243        // Windows detach. Required for orphan-survival when the
244        // parent will exit before the child finishes — without
245        // `DETACHED_PROCESS` the child inherits the parent's
246        // console and dies on `CTRL_CLOSE_EVENT` when the
247        // parent's console closes. We've hit this bug twice in
248        // the past; preserve the flag. Unix gets re-parent-to-init
249        // for free.
250        #[cfg(windows)]
251        if self.detach {
252            const CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
253            const DETACHED_PROCESS: u32 = 0x0000_0008;
254            command.creation_flags(CREATE_NEW_PROCESS_GROUP | DETACHED_PROCESS);
255        }
256        // Per-call agent identity override. When `Some`, every field
257        // gets applied atomically: `Some(v)` → set, `None` →
258        // env_remove so the parent's value can't leak through. When
259        // the bag itself is `None`, parent env is inherited
260        // untouched.
261        if let Some(args) = agent_arguments {
262            args.apply_to_command(&mut command);
263        }
264        // Lockfile-claim handoff, step 1 of 2: arm the command so the
265        // child inherits/duplicates the claim's handles at spawn.
266        #[cfg(feature = "lockfile")]
267        let transfer_claim = self
268            .transfer_lock
269            .lock()
270            .expect("transfer_lock mutex poisoned")
271            .take();
272        #[cfg(feature = "lockfile")]
273        if let Some(claim) = transfer_claim.as_ref() {
274            claim.prepare_transfer(&mut command);
275        }
276        let spawned = command.spawn();
277        // Step 2 of 2: complete (or unwind) the handoff. Dropping a
278        // claim does NOT release it (ManuallyDrop), so every failure
279        // path must release explicitly to keep the lock slot
280        // retryable.
281        #[cfg(feature = "lockfile")]
282        let spawned = match spawned {
283            Ok(child) => {
284                if let Some(claim) = transfer_claim {
285                    if let Err((claim, e)) = claim.transfer(&child) {
286                        let mut child = child;
287                        let _ = child.start_kill();
288                        let _ = claim.release();
289                        return Err(Error::LockTransfer(e));
290                    }
291                }
292                Ok(child)
293            }
294            Err(e) => {
295                if let Some(claim) = transfer_claim {
296                    let _ = claim.release();
297                }
298                Err(e)
299            }
300        };
301        let mut child = spawned.map_err(Error::Spawn)?;
302
303        let stdout = child.stdout.take().ok_or(Error::NoStdout)?;
304        let lines = BufReader::new(stdout).lines();
305
306        // Carry `child` in the unfold state so its handle isn't dropped
307        // mid-stream. tokio's Child defaults to kill_on_drop = false,
308        // so on stream drop the child remains running until it finishes.
309        let stream = futures::stream::unfold(
310            (child, lines),
311            |(child, mut lines)| async move {
312                match lines.next_line().await {
313                    Ok(Some(line)) => {
314                        let item = match serde_json::from_str::<Line<T>>(&line) {
315                            Ok(line) => line.into(),
316                            Err(e) => Err(Error::Json(e)),
317                        };
318                        Some((item, (child, lines)))
319                    }
320                    Ok(None) => None,
321                    Err(e) => Some((Err(Error::Io(e)), (child, lines))),
322                }
323            },
324        );
325
326        Ok(Box::pin(stream))
327    }
328
329    async fn execute_one<R, T>(
330        &self,
331        request: R,
332        agent_arguments: Option<&AgentArguments>,
333    ) -> Result<T, Error>
334    where
335        R: CommandRequest + Send,
336        T: CommandResponse + serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
337    {
338        let mut stream = self.execute::<R, T>(request, agent_arguments).await?;
339        stream.next().await.ok_or(Error::Empty)?
340    }
341}