use std::path::PathBuf;
use futures::Stream;
use futures::StreamExt;
use objectiveai_sdk::functions::executions::request::FunctionExecutionCreateParams;
use objectiveai_sdk::functions::executions::response::streaming::FunctionExecutionChunk;
use crate::context::Context;
use crate::error::Error;
use crate::websockets::agent_hierarchies::ChunkAgentHierarchies;
use crate::websockets::agent_registry::AgentInstanceRegistry;
pub enum Event {
Id(String),
Chunk(FunctionExecutionChunk),
}
pub fn run(
ctx: Context,
params: FunctionExecutionCreateParams,
agents_dir: PathBuf,
) -> impl Stream<Item = Result<Event, Error>> + Send {
async_stream::try_stream! {
let mut registry = AgentInstanceRegistry::new(agents_dir)
.map_err(|e| Error::Instance(format!(
"failed to open agent claim registry: {e}"
)))?;
let mcp_server = crate::websockets::mcp_server::spawn(ctx.clone());
let conduit = crate::websockets::conduit::ConduitMcpHandler::new(
mcp_server,
ctx.clone(),
None,
);
let (log_writer, log_ready_rx) = crate::db::logs::write_function_execution(
&ctx.db,
¶ms,
ctx.config.agent_instance_hierarchy.clone(),
)
.map_err(|e| Error::Instance(format!(
"failed to build function-execution log writer: {e}"
)))?;
let mut log_ready_rx = Some(log_ready_rx);
let (sdk_stream, notifier) =
objectiveai_sdk::functions::executions::create_function_execution_streaming(
&ctx.http,
params,
conduit.clone(),
)
.await
.map_err(|e| Error::Instance(format!(
"failed to open function-execution stream: {e}"
)))?;
conduit.install_notifier(notifier);
let mut sdk_stream = Box::pin(sdk_stream);
let mut buffered: Vec<FunctionExecutionChunk> = Vec::new();
let mut id_emitted = false;
let mut stream_err: Option<String> = None;
while let Some(item) = sdk_stream.next().await {
match item {
Ok(chunk) => {
let mut continuation_upserts: Vec<_> = Vec::new();
for (hier, continuation) in chunk.agent_instance_hierarchies() {
registry.observe(hier);
if let Some(c) = continuation {
continuation_upserts.push(
crate::db::agent_continuations::upsert(&ctx.db, hier, c),
);
}
}
if let Err(e) =
futures::future::try_join_all(continuation_upserts).await
{
stream_err = Some(format!("agent_continuations upsert: {e}"));
break;
}
if let Err(e) = log_writer.write(chunk.clone()) {
stream_err = Some(format!("{e}"));
break;
}
if !id_emitted && log_writer.written_once() {
let rx = log_ready_rx
.take()
.expect("ready_rx taken exactly once via id_emitted gate");
let id = rx.await.map_err(|e| Error::Instance(
format!("log writer ready signal lost: {e}")
))?;
yield Event::Id(id);
for c in buffered.drain(..) {
yield Event::Chunk(c);
}
id_emitted = true;
}
if id_emitted {
yield Event::Chunk(chunk);
} else {
buffered.push(chunk);
}
}
Err(e) => {
stream_err = Some(format!("{e}"));
break;
}
}
}
if !id_emitted && !buffered.is_empty() {
match log_writer.wait_written_once().await {
Ok(()) => {
let rx = log_ready_rx
.take()
.expect("ready_rx taken exactly once via id_emitted gate");
match rx.await {
Ok(id) => yield Event::Id(id),
Err(e) => {
stream_err.get_or_insert_with(||
format!("log writer ready signal lost: {e}")
);
}
}
for c in buffered.drain(..) {
yield Event::Chunk(c);
}
id_emitted = true;
}
Err(e) => {
stream_err.get_or_insert_with(|| format!("log writer wait: {e}"));
}
}
}
let finalize_outcome = log_writer.finalize().await;
if let Some(e) = stream_err {
Err(Error::Instance(e))?;
}
if let Err(e) = finalize_outcome {
Err(Error::Instance(format!("log writer failed: {e}")))?;
}
}
}