Skip to main content

objectiveai_sdk/cli/command/command_executor/
binary.rs

1use std::path::PathBuf;
2use std::pin::Pin;
3
4use futures::{Stream, StreamExt};
5use tokio::io::{AsyncBufReadExt, BufReader};
6use tokio::process::Command;
7
8use crate::cli::command::{AgentArguments, CommandExecutor, CommandRequest, CommandResponse};
9
10/// Spawn the `objectiveai` cli binary on disk, feed it the argv from
11/// `request.into_command()`, and stream each stdout JSONL line back as
12/// either a typed `T` or a structured [`crate::cli::Error`].
13///
14/// The binary is resolved at `execute` time (not at construction).
15/// Resolution order:
16///
17/// - [`BinaryExecutor::from_path`] — returns the explicit path verbatim
18///   (used by tests pointing at an out-of-tree build).
19/// - [`BinaryExecutor::new`] with `Some(config_base_dir)` —
20///   `<config_base_dir>/objectiveai{.exe}`.
21/// - [`BinaryExecutor::new`] with `None` —
22///   `<home>/.objectiveai/objectiveai{.exe}`.
23pub struct BinaryExecutor {
24    /// Resolves to a directory; the binary inside is always
25    /// `objectiveai{.exe}`. `None` falls back to the home-dir default.
26    config_base_dir: Option<PathBuf>,
27    /// When set, used verbatim and the `config_base_dir` lookup is
28    /// skipped entirely. Lets callers (notably tests) point the
29    /// executor at any binary by absolute path without enforcing the
30    /// `<dir>/objectiveai` naming convention.
31    explicit_path: Option<PathBuf>,
32    /// Extra environment variables to set on the spawned child. Stacks
33    /// on top of the parent's environment (the child inherits the rest
34    /// — this map only overrides). Set via [`Self::env`].
35    extra_env: Vec<(String, String)>,
36    /// When `true`, the spawned child is sent a kill signal whenever
37    /// the [`tokio::process::Child`] held in the stream state is
38    /// dropped. Defaults to `false` (the SDK's long-standing
39    /// behaviour: drop-without-kill lets the child finish its work
40    /// orphaned). Set via [`Self::kill_on_drop`]. Test wrappers that
41    /// want to abort hung children (e.g. cli-test
42    /// `HangPreventingBinaryCommandExecutor`) flip this on.
43    kill_on_drop: bool,
44    /// When `true`, spawn the child detached from the parent's console
45    /// / process group so it survives the parent process exiting.
46    /// Unix already does this automatically when the parent dies
47    /// (kernel re-parents to init). Windows needs explicit
48    /// `DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP` creation flags;
49    /// otherwise the child's inherited console closes with the parent
50    /// and the child gets a `CTRL_CLOSE_EVENT`. Defaults to `false`.
51    detach: bool,
52}
53
54impl BinaryExecutor {
55    pub fn new(config_base_dir: Option<impl Into<PathBuf>>) -> Self {
56        Self {
57            config_base_dir: config_base_dir.map(Into::into),
58            explicit_path: None,
59            extra_env: Vec::new(),
60            kill_on_drop: false,
61            detach: false,
62        }
63    }
64
65    /// Construct an executor that spawns the binary at `binary`
66    /// directly, regardless of file name. Skips the `objectiveai`
67    /// name lookup so tests can target a `target/debug/objectiveai-cli`
68    /// build without renaming or symlinking.
69    pub fn from_path(binary: impl Into<PathBuf>) -> Self {
70        Self {
71            config_base_dir: None,
72            explicit_path: Some(binary.into()),
73            extra_env: Vec::new(),
74            kill_on_drop: false,
75            detach: false,
76        }
77    }
78
79    /// Set an environment variable on every child the executor spawns.
80    /// Stacks on top of the parent's env; intended for tests that need
81    /// to pin a per-instance `CONFIG_BASE_DIR` or `OBJECTIVEAI_ADDRESS`
82    /// without racing other parallel tests via `std::env::set_var`.
83    pub fn env(
84        mut self,
85        key: impl Into<String>,
86        value: impl Into<String>,
87    ) -> Self {
88        self.extra_env.push((key.into(), value.into()));
89        self
90    }
91
92    /// When `true`, the spawned [`tokio::process::Child`] held in the
93    /// stream state is sent a kill signal when the stream is dropped.
94    /// Defaults to `false`. Wrappers that detect hangs (e.g. the
95    /// cli-test `HangPreventingBinaryCommandExecutor`) toggle this on
96    /// so dropping the inner stream tears the cli child down.
97    pub fn kill_on_drop(mut self, on: bool) -> Self {
98        self.kill_on_drop = on;
99        self
100    }
101
102    /// When `true`, the child is spawned detached from the parent's
103    /// console / process group so it survives parent exit. Pairs
104    /// naturally with `kill_on_drop = false` (the default): drop the
105    /// stream + exit the parent, the child keeps running orphaned.
106    ///
107    /// - **Windows**: sets `DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP`
108    ///   on the `CreateProcessW` call. `DETACHED_PROCESS` is what
109    ///   actually makes orphan survival work — without it the child
110    ///   inherits the parent's console and receives `CTRL_CLOSE_EVENT`
111    ///   when the parent's console window closes. The process-group
112    ///   flag is belt-and-suspenders signal isolation.
113    /// - **Unix**: no-op. The kernel automatically re-parents the
114    ///   child to init when the parent exits.
115    ///
116    /// Defaults to `false`.
117    pub fn detach(mut self, on: bool) -> Self {
118        self.detach = on;
119        self
120    }
121
122    fn binary_path(&self) -> Result<PathBuf, Error> {
123        if let Some(p) = &self.explicit_path {
124            return Ok(p.clone());
125        }
126        let base = match &self.config_base_dir {
127            Some(d) => d.clone(),
128            None => dirs::home_dir()
129                .ok_or(Error::NoHomeDir)?
130                .join(".objectiveai"),
131        };
132        let name = if cfg!(windows) { "objectiveai.exe" } else { "objectiveai" };
133        Ok(base.join(name))
134    }
135}
136
137#[derive(Debug, thiserror::Error)]
138pub enum Error {
139    /// `dirs::home_dir()` returned `None` and no `config_base_dir` was set.
140    #[error("no home directory and no config_base_dir set")]
141    NoHomeDir,
142    /// Failed to spawn the binary at the resolved path.
143    #[error("failed to spawn cli binary: {0}")]
144    Spawn(std::io::Error),
145    /// Child stdout was unexpectedly absent after spawn.
146    #[error("cli binary child has no stdout handle")]
147    NoStdout,
148    /// Reading stdout line failed.
149    #[error("read cli binary stdout: {0}")]
150    Io(std::io::Error),
151    /// Stdout produced a line that didn't deserialize as either the
152    /// structured `cli::Error` or `T`.
153    #[error("decode cli binary stdout line: {0}")]
154    Json(serde_json::Error),
155    /// Structured error emitted by the cli binary on stdout.
156    #[error("{0}")]
157    Cli(crate::cli::Error),
158    /// `execute_one` was called but the stream produced no items.
159    #[error("cli binary stream produced no items")]
160    Empty,
161}
162
163/// Per-line untagged decode. `Err` is listed first so serde tries it
164/// before `Ok` — `cli::Error`'s `type:"error"` constant short-circuits
165/// every non-error wire shape, then `Ok(T)` is the fallthrough.
166#[derive(serde::Deserialize)]
167#[serde(untagged)]
168enum Line<T> {
169    Err(crate::cli::Error),
170    Ok(T),
171}
172
173impl<T> From<Line<T>> for Result<T, Error> {
174    fn from(line: Line<T>) -> Self {
175        match line {
176            Line::Err(e) => Err(Error::Cli(e)),
177            Line::Ok(t) => Ok(t),
178        }
179    }
180}
181
182impl CommandExecutor for BinaryExecutor {
183    type Error = Error;
184    type Stream<T>
185        = Pin<Box<dyn Stream<Item = Result<T, Error>> + Send>>
186    where
187        T: Send + 'static;
188
189    async fn execute<R, T>(
190        &self,
191        request: R,
192        agent_arguments: Option<&AgentArguments>,
193    ) -> Result<Self::Stream<T>, Error>
194    where
195        R: CommandRequest + Send,
196        T: CommandResponse + serde::de::DeserializeOwned + Send + 'static,
197    {
198        let argv = request.into_command();
199        let binary = self.binary_path()?;
200
201        let mut command = Command::new(&binary);
202        command
203            .args(&argv)
204            .stdin(std::process::Stdio::null())
205            .stdout(std::process::Stdio::piped())
206            .stderr(std::process::Stdio::inherit())
207            .kill_on_drop(self.kill_on_drop);
208        for (k, v) in &self.extra_env {
209            command.env(k, v);
210        }
211        // Windows detach. Required for orphan-survival when the
212        // parent will exit before the child finishes — without
213        // `DETACHED_PROCESS` the child inherits the parent's
214        // console and dies on `CTRL_CLOSE_EVENT` when the
215        // parent's console closes. We've hit this bug twice in
216        // the past; preserve the flag. Unix gets re-parent-to-init
217        // for free.
218        #[cfg(windows)]
219        if self.detach {
220            use std::os::windows::process::CommandExt;
221            const CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
222            const DETACHED_PROCESS: u32 = 0x0000_0008;
223            command.creation_flags(CREATE_NEW_PROCESS_GROUP | DETACHED_PROCESS);
224        }
225        // Per-call agent identity override. When `Some`, every field
226        // gets applied atomically: `Some(v)` → set, `None` →
227        // env_remove so the parent's value can't leak through. When
228        // the bag itself is `None`, parent env is inherited
229        // untouched.
230        if let Some(args) = agent_arguments {
231            args.apply_to_command(&mut command);
232        }
233        let mut child = command.spawn().map_err(Error::Spawn)?;
234
235        let stdout = child.stdout.take().ok_or(Error::NoStdout)?;
236        let lines = BufReader::new(stdout).lines();
237
238        // Carry `child` in the unfold state so its handle isn't dropped
239        // mid-stream. tokio's Child defaults to kill_on_drop = false,
240        // so on stream drop the child remains running until it finishes.
241        let stream = futures::stream::unfold(
242            (child, lines),
243            |(child, mut lines)| async move {
244                match lines.next_line().await {
245                    Ok(Some(line)) => {
246                        let item = match serde_json::from_str::<Line<T>>(&line) {
247                            Ok(line) => line.into(),
248                            Err(e) => Err(Error::Json(e)),
249                        };
250                        Some((item, (child, lines)))
251                    }
252                    Ok(None) => None,
253                    Err(e) => Some((Err(Error::Io(e)), (child, lines))),
254                }
255            },
256        );
257
258        Ok(Box::pin(stream))
259    }
260
261    async fn execute_one<R, T>(
262        &self,
263        request: R,
264        agent_arguments: Option<&AgentArguments>,
265    ) -> Result<T, Error>
266    where
267        R: CommandRequest + Send,
268        T: CommandResponse + serde::de::DeserializeOwned + Send + 'static,
269    {
270        let mut stream = self.execute::<R, T>(request, agent_arguments).await?;
271        stream.next().await.ok_or(Error::Empty)?
272    }
273}