use std::pin::Pin;
use futures::{Stream, StreamExt};
use objectiveai_sdk::cli::command::AgentArguments;
use std::collections::HashMap;
use objectiveai_sdk::cli::command::ResponseItem as RootResponseItem;
use objectiveai_sdk::cli::command::tasks::run::{
Plugin, Request, ResponseItem, SuccessResponseItem, ValueResponseItem,
};
use crate::context::Context;
use crate::db;
use crate::error::Error;
/// Boxed, pinned, `Send` stream of `Result<ResponseItem, Error>`; this is the
/// stream type that `execute` hands back to its caller.
type ItemStream = Pin<Box<dyn Stream<Item = Result<ResponseItem, Error>> + Send>>;
/// Internal event carried by the merged per-task streams built in `execute`.
enum TaskEvent {
/// One result produced for the run identified by the `i64` run id.
Item(i64, Result<ResponseItem, Error>),
/// Emitted once after a task's stream is exhausted; carries that task's metadata.
Done(TaskMeta),
/// The background log-writer task returned an error or could not be joined.
WriterFailed(Error),
}
/// Boxed, pinned, `Send` stream of [`TaskEvent`]s; the common type that lets
/// per-task streams of different concrete types be pushed into one `SelectAll`.
type TaggedStream = Pin<Box<dyn Stream<Item = TaskEvent> + Send>>;
/// Runs every task row returned by `db::tasks::claim_pending` for the current
/// agent instance hierarchy and returns their merged output as one stream.
///
/// Every `Ok` item produced by a task is serialized to JSON and handed to a
/// background writer task (`log_writer_loop`) keyed by the task's run id.
///
/// The shape of the returned stream depends on `request.stream_all`:
/// * `true`  – every task item is forwarded (wrapped in `ResponseItem::Value`
///   with the task's metadata); end-of-task markers are dropped.
/// * `false` – individual items are consumed silently and one
///   `ResponseItem::Success` is yielded per task when that task finishes.
///
/// In both modes a failure of the log writer is surfaced as a final `Err` item.
///
/// # Errors
///
/// Returns `Err` directly (before any stream is built) if
/// `db::tasks::claim_pending` fails. Failures of individual tasks are reported
/// inside the stream instead.
pub async fn execute(ctx: &Context, request: Request) -> Result<ItemStream, Error> {
// Channel feeding (run_id, serialized item) pairs to the background log writer.
let (log_tx, log_rx) = tokio::sync::mpsc::unbounded_channel::<(i64, String)>();
let writer = tokio::spawn(log_writer_loop(ctx.db.clone(), log_rx));
let parent = ctx.config.agent_instance_hierarchy.clone();
// NOTE(review): presumably this atomically marks the rows as claimed — confirm in `db::tasks`.
let rows = db::tasks::claim_pending(&ctx.db, &parent).await?;
if rows.is_empty() {
// Nothing to run. `log_tx` is dropped on return, so the writer loop sees a
// closed channel; the writer task itself is not awaited on this path.
return Ok(Box::pin(futures::stream::empty()));
}
// One future per row: snapshot the row's metadata, then start the task.
let starts = rows.into_iter().map(|row| {
let ctx = ctx.clone();
async move {
// Metadata is copied out first because `row` is moved into `run_one` below.
let meta = TaskMeta {
run_id: row.run_id,
agent_instance_hierarchy: row.agent_instance_hierarchy.clone(),
name: row.name.clone(),
version: row.version as u64,
plugin: row.plugin.clone().map(|p| Plugin {
owner: p.owner,
repository: p.repository,
version: p.version,
}),
};
let stream_result = run_one(&ctx, row).await;
(meta, stream_result)
}
});
// All start-up futures are driven together; results come back in row order.
let results = futures::future::join_all(starts).await;
let mut select_all = futures::stream::SelectAll::new();
for (meta, result) in results {
let run_id = meta.run_id;
// Separate copy for the trailing `Done` event; `meta` itself moves into the map closure.
let done = meta.clone();
match result {
Ok(stream) => {
// Wrap each successful value with the task's metadata, tag every item with
// the run id, and append a `Done` marker once the task's stream ends.
let tagged = stream
.map(move |r| {
TaskEvent::Item(
run_id,
r.map(|value| {
ResponseItem::Value(ValueResponseItem {
agent_instance_hierarchy: meta
.agent_instance_hierarchy
.clone(),
name: meta.name.clone(),
version: meta.version,
plugin: meta.plugin.clone(),
value: Box::new(value),
})
}),
)
})
.chain(futures::stream::once(async move {
TaskEvent::Done(done)
}));
select_all.push(Box::pin(tagged) as TaggedStream);
}
Err(e) => {
// The task never produced a stream: emit its start-up error as the only
// item, followed by the usual `Done` marker.
let tagged = futures::stream::once(async move {
TaskEvent::Item(run_id, Err(e))
})
.chain(futures::stream::once(async move {
TaskEvent::Done(done)
}));
select_all.push(Box::pin(tagged) as TaggedStream);
}
}
}
// Tee stage: forwards every event unchanged while sending successful items to the writer.
let logged: TaggedStream = Box::pin(async_stream::stream! {
let mut merged = select_all;
while let Some(event) = merged.next().await {
// Only `Ok` items are logged; serialization and send failures are ignored here.
if let TaskEvent::Item(run_id, Ok(envelope)) = &event {
if let Ok(line) = serde_json::to_string(envelope) {
let _ = log_tx.send((*run_id, line));
}
}
yield event;
}
// Close the channel so the writer loop can finish, then wait for it and
// report any writer failure as one last event.
drop(log_tx);
match writer.await {
Ok(Ok(())) => {}
Ok(Err(e)) => yield TaskEvent::WriterFailed(e),
Err(_) => yield TaskEvent::WriterFailed(Error::WriterPanic),
}
});
let stream: ItemStream = if request.stream_all {
// Streaming mode: pass items through, hide `Done` markers, turn writer failure into an `Err`.
Box::pin(logged.filter_map(|event| async move {
match event {
TaskEvent::Item(_, item) => Some(item),
TaskEvent::Done(_) => None,
TaskEvent::WriterFailed(e) => Some(Err(e)),
}
}))
} else {
// Summary mode: one `Success` item per task, emitted when its `Done` marker arrives.
Box::pin(async_stream::stream! {
// run_id -> whether the most recent item seen for that run was an `Err`.
let mut last_err: HashMap<i64, bool> = HashMap::new();
let mut inner = logged;
while let Some(event) = inner.next().await {
match event {
TaskEvent::Item(run_id, item) => {
// Each item overwrites the previous entry, so only the final item counts.
last_err.insert(run_id, item.is_err());
}
TaskEvent::Done(meta) => {
// A run that produced no items has no entry and is reported as successful.
let success =
!last_err.get(&meta.run_id).copied().unwrap_or(false);
yield Ok(ResponseItem::Success(SuccessResponseItem {
agent_instance_hierarchy: meta.agent_instance_hierarchy,
name: meta.name,
version: meta.version,
plugin: meta.plugin,
success,
}));
}
TaskEvent::WriterFailed(e) => yield Err(e),
}
}
})
};
Ok(stream)
}
/// Receives `(run_id, line)` pairs from `rx` and stores each one with
/// `db::tasks::insert_task_log`.
///
/// Returns `Ok(())` once the channel is closed and drained. The first insert
/// that fails ends the loop and its error is returned.
async fn log_writer_loop(
    pool: crate::db::Pool,
    mut rx: tokio::sync::mpsc::UnboundedReceiver<(i64, String)>,
) -> Result<(), Error> {
    loop {
        match rx.recv().await {
            Some((run_id, line)) => {
                db::tasks::insert_task_log(&pool, run_id, &line).await?;
            }
            // `None` means every sender is gone and nothing is left to write.
            None => return Ok(()),
        }
    }
}
/// Identifying metadata for one claimed task row, copied out of the row before
/// the row is consumed by `run_one`. It is attached to every response item
/// emitted for that task.
#[derive(Clone)]
struct TaskMeta {
/// Run id from the claimed row; keys log lines and per-run status tracking.
run_id: i64,
/// Agent instance hierarchy recorded on the row.
agent_instance_hierarchy: String,
/// Task name recorded on the row.
name: String,
/// Task version; converted from the row's value with an `as u64` cast.
version: u64,
/// Plugin coordinates (owner / repository / version) when the row has a plugin.
plugin: Option<Plugin>,
}
/// Boxed, pinned, `Send` stream of root-level CLI response items; the stream
/// type `run_one` returns after dispatching through `crate::run`.
type RootStream = Pin<Box<dyn Stream<Item = Result<RootResponseItem, Error>> + Send>>;
/// Executes a single claimed task row through `crate::run`.
///
/// A task-specific context is derived from `ctx` using the row's agent
/// arguments and plugin. The row's command is then prefixed with the program
/// name `"objectiveai-cli"` to form the argument vector.
async fn run_one(ctx: &Context, row: db::tasks::RunRow) -> Result<RootStream, Error> {
    let task_ctx = {
        let mut scoped = apply_agent_arguments(ctx, &row.agent_arguments);
        apply_plugin(&mut scoped, row.plugin);
        scoped
    };

    // argv[0] is the binary name, followed by the stored command tokens.
    let argv: Vec<String> = std::iter::once("objectiveai-cli".to_string())
        .chain(row.command)
        .collect();

    crate::run(argv, Some(task_ctx)).await
}
/// Returns a clone of `ctx` whose agent-related config fields are replaced by
/// the values in `args`.
///
/// When `args` has no agent instance hierarchy, the literal `"UNKNOWN"` is
/// used. All other fields are copied from `args` as-is.
fn apply_agent_arguments(ctx: &Context, args: &AgentArguments) -> Context {
    let mut scoped = ctx.clone();
    {
        let config = &mut scoped.config;
        config.agent_instance_hierarchy = match &args.agent_instance_hierarchy {
            Some(hierarchy) => hierarchy.clone(),
            None => "UNKNOWN".to_string(),
        };
        config.agent_id = args.agent_id.clone();
        config.agent_full_id = args.agent_full_id.clone();
        config.agent_remote = args.agent_remote.clone();
        config.response_id = args.response_id.clone();
        config.response_ids = args.response_ids.clone();
        config.mcp_session_id = args.mcp_session_id.clone();
    }
    scoped
}
/// Makes `ctx` reflect `plugin`.
///
/// The three `plugin_*` config fields mirror the plugin's owner, repository
/// and version (or are cleared when `plugin` is `None`), and `ctx.plugin`
/// takes ownership of the value itself.
fn apply_plugin(ctx: &mut Context, plugin: Option<crate::plugin_path::PluginPath>) {
    // Copy the coordinates out first; `plugin` is moved into the context last.
    ctx.config.plugin_owner = plugin.as_ref().map(|p| p.owner.clone());
    ctx.config.plugin_repository = plugin.as_ref().map(|p| p.repository.clone());
    ctx.config.plugin_version = plugin.as_ref().map(|p| p.version.clone());
    ctx.plugin = plugin;
}
/// Handler that reports the schema of
/// `objectiveai_sdk::cli::command::tasks::run::Request`.
pub mod request_schema {
    use objectiveai_sdk::cli::command::tasks::run as sdk;
    use objectiveai_sdk::cli::command::tasks::run::request_schema::{
        Request, Response,
    };
    use crate::context::Context;
    use crate::error::Error;

    /// Generates the schema for `sdk::Request` with `schemars::schema_for!`
    /// and wraps it in `ResponseSchema`. Neither argument is read, and this
    /// function never returns `Err`.
    pub async fn execute(_ctx: &Context, _request: Request) -> Result<Response, Error> {
        let schema = schemars::schema_for!(sdk::Request);
        Ok(objectiveai_sdk::cli::command::ResponseSchema(schema))
    }
}
/// Handler that reports the schema of
/// `objectiveai_sdk::cli::command::tasks::run::ResponseItem`.
pub mod response_schema {
    use objectiveai_sdk::cli::command::tasks::run as sdk;
    use objectiveai_sdk::cli::command::tasks::run::response_schema::{
        Request, Response,
    };
    use crate::context::Context;
    use crate::error::Error;

    /// Generates the schema for `sdk::ResponseItem` with
    /// `schemars::schema_for!` and wraps it in `ResponseSchema`. Neither
    /// argument is read, and this function never returns `Err`.
    pub async fn execute(_ctx: &Context, _request: Request) -> Result<Response, Error> {
        let schema = schemars::schema_for!(sdk::ResponseItem);
        Ok(objectiveai_sdk::cli::command::ResponseSchema(schema))
    }
}