use std::collections::{HashMap, HashSet, VecDeque};

use serde::{Deserialize, Serialize};

use super::error::{GraphError, StygianError};
/// A pipeline configuration as received, before any structural checks.
///
/// Consumed by [`PipelineUnvalidated::validate`], which on success promotes
/// it to a [`PipelineValidated`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineUnvalidated {
    // Raw JSON configuration; `validate` expects a `nodes` array and an
    // optional `edges` array.
    pub config: serde_json::Value,
}
/// A pipeline configuration whose node/edge graph has passed validation.
///
/// Produced by [`PipelineUnvalidated::validate`]; consumed by
/// [`PipelineValidated::execute`].
#[derive(Debug, Clone)]
pub struct PipelineValidated {
    // The same JSON configuration carried over from the unvalidated stage.
    pub config: serde_json::Value,
}
/// A pipeline run in progress.
///
/// Produced by [`PipelineValidated::execute`]; finished via
/// [`PipelineExecuting::complete`] or [`PipelineExecuting::abort`].
#[derive(Debug)]
pub struct PipelineExecuting {
    // Execution context; initialized from the validated configuration.
    pub context: serde_json::Value,
}
/// A finished pipeline run holding its final results payload.
#[derive(Debug)]
pub struct PipelineComplete {
    // Final results JSON. A top-level `"status": "success"` marks success
    // (see `is_success`); `abort` stores `{"status": "error", "error": …}`.
    pub results: serde_json::Value,
}
impl PipelineUnvalidated {
pub const fn new(config: serde_json::Value) -> Self {
Self { config }
}
#[allow(clippy::too_many_lines, clippy::unwrap_used, clippy::indexing_slicing)]
pub fn validate(self) -> Result<PipelineValidated, StygianError> {
use std::collections::{HashMap, HashSet, VecDeque};
let nodes = self
.config
.get("nodes")
.and_then(|n| n.as_array())
.ok_or_else(|| {
GraphError::InvalidPipeline("Pipeline must contain a 'nodes' array".to_string())
})?;
let empty_edges = vec![];
let edges = self
.config
.get("edges")
.and_then(|e| e.as_array())
.unwrap_or(&empty_edges);
if nodes.is_empty() {
return Err(GraphError::InvalidPipeline(
"Pipeline must contain at least one node".to_string(),
)
.into());
}
let mut node_map: HashMap<String, usize> = HashMap::new();
let valid_services = [
"http",
"http_escalating",
"browser",
"ai_claude",
"ai_openai",
"ai_gemini",
"ai_github",
"ai_ollama",
"javascript",
"graphql",
"storage",
];
for (idx, node) in nodes.iter().enumerate() {
let node_obj = node.as_object().ok_or_else(|| {
GraphError::InvalidPipeline(format!("Node at index {idx}: must be an object"))
})?;
let node_id = node_obj.get("id").and_then(|v| v.as_str()).ok_or_else(|| {
GraphError::InvalidPipeline(format!(
"Node at index {idx}: 'id' field is required and must be a string"
))
})?;
if node_id.is_empty() {
return Err(GraphError::InvalidPipeline(format!(
"Node at index {idx}: id cannot be empty"
))
.into());
}
if node_map.insert(node_id.to_string(), idx).is_some() {
return Err(
GraphError::InvalidPipeline(format!("Duplicate node id: '{node_id}'")).into(),
);
}
let service = node_obj
.get("service")
.and_then(|v| v.as_str())
.ok_or_else(|| {
GraphError::InvalidPipeline(format!(
"Node '{node_id}': 'service' field is required and must be a string"
))
})?;
if !valid_services.contains(&service) {
return Err(GraphError::InvalidPipeline(format!(
"Node '{node_id}': service type '{service}' is not recognized"
))
.into());
}
}
let mut adjacency: HashMap<String, Vec<String>> = HashMap::new();
let mut in_degree: HashMap<String, usize> = HashMap::new();
for node in nodes {
if let Some(id) = node.get("id").and_then(|v| v.as_str()) {
in_degree.insert(id.to_string(), 0);
adjacency.insert(id.to_string(), Vec::new());
}
}
for (edge_idx, edge) in edges.iter().enumerate() {
let edge_obj = edge.as_object().ok_or_else(|| {
GraphError::InvalidPipeline(format!("Edge at index {edge_idx}: must be an object"))
})?;
let from = edge_obj
.get("from")
.and_then(|v| v.as_str())
.ok_or_else(|| {
GraphError::InvalidPipeline(format!(
"Edge at index {edge_idx}: 'from' field is required and must be a string"
))
})?;
let to = edge_obj.get("to").and_then(|v| v.as_str()).ok_or_else(|| {
GraphError::InvalidPipeline(format!(
"Edge at index {edge_idx}: 'to' field is required and must be a string"
))
})?;
if !node_map.contains_key(from) {
return Err(GraphError::InvalidPipeline(format!(
"Edge {from} -> {to}: source node '{from}' not found"
))
.into());
}
if !node_map.contains_key(to) {
return Err(GraphError::InvalidPipeline(format!(
"Edge {from} -> {to}: target node '{to}' not found"
))
.into());
}
if from == to {
return Err(GraphError::InvalidPipeline(format!(
"Self-loop detected at node '{from}'"
))
.into());
}
adjacency.get_mut(from).unwrap().push(to.to_string());
*in_degree.get_mut(to).unwrap() += 1;
}
let mut in_degree_copy = in_degree.clone();
let mut queue: VecDeque<String> = VecDeque::new();
let entry_points: Vec<String> = in_degree_copy
.iter()
.filter(|(_, degree)| **degree == 0)
.map(|(node_id, _)| node_id.clone())
.collect();
for node_id in entry_points {
queue.push_back(node_id);
}
let mut sorted_count = 0;
while let Some(node_id) = queue.pop_front() {
sorted_count += 1;
if let Some(neighbors) = adjacency.get(&node_id) {
let neighbors_copy = neighbors.clone();
for neighbor in neighbors_copy {
*in_degree_copy.get_mut(&neighbor).unwrap() -= 1;
if in_degree_copy[&neighbor] == 0 {
queue.push_back(neighbor);
}
}
}
}
if sorted_count != node_map.len() {
return Err(GraphError::InvalidPipeline(
"Cycle detected in pipeline graph".to_string(),
)
.into());
}
let mut visited: HashSet<String> = HashSet::new();
let mut to_visit: VecDeque<String> = VecDeque::new();
let mut entry_points = Vec::new();
for (node_id, degree) in &in_degree {
if *degree == 0 {
entry_points.push(node_id.clone());
}
}
if entry_points.is_empty() {
return Err(GraphError::InvalidPipeline(
"No entry points found (all nodes have incoming edges)".to_string(),
)
.into());
}
to_visit.push_back(entry_points[0].clone());
while let Some(node_id) = to_visit.pop_front() {
if visited.insert(node_id.clone()) {
if let Some(neighbors) = adjacency.get(&node_id) {
for neighbor in neighbors {
to_visit.push_back(neighbor.clone());
}
}
for (source, targets) in &adjacency {
if targets.contains(&node_id) && !visited.contains(source) {
to_visit.push_back(source.clone());
}
}
}
}
let all_node_ids: HashSet<String> = node_map.keys().cloned().collect();
let unreachable: Vec<_> = all_node_ids.difference(&visited).collect();
if !unreachable.is_empty() {
let unreachable_str = unreachable
.iter()
.map(|s| s.as_str())
.collect::<Vec<_>>()
.join("', '");
return Err(GraphError::InvalidPipeline(format!(
"Unreachable nodes found: '{unreachable_str}' (ensure all nodes are connected in a single DAG)"
))
.into());
}
Ok(PipelineValidated {
config: self.config,
})
}
}
impl PipelineValidated {
    /// Begins execution, consuming the validated pipeline and carrying its
    /// configuration forward as the execution context.
    pub fn execute(self) -> PipelineExecuting {
        let Self { config } = self;
        PipelineExecuting { context: config }
    }
}
impl PipelineExecuting {
    /// Finishes the run, consuming the executing state and wrapping the
    /// provided results payload.
    pub fn complete(self, results: serde_json::Value) -> PipelineComplete {
        PipelineComplete { results }
    }

    /// Terminates the run with an error, producing a failure payload of the
    /// shape `{"status": "error", "error": <message>}`.
    pub fn abort(self, error: &str) -> PipelineComplete {
        let results = serde_json::json!({
            "status": "error",
            "error": error
        });
        PipelineComplete { results }
    }
}
impl PipelineComplete {
    /// Returns `true` when the results payload carries a top-level
    /// `"status"` string equal to `"success"`.
    pub fn is_success(&self) -> bool {
        matches!(
            self.results.get("status").and_then(|s| s.as_str()),
            Some("success")
        )
    }

    /// Borrows the raw results payload.
    pub const fn results(&self) -> &serde_json::Value {
        &self.results
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing, clippy::panic)]
mod tests {
    //! Validation tests: each case feeds a JSON config through
    //! `PipelineUnvalidated::validate` and checks either success or the
    //! expected error message fragment.
    use super::*;
    use serde_json::json;

    /// Validates `config`, returning the error message; panics if
    /// validation unexpectedly succeeds.
    fn validate_err(config: serde_json::Value) -> String {
        PipelineUnvalidated::new(config)
            .validate()
            .unwrap_err()
            .to_string()
    }

    /// Asserts that `config` validates successfully.
    fn assert_valid(config: serde_json::Value) {
        assert!(PipelineUnvalidated::new(config).validate().is_ok());
    }

    #[test]
    fn validate_empty_nodes_array() {
        let msg = validate_err(json!({"nodes": [], "edges": []}));
        assert!(msg.contains("at least one node"));
    }

    #[test]
    fn validate_missing_nodes_field() {
        let result = PipelineUnvalidated::new(json!({"edges": []})).validate();
        assert!(result.is_err());
    }

    #[test]
    fn validate_missing_node_id() {
        let msg = validate_err(json!({
            "nodes": [{"service": "http"}],
            "edges": []
        }));
        assert!(msg.contains("'id' field is required"));
    }

    #[test]
    fn validate_empty_node_id() {
        let msg = validate_err(json!({
            "nodes": [{"id": "", "service": "http"}],
            "edges": []
        }));
        assert!(msg.contains("id cannot be empty"));
    }

    #[test]
    fn validate_duplicate_node_ids() {
        let msg = validate_err(json!({
            "nodes": [
                {"id": "fetch", "service": "http"},
                {"id": "fetch", "service": "browser"}
            ],
            "edges": []
        }));
        assert!(msg.contains("Duplicate node id"));
    }

    #[test]
    fn validate_invalid_service_type() {
        let msg = validate_err(json!({
            "nodes": [{"id": "fetch", "service": "invalid_service"}],
            "edges": []
        }));
        assert!(msg.contains("not recognized"));
    }

    #[test]
    fn validate_edge_nonexistent_source() {
        let msg = validate_err(json!({
            "nodes": [{"id": "extract", "service": "ai_claude"}],
            "edges": [{"from": "fetch", "to": "extract"}]
        }));
        assert!(msg.contains("source node 'fetch' not found"));
    }

    #[test]
    fn validate_edge_nonexistent_target() {
        let msg = validate_err(json!({
            "nodes": [{"id": "fetch", "service": "http"}],
            "edges": [{"from": "fetch", "to": "extract"}]
        }));
        assert!(msg.contains("target node 'extract' not found"));
    }

    #[test]
    fn validate_self_loop() {
        let msg = validate_err(json!({
            "nodes": [{"id": "node1", "service": "http"}],
            "edges": [{"from": "node1", "to": "node1"}]
        }));
        assert!(msg.contains("Self-loop"));
    }

    #[test]
    fn validate_cycle_detection() {
        let msg = validate_err(json!({
            "nodes": [
                {"id": "a", "service": "http"},
                {"id": "b", "service": "ai_claude"},
                {"id": "c", "service": "browser"}
            ],
            "edges": [
                {"from": "a", "to": "b"},
                {"from": "b", "to": "c"},
                {"from": "c", "to": "a"}
            ]
        }));
        assert!(msg.contains("Cycle"));
    }

    #[test]
    fn validate_unreachable_nodes() {
        let msg = validate_err(json!({
            "nodes": [
                {"id": "a", "service": "http"},
                {"id": "orphan", "service": "browser"}
            ],
            "edges": []
        }));
        assert!(msg.contains("Unreachable"));
    }

    #[test]
    fn validate_valid_single_node() {
        assert_valid(json!({
            "nodes": [{"id": "fetch", "service": "http"}],
            "edges": []
        }));
    }

    #[test]
    fn validate_valid_linear_pipeline() {
        assert_valid(json!({
            "nodes": [
                {"id": "fetch", "service": "http"},
                {"id": "extract", "service": "ai_claude"},
                {"id": "store", "service": "storage"}
            ],
            "edges": [
                {"from": "fetch", "to": "extract"},
                {"from": "extract", "to": "store"}
            ]
        }));
    }

    #[test]
    fn validate_valid_dag_branching() {
        assert_valid(json!({
            "nodes": [
                {"id": "fetch", "service": "http"},
                {"id": "extract_ai", "service": "ai_claude"},
                {"id": "extract_browser", "service": "browser"},
                {"id": "merge", "service": "storage"}
            ],
            "edges": [
                {"from": "fetch", "to": "extract_ai"},
                {"from": "fetch", "to": "extract_browser"},
                {"from": "extract_ai", "to": "merge"},
                {"from": "extract_browser", "to": "merge"}
            ]
        }));
    }
}