dx_forge/api/
pipeline.rs

1//! Pipeline Execution & Orchestration APIs
2
3use anyhow::Result;
4use parking_lot::RwLock;
5use std::sync::{Arc, OnceLock};
6
7/// Pipeline execution state
8static PIPELINE_STATE: OnceLock<Arc<RwLock<PipelineState>>> = OnceLock::new();
9
10struct PipelineState {
11    active_pipeline: Option<String>,
12    execution_order: Vec<String>,
13    is_suspended: bool,
14    override_order: Option<Vec<String>>,
15}
16
17impl Default for PipelineState {
18    fn default() -> Self {
19        Self {
20            active_pipeline: None,
21            execution_order: Vec::new(),
22            is_suspended: false,
23            override_order: None,
24        }
25    }
26}
27
28fn get_pipeline_state() -> Arc<RwLock<PipelineState>> {
29    PIPELINE_STATE.get_or_init(|| Arc::new(RwLock::new(PipelineState::default()))).clone()
30}
31
32/// Executes named pipeline ("default" | "auth" | "deploy" | "ci")
33pub fn execute_pipeline(pipeline_name: &str) -> Result<()> {
34    let state = get_pipeline_state();
35    let mut state = state.write();
36    
37    if state.is_suspended {
38        anyhow::bail!("Pipeline execution is suspended");
39    }
40    
41    tracing::info!("🎼 Executing pipeline: {}", pipeline_name);
42    state.active_pipeline = Some(pipeline_name.to_string());
43    
44    // TODO: Load pipeline configuration and execute tools
45    
46    Ok(())
47}
48
49/// Highest priority execution β€” bypasses queue and debounce
50pub fn execute_tool_immediately(tool_id: &str) -> Result<()> {
51    tracing::info!("⚑ Immediate execution: {}", tool_id);
52    
53    // TODO: Execute tool directly, bypassing normal queue
54    
55    Ok(())
56}
57
58/// Returns final Vec<ToolId> after topology sort
59pub fn get_resolved_execution_order() -> Result<Vec<String>> {
60    let state = get_pipeline_state();
61    let state = state.read();
62    
63    if let Some(override_order) = &state.override_order {
64        Ok(override_order.clone())
65    } else {
66        Ok(state.execution_order.clone())
67    }
68}
69
70/// Used by traffic_branching and user experiments
71pub fn temporarily_override_pipeline_order(new_order: Vec<String>) -> Result<()> {
72    let state = get_pipeline_state();
73    let mut state = state.write();
74    
75    tracing::info!("πŸ”€ Temporarily overriding pipeline order");
76    state.override_order = Some(new_order);
77    
78    Ok(())
79}
80
81/// Aborts and restarts active pipeline from scratch
82pub fn restart_current_pipeline() -> Result<()> {
83    let state = get_pipeline_state();
84    let state = state.read();
85    
86    if let Some(pipeline) = &state.active_pipeline {
87        let name = pipeline.clone();
88        drop(state);
89        
90        tracing::info!("πŸ”„ Restarting pipeline: {}", name);
91        execute_pipeline(&name)?;
92    } else {
93        anyhow::bail!("No active pipeline to restart");
94    }
95    
96    Ok(())
97}
98
99/// Pauses all tool execution until resumed
100pub fn suspend_pipeline_execution() -> Result<()> {
101    let state = get_pipeline_state();
102    let mut state = state.write();
103    
104    tracing::info!("⏸️  Pipeline execution suspended");
105    state.is_suspended = true;
106    
107    Ok(())
108}
109
110/// Continues from suspended state
111pub fn resume_pipeline_execution() -> Result<()> {
112    let state = get_pipeline_state();
113    let mut state = state.write();
114    
115    tracing::info!("▢️  Pipeline execution resumed");
116    state.is_suspended = false;
117    
118    Ok(())
119}
120
121#[cfg(test)]
122mod tests {
123    use super::*;
124    
125    #[test]
126    fn test_pipeline_execution() {
127        assert!(execute_pipeline("default").is_ok());
128    }
129    
130    #[test]
131    fn test_suspend_resume() {
132        suspend_pipeline_execution().unwrap();
133        assert!(execute_pipeline("test").is_err());
134        
135        resume_pipeline_execution().unwrap();
136        assert!(execute_pipeline("test").is_ok());
137    }
138}