use std::path::PathBuf;
use std::pin::Pin;
use futures::Stream;
use futures::StreamExt;
use objectiveai_sdk::agent::InlineAgentBaseWithFallbacksOrRemoteCommitOptional;
use objectiveai_sdk::agent::completions::message::{Message, UserMessage};
use objectiveai_sdk::agent::completions::request::AgentCompletionCreateParams;
use objectiveai_sdk::cli::command::agents::spawn::{
AgentResolution, AgentSpec, Request, RequestDangerousAdvanced, ResponseItem,
};
use objectiveai_sdk::cli::command::{BinaryExecutor, CommandExecutor};
use crate::context::Context;
use crate::error::Error;
use crate::websockets::agent_hierarchies::ChunkAgentHierarchies;
use crate::websockets::agent_registry::AgentInstanceRegistry;
/// Heap-allocated, pinned, `Send` stream of `ResponseItem` results; the common
/// return type of the `execute*` functions in this file.
type ItemStream = Pin<Box<dyn Stream<Item = Result<ResponseItem, Error>> + Send>>;
pub async fn execute(
ctx: &Context,
request: Request,
) -> Result<ItemStream, Error> {
let want_stream = request
.dangerous_advanced
.as_ref()
.and_then(|a| a.stream)
.unwrap_or(false);
if want_stream {
execute_streaming(ctx, request).await
} else {
execute_detached(request).await
}
}
async fn execute_detached(request: Request) -> Result<ItemStream, Error> {
let mut child_request = request;
match child_request.dangerous_advanced.as_mut() {
Some(adv) => adv.stream = Some(true),
None => {
child_request.dangerous_advanced = Some(RequestDangerousAdvanced {
stream: Some(true),
..Default::default()
})
}
}
let exe = std::env::current_exe()
.map_err(|e| Error::Spawn("current_exe".into(), e))?;
let executor = BinaryExecutor::from_path(exe).detach(true);
let mut stream = executor
.execute::<Request, ResponseItem>(child_request, None)
.await
.map_err(|e| Error::Instance(format!(
"self-respawn for agents spawn: {e}"
)))?;
let first = stream
.next()
.await
.ok_or(Error::EmptyStream)?
.map_err(|e| Error::Instance(format!(
"self-respawn for agents spawn: {e}"
)))?;
Ok(Box::pin(
objectiveai_sdk::cli::command::StreamOnce::new(Ok(first)),
))
}
async fn execute_streaming(
ctx: &Context,
request: Request,
) -> Result<ItemStream, Error> {
let content = super::message::resolve_message(request.message)?;
let messages = vec![Message::User(UserMessage {
content,
name: None,
})];
let (agent_spec, agent_tag) = match request.agent {
AgentResolution::Direct { agent_spec } => (agent_spec, None),
AgentResolution::Tag { agent_tag } => {
match crate::db::tags::lookup(&ctx.db, &agent_tag).await? {
crate::db::tags::LookupState::Bound { agent_instance_hierarchy } => {
return Err(Error::Instance(format!(
"tag {agent_tag:?} is already bound to {agent_instance_hierarchy:?}; \
use `agents message` to deliver to the live spawn"
)));
}
crate::db::tags::LookupState::Grouped {
agent_spec,
..
} => (agent_spec, Some(agent_tag)),
crate::db::tags::LookupState::Absent => {
return Err(Error::TagNotFound(agent_tag));
}
}
}
};
let agent = resolve_agent(ctx, agent_spec).await?;
let agents_dir = ctx
.filesystem
.base_dir()
.join("instances")
.join("agents");
let params = AgentCompletionCreateParams {
messages,
provider: None,
agent,
response_format: None,
seed: request.dangerous_advanced.as_ref().and_then(|a| a.seed),
stream: Some(true),
continuation: None,
};
let ctx_clone = ctx.clone();
Ok(Box::pin(run_multi_pass(ctx_clone, params, agent_tag, agents_dir)))
}
/// Runs an agent completion as one or more consecutive "passes" and yields
/// the resulting [`ResponseItem`]s as a single stream.
///
/// # Per-pass behaviour
/// Every pass:
/// 1. spawns an MCP server and wraps it in a `ConduitMcpHandler`;
/// 2. builds a log writer for the completion;
/// 3. opens the SDK streaming completion and installs its notifier on the
///    conduit;
/// 4. for each chunk: records the identity (first chunk only), remembers the
///    latest continuation, upserts per-hierarchy continuations, writes the
///    chunk to the log, and forwards it to the caller.
///
/// # Ordering of yielded items
/// `ResponseItem::Id` is yielded at most once for the whole stream, and only
/// after `log_writer.written_once()` returns `true` (or
/// `wait_written_once()` succeeds once the SDK stream has ended). Chunks that
/// arrive before that are buffered and flushed immediately after the id, so
/// the id always precedes every chunk.
///
/// # Looping
/// After a pass finishes without error, another pass runs only if
/// `message_queue::check_any_pending` reports pending work for the observed
/// hierarchy. The next pass is started with no messages and with the last
/// continuation seen during the previous pass.
///
/// # Errors
/// The stream yields `Error::Instance` and ends when the claim registry
/// cannot be opened, the log writer or SDK stream cannot be created, a
/// stream item is an error, a continuation upsert fails, or the log writer
/// fails to write, wait, or finalize. Errors inside a pass are reported only
/// after the log writer has been finalized and the SDK stream and conduit
/// dropped.
///
/// # Panics
/// Panics through `expect` if the id is about to be emitted while no
/// identity has been recorded; the code sets the identity on the first chunk
/// before either `expect` can run.
pub(crate) fn run_multi_pass(
    ctx: Context,
    initial_params: AgentCompletionCreateParams,
    agent_tag: Option<String>,
    agents_dir: PathBuf,
) -> impl Stream<Item = Result<ResponseItem, Error>> + Send {
    async_stream::try_stream! {
        let mut registry = AgentInstanceRegistry::new(agents_dir)
            .map_err(|e| Error::Instance(format!(
                "failed to open agent claim registry: {e}"
            )))?;
        let mut params = initial_params;
        // (agent_instance_hierarchy, agent_full_id) taken from the first chunk
        // ever received; kept across passes.
        let mut identity: Option<(String, String)> = None;
        // Whether `ResponseItem::Id` has been yielded; kept across passes so
        // the id is emitted only once.
        let mut id_emitted = false;
        loop {
            let mcp_server =
                crate::websockets::mcp_server::spawn(ctx.clone());
            let conduit =
                crate::websockets::conduit::ConduitMcpHandler::new(
                    mcp_server,
                    ctx.clone(),
                    agent_tag.clone(),
                );
            // Fixed: the second argument was the mis-encoded token `¶ms`
            // (`&params` collapsed by the HTML entity `&para`).
            let (log_writer, _ready_rx) = crate::db::logs::write_agent_completion(
                &ctx.db,
                &params,
                ctx.config.agent_instance_hierarchy.clone(),
            )
            .map_err(|e| Error::Instance(format!(
                "failed to build agent-completion log writer: {e}"
            )))?;
            let (sdk_stream, notifier) =
                objectiveai_sdk::agent::completions::create_agent_completion_streaming(
                    &ctx.http,
                    params.clone(),
                    conduit.clone(),
                )
                .await
                .map_err(|e| Error::Instance(format!(
                    "failed to open agent-completion stream: {e}"
                )))?;
            conduit.install_notifier(notifier);
            let mut sdk_stream = Box::pin(sdk_stream);
            // Latest continuation carried by any chunk of this pass; seeds the
            // next pass.
            let mut last_continuation: Option<String> = None;
            // Chunks received before the id could be emitted.
            let mut buffered: Vec<
                objectiveai_sdk::agent::completions::response::streaming::AgentCompletionChunk,
            > = Vec::new();
            // First failure of this pass. Reporting is deferred so that the
            // log writer is still finalized and the resources dropped first.
            let mut stream_err: Option<String> = None;
            while let Some(item) = sdk_stream.next().await {
                let chunk = match item {
                    Ok(c) => c,
                    Err(e) => {
                        stream_err = Some(format!("agent stream item error: {e}"));
                        break;
                    }
                };
                if identity.is_none() {
                    let hier = chunk.agent_instance_hierarchy.clone();
                    let full_id = chunk.agent_full_id.clone();
                    registry.observe(&hier);
                    identity = Some((hier, full_id));
                }
                if let Some(c) = chunk.continuation.as_deref() {
                    last_continuation = Some(c.to_string());
                }
                // Persist every (hierarchy, continuation) pair carried by the
                // chunk; the upserts are awaited together.
                let mut continuation_upserts: Vec<_> = Vec::new();
                for (hier, continuation) in chunk.agent_instance_hierarchies() {
                    if let Some(c) = continuation {
                        continuation_upserts.push(
                            crate::db::agent_continuations::upsert(&ctx.db, hier, c),
                        );
                    }
                }
                if let Err(e) =
                    futures::future::try_join_all(continuation_upserts).await
                {
                    stream_err =
                        Some(format!("agent_continuations upsert: {e}"));
                    break;
                }
                if let Err(e) = log_writer.write(chunk.clone()) {
                    stream_err = Some(format!("log writer error: {e}"));
                    break;
                }
                // Emit the id, then flush the backlog, as soon as the log
                // writer reports `written_once()`.
                if !id_emitted && log_writer.written_once() {
                    let (hier, _) = identity
                        .as_ref()
                        .expect("identity set above on the first chunk");
                    yield ResponseItem::Id(hier.clone());
                    for c in buffered.drain(..) {
                        yield ResponseItem::Chunk(c);
                    }
                    id_emitted = true;
                }
                if id_emitted {
                    yield ResponseItem::Chunk(chunk);
                } else {
                    buffered.push(chunk);
                }
            }
            // The SDK stream ended (or broke) with chunks still buffered: wait
            // for the log writer, then emit the id and the backlog.
            if !id_emitted && !buffered.is_empty() {
                if let Err(e) = log_writer.wait_written_once().await {
                    stream_err.get_or_insert_with(|| format!("log writer wait: {e}"));
                } else {
                    let (hier, _) = identity
                        .as_ref()
                        .expect("identity set on the first chunk");
                    yield ResponseItem::Id(hier.clone());
                    for c in buffered.drain(..) {
                        yield ResponseItem::Chunk(c);
                    }
                    id_emitted = true;
                }
            }
            // `get_or_insert_with` keeps the earliest error; a finalize
            // failure is reported only when nothing failed before it.
            if let Err(e) = log_writer.finalize().await {
                stream_err.get_or_insert_with(|| format!("log writer finalize: {e}"));
            }
            drop(sdk_stream);
            drop(conduit);
            if let Some(e) = stream_err {
                Err(Error::Instance(e))?;
            }
            // No chunk was ever received, so there is no hierarchy to check
            // for pending messages.
            let Some((hier, _full_id)) = identity.as_ref() else {
                break;
            };
            // A failed pending-check is treated the same as "nothing pending".
            let pending = crate::db::message_queue::check_any_pending(
                &ctx.db, hier,
            )
            .await
            .unwrap_or(false);
            if !pending {
                break;
            }
            // NOTE(review): if no chunk carried a continuation, the next pass
            // starts with no messages and no continuation; confirm that is
            // intended.
            params.messages = Vec::new();
            params.continuation = last_continuation;
        }
    }
}
async fn resolve_agent(
ctx: &Context,
spec: AgentSpec,
) -> Result<InlineAgentBaseWithFallbacksOrRemoteCommitOptional, Error> {
match spec {
AgentSpec::Resolved(resolved) => Ok(resolved),
AgentSpec::Favorite(name) => {
let mut config = ctx.filesystem.read_config().await?;
let favorites = config.agents().get_favorites();
let fav = favorites
.iter()
.find(|f| f.get_name() == name)
.ok_or_else(|| Error::FavoriteNotFound(name.clone()))?;
Ok(InlineAgentBaseWithFallbacksOrRemoteCommitOptional::Remote(
fav.path.clone(),
))
}
}
}
/// Sub-command that reports the JSON schema of the spawn request type.
pub mod request_schema {
    use crate::context::Context;
    use crate::error::Error;
    use objectiveai_sdk::cli::command::agents::spawn as sdk;
    use objectiveai_sdk::cli::command::agents::spawn::request_schema::{Request, Response};

    /// Returns the `schemars`-generated schema for `sdk::Request`, wrapped in
    /// a `ResponseSchema`. Neither argument is used.
    pub async fn execute(_ctx: &Context, _request: Request) -> Result<Response, Error> {
        let schema = schemars::schema_for!(sdk::Request);
        Ok(objectiveai_sdk::cli::command::ResponseSchema(schema))
    }
}
/// Sub-command that reports the JSON schema of the spawn response type.
pub mod response_schema {
    use crate::context::Context;
    use crate::error::Error;
    use objectiveai_sdk::cli::command::agents::spawn as sdk;
    use objectiveai_sdk::cli::command::agents::spawn::response_schema::{Request, Response};

    /// Returns the `schemars`-generated schema for `sdk::Response`, wrapped in
    /// a `ResponseSchema`. Neither argument is used.
    pub async fn execute(_ctx: &Context, _request: Request) -> Result<Response, Error> {
        let schema = schemars::schema_for!(sdk::Response);
        Ok(objectiveai_sdk::cli::command::ResponseSchema(schema))
    }
}