use crate::core::context::WorkflowEvent;
use crate::core::parser::GraphParser;
use crate::core::resolver;
use crate::core::validator::WorkflowValidator;
use crate::services::config::JuglansConfig;
use crate::services::local_runtime::LocalRuntime;
use crate::services::prompt_loader::PromptRegistry;
use crate::WorkflowContext;
use crate::WorkflowExecutor;
use anyhow::{anyhow, Context, Result};
use serde_json::Value;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::mpsc;
/// Runs the workflow file at `path` with the optional `input` and returns its output.
///
/// Convenience wrapper: builds a [`RunBuilder`] from the file with default
/// settings and runs it once.
///
/// # Errors
/// Returns any error produced while building the run (`RunBuilder::from_file`)
/// or while executing it (`RunBuilder::run`).
pub async fn run_file(path: impl AsRef<Path>, input: Option<Value>) -> Result<Value> {
    let builder = RunBuilder::from_file(path)?;
    builder.run(input).await
}
/// Builder that configures and runs a single workflow file.
///
/// Created with `RunBuilder::from_file`; optionally customised with
/// `runtime` / `config`; consumed by `run`, `run_context` or `run_stream`.
pub struct RunBuilder {
/// Canonicalized path of the workflow file to run.
file_path: PathBuf,
/// Directory returned by `find_project_root` for `file_path`; used as the
/// working directory while loading the config and while running.
project_root: PathBuf,
/// Configuration loaded by `from_file`, unless replaced through `config`.
config: JuglansConfig,
/// Caller-supplied runtime. When `None`, a runtime is built from
/// `config.ai` at run time.
runtime: Option<Arc<LocalRuntime>>,
}
impl RunBuilder {
/// Creates a builder for the workflow file at `path`.
///
/// The path is canonicalized, the project root is located with
/// `find_project_root`, and the configuration is loaded while the working
/// directory is temporarily switched to that root.
///
/// # Errors
/// Returns an error if the path cannot be canonicalized, if the working
/// directory cannot be switched, or if the configuration fails to load.
pub fn from_file(path: impl AsRef<Path>) -> Result<Self> {
    let requested = path.as_ref();
    let file_path = std::fs::canonicalize(requested)
        .with_context(|| format!("Cannot resolve {:?}", requested))?;
    let project_root = find_project_root(&file_path);

    // Load the config with the project root as the working directory. The
    // guard is dropped (restoring the previous directory) before the load
    // result is inspected, so the directory is restored on failure as well.
    let cwd_guard = SetCwd::new(&project_root)?;
    let loaded = JuglansConfig::load();
    drop(cwd_guard);
    let config = loaded?;

    Ok(Self {
        file_path,
        project_root,
        config,
        runtime: None,
    })
}
pub fn runtime(mut self, rt: Arc<LocalRuntime>) -> Self {
self.runtime = Some(rt);
self
}
pub fn config(mut self, config: JuglansConfig) -> Self {
self.config = config;
self
}
/// Runs the workflow and returns the value stored at the context path
/// `"output"`, or `Value::Null` when that path resolves to nothing.
///
/// # Errors
/// Returns any error from `run_context` or from resolving the output path.
pub async fn run(self, input: Option<Value>) -> Result<Value> {
    let finished = self.run_context(input).await?;
    let output = finished.resolve_path("output")?;
    Ok(output.unwrap_or(Value::Null))
}
/// Runs the workflow without an event channel and returns the final
/// [`WorkflowContext`] rather than just its output value.
///
/// # Errors
/// Returns any error from `run_context_inner`.
pub async fn run_context(self, input: Option<Value>) -> Result<WorkflowContext> {
    // No sender: the context is created without event streaming.
    let no_events = None;
    self.run_context_inner(input, no_events).await
}
/// Starts the workflow on a spawned Tokio task and streams its events.
///
/// Returns the receiving half of an unbounded channel of [`WorkflowEvent`]s
/// together with the task's join handle. The handle resolves to the value at
/// the context path `"output"`, or `Value::Null` when that path resolves to
/// nothing.
///
/// Uses `tokio::spawn`, so it must be called from within a Tokio runtime.
pub fn run_stream(
    self,
    input: Option<Value>,
) -> Result<(
    mpsc::UnboundedReceiver<WorkflowEvent>,
    tokio::task::JoinHandle<Result<Value>>,
)> {
    let (events_tx, events_rx) = mpsc::unbounded_channel::<WorkflowEvent>();
    let task = tokio::spawn(async move {
        let finished = self.run_context_inner(input, Some(events_tx)).await?;
        let output = finished.resolve_path("output")?;
        Ok(output.unwrap_or(Value::Null))
    });
    Ok((events_rx, task))
}
async fn run_context_inner(
self,
input: Option<Value>,
sender: Option<mpsc::UnboundedSender<WorkflowEvent>>,
) -> Result<WorkflowContext> {
let _guard = SetCwd::new(&self.project_root)?;
let file_parent = self.file_path.parent().unwrap_or(Path::new("."));
let base_dir = pathdiff::diff_paths(file_parent, &self.project_root)
.unwrap_or_else(|| PathBuf::from("."));
let at_base: Option<PathBuf> = self
.config
.paths
.base
.as_ref()
.map(|b| self.project_root.join(b));
let source = std::fs::read_to_string(&self.file_path)?;
let mut workflow = GraphParser::parse(&source)?;
let mut import_stack = vec![self.file_path.clone()];
resolver::resolve_lib_imports(
&mut workflow,
&base_dir,
&mut import_stack,
at_base.as_deref(),
)?;
import_stack = vec![self.file_path.clone()];
resolver::resolve_flow_imports(
&mut workflow,
&base_dir,
&mut import_stack,
at_base.as_deref(),
)?;
let validation = WorkflowValidator::validate(&workflow);
if !validation.is_valid {
return Err(anyhow!(
"Validation failed:\n{}",
validation.format_report(&self.file_path.display().to_string())
));
}
let mut prompt_registry = PromptRegistry::new();
let resolve_patterns = |patterns: &[String]| -> Vec<String> {
let expanded = resolver::expand_at_prefixes(patterns, at_base.as_deref());
expanded
.into_iter()
.map(|p| {
if Path::new(&p).is_absolute() {
p
} else {
base_dir.join(&p).to_string_lossy().to_string()
}
})
.collect()
};
let p_patterns = resolve_patterns(&workflow.prompt_patterns);
workflow.tool_patterns = resolve_patterns(&workflow.tool_patterns);
if !p_patterns.is_empty() {
prompt_registry.load_from_paths(&p_patterns)?;
}
let workflow = Arc::new(workflow);
if let Err(e) = crate::services::history::init_global(&self.config.history) {
tracing::warn!("[history] init_global failed: {}", e);
}
let runtime: Arc<LocalRuntime> = match self.runtime {
Some(rt) => rt,
None => Arc::new(LocalRuntime::new_with_config(&self.config.ai)),
};
let mut executor = WorkflowExecutor::new_with_debug(
Arc::new(prompt_registry),
runtime,
self.config.debug.clone(),
)
.await;
executor.apply_limits(&self.config.limits);
executor.load_tools(&workflow).await;
if let Err(e) = executor.init_python_runtime(&workflow, self.config.limits.python_workers) {
tracing::warn!("Failed to initialize Python runtime: {}", e);
}
let executor = Arc::new(executor);
executor
.get_registry()
.set_executor(Arc::downgrade(&executor));
let context = match sender {
Some(tx) => {
let ctx = WorkflowContext::with_sender(tx);
ctx.set_stream_node_events(true);
ctx
}
None => WorkflowContext::new(),
};
if let Ok(config_value) = serde_json::to_value(&self.config) {
context.set("config".to_string(), config_value)?;
}
if let Some(input_val) = input {
if let Some(obj) = input_val.as_object() {
for (key, val) in obj {
context.set(format!("input.{}", key), val.clone())?;
}
}
context.set("input".to_string(), input_val)?;
}
executor.execute_graph(workflow, &context).await?;
Ok(context)
}
}
/// Locates the project root for `start`.
///
/// The search begins at `start` itself, or at its parent directory when
/// `start` is an existing file, and walks up through the ancestors. The first
/// directory containing a `juglans.toml` entry is returned. If no ancestor
/// has one, the directory where the search began is returned.
fn find_project_root(start: &Path) -> PathBuf {
    let origin = if start.is_file() {
        start.parent().unwrap_or(Path::new(".")).to_path_buf()
    } else {
        start.to_path_buf()
    };

    // `ancestors` yields `origin` first, then each successive parent.
    let nearest = origin
        .ancestors()
        .find(|dir| dir.join("juglans.toml").exists())
        .map(Path::to_path_buf);

    nearest.unwrap_or(origin)
}
/// Scope guard that switches the process working directory and switches it
/// back to the previous one when dropped.
///
/// The working directory is process-wide state, so the switch is visible to
/// the whole process for as long as the guard is alive.
#[must_use = "the previous working directory is restored as soon as the guard is dropped"]
struct SetCwd {
    /// Working directory that was current when the guard was created.
    previous: PathBuf,
}

impl SetCwd {
    /// Makes `dir` the current working directory and remembers the old one.
    ///
    /// # Errors
    /// Fails if the current directory cannot be determined or `dir` cannot
    /// be entered; no guard is returned in either case.
    fn new(dir: &Path) -> Result<Self> {
        let previous = std::env::current_dir()?;
        std::env::set_current_dir(dir)?;
        Ok(Self { previous })
    }
}

impl Drop for SetCwd {
    fn drop(&mut self) {
        // Best effort: `drop` cannot report an error, so a failed restore is
        // deliberately ignored.
        let _ = std::env::set_current_dir(&self.previous);
    }
}