dx_forge/api/
pipeline.rs

1//! Pipeline Execution & Orchestration APIs
2
3use anyhow::Result;
4use parking_lot::RwLock;
5use std::sync::Arc;
6
7/// Pipeline execution state
8static mut PIPELINE_STATE: Option<Arc<RwLock<PipelineState>>> = None;
9
10struct PipelineState {
11    active_pipeline: Option<String>,
12    execution_order: Vec<String>,
13    is_suspended: bool,
14    override_order: Option<Vec<String>>,
15}
16
17impl Default for PipelineState {
18    fn default() -> Self {
19        Self {
20            active_pipeline: None,
21            execution_order: Vec::new(),
22            is_suspended: false,
23            override_order: None,
24        }
25    }
26}
27
28fn get_pipeline_state() -> Arc<RwLock<PipelineState>> {
29    unsafe {
30        if PIPELINE_STATE.is_none() {
31            PIPELINE_STATE = Some(Arc::new(RwLock::new(PipelineState::default())));
32        }
33        PIPELINE_STATE.as_ref().unwrap().clone()
34    }
35}
36
37/// Executes named pipeline ("default" | "auth" | "deploy" | "ci")
38pub fn execute_pipeline(pipeline_name: &str) -> Result<()> {
39    let state = get_pipeline_state();
40    let mut state = state.write();
41    
42    if state.is_suspended {
43        anyhow::bail!("Pipeline execution is suspended");
44    }
45    
46    tracing::info!("🎼 Executing pipeline: {}", pipeline_name);
47    state.active_pipeline = Some(pipeline_name.to_string());
48    
49    // TODO: Load pipeline configuration and execute tools
50    
51    Ok(())
52}
53
54/// Highest priority execution β€” bypasses queue and debounce
55pub fn execute_tool_immediately(tool_id: &str) -> Result<()> {
56    tracing::info!("⚑ Immediate execution: {}", tool_id);
57    
58    // TODO: Execute tool directly, bypassing normal queue
59    
60    Ok(())
61}
62
63/// Returns final Vec<ToolId> after topology sort
64pub fn get_resolved_execution_order() -> Result<Vec<String>> {
65    let state = get_pipeline_state();
66    let state = state.read();
67    
68    if let Some(override_order) = &state.override_order {
69        Ok(override_order.clone())
70    } else {
71        Ok(state.execution_order.clone())
72    }
73}
74
75/// Used by traffic_branching and user experiments
76pub fn temporarily_override_pipeline_order(new_order: Vec<String>) -> Result<()> {
77    let state = get_pipeline_state();
78    let mut state = state.write();
79    
80    tracing::info!("πŸ”€ Temporarily overriding pipeline order");
81    state.override_order = Some(new_order);
82    
83    Ok(())
84}
85
86/// Aborts and restarts active pipeline from scratch
87pub fn restart_current_pipeline() -> Result<()> {
88    let state = get_pipeline_state();
89    let state = state.read();
90    
91    if let Some(pipeline) = &state.active_pipeline {
92        let name = pipeline.clone();
93        drop(state);
94        
95        tracing::info!("πŸ”„ Restarting pipeline: {}", name);
96        execute_pipeline(&name)?;
97    } else {
98        anyhow::bail!("No active pipeline to restart");
99    }
100    
101    Ok(())
102}
103
104/// Pauses all tool execution until resumed
105pub fn suspend_pipeline_execution() -> Result<()> {
106    let state = get_pipeline_state();
107    let mut state = state.write();
108    
109    tracing::info!("⏸️  Pipeline execution suspended");
110    state.is_suspended = true;
111    
112    Ok(())
113}
114
115/// Continues from suspended state
116pub fn resume_pipeline_execution() -> Result<()> {
117    let state = get_pipeline_state();
118    let mut state = state.write();
119    
120    tracing::info!("▢️  Pipeline execution resumed");
121    state.is_suspended = false;
122    
123    Ok(())
124}
125
126#[cfg(test)]
127mod tests {
128    use super::*;
129    
130    #[test]
131    fn test_pipeline_execution() {
132        assert!(execute_pipeline("default").is_ok());
133    }
134    
135    #[test]
136    fn test_suspend_resume() {
137        suspend_pipeline_execution().unwrap();
138        assert!(execute_pipeline("test").is_err());
139        
140        resume_pipeline_execution().unwrap();
141        assert!(execute_pipeline("test").is_ok());
142    }
143}