objectiveai_sdk/cli/command/command_executor/binary.rs
1use std::path::PathBuf;
2use std::pin::Pin;
3
4use futures::{Stream, StreamExt};
5use tokio::io::{AsyncBufReadExt, BufReader};
6use tokio::process::Command;
7
8use crate::cli::command::{AgentArguments, CommandExecutor, CommandRequest, CommandResponse};
9
/// Executor that spawns the `objectiveai` cli binary on disk, hands it
/// the request as JSON through the cli's top-level `--request` flag,
/// and streams each stdout JSONL line back as either a typed `T` or a
/// structured [`crate::cli::Error`].
///
/// The binary is resolved at `execute` time (not at construction).
/// Resolution order:
///
/// - [`BinaryExecutor::from_path`] — returns the explicit path verbatim
///   (used by tests pointing at an out-of-tree build).
/// - [`BinaryExecutor::new`] with `Some(objectiveai_dir)` —
///   `<objectiveai_dir>/bin/objectiveai{.exe}`.
/// - [`BinaryExecutor::new`] with `None` —
///   `<home>/.objectiveai/bin/objectiveai{.exe}`.
pub struct BinaryExecutor {
    /// The layout root (`OBJECTIVEAI_DIR`); the binary inside is
    /// always `bin/objectiveai{.exe}`. `None` falls back to the
    /// home-dir default (`~/.objectiveai`).
    objectiveai_dir: Option<PathBuf>,
    /// When set, used verbatim and the `objectiveai_dir` lookup is
    /// skipped entirely. Lets callers (notably tests) point the
    /// executor at any binary by path without enforcing the
    /// `<dir>/bin/objectiveai{.exe}` layout.
    explicit_path: Option<PathBuf>,
    /// Extra environment variables to set on the spawned child, in
    /// insertion order. Stacks on top of the parent's environment (the
    /// child inherits the rest — these entries only override). Set via
    /// [`Self::env`].
    extra_env: Vec<(String, String)>,
    /// When `true`, the spawned child is sent a kill signal whenever
    /// the [`tokio::process::Child`] held in the stream state is
    /// dropped. Defaults to `false` (the SDK's long-standing
    /// behaviour: drop-without-kill lets the child finish its work
    /// orphaned). Set via [`Self::kill_on_drop`]. Test wrappers that
    /// want to abort hung children (e.g. cli-test
    /// `HangPreventingBinaryCommandExecutor`) flip this on.
    kill_on_drop: bool,
    /// When `true`, spawn the child detached from the parent's console
    /// / process group so it survives the parent process exiting.
    /// Unix already does this automatically when the parent dies
    /// (kernel re-parents to init). Windows needs explicit
    /// `DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP` creation flags;
    /// otherwise the child's inherited console closes with the parent
    /// and the child gets a `CTRL_CLOSE_EVENT`. Defaults to `false`.
    /// Set via [`Self::detach`].
    detach: bool,
    /// One-shot lockfile claim to hand off to the next spawned child
    /// ([`crate::lockfile::LockClaim::prepare_transfer`] before the
    /// spawn, [`crate::lockfile::LockClaim::transfer`] after) — the
    /// child becomes the sole owner and the lock lives until the
    /// child exits. Consumed by the first `execute`; interior
    /// mutability because [`CommandExecutor::execute`] takes `&self`.
    /// Set via [`Self::transfer_lock`].
    #[cfg(feature = "lockfile")]
    transfer_lock: std::sync::Mutex<Option<crate::lockfile::LockClaim>>,
}
63
64impl BinaryExecutor {
65 pub fn new(objectiveai_dir: Option<impl Into<PathBuf>>) -> Self {
66 Self {
67 objectiveai_dir: objectiveai_dir.map(Into::into),
68 explicit_path: None,
69 extra_env: Vec::new(),
70 kill_on_drop: false,
71 detach: false,
72 #[cfg(feature = "lockfile")]
73 transfer_lock: std::sync::Mutex::new(None),
74 }
75 }
76
77 /// Construct an executor that spawns the binary at `binary`
78 /// directly, regardless of file name. Skips the `objectiveai`
79 /// name lookup so tests can target a `target/debug/objectiveai-cli`
80 /// build without renaming or symlinking.
81 pub fn from_path(binary: impl Into<PathBuf>) -> Self {
82 Self {
83 objectiveai_dir: None,
84 explicit_path: Some(binary.into()),
85 extra_env: Vec::new(),
86 kill_on_drop: false,
87 detach: false,
88 #[cfg(feature = "lockfile")]
89 transfer_lock: std::sync::Mutex::new(None),
90 }
91 }
92
93 /// Hand `claim` off to the next spawned child: ownership of the
94 /// lock transfers into the child process, which keeps it until it
95 /// exits (the parent retains nothing). One-shot — consumed by the
96 /// first `execute`. On spawn failure the claim is released; on
97 /// transfer failure the child is killed best-effort, the claim is
98 /// released, and `execute` returns [`Error::LockTransfer`] —
99 /// either way the lock slot is retryable afterwards.
100 #[cfg(feature = "lockfile")]
101 pub fn transfer_lock(mut self, claim: crate::lockfile::LockClaim) -> Self {
102 self.transfer_lock = std::sync::Mutex::new(Some(claim));
103 self
104 }
105
106 /// Set an environment variable on every child the executor spawns.
107 /// Stacks on top of the parent's env; intended for tests that need
108 /// to pin a per-instance `OBJECTIVEAI_DIR`/`OBJECTIVEAI_STATE` or `OBJECTIVEAI_ADDRESS`
109 /// without racing other parallel tests via `std::env::set_var`.
110 pub fn env(
111 mut self,
112 key: impl Into<String>,
113 value: impl Into<String>,
114 ) -> Self {
115 self.extra_env.push((key.into(), value.into()));
116 self
117 }
118
119 /// When `true`, the spawned [`tokio::process::Child`] held in the
120 /// stream state is sent a kill signal when the stream is dropped.
121 /// Defaults to `false`. Wrappers that detect hangs (e.g. the
122 /// cli-test `HangPreventingBinaryCommandExecutor`) toggle this on
123 /// so dropping the inner stream tears the cli child down.
124 pub fn kill_on_drop(mut self, on: bool) -> Self {
125 self.kill_on_drop = on;
126 self
127 }
128
129 /// When `true`, the child is spawned detached from the parent's
130 /// console / process group so it survives parent exit. Pairs
131 /// naturally with `kill_on_drop = false` (the default): drop the
132 /// stream + exit the parent, the child keeps running orphaned.
133 ///
134 /// - **Windows**: sets `DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP`
135 /// on the `CreateProcessW` call. `DETACHED_PROCESS` is what
136 /// actually makes orphan survival work — without it the child
137 /// inherits the parent's console and receives `CTRL_CLOSE_EVENT`
138 /// when the parent's console window closes. The process-group
139 /// flag is belt-and-suspenders signal isolation.
140 /// - **Unix**: no-op. The kernel automatically re-parents the
141 /// child to init when the parent exits.
142 ///
143 /// Defaults to `false`.
144 pub fn detach(mut self, on: bool) -> Self {
145 self.detach = on;
146 self
147 }
148
149 fn binary_path(&self) -> Result<PathBuf, Error> {
150 if let Some(p) = &self.explicit_path {
151 return Ok(p.clone());
152 }
153 let dir = match &self.objectiveai_dir {
154 Some(d) => d.clone(),
155 None => dirs::home_dir()
156 .ok_or(Error::NoHomeDir)?
157 .join(".objectiveai"),
158 };
159 let name = if cfg!(windows) { "objectiveai.exe" } else { "objectiveai" };
160 Ok(dir.join("bin").join(name))
161 }
162}
163
/// Everything that can go wrong while resolving, spawning, or reading
/// from the cli binary child.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// `dirs::home_dir()` returned `None` and no `objectiveai_dir` was set.
    #[error("no home directory and no objectiveai_dir set")]
    NoHomeDir,
    /// Failed to spawn the binary at the resolved path.
    #[error("failed to spawn cli binary: {0}")]
    Spawn(std::io::Error),
    /// Child stdout was unexpectedly absent after spawn.
    #[error("cli binary child has no stdout handle")]
    NoStdout,
    /// Reading a line from the child's stdout failed.
    #[error("read cli binary stdout: {0}")]
    Io(std::io::Error),
    /// Stdout produced a line that didn't deserialize as either the
    /// structured `cli::Error` or `T`. Also returned by `execute` when
    /// serializing the request into its `--request` JSON payload fails,
    /// so the "decode" wording of the message does not always apply.
    #[error("decode cli binary stdout line: {0}")]
    Json(serde_json::Error),
    /// Structured error emitted by the cli binary on stdout.
    #[error("{0}")]
    Cli(crate::cli::Error),
    /// `execute_one` was called but the stream produced no items.
    #[error("cli binary stream produced no items")]
    Empty,
    /// Transferring the lockfile claim into the spawned child failed.
    /// The child was killed best-effort and the claim released.
    #[cfg(feature = "lockfile")]
    #[error("transfer lockfile claim into cli binary child: {0}")]
    LockTransfer(std::io::Error),
}
194
/// Per-line untagged decode of one stdout JSONL line. `Err` is listed
/// first so serde tries it before `Ok` — `cli::Error`'s `type:"error"`
/// constant short-circuits every non-error wire shape, then `Ok(T)` is
/// the fallthrough. Do not reorder the variants: with
/// `#[serde(untagged)]` the declaration order is the match order.
#[derive(serde::Deserialize)]
#[serde(untagged)]
enum Line<T> {
    /// The line was a structured error reported by the cli.
    Err(crate::cli::Error),
    /// The line decoded as the expected response payload.
    Ok(T),
}
204
205impl<T> From<Line<T>> for Result<T, Error> {
206 fn from(line: Line<T>) -> Self {
207 match line {
208 Line::Err(e) => Err(Error::Cli(e)),
209 Line::Ok(t) => Ok(t),
210 }
211 }
212}
213
214impl CommandExecutor for BinaryExecutor {
215 type Error = Error;
216 type Stream<T>
217 = Pin<Box<dyn Stream<Item = Result<T, Error>> + Send>>
218 where
219 T: Send + 'static;
220
221 async fn execute<R, T>(
222 &self,
223 request: R,
224 agent_arguments: Option<&AgentArguments>,
225 ) -> Result<Self::Stream<T>, Error>
226 where
227 R: CommandRequest + Send + serde::Serialize,
228 T: CommandResponse + serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
229 {
230 // Dispatch the typed request as JSON via the cli's top-level
231 // `--request` flag — request -> argv lowering no longer exists.
232 let argv = vec![
233 "--request".to_string(),
234 serde_json::to_string(&request).map_err(Error::Json)?,
235 ];
236 let binary = self.binary_path()?;
237
238 let mut command = Command::new(&binary);
239 command
240 .args(&argv)
241 .stdin(std::process::Stdio::null())
242 .stdout(std::process::Stdio::piped())
243 .stderr(std::process::Stdio::inherit())
244 .kill_on_drop(self.kill_on_drop);
245 for (k, v) in &self.extra_env {
246 command.env(k, v);
247 }
248 // Windows detach. Required for orphan-survival when the
249 // parent will exit before the child finishes — without
250 // `DETACHED_PROCESS` the child inherits the parent's
251 // console and dies on `CTRL_CLOSE_EVENT` when the
252 // parent's console closes. We've hit this bug twice in
253 // the past; preserve the flag. Unix gets re-parent-to-init
254 // for free.
255 #[cfg(windows)]
256 if self.detach {
257 const CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
258 const DETACHED_PROCESS: u32 = 0x0000_0008;
259 command.creation_flags(CREATE_NEW_PROCESS_GROUP | DETACHED_PROCESS);
260 }
261 // Per-call agent identity override. When `Some`, every field
262 // gets applied atomically: `Some(v)` → set, `None` →
263 // env_remove so the parent's value can't leak through. When
264 // the bag itself is `None`, parent env is inherited
265 // untouched.
266 if let Some(args) = agent_arguments {
267 args.apply_to_command(&mut command);
268 }
269 // Lockfile-claim handoff, step 1 of 2: arm the command so the
270 // child inherits/duplicates the claim's handles at spawn.
271 #[cfg(feature = "lockfile")]
272 let transfer_claim = self
273 .transfer_lock
274 .lock()
275 .expect("transfer_lock mutex poisoned")
276 .take();
277 #[cfg(feature = "lockfile")]
278 if let Some(claim) = transfer_claim.as_ref() {
279 claim.prepare_transfer(&mut command);
280 }
281 let spawned = command.spawn();
282 // Step 2 of 2: complete (or unwind) the handoff. Dropping a
283 // claim does NOT release it (ManuallyDrop), so every failure
284 // path must release explicitly to keep the lock slot
285 // retryable.
286 #[cfg(feature = "lockfile")]
287 let spawned = match spawned {
288 Ok(child) => {
289 if let Some(claim) = transfer_claim {
290 if let Err((claim, e)) = claim.transfer(&child) {
291 let mut child = child;
292 let _ = child.start_kill();
293 let _ = claim.release();
294 return Err(Error::LockTransfer(e));
295 }
296 }
297 Ok(child)
298 }
299 Err(e) => {
300 if let Some(claim) = transfer_claim {
301 let _ = claim.release();
302 }
303 Err(e)
304 }
305 };
306 let mut child = spawned.map_err(Error::Spawn)?;
307
308 let stdout = child.stdout.take().ok_or(Error::NoStdout)?;
309 let lines = BufReader::new(stdout).lines();
310
311 // Carry `child` in the unfold state so its handle isn't dropped
312 // mid-stream. tokio's Child defaults to kill_on_drop = false,
313 // so on stream drop the child remains running until it finishes.
314 let stream = futures::stream::unfold(
315 (child, lines),
316 |(child, mut lines)| async move {
317 match lines.next_line().await {
318 Ok(Some(line)) => {
319 let item = match serde_json::from_str::<Line<T>>(&line) {
320 Ok(line) => line.into(),
321 Err(e) => Err(Error::Json(e)),
322 };
323 Some((item, (child, lines)))
324 }
325 Ok(None) => None,
326 Err(e) => Some((Err(Error::Io(e)), (child, lines))),
327 }
328 },
329 );
330
331 Ok(Box::pin(stream))
332 }
333
334 async fn execute_one<R, T>(
335 &self,
336 request: R,
337 agent_arguments: Option<&AgentArguments>,
338 ) -> Result<T, Error>
339 where
340 R: CommandRequest + Send + serde::Serialize,
341 T: CommandResponse + serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
342 {
343 let mut stream = self.execute::<R, T>(request, agent_arguments).await?;
344 stream.next().await.ok_or(Error::Empty)?
345 }
346}