objectiveai_sdk/cli/command/command_executor/binary.rs
1use std::path::PathBuf;
2use std::pin::Pin;
3
4use futures::{Stream, StreamExt};
5use tokio::io::{AsyncBufReadExt, BufReader};
6use tokio::process::Command;
7
8use crate::cli::command::{AgentArguments, CommandExecutor, CommandRequest, CommandResponse};
9
/// Command executor that launches the on-disk `objectiveai` cli binary.
///
/// Every `execute` call spawns the binary with the argv produced by
/// `request.into_command()` and turns each stdout JSONL line into one
/// stream item: a decoded `T`, or a structured [`crate::cli::Error`].
///
/// The binary location is computed on each `execute` call, not when the
/// executor is built. Where it points depends on the constructor:
///
/// - [`BinaryExecutor::from_path`]: the supplied path, used verbatim
///   (e.g. tests aiming at an out-of-tree build).
/// - [`BinaryExecutor::new`] with `Some(objectiveai_dir)`:
///   `<objectiveai_dir>/bin/objectiveai{.exe}`.
/// - [`BinaryExecutor::new`] with `None`:
///   `<home>/.objectiveai/bin/objectiveai{.exe}`.
pub struct BinaryExecutor {
    /// Layout root (`OBJECTIVEAI_DIR`). The binary is looked up at
    /// `bin/objectiveai{.exe}` beneath it. `None` means "use the
    /// home-directory default", i.e. `~/.objectiveai`.
    objectiveai_dir: Option<PathBuf>,
    /// Explicit binary path. When present it wins outright: it is
    /// returned as-is and `objectiveai_dir` is never consulted, so
    /// callers (tests in particular) can target any executable without
    /// following the `<dir>/bin/objectiveai{.exe}` layout.
    explicit_path: Option<PathBuf>,
    /// Additional `(key, value)` environment variables applied to the
    /// spawned child on top of the environment it inherits from the
    /// parent. Appended to by [`Self::env`].
    extra_env: Vec<(String, String)>,
    /// Forwarded to the spawned command's `kill_on_drop` setting: when
    /// `true`, dropping the [`tokio::process::Child`] kept in the stream
    /// state kills the child. Defaults to `false` (the SDK's
    /// long-standing behaviour: the child is left to finish on its own).
    /// Configured through [`Self::kill_on_drop`]; hang-detecting test
    /// wrappers such as cli-test's `HangPreventingBinaryCommandExecutor`
    /// turn it on.
    kill_on_drop: bool,
    /// Whether to spawn the child detached from the parent's console /
    /// process group so it can outlive the parent. Only Windows needs
    /// explicit work for this (`DETACHED_PROCESS |
    /// CREATE_NEW_PROCESS_GROUP` creation flags); otherwise the child's
    /// inherited console closes with the parent and the child receives
    /// `CTRL_CLOSE_EVENT`. On Unix nothing is done: an orphaned child is
    /// re-parented to init by the kernel. Defaults to `false`.
    detach: bool,
    /// One-shot lockfile claim to hand to the next spawned child
    /// ([`crate::lockfile::LockClaim::prepare_transfer`] before the
    /// spawn, [`crate::lockfile::LockClaim::transfer`] after it), making
    /// the child the sole owner of the lock until it exits. The first
    /// `execute` takes it out of the slot; a `Mutex` provides the
    /// interior mutability because [`CommandExecutor::execute`] only has
    /// `&self`. Installed through [`Self::transfer_lock`].
    #[cfg(feature = "lockfile")]
    transfer_lock: std::sync::Mutex<Option<crate::lockfile::LockClaim>>,
}
63
64impl BinaryExecutor {
65 pub fn new(objectiveai_dir: Option<impl Into<PathBuf>>) -> Self {
66 Self {
67 objectiveai_dir: objectiveai_dir.map(Into::into),
68 explicit_path: None,
69 extra_env: Vec::new(),
70 kill_on_drop: false,
71 detach: false,
72 #[cfg(feature = "lockfile")]
73 transfer_lock: std::sync::Mutex::new(None),
74 }
75 }
76
77 /// Construct an executor that spawns the binary at `binary`
78 /// directly, regardless of file name. Skips the `objectiveai`
79 /// name lookup so tests can target a `target/debug/objectiveai-cli`
80 /// build without renaming or symlinking.
81 pub fn from_path(binary: impl Into<PathBuf>) -> Self {
82 Self {
83 objectiveai_dir: None,
84 explicit_path: Some(binary.into()),
85 extra_env: Vec::new(),
86 kill_on_drop: false,
87 detach: false,
88 #[cfg(feature = "lockfile")]
89 transfer_lock: std::sync::Mutex::new(None),
90 }
91 }
92
93 /// Hand `claim` off to the next spawned child: ownership of the
94 /// lock transfers into the child process, which keeps it until it
95 /// exits (the parent retains nothing). One-shot — consumed by the
96 /// first `execute`. On spawn failure the claim is released; on
97 /// transfer failure the child is killed best-effort, the claim is
98 /// released, and `execute` returns [`Error::LockTransfer`] —
99 /// either way the lock slot is retryable afterwards.
100 #[cfg(feature = "lockfile")]
101 pub fn transfer_lock(mut self, claim: crate::lockfile::LockClaim) -> Self {
102 self.transfer_lock = std::sync::Mutex::new(Some(claim));
103 self
104 }
105
106 /// Set an environment variable on every child the executor spawns.
107 /// Stacks on top of the parent's env; intended for tests that need
108 /// to pin a per-instance `OBJECTIVEAI_DIR`/`OBJECTIVEAI_STATE` or `OBJECTIVEAI_ADDRESS`
109 /// without racing other parallel tests via `std::env::set_var`.
110 pub fn env(
111 mut self,
112 key: impl Into<String>,
113 value: impl Into<String>,
114 ) -> Self {
115 self.extra_env.push((key.into(), value.into()));
116 self
117 }
118
119 /// When `true`, the spawned [`tokio::process::Child`] held in the
120 /// stream state is sent a kill signal when the stream is dropped.
121 /// Defaults to `false`. Wrappers that detect hangs (e.g. the
122 /// cli-test `HangPreventingBinaryCommandExecutor`) toggle this on
123 /// so dropping the inner stream tears the cli child down.
124 pub fn kill_on_drop(mut self, on: bool) -> Self {
125 self.kill_on_drop = on;
126 self
127 }
128
129 /// When `true`, the child is spawned detached from the parent's
130 /// console / process group so it survives parent exit. Pairs
131 /// naturally with `kill_on_drop = false` (the default): drop the
132 /// stream + exit the parent, the child keeps running orphaned.
133 ///
134 /// - **Windows**: sets `DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP`
135 /// on the `CreateProcessW` call. `DETACHED_PROCESS` is what
136 /// actually makes orphan survival work — without it the child
137 /// inherits the parent's console and receives `CTRL_CLOSE_EVENT`
138 /// when the parent's console window closes. The process-group
139 /// flag is belt-and-suspenders signal isolation.
140 /// - **Unix**: no-op. The kernel automatically re-parents the
141 /// child to init when the parent exits.
142 ///
143 /// Defaults to `false`.
144 pub fn detach(mut self, on: bool) -> Self {
145 self.detach = on;
146 self
147 }
148
149 fn binary_path(&self) -> Result<PathBuf, Error> {
150 if let Some(p) = &self.explicit_path {
151 return Ok(p.clone());
152 }
153 let dir = match &self.objectiveai_dir {
154 Some(d) => d.clone(),
155 None => dirs::home_dir()
156 .ok_or(Error::NoHomeDir)?
157 .join(".objectiveai"),
158 };
159 let name = if cfg!(windows) { "objectiveai.exe" } else { "objectiveai" };
160 Ok(dir.join("bin").join(name))
161 }
162}
163
164#[derive(Debug, thiserror::Error)]
165pub enum Error {
166 /// `dirs::home_dir()` returned `None` and no `objectiveai_dir` was set.
167 #[error("no home directory and no objectiveai_dir set")]
168 NoHomeDir,
169 /// Failed to spawn the binary at the resolved path.
170 #[error("failed to spawn cli binary: {0}")]
171 Spawn(std::io::Error),
172 /// Child stdout was unexpectedly absent after spawn.
173 #[error("cli binary child has no stdout handle")]
174 NoStdout,
175 /// Reading stdout line failed.
176 #[error("read cli binary stdout: {0}")]
177 Io(std::io::Error),
178 /// Stdout produced a line that didn't deserialize as either the
179 /// structured `cli::Error` or `T`.
180 #[error("decode cli binary stdout line: {0}")]
181 Json(serde_json::Error),
182 /// Structured error emitted by the cli binary on stdout.
183 #[error("{0}")]
184 Cli(crate::cli::Error),
185 /// `execute_one` was called but the stream produced no items.
186 #[error("cli binary stream produced no items")]
187 Empty,
188 /// Transferring the lockfile claim into the spawned child failed.
189 /// The child was killed best-effort and the claim released.
190 #[cfg(feature = "lockfile")]
191 #[error("transfer lockfile claim into cli binary child: {0}")]
192 LockTransfer(std::io::Error),
193}
194
195/// Per-line untagged decode. `Err` is listed first so serde tries it
196/// before `Ok` — `cli::Error`'s `type:"error"` constant short-circuits
197/// every non-error wire shape, then `Ok(T)` is the fallthrough.
198#[derive(serde::Deserialize)]
199#[serde(untagged)]
200enum Line<T> {
201 Err(crate::cli::Error),
202 Ok(T),
203}
204
205impl<T> From<Line<T>> for Result<T, Error> {
206 fn from(line: Line<T>) -> Self {
207 match line {
208 Line::Err(e) => Err(Error::Cli(e)),
209 Line::Ok(t) => Ok(t),
210 }
211 }
212}
213
214impl CommandExecutor for BinaryExecutor {
215 type Error = Error;
216 type Stream<T>
217 = Pin<Box<dyn Stream<Item = Result<T, Error>> + Send>>
218 where
219 T: Send + 'static;
220
221 async fn execute<R, T>(
222 &self,
223 request: R,
224 agent_arguments: Option<&AgentArguments>,
225 ) -> Result<Self::Stream<T>, Error>
226 where
227 R: CommandRequest + Send,
228 T: CommandResponse + serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
229 {
230 let argv = request.into_command();
231 let binary = self.binary_path()?;
232
233 let mut command = Command::new(&binary);
234 command
235 .args(&argv)
236 .stdin(std::process::Stdio::null())
237 .stdout(std::process::Stdio::piped())
238 .stderr(std::process::Stdio::inherit())
239 .kill_on_drop(self.kill_on_drop);
240 for (k, v) in &self.extra_env {
241 command.env(k, v);
242 }
243 // Windows detach. Required for orphan-survival when the
244 // parent will exit before the child finishes — without
245 // `DETACHED_PROCESS` the child inherits the parent's
246 // console and dies on `CTRL_CLOSE_EVENT` when the
247 // parent's console closes. We've hit this bug twice in
248 // the past; preserve the flag. Unix gets re-parent-to-init
249 // for free.
250 #[cfg(windows)]
251 if self.detach {
252 const CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
253 const DETACHED_PROCESS: u32 = 0x0000_0008;
254 command.creation_flags(CREATE_NEW_PROCESS_GROUP | DETACHED_PROCESS);
255 }
256 // Per-call agent identity override. When `Some`, every field
257 // gets applied atomically: `Some(v)` → set, `None` →
258 // env_remove so the parent's value can't leak through. When
259 // the bag itself is `None`, parent env is inherited
260 // untouched.
261 if let Some(args) = agent_arguments {
262 args.apply_to_command(&mut command);
263 }
264 // Lockfile-claim handoff, step 1 of 2: arm the command so the
265 // child inherits/duplicates the claim's handles at spawn.
266 #[cfg(feature = "lockfile")]
267 let transfer_claim = self
268 .transfer_lock
269 .lock()
270 .expect("transfer_lock mutex poisoned")
271 .take();
272 #[cfg(feature = "lockfile")]
273 if let Some(claim) = transfer_claim.as_ref() {
274 claim.prepare_transfer(&mut command);
275 }
276 let spawned = command.spawn();
277 // Step 2 of 2: complete (or unwind) the handoff. Dropping a
278 // claim does NOT release it (ManuallyDrop), so every failure
279 // path must release explicitly to keep the lock slot
280 // retryable.
281 #[cfg(feature = "lockfile")]
282 let spawned = match spawned {
283 Ok(child) => {
284 if let Some(claim) = transfer_claim {
285 if let Err((claim, e)) = claim.transfer(&child) {
286 let mut child = child;
287 let _ = child.start_kill();
288 let _ = claim.release();
289 return Err(Error::LockTransfer(e));
290 }
291 }
292 Ok(child)
293 }
294 Err(e) => {
295 if let Some(claim) = transfer_claim {
296 let _ = claim.release();
297 }
298 Err(e)
299 }
300 };
301 let mut child = spawned.map_err(Error::Spawn)?;
302
303 let stdout = child.stdout.take().ok_or(Error::NoStdout)?;
304 let lines = BufReader::new(stdout).lines();
305
306 // Carry `child` in the unfold state so its handle isn't dropped
307 // mid-stream. tokio's Child defaults to kill_on_drop = false,
308 // so on stream drop the child remains running until it finishes.
309 let stream = futures::stream::unfold(
310 (child, lines),
311 |(child, mut lines)| async move {
312 match lines.next_line().await {
313 Ok(Some(line)) => {
314 let item = match serde_json::from_str::<Line<T>>(&line) {
315 Ok(line) => line.into(),
316 Err(e) => Err(Error::Json(e)),
317 };
318 Some((item, (child, lines)))
319 }
320 Ok(None) => None,
321 Err(e) => Some((Err(Error::Io(e)), (child, lines))),
322 }
323 },
324 );
325
326 Ok(Box::pin(stream))
327 }
328
329 async fn execute_one<R, T>(
330 &self,
331 request: R,
332 agent_arguments: Option<&AgentArguments>,
333 ) -> Result<T, Error>
334 where
335 R: CommandRequest + Send,
336 T: CommandResponse + serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
337 {
338 let mut stream = self.execute::<R, T>(request, agent_arguments).await?;
339 stream.next().await.ok_or(Error::Empty)?
340 }
341}