use std::path::PathBuf;
use std::pin::Pin;
use futures::{Stream, StreamExt};
use objectiveai_sdk::agent::completions::message::{Message, RichContent, UserMessage};
use objectiveai_sdk::agent::completions::request::AgentCompletionCreateParams;
use objectiveai_sdk::cli::command::agents::message::{
EnqueueMode, MessageTarget, Request, RequestDangerousAdvanced, RequestMessage, ResponseItem,
};
use objectiveai_sdk::cli::command::agents::spawn::ResponseItem as SpawnResponseItem;
use objectiveai_sdk::cli::command::{BinaryExecutor, CommandExecutor};
use crate::context::Context;
use crate::db::tags::LookupState;
use crate::error::Error;
use crate::lock_file;
/// Boxed, pinned, `Send` stream of `Result<ResponseItem, Error>` values; the
/// common return type of the request handlers in this file.
type ItemStream = Pin<Box<dyn Stream<Item = Result<ResponseItem, Error>> + Send>>;
/// Entry point for an agents `message` request.
///
/// * When `request.enqueue` is set, the message is queued via
///   `execute_enqueue` and nothing else happens.
/// * Otherwise the target is resolved. If resolution ended up queueing the
///   message against a tag, a single `ResponseItem::Enqueued` is returned.
/// * Otherwise the message body is resolved and handed to the streaming or
///   unary path, depending on `dangerous_advanced.stream` (treated as
///   `false` when absent).
///
/// # Errors
///
/// Propagates any error from target resolution, message resolution, or the
/// selected execution path.
pub async fn execute(ctx: &Context, request: Request) -> Result<ItemStream, Error> {
    // Explicit enqueue requests never touch a running agent.
    if let Some(mode) = request.enqueue {
        return execute_enqueue(ctx, request.target, request.message, mode).await;
    }

    // Pull the advanced knobs out once; both are plain copies.
    let advanced = request.dangerous_advanced.as_ref();
    let streaming = matches!(advanced.and_then(|adv| adv.stream), Some(true));
    let seed = advanced.and_then(|adv| adv.seed);

    let agents_dir = {
        let instances = ctx.filesystem.base_dir().join("instances");
        instances.join("agents")
    };

    let hierarchy = match resolve_target(ctx, &request).await? {
        ResolvedTarget::EnqueueAgainstTag { id, agent_tag } => {
            return Ok(once_item(ResponseItem::Enqueued {
                id,
                agent_instance_hierarchy: None,
                agent_tag: Some(agent_tag),
            }));
        }
        ResolvedTarget::Hierarchy { hierarchy, .. } => hierarchy,
    };

    let message_content = resolve_message(request.message.clone())?;

    if !streaming {
        return execute_unary(ctx, hierarchy, message_content, agents_dir, request).await;
    }
    execute_streaming(ctx, hierarchy, message_content, agents_dir, seed).await
}
/// Result of `resolve_target`: either a concrete agent instance hierarchy to
/// message, or a record that the message was queued against a tag instead.
enum ResolvedTarget {
/// The target resolved to an agent instance hierarchy.
Hierarchy {
/// The resolved hierarchy string.
hierarchy: String,
/// Tag the hierarchy was looked up from; `None` for direct targets.
/// It is carried along but not read by the code visible in this file,
/// hence the `dead_code` allowance.
#[allow(dead_code)] tag: Option<String>,
},
/// The message was enqueued against `agent_tag`; `id` is the value the
/// enqueue call returned.
EnqueueAgainstTag { id: i64, agent_tag: String },
}
/// Turns the request's `MessageTarget` into a [`ResolvedTarget`].
///
/// * `Direct` targets become `"{parent}/{agent_instance}"`, where the parent
///   defaults to `ctx.config.agent_instance_hierarchy` when not supplied.
/// * `Tag` targets are looked up in the tags table. A `Bound` tag yields its
///   hierarchy; a `Grouped` or `Absent` tag causes the message to be resolved
///   and enqueued against the tag right here, and the queue id is returned.
///
/// # Errors
///
/// Propagates failures from the tag lookup, message resolution, or the
/// enqueue call.
async fn resolve_target(ctx: &Context, request: &Request) -> Result<ResolvedTarget, Error> {
    let agent_tag = match &request.target {
        MessageTarget::Tag { agent_tag } => agent_tag,
        MessageTarget::Direct {
            parent_agent_instance_hierarchy,
            agent_instance,
        } => {
            let fallback: &str = &ctx.config.agent_instance_hierarchy;
            let parent = parent_agent_instance_hierarchy
                .as_deref()
                .unwrap_or(fallback);
            return Ok(ResolvedTarget::Hierarchy {
                hierarchy: format!("{parent}/{agent_instance}"),
                tag: None,
            });
        }
    };

    match crate::db::tags::lookup(&ctx.db, agent_tag).await? {
        LookupState::Bound {
            agent_instance_hierarchy: hierarchy,
        } => Ok(ResolvedTarget::Hierarchy {
            hierarchy,
            tag: Some(agent_tag.clone()),
        }),
        // No single instance to talk to: park the message on the tag itself.
        LookupState::Absent | LookupState::Grouped { .. } => {
            let content = resolve_message(request.message.clone())?;
            let id = crate::db::message_queue::enqueue_with_content(
                &ctx.db,
                None,
                Some(agent_tag.clone()),
                &ctx.config.agent_instance_hierarchy,
                None,
                content,
            )
            .await?;
            Ok(ResolvedTarget::EnqueueAgainstTag {
                id,
                agent_tag: agent_tag.clone(),
            })
        }
    }
}
async fn execute_streaming(
ctx: &Context,
hierarchy: String,
message_content: RichContent,
agents_dir: PathBuf,
seed: Option<i64>,
) -> Result<ItemStream, Error> {
std::fs::create_dir_all(&agents_dir)
.map_err(|e| Error::Instance(format!("create agents_dir: {e}")))?;
let lock_path = agents_dir.join(hierarchy.replace('/', "_"));
if let Some(claim) = lock_file::try_acquire(&lock_path) {
return Ok(run_spawn_with(ctx, claim, hierarchy, message_content, seed).await?);
}
let queue_id = crate::db::message_queue::enqueue_with_content(
&ctx.db,
Some(hierarchy.clone()),
None,
&ctx.config.agent_instance_hierarchy,
None,
message_content.clone(),
)
.await?;
let pool = ctx.db.clone();
let lock_path_clone = lock_path.clone();
tokio::select! {
delivery = crate::db::message_queue::subscribe_delivered(&pool, queue_id) => {
delivery?;
Ok(once_item(ResponseItem::Delivered))
}
claim = lock_file::wait_acquire(&lock_path_clone) => {
let claim = claim.map_err(|e| Error::Instance(format!(
"lock acquisition: {e}"
)))?;
let _ = crate::db::message_queue::delete_by_id(
&ctx.db,
queue_id,
&ctx.config.agent_instance_hierarchy,
)
.await;
run_spawn_with(ctx, claim, hierarchy, message_content, seed).await
}
}
}
async fn run_spawn_with(
ctx: &Context,
claim: lock_file::LockClaim,
hierarchy: String,
message_content: RichContent,
seed: Option<i64>,
) -> Result<ItemStream, Error> {
let lookup = crate::db::logs::lookup_session(&ctx.db, &hierarchy)
.await?
.ok_or_else(|| {
Error::Instance(format!("no prior session for {hierarchy:?}"))
})?;
let messages = vec![Message::User(UserMessage {
content: message_content,
name: None,
})];
let params = AgentCompletionCreateParams {
messages,
provider: None,
agent: lookup.agent,
response_format: None,
seed,
stream: Some(true),
continuation: lookup.continuation,
};
let agents_dir = ctx
.filesystem
.base_dir()
.join("instances")
.join("agents");
let inner = crate::command::agents::spawn::run_multi_pass(
ctx.clone(),
params,
None,
agents_dir,
);
let stream = async_stream::try_stream! {
let _claim = claim;
let mut inner = Box::pin(inner);
while let Some(item) = inner.next().await {
let ev = item?;
match ev {
SpawnResponseItem::Id(id) => {
yield ResponseItem::Id {
agent_instance_hierarchy: id,
}
}
SpawnResponseItem::Chunk(c) => yield ResponseItem::Chunk(c),
}
}
};
Ok(Box::pin(stream))
}
async fn execute_unary(
ctx: &Context,
hierarchy: String,
message_content: RichContent,
agents_dir: PathBuf,
request: Request,
) -> Result<ItemStream, Error> {
std::fs::create_dir_all(&agents_dir)
.map_err(|e| Error::Instance(format!("create agents_dir: {e}")))?;
let lock_path = agents_dir.join(hierarchy.replace('/', "_"));
loop {
if lock_file::is_held(&lock_path) {
let queue_id = crate::db::message_queue::enqueue_with_content(
&ctx.db,
Some(hierarchy.clone()),
None,
&ctx.config.agent_instance_hierarchy,
None,
message_content.clone(),
)
.await?;
let pool = ctx.db.clone();
let lock_path_clone = lock_path.clone();
let race_outcome = tokio::select! {
delivery = crate::db::message_queue::subscribe_delivered(&pool, queue_id) => {
delivery?;
RaceOutcome::Delivered
}
release = lock_file::wait_release(&lock_path_clone) => {
release.map_err(|e| Error::Instance(format!(
"lock release wait: {e}"
)))?;
RaceOutcome::Released
}
};
match race_outcome {
RaceOutcome::Delivered => {
return Ok(once_item(ResponseItem::Delivered));
}
RaceOutcome::Released => {
let _ = crate::db::message_queue::delete_by_id(
&ctx.db,
queue_id,
&ctx.config.agent_instance_hierarchy,
)
.await;
continue;
}
}
}
return execute_unary_respawn(&hierarchy, &request).await;
}
}
/// Which side of the `tokio::select!` race in `execute_unary` finished first.
enum RaceOutcome {
/// The queued message was reported as delivered.
Delivered,
/// The wait for the lock file's release completed first.
Released,
}
async fn execute_unary_respawn(
hierarchy: &str,
original: &Request,
) -> Result<ItemStream, Error> {
let (parent, leaf) = match hierarchy.rsplit_once('/') {
Some((p, l)) => (p.to_string(), l.to_string()),
None => (String::new(), hierarchy.to_string()),
};
let mut child_request = original.clone();
child_request.target = MessageTarget::Direct {
parent_agent_instance_hierarchy: Some(parent),
agent_instance: leaf,
};
match child_request.dangerous_advanced.as_mut() {
Some(adv) => adv.stream = Some(true),
None => {
child_request.dangerous_advanced = Some(RequestDangerousAdvanced {
stream: Some(true),
..Default::default()
})
}
}
let exe = std::env::current_exe()
.map_err(|e| Error::Spawn("current_exe".into(), e))?;
let executor = BinaryExecutor::from_path(exe).detach(true);
let mut stream = executor
.execute::<Request, ResponseItem>(child_request, None)
.await
.map_err(|e| Error::Instance(format!(
"self-respawn for agents message: {e}"
)))?;
let first = stream
.next()
.await
.ok_or(Error::EmptyStream)?
.map_err(|e| Error::Instance(format!(
"self-respawn for agents message: {e}"
)))?;
Ok(once_item(first))
}
fn once_item(item: ResponseItem) -> ItemStream {
Box::pin(futures::stream::once(async move {
Ok::<ResponseItem, Error>(item)
}))
}
/// Queues a message without attempting delivery and reports the queue id.
///
/// The message body is resolved first. A `Direct` target is queued under
/// `"{parent}/{agent_instance}"` (the parent defaults to
/// `ctx.config.agent_instance_hierarchy`); a `Tag` target is queued under the
/// tag. `EnqueueMode::Keyed` passes its key to the queue, `Plain` passes none.
///
/// Returns a one-item stream holding `ResponseItem::Enqueued`.
///
/// # Errors
///
/// Propagates failures from message resolution and from the enqueue call.
async fn execute_enqueue(
    ctx: &Context,
    target: MessageTarget,
    message: RequestMessage,
    mode: EnqueueMode,
) -> Result<ItemStream, Error> {
    let content = resolve_message(message)?;

    let (hier, tag) = match target {
        MessageTarget::Tag { agent_tag } => (None, Some(agent_tag)),
        MessageTarget::Direct {
            parent_agent_instance_hierarchy,
            agent_instance,
        } => {
            let fallback: &str = &ctx.config.agent_instance_hierarchy;
            let parent = parent_agent_instance_hierarchy
                .as_deref()
                .unwrap_or(fallback);
            (Some(format!("{parent}/{agent_instance}")), None)
        }
    };

    let key = match mode {
        EnqueueMode::Keyed { key } => Some(key),
        EnqueueMode::Plain => None,
    };

    let id = crate::db::message_queue::enqueue_with_content(
        &ctx.db,
        hier.clone(),
        tag.clone(),
        &ctx.config.agent_instance_hierarchy,
        key,
        content,
    )
    .await?;

    let receipt = ResponseItem::Enqueued {
        id,
        agent_instance_hierarchy: hier,
        agent_tag: tag,
    };
    Ok(once_item(receipt))
}
/// Converts a `RequestMessage` into the `RichContent` to send.
///
/// `Inline` content is returned unchanged. Every other variant is forwarded
/// to `crate::source_resolver::resolve_source` in its matching argument slot
/// (simple text, file, inline Python, Python file), with `RichContent::Text`
/// as the final argument.
///
/// # Errors
///
/// Returns whatever error `resolve_source` produces.
pub fn resolve_message(message: RequestMessage) -> Result<RichContent, Error> {
    // Exactly one slot is populated; the inline slot is never needed because
    // inline content short-circuits above the resolver.
    let sources = match message {
        RequestMessage::Inline(rich) => return Ok(rich),
        RequestMessage::Simple(text) => (Some(text), None, None, None),
        RequestMessage::File(path) => (None, Some(path), None, None),
        RequestMessage::PythonInline(code) => (None, None, Some(code), None),
        RequestMessage::PythonFile(path) => (None, None, None, Some(path)),
    };
    let (simple, file, python_inline, python_file) = sources;

    crate::source_resolver::resolve_source(
        simple,
        None,
        file,
        python_inline,
        python_file,
        RichContent::Text,
    )
}
/// Handler that reports the JSON schema of the `message` command's request.
pub mod request_schema {
    use crate::context::Context;
    use crate::error::Error;
    use objectiveai_sdk::cli::command::agents::message::{
        self as sdk,
        request_schema::{Request, Response},
    };

    /// Returns the schema generated by `schemars` for `sdk::Request`, wrapped
    /// in a `ResponseSchema`. Both arguments are unused.
    pub async fn execute(_ctx: &Context, _request: Request) -> Result<Response, Error> {
        let schema = schemars::schema_for!(sdk::Request);
        Ok(objectiveai_sdk::cli::command::ResponseSchema(schema))
    }
}
/// Handler that reports the JSON schema of the `message` command's response.
pub mod response_schema {
    use crate::context::Context;
    use crate::error::Error;
    use objectiveai_sdk::cli::command::agents::message::{
        self as sdk,
        response_schema::{Request, Response},
    };

    /// Returns the schema generated by `schemars` for `sdk::Response`, wrapped
    /// in a `ResponseSchema`. Both arguments are unused.
    pub async fn execute(_ctx: &Context, _request: Request) -> Result<Response, Error> {
        let schema = schemars::schema_for!(sdk::Response);
        Ok(objectiveai_sdk::cli::command::ResponseSchema(schema))
    }
}