use std::collections::{BTreeSet, HashMap, VecDeque};
use std::io::{IsTerminal, Write};
use spawningpool::ai::{Client, StopReason};
use spawningpool::workflow::AskOutcome;
use spawningpool::{Registry, RunEvent};
use crate::cli::OutputFormat;
/// Runs the specialist called `name` against `prompt` and renders the run.
///
/// The registry is loaded from the store, the specialist's tools are resolved,
/// and its completion options are adjusted from its provider entry (API key
/// read from the provider's configured environment variable when that variable
/// is set, plus the provider's `constrained_decoding` setting).
///
/// Rendering depends on `output`:
/// * `None` or `Some(OutputFormat::Json)` — events are accumulated during the
///   run and a single JSON object is printed to stdout once it finishes.
/// * `Some(OutputFormat::Plaintext)` — text deltas are streamed to stdout as
///   they arrive; usage lines and tool failures go to stderr.
///
/// # Errors
///
/// Returns an error string if the store cannot be loaded, `name` is not a
/// registered specialist, a tool cannot be resolved, the log sink cannot be
/// opened, or the run itself fails.
pub(crate) async fn run_specialist(
    name: &str,
    prompt: &str,
    output: Option<OutputFormat>,
) -> Result<(), String> {
    let registry = spawningpool::store::load()?;
    let specialist = registry
        .specialists
        .get(name)
        .ok_or_else(|| format!("unknown specialist: {name}"))?;
    let tools = spawningpool::tools::resolve_all(
        &spawningpool::store::tools_dir(),
        specialist.tool_names(),
    )?;

    let mut opts = specialist.complete_options();
    if let Some(provider) = registry.providers.get(&specialist.provider) {
        // A missing or unreadable environment variable leaves `api_key` as the
        // specialist's own options set it.
        if let Some(env) = provider.api_key_env.as_ref() {
            if let Ok(key) = std::env::var(env) {
                opts.api_key = Some(key);
            }
        }
        opts.constrained_decoding = provider.constrained_decoding;
    }

    let client = Client::new();
    let log = crate::log::open_sink(name)?;
    let spec_log = spawningpool::SpecialistLog {
        sink: &log,
        wf: None,
        stmt: None,
    };

    match output {
        None | Some(OutputFormat::Json) => {
            // Accumulators filled by the event callback and emitted as one JSON
            // document after the run. `text` is deliberately not called `output`
            // so it does not shadow the function parameter.
            let mut text = String::new();
            let mut thinking = String::new();
            let mut input_tokens: u32 = 0;
            let mut output_tokens: u32 = 0;
            let mut stop_reason: Option<StopReason> = None;
            let mut turns: u32 = 0;
            let mut tool_calls: Vec<serde_json::Value> = Vec::new();

            let mut render = |event: RunEvent<'_>| match event {
                RunEvent::TextDelta(delta) => text.push_str(delta),
                RunEvent::Text(t) => text.push_str(t),
                RunEvent::ThinkingDelta(delta) => thinking.push_str(delta),
                RunEvent::Thinking(t) => thinking.push_str(t),
                RunEvent::TurnDone { stop_reason: sr } => {
                    // Only the most recent turn's stop reason is reported.
                    stop_reason = Some(sr);
                    turns += 1;
                }
                RunEvent::Usage(usage) => {
                    input_tokens += usage.input;
                    output_tokens += usage.output;
                }
                RunEvent::ToolRan {
                    name,
                    output: out,
                    success,
                } => tool_calls.push(serde_json::json!({
                    "name": name,
                    "success": success,
                    "output": out,
                })),
                // Failed tools are recorded in the same shape as completed ones,
                // with the failure message in place of the output.
                RunEvent::ToolFailed { name, message } => tool_calls.push(serde_json::json!({
                    "name": name,
                    "success": false,
                    "output": message,
                })),
            };

            spawningpool::run::run_specialist(
                &client,
                &registry,
                specialist,
                prompt,
                &tools,
                &opts,
                &mut render,
                Some(&spec_log),
            )
            .await?;

            println!(
                "{}",
                serde_json::json!({
                    "output": text,
                    "thinking": thinking,
                    "inputTokens": input_tokens,
                    "outputTokens": output_tokens,
                    "stopReason": stop_reason,
                    "model": specialist.model,
                    "specialist": name,
                    "turns": turns,
                    "toolCalls": tool_calls,
                })
            );
            Ok(())
        }
        Some(OutputFormat::Plaintext) => {
            // Tracks whether streamed text has left the cursor mid-line, so the
            // usage line can be preceded by a newline when needed.
            let mut printed_text = false;

            let mut render = |event: RunEvent<'_>| match event {
                RunEvent::TextDelta(delta) => {
                    print!("{delta}");
                    // Flush so partial lines show up immediately; a failed flush
                    // is ignored on purpose.
                    std::io::stdout().flush().ok();
                    printed_text = true;
                }
                RunEvent::Text(text) => println!("{text}"),
                RunEvent::ThinkingDelta(_) | RunEvent::Thinking(_) | RunEvent::TurnDone { .. } => {}
                RunEvent::Usage(usage) => {
                    // `take` reads the flag and resets it in one step.
                    if std::mem::take(&mut printed_text) {
                        println!();
                    }
                    eprintln!("[usage] {} in / {} out", usage.input, usage.output);
                }
                RunEvent::ToolRan { name, output, .. } => println!("[tool {name}]\n{output}"),
                RunEvent::ToolFailed { name, message } => eprintln!("[tool {name}] {message}"),
            };

            spawningpool::run::run_specialist(
                &client,
                &registry,
                specialist,
                prompt,
                &tools,
                &opts,
                &mut render,
                Some(&spec_log),
            )
            .await
        }
    }
}
/// Loads, checks and evaluates the workflow called `name`, then prints its result.
///
/// `args` are `KEY=VALUE` strings supplying the workflow's inputs. The root
/// workflow is loaded together with every workflow it references, the tools
/// named across that set are resolved, and the root is type-checked before
/// evaluation. A warning is printed for each provider whose API key is unset.
///
/// The result is printed to stdout. If the `SP_OUTPUT_PATH` environment
/// variable is set, the result is also written to that path first.
///
/// # Errors
///
/// Returns an error string if loading, argument parsing, input resolution,
/// tool resolution, type-checking, opening the log sink, evaluation, or writing
/// to `SP_OUTPUT_PATH` fails.
pub(crate) async fn run_workflow(name: &str, args: &[String]) -> Result<(), String> {
    let registry = spawningpool::store::load()?;
    let closure = load_workflow_closure(name, &registry)?;
    let workflows = &closure.workflows;
    let root = workflows
        .get(name)
        .expect("the closure always contains the root workflow");

    let provided = parse_kv_args(args)?;
    let inputs = spawningpool::workflow::resolve_inputs(&root.inputs, &provided)
        .map_err(|e| format!("workflow '{name}': {e}"))?;

    let tools_dir = spawningpool::store::tools_dir();
    let tool_names: Vec<String> = closure.tools.iter().cloned().collect();
    let tools = spawningpool::tools::resolve_all(&tools_dir, &tool_names)?;

    spawningpool::workflow::check(root, &registry, &tools, workflows)
        .map_err(|e| format!("workflow '{name}' failed type-checking: {e}"))?;

    let keys = provider_keys(&registry);
    // Warn up front, before any evaluation starts, about providers with no key.
    warn_unset_keys(&closure.specialists, &registry, &keys);

    let client = Client::new();
    let ask = ask_handler();
    let log = crate::log::open_sink(name)?;
    let result = spawningpool::workflow::eval(
        name, root, &registry, &tools, &client, &keys, &inputs, workflows, &ask, &log,
    )
    .await
    .map_err(|e| format!("workflow '{name}' failed: {e}"))?;

    if let Some(path) = std::env::var_os("SP_OUTPUT_PATH") {
        std::fs::write(&path, result.to_string())
            .map_err(|e| format!("workflow '{name}': can't write $SP_OUTPUT_PATH: {e}"))?;
    }
    println!("{result}");
    Ok(())
}
/// Builds the callback used to put a question to the user.
///
/// Whether prompting is possible is decided once, when the handler is built:
/// if `SP_OUTPUT_PATH` is set or stdin is not a terminal, every call returns
/// `AskOutcome::Unavailable`. Otherwise each call writes the prompt to stderr,
/// reads one line from stdin and returns it, without its line ending, as
/// `AskOutcome::Answered`. End of input and read errors also yield
/// `AskOutcome::Unavailable`.
fn ask_handler() -> impl Fn(&str) -> AskOutcome {
    let interactive =
        std::env::var_os("SP_OUTPUT_PATH").is_none() && std::io::stdin().is_terminal();

    move |prompt: &str| {
        if !interactive {
            return AskOutcome::Unavailable;
        }

        eprint!("{prompt} ");
        let _ = std::io::stderr().flush();

        let mut reply = String::new();
        let Ok(bytes_read) = std::io::stdin().read_line(&mut reply) else {
            return AskOutcome::Unavailable;
        };
        if bytes_read == 0 {
            // Zero bytes means stdin reached end of input.
            return AskOutcome::Unavailable;
        }

        // Drop one trailing "\n" and then one trailing "\r", each only if present.
        if reply.ends_with('\n') {
            reply.pop();
        }
        if reply.ends_with('\r') {
            reply.pop();
        }
        AskOutcome::Answered(reply)
    }
}
/// A root workflow plus everything gathered by following its workflow
/// references, as assembled by `load_workflow_closure`.
struct WorkflowClosure {
/// Parsed workflows, keyed by workflow name.
workflows: HashMap<String, spawningpool::workflow::Workflow>,
/// Names of the tools referenced by the collected workflows.
tools: BTreeSet<String>,
/// Names of the specialists referenced by the collected workflows.
specialists: BTreeSet<String>,
}
/// Loads the workflow `name` and, transitively, every workflow it references.
///
/// Workflows are visited breadth-first starting at `name`; each one is read
/// from the workflows directory, parsed, and scanned for the tools,
/// specialists and nested workflows it refers to. A workflow already loaded is
/// never loaded again, so reference cycles terminate.
///
/// # Errors
///
/// Returns an error string if a workflow's source cannot be obtained or does
/// not parse.
fn load_workflow_closure(name: &str, registry: &Registry) -> Result<WorkflowClosure, String> {
    let workflows_dir = spawningpool::store::workflows_dir();

    let mut workflows = HashMap::new();
    let mut tools = BTreeSet::new();
    let mut specialists = BTreeSet::new();

    let mut pending = VecDeque::new();
    pending.push_back(name.to_string());

    while let Some(current) = pending.pop_front() {
        // The same name can be queued more than once before it is loaded.
        if workflows.contains_key(&current) {
            continue;
        }

        let text = spawningpool::workflow::source(&workflows_dir, &current)?;
        let parsed = match spawningpool::workflow::parse(&text) {
            Ok(workflow) => workflow,
            Err(e) => return Err(format!("workflow '{current}' is invalid: {e}")),
        };

        let refs = spawningpool::workflow::referenced(&parsed, registry);
        tools.extend(refs.tools);
        specialists.extend(refs.specialists);
        pending.extend(
            refs.workflows
                .into_iter()
                .filter(|nested| !workflows.contains_key(nested)),
        );

        workflows.insert(current, parsed);
    }

    Ok(WorkflowClosure {
        workflows,
        tools,
        specialists,
    })
}
/// Parses `KEY=VALUE` strings into a map.
///
/// Each argument is split at its first `=`, so values may themselves contain
/// `=`. When a key is given more than once, the last value wins.
///
/// # Errors
///
/// Returns an error naming the first argument that contains no `=`.
fn parse_kv_args(args: &[String]) -> Result<HashMap<String, String>, String> {
    args.iter()
        .map(|arg| match arg.split_once('=') {
            Some((key, value)) => Ok((key.to_owned(), value.to_owned())),
            None => Err(format!("invalid --arg '{arg}': expected KEY=VALUE")),
        })
        .collect()
}
/// Collects the API keys available in the environment, keyed by provider name.
///
/// A provider contributes an entry only when it names an environment variable
/// for its key and that variable can be read; all other providers are omitted.
fn provider_keys(registry: &Registry) -> HashMap<String, String> {
    registry
        .providers
        .values()
        .filter_map(|provider| {
            let var_name = provider.api_key_env.as_ref()?;
            let secret = std::env::var(var_name).ok()?;
            Some((provider.name.clone(), secret))
        })
        .collect()
}
/// Prints a warning to stderr for each provider that needs an API key but has none.
///
/// Only providers used by the given `specialists` are considered. A provider
/// is reported when it names a key environment variable yet has no entry in
/// `keys`. Each provider is reported at most once, however many specialists
/// use it. Specialist or provider names missing from the registry are skipped.
fn warn_unset_keys(
    specialists: &std::collections::BTreeSet<String>,
    registry: &Registry,
    keys: &HashMap<String, String>,
) {
    let mut already_warned = std::collections::BTreeSet::new();

    let providers_in_use = specialists
        .iter()
        .filter_map(|spec_name| registry.specialists.get(spec_name))
        .filter_map(|spec| registry.providers.get(&spec.provider));

    for provider in providers_in_use {
        if provider.api_key_env.is_none() || keys.contains_key(&provider.name) {
            continue;
        }
        // `insert` returns false when the name was already recorded.
        if !already_warned.insert(provider.name.clone()) {
            continue;
        }
        eprintln!(
            "warning: API key for provider '{}' is unset; specialists using it will fail",
            provider.name
        );
    }
}
pub(crate) fn run_tool(name: &str, args: &[String]) -> Result<(), String> {
let tool = spawningpool::tools::resolve(&spawningpool::store::tools_dir(), name)?;
let mut vars = HashMap::new();
for arg in args {
let (key, value) = arg
.split_once('=')
.ok_or_else(|| format!("invalid --arg '{arg}': expected KEY=VALUE"))?;
vars.insert(key.to_string(), value.to_string());
}
for key in vars.keys() {
if !tool.params.iter().any(|p| &p.name == key) {
return Err(format!("tool '{name}' has no parameter '{key}'"));
}
}
for param in &tool.params {
if !vars.contains_key(¶m.name) {
return Err(format!(
"tool '{name}' is missing required parameter '{}'",
param.name
));
}
}
let run = spawningpool::run_script(&tool.script, &vars)
.map_err(|e| format!("failed to run tool '{name}': {e}"))?;
let output = run.structured_output.ok_or_else(|| {
format!(
"tool '{name}' didn't write to $SP_OUTPUT_PATH; it has no structured output to show"
)
})?;
println!("{output}");
Ok(())
}