Skip to main content

objectiveai_sdk/cli/command/command_executor/
binary.rs

1use std::path::PathBuf;
2use std::pin::Pin;
3
4use futures::{Stream, StreamExt};
5use tokio::io::{AsyncBufReadExt, BufReader};
6use tokio::process::Command;
7
8use crate::cli::command::{AgentArguments, CommandExecutor, CommandRequest, CommandResponse};
9
/// Command executor that runs the on-disk `objectiveai` cli binary.
///
/// Each call spawns the binary with the argv produced by
/// `request.into_command()` and streams every stdout JSONL line back
/// as either a typed `T` or a structured [`crate::cli::Error`].
///
/// The binary location is resolved when a command is executed, not
/// when the executor is constructed. Resolution order:
///
/// - [`BinaryExecutor::from_path`] — the explicit path is used verbatim
///   (used by tests pointing at an out-of-tree build).
/// - [`BinaryExecutor::new`] with `Some(config_base_dir)` —
///   `<config_base_dir>/objectiveai{.exe}`.
/// - [`BinaryExecutor::new`] with `None` —
///   `<home>/.objectiveai/objectiveai{.exe}`.
///
/// `Debug` and `Clone` are derived so the executor can be logged and
/// duplicated; every field is plain owned data, so a clone is fully
/// independent of the original.
#[derive(Debug, Clone)]
pub struct BinaryExecutor {
    /// Directory expected to contain the binary, which is always named
    /// `objectiveai{.exe}`. `None` falls back to the home-dir default.
    config_base_dir: Option<PathBuf>,
    /// When set, used verbatim and the `config_base_dir` lookup is
    /// skipped entirely. Lets callers (notably tests) point the
    /// executor at any binary without enforcing the
    /// `<dir>/objectiveai` naming convention.
    explicit_path: Option<PathBuf>,
    /// Extra environment variables to set on the spawned child, in
    /// insertion order. The child still inherits the parent's
    /// environment; these entries only add to or override it. Populated
    /// via [`Self::env`].
    extra_env: Vec<(String, String)>,
}
37
38impl BinaryExecutor {
39    pub fn new(config_base_dir: Option<impl Into<PathBuf>>) -> Self {
40        Self {
41            config_base_dir: config_base_dir.map(Into::into),
42            explicit_path: None,
43            extra_env: Vec::new(),
44        }
45    }
46
47    /// Construct an executor that spawns the binary at `binary`
48    /// directly, regardless of file name. Skips the `objectiveai`
49    /// name lookup so tests can target a `target/debug/objectiveai-cli`
50    /// build without renaming or symlinking.
51    pub fn from_path(binary: impl Into<PathBuf>) -> Self {
52        Self {
53            config_base_dir: None,
54            explicit_path: Some(binary.into()),
55            extra_env: Vec::new(),
56        }
57    }
58
59    /// Set an environment variable on every child the executor spawns.
60    /// Stacks on top of the parent's env; intended for tests that need
61    /// to pin a per-instance `CONFIG_BASE_DIR` or `OBJECTIVEAI_ADDRESS`
62    /// without racing other parallel tests via `std::env::set_var`.
63    pub fn env(
64        mut self,
65        key: impl Into<String>,
66        value: impl Into<String>,
67    ) -> Self {
68        self.extra_env.push((key.into(), value.into()));
69        self
70    }
71
72    fn binary_path(&self) -> Result<PathBuf, Error> {
73        if let Some(p) = &self.explicit_path {
74            return Ok(p.clone());
75        }
76        let base = match &self.config_base_dir {
77            Some(d) => d.clone(),
78            None => dirs::home_dir()
79                .ok_or(Error::NoHomeDir)?
80                .join(".objectiveai"),
81        };
82        let name = if cfg!(windows) { "objectiveai.exe" } else { "objectiveai" };
83        Ok(base.join(name))
84    }
85}
86
87#[derive(Debug, thiserror::Error)]
88pub enum Error {
89    /// `dirs::home_dir()` returned `None` and no `config_base_dir` was set.
90    #[error("no home directory and no config_base_dir set")]
91    NoHomeDir,
92    /// Failed to spawn the binary at the resolved path.
93    #[error("failed to spawn cli binary: {0}")]
94    Spawn(std::io::Error),
95    /// Child stdout was unexpectedly absent after spawn.
96    #[error("cli binary child has no stdout handle")]
97    NoStdout,
98    /// Reading stdout line failed.
99    #[error("read cli binary stdout: {0}")]
100    Io(std::io::Error),
101    /// Stdout produced a line that didn't deserialize as either the
102    /// structured `cli::Error` or `T`.
103    #[error("decode cli binary stdout line: {0}")]
104    Json(serde_json::Error),
105    /// Structured error emitted by the cli binary on stdout.
106    #[error("{0}")]
107    Cli(crate::cli::Error),
108    /// `execute_one` was called but the stream produced no items.
109    #[error("cli binary stream produced no items")]
110    Empty,
111}
112
113/// Per-line untagged decode. `Err` is listed first so serde tries it
114/// before `Ok` — `cli::Error`'s `type:"error"` constant short-circuits
115/// every non-error wire shape, then `Ok(T)` is the fallthrough.
116#[derive(serde::Deserialize)]
117#[serde(untagged)]
118enum Line<T> {
119    Err(crate::cli::Error),
120    Ok(T),
121}
122
123impl<T> From<Line<T>> for Result<T, Error> {
124    fn from(line: Line<T>) -> Self {
125        match line {
126            Line::Err(e) => Err(Error::Cli(e)),
127            Line::Ok(t) => Ok(t),
128        }
129    }
130}
131
132impl CommandExecutor for BinaryExecutor {
133    type Error = Error;
134    type Stream<T>
135        = Pin<Box<dyn Stream<Item = Result<T, Error>> + Send>>
136    where
137        T: Send + 'static;
138
139    async fn execute<R, T>(
140        &self,
141        request: R,
142        agent_arguments: Option<&AgentArguments>,
143    ) -> Result<Self::Stream<T>, Error>
144    where
145        R: CommandRequest + Send,
146        T: CommandResponse + serde::de::DeserializeOwned + Send + 'static,
147    {
148        let argv = request.into_command();
149        let binary = self.binary_path()?;
150
151        let mut command = Command::new(&binary);
152        command
153            .args(&argv)
154            .stdin(std::process::Stdio::null())
155            .stdout(std::process::Stdio::piped())
156            .stderr(std::process::Stdio::inherit());
157        for (k, v) in &self.extra_env {
158            command.env(k, v);
159        }
160        // Per-call agent identity override. When `Some`, every field
161        // gets applied atomically: `Some(v)` → set, `None` →
162        // env_remove so the parent's value can't leak through. When
163        // the bag itself is `None`, parent env is inherited
164        // untouched.
165        if let Some(args) = agent_arguments {
166            args.apply_to_command(&mut command);
167        }
168        let mut child = command.spawn().map_err(Error::Spawn)?;
169
170        let stdout = child.stdout.take().ok_or(Error::NoStdout)?;
171        let lines = BufReader::new(stdout).lines();
172
173        // Carry `child` in the unfold state so its handle isn't dropped
174        // mid-stream. tokio's Child defaults to kill_on_drop = false,
175        // so on stream drop the child remains running until it finishes.
176        let stream = futures::stream::unfold(
177            (child, lines),
178            |(child, mut lines)| async move {
179                match lines.next_line().await {
180                    Ok(Some(line)) => {
181                        let item = match serde_json::from_str::<Line<T>>(&line) {
182                            Ok(line) => line.into(),
183                            Err(e) => Err(Error::Json(e)),
184                        };
185                        Some((item, (child, lines)))
186                    }
187                    Ok(None) => None,
188                    Err(e) => Some((Err(Error::Io(e)), (child, lines))),
189                }
190            },
191        );
192
193        Ok(Box::pin(stream))
194    }
195
196    async fn execute_one<R, T>(
197        &self,
198        request: R,
199        agent_arguments: Option<&AgentArguments>,
200    ) -> Result<T, Error>
201    where
202        R: CommandRequest + Send,
203        T: CommandResponse + serde::de::DeserializeOwned + Send + 'static,
204    {
205        let mut stream = self.execute::<R, T>(request, agent_arguments).await?;
206        stream.next().await.ok_or(Error::Empty)?
207    }
208}