use std::pin::Pin;
use futures::{Stream, StreamExt};
use objectiveai_sdk::cli::command::{
AgentArguments, CommandExecutor, CommandRequest, CommandResponse, parse_request,
};
use serde_json::Value;
use crate::context::Context;
use crate::error::Error;
pub struct CliCommandExecutor {
ctx: Context,
}
impl CliCommandExecutor {
pub fn new(ctx: Context) -> Self {
Self { ctx }
}
}
fn extract_leaf<T: serde::de::DeserializeOwned>(value: Value) -> Result<T, serde_json::Error> {
let mut current = value;
loop {
match serde_json::from_value::<T>(current.clone()) {
Ok(t) => return Ok(t),
Err(_) => match current {
Value::Object(map) if map.len() == 1 => {
current = map.into_iter().next().unwrap().1;
}
other => return serde_json::from_value::<T>(other),
},
}
}
}
impl CliCommandExecutor {
fn resolve_ctx<'a>(
&'a self,
agent_arguments: Option<&AgentArguments>,
) -> std::borrow::Cow<'a, Context> {
match agent_arguments {
None => std::borrow::Cow::Borrowed(&self.ctx),
Some(args) => {
let mut ctx = self.ctx.clone();
ctx.config.agent_instance_hierarchy = args
.agent_instance_hierarchy
.clone()
.unwrap_or_else(|| "UNKNOWN".to_string());
ctx.config.agent_id = args.agent_id.clone();
ctx.config.agent_full_id = args.agent_full_id.clone();
ctx.config.agent_remote = args.agent_remote.clone();
ctx.config.response_id = args.response_id.clone();
ctx.config.response_ids = args.response_ids.clone();
ctx.config.mcp_session_id = args.mcp_session_id.clone();
std::borrow::Cow::Owned(ctx)
}
}
}
}
impl CommandExecutor for CliCommandExecutor {
type Error = Error;
type Stream<T>
= Pin<Box<dyn Stream<Item = Result<T, Self::Error>> + Send>>
where
T: Send + 'static;
async fn execute<R, T>(
&self,
request: R,
agent_arguments: Option<&AgentArguments>,
) -> Result<Self::Stream<T>, Self::Error>
where
R: CommandRequest + Send,
T: CommandResponse + serde::de::DeserializeOwned + Send + 'static,
{
let argv = request.into_command();
let sdk_request = parse_request(&argv).map_err(|e| match e {
objectiveai_sdk::cli::command::ParseError::Clap(e) => Error::ClapParse(e),
objectiveai_sdk::cli::command::ParseError::FromArgs(e) => Error::FromArgs(e),
})?;
let ctx = self.resolve_ctx(agent_arguments);
let stream = crate::command::command::execute(&ctx, sdk_request).await?;
let mapped = stream.map(|r| {
r.and_then(|item| {
let value = serde_json::to_value(item).map_err(Error::InlineJson)?;
extract_leaf::<T>(value).map_err(Error::InlineJson)
})
});
Ok(Box::pin(mapped))
}
async fn execute_one<R, T>(
&self,
request: R,
agent_arguments: Option<&AgentArguments>,
) -> Result<T, Self::Error>
where
R: CommandRequest + Send,
T: CommandResponse + serde::de::DeserializeOwned + Send + 'static,
{
let mut stream: Self::Stream<T> =
self.execute::<R, T>(request, agent_arguments).await?;
match stream.next().await {
Some(item) => item,
None => Err(Error::EmptyStream),
}
}
}