use std::collections::HashSet;
use std::pin::Pin;
use futures::{Stream, StreamExt};
use objectiveai_sdk::agent::completions::request::AgentCompletionCreateParams;
use objectiveai_sdk::cli::command::agents::ResponseItem as AgentsResponseItem;
use objectiveai_sdk::cli::command::agents::queue::deliver::{
AgentActiveResponseItem, AgentActiveType, AgentSpawnedResponseItem, AgentSpawnedType,
AllAgentsActive, Request, RequestDangerousAdvanced, ResponseItem, ValueResponseItem,
};
use objectiveai_sdk::cli::command::ResponseItem as RootResponseItem;
use objectiveai_sdk::cli::command::{BinaryExecutor, CommandExecutor};
use crate::context::Context;
use crate::db;
use crate::error::Error;
use crate::lock_file;
type ItemStream = Pin<Box<dyn Stream<Item = Result<ResponseItem, Error>> + Send>>;
type TaggedStream =
Pin<Box<dyn Stream<Item = (usize, Result<ResponseItem, Error>)> + Send>>;
pub async fn execute(ctx: &Context, request: Request) -> Result<ItemStream, Error> {
if request
.dangerous_advanced
.as_ref()
.and_then(|adv| adv.stream_spawns)
== Some(true)
{
execute_streaming(ctx, request).await
} else {
execute_detached(request).await
}
}
async fn execute_detached(request: Request) -> Result<ItemStream, Error> {
let mut child_request = request;
match child_request.dangerous_advanced.as_mut() {
Some(adv) => adv.stream_spawns = Some(true),
None => {
child_request.dangerous_advanced = Some(RequestDangerousAdvanced {
stream_spawns: Some(true),
})
}
}
let exe = std::env::current_exe()
.map_err(|e| Error::Spawn("current_exe".into(), e))?;
let executor = BinaryExecutor::from_path(exe).detach(true);
let mut stream = executor
.execute::<Request, ResponseItem>(child_request, None)
.await
.map_err(|e| Error::Instance(format!(
"self-respawn for agents queue deliver: {e}"
)))?;
let out = async_stream::stream! {
while let Some(item) = stream.next().await {
match item {
Ok(ResponseItem::Value(_)) => {}
Ok(item) => {
let done = matches!(item, ResponseItem::AllAgentsActive(_));
yield Ok(item);
if done {
break;
}
}
Err(e) => yield Err(Error::Instance(format!(
"self-respawn for agents queue deliver: {e}"
))),
}
}
};
Ok(Box::pin(out))
}
async fn execute_streaming(ctx: &Context, _request: Request) -> Result<ItemStream, Error> {
let agents_dir = ctx
.filesystem
.base_dir()
.join("instances")
.join("agents");
std::fs::create_dir_all(&agents_dir)
.map_err(|e| Error::Instance(format!("create agents_dir: {e}")))?;
let caller = ctx.config.agent_instance_hierarchy.clone();
let targets = db::message_queue::list_delivery_targets(&ctx.db, &caller).await?;
let mut hierarchies: Vec<String> = Vec::new();
for target in targets {
if target.agent_instance_hierarchy != caller
&& !hierarchies.contains(&target.agent_instance_hierarchy)
{
hierarchies.push(target.agent_instance_hierarchy);
}
}
let n = hierarchies.len();
let mut select_all = futures::stream::SelectAll::new();
for (idx, hierarchy) in hierarchies.into_iter().enumerate() {
let tagged = deliver_one(ctx.clone(), hierarchy, agents_dir.clone())
.map(move |item| (idx, item));
select_all.push(Box::pin(tagged) as TaggedStream);
}
let out = async_stream::stream! {
if n == 0 {
yield Ok(ResponseItem::AllAgentsActive(AllAgentsActive::AllAgentsActive));
return;
}
let mut seen: HashSet<usize> = HashSet::new();
let mut merged = select_all;
while let Some((idx, item)) = merged.next().await {
let first = seen.insert(idx);
yield item;
if first && seen.len() == n {
yield Ok(ResponseItem::AllAgentsActive(AllAgentsActive::AllAgentsActive));
}
}
};
Ok(Box::pin(out))
}
fn deliver_one(
ctx: Context,
hierarchy: String,
agents_dir: std::path::PathBuf,
) -> impl Stream<Item = Result<ResponseItem, Error>> + Send {
async_stream::stream! {
let lock_path = agents_dir.join(hierarchy.replace('/', "_"));
let Some(claim) = lock_file::try_acquire(&lock_path) else {
yield Ok(ResponseItem::AgentActive(AgentActiveResponseItem {
r#type: AgentActiveType::AgentActive,
agent_instance_hierarchy: hierarchy,
}));
return;
};
let _claim = claim;
let lookup = match db::logs::lookup_session(&ctx.db, &hierarchy).await {
Ok(Some(lookup)) => lookup,
Ok(None) => {
yield Err(Error::Instance(format!(
"no prior session for {hierarchy:?}"
)));
return;
}
Err(e) => {
yield Err(e.into());
return;
}
};
yield Ok(ResponseItem::AgentSpawned(AgentSpawnedResponseItem {
r#type: AgentSpawnedType::AgentSpawned,
agent_instance_hierarchy: hierarchy.clone(),
}));
let params = AgentCompletionCreateParams {
messages: Vec::new(),
provider: None,
agent: lookup.agent,
response_format: None,
seed: None,
stream: Some(true),
continuation: lookup.continuation,
};
let inner = crate::command::agents::spawn::run_multi_pass(
ctx.clone(),
params,
None,
agents_dir.clone(),
);
let mut inner = Box::pin(inner);
while let Some(item) = inner.next().await {
match item {
Ok(spawn_item) => {
yield Ok(ResponseItem::Value(ValueResponseItem {
agent_instance_hierarchy: hierarchy.clone(),
value: Box::new(RootResponseItem::Agents(
AgentsResponseItem::Spawn(spawn_item),
)),
}));
}
Err(e) => yield Err(e),
}
}
}
}
pub mod request_schema {
use objectiveai_sdk::cli::command::agents::queue::deliver as sdk;
use objectiveai_sdk::cli::command::agents::queue::deliver::request_schema::{
Request, Response,
};
use crate::context::Context;
use crate::error::Error;
pub async fn execute(_ctx: &Context, _request: Request) -> Result<Response, Error> {
Ok(objectiveai_sdk::cli::command::ResponseSchema(
schemars::schema_for!(sdk::Request),
))
}
}
pub mod response_schema {
use objectiveai_sdk::cli::command::agents::queue::deliver as sdk;
use objectiveai_sdk::cli::command::agents::queue::deliver::response_schema::{
Request, Response,
};
use crate::context::Context;
use crate::error::Error;
pub async fn execute(_ctx: &Context, _request: Request) -> Result<Response, Error> {
Ok(objectiveai_sdk::cli::command::ResponseSchema(
schemars::schema_for!(sdk::ResponseItem),
))
}
}