Skip to main content

stygian_graph/domain/
pipeline.rs

//! Pipeline types with typestate pattern
//!
//! The typestate pattern ensures pipelines can only transition through valid states:
//! Unvalidated → Validated → Executing → Complete
//!
//! Validation rejects pipelines without nodes, so the example below declares a
//! single node.
//!
//! # Example
//!
//! ```
//! use stygian_graph::domain::pipeline::PipelineUnvalidated;
//! use serde_json::json;
//!
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let unvalidated = PipelineUnvalidated::new(json!({
//!     "nodes": [{"id": "fetch", "service": "http"}],
//!     "edges": []
//! }));
//! let validated = unvalidated.validate()?;
//! let executing = validated.execute();
//! let complete = executing.complete(json!({"status": "success"}));
//! assert!(complete.is_success());
//! # Ok(())
//! # }
//! ```
20
21use serde::{Deserialize, Serialize};
22
23use super::error::{GraphError, StygianError};
24
25/// Pipeline in unvalidated state
26///
27/// Initial state after loading configuration from a file or API.
28/// Must be validated before execution.
29#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct PipelineUnvalidated {
31    /// Pipeline configuration (unvalidated)
32    pub config: serde_json::Value,
33}
34
35/// Pipeline in validated state
36///
37/// Configuration has been validated and is ready for execution.
38#[derive(Debug, Clone)]
39pub struct PipelineValidated {
40    /// Validated configuration
41    pub config: serde_json::Value,
42}
43
44/// Pipeline in executing state
45///
46/// Pipeline is actively being executed. Contains runtime context.
47#[derive(Debug)]
48pub struct PipelineExecuting {
49    /// Execution context and state
50    pub context: serde_json::Value,
51}
52
53/// Pipeline in completed state
54///
55/// Pipeline execution has finished. Contains final results.
56#[derive(Debug)]
57pub struct PipelineComplete {
58    /// Execution results
59    pub results: serde_json::Value,
60}
61
62impl PipelineUnvalidated {
63    /// Create a new unvalidated pipeline from raw configuration
64    ///
65    /// # Example
66    ///
67    /// ```
68    /// use stygian_graph::domain::pipeline::PipelineUnvalidated;
69    /// use serde_json::json;
70    ///
71    /// let pipeline = PipelineUnvalidated::new(json!({
72    ///     "nodes": [{"id": "fetch", "service": "http"}],
73    ///     "edges": []
74    /// }));
75    /// ```
76    pub const fn new(config: serde_json::Value) -> Self {
77        Self { config }
78    }
79
80    /// Validate the pipeline configuration
81    ///
82    /// Transitions from `Unvalidated` to `Validated` state.
83    ///
84    /// # Errors
85    ///
86    /// Returns `GraphError::InvalidPipeline` if validation fails.
87    ///
88    /// # Example
89    ///
90    /// ```
91    /// use stygian_graph::domain::pipeline::PipelineUnvalidated;
92    /// use serde_json::json;
93    ///
94    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
95    /// let pipeline = PipelineUnvalidated::new(json!({"nodes": []}));
96    /// let validated = pipeline.validate()?;
97    /// # Ok(())
98    /// # }
99    /// ```
100    #[allow(clippy::too_many_lines, clippy::unwrap_used, clippy::indexing_slicing)]
101    pub fn validate(self) -> Result<PipelineValidated, StygianError> {
102        use std::collections::{HashMap, HashSet, VecDeque};
103
104        // Extract nodes and edges from config
105        let nodes = self
106            .config
107            .get("nodes")
108            .and_then(|n| n.as_array())
109            .ok_or_else(|| {
110                GraphError::InvalidPipeline("Pipeline must contain a 'nodes' array".to_string())
111            })?;
112
113        let empty_edges = vec![];
114        let edges = self
115            .config
116            .get("edges")
117            .and_then(|e| e.as_array())
118            .unwrap_or(&empty_edges);
119
120        // Rule 1: At least one node
121        if nodes.is_empty() {
122            return Err(GraphError::InvalidPipeline(
123                "Pipeline must contain at least one node".to_string(),
124            )
125            .into());
126        }
127
128        // Build node map and validate individual nodes
129        let mut node_map: HashMap<String, usize> = HashMap::new();
130        let valid_services = [
131            "http",
132            "http_escalating",
133            "browser",
134            "ai_claude",
135            "ai_openai",
136            "ai_gemini",
137            "ai_github",
138            "ai_ollama",
139            "javascript",
140            "graphql",
141            "storage",
142        ];
143
144        for (idx, node) in nodes.iter().enumerate() {
145            let node_obj = node.as_object().ok_or_else(|| {
146                GraphError::InvalidPipeline(format!("Node at index {idx}: must be an object"))
147            })?;
148
149            // Rule 2 & 3: Validate node ID
150            let node_id = node_obj.get("id").and_then(|v| v.as_str()).ok_or_else(|| {
151                GraphError::InvalidPipeline(format!(
152                    "Node at index {idx}: 'id' field is required and must be a string"
153                ))
154            })?;
155
156            if node_id.is_empty() {
157                return Err(GraphError::InvalidPipeline(format!(
158                    "Node at index {idx}: id cannot be empty"
159                ))
160                .into());
161            }
162
163            // Check for duplicate node IDs
164            if node_map.insert(node_id.to_string(), idx).is_some() {
165                return Err(
166                    GraphError::InvalidPipeline(format!("Duplicate node id: '{node_id}'")).into(),
167                );
168            }
169
170            // Rule 4: Validate service type
171            let service = node_obj
172                .get("service")
173                .and_then(|v| v.as_str())
174                .ok_or_else(|| {
175                    GraphError::InvalidPipeline(format!(
176                        "Node '{node_id}': 'service' field is required and must be a string"
177                    ))
178                })?;
179
180            if !valid_services.contains(&service) {
181                return Err(GraphError::InvalidPipeline(format!(
182                    "Node '{node_id}': service type '{service}' is not recognized"
183                ))
184                .into());
185            }
186        }
187
188        // Rule 5 & 6: Validate edges
189        let mut adjacency: HashMap<String, Vec<String>> = HashMap::new();
190        let mut in_degree: HashMap<String, usize> = HashMap::new();
191
192        // Initialize in_degree for all nodes
193        for node in nodes {
194            if let Some(id) = node.get("id").and_then(|v| v.as_str()) {
195                in_degree.insert(id.to_string(), 0);
196                adjacency.insert(id.to_string(), Vec::new());
197            }
198        }
199
200        for (edge_idx, edge) in edges.iter().enumerate() {
201            let edge_obj = edge.as_object().ok_or_else(|| {
202                GraphError::InvalidPipeline(format!("Edge at index {edge_idx}: must be an object"))
203            })?;
204
205            let from = edge_obj
206                .get("from")
207                .and_then(|v| v.as_str())
208                .ok_or_else(|| {
209                    GraphError::InvalidPipeline(format!(
210                        "Edge at index {edge_idx}: 'from' field is required and must be a string"
211                    ))
212                })?;
213
214            let to = edge_obj.get("to").and_then(|v| v.as_str()).ok_or_else(|| {
215                GraphError::InvalidPipeline(format!(
216                    "Edge at index {edge_idx}: 'to' field is required and must be a string"
217                ))
218            })?;
219
220            // Source node must exist
221            if !node_map.contains_key(from) {
222                return Err(GraphError::InvalidPipeline(format!(
223                    "Edge {from} -> {to}: source node '{from}' not found"
224                ))
225                .into());
226            }
227
228            // Target node must exist
229            if !node_map.contains_key(to) {
230                return Err(GraphError::InvalidPipeline(format!(
231                    "Edge {from} -> {to}: target node '{to}' not found"
232                ))
233                .into());
234            }
235
236            // Source and target cannot be the same
237            if from == to {
238                return Err(GraphError::InvalidPipeline(format!(
239                    "Self-loop detected at node '{from}'"
240                ))
241                .into());
242            }
243
244            // Build adjacency list and track in-degrees
245            adjacency.get_mut(from).unwrap().push(to.to_string());
246            *in_degree.get_mut(to).unwrap() += 1;
247        }
248
249        // Rule 7: Detect cycles using Kahn's algorithm (topological sort)
250        let mut in_degree_copy = in_degree.clone();
251        let mut queue: VecDeque<String> = VecDeque::new();
252
253        // Add all nodes with no incoming edges (entry points)
254        let entry_points: Vec<String> = in_degree_copy
255            .iter()
256            .filter(|(_, degree)| **degree == 0)
257            .map(|(node_id, _)| node_id.clone())
258            .collect();
259        for node_id in entry_points {
260            queue.push_back(node_id);
261        }
262
263        let mut sorted_count = 0;
264        while let Some(node_id) = queue.pop_front() {
265            sorted_count += 1;
266
267            // For each neighbor of this node
268            if let Some(neighbors) = adjacency.get(&node_id) {
269                let neighbors_copy = neighbors.clone();
270                for neighbor in neighbors_copy {
271                    *in_degree_copy.get_mut(&neighbor).unwrap() -= 1;
272                    if in_degree_copy[&neighbor] == 0 {
273                        queue.push_back(neighbor);
274                    }
275                }
276            }
277        }
278
279        // If we didn't sort all nodes, there's a cycle
280        if sorted_count != node_map.len() {
281            return Err(GraphError::InvalidPipeline(
282                "Cycle detected in pipeline graph".to_string(),
283            )
284            .into());
285        }
286
287        // Rule 8: Check for unreachable nodes (isolated components)
288        // All nodes must form a single connected DAG with one or more entry points
289        // Only start reachability from the FIRST entry point to ensure all nodes are connected
290        let mut visited: HashSet<String> = HashSet::new();
291        let mut to_visit: VecDeque<String> = VecDeque::new();
292
293        // Find first entry point (node with in_degree == 0)
294        let mut entry_points = Vec::new();
295        for (node_id, degree) in &in_degree {
296            if *degree == 0 {
297                entry_points.push(node_id.clone());
298            }
299        }
300
301        if entry_points.is_empty() {
302            // Should not happen if cycle check passed, but be safe
303            return Err(GraphError::InvalidPipeline(
304                "No entry points found (all nodes have incoming edges)".to_string(),
305            )
306            .into());
307        }
308
309        // Start BFS from ONLY the first entry point to ensure single connected component
310        to_visit.push_back(entry_points[0].clone());
311
312        // BFS from first entry point
313        while let Some(node_id) = to_visit.pop_front() {
314            if visited.insert(node_id.clone()) {
315                // Explore outgoing edges
316                if let Some(neighbors) = adjacency.get(&node_id) {
317                    for neighbor in neighbors {
318                        to_visit.push_back(neighbor.clone());
319                    }
320                }
321
322                // Also explore reverse adjacency (incoming edges) to handle branching
323                for (source, targets) in &adjacency {
324                    if targets.contains(&node_id) && !visited.contains(source) {
325                        to_visit.push_back(source.clone());
326                    }
327                }
328            }
329        }
330
331        // Check for unreachable nodes
332        let all_node_ids: HashSet<String> = node_map.keys().cloned().collect();
333        let unreachable: Vec<_> = all_node_ids.difference(&visited).collect();
334
335        if !unreachable.is_empty() {
336            let unreachable_str = unreachable
337                .iter()
338                .map(|s| s.as_str())
339                .collect::<Vec<_>>()
340                .join("', '");
341            return Err(GraphError::InvalidPipeline(format!(
342                "Unreachable nodes found: '{unreachable_str}' (ensure all nodes are connected in a single DAG)"
343            ))
344            .into());
345        }
346
347        Ok(PipelineValidated {
348            config: self.config,
349        })
350    }
351}
352
353impl PipelineValidated {
354    /// Begin executing the validated pipeline
355    ///
356    /// Transitions from `Validated` to `Executing` state.
357    ///
358    /// # Example
359    ///
360    /// ```
361    /// use stygian_graph::domain::pipeline::PipelineUnvalidated;
362    /// use serde_json::json;
363    ///
364    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
365    /// let pipeline = PipelineUnvalidated::new(json!({"nodes": []}))
366    ///     .validate()?;
367    /// let executing = pipeline.execute();
368    /// # Ok(())
369    /// # }
370    /// ```
371    pub fn execute(self) -> PipelineExecuting {
372        PipelineExecuting {
373            context: self.config,
374        }
375    }
376}
377
378impl PipelineExecuting {
379    /// Mark the pipeline as complete with results
380    ///
381    /// Transitions from `Executing` to `Complete` state.
382    ///
383    /// # Example
384    ///
385    /// ```
386    /// use stygian_graph::domain::pipeline::PipelineUnvalidated;
387    /// use serde_json::json;
388    ///
389    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
390    /// let pipeline = PipelineUnvalidated::new(json!({"nodes": []}))
391    ///     .validate()?
392    ///     .execute();
393    ///     
394    /// let complete = pipeline.complete(json!({"status": "success"}));
395    /// # Ok(())
396    /// # }
397    /// ```
398    pub fn complete(self, results: serde_json::Value) -> PipelineComplete {
399        PipelineComplete { results }
400    }
401
402    /// Abort execution with an error
403    ///
404    /// Transitions from `Executing` to `Complete` state with error details.
405    ///
406    /// # Example
407    ///
408    /// ```
409    /// use stygian_graph::domain::pipeline::PipelineUnvalidated;
410    /// use serde_json::json;
411    ///
412    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
413    /// let pipeline = PipelineUnvalidated::new(json!({"nodes": []}))
414    ///     .validate()?
415    ///     .execute();
416    ///     
417    /// let complete = pipeline.abort("Network timeout");
418    /// # Ok(())
419    /// # }
420    /// ```
421    pub fn abort(self, error: &str) -> PipelineComplete {
422        PipelineComplete {
423            results: serde_json::json!({
424                "status": "error",
425                "error": error
426            }),
427        }
428    }
429}
430
431impl PipelineComplete {
432    /// Check if the pipeline completed successfully
433    ///
434    /// # Example
435    ///
436    /// ```
437    /// use stygian_graph::domain::pipeline::PipelineUnvalidated;
438    /// use serde_json::json;
439    ///
440    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
441    /// let pipeline = PipelineUnvalidated::new(json!({"nodes": []}))
442    ///     .validate()?
443    ///     .execute()
444    ///     .complete(json!({"status": "success"}));
445    ///     
446    /// assert!(pipeline.is_success());
447    /// # Ok(())
448    /// # }
449    /// ```
450    pub fn is_success(&self) -> bool {
451        self.results
452            .get("status")
453            .and_then(|s| s.as_str())
454            .is_some_and(|s| s == "success")
455    }
456
457    /// Get the execution results
458    pub const fn results(&self) -> &serde_json::Value {
459        &self.results
460    }
461}
462
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing, clippy::panic)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Validate `config`, assert that it is rejected, and return the rendered error.
    fn rejection(config: serde_json::Value) -> String {
        let result = PipelineUnvalidated::new(config).validate();
        assert!(result.is_err(), "pipeline was unexpectedly accepted");
        result.unwrap_err().to_string()
    }

    /// Build, validate and start executing a minimal single-node pipeline.
    fn running_pipeline() -> PipelineExecuting {
        PipelineUnvalidated::new(json!({
            "nodes": [{"id": "fetch", "service": "http"}],
            "edges": []
        }))
        .validate()
        .unwrap()
        .execute()
    }

    #[test]
    fn validate_empty_nodes_array() {
        let err = rejection(json!({"nodes": [], "edges": []}));
        assert!(err.contains("at least one node"), "{err}");
    }

    #[test]
    fn validate_missing_nodes_field() {
        let err = rejection(json!({"edges": []}));
        assert!(err.contains("'nodes' array"), "{err}");
    }

    #[test]
    fn validate_node_not_an_object() {
        let err = rejection(json!({"nodes": ["fetch"], "edges": []}));
        assert!(err.contains("must be an object"), "{err}");
    }

    #[test]
    fn validate_missing_node_id() {
        let err = rejection(json!({
            "nodes": [{"service": "http"}],
            "edges": []
        }));
        assert!(err.contains("'id' field is required"), "{err}");
    }

    #[test]
    fn validate_empty_node_id() {
        let err = rejection(json!({
            "nodes": [{"id": "", "service": "http"}],
            "edges": []
        }));
        assert!(err.contains("id cannot be empty"), "{err}");
    }

    #[test]
    fn validate_duplicate_node_ids() {
        let err = rejection(json!({
            "nodes": [
                {"id": "fetch", "service": "http"},
                {"id": "fetch", "service": "browser"}
            ],
            "edges": []
        }));
        assert!(err.contains("Duplicate node id"), "{err}");
    }

    #[test]
    fn validate_missing_service() {
        let err = rejection(json!({
            "nodes": [{"id": "fetch"}],
            "edges": []
        }));
        assert!(err.contains("'service' field is required"), "{err}");
    }

    #[test]
    fn validate_invalid_service_type() {
        let err = rejection(json!({
            "nodes": [{"id": "fetch", "service": "invalid_service"}],
            "edges": []
        }));
        assert!(err.contains("not recognized"), "{err}");
    }

    #[test]
    fn validate_edge_not_an_object() {
        let err = rejection(json!({
            "nodes": [{"id": "fetch", "service": "http"}],
            "edges": ["fetch -> fetch"]
        }));
        assert!(err.contains("Edge at index 0: must be an object"), "{err}");
    }

    #[test]
    fn validate_edge_missing_target_field() {
        let err = rejection(json!({
            "nodes": [{"id": "fetch", "service": "http"}],
            "edges": [{"from": "fetch"}]
        }));
        assert!(err.contains("'to' field is required"), "{err}");
    }

    #[test]
    fn validate_edge_nonexistent_source() {
        let err = rejection(json!({
            "nodes": [{"id": "extract", "service": "ai_claude"}],
            "edges": [{"from": "fetch", "to": "extract"}]
        }));
        assert!(err.contains("source node 'fetch' not found"), "{err}");
    }

    #[test]
    fn validate_edge_nonexistent_target() {
        let err = rejection(json!({
            "nodes": [{"id": "fetch", "service": "http"}],
            "edges": [{"from": "fetch", "to": "extract"}]
        }));
        assert!(err.contains("target node 'extract' not found"), "{err}");
    }

    #[test]
    fn validate_self_loop() {
        let err = rejection(json!({
            "nodes": [{"id": "node1", "service": "http"}],
            "edges": [{"from": "node1", "to": "node1"}]
        }));
        assert!(err.contains("Self-loop"), "{err}");
    }

    #[test]
    fn validate_cycle_detection() {
        let err = rejection(json!({
            "nodes": [
                {"id": "a", "service": "http"},
                {"id": "b", "service": "ai_claude"},
                {"id": "c", "service": "browser"}
            ],
            "edges": [
                {"from": "a", "to": "b"},
                {"from": "b", "to": "c"},
                {"from": "c", "to": "a"}
            ]
        }));
        assert!(err.contains("Cycle"), "{err}");
    }

    #[test]
    fn validate_unreachable_nodes() {
        let err = rejection(json!({
            "nodes": [
                {"id": "a", "service": "http"},
                {"id": "orphan", "service": "browser"}
            ],
            "edges": []
        }));
        assert!(err.contains("Unreachable"), "{err}");
    }

    #[test]
    fn validate_disconnected_components() {
        let err = rejection(json!({
            "nodes": [
                {"id": "a", "service": "http"},
                {"id": "b", "service": "storage"},
                {"id": "c", "service": "http"},
                {"id": "d", "service": "storage"}
            ],
            "edges": [
                {"from": "a", "to": "b"},
                {"from": "c", "to": "d"}
            ]
        }));
        assert!(err.contains("Unreachable"), "{err}");
    }

    #[test]
    fn validate_valid_single_node() {
        let pipe = PipelineUnvalidated::new(json!({
            "nodes": [{"id": "fetch", "service": "http"}],
            "edges": []
        }));
        assert!(pipe.validate().is_ok());
    }

    #[test]
    fn validate_edges_field_optional() {
        let pipe = PipelineUnvalidated::new(json!({
            "nodes": [{"id": "fetch", "service": "http"}]
        }));
        assert!(pipe.validate().is_ok());
    }

    #[test]
    fn validate_valid_linear_pipeline() {
        let pipe = PipelineUnvalidated::new(json!({
            "nodes": [
                {"id": "fetch", "service": "http"},
                {"id": "extract", "service": "ai_claude"},
                {"id": "store", "service": "storage"}
            ],
            "edges": [
                {"from": "fetch", "to": "extract"},
                {"from": "extract", "to": "store"}
            ]
        }));
        assert!(pipe.validate().is_ok());
    }

    #[test]
    fn validate_valid_dag_branching() {
        let pipe = PipelineUnvalidated::new(json!({
            "nodes": [
                {"id": "fetch", "service": "http"},
                {"id": "extract_ai", "service": "ai_claude"},
                {"id": "extract_browser", "service": "browser"},
                {"id": "merge", "service": "storage"}
            ],
            "edges": [
                {"from": "fetch", "to": "extract_ai"},
                {"from": "fetch", "to": "extract_browser"},
                {"from": "extract_ai", "to": "merge"},
                {"from": "extract_browser", "to": "merge"}
            ]
        }));
        assert!(pipe.validate().is_ok());
    }

    #[test]
    fn validate_valid_multiple_entry_points() {
        let pipe = PipelineUnvalidated::new(json!({
            "nodes": [
                {"id": "fetch_a", "service": "http"},
                {"id": "fetch_b", "service": "browser"},
                {"id": "merge", "service": "storage"}
            ],
            "edges": [
                {"from": "fetch_a", "to": "merge"},
                {"from": "fetch_b", "to": "merge"}
            ]
        }));
        assert!(pipe.validate().is_ok());
    }

    #[test]
    fn complete_with_success_status() {
        let complete = running_pipeline().complete(json!({"status": "success"}));
        assert!(complete.is_success());
        assert_eq!(complete.results()["status"].as_str(), Some("success"));
    }

    #[test]
    fn complete_without_success_status() {
        assert!(!running_pipeline().complete(json!({"status": "partial"})).is_success());
        assert!(!running_pipeline().complete(json!({})).is_success());
    }

    #[test]
    fn abort_records_error() {
        let complete = running_pipeline().abort("Network timeout");
        assert!(!complete.is_success());
        assert_eq!(complete.results()["status"].as_str(), Some("error"));
        assert_eq!(complete.results()["error"].as_str(), Some("Network timeout"));
    }
}
670}