use anyhow::Result;
use parking_lot::RwLock;
use std::sync::{Arc, OnceLock};
/// Process-wide pipeline state shared by every function in this module.
///
/// The cell starts empty and is filled with a default `PipelineState` the
/// first time `get_pipeline_state` is called.
static PIPELINE_STATE: OnceLock<Arc<RwLock<PipelineState>>> = OnceLock::new();
/// Mutable bookkeeping for the pipeline runner.
///
/// `Default` is derived: every field's own default (`None`, empty `Vec`,
/// `false`) is exactly the idle state, so a hand-written impl adds nothing.
#[derive(Debug, Default)]
struct PipelineState {
    /// Name of the pipeline most recently started, if any.
    active_pipeline: Option<String>,
    /// Baseline execution order, used when no override is set.
    execution_order: Vec<String>,
    /// When `true`, new pipeline executions are rejected.
    is_suspended: bool,
    /// Order that takes precedence over `execution_order` while present.
    override_order: Option<Vec<String>>,
}
/// Returns a new handle to the shared pipeline state.
///
/// The state is created with default values on the first call; every call
/// hands back a clone of the same `Arc`.
fn get_pipeline_state() -> Arc<RwLock<PipelineState>> {
    let shared = PIPELINE_STATE.get_or_init(|| {
        let initial = PipelineState::default();
        Arc::new(RwLock::new(initial))
    });
    Arc::clone(shared)
}
/// Marks `pipeline_name` as the active pipeline.
///
/// # Errors
///
/// Returns an error if pipeline execution is currently suspended; the active
/// pipeline is left untouched in that case.
pub fn execute_pipeline(pipeline_name: &str) -> Result<()> {
    let shared = get_pipeline_state();
    let mut guard = shared.write();

    // Refuse to start anything while the suspension flag is set.
    anyhow::ensure!(!guard.is_suspended, "Pipeline execution is suspended");

    tracing::info!("πΌ Executing pipeline: {}", pipeline_name);
    guard.active_pipeline = Some(pipeline_name.to_owned());
    Ok(())
}
/// Logs an immediate-execution request for `tool_id`.
///
/// This function does not read or modify the shared pipeline state, so the
/// suspension flag is not consulted. It currently always returns `Ok(())`.
pub fn execute_tool_immediately(tool_id: &str) -> Result<()> {
tracing::info!("β‘ Immediate execution: {}", tool_id);
Ok(())
}
/// Returns a copy of the execution order currently in effect.
///
/// If an override order is set, it wins; otherwise the baseline
/// `execution_order` is returned. This function always returns `Ok`.
pub fn get_resolved_execution_order() -> Result<Vec<String>> {
    let shared = get_pipeline_state();
    let guard = shared.read();

    // Fall back to the baseline order only when no override is present.
    let effective = guard
        .override_order
        .as_ref()
        .unwrap_or(&guard.execution_order);
    Ok(effective.clone())
}
/// Installs `new_order` as the override execution order.
///
/// Any previously stored override is replaced. The baseline
/// `execution_order` is not modified. This function always returns `Ok`.
pub fn temporarily_override_pipeline_order(new_order: Vec<String>) -> Result<()> {
    let shared = get_pipeline_state();
    let mut guard = shared.write();

    tracing::info!("π Temporarily overriding pipeline order");
    guard.override_order = Some(new_order);

    Ok(())
}
pub fn restart_current_pipeline() -> Result<()> {
let state = get_pipeline_state();
let state = state.read();
if let Some(pipeline) = &state.active_pipeline {
let name = pipeline.clone();
drop(state);
tracing::info!("π Restarting pipeline: {}", name);
execute_pipeline(&name)?;
} else {
anyhow::bail!("No active pipeline to restart");
}
Ok(())
}
/// Sets the suspension flag on the shared pipeline state.
///
/// While the flag is set, `execute_pipeline` returns an error instead of
/// starting a pipeline. This function always returns `Ok`, including when
/// execution was already suspended.
pub fn suspend_pipeline_execution() -> Result<()> {
    let shared = get_pipeline_state();
    let mut guard = shared.write();

    tracing::info!("βΈοΈ Pipeline execution suspended");
    guard.is_suspended = true;

    Ok(())
}
/// Clears the suspension flag on the shared pipeline state.
///
/// After this call `execute_pipeline` is no longer rejected for being
/// suspended. This function always returns `Ok`, including when execution
/// was not suspended to begin with.
pub fn resume_pipeline_execution() -> Result<()> {
    let shared = get_pipeline_state();
    let mut guard = shared.write();

    tracing::info!("βΆοΈ Pipeline execution resumed");
    guard.is_suspended = false;

    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard};

    /// Serializes tests that touch the process-wide pipeline state.
    ///
    /// The test harness runs `#[test]` functions on parallel threads, and all
    /// of them share one global state; without this lock one test can observe
    /// the suspension flag set by another.
    static STATE_GUARD: Mutex<()> = Mutex::new(());

    /// Takes the serialization lock, ignoring poisoning so that one failed
    /// test does not cascade into unrelated failures.
    fn lock_shared_state() -> MutexGuard<'static, ()> {
        STATE_GUARD
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    #[test]
    fn test_pipeline_execution() {
        let _serial = lock_shared_state();
        // Start from a known, non-suspended state.
        resume_pipeline_execution().unwrap();
        assert!(execute_pipeline("default").is_ok());
    }

    #[test]
    fn test_suspend_resume() {
        let _serial = lock_shared_state();
        suspend_pipeline_execution().unwrap();
        let while_suspended = execute_pipeline("test");
        // Resume before asserting so a failed assertion cannot leave the
        // global state suspended for later tests.
        resume_pipeline_execution().unwrap();
        assert!(while_suspended.is_err());
        assert!(execute_pipeline("test").is_ok());
    }
}