//! streamweave-attractor 0.3.0
//!
//! Attractor pipeline as a StreamWeave graph — documentation:
//! Agent invocation: run the agent command (agent_cmd option) with prompt as stdin; outcome from JSON on stdout or exit code.
//! Shared by runner and CodergenNode.
//! Supports injecting RunContext into prompts so codergen nodes can pass data to each other.

use crate::types::{NodeOutcome, RunContext};
use std::collections::HashMap;
use std::io::Write;
use std::process::Command;
use tracing::instrument;

/// Parses agent stdout as JSON: { "outcome": "success"|"fail", "context_updates": { ... } }.
/// Returns Err on parse failure or missing/invalid outcome.
pub(crate) fn parse_agent_json_response(stdout: &str) -> Result<NodeOutcome, String> {
  let stdout = stdout.trim();
  if stdout.is_empty() {
    return Err("empty response".to_string());
  }
  let v: serde_json::Value =
    serde_json::from_str(stdout).map_err(|e| format!("invalid JSON: {}", e))?;
  let outcome_str = v
    .get("outcome")
    .and_then(|o| o.as_str())
    .ok_or("missing outcome field")?;
  let is_success = match outcome_str.trim().to_lowercase().as_str() {
    "success" | "partial_success" => true,
    "fail" | "error" => false,
    _ => return Err(format!("invalid outcome value: {}", outcome_str)),
  };
  let empty: serde_json::Map<String, serde_json::Value> = serde_json::Map::new();
  let obj = v
    .get("context_updates")
    .and_then(|o| o.as_object())
    .unwrap_or(&empty);
  let mut context_updates = HashMap::new();
  for (k, v) in obj {
    if let Some(s) = v.as_str() {
      context_updates.insert(k.clone(), s.to_string());
    }
  }
  let mut outcome = if is_success {
    NodeOutcome::success("agent completed")
  } else {
    NodeOutcome::error("agent reported outcome=fail")
  };
  outcome.context_updates = context_updates;
  Ok(outcome)
}

/// Wraps the graph prompt with JSON guiderails: agent must respond with only a JSON object.
/// Schema: { "outcome": "success"|"fail", "context_updates": { ... } }. No other text.
/// Wraps the graph prompt with JSON guiderails: the agent must answer with a
/// single JSON object — { "outcome": "success"|"fail", "context_updates": { ... } }
/// — and emit nothing else.
pub(crate) fn wrap_prompt_for_json_response(prompt: &str) -> String {
  const PREFIX: &str = "You must respond with only a single JSON object. Use keys: \"outcome\" (value \"success\" or \"fail\") and \"context_updates\" (object of string key-value pairs). No other output.\n\n---\n\n";
  // Build via push_str with a pre-sized buffer instead of format!.
  let mut wrapped = String::with_capacity(PREFIX.len() + prompt.len());
  wrapped.push_str(PREFIX);
  wrapped.push_str(prompt);
  wrapped
}

/// Formats RunContext for inclusion in an agent prompt. Returns empty string if context is empty.
/// Keys are sorted for stable ordering. Values are included in full (no truncation).
/// Formats RunContext for inclusion in an agent prompt. Returns an empty string
/// when the context has no entries. Keys are emitted in sorted order so output
/// is stable across runs; values are included verbatim (no truncation).
pub(crate) fn format_context_for_prompt(context: &RunContext) -> String {
  if context.is_empty() {
    return String::new();
  }
  // Deterministic ordering: sort the key list before rendering.
  let mut sorted_keys: Vec<&String> = context.keys().collect();
  sorted_keys.sort();
  let rendered: Vec<String> = sorted_keys
    .iter()
    .map(|key| {
      // keys come from the map itself, so the lookup always succeeds;
      // the unwrap_or("") is purely defensive.
      let value = context.get(*key).map(String::as_str).unwrap_or("");
      format!("{}:\n{}", key, value)
    })
    .collect();
  format!(
    "Context from previous steps:\n\n{}\n",
    rendered.join("\n---\n\n")
  )
}

/// Builds the prompt to send to the agent: when context is non-empty, prepends a "Context from previous steps" block
/// so downstream codergen nodes receive data from upstream nodes. Works for arbitrary graphs.
/// Builds the prompt sent to the agent: when the run context carries data, a
/// "Context from previous steps" block is prepended so downstream codergen nodes
/// receive output from upstream nodes; otherwise the static prompt passes through
/// unchanged. Works for arbitrary graphs.
pub(crate) fn build_prompt_with_context(static_prompt: &str, context: &RunContext) -> String {
  let context_block = format_context_for_prompt(context);
  match context_block.is_empty() {
    true => static_prompt.to_string(),
    false => format!("{}{}", context_block, static_prompt),
  }
}

/// Runs the agent command with prompt as stdin; returns NodeOutcome from JSON on stdout or from exit code.
/// Used by the compiled workflow and by CodergenNode.
/// Runs the agent command with the prompt piped to stdin; returns a NodeOutcome
/// parsed from JSON on stdout, falling back to the process exit status when
/// stdout is empty or unparseable. Used by the compiled workflow and by
/// CodergenNode.
///
/// `agent_cmd` is split on whitespace — quoted arguments are not supported.
#[instrument(level = "trace", skip(agent_cmd, prompt, _stage_dir))]
pub(crate) fn run_agent(
  agent_cmd: &str,
  prompt: &str,
  _stage_dir: Option<&std::path::Path>,
) -> NodeOutcome {
  let parts: Vec<&str> = agent_cmd.split_whitespace().collect();
  let (bin, args) = match parts.split_first() {
    Some((b, a)) => (b, a),
    None => return NodeOutcome::error("agent_cmd is empty"),
  };

  match Command::new(bin)
    .args(args)
    .stdin(std::process::Stdio::piped())
    .stdout(std::process::Stdio::piped())
    .stderr(std::process::Stdio::inherit())
    .spawn()
  {
    Ok(mut child) => {
      // Feed stdin from a separate thread while this thread drains stdout.
      // Writing the whole prompt before reading any output can deadlock when
      // both pipe buffers fill: the child blocks writing stdout nobody reads,
      // while we block writing a large prompt the child isn't consuming.
      let writer = child.stdin.take().map(|mut stdin| {
        let input = format!("{}\n", prompt);
        std::thread::spawn(move || {
          let _ = stdin.write_all(input.as_bytes());
          // stdin drops here, closing the pipe so the child sees EOF.
        })
      });
      let mut stdout = String::new();
      if let Some(mut out) = child.stdout.take() {
        let _ = std::io::Read::read_to_string(&mut out, &mut stdout);
      }
      // stdout is fully drained, so the writer thread cannot be blocked; join
      // to avoid leaking it past this call.
      if let Some(handle) = writer {
        let _ = handle.join();
      }
      match child.wait() {
        Ok(status) => {
          let stdout_trim = stdout.trim();
          if !stdout_trim.is_empty() {
            tracing::info!(response = %stdout_trim, "codergen LLM response");
            // Prefer the structured JSON outcome over the raw exit status.
            if let Ok(outcome) = parse_agent_json_response(stdout_trim) {
              return outcome;
            }
          }
          if status.success() {
            NodeOutcome::success("agent completed")
          } else {
            let msg = status
              .code()
              .map(|c| format!("agent exit {}", c))
              .unwrap_or_else(|| "agent signal".to_string());
            NodeOutcome::error(msg)
          }
        }
        Err(e) => NodeOutcome::error(format!("agent wait: {}", e)),
      }
    }
    Err(e) => NodeOutcome::error(format!("agent spawn: {}", e)),
  }
}