use crate::Result;
use crate::agent::{Agent, AgentOptions, PromptPart, parse_prompt_part_options};
use crate::hub::get_hub;
use crate::pricing::price_it;
use crate::run::AiResponse;
use crate::run::literals::Literals;
use crate::run::{DryMode, RunBaseOptions};
use crate::runtime::Runtime;
use crate::script::{AipackCustom, DataResponse, FromValue};
use crate::support::hbs::hbs_render;
use crate::support::text::{self, format_duration, format_num};
use genai::ModelIden;
use genai::chat::{CacheControl, ChatMessage, ChatRequest, ChatResponse, Usage};
use serde_json::Value;
use std::borrow::Cow;
use std::collections::HashMap;
use tokio::time::Instant;
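/// Outcome of running a single input through an agent: either the raw AI
/// response from genai, or the value returned by the agent's output script.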
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum RunAgentInputResponse {
AiReponse(AiResponse),
OutputResponse(Value),
}
impl RunAgentInputResponse {
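/// Returns the response text, if any.
/// For an output script value, this is `Some` only when the value is a string.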
pub fn as_str(&self) -> Option<&str> {
match self {
RunAgentInputResponse::AiReponse(ai_response) => ai_response.content.as_deref(),
RunAgentInputResponse::OutputResponse(value) => value.as_str(),
}
}
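/// Consumes the response and returns it as a JSON value:
/// the text content for an AI response, or the output value as-is.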
pub fn into_value(self) -> Value {
match self {
RunAgentInputResponse::AiReponse(ai_response) => ai_response.content.into(),
RunAgentInputResponse::OutputResponse(value) => value,
}
}
}
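/// Runs one input through the agent pipeline:
///
/// 1. Evaluates the agent's data script (if any), which may skip the input,
///    override it, or provide per-input agent options.
/// 2. Renders the prompt parts with Handlebars against `{input, data}`.
/// 3. Sends the rendered instruction to the genai client (unless a dry mode
///    short-circuits, or no instruction was rendered).
/// 4. Evaluates the agent's output script (if any) on the AI response;
///    otherwise the AI response is returned directly.
///
/// Returns `Ok(None)` when the input was skipped or a dry mode stopped the run.
///
/// A sketch of a call site (the construction of `runtime`, `agent`, `literals`,
/// and `run_base_options` is illustrative and lives elsewhere):
///
/// ```ignore
/// let res = run_agent_input(
///     &runtime,
///     &agent,
///     Value::Null,                        // before_all result
///     "input-0",                          // label used in hub messages
///     Value::String("hello".to_string()), // the input for this run
///     &literals,
///     &run_base_options,
/// )
/// .await?;
/// if let Some(res) = res {
///     println!("{}", res.as_str().unwrap_or_default());
/// }
/// ```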
pub async fn run_agent_input(
runtime: &Runtime,
agent: &Agent,
before_all_result: Value,
label: &str,
input: Value,
literals: &Literals,
run_base_options: &RunBaseOptions,
) -> Result<Option<RunAgentInputResponse>> {
let hub = get_hub();
let client = runtime.genai_client();
let lua_engine = runtime.new_lua_engine_with_ctx(literals)?;
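// Build the Lua scope exposed to the data script.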
let lua_scope = lua_engine.create_table()?;
lua_scope.set("input", lua_engine.serde_to_lua_value(input.clone())?)?;
lua_scope.set("before_all", lua_engine.serde_to_lua_value(before_all_result.clone())?)?;
lua_scope.set("options", agent.options_as_ref())?;
let agent_dir = agent.file_dir()?;
let agent_dir_str = agent_dir.as_str();
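// -- Data stage: evaluate the data script, which can return the data as-is,
// an aipack custom skip, or a full DataResponse with input/options overrides.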
let DataResponse { input, data, options } = if let Some(data_script) = agent.data_script().as_ref() {
let lua_value = lua_engine.eval(data_script, Some(lua_scope), Some(&[agent_dir_str]))?;
let data_res = serde_json::to_value(lua_value)?;
match AipackCustom::from_value(data_res)? {
FromValue::OriginalValue(data) => DataResponse {
data: Some(data),
input: Some(input),
..Default::default()
},
FromValue::AipackCustom(AipackCustom::Skip { reason }) => {
let reason_txt = reason.map(|r| format!(" (Reason: {r})")).unwrap_or_default();
hub.publish(format!("-! Aipack Skip input at Data stage: {label}{reason_txt}"))
.await;
return Ok(None);
}
FromValue::AipackCustom(AipackCustom::DataResponse(DataResponse {
input: input_ov,
data,
options,
})) => DataResponse {
input: input_ov.or(Some(input)),
data,
options,
},
FromValue::AipackCustom(other) => {
return Err(format!(
"-! Aipack Custom '{}' is not supported at the Data stage",
other.as_ref()
)
.into());
}
}
} else {
DataResponse {
input: Some(input),
data: None,
options: None,
}
};
let input = input.unwrap_or(Value::Null);
let data = data.unwrap_or(Value::Null);
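// If the data stage returned agent options, merge them over the agent's current options.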
let agent: Cow<Agent> = if let Some(options_to_merge) = options {
let options_to_merge: AgentOptions = serde_json::from_value(options_to_merge)?;
let options_ov = agent.options_as_ref().merge_new(options_to_merge)?;
Cow::Owned(agent.new_merge(options_ov)?)
} else {
Cow::Borrowed(agent)
};
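// -- Instruction stage: render each prompt part with Handlebars against {data, input}.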
let data_scope = HashMap::from([
("data".to_string(), data.clone()),
("input".to_string(), input.clone()),
]);
let mut chat_messages: Vec<ChatMessage> = Vec::new();
let data_scope = serde_json::to_value(data_scope)?;
for prompt_part in agent.prompt_parts() {
let PromptPart {
kind,
content,
options_str,
} = prompt_part;
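// If the part has an inline options line, prepend it so it goes through the
// Handlebars render as well; it is re-extracted as the first line afterward.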
let (options_line, content) = if let Some(options_str) = options_str {
(true, Cow::Owned(format!("{options_str}\n{content}")))
} else {
(false, Cow::Borrowed(content))
};
let rendered_content = hbs_render(content.as_str(), &data_scope)?;
let (options_str, rendered_content) = if options_line {
text::extract_first_line(rendered_content)
} else {
(String::new(), rendered_content)
};
let options_str = options_str.trim();
if !rendered_content.trim().is_empty() {
let options = if !options_str.is_empty() {
parse_prompt_part_options(options_str)?
} else {
None
};
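// For now, only the `cache` flag is honored; it maps to an ephemeral
// cache-control option on the message.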
let options = if options.as_ref().map(|v| v.cache).unwrap_or(false) {
Some(CacheControl::Ephemeral.into())
} else {
None
};
chat_messages.push(ChatMessage {
role: kind.into(),
content: rendered_content.into(),
options,
})
}
}
let is_inst_empty = chat_messages.is_empty();
if run_base_options.verbose() {
hub.publish("\n").await;
for msg in chat_messages.iter() {
hub.publish(format!(
"-- {}:\n{}",
msg.role,
msg.content.text_as_str().unwrap_or_default()
))
.await;
}
}
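// In dry mode `req`, stop after rendering the request.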
if matches!(run_base_options.dry_mode(), DryMode::Req) {
return Ok(None);
}
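// -- AI stage: execute the genai chat request (skipped when no instruction was rendered).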
let model_resolved = agent.model_resolved();
let ai_response: Option<AiResponse> = if !is_inst_empty {
let chat_req = ChatRequest::from_messages(chat_messages);
hub.publish(format!("-> Sending rendered instruction to {model_resolved} ..."))
.await;
let start = Instant::now();
let chat_res = client
.exec_chat(model_resolved, chat_req, Some(agent.genai_chat_options()))
.await?;
let duration = start.elapsed();
let duration_msg = format!("Duration: {}", format_duration(duration));
let duration_sec = (duration.as_secs_f64() * 1000.0).round() / 1000.0;
let mut info = duration_msg;
let price_usd = get_price(&chat_res);
if let Some(price_usd) = price_usd {
info = format!("{info} | ~${price_usd}")
}
let usage_msg = format_usage(&chat_res.usage);
info = format!("{info} | {usage_msg}");
hub.publish(format!(
"<- ai_response content received - {} | {info}",
chat_res.provider_model_iden.model_name
))
.await;
let ChatResponse {
content,
reasoning_content,
usage,
model_iden: res_model_iden,
provider_model_iden: res_provider_model_iden,
..
} = chat_res;
let ai_response_content = content.and_then(|c| c.text_into_string());
let ai_response_reasoning_content = reasoning_content;
let model_info = format_model(&agent, &res_model_iden, &res_provider_model_iden);
if run_base_options.verbose() {
hub.publish(format!(
"\n-- AI Output ({model_info})\n\n{}\n",
ai_response_content.as_deref().unwrap_or_default()
))
.await;
}
let info = format!("{info} | {model_info}");
Some(AiResponse {
content: ai_response_content,
reasoning_content: ai_response_reasoning_content,
model_name: res_model_iden.model_name,
adapter_kind: res_model_iden.adapter_kind,
duration_sec,
price_usd,
usage,
info,
})
} else {
hub.publish("-! No instruction, skipping genai.").await;
None
};
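// In dry mode `res`, stop after receiving the AI response.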
if matches!(run_base_options.dry_mode(), DryMode::Res) {
return Ok(None);
}
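// -- Output stage: evaluate the output script with the full scope,
// or fall back to returning the AI response directly.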
let res = if let Some(output_script) = agent.output_script() {
let lua_engine = runtime.new_lua_engine_with_ctx(literals)?;
let lua_scope = lua_engine.create_table()?;
lua_scope.set("input", lua_engine.serde_to_lua_value(input)?)?;
lua_scope.set("data", lua_engine.serde_to_lua_value(data)?)?;
lua_scope.set("before_all", lua_engine.serde_to_lua_value(before_all_result)?)?;
lua_scope.set("ai_response", ai_response)?;
lua_scope.set("options", agent.options_as_ref())?;
let lua_value = lua_engine.eval(output_script, Some(lua_scope), Some(&[agent_dir_str]))?;
let output_response = serde_json::to_value(lua_value)?;
Some(RunAgentInputResponse::OutputResponse(output_response))
} else {
ai_response.map(RunAgentInputResponse::AiReponse)
};
Ok(res)
}
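/// Computes the approximate price in USD for this chat response,
/// if pricing data is known for the provider/model pair.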
fn get_price(chat_res: &ChatResponse) -> Option<f64> {
let provider = chat_res.model_iden.adapter_kind.as_lower_str();
let model_name = &*chat_res.model_iden.model_name;
price_it(provider, model_name, &chat_res.usage)
}
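/// Formats the model for display, showing the provider model name in
/// parentheses when it differs from the requested model name.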
fn format_model(_agent: &Agent, res_model_iden: &ModelIden, res_provider_model_iden: &ModelIden) -> String {
let model_txt: Cow<str> = if *res_model_iden.model_name != *res_provider_model_iden.model_name {
Cow::Owned(format!(
"{} ({})",
res_model_iden.model_name, res_provider_model_iden.model_name
))
} else {
Cow::Borrowed(&*res_model_iden.model_name)
};
format!("Model: {model_txt} | Adapter: {}", res_model_iden.adapter_kind,)
}
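/// Formats the token usage as a single line, including cached and
/// cache-creation prompt token details and reasoning completion tokens when present.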
fn format_usage(usage: &Usage) -> String {
let mut buff = String::new();
buff.push_str("Prompt Tokens: ");
buff.push_str(&format_num(usage.prompt_tokens.unwrap_or_default() as i64));
if let Some(prompt_tokens_details) = usage.prompt_tokens_details.as_ref() {
buff.push_str(" (cached: ");
let cached = prompt_tokens_details.cached_tokens.unwrap_or(0);
buff.push_str(&format_num(cached as i64));
if let Some(cache_creation_tokens) = prompt_tokens_details.cache_creation_tokens {
buff.push_str(", cache_creation: ");
buff.push_str(&format_num(cache_creation_tokens as i64));
}
buff.push(')');
}
buff.push_str(" | Completion Tokens: ");
buff.push_str(&format_num(usage.completion_tokens.unwrap_or_default() as i64));
if let Some(reasoning) = usage.completion_tokens_details.as_ref().and_then(|v| v.reasoning_tokens) {
buff.push_str(" (reasoning: ");
buff.push_str(&format_num(reasoning as i64));
buff.push(')');
}
buff
}