use crate::{ctx, util::ChoiceIndexer};
use objectiveai_sdk::error::StatusError;
use futures::{Stream, StreamExt};
use std::{
pin::Pin,
sync::Arc,
time,
};
type FunctionInventionChunk =
objectiveai_sdk::functions::inventions::response::streaming::FunctionInventionChunk;
type RecursiveChunk =
objectiveai_sdk::functions::inventions::recursive::response::streaming::FunctionInventionRecursiveChunk;
type RecursiveInventionChunk =
objectiveai_sdk::functions::inventions::recursive::response::streaming::FunctionInventionChunk;
type RecursiveObject =
objectiveai_sdk::functions::inventions::recursive::response::streaming::Object;
pub fn recursive_invention_response_id(created: u64) -> String {
let uuid = uuid::Uuid::new_v4();
format!("fninvr-{}-{}", uuid.simple(), created)
}
pub struct Client<CTXEXT, OPENROUTER, CLAUDEAGENTSDK, CODEXSDK, MOCK, RETRG, RETRF, RETRM, CUSG, IUSG, FFNG, FFNF, FFNM, RIUSG> {
pub invention_client: Arc<
crate::functions::inventions::Client<
CTXEXT, OPENROUTER, CLAUDEAGENTSDK, CODEXSDK, MOCK, RETRG, RETRF, RETRM, CUSG, IUSG, FFNG, FFNF, FFNM,
>,
>,
pub viewer_client: Arc<crate::viewer::Client<CTXEXT>>,
pub usage_handler: Arc<RIUSG>,
}
impl<CTXEXT, OPENROUTER, CLAUDEAGENTSDK, CODEXSDK, MOCK, RETRG, RETRF, RETRM, CUSG, IUSG, FFNG, FFNF, FFNM, RIUSG>
Client<CTXEXT, OPENROUTER, CLAUDEAGENTSDK, CODEXSDK, MOCK, RETRG, RETRF, RETRM, CUSG, IUSG, FFNG, FFNF, FFNM, RIUSG>
{
pub fn new(
invention_client: Arc<
crate::functions::inventions::Client<
CTXEXT, OPENROUTER, CLAUDEAGENTSDK, CODEXSDK, MOCK, RETRG, RETRF, RETRM, CUSG, IUSG, FFNG, FFNF, FFNM,
>,
>,
viewer_client: Arc<crate::viewer::Client<CTXEXT>>,
usage_handler: Arc<RIUSG>,
) -> Self {
Self {
invention_client,
viewer_client,
usage_handler,
}
}
}
impl<CTXEXT, OPENROUTER, CLAUDEAGENTSDK, CODEXSDK, MOCK, RETRG, RETRF, RETRM, CUSG, IUSG, FFNG, FFNF, FFNM, RIUSG>
Client<CTXEXT, OPENROUTER, CLAUDEAGENTSDK, CODEXSDK, MOCK, RETRG, RETRF, RETRM, CUSG, IUSG, FFNG, FFNF, FFNM, RIUSG>
where
CTXEXT: ctx::ContextExt + Send + Sync + 'static,
OPENROUTER: crate::agent::completions::UpstreamClient<objectiveai_sdk::agent::openrouter::Agent, objectiveai_sdk::agent::openrouter::Continuation>
+ Send
+ Sync
+ 'static,
CLAUDEAGENTSDK: crate::agent::completions::UpstreamClient<
objectiveai_sdk::agent::claude_agent_sdk::Agent, objectiveai_sdk::agent::claude_agent_sdk::Continuation,
> + Send
+ Sync
+ 'static,
CODEXSDK: crate::agent::completions::UpstreamClient<
objectiveai_sdk::agent::codex_sdk::Agent, objectiveai_sdk::agent::codex_sdk::Continuation,
> + Send
+ Sync
+ 'static,
MOCK: crate::agent::completions::UpstreamClient<objectiveai_sdk::agent::mock::Agent, objectiveai_sdk::agent::mock::Continuation>
+ Send
+ Sync
+ 'static,
RETRG: crate::retrieval::retrieve::Client<CTXEXT>,
RETRF: crate::retrieval::retrieve::Client<CTXEXT>,
RETRM: crate::retrieval::retrieve::Client<CTXEXT>,
CUSG: crate::agent::completions::usage_handler::UsageHandler<CTXEXT> + Send + Sync + 'static,
IUSG: crate::functions::inventions::usage_handler::UsageHandler<CTXEXT> + Send + Sync + 'static,
FFNG: crate::retrieval::retrieve::Client<CTXEXT> + Send + Sync + 'static,
FFNF: crate::retrieval::retrieve::Client<CTXEXT> + Send + Sync + 'static,
FFNM: crate::retrieval::retrieve::Client<CTXEXT> + Send + Sync + 'static,
RIUSG: super::usage_handler::UsageHandler<CTXEXT> + Send + Sync + 'static,
{
pub async fn create_unary_handle_usage(
self: Arc<Self>,
ctx: ctx::Context<CTXEXT, impl crate::ctx::persistent_cache::PersistentCacheClient>,
request: Arc<objectiveai_sdk::functions::inventions::recursive::request::FunctionInventionRecursiveCreateParams>,
) -> Result<
objectiveai_sdk::functions::inventions::recursive::response::unary::FunctionInventionRecursive,
super::Error,
> {
let mut aggregate: Option<RecursiveChunk> = None;
let mut stream =
self.create_streaming_handle_usage(ctx, request).await?;
while let Some(chunk) = stream.next().await {
match &mut aggregate {
Some(aggregate) => aggregate.push(&chunk),
None => aggregate = Some(chunk),
}
}
Ok(aggregate.unwrap().into())
}
pub async fn create_streaming_handle_usage(
self: Arc<Self>,
ctx: ctx::Context<CTXEXT, impl crate::ctx::persistent_cache::PersistentCacheClient>,
request: Arc<objectiveai_sdk::functions::inventions::recursive::request::FunctionInventionRecursiveCreateParams>,
) -> Result<
impl Stream<Item = RecursiveChunk> + Send + Unpin + 'static,
super::Error,
> {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let self_clone = self.clone();
tokio::spawn(async move {
let mut aggregate: Option<RecursiveChunk> = None;
let stream = match self_clone
.clone()
.create_streaming(ctx.clone(), request.clone())
.await
{
Ok(stream) => stream,
Err(e) => {
let _ = tx.send(Err(e));
return;
}
};
futures::pin_mut!(stream);
while let Some(chunk) = stream.next().await {
match &mut aggregate {
Some(aggregate) => aggregate.push(&chunk),
None => aggregate = Some(chunk.clone()),
}
if tx.send(Ok(chunk)).is_err() {
ctx.cancel();
}
}
drop(stream);
drop(tx);
if let Some(aggregate) = aggregate {
if aggregate.usage.as_ref().is_some_and(
objectiveai_sdk::agent::completions::response::Usage::any_usage,
) {
self_clone
.usage_handler
.handle_usage(ctx, request, aggregate.into())
.await;
}
}
});
let mut stream =
tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
match stream.next().await {
Some(Ok(chunk)) => {
Ok(crate::util::StreamOnce::new(chunk)
.chain(stream.map(Result::unwrap)))
}
Some(Err(e)) => Err(e),
None => unreachable!(),
}
}
pub async fn create_streaming(
self: Arc<Self>,
ctx: ctx::Context<CTXEXT, impl crate::ctx::persistent_cache::PersistentCacheClient>,
request: Arc<objectiveai_sdk::functions::inventions::recursive::request::FunctionInventionRecursiveCreateParams>,
) -> Result<
impl Stream<Item = RecursiveChunk> + Send + 'static,
super::Error,
> {
let resolved_state = self.invention_client.retrieve_router
.get_function_invention_state(&ctx, request.state.clone())
.await
.map_err(|e| crate::functions::inventions::Error::InvalidState(e.to_string()))?
.ok_or(crate::functions::inventions::Error::StateNotFound)?;
let created = time::SystemTime::now()
.duration_since(time::UNIX_EPOCH)
.unwrap()
.as_secs();
let id = recursive_invention_response_id(created);
self.viewer_client.send_function_invention_recursive_begin(
ctx.clone(),
id.clone(),
request.clone(),
);
let is_scalar = match &resolved_state {
objectiveai_sdk::functions::inventions::state::ParamsState::AlphaScalarBranch(_)
| objectiveai_sdk::functions::inventions::state::ParamsState::AlphaScalarLeaf(_)
| objectiveai_sdk::functions::inventions::state::ParamsState::AlphaScalar(_) => true,
_ => false,
};
let object = if is_scalar {
RecursiveObject::AlphaScalarFunctionInventionRecursiveChunk
} else {
RecursiveObject::AlphaVectorFunctionInventionRecursiveChunk
};
let choice_indexer = Arc::new(ChoiceIndexer::new(0));
let viewer_client = self.viewer_client.clone();
let viewer_ctx = ctx.clone();
let inner = run_recursive(
self.invention_client.clone(),
ctx,
request,
resolved_state,
id.clone(),
created,
object,
choice_indexer,
0, );
let stream: Pin<Box<dyn Stream<Item = RecursiveChunk> + Send>> =
Box::pin(async_stream::stream! {
let mut accumulated_usage =
objectiveai_sdk::agent::completions::response::Usage::default();
let mut had_errors = false;
futures::pin_mut!(inner);
while let Some(mut chunk) = inner.next().await {
for inv in &chunk.inventions {
if let Some(u) = &inv.inner.usage {
accumulated_usage.push(u);
}
}
if chunk.inventions_errors == Some(true) {
had_errors = true;
}
if had_errors {
chunk.inventions_errors = Some(true);
}
yield chunk;
}
yield RecursiveChunk {
id,
inventions: vec![],
inventions_errors: if had_errors { Some(true) } else { None },
created,
object,
usage: Some(accumulated_usage),
};
});
let stream = stream.inspect(move |chunk| {
viewer_client.send_function_invention_recursive_continue(viewer_ctx.clone(), chunk.clone());
});
let mut stream = Box::pin(stream);
match stream.next().await {
Some(first) => {
if first.inventions_errors == Some(true) {
if let Some(err) = first.inventions.iter()
.find_map(|inv| inv.inner.error.clone())
{
return Err(super::Error::InventionFirstChunk(err));
}
}
Ok(crate::util::StreamOnce::new(first).chain(stream))
}
None => unreachable!(),
}
}
}
fn run_recursive<CTXEXT, OPENROUTER, CLAUDEAGENTSDK, CODEXSDK, MOCK, RETRG, RETRF, RETRM, CUSG, IUSG, FFNG, FFNF, FFNM>(
invention_client: Arc<
crate::functions::inventions::Client<
CTXEXT, OPENROUTER, CLAUDEAGENTSDK, CODEXSDK, MOCK, RETRG, RETRF, RETRM, CUSG, IUSG, FFNG, FFNF, FFNM,
>,
>,
ctx: ctx::Context<CTXEXT, impl crate::ctx::persistent_cache::PersistentCacheClient>,
request: Arc<objectiveai_sdk::functions::inventions::recursive::request::FunctionInventionRecursiveCreateParams>,
resolved_state: objectiveai_sdk::functions::inventions::ParamsState,
id: String,
created: u64,
object: RecursiveObject,
choice_indexer: Arc<ChoiceIndexer>,
native_index: usize,
) -> Pin<Box<dyn Stream<Item = RecursiveChunk> + Send>>
where
CTXEXT: ctx::ContextExt + Send + Sync + 'static,
OPENROUTER: crate::agent::completions::UpstreamClient<objectiveai_sdk::agent::openrouter::Agent, objectiveai_sdk::agent::openrouter::Continuation>
+ Send
+ Sync
+ 'static,
CLAUDEAGENTSDK: crate::agent::completions::UpstreamClient<
objectiveai_sdk::agent::claude_agent_sdk::Agent, objectiveai_sdk::agent::claude_agent_sdk::Continuation,
> + Send
+ Sync
+ 'static,
CODEXSDK: crate::agent::completions::UpstreamClient<
objectiveai_sdk::agent::codex_sdk::Agent, objectiveai_sdk::agent::codex_sdk::Continuation,
> + Send
+ Sync
+ 'static,
MOCK: crate::agent::completions::UpstreamClient<objectiveai_sdk::agent::mock::Agent, objectiveai_sdk::agent::mock::Continuation>
+ Send
+ Sync
+ 'static,
RETRG: crate::retrieval::retrieve::Client<CTXEXT>,
RETRF: crate::retrieval::retrieve::Client<CTXEXT>,
RETRM: crate::retrieval::retrieve::Client<CTXEXT>,
CUSG: crate::agent::completions::usage_handler::UsageHandler<CTXEXT> + Send + Sync + 'static,
IUSG: crate::functions::inventions::usage_handler::UsageHandler<CTXEXT> + Send + Sync + 'static,
FFNG: crate::retrieval::retrieve::Client<CTXEXT> + Send + Sync + 'static,
FFNF: crate::retrieval::retrieve::Client<CTXEXT> + Send + Sync + 'static,
FFNM: crate::retrieval::retrieve::Client<CTXEXT> + Send + Sync + 'static,
{
Box::pin(async_stream::stream! {
let resolved_state_for_error = resolved_state.clone();
let invention_request = Arc::new(
objectiveai_sdk::functions::inventions::request::FunctionInventionCreateParams {
remote: Some(request.remote),
overwrite: request.overwrite,
state: objectiveai_sdk::functions::inventions::ParamsStateOrRemoteCommitOptional::Inline(resolved_state),
provider: request.provider.clone(),
agent: request.agent.clone(),
prompt: request.prompt.clone(),
seed: request.seed,
stream: request.stream,
max_step_retries: request.max_step_retries,
continuation: request.continuation.clone(),
},
);
let stream = match invention_client
.clone()
.create_streaming_handle_usage(ctx.clone(), invention_request)
.await
{
Ok(stream) => stream,
Err(e) => {
yield RecursiveChunk {
id: id.clone(),
inventions: vec![RecursiveInventionChunk {
index: choice_indexer.get(native_index),
inner: FunctionInventionChunk {
id: id.clone(),
completions: vec![],
state: Some(resolved_state_for_error.route()),
path: None,
function: None,
created,
object: object.into(),
usage: None,
error: Some(objectiveai_sdk::error::ResponseError {
code: objectiveai_sdk::error::StatusError::status(&e),
message: objectiveai_sdk::error::StatusError::message(&e)
.unwrap_or_else(|| serde_json::json!(e.to_string())),
}),
},
}],
inventions_errors: Some(true),
created,
object,
usage: None,
};
return;
}
};
let mut final_state: Option<objectiveai_sdk::functions::inventions::State> = None;
let mut final_path: Option<objectiveai_sdk::RemotePath> = None;
let mut saved_function: Option<objectiveai_sdk::functions::FullRemoteFunction> = None;
let mut had_error = false;
futures::pin_mut!(stream);
while let Some(mut chunk) = stream.next().await {
if chunk.state.is_some() {
final_state = chunk.state.clone();
}
if chunk.path.is_some() {
final_path = chunk.path.clone();
}
if chunk.function.is_some() {
saved_function = chunk.function.clone();
}
if chunk.error.is_some() {
had_error = true;
}
if final_state.as_ref().is_some_and(|s| !s.placeholder_children().is_empty()) {
chunk.function = None;
}
yield RecursiveChunk {
id: id.clone(),
inventions: vec![RecursiveInventionChunk {
index: choice_indexer.get(native_index),
inner: chunk,
}],
inventions_errors: None,
created,
object,
usage: None,
};
}
drop(stream);
let mut state = match final_state {
Some(state) if !had_error => state,
_ => return,
};
let children = state.placeholder_children();
if children.is_empty() {
return;
}
let base_native = (native_index + 1) * 1000; let mut child_streams: Vec<Pin<Box<dyn Stream<Item = RecursiveChunk> + Send>>> = Vec::new();
for (i, child_state) in children.into_iter().enumerate() {
let child_native = base_native + i;
let child_seed = request
.seed
.map(|s| s ^ (child_native as i64));
let child_request = Arc::new(
objectiveai_sdk::functions::inventions::recursive::request::FunctionInventionRecursiveCreateParams {
remote: request.remote,
overwrite: request.overwrite,
state: objectiveai_sdk::functions::inventions::ParamsStateOrRemoteCommitOptional::Inline(child_state.clone()),
provider: request.provider.clone(),
agent: request.agent.clone(),
prompt: request.prompt.clone(),
seed: child_seed,
stream: request.stream,
max_step_retries: request.max_step_retries,
continuation: request.continuation.clone(),
},
);
child_streams.push(run_recursive(
invention_client.clone(),
ctx.clone(),
child_request,
child_state,
id.clone(),
created,
object,
choice_indexer.clone(),
child_native,
));
}
let mut child_paths: Vec<objectiveai_sdk::RemotePath> = Vec::new();
let mut merged = futures::stream::select_all(child_streams);
while let Some(chunk) = merged.next().await {
for invention in &chunk.inventions {
if let Some(path) = &invention.inner.path {
child_paths.push(path.clone());
}
}
yield chunk;
}
if child_paths.is_empty() || final_path.is_none() {
yield RecursiveChunk {
id: id.clone(),
inventions: vec![RecursiveInventionChunk {
index: choice_indexer.get(native_index),
inner: FunctionInventionChunk {
id: id.clone(),
completions: vec![],
state: Some(state),
path: final_path,
function: saved_function,
created,
object: object.into(),
usage: None,
error: None,
},
}],
inventions_errors: None,
created,
object,
usage: None,
};
return;
}
state.replace_placeholders(&child_paths);
state.write_readme();
let function = match state.build_function() {
Some(f) => f,
None => {
yield RecursiveChunk {
id: id.clone(),
inventions: vec![RecursiveInventionChunk {
index: choice_indexer.get(native_index),
inner: FunctionInventionChunk {
id: id.clone(),
completions: vec![],
state: Some(state),
path: final_path,
function: saved_function,
created,
object: object.into(),
usage: None,
error: None,
},
}],
inventions_errors: None,
created,
object,
usage: None,
};
return;
}
};
let repo = state.name();
let publish_files = state.serialize_into_files();
let description = crate::functions::inventions::extract_description(&state);
let (updated_path, publish_error) = match request.remote {
objectiveai_sdk::Remote::Filesystem => {
match crate::functions::inventions::publish_filesystem(
&invention_client.filesystem_client, &ctx, repo, &publish_files,
).await {
Ok(path) => (Some(path), None),
Err(e) => (None, Some(e)),
}
}
objectiveai_sdk::Remote::Github => {
match crate::functions::inventions::publish_github(
&invention_client.github_client,
&invention_client.filesystem_client,
&ctx, repo, &description,
&publish_files,
).await {
Ok(path) => (Some(path), None),
Err(e) => (None, Some(e)),
}
}
objectiveai_sdk::Remote::Mock => (None, None),
};
let (final_function, final_path, inventions_errors, error) = if let Some(publish_error) = publish_error {
(
saved_function,
final_path,
Some(true),
Some(objectiveai_sdk::error::ResponseError {
code: publish_error.status(),
message: publish_error.message().unwrap_or(serde_json::Value::Null),
}),
)
} else {
(Some(function), updated_path, None, None)
};
yield RecursiveChunk {
id: id.clone(),
inventions: vec![RecursiveInventionChunk {
index: choice_indexer.get(native_index),
inner: FunctionInventionChunk {
id: id.clone(),
completions: vec![],
state: Some(state),
path: final_path,
function: final_function,
created,
object: object.into(),
usage: None,
error,
},
}],
inventions_errors,
created,
object,
usage: None,
};
})
}