objectiveai_sdk/cli/command/command_executor/binary.rs
1use std::path::PathBuf;
2use std::pin::Pin;
3
4use futures::{Stream, StreamExt};
5use tokio::io::{AsyncBufReadExt, BufReader};
6use tokio::process::Command;
7
8use crate::cli::command::{AgentArguments, CommandExecutor, CommandRequest, CommandResponse};
9
/// Executor that spawns the `objectiveai` cli binary on disk, feeds it the
/// argv from `request.into_command()`, and streams each stdout JSONL line
/// back as either a typed `T` or a structured [`crate::cli::Error`].
///
/// The binary is resolved at `execute` time (not at construction).
/// Resolution order:
///
/// - [`BinaryExecutor::from_path`] — returns the explicit path verbatim
///   (used by tests pointing at an out-of-tree build).
/// - [`BinaryExecutor::new`] with `Some(config_base_dir)` —
///   `<config_base_dir>/objectiveai{.exe}`.
/// - [`BinaryExecutor::new`] with `None` —
///   `<home>/.objectiveai/objectiveai{.exe}`.
///
/// The builder methods consume `self`, so the type is `Clone` to let a
/// configured executor serve as a template for several variants, and
/// `Debug` so a configuration can be logged or shown in test failures.
#[derive(Debug, Clone)]
pub struct BinaryExecutor {
    /// Resolves to a directory; the binary inside is always
    /// `objectiveai{.exe}`. `None` falls back to the home-dir default.
    config_base_dir: Option<PathBuf>,
    /// When set, used verbatim and the `config_base_dir` lookup is
    /// skipped entirely. Lets callers (notably tests) point the
    /// executor at any binary by absolute path without enforcing the
    /// `<dir>/objectiveai` naming convention.
    explicit_path: Option<PathBuf>,
    /// Extra environment variables to set on the spawned child, applied
    /// in insertion order. They stack on top of the parent's
    /// environment: the child inherits everything else and these pairs
    /// only override. Set via [`Self::env`].
    extra_env: Vec<(String, String)>,
    /// When `true`, the spawned child is sent a kill signal whenever
    /// the [`tokio::process::Child`] held in the stream state is
    /// dropped. Defaults to `false` (the SDK's long-standing
    /// behaviour: drop-without-kill lets the child finish its work
    /// orphaned). Set via [`Self::kill_on_drop`]. Test wrappers that
    /// want to abort hung children (e.g. cli-test
    /// `HangPreventingBinaryCommandExecutor`) flip this on.
    kill_on_drop: bool,
    /// When `true`, spawn the child detached from the parent's console
    /// / process group so it survives the parent process exiting.
    /// Unix already does this automatically when the parent dies
    /// (kernel re-parents to init). Windows needs explicit
    /// `DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP` creation flags;
    /// otherwise the child's inherited console closes with the parent
    /// and the child gets a `CTRL_CLOSE_EVENT`. Defaults to `false`.
    /// Set via [`Self::detach`].
    detach: bool,
}
53
54impl BinaryExecutor {
55 pub fn new(config_base_dir: Option<impl Into<PathBuf>>) -> Self {
56 Self {
57 config_base_dir: config_base_dir.map(Into::into),
58 explicit_path: None,
59 extra_env: Vec::new(),
60 kill_on_drop: false,
61 detach: false,
62 }
63 }
64
65 /// Construct an executor that spawns the binary at `binary`
66 /// directly, regardless of file name. Skips the `objectiveai`
67 /// name lookup so tests can target a `target/debug/objectiveai-cli`
68 /// build without renaming or symlinking.
69 pub fn from_path(binary: impl Into<PathBuf>) -> Self {
70 Self {
71 config_base_dir: None,
72 explicit_path: Some(binary.into()),
73 extra_env: Vec::new(),
74 kill_on_drop: false,
75 detach: false,
76 }
77 }
78
79 /// Set an environment variable on every child the executor spawns.
80 /// Stacks on top of the parent's env; intended for tests that need
81 /// to pin a per-instance `CONFIG_BASE_DIR` or `OBJECTIVEAI_ADDRESS`
82 /// without racing other parallel tests via `std::env::set_var`.
83 pub fn env(
84 mut self,
85 key: impl Into<String>,
86 value: impl Into<String>,
87 ) -> Self {
88 self.extra_env.push((key.into(), value.into()));
89 self
90 }
91
92 /// When `true`, the spawned [`tokio::process::Child`] held in the
93 /// stream state is sent a kill signal when the stream is dropped.
94 /// Defaults to `false`. Wrappers that detect hangs (e.g. the
95 /// cli-test `HangPreventingBinaryCommandExecutor`) toggle this on
96 /// so dropping the inner stream tears the cli child down.
97 pub fn kill_on_drop(mut self, on: bool) -> Self {
98 self.kill_on_drop = on;
99 self
100 }
101
102 /// When `true`, the child is spawned detached from the parent's
103 /// console / process group so it survives parent exit. Pairs
104 /// naturally with `kill_on_drop = false` (the default): drop the
105 /// stream + exit the parent, the child keeps running orphaned.
106 ///
107 /// - **Windows**: sets `DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP`
108 /// on the `CreateProcessW` call. `DETACHED_PROCESS` is what
109 /// actually makes orphan survival work — without it the child
110 /// inherits the parent's console and receives `CTRL_CLOSE_EVENT`
111 /// when the parent's console window closes. The process-group
112 /// flag is belt-and-suspenders signal isolation.
113 /// - **Unix**: no-op. The kernel automatically re-parents the
114 /// child to init when the parent exits.
115 ///
116 /// Defaults to `false`.
117 pub fn detach(mut self, on: bool) -> Self {
118 self.detach = on;
119 self
120 }
121
122 fn binary_path(&self) -> Result<PathBuf, Error> {
123 if let Some(p) = &self.explicit_path {
124 return Ok(p.clone());
125 }
126 let base = match &self.config_base_dir {
127 Some(d) => d.clone(),
128 None => dirs::home_dir()
129 .ok_or(Error::NoHomeDir)?
130 .join(".objectiveai"),
131 };
132 let name = if cfg!(windows) { "objectiveai.exe" } else { "objectiveai" };
133 Ok(base.join(name))
134 }
135}
136
/// Failures reported by [`BinaryExecutor`]: resolving the binary path,
/// spawning the child, reading its stdout, decoding a line, or a
/// structured error the cli itself emitted.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// `dirs::home_dir()` returned `None` and no `config_base_dir` was set.
    /// Only reachable when no explicit binary path was given either.
    #[error("no home directory and no config_base_dir set")]
    NoHomeDir,
    /// Failed to spawn the binary at the resolved path. Carries the
    /// I/O error returned by the spawn call.
    #[error("failed to spawn cli binary: {0}")]
    Spawn(std::io::Error),
    /// Child stdout was unexpectedly absent after spawn, even though
    /// the child is configured with a piped stdout.
    #[error("cli binary child has no stdout handle")]
    NoStdout,
    /// Reading a line from the child's stdout failed. Yielded as a
    /// stream item; the stream itself is not terminated by it.
    #[error("read cli binary stdout: {0}")]
    Io(std::io::Error),
    /// Stdout produced a line that didn't deserialize as either the
    /// structured `cli::Error` or `T`. Yielded as a stream item.
    #[error("decode cli binary stdout line: {0}")]
    Json(serde_json::Error),
    /// Structured error emitted by the cli binary on stdout. Its own
    /// display text is forwarded unchanged.
    #[error("{0}")]
    Cli(crate::cli::Error),
    /// `execute_one` was called but the stream produced no items,
    /// i.e. the child's stdout ended before a single line was read.
    #[error("cli binary stream produced no items")]
    Empty,
}
162
/// Per-line untagged decode of one stdout line from the cli binary.
///
/// The variant order is load-bearing: with `#[serde(untagged)]` serde
/// tries the variants top to bottom and keeps the first that
/// deserializes, so `Err` must stay ahead of `Ok`. `cli::Error`'s
/// `type:"error"` constant short-circuits every non-error wire shape,
/// then `Ok(T)` is the fallthrough.
///
/// NOTE(review): the `type:"error"` discriminator lives on
/// `crate::cli::Error`, which is defined outside this file — confirm it
/// still rejects non-error payloads before reordering or adding variants.
#[derive(serde::Deserialize)]
#[serde(untagged)]
enum Line<T> {
    /// A structured error emitted by the cli; attempted first.
    Err(crate::cli::Error),
    /// Any other payload, decoded as the caller's response type.
    Ok(T),
}
172
173impl<T> From<Line<T>> for Result<T, Error> {
174 fn from(line: Line<T>) -> Self {
175 match line {
176 Line::Err(e) => Err(Error::Cli(e)),
177 Line::Ok(t) => Ok(t),
178 }
179 }
180}
181
182impl CommandExecutor for BinaryExecutor {
183 type Error = Error;
184 type Stream<T>
185 = Pin<Box<dyn Stream<Item = Result<T, Error>> + Send>>
186 where
187 T: Send + 'static;
188
189 async fn execute<R, T>(
190 &self,
191 request: R,
192 agent_arguments: Option<&AgentArguments>,
193 ) -> Result<Self::Stream<T>, Error>
194 where
195 R: CommandRequest + Send,
196 T: CommandResponse + serde::de::DeserializeOwned + Send + 'static,
197 {
198 let argv = request.into_command();
199 let binary = self.binary_path()?;
200
201 let mut command = Command::new(&binary);
202 command
203 .args(&argv)
204 .stdin(std::process::Stdio::null())
205 .stdout(std::process::Stdio::piped())
206 .stderr(std::process::Stdio::inherit())
207 .kill_on_drop(self.kill_on_drop);
208 for (k, v) in &self.extra_env {
209 command.env(k, v);
210 }
211 // Windows detach. Required for orphan-survival when the
212 // parent will exit before the child finishes — without
213 // `DETACHED_PROCESS` the child inherits the parent's
214 // console and dies on `CTRL_CLOSE_EVENT` when the
215 // parent's console closes. We've hit this bug twice in
216 // the past; preserve the flag. Unix gets re-parent-to-init
217 // for free.
218 #[cfg(windows)]
219 if self.detach {
220 use std::os::windows::process::CommandExt;
221 const CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
222 const DETACHED_PROCESS: u32 = 0x0000_0008;
223 command.creation_flags(CREATE_NEW_PROCESS_GROUP | DETACHED_PROCESS);
224 }
225 // Per-call agent identity override. When `Some`, every field
226 // gets applied atomically: `Some(v)` → set, `None` →
227 // env_remove so the parent's value can't leak through. When
228 // the bag itself is `None`, parent env is inherited
229 // untouched.
230 if let Some(args) = agent_arguments {
231 args.apply_to_command(&mut command);
232 }
233 let mut child = command.spawn().map_err(Error::Spawn)?;
234
235 let stdout = child.stdout.take().ok_or(Error::NoStdout)?;
236 let lines = BufReader::new(stdout).lines();
237
238 // Carry `child` in the unfold state so its handle isn't dropped
239 // mid-stream. tokio's Child defaults to kill_on_drop = false,
240 // so on stream drop the child remains running until it finishes.
241 let stream = futures::stream::unfold(
242 (child, lines),
243 |(child, mut lines)| async move {
244 match lines.next_line().await {
245 Ok(Some(line)) => {
246 let item = match serde_json::from_str::<Line<T>>(&line) {
247 Ok(line) => line.into(),
248 Err(e) => Err(Error::Json(e)),
249 };
250 Some((item, (child, lines)))
251 }
252 Ok(None) => None,
253 Err(e) => Some((Err(Error::Io(e)), (child, lines))),
254 }
255 },
256 );
257
258 Ok(Box::pin(stream))
259 }
260
261 async fn execute_one<R, T>(
262 &self,
263 request: R,
264 agent_arguments: Option<&AgentArguments>,
265 ) -> Result<T, Error>
266 where
267 R: CommandRequest + Send,
268 T: CommandResponse + serde::de::DeserializeOwned + Send + 'static,
269 {
270 let mut stream = self.execute::<R, T>(request, agent_arguments).await?;
271 stream.next().await.ok_or(Error::Empty)?
272 }
273}