1use anyhow::Result;
4use parking_lot::RwLock;
5use std::sync::{Arc, OnceLock};
6
7static PIPELINE_STATE: OnceLock<Arc<RwLock<PipelineState>>> = OnceLock::new();
9
/// Mutable state shared by the pipeline functions in this file.
///
/// `Default` is derived: no active pipeline, an empty execution order, not suspended,
/// and no override order.
#[derive(Debug, Default)]
struct PipelineState {
    /// Name most recently stored by `execute_pipeline`, if any.
    active_pipeline: Option<String>,
    /// Order returned by `get_resolved_execution_order` when no override is set.
    execution_order: Vec<String>,
    /// While `true`, `execute_pipeline` returns an error instead of running.
    is_suspended: bool,
    /// When `Some`, returned by `get_resolved_execution_order` instead of `execution_order`.
    override_order: Option<Vec<String>>,
}
27
28fn get_pipeline_state() -> Arc<RwLock<PipelineState>> {
29 PIPELINE_STATE.get_or_init(|| Arc::new(RwLock::new(PipelineState::default()))).clone()
30}
31
32pub fn execute_pipeline(pipeline_name: &str) -> Result<()> {
34 let state = get_pipeline_state();
35 let mut state = state.write();
36
37 if state.is_suspended {
38 anyhow::bail!("Pipeline execution is suspended");
39 }
40
41 tracing::info!("πΌ Executing pipeline: {}", pipeline_name);
42 state.active_pipeline = Some(pipeline_name.to_string());
43
44 Ok(())
47}
48
49pub fn execute_tool_immediately(tool_id: &str) -> Result<()> {
51 tracing::info!("β‘ Immediate execution: {}", tool_id);
52
53 Ok(())
56}
57
58pub fn get_resolved_execution_order() -> Result<Vec<String>> {
60 let state = get_pipeline_state();
61 let state = state.read();
62
63 if let Some(override_order) = &state.override_order {
64 Ok(override_order.clone())
65 } else {
66 Ok(state.execution_order.clone())
67 }
68}
69
70pub fn temporarily_override_pipeline_order(new_order: Vec<String>) -> Result<()> {
72 let state = get_pipeline_state();
73 let mut state = state.write();
74
75 tracing::info!("π Temporarily overriding pipeline order");
76 state.override_order = Some(new_order);
77
78 Ok(())
79}
80
81pub fn restart_current_pipeline() -> Result<()> {
83 let state = get_pipeline_state();
84 let state = state.read();
85
86 if let Some(pipeline) = &state.active_pipeline {
87 let name = pipeline.clone();
88 drop(state);
89
90 tracing::info!("π Restarting pipeline: {}", name);
91 execute_pipeline(&name)?;
92 } else {
93 anyhow::bail!("No active pipeline to restart");
94 }
95
96 Ok(())
97}
98
99pub fn suspend_pipeline_execution() -> Result<()> {
101 let state = get_pipeline_state();
102 let mut state = state.write();
103
104 tracing::info!("βΈοΈ Pipeline execution suspended");
105 state.is_suspended = true;
106
107 Ok(())
108}
109
110pub fn resume_pipeline_execution() -> Result<()> {
112 let state = get_pipeline_state();
113 let mut state = state.write();
114
115 tracing::info!("βΆοΈ Pipeline execution resumed");
116 state.is_suspended = false;
117
118 Ok(())
119}
120
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard};

    /// Serializes the tests in this module.
    ///
    /// They all mutate the same process-wide pipeline state, and the test harness runs
    /// tests on parallel threads by default, so without this lock one test could observe
    /// the suspended flag set by another.
    static TEST_LOCK: Mutex<()> = Mutex::new(());

    /// Takes the module-wide test lock, ignoring poisoning left by a failed test so one
    /// failure does not cascade into the others.
    fn serialize_tests() -> MutexGuard<'static, ()> {
        TEST_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    #[test]
    fn test_pipeline_execution() {
        let _guard = serialize_tests();

        // Start from a known state: an earlier test may have left execution suspended.
        resume_pipeline_execution().unwrap();
        assert!(execute_pipeline("default").is_ok());
    }

    #[test]
    fn test_suspend_resume() {
        let _guard = serialize_tests();

        suspend_pipeline_execution().unwrap();
        assert!(execute_pipeline("test").is_err());

        resume_pipeline_execution().unwrap();
        assert!(execute_pipeline("test").is_ok());
    }
}