objectiveai_sdk/cli/command/command_executor/
binary.rs1use std::path::PathBuf;
2use std::pin::Pin;
3
4use futures::{Stream, StreamExt};
5use tokio::io::{AsyncBufReadExt, BufReader};
6use tokio::process::Command;
7
8use crate::cli::command::{AgentArguments, CommandExecutor, CommandRequest, CommandResponse};
9
10pub struct BinaryExecutor {
24 config_base_dir: Option<PathBuf>,
27 explicit_path: Option<PathBuf>,
32 extra_env: Vec<(String, String)>,
36}
37
38impl BinaryExecutor {
39 pub fn new(config_base_dir: Option<impl Into<PathBuf>>) -> Self {
40 Self {
41 config_base_dir: config_base_dir.map(Into::into),
42 explicit_path: None,
43 extra_env: Vec::new(),
44 }
45 }
46
47 pub fn from_path(binary: impl Into<PathBuf>) -> Self {
52 Self {
53 config_base_dir: None,
54 explicit_path: Some(binary.into()),
55 extra_env: Vec::new(),
56 }
57 }
58
59 pub fn env(
64 mut self,
65 key: impl Into<String>,
66 value: impl Into<String>,
67 ) -> Self {
68 self.extra_env.push((key.into(), value.into()));
69 self
70 }
71
72 fn binary_path(&self) -> Result<PathBuf, Error> {
73 if let Some(p) = &self.explicit_path {
74 return Ok(p.clone());
75 }
76 let base = match &self.config_base_dir {
77 Some(d) => d.clone(),
78 None => dirs::home_dir()
79 .ok_or(Error::NoHomeDir)?
80 .join(".objectiveai"),
81 };
82 let name = if cfg!(windows) { "objectiveai.exe" } else { "objectiveai" };
83 Ok(base.join(name))
84 }
85}
86
87#[derive(Debug, thiserror::Error)]
88pub enum Error {
89 #[error("no home directory and no config_base_dir set")]
91 NoHomeDir,
92 #[error("failed to spawn cli binary: {0}")]
94 Spawn(std::io::Error),
95 #[error("cli binary child has no stdout handle")]
97 NoStdout,
98 #[error("read cli binary stdout: {0}")]
100 Io(std::io::Error),
101 #[error("decode cli binary stdout line: {0}")]
104 Json(serde_json::Error),
105 #[error("{0}")]
107 Cli(crate::cli::Error),
108 #[error("cli binary stream produced no items")]
110 Empty,
111}
112
113#[derive(serde::Deserialize)]
117#[serde(untagged)]
118enum Line<T> {
119 Err(crate::cli::Error),
120 Ok(T),
121}
122
123impl<T> From<Line<T>> for Result<T, Error> {
124 fn from(line: Line<T>) -> Self {
125 match line {
126 Line::Err(e) => Err(Error::Cli(e)),
127 Line::Ok(t) => Ok(t),
128 }
129 }
130}
131
132impl CommandExecutor for BinaryExecutor {
133 type Error = Error;
134 type Stream<T>
135 = Pin<Box<dyn Stream<Item = Result<T, Error>> + Send>>
136 where
137 T: Send + 'static;
138
139 async fn execute<R, T>(
140 &self,
141 request: R,
142 agent_arguments: Option<&AgentArguments>,
143 ) -> Result<Self::Stream<T>, Error>
144 where
145 R: CommandRequest + Send,
146 T: CommandResponse + serde::de::DeserializeOwned + Send + 'static,
147 {
148 let argv = request.into_command();
149 let binary = self.binary_path()?;
150
151 let mut command = Command::new(&binary);
152 command
153 .args(&argv)
154 .stdin(std::process::Stdio::null())
155 .stdout(std::process::Stdio::piped())
156 .stderr(std::process::Stdio::inherit());
157 for (k, v) in &self.extra_env {
158 command.env(k, v);
159 }
160 if let Some(args) = agent_arguments {
166 args.apply_to_command(&mut command);
167 }
168 let mut child = command.spawn().map_err(Error::Spawn)?;
169
170 let stdout = child.stdout.take().ok_or(Error::NoStdout)?;
171 let lines = BufReader::new(stdout).lines();
172
173 let stream = futures::stream::unfold(
177 (child, lines),
178 |(child, mut lines)| async move {
179 match lines.next_line().await {
180 Ok(Some(line)) => {
181 let item = match serde_json::from_str::<Line<T>>(&line) {
182 Ok(line) => line.into(),
183 Err(e) => Err(Error::Json(e)),
184 };
185 Some((item, (child, lines)))
186 }
187 Ok(None) => None,
188 Err(e) => Some((Err(Error::Io(e)), (child, lines))),
189 }
190 },
191 );
192
193 Ok(Box::pin(stream))
194 }
195
196 async fn execute_one<R, T>(
197 &self,
198 request: R,
199 agent_arguments: Option<&AgentArguments>,
200 ) -> Result<T, Error>
201 where
202 R: CommandRequest + Send,
203 T: CommandResponse + serde::de::DeserializeOwned + Send + 'static,
204 {
205 let mut stream = self.execute::<R, T>(request, agent_arguments).await?;
206 stream.next().await.ok_or(Error::Empty)?
207 }
208}