use std::path::PathBuf;
use std::pin::Pin;
use futures::{Stream, StreamExt};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
use crate::cli::command::{AgentArguments, CommandExecutor, CommandRequest, CommandResponse};
/// Command executor that spawns a CLI binary as a child process and reads
/// its stdout line by line.
///
/// Configure it with the consuming builder methods (`env`, `kill_on_drop`,
/// `detach`, and — with the `lockfile` feature — `transfer_lock`).
pub struct BinaryExecutor {
/// Base directory used to locate the binary as `<dir>/bin/objectiveai`
/// (`objectiveai.exe` on Windows). When `None`, `~/.objectiveai` is used.
objectiveai_dir: Option<PathBuf>,
/// Exact path of the binary to run; when set it takes precedence over
/// `objectiveai_dir`.
explicit_path: Option<PathBuf>,
/// Extra `(key, value)` environment variables set on the child, applied in
/// insertion order.
extra_env: Vec<(String, String)>,
/// Forwarded to `tokio::process::Command::kill_on_drop` when spawning.
kill_on_drop: bool,
/// When `true`, Windows builds spawn the child with
/// `CREATE_NEW_PROCESS_GROUP | DETACHED_PROCESS`.
/// NOTE(review): in the visible code this flag is only read under
/// `#[cfg(windows)]`; it appears to have no effect elsewhere — confirm.
detach: bool,
/// Lockfile claim to hand over to a spawned child. `execute` takes it out
/// of the mutex (leaving `None`), so at most one spawn receives it.
#[cfg(feature = "lockfile")]
transfer_lock: std::sync::Mutex<Option<crate::lockfile::LockClaim>>,
}
impl BinaryExecutor {
pub fn new(objectiveai_dir: Option<impl Into<PathBuf>>) -> Self {
Self {
objectiveai_dir: objectiveai_dir.map(Into::into),
explicit_path: None,
extra_env: Vec::new(),
kill_on_drop: false,
detach: false,
#[cfg(feature = "lockfile")]
transfer_lock: std::sync::Mutex::new(None),
}
}
pub fn from_path(binary: impl Into<PathBuf>) -> Self {
Self {
objectiveai_dir: None,
explicit_path: Some(binary.into()),
extra_env: Vec::new(),
kill_on_drop: false,
detach: false,
#[cfg(feature = "lockfile")]
transfer_lock: std::sync::Mutex::new(None),
}
}
#[cfg(feature = "lockfile")]
pub fn transfer_lock(mut self, claim: crate::lockfile::LockClaim) -> Self {
self.transfer_lock = std::sync::Mutex::new(Some(claim));
self
}
pub fn env(
mut self,
key: impl Into<String>,
value: impl Into<String>,
) -> Self {
self.extra_env.push((key.into(), value.into()));
self
}
pub fn kill_on_drop(mut self, on: bool) -> Self {
self.kill_on_drop = on;
self
}
pub fn detach(mut self, on: bool) -> Self {
self.detach = on;
self
}
fn binary_path(&self) -> Result<PathBuf, Error> {
if let Some(p) = &self.explicit_path {
return Ok(p.clone());
}
let dir = match &self.objectiveai_dir {
Some(d) => d.clone(),
None => dirs::home_dir()
.ok_or(Error::NoHomeDir)?
.join(".objectiveai"),
};
let name = if cfg!(windows) { "objectiveai.exe" } else { "objectiveai" };
Ok(dir.join("bin").join(name))
}
}
/// Errors produced while locating, spawning, or reading from the CLI binary.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// No explicit binary path or `objectiveai_dir` was configured and the
/// home directory could not be determined.
#[error("no home directory and no objectiveai_dir set")]
NoHomeDir,
/// Spawning the child process failed.
#[error("failed to spawn cli binary: {0}")]
Spawn(std::io::Error),
/// The spawned child had no stdout handle to take.
#[error("cli binary child has no stdout handle")]
NoStdout,
/// Reading a line from the child's stdout failed.
#[error("read cli binary stdout: {0}")]
Io(std::io::Error),
/// A stdout line could not be deserialized as either an error or the
/// expected response type.
#[error("decode cli binary stdout line: {0}")]
Json(serde_json::Error),
/// A stdout line deserialized as a `crate::cli::Error`, i.e. the child
/// reported an error itself.
#[error("{0}")]
Cli(crate::cli::Error),
/// The stream ended without yielding any item; returned by `execute_one`.
#[error("cli binary stream produced no items")]
Empty,
/// Handing the lockfile claim over to the spawned child failed.
#[cfg(feature = "lockfile")]
#[error("transfer lockfile claim into cli binary child: {0}")]
LockTransfer(std::io::Error),
}
/// One decoded line of the child's stdout: either an error reported by the
/// CLI or a response value of type `T`.
///
/// The enum is `untagged`, so serde tries the variants in declaration order
/// and keeps the first that deserializes. `Err` is listed first on purpose:
/// a line that matches `crate::cli::Error` is treated as an error before `T`
/// is attempted. Do not reorder the variants.
#[derive(serde::Deserialize)]
#[serde(untagged)]
enum Line<T> {
/// The line deserialized as a CLI error.
Err(crate::cli::Error),
/// The line deserialized as the expected response type.
Ok(T),
}
impl<T> From<Line<T>> for Result<T, Error> {
fn from(line: Line<T>) -> Self {
match line {
Line::Err(e) => Err(Error::Cli(e)),
Line::Ok(t) => Ok(t),
}
}
}
impl CommandExecutor for BinaryExecutor {
type Error = Error;
type Stream<T>
= Pin<Box<dyn Stream<Item = Result<T, Error>> + Send>>
where
T: Send + 'static;
async fn execute<R, T>(
&self,
request: R,
agent_arguments: Option<&AgentArguments>,
) -> Result<Self::Stream<T>, Error>
where
R: CommandRequest + Send,
T: CommandResponse + serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
{
let argv = request.into_command();
let binary = self.binary_path()?;
let mut command = Command::new(&binary);
command
.args(&argv)
.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::inherit())
.kill_on_drop(self.kill_on_drop);
for (k, v) in &self.extra_env {
command.env(k, v);
}
#[cfg(windows)]
if self.detach {
const CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
const DETACHED_PROCESS: u32 = 0x0000_0008;
command.creation_flags(CREATE_NEW_PROCESS_GROUP | DETACHED_PROCESS);
}
if let Some(args) = agent_arguments {
args.apply_to_command(&mut command);
}
#[cfg(feature = "lockfile")]
let transfer_claim = self
.transfer_lock
.lock()
.expect("transfer_lock mutex poisoned")
.take();
#[cfg(feature = "lockfile")]
if let Some(claim) = transfer_claim.as_ref() {
claim.prepare_transfer(&mut command);
}
let spawned = command.spawn();
#[cfg(feature = "lockfile")]
let spawned = match spawned {
Ok(child) => {
if let Some(claim) = transfer_claim {
if let Err((claim, e)) = claim.transfer(&child) {
let mut child = child;
let _ = child.start_kill();
let _ = claim.release();
return Err(Error::LockTransfer(e));
}
}
Ok(child)
}
Err(e) => {
if let Some(claim) = transfer_claim {
let _ = claim.release();
}
Err(e)
}
};
let mut child = spawned.map_err(Error::Spawn)?;
let stdout = child.stdout.take().ok_or(Error::NoStdout)?;
let lines = BufReader::new(stdout).lines();
let stream = futures::stream::unfold(
(child, lines),
|(child, mut lines)| async move {
match lines.next_line().await {
Ok(Some(line)) => {
let item = match serde_json::from_str::<Line<T>>(&line) {
Ok(line) => line.into(),
Err(e) => Err(Error::Json(e)),
};
Some((item, (child, lines)))
}
Ok(None) => None,
Err(e) => Some((Err(Error::Io(e)), (child, lines))),
}
},
);
Ok(Box::pin(stream))
}
async fn execute_one<R, T>(
&self,
request: R,
agent_arguments: Option<&AgentArguments>,
) -> Result<T, Error>
where
R: CommandRequest + Send,
T: CommandResponse + serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
{
let mut stream = self.execute::<R, T>(request, agent_arguments).await?;
stream.next().await.ok_or(Error::Empty)?
}
}