use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt::Write as _;
use std::time::Duration;
use figment::Figment;
use figment::providers::{Env, Format, Toml};
use serde::{Deserialize, Serialize};
use thiserror::Error;
/// All failure modes of pipeline parsing, validation and loading.
#[derive(Debug, Error)]
pub enum PipelineError {
    /// The TOML text could not be deserialized into a [`PipelineDefinition`].
    #[error("TOML parse error: {0}")]
    ParseError(#[from] toml::de::Error),
    /// A node's `service` field names a service that is neither in the
    /// declared `[[services]]` list nor in the supplied registry.
    #[error("Node '{node}' references unknown service '{service}'")]
    UnknownService {
        node: String,
        service: String,
    },
    /// A `depends_on` entry names a node that is not declared.
    #[error("Node '{node}' depends on unknown node '{dep}'")]
    UnknownDependency {
        node: String,
        dep: String,
    },
    /// The dependency graph contains a cycle; the named node is one that
    /// could not be topologically ordered.
    #[error("Cycle detected involving node '{0}'")]
    CycleDetected(String),
    /// A node is missing a required field (currently only `service`).
    #[error("Node '{node}' is missing required field: {field}")]
    MissingField {
        node: String,
        field: String,
    },
    /// Filesystem access failed while reading a definition file.
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),
    /// Figment-based layered configuration loading failed; the original
    /// error is flattened to a string.
    #[error("Config error: {0}")]
    FigmentError(String),
}
/// A `[[services]]` entry: a named backend that nodes can reference.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ServiceDecl {
    /// Unique service name, referenced by `NodeDecl::service`.
    pub name: String,
    /// Service implementation kind (the tests use e.g. "http", "claude").
    pub kind: String,
    /// Optional model identifier for model-backed services.
    pub model: Option<String>,
    /// Any additional TOML keys are captured here verbatim; string values
    /// get `${env:…}` expansion in `PipelineDefinition::expand_templates`.
    #[serde(flatten)]
    pub extra: HashMap<String, toml::Value>,
}
/// A `[[nodes]]` entry: one step in the pipeline dependency graph.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct NodeDecl {
    /// Unique node name, referenced by other nodes' `depends_on`.
    pub name: String,
    /// Name of the service this node runs on. `#[serde(default)]` means a
    /// missing field deserializes to "" — `validate` then rejects it with
    /// `PipelineError::MissingField` instead of a parse error.
    #[serde(default)]
    pub service: String,
    /// Names of nodes that must complete before this one.
    #[serde(default)]
    pub depends_on: Vec<String>,
    /// Optional URL; `${env:VAR}` templates are resolved by
    /// `expand_templates`.
    pub url: Option<String>,
    /// Any additional TOML keys are captured here verbatim.
    #[serde(flatten)]
    pub params: HashMap<String, toml::Value>,
}
/// A complete pipeline: declared services plus the node dependency graph.
/// Deserialized from TOML; both lists default to empty, so an empty file
/// is a valid (empty) pipeline.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct PipelineDefinition {
    #[serde(default)]
    pub services: Vec<ServiceDecl>,
    #[serde(default)]
    pub nodes: Vec<NodeDecl>,
}
impl PipelineDefinition {
    /// Check structural integrity: every node names a service, every
    /// referenced service and dependency exists, and the dependency graph
    /// is acyclic. Returns the first violation found.
    pub fn validate(&self) -> Result<(), PipelineError> {
        let service_names: HashSet<&str> =
            self.services.iter().map(|s| s.name.as_str()).collect();
        let node_names: HashSet<&str> = self.nodes.iter().map(|n| n.name.as_str()).collect();
        for node in &self.nodes {
            // `service` deserializes to "" when absent (serde default).
            if node.service.is_empty() {
                return Err(PipelineError::MissingField {
                    node: node.name.clone(),
                    field: "service".to_string(),
                });
            }
            // With no inline `[[services]]`, service resolution is deferred
            // (e.g. to `validate_against_registry`).
            if !self.services.is_empty() && !service_names.contains(node.service.as_str()) {
                return Err(PipelineError::UnknownService {
                    node: node.name.clone(),
                    service: node.service.clone(),
                });
            }
            for dep in &node.depends_on {
                if !node_names.contains(dep.as_str()) {
                    return Err(PipelineError::UnknownDependency {
                        node: node.name.clone(),
                        dep: dep.clone(),
                    });
                }
            }
        }
        self.detect_cycles()?;
        Ok(())
    }

    /// Kahn's algorithm over the dependency graph.
    ///
    /// Returns the node names in an order where every node appears after
    /// all of its dependencies, or `CycleDetected` naming one node whose
    /// in-degree never reached zero. This is the single shared
    /// implementation behind `detect_cycles` and `topological_order`,
    /// which previously duplicated the whole setup and drain loop.
    fn kahn_order(&self) -> Result<Vec<String>, PipelineError> {
        let mut in_degree: HashMap<&str, usize> = HashMap::new();
        let mut children: HashMap<&str, Vec<&str>> = HashMap::new();
        for node in &self.nodes {
            in_degree.entry(node.name.as_str()).or_insert(0);
            children.entry(node.name.as_str()).or_default();
            for dep in &node.depends_on {
                *in_degree.entry(node.name.as_str()).or_insert(0) += 1;
                children
                    .entry(dep.as_str())
                    .or_default()
                    .push(node.name.as_str());
            }
        }
        // Seed the queue with nodes that have no dependencies.
        let mut queue: VecDeque<&str> = in_degree
            .iter()
            .filter_map(|(&k, &v)| if v == 0 { Some(k) } else { None })
            .collect();
        let mut order = Vec::with_capacity(self.nodes.len());
        while let Some(node) = queue.pop_front() {
            order.push(node.to_string());
            if let Some(dependents) = children.get(node) {
                for &dependent in dependents {
                    if let Some(deg) = in_degree.get_mut(dependent) {
                        *deg -= 1;
                        if *deg == 0 {
                            queue.push_back(dependent);
                        }
                    }
                }
            }
        }
        if order.len() < self.nodes.len() {
            // Any node with a residual in-degree sits on (or behind) a cycle.
            let cycle_node = in_degree
                .iter()
                .find(|&(_, &v)| v > 0)
                .map_or("<unknown>", |(&k, _)| k);
            return Err(PipelineError::CycleDetected(cycle_node.to_string()));
        }
        Ok(order)
    }

    /// Err(`CycleDetected`) iff the dependency graph is not a DAG.
    fn detect_cycles(&self) -> Result<(), PipelineError> {
        self.kahn_order().map(|_| ())
    }

    /// Validate node services against an external registry of service
    /// names, independent of the inline `[[services]]` declarations.
    pub fn validate_against_registry<S: AsRef<str>>(
        &self,
        registered_services: &[S],
    ) -> Result<(), PipelineError> {
        let names: HashSet<&str> = registered_services.iter().map(AsRef::as_ref).collect();
        for node in &self.nodes {
            if !names.contains(node.service.as_str()) {
                return Err(PipelineError::UnknownService {
                    node: node.name.clone(),
                    service: node.service.clone(),
                });
            }
        }
        Ok(())
    }

    /// Resolve `${env:VAR}` templates in node URLs and in service `extra`
    /// string values, in place.
    pub fn expand_templates(&mut self) {
        for node in &mut self.nodes {
            if let Some(url) = &node.url {
                node.url = Some(expand_template(url));
            }
        }
        for service in &mut self.services {
            service.extra = service
                .extra
                .iter()
                .map(|(k, v)| (k.clone(), expand_toml_value(v)))
                .collect();
        }
    }

    /// Nodes in dependency order (dependencies before dependents), or
    /// `CycleDetected` if the graph is not a DAG. Ties are broken by
    /// HashMap iteration order, so the exact order is not deterministic —
    /// only the partial order is guaranteed.
    pub fn topological_order(&self) -> Result<Vec<String>, PipelineError> {
        self.kahn_order()
    }

    /// Render the pipeline as a Graphviz `digraph` (left-to-right).
    pub fn to_dot(&self) -> String {
        let mut out = String::from("digraph pipeline {\n rankdir=LR;\n");
        for node in &self.nodes {
            // `write!` to a String cannot fail; the Result is ignored.
            let _ = writeln!(
                out,
                " \"{}\" [label=\"{}\\n({})\"]; ",
                node.name, node.name, node.service
            );
        }
        for node in &self.nodes {
            for dep in &node.depends_on {
                let _ = writeln!(out, " \"{}\" -> \"{}\";", dep, node.name);
            }
        }
        out.push('}');
        out
    }

    /// Render the pipeline as a Mermaid `flowchart LR` diagram.
    /// NOTE(review): node names are not escaped; names containing Mermaid
    /// metacharacters would produce invalid diagrams — confirm inputs.
    pub fn to_mermaid(&self) -> String {
        let mut out = String::from("flowchart LR\n");
        for node in &self.nodes {
            let _ = writeln!(out, " {}[\"{}\\n{}\"]", node.name, node.name, node.service);
        }
        for node in &self.nodes {
            for dep in &node.depends_on {
                let _ = writeln!(out, " {} --> {}", dep, node.name);
            }
        }
        out
    }
}
/// Expand `${env:VAR}` tokens in `s` from the process environment.
///
/// Each token is replaced by the value of the named variable; tokens whose
/// variable is unset (or not valid UTF-8) are left verbatim so callers can
/// detect unresolved templates. An unterminated `${env:` prefix is also
/// kept as-is.
///
/// Substituted values are copied literally and are NOT re-scanned. The
/// previous implementation used `str::replace` on the whole buffer and
/// continued searching past the substitution, so an environment value that
/// itself contained `${env:…}` tokens could be re-expanded — and two
/// mutually-referencing variables could loop forever. A single
/// left-to-right pass avoids that and the repeated O(n) rescans.
pub(crate) fn expand_template(s: &str) -> String {
    let mut result = String::with_capacity(s.len());
    let mut rest = s;
    while let Some(pos) = rest.find("${env:") {
        // Emit the literal text before the token.
        result.push_str(&rest[..pos]);
        let after = &rest[pos..];
        match after.find('}') {
            Some(end) => {
                let token = &after[..=end];
                // Strip the leading "${env:" (6 bytes) and trailing "}".
                let var_name = &token[6..token.len() - 1];
                match std::env::var(var_name) {
                    Ok(value) => result.push_str(&value),
                    // Unset variable: keep the token verbatim.
                    Err(_) => result.push_str(token),
                }
                rest = &after[end + 1..];
            }
            None => {
                // Unterminated token: keep the remainder verbatim.
                result.push_str(after);
                return result;
            }
        }
    }
    result.push_str(rest);
    result
}
fn expand_toml_value(v: &toml::Value) -> toml::Value {
match v {
toml::Value::String(s) => toml::Value::String(expand_template(s)),
toml::Value::Table(map) => toml::Value::Table(
map.iter()
.map(|(k, v)| (k.clone(), expand_toml_value(v)))
.collect(),
),
toml::Value::Array(arr) => toml::Value::Array(arr.iter().map(expand_toml_value).collect()),
other => other.clone(),
}
}
/// Stateless entry points for loading [`PipelineDefinition`]s.
pub struct PipelineParser;

impl PipelineParser {
    /// Parse a definition from raw TOML text.
    ///
    /// `${env:…}` templates are left untouched here; callers that want them
    /// resolved invoke [`PipelineDefinition::expand_templates`] themselves.
    #[allow(clippy::should_implement_trait)]
    pub fn from_str(source: &str) -> Result<PipelineDefinition, PipelineError> {
        let parsed: PipelineDefinition = toml::from_str(source)?;
        Ok(parsed)
    }

    /// Read `path`, parse it as TOML, and expand `${env:…}` templates.
    pub fn from_file(path: &str) -> Result<PipelineDefinition, PipelineError> {
        let text = std::fs::read_to_string(path)?;
        let mut definition: PipelineDefinition = toml::from_str(&text)?;
        definition.expand_templates();
        Ok(definition)
    }

    /// Load via Figment, layering the TOML file under `STYGIAN_`-prefixed
    /// environment variables (keys lowercased), then expand templates.
    pub fn from_figment_file(path: &str) -> Result<PipelineDefinition, PipelineError> {
        let layered = Figment::new()
            .merge(Toml::file(path))
            .merge(Env::prefixed("STYGIAN_").lowercase(true));
        let mut definition: PipelineDefinition = layered
            .extract()
            .map_err(|e| PipelineError::FigmentError(e.to_string()))?;
        definition.expand_templates();
        Ok(definition)
    }
}
/// Polls a pipeline definition file for modification-time changes and
/// reloads it on change.
pub struct PipelineWatcher {
    // Path of the TOML definition file to watch.
    path: String,
    // Poll period; defaults to 5s (see `new` / `with_interval`).
    interval: Duration,
}
impl PipelineWatcher {
    /// Create a watcher for `path` with the default 5-second poll interval.
    pub fn new(path: impl Into<String>) -> Self {
        Self {
            path: path.into(),
            interval: Duration::from_secs(5),
        }
    }

    /// Builder-style override of the polling interval.
    #[must_use]
    pub const fn with_interval(mut self, interval: Duration) -> Self {
        self.interval = interval;
        self
    }

    /// Spawn a background task that polls the file's mtime on every tick
    /// and invokes `callback` with the freshly parsed definition whenever
    /// the mtime changes (including the first successful read).
    ///
    /// The returned handle can be awaited or aborted; the loop itself
    /// never exits on its own.
    pub fn watch<F>(self, callback: F) -> tokio::task::JoinHandle<()>
    where
        F: Fn(PipelineDefinition) + Send + 'static,
    {
        tokio::spawn(async move {
            let mut last_mtime: Option<std::time::SystemTime> = None;
            let mut ticker = tokio::time::interval(self.interval);
            loop {
                ticker.tick().await;
                // A missing/unreadable file yields None, which still
                // triggers the change branch exactly once.
                let mtime = tokio::fs::metadata(&self.path)
                    .await
                    .ok()
                    .and_then(|m| m.modified().ok());
                if mtime != last_mtime {
                    last_mtime = mtime;
                    // `from_file` already expands templates, so the extra
                    // `expand_templates` call the old code made here was
                    // redundant (and could double-expand tokens injected by
                    // env values). Parse errors are deliberately ignored so
                    // a half-written file doesn't kill the watcher.
                    if let Ok(def) = PipelineParser::from_file(&self.path) {
                        callback(def);
                    }
                }
            }
        })
    }
}
#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::indexing_slicing,
    clippy::literal_string_with_formatting_args
)]
mod tests {
    use super::*;

    // A small two-service, two-node pipeline used by several tests.
    const SIMPLE: &str = r#"
        [[services]]
        name = "http"
        kind = "http"

        [[services]]
        name = "claude"
        kind = "claude"

        [[nodes]]
        name = "fetch"
        service = "http"
        url = "https://example.com"

        [[nodes]]
        name = "extract"
        service = "claude"
        depends_on = ["fetch"]
    "#;

    #[test]
    fn parse_valid_pipeline() {
        let def = PipelineParser::from_str(SIMPLE).unwrap();
        assert_eq!(def.services.len(), 2);
        assert_eq!(def.nodes.len(), 2);
        assert_eq!(def.nodes[0].name, "fetch");
        assert_eq!(def.nodes[1].depends_on, vec!["fetch"]);
    }

    #[test]
    fn validate_valid_pipeline() {
        let def = PipelineParser::from_str(SIMPLE).unwrap();
        assert!(def.validate().is_ok());
    }

    #[test]
    fn validate_unknown_service() {
        let toml = r#"
            [[services]]
            name = "http"
            kind = "http"

            [[nodes]]
            name = "n"
            service = "nonexistent"
        "#;
        let def = PipelineParser::from_str(toml).unwrap();
        let err = def.validate().unwrap_err();
        assert!(matches!(err, PipelineError::UnknownService { .. }));
    }

    #[test]
    fn validate_unknown_dependency() {
        let toml = r#"
            [[nodes]]
            name = "n"
            service = "http"
            depends_on = ["ghost"]
        "#;
        let def = PipelineParser::from_str(toml).unwrap();
        let err = def.validate().unwrap_err();
        assert!(matches!(err, PipelineError::UnknownDependency { .. }));
    }

    #[test]
    fn validate_cycle_detected() {
        let toml = r#"
            [[nodes]]
            name = "a"
            service = "http"
            depends_on = ["b"]

            [[nodes]]
            name = "b"
            service = "http"
            depends_on = ["a"]
        "#;
        let def = PipelineParser::from_str(toml).unwrap();
        let err = def.validate().unwrap_err();
        assert!(matches!(err, PipelineError::CycleDetected(_)));
    }

    #[test]
    fn validate_against_registry() {
        let toml = r#"
            [[nodes]]
            name = "n"
            service = "http"
        "#;
        let def = PipelineParser::from_str(toml).unwrap();
        assert!(def.validate_against_registry(&["http".to_string()]).is_ok());
        assert!(
            def.validate_against_registry(&["other".to_string()])
                .is_err()
        );
    }

    #[test]
    fn topological_order() {
        let toml = r#"
            [[nodes]]
            name = "c"
            service = "http"
            depends_on = ["b"]

            [[nodes]]
            name = "a"
            service = "http"

            [[nodes]]
            name = "b"
            service = "http"
            depends_on = ["a"]
        "#;
        let def = PipelineParser::from_str(toml).unwrap();
        let order = def.topological_order().unwrap();
        // Only the partial order is guaranteed, so assert relative positions.
        let pos_a = order.iter().position(|x| x == "a").unwrap();
        let pos_b = order.iter().position(|x| x == "b").unwrap();
        let pos_c = order.iter().position(|x| x == "c").unwrap();
        assert!(pos_a < pos_b);
        assert!(pos_b < pos_c);
    }

    #[test]
    fn to_dot_output() {
        let def = PipelineParser::from_str(SIMPLE).unwrap();
        let dot = def.to_dot();
        assert!(dot.contains("digraph pipeline"));
        assert!(dot.contains("fetch"));
        assert!(dot.contains("extract"));
        assert!(dot.contains(r#""fetch" -> "extract""#));
    }

    #[test]
    fn to_mermaid_output() {
        let def = PipelineParser::from_str(SIMPLE).unwrap();
        let mermaid = def.to_mermaid();
        assert!(mermaid.contains("flowchart LR"));
        assert!(mermaid.contains("fetch --> extract"));
    }

    #[test]
    fn template_env_expansion() {
        // CARGO is set whenever the tests run under `cargo test`; fall back
        // to "cargo" for other harnesses.
        let cargo = std::env::var("CARGO").unwrap_or_else(|_| "cargo".to_string());
        let toml = r#"
            [[nodes]]
            name = "n"
            service = "http"
            url = "${env:CARGO}"
        "#;
        let mut def = PipelineParser::from_str(toml).unwrap();
        def.expand_templates();
        assert_eq!(def.nodes[0].url.as_deref(), Some(cargo.as_str()));
    }

    #[test]
    fn template_missing_env_left_as_is() {
        let toml = r#"
            [[nodes]]
            name = "n"
            service = "http"
            url = "${env:STYGIAN_DEFINITELY_UNSET_VAR}"
        "#;
        let mut def = PipelineParser::from_str(toml).unwrap();
        def.expand_templates();
        assert_eq!(
            def.nodes[0].url.as_deref(),
            Some("${env:STYGIAN_DEFINITELY_UNSET_VAR}")
        );
    }

    #[test]
    fn empty_pipeline_valid() {
        let def = PipelineParser::from_str("").unwrap();
        assert!(def.validate().is_ok());
        assert!(def.topological_order().unwrap().is_empty());
    }

    #[test]
    fn dot_empty_pipeline() {
        let def = PipelineParser::from_str("").unwrap();
        let dot = def.to_dot();
        assert!(dot.starts_with("digraph pipeline"));
    }

    #[test]
    fn missing_service_field_fails_validation() {
        let toml = r#"
            [[nodes]]
            name = "orphan"
        "#;
        let def = PipelineParser::from_str(toml).unwrap();
        let err = def.validate().unwrap_err();
        assert!(
            matches!(err, PipelineError::MissingField { ref field, .. } if field == "service"),
            "expected MissingField(service), got {err}"
        );
    }

    #[test]
    fn nonexistent_ai_provider_returns_clear_error() {
        let toml = r#"
            [[nodes]]
            name = "extract"
            service = "claude"
        "#;
        let def = PipelineParser::from_str(toml).unwrap();
        let registered = vec!["http".to_string()];
        // Fixed: `&registered` had been mojibake-corrupted to `®istered`,
        // which did not compile.
        let err = def.validate_against_registry(&registered).unwrap_err();
        assert!(
            matches!(err, PipelineError::UnknownService { ref service, .. } if service == "claude"),
            "expected UnknownService(claude), got {err}"
        );
    }

    #[test]
    fn from_figment_file_loads_toml() {
        use std::io::Write as _;
        let mut tmp = tempfile::NamedTempFile::new().unwrap();
        writeln!(
            tmp,
            r#"
            [[services]]
            name = "http"
            kind = "http"

            [[nodes]]
            name = "fetch"
            service = "http"
            url = "https://example.com"
            "#
        )
        .unwrap();
        let def = PipelineParser::from_figment_file(tmp.path().to_str().unwrap()).unwrap();
        assert_eq!(def.nodes.len(), 1);
        assert_eq!(def.nodes[0].name, "fetch");
        assert!(def.validate().is_ok());
    }
}