1use serde::{Deserialize, Serialize};
22
23use super::error::{GraphError, StygianError};
24
/// A pipeline definition that has not been checked yet.
///
/// Derives serde's `Serialize`/`Deserialize`, so it can be (de)serialized with serde.
/// Call [`PipelineUnvalidated::validate`] to obtain a [`PipelineValidated`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineUnvalidated {
    /// Raw JSON configuration. `validate` expects a `nodes` array and reads an optional
    /// `edges` array.
    pub config: serde_json::Value,
}
34
/// A pipeline whose configuration has been accepted by [`PipelineUnvalidated::validate`].
///
/// NOTE(review): `config` is public, so this type can also be constructed directly without
/// going through validation — confirm whether that is intended.
#[derive(Debug, Clone)]
pub struct PipelineValidated {
    /// The JSON configuration, moved over unchanged from the unvalidated pipeline.
    pub config: serde_json::Value,
}
43
/// A pipeline that has started executing; produced by [`PipelineValidated::execute`].
#[derive(Debug)]
pub struct PipelineExecuting {
    /// Execution context, initialised from the validated configuration.
    pub context: serde_json::Value,
}
52
/// A pipeline that has finished, via [`PipelineExecuting::complete`] or
/// [`PipelineExecuting::abort`].
#[derive(Debug)]
pub struct PipelineComplete {
    /// Final results. `is_success` checks for a string `status` equal to `"success"`;
    /// `abort` stores `{"status": "error", "error": <message>}` here.
    pub results: serde_json::Value,
}
61
62impl PipelineUnvalidated {
63 pub const fn new(config: serde_json::Value) -> Self {
77 Self { config }
78 }
79
80 #[allow(clippy::too_many_lines, clippy::unwrap_used, clippy::indexing_slicing)]
101 pub fn validate(self) -> Result<PipelineValidated, StygianError> {
102 use std::collections::{HashMap, HashSet, VecDeque};
103
104 let nodes = self
106 .config
107 .get("nodes")
108 .and_then(|n| n.as_array())
109 .ok_or_else(|| {
110 GraphError::InvalidPipeline("Pipeline must contain a 'nodes' array".to_string())
111 })?;
112
113 let empty_edges = vec![];
114 let edges = self
115 .config
116 .get("edges")
117 .and_then(|e| e.as_array())
118 .unwrap_or(&empty_edges);
119
120 if nodes.is_empty() {
122 return Err(GraphError::InvalidPipeline(
123 "Pipeline must contain at least one node".to_string(),
124 )
125 .into());
126 }
127
128 let mut node_map: HashMap<String, usize> = HashMap::new();
130 let valid_services = [
131 "http",
132 "http_escalating",
133 "browser",
134 "ai_claude",
135 "ai_openai",
136 "ai_gemini",
137 "ai_github",
138 "ai_ollama",
139 "javascript",
140 "graphql",
141 "storage",
142 ];
143
144 for (idx, node) in nodes.iter().enumerate() {
145 let node_obj = node.as_object().ok_or_else(|| {
146 GraphError::InvalidPipeline(format!("Node at index {idx}: must be an object"))
147 })?;
148
149 let node_id = node_obj.get("id").and_then(|v| v.as_str()).ok_or_else(|| {
151 GraphError::InvalidPipeline(format!(
152 "Node at index {idx}: 'id' field is required and must be a string"
153 ))
154 })?;
155
156 if node_id.is_empty() {
157 return Err(GraphError::InvalidPipeline(format!(
158 "Node at index {idx}: id cannot be empty"
159 ))
160 .into());
161 }
162
163 if node_map.insert(node_id.to_string(), idx).is_some() {
165 return Err(
166 GraphError::InvalidPipeline(format!("Duplicate node id: '{node_id}'")).into(),
167 );
168 }
169
170 let service = node_obj
172 .get("service")
173 .and_then(|v| v.as_str())
174 .ok_or_else(|| {
175 GraphError::InvalidPipeline(format!(
176 "Node '{node_id}': 'service' field is required and must be a string"
177 ))
178 })?;
179
180 if !valid_services.contains(&service) {
181 return Err(GraphError::InvalidPipeline(format!(
182 "Node '{node_id}': service type '{service}' is not recognized"
183 ))
184 .into());
185 }
186 }
187
188 let mut adjacency: HashMap<String, Vec<String>> = HashMap::new();
190 let mut in_degree: HashMap<String, usize> = HashMap::new();
191
192 for node in nodes {
194 if let Some(id) = node.get("id").and_then(|v| v.as_str()) {
195 in_degree.insert(id.to_string(), 0);
196 adjacency.insert(id.to_string(), Vec::new());
197 }
198 }
199
200 for (edge_idx, edge) in edges.iter().enumerate() {
201 let edge_obj = edge.as_object().ok_or_else(|| {
202 GraphError::InvalidPipeline(format!("Edge at index {edge_idx}: must be an object"))
203 })?;
204
205 let from = edge_obj
206 .get("from")
207 .and_then(|v| v.as_str())
208 .ok_or_else(|| {
209 GraphError::InvalidPipeline(format!(
210 "Edge at index {edge_idx}: 'from' field is required and must be a string"
211 ))
212 })?;
213
214 let to = edge_obj.get("to").and_then(|v| v.as_str()).ok_or_else(|| {
215 GraphError::InvalidPipeline(format!(
216 "Edge at index {edge_idx}: 'to' field is required and must be a string"
217 ))
218 })?;
219
220 if !node_map.contains_key(from) {
222 return Err(GraphError::InvalidPipeline(format!(
223 "Edge {from} -> {to}: source node '{from}' not found"
224 ))
225 .into());
226 }
227
228 if !node_map.contains_key(to) {
230 return Err(GraphError::InvalidPipeline(format!(
231 "Edge {from} -> {to}: target node '{to}' not found"
232 ))
233 .into());
234 }
235
236 if from == to {
238 return Err(GraphError::InvalidPipeline(format!(
239 "Self-loop detected at node '{from}'"
240 ))
241 .into());
242 }
243
244 adjacency.get_mut(from).unwrap().push(to.to_string());
246 *in_degree.get_mut(to).unwrap() += 1;
247 }
248
249 let mut in_degree_copy = in_degree.clone();
251 let mut queue: VecDeque<String> = VecDeque::new();
252
253 let entry_points: Vec<String> = in_degree_copy
255 .iter()
256 .filter(|(_, degree)| **degree == 0)
257 .map(|(node_id, _)| node_id.clone())
258 .collect();
259 for node_id in entry_points {
260 queue.push_back(node_id);
261 }
262
263 let mut sorted_count = 0;
264 while let Some(node_id) = queue.pop_front() {
265 sorted_count += 1;
266
267 if let Some(neighbors) = adjacency.get(&node_id) {
269 let neighbors_copy = neighbors.clone();
270 for neighbor in neighbors_copy {
271 *in_degree_copy.get_mut(&neighbor).unwrap() -= 1;
272 if in_degree_copy[&neighbor] == 0 {
273 queue.push_back(neighbor);
274 }
275 }
276 }
277 }
278
279 if sorted_count != node_map.len() {
281 return Err(GraphError::InvalidPipeline(
282 "Cycle detected in pipeline graph".to_string(),
283 )
284 .into());
285 }
286
287 let mut visited: HashSet<String> = HashSet::new();
291 let mut to_visit: VecDeque<String> = VecDeque::new();
292
293 let mut entry_points = Vec::new();
295 for (node_id, degree) in &in_degree {
296 if *degree == 0 {
297 entry_points.push(node_id.clone());
298 }
299 }
300
301 if entry_points.is_empty() {
302 return Err(GraphError::InvalidPipeline(
304 "No entry points found (all nodes have incoming edges)".to_string(),
305 )
306 .into());
307 }
308
309 to_visit.push_back(entry_points[0].clone());
311
312 while let Some(node_id) = to_visit.pop_front() {
314 if visited.insert(node_id.clone()) {
315 if let Some(neighbors) = adjacency.get(&node_id) {
317 for neighbor in neighbors {
318 to_visit.push_back(neighbor.clone());
319 }
320 }
321
322 for (source, targets) in &adjacency {
324 if targets.contains(&node_id) && !visited.contains(source) {
325 to_visit.push_back(source.clone());
326 }
327 }
328 }
329 }
330
331 let all_node_ids: HashSet<String> = node_map.keys().cloned().collect();
333 let unreachable: Vec<_> = all_node_ids.difference(&visited).collect();
334
335 if !unreachable.is_empty() {
336 let unreachable_str = unreachable
337 .iter()
338 .map(|s| s.as_str())
339 .collect::<Vec<_>>()
340 .join("', '");
341 return Err(GraphError::InvalidPipeline(format!(
342 "Unreachable nodes found: '{unreachable_str}' (ensure all nodes are connected in a single DAG)"
343 ))
344 .into());
345 }
346
347 Ok(PipelineValidated {
348 config: self.config,
349 })
350 }
351}
352
353impl PipelineValidated {
354 pub fn execute(self) -> PipelineExecuting {
372 PipelineExecuting {
373 context: self.config,
374 }
375 }
376}
377
378impl PipelineExecuting {
379 pub fn complete(self, results: serde_json::Value) -> PipelineComplete {
399 PipelineComplete { results }
400 }
401
402 pub fn abort(self, error: &str) -> PipelineComplete {
422 PipelineComplete {
423 results: serde_json::json!({
424 "status": "error",
425 "error": error
426 }),
427 }
428 }
429}
430
431impl PipelineComplete {
432 pub fn is_success(&self) -> bool {
451 self.results
452 .get("status")
453 .and_then(|s| s.as_str())
454 .is_some_and(|s| s == "success")
455 }
456
457 pub const fn results(&self) -> &serde_json::Value {
459 &self.results
460 }
461}
462
#[cfg(test)]
// Tests deliberately unwrap/panic: a failed expectation should abort the test immediately.
#[allow(clippy::unwrap_used, clippy::indexing_slicing, clippy::panic)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Validates `config`, asserting that validation fails, and returns the error message.
    fn validation_error(config: serde_json::Value) -> String {
        PipelineUnvalidated::new(config)
            .validate()
            .unwrap_err()
            .to_string()
    }

    /// Returns whether `config` passes validation.
    fn is_valid(config: serde_json::Value) -> bool {
        PipelineUnvalidated::new(config).validate().is_ok()
    }

    #[test]
    fn validate_empty_nodes_array() {
        let msg = validation_error(json!({"nodes": [], "edges": []}));
        assert!(msg.contains("at least one node"));
    }

    #[test]
    fn validate_missing_nodes_field() {
        let msg = validation_error(json!({"edges": []}));
        assert!(msg.contains("'nodes' array"));
    }

    #[test]
    fn validate_node_not_object() {
        let msg = validation_error(json!({"nodes": ["fetch"], "edges": []}));
        assert!(msg.contains("Node at index 0: must be an object"));
    }

    #[test]
    fn validate_missing_node_id() {
        let msg = validation_error(json!({
            "nodes": [{"service": "http"}],
            "edges": []
        }));
        assert!(msg.contains("'id' field is required"));
    }

    #[test]
    fn validate_empty_node_id() {
        let msg = validation_error(json!({
            "nodes": [{"id": "", "service": "http"}],
            "edges": []
        }));
        assert!(msg.contains("id cannot be empty"));
    }

    #[test]
    fn validate_duplicate_node_ids() {
        let msg = validation_error(json!({
            "nodes": [
                {"id": "fetch", "service": "http"},
                {"id": "fetch", "service": "browser"}
            ],
            "edges": []
        }));
        assert!(msg.contains("Duplicate node id"));
    }

    #[test]
    fn validate_missing_service() {
        let msg = validation_error(json!({
            "nodes": [{"id": "fetch"}],
            "edges": []
        }));
        assert!(msg.contains("'service' field is required"));
    }

    #[test]
    fn validate_invalid_service_type() {
        let msg = validation_error(json!({
            "nodes": [{"id": "fetch", "service": "invalid_service"}],
            "edges": []
        }));
        assert!(msg.contains("not recognized"));
    }

    #[test]
    fn validate_edge_not_object() {
        let msg = validation_error(json!({
            "nodes": [
                {"id": "a", "service": "http"},
                {"id": "b", "service": "storage"}
            ],
            "edges": ["a -> b"]
        }));
        assert!(msg.contains("Edge at index 0: must be an object"));
    }

    #[test]
    fn validate_edge_missing_target_field() {
        let msg = validation_error(json!({
            "nodes": [
                {"id": "a", "service": "http"},
                {"id": "b", "service": "storage"}
            ],
            "edges": [{"from": "a"}]
        }));
        assert!(msg.contains("'to' field is required"));
    }

    #[test]
    fn validate_edge_nonexistent_source() {
        let msg = validation_error(json!({
            "nodes": [{"id": "extract", "service": "ai_claude"}],
            "edges": [{"from": "fetch", "to": "extract"}]
        }));
        assert!(msg.contains("source node 'fetch' not found"));
    }

    #[test]
    fn validate_edge_nonexistent_target() {
        let msg = validation_error(json!({
            "nodes": [{"id": "fetch", "service": "http"}],
            "edges": [{"from": "fetch", "to": "extract"}]
        }));
        assert!(msg.contains("target node 'extract' not found"));
    }

    #[test]
    fn validate_self_loop() {
        let msg = validation_error(json!({
            "nodes": [{"id": "node1", "service": "http"}],
            "edges": [{"from": "node1", "to": "node1"}]
        }));
        assert!(msg.contains("Self-loop"));
    }

    #[test]
    fn validate_cycle_detection() {
        let msg = validation_error(json!({
            "nodes": [
                {"id": "a", "service": "http"},
                {"id": "b", "service": "ai_claude"},
                {"id": "c", "service": "browser"}
            ],
            "edges": [
                {"from": "a", "to": "b"},
                {"from": "b", "to": "c"},
                {"from": "c", "to": "a"}
            ]
        }));
        assert!(msg.contains("Cycle"));
    }

    #[test]
    fn validate_unreachable_nodes() {
        let msg = validation_error(json!({
            "nodes": [
                {"id": "a", "service": "http"},
                {"id": "orphan", "service": "browser"}
            ],
            "edges": []
        }));
        assert!(msg.contains("Unreachable"));
    }

    #[test]
    fn validate_valid_single_node() {
        assert!(is_valid(json!({
            "nodes": [{"id": "fetch", "service": "http"}],
            "edges": []
        })));
    }

    #[test]
    fn validate_edges_field_optional() {
        assert!(is_valid(json!({
            "nodes": [{"id": "fetch", "service": "http"}]
        })));
    }

    #[test]
    fn validate_duplicate_edges_allowed() {
        assert!(is_valid(json!({
            "nodes": [
                {"id": "fetch", "service": "http"},
                {"id": "store", "service": "storage"}
            ],
            "edges": [
                {"from": "fetch", "to": "store"},
                {"from": "fetch", "to": "store"}
            ]
        })));
    }

    #[test]
    fn validate_valid_linear_pipeline() {
        assert!(is_valid(json!({
            "nodes": [
                {"id": "fetch", "service": "http"},
                {"id": "extract", "service": "ai_claude"},
                {"id": "store", "service": "storage"}
            ],
            "edges": [
                {"from": "fetch", "to": "extract"},
                {"from": "extract", "to": "store"}
            ]
        })));
    }

    #[test]
    fn validate_valid_dag_branching() {
        assert!(is_valid(json!({
            "nodes": [
                {"id": "fetch", "service": "http"},
                {"id": "extract_ai", "service": "ai_claude"},
                {"id": "extract_browser", "service": "browser"},
                {"id": "merge", "service": "storage"}
            ],
            "edges": [
                {"from": "fetch", "to": "extract_ai"},
                {"from": "fetch", "to": "extract_browser"},
                {"from": "extract_ai", "to": "merge"},
                {"from": "extract_browser", "to": "merge"}
            ]
        })));
    }
}