1use anyhow::Result;
4use parking_lot::RwLock;
5use std::sync::Arc;
6
7static mut PIPELINE_STATE: Option<Arc<RwLock<PipelineState>>> = None;
9
10struct PipelineState {
11 active_pipeline: Option<String>,
12 execution_order: Vec<String>,
13 is_suspended: bool,
14 override_order: Option<Vec<String>>,
15}
16
17impl Default for PipelineState {
18 fn default() -> Self {
19 Self {
20 active_pipeline: None,
21 execution_order: Vec::new(),
22 is_suspended: false,
23 override_order: None,
24 }
25 }
26}
27
28fn get_pipeline_state() -> Arc<RwLock<PipelineState>> {
29 unsafe {
30 if PIPELINE_STATE.is_none() {
31 PIPELINE_STATE = Some(Arc::new(RwLock::new(PipelineState::default())));
32 }
33 PIPELINE_STATE.as_ref().unwrap().clone()
34 }
35}
36
37pub fn execute_pipeline(pipeline_name: &str) -> Result<()> {
39 let state = get_pipeline_state();
40 let mut state = state.write();
41
42 if state.is_suspended {
43 anyhow::bail!("Pipeline execution is suspended");
44 }
45
46 tracing::info!("πΌ Executing pipeline: {}", pipeline_name);
47 state.active_pipeline = Some(pipeline_name.to_string());
48
49 Ok(())
52}
53
54pub fn execute_tool_immediately(tool_id: &str) -> Result<()> {
56 tracing::info!("β‘ Immediate execution: {}", tool_id);
57
58 Ok(())
61}
62
63pub fn get_resolved_execution_order() -> Result<Vec<String>> {
65 let state = get_pipeline_state();
66 let state = state.read();
67
68 if let Some(override_order) = &state.override_order {
69 Ok(override_order.clone())
70 } else {
71 Ok(state.execution_order.clone())
72 }
73}
74
75pub fn temporarily_override_pipeline_order(new_order: Vec<String>) -> Result<()> {
77 let state = get_pipeline_state();
78 let mut state = state.write();
79
80 tracing::info!("π Temporarily overriding pipeline order");
81 state.override_order = Some(new_order);
82
83 Ok(())
84}
85
86pub fn restart_current_pipeline() -> Result<()> {
88 let state = get_pipeline_state();
89 let state = state.read();
90
91 if let Some(pipeline) = &state.active_pipeline {
92 let name = pipeline.clone();
93 drop(state);
94
95 tracing::info!("π Restarting pipeline: {}", name);
96 execute_pipeline(&name)?;
97 } else {
98 anyhow::bail!("No active pipeline to restart");
99 }
100
101 Ok(())
102}
103
104pub fn suspend_pipeline_execution() -> Result<()> {
106 let state = get_pipeline_state();
107 let mut state = state.write();
108
109 tracing::info!("βΈοΈ Pipeline execution suspended");
110 state.is_suspended = true;
111
112 Ok(())
113}
114
115pub fn resume_pipeline_execution() -> Result<()> {
117 let state = get_pipeline_state();
118 let mut state = state.write();
119
120 tracing::info!("βΆοΈ Pipeline execution resumed");
121 state.is_suspended = false;
122
123 Ok(())
124}
125
126#[cfg(test)]
127mod tests {
128 use super::*;
129
130 #[test]
131 fn test_pipeline_execution() {
132 assert!(execute_pipeline("default").is_ok());
133 }
134
135 #[test]
136 fn test_suspend_resume() {
137 suspend_pipeline_execution().unwrap();
138 assert!(execute_pipeline("test").is_err());
139
140 resume_pipeline_execution().unwrap();
141 assert!(execute_pipeline("test").is_ok());
142 }
143}