use futures::StreamExt;
use objectiveai_cli_sdk::output::{LabResultItem, Laboratory};
use super::create_args::CreateArgs;
pub async fn handle(
args: CreateArgs,
cli_config: &crate::Config,
handle: &objectiveai_cli_sdk::output::Handle,
) -> Result<(), crate::error::Error> {
args.instructions.verify(cli_config, crate::instructions::InstructionsScope::LaboratoryExecutions)?;
let mut builder_agents = Vec::with_capacity(args.builder_agent.len());
for a in &args.builder_agent {
builder_agents.push(a.clone().resolve(|| async {
let (_, mut c) = crate::config::read(cli_config).await.unwrap();
c.agents().get_favorites().to_vec()
}).await?);
}
let original_agents = builder_agents.clone();
let evaluation_agent = match args.evaluation_agent {
Some(a) => Some(a.resolve(|| async {
let (_, mut c) = crate::config::read(cli_config).await.unwrap();
c.agents().get_favorites().to_vec()
}).await?),
None => None,
};
let builder_messages = args.builder_messages.resolve()?;
let evaluation_messages = args.evaluation_messages.resolve()?;
let evaluation_output_schema = args.evaluation_output_schema.resolve()?;
let builder_continuation = args.builder_continuation.resolve()?;
let evaluation_continuation = args.evaluation_continuation.resolve()?;
let python_code = if let Some(inline) = args.output_python.output_python_inline {
Some(inline)
} else if let Some(path) = args.output_python.output_python_file {
Some(
std::fs::read_to_string(&path)
.map_err(|e| crate::error::Error::PythonFileRead(path, e))?,
)
} else {
None
};
let num_agents = original_agents.len();
let params = objectiveai_sdk::laboratories::executions::request::LaboratoryExecutionCreateParams {
docker_image: args.docker_image,
builder_agents,
evaluation_agent,
builder_messages,
evaluation_messages,
evaluation_output_schema,
builder_continuation,
evaluation_continuation,
max_evaluation_retries: args.max_evaluation_retries,
persist: None,
provider: None,
seed: args.seed,
stream: Some(true),
};
let fs_client = objectiveai_sdk::filesystem::Client::new(cli_config.config_base_dir.as_deref(), None::<String>, None::<String>);
let log_writer = fs_client.write_laboratory_execution();
let handle = handle.clone();
crate::api::run(
Box::new(move |http_client| Box::pin(async move {
let stream =
objectiveai_sdk::laboratories::executions::create_laboratory_execution_streaming(
&http_client, params,
)
.await?;
let emit_handle = handle.clone();
let stream = stream.then(move |result| {
let handle = emit_handle.clone();
async move {
if let Ok(chunk) = &result {
for inner in chunk.inner_errors() {
objectiveai_cli_sdk::output::Output::<serde_json::Value>::Error(
objectiveai_cli_sdk::output::Error {
level: objectiveai_cli_sdk::output::Level::Warn,
fatal: false,
message: serde_json::to_value(&inner).unwrap(),
},
)
.emit(&handle)
.await;
}
}
result.map_err(crate::error::Error::from)
}
});
let mut accumulated = crate::log_stream::consume_with_coalesced_writes(
stream,
log_writer,
|agg: &mut objectiveai_sdk::laboratories::executions::response::streaming::LaboratoryExecutionChunk, c| agg.push(c),
handle.clone(),
).await?;
if let Some(error) = accumulated.error.take() {
return Err(crate::error::Error::ResponseError(error));
}
let execution: objectiveai_sdk::laboratories::executions::response::unary::LaboratoryExecution =
accumulated.into();
let mut eval_map: std::collections::HashMap<u64, Option<&objectiveai_sdk::functions::expression::InputValue>> =
std::collections::HashMap::new();
for eval in &execution.evaluations {
eval_map.insert(eval.agent_index, eval.output.as_ref());
}
let mut outputs_with_indices: Vec<(u64, &objectiveai_sdk::functions::expression::InputValue)> = Vec::new();
for agent_index in 0..num_agents as u64 {
if let Some(Some(output)) = eval_map.get(&agent_index) {
outputs_with_indices.push((agent_index, output));
}
}
let mut score_map: std::collections::HashMap<u64, f64> = std::collections::HashMap::new();
if !outputs_with_indices.is_empty() {
if let Some(ref script) = python_code {
let evaluations_json = serde_json::to_string(
&outputs_with_indices
.iter()
.map(|(_, output)| serde_json::to_value(output).unwrap())
.collect::<Vec<_>>(),
)
.unwrap();
let scores: Vec<f64> = crate::python::exec_code_with_args(
script,
&[evaluations_json],
)?;
if scores.len() < outputs_with_indices.len() {
return Err(crate::error::Error::MissingArgs(
"python script returned fewer scores than evaluation outputs",
));
}
for (i, (agent_index, _)) in outputs_with_indices.iter().enumerate() {
score_map.insert(*agent_index, scores[i]);
}
}
}
let results: Vec<LabResultItem> = (0..num_agents)
.map(|i| {
let agent_index = i as u64;
let agent = original_agents[i].clone();
let score = score_map.get(&agent_index).copied();
LabResultItem { agent, score }
})
.collect();
objectiveai_cli_sdk::output::Output::<Laboratory>::Notification(objectiveai_cli_sdk::output::Notification { value:
Laboratory { laboratory: results },
})
.emit(&handle).await;
Ok(())
})),
true,
)
.await
}