use crate::error::{GoblinError, Result};
use crate::executor::{DefaultExecutor, ExecutionResult, Executor};
use crate::plan::Plan;
use crate::script::Script;
use dashmap::DashMap;
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use tracing::{debug, info, warn};
use uuid::Uuid;
use tokio::sync::Semaphore;
use futures::future::join_all;
#[derive(Debug)]
pub struct ExecutionContext {
pub id: Uuid,
pub plan_name: String,
pub results: HashMap<String, String>,
pub start_time: std::time::Instant,
}
impl ExecutionContext {
pub fn new(plan_name: String) -> Self {
Self {
id: Uuid::new_v4(),
plan_name,
results: HashMap::new(),
start_time: std::time::Instant::now(),
}
}
pub fn add_result(&mut self, step_name: String, result: String) {
self.results.insert(step_name, result);
}
pub fn get_result(&self, step_name: &str) -> Option<&String> {
self.results.get(step_name)
}
pub fn elapsed(&self) -> std::time::Duration {
self.start_time.elapsed()
}
}
pub struct Engine {
scripts: DashMap<String, Script>,
plans: DashMap<String, Plan>,
executor: Arc<dyn Executor + Send + Sync>,
scripts_dir: Option<PathBuf>,
}
impl Engine {
pub fn new() -> Self {
Self {
scripts: DashMap::new(),
plans: DashMap::new(),
executor: Arc::new(DefaultExecutor::new()),
scripts_dir: None,
}
}
pub fn with_executor(executor: Arc<dyn Executor + Send + Sync>) -> Self {
Self {
scripts: DashMap::new(),
plans: DashMap::new(),
executor,
scripts_dir: None,
}
}
pub fn with_scripts_dir(mut self, scripts_dir: PathBuf) -> Self {
self.scripts_dir = Some(scripts_dir);
self
}
pub fn auto_discover_scripts(&self) -> Result<usize> {
let scripts_dir = match &self.scripts_dir {
Some(dir) => dir.clone(),
None => {
warn!("Scripts directory not set, skipping auto-discovery");
return Ok(0);
}
};
if !scripts_dir.exists() {
warn!("Scripts directory does not exist: {}", scripts_dir.display());
return Ok(0);
}
let mut discovered = 0;
for entry in std::fs::read_dir(&scripts_dir)? {
let entry = entry?;
let path = entry.path();
if path.is_dir() {
let goblin_toml = path.join("goblin.toml");
if goblin_toml.exists() {
match Script::from_toml_file(&path) {
Ok(script) => {
info!("Discovered script: {} at {}", script.name, path.display());
self.scripts.insert(script.name.clone(), script);
discovered += 1;
}
Err(e) => {
warn!("Failed to load script from {}: {}", path.display(), e);
}
}
}
}
}
info!("Auto-discovered {} scripts", discovered);
Ok(discovered)
}
pub fn load_script(&self, script_dir: PathBuf) -> Result<()> {
let script = Script::from_toml_file(&script_dir)?;
self.scripts.insert(script.name.clone(), script);
Ok(())
}
pub fn add_script(&self, script: Script) -> Result<()> {
script.validate()?;
self.scripts.insert(script.name.clone(), script);
Ok(())
}
pub fn get_script(&self, name: &str) -> Option<Script> {
self.scripts.get(name).map(|entry| entry.value().clone())
}
pub fn list_scripts(&self) -> Vec<String> {
let mut names: Vec<String> = self.scripts.iter().map(|entry| entry.key().clone()).collect();
names.sort();
names
}
pub fn load_plan(&self, plan_path: PathBuf) -> Result<()> {
let plan = Plan::from_toml_file(&plan_path)?;
let required_scripts = plan.get_required_scripts();
for script_name in &required_scripts {
if !self.scripts.contains_key(script_name) {
return Err(GoblinError::script_not_found(script_name));
}
}
plan.validate()?;
self.plans.insert(plan.name.clone(), plan);
Ok(())
}
pub fn load_plan_from_str(&self, toml_str: &str) -> Result<()> {
let plan = Plan::from_toml_str(toml_str)?;
let required_scripts = plan.get_required_scripts();
for script_name in &required_scripts {
if !self.scripts.contains_key(script_name) {
return Err(GoblinError::script_not_found(script_name));
}
}
plan.validate()?;
self.plans.insert(plan.name.clone(), plan);
Ok(())
}
pub fn get_plan(&self, name: &str) -> Option<Plan> {
self.plans.get(name).map(|entry| entry.value().clone())
}
pub fn list_plans(&self) -> Vec<String> {
let mut names: Vec<String> = self.plans.iter().map(|entry| entry.key().clone()).collect();
names.sort();
names
}
pub async fn execute_script(&self, script_name: &str, args: Vec<String>) -> Result<ExecutionResult> {
let script = self.get_script(script_name)
.ok_or_else(|| GoblinError::script_not_found(script_name))?;
self.executor.execute_script(&script, &args).await
}
pub async fn execute_plan(&self, plan_name: &str, default_input: Option<String>) -> Result<ExecutionContext> {
let plan = self.get_plan(plan_name)
.ok_or_else(|| GoblinError::plan_not_found(plan_name))?;
info!("Starting parallel execution of plan: {}", plan_name);
let context = Arc::new(RwLock::new(ExecutionContext::new(plan_name.to_string())));
if let Some(input) = default_input {
context.write().unwrap().add_result("default_input".to_string(), input);
}
self.execute_plan_parallel(&plan, context.clone()).await?;
let final_context = Arc::try_unwrap(context)
.map_err(|_| GoblinError::engine_error("Failed to unwrap execution context"))?
.into_inner()
.map_err(|_| GoblinError::engine_error("Failed to acquire context lock"))?;
info!("Plan '{}' completed successfully in {:?}", plan_name, final_context.elapsed());
Ok(final_context)
}
async fn execute_plan_parallel(
&self,
plan: &Plan,
context: Arc<RwLock<ExecutionContext>>,
) -> Result<()> {
let mut remaining_steps: HashSet<String> = plan.steps.iter().map(|s| s.name.clone()).collect();
let mut completed_steps: HashSet<String> = HashSet::new();
{
let ctx = context.read().unwrap();
if ctx.results.contains_key("default_input") {
completed_steps.insert("default_input".to_string());
}
}
let mut wave_count = 0;
while !remaining_steps.is_empty() {
wave_count += 1;
info!("Starting execution wave {}", wave_count);
let executable_steps: Vec<String> = remaining_steps
.iter()
.filter(|step_name| {
let step = plan.steps.iter().find(|s| &s.name == *step_name).unwrap();
let dependencies = step.get_dependencies();
dependencies.iter().all(|dep| dep == "default_input" || completed_steps.contains(dep))
})
.cloned()
.collect();
if executable_steps.is_empty() {
return Err(GoblinError::engine_error(format!(
"No executable steps found in wave {}. Possible circular dependency or missing dependencies. Remaining steps: {:?}",
wave_count, remaining_steps
)));
}
info!("Wave {} executing steps: {:?}", wave_count, executable_steps);
let mut futures = Vec::new();
for step_name in &executable_steps {
let step = plan.steps.iter().find(|s| &s.name == step_name).unwrap();
let context_clone = context.clone();
let executor_clone = self.executor.clone();
let step_clone = step.clone();
let script = self.get_script(&step.function)
.ok_or_else(|| GoblinError::script_not_found(&step.function))?;
let future = async move {
let args = {
let ctx = context_clone.read().unwrap();
step_clone.resolve_inputs(&ctx.results)?
};
info!("Executing step '{}' with script '{}' and args: {:?}",
step_clone.name, step_clone.function, args);
let result = executor_clone.execute_script(&script, &args).await?;
let output = result.get_output();
{
let mut ctx = context_clone.write().unwrap();
ctx.add_result(step_clone.name.clone(), output.clone());
}
info!("Step '{}' completed successfully. Output: '{}'", step_clone.name, output);
Ok::<String, GoblinError>(step_clone.name)
};
futures.push(future);
}
let results = join_all(futures).await;
let mut wave_completed = Vec::new();
for result in results {
match result {
Ok(step_name) => wave_completed.push(step_name),
Err(e) => return Err(e),
}
}
for step_name in wave_completed {
remaining_steps.remove(&step_name);
completed_steps.insert(step_name);
}
info!("Wave {} completed. Remaining steps: {}", wave_count, remaining_steps.len());
}
info!("All waves completed successfully in {} waves", wave_count);
Ok(())
}
pub fn get_stats(&self) -> (usize, usize) {
(self.scripts.len(), self.plans.len())
}
pub fn clear(&self) {
self.scripts.clear();
self.plans.clear();
}
pub fn validate_all_plans(&self) -> Result<()> {
for plan_entry in self.plans.iter() {
let plan = plan_entry.value();
for script_name in plan.get_required_scripts() {
if !self.scripts.contains_key(&script_name) {
return Err(GoblinError::script_not_found(&script_name));
}
}
plan.validate()?;
}
Ok(())
}
}
impl Default for Engine {
fn default() -> Self {
Self::new()
}
}
pub struct EnginePool {
instances: Arc<RwLock<Vec<Engine>>>,
semaphore: Arc<Semaphore>,
pool_size: usize,
}
impl EnginePool {
pub async fn new(pool_size: usize) -> Result<Self> {
if pool_size == 0 {
return Err(GoblinError::config_error("Pool size must be greater than 0"));
}
let instances = Arc::new(RwLock::new(Vec::with_capacity(pool_size)));
for _ in 0..pool_size {
let instance = Engine::new();
instances.write().unwrap().push(instance);
}
Ok(Self {
instances,
semaphore: Arc::new(Semaphore::new(pool_size)),
pool_size,
})
}
pub async fn with_config(pool_size: usize, scripts_dir: Option<PathBuf>) -> Result<Self> {
if pool_size == 0 {
return Err(GoblinError::config_error("Pool size must be greater than 0"));
}
let instances = Arc::new(RwLock::new(Vec::with_capacity(pool_size)));
for _ in 0..pool_size {
let mut instance = Engine::new();
if let Some(ref dir) = scripts_dir {
instance = instance.with_scripts_dir(dir.clone());
instance.auto_discover_scripts()?;
}
instances.write().unwrap().push(instance);
}
info!("Created engine pool with {} instances", pool_size);
Ok(Self {
instances,
semaphore: Arc::new(Semaphore::new(pool_size)),
pool_size,
})
}
pub fn get_pool_stats(&self) -> PoolStats {
let available_permits = self.semaphore.available_permits();
PoolStats {
total_instances: self.pool_size,
available_instances: available_permits,
busy_instances: self.pool_size - available_permits,
}
}
pub async fn acquire(&self) -> Result<EngineGuard> {
let permit = self.semaphore.clone().acquire_owned().await
.map_err(|e| GoblinError::engine_error(format!("Failed to acquire engine from pool: {}", e)))?;
let engine = {
let mut instances = self.instances.write().unwrap();
instances.pop()
.ok_or_else(|| GoblinError::engine_error("No engine instances available (this should not happen)"))?
};
debug!("Acquired engine instance from pool");
Ok(EngineGuard {
engine: Some(engine),
pool_instances: self.instances.clone(),
_permit: permit,
})
}
pub fn try_acquire(&self) -> Result<Option<EngineGuard>> {
let permit = match self.semaphore.clone().try_acquire_owned() {
Ok(permit) => permit,
Err(_) => return Ok(None), };
let engine = {
let mut instances = self.instances.write().unwrap();
instances.pop()
.ok_or_else(|| GoblinError::engine_error("No engine instances available (this should not happen)"))?
};
debug!("Acquired engine instance from pool (non-blocking)");
Ok(Some(EngineGuard {
engine: Some(engine),
pool_instances: self.instances.clone(),
_permit: permit,
}))
}
pub async fn load_scripts_on_all(&self, scripts_dir: &PathBuf) -> Result<()> {
let mut instances = self.instances.write().unwrap();
for instance in instances.iter_mut() {
instance.scripts_dir = Some(scripts_dir.clone());
instance.auto_discover_scripts()?;
}
info!("Loaded scripts on all {} instances in pool", self.pool_size);
Ok(())
}
pub async fn load_plan_on_all(&self, plan_path: PathBuf) -> Result<()> {
let mut instances = self.instances.write().unwrap();
for instance in instances.iter_mut() {
instance.load_plan(plan_path.clone())?;
}
info!("Loaded plan on all {} instances in pool", self.pool_size);
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct PoolStats {
pub total_instances: usize,
pub available_instances: usize,
pub busy_instances: usize,
}
pub struct EngineGuard {
engine: Option<Engine>,
pool_instances: Arc<RwLock<Vec<Engine>>>,
_permit: tokio::sync::OwnedSemaphorePermit,
}
impl EngineGuard {
pub fn engine(&self) -> &Engine {
self.engine.as_ref().unwrap()
}
pub fn engine_mut(&mut self) -> &mut Engine {
self.engine.as_mut().unwrap()
}
pub async fn execute_plan_with_reset(
&mut self,
plan_name: &str,
default_input: Option<String>,
) -> Result<ExecutionContext> {
let result = self.engine().execute_plan(plan_name, default_input).await;
self.reset_execution_state();
result
}
pub fn reset_execution_state(&mut self) {
if let Some(_engine) = &mut self.engine {
debug!("Reset execution state for engine instance");
}
}
}
impl std::ops::Deref for EngineGuard {
type Target = Engine;
fn deref(&self) -> &Self::Target {
self.engine.as_ref().unwrap()
}
}
impl std::ops::DerefMut for EngineGuard {
fn deref_mut(&mut self) -> &mut Self::Target {
self.engine.as_mut().unwrap()
}
}
impl Drop for EngineGuard {
fn drop(&mut self) {
if let Some(engine) = self.engine.take() {
let mut instances = self.pool_instances.write().unwrap();
instances.push(engine);
debug!("Returned engine instance to pool");
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::executor::{ExecutionResult, MockExecutor};
use crate::script::{Script, ScriptConfig};
use tokio::time::Duration;
#[tokio::test]
async fn test_engine_basic_operations() {
let engine = Engine::new();
let temp_dir = std::env::temp_dir();
let config = ScriptConfig {
name: "test_script".to_string(),
command: "echo hello".to_string(),
timeout: 30,
test_command: None,
require_test: false,
};
let script = Script::new(config, temp_dir);
engine.add_script(script).unwrap();
assert_eq!(engine.list_scripts(), vec!["test_script"]);
assert!(engine.get_script("test_script").is_some());
assert!(engine.get_script("nonexistent").is_none());
}
#[tokio::test]
async fn test_engine_with_mock_executor() {
let mut mock_executor = MockExecutor::new();
let expected_result = ExecutionResult {
script_name: "test_script".to_string(),
stdout: "Hello, World!".to_string(),
stderr: String::new(),
exit_code: 0,
duration: Duration::from_millis(100),
};
mock_executor.add_result("test_script".to_string(), expected_result.clone());
let engine = Engine::with_executor(Arc::new(mock_executor));
let config = ScriptConfig {
name: "test_script".to_string(),
command: "echo hello".to_string(),
timeout: 30,
test_command: None,
require_test: false,
};
let script = Script::new(config, std::env::temp_dir());
engine.add_script(script).unwrap();
let result = engine.execute_script("test_script", vec![]).await.unwrap();
assert_eq!(result.stdout, "Hello, World!");
assert!(result.is_success());
}
}