// goblin_engine/engine.rs

1use crate::error::{GoblinError, Result};
2use crate::executor::{DefaultExecutor, ExecutionResult, Executor};
3use crate::plan::Plan;
4use crate::script::Script;
5use dashmap::DashMap;
6use std::collections::{HashMap, HashSet};
7use std::path::PathBuf;
8use std::sync::{Arc, RwLock};
9use tracing::{debug, info, warn};
10use uuid::Uuid;
11use tokio::sync::Semaphore;
12use futures::future::join_all;
13
14/// Execution context for a plan run
15#[derive(Debug)]
16pub struct ExecutionContext {
17    pub id: Uuid,
18    pub plan_name: String,
19    pub results: HashMap<String, String>,
20    pub start_time: std::time::Instant,
21}
22
23impl ExecutionContext {
24    pub fn new(plan_name: String) -> Self {
25        Self {
26            id: Uuid::new_v4(),
27            plan_name,
28            results: HashMap::new(),
29            start_time: std::time::Instant::now(),
30        }
31    }
32
33    pub fn add_result(&mut self, step_name: String, result: String) {
34        self.results.insert(step_name, result);
35    }
36
37    pub fn get_result(&self, step_name: &str) -> Option<&String> {
38        self.results.get(step_name)
39    }
40
41    pub fn elapsed(&self) -> std::time::Duration {
42        self.start_time.elapsed()
43    }
44}
45
46/// The main workflow engine
47pub struct Engine {
48    /// Collection of loaded scripts indexed by name
49    scripts: DashMap<String, Script>,
50    /// Collection of loaded plans indexed by name  
51    plans: DashMap<String, Plan>,
52    /// Executor for running scripts
53    executor: Arc<dyn Executor + Send + Sync>,
54    /// Base directory for auto-discovering scripts
55    scripts_dir: Option<PathBuf>,
56}
57
58impl Engine {
59    /// Create a new engine with the default executor
60    pub fn new() -> Self {
61        Self {
62            scripts: DashMap::new(),
63            plans: DashMap::new(),
64            executor: Arc::new(DefaultExecutor::new()),
65            scripts_dir: None,
66        }
67    }
68
69    /// Create a new engine with a custom executor
70    pub fn with_executor(executor: Arc<dyn Executor + Send + Sync>) -> Self {
71        Self {
72            scripts: DashMap::new(),
73            plans: DashMap::new(),
74            executor,
75            scripts_dir: None,
76        }
77    }
78
79    /// Set the base directory for auto-discovering scripts
80    pub fn with_scripts_dir(mut self, scripts_dir: PathBuf) -> Self {
81        self.scripts_dir = Some(scripts_dir);
82        self
83    }
84
85    /// Auto-discover scripts from the scripts directory
86    pub fn auto_discover_scripts(&self) -> Result<usize> {
87        let scripts_dir = match &self.scripts_dir {
88            Some(dir) => dir.clone(),
89            None => {
90                warn!("Scripts directory not set, skipping auto-discovery");
91                return Ok(0);
92            }
93        };
94
95        if !scripts_dir.exists() {
96            warn!("Scripts directory does not exist: {}", scripts_dir.display());
97            return Ok(0);
98        }
99
100        let mut discovered = 0;
101
102        for entry in std::fs::read_dir(&scripts_dir)? {
103            let entry = entry?;
104            let path = entry.path();
105
106            if path.is_dir() {
107                let goblin_toml = path.join("goblin.toml");
108                if goblin_toml.exists() {
109                    match Script::from_toml_file(&path) {
110                        Ok(script) => {
111                            info!("Discovered script: {} at {}", script.name, path.display());
112                            self.scripts.insert(script.name.clone(), script);
113                            discovered += 1;
114                        }
115                        Err(e) => {
116                            warn!("Failed to load script from {}: {}", path.display(), e);
117                        }
118                    }
119                }
120            }
121        }
122
123        info!("Auto-discovered {} scripts", discovered);
124        Ok(discovered)
125    }
126
127    /// Load a single script from a directory
128    pub fn load_script(&self, script_dir: PathBuf) -> Result<()> {
129        let script = Script::from_toml_file(&script_dir)?;
130        self.scripts.insert(script.name.clone(), script);
131        Ok(())
132    }
133
134    /// Add a script directly
135    pub fn add_script(&self, script: Script) -> Result<()> {
136        script.validate()?;
137        self.scripts.insert(script.name.clone(), script);
138        Ok(())
139    }
140
141    /// Get a script by name
142    pub fn get_script(&self, name: &str) -> Option<Script> {
143        self.scripts.get(name).map(|entry| entry.value().clone())
144    }
145
146    /// List all available scripts
147    pub fn list_scripts(&self) -> Vec<String> {
148        let mut names: Vec<String> = self.scripts.iter().map(|entry| entry.key().clone()).collect();
149        names.sort();
150        names
151    }
152
153    /// Load a plan from a TOML file
154    pub fn load_plan(&self, plan_path: PathBuf) -> Result<()> {
155        let plan = Plan::from_toml_file(&plan_path)?;
156        
157        // Validate that all required scripts are available
158        let required_scripts = plan.get_required_scripts();
159        for script_name in &required_scripts {
160            if !self.scripts.contains_key(script_name) {
161                return Err(GoblinError::script_not_found(script_name));
162            }
163        }
164
165        plan.validate()?;
166        self.plans.insert(plan.name.clone(), plan);
167        Ok(())
168    }
169
170    /// Load a plan from a TOML string
171    pub fn load_plan_from_str(&self, toml_str: &str) -> Result<()> {
172        let plan = Plan::from_toml_str(toml_str)?;
173        
174        // Validate that all required scripts are available
175        let required_scripts = plan.get_required_scripts();
176        for script_name in &required_scripts {
177            if !self.scripts.contains_key(script_name) {
178                return Err(GoblinError::script_not_found(script_name));
179            }
180        }
181
182        plan.validate()?;
183        self.plans.insert(plan.name.clone(), plan);
184        Ok(())
185    }
186
187    /// Get a plan by name
188    pub fn get_plan(&self, name: &str) -> Option<Plan> {
189        self.plans.get(name).map(|entry| entry.value().clone())
190    }
191
192    /// List all available plans
193    pub fn list_plans(&self) -> Vec<String> {
194        let mut names: Vec<String> = self.plans.iter().map(|entry| entry.key().clone()).collect();
195        names.sort();
196        names
197    }
198
199    /// Execute a single script with arguments
200    pub async fn execute_script(&self, script_name: &str, args: Vec<String>) -> Result<ExecutionResult> {
201        let script = self.get_script(script_name)
202            .ok_or_else(|| GoblinError::script_not_found(script_name))?;
203
204        self.executor.execute_script(&script, &args).await
205    }
206
207    /// Execute a plan with optional default input (with parallel execution)
208    pub async fn execute_plan(&self, plan_name: &str, default_input: Option<String>) -> Result<ExecutionContext> {
209        let plan = self.get_plan(plan_name)
210            .ok_or_else(|| GoblinError::plan_not_found(plan_name))?;
211
212        info!("Starting parallel execution of plan: {}", plan_name);
213        
214        let context = Arc::new(RwLock::new(ExecutionContext::new(plan_name.to_string())));
215        
216        // Add default input if provided
217        if let Some(input) = default_input {
218            context.write().unwrap().add_result("default_input".to_string(), input);
219        }
220
221        // Execute steps in parallel waves based on dependencies
222        self.execute_plan_parallel(&plan, context.clone()).await?;
223
224        let final_context = Arc::try_unwrap(context)
225            .map_err(|_| GoblinError::engine_error("Failed to unwrap execution context"))?
226            .into_inner()
227            .map_err(|_| GoblinError::engine_error("Failed to acquire context lock"))?;
228
229        info!("Plan '{}' completed successfully in {:?}", plan_name, final_context.elapsed());
230        Ok(final_context)
231    }
232
233    /// Execute plan steps in parallel waves based on dependency resolution
234    async fn execute_plan_parallel(
235        &self,
236        plan: &Plan,
237        context: Arc<RwLock<ExecutionContext>>,
238    ) -> Result<()> {
239        let mut remaining_steps: HashSet<String> = plan.steps.iter().map(|s| s.name.clone()).collect();
240        let mut completed_steps: HashSet<String> = HashSet::new();
241        
242        // Add default_input as completed if it exists in context
243        {
244            let ctx = context.read().unwrap();
245            if ctx.results.contains_key("default_input") {
246                completed_steps.insert("default_input".to_string());
247            }
248        }
249
250        let mut wave_count = 0;
251
252        while !remaining_steps.is_empty() {
253            wave_count += 1;
254            info!("Starting execution wave {}", wave_count);
255
256            // Find steps that can be executed (all dependencies satisfied)
257            let executable_steps: Vec<String> = remaining_steps
258                .iter()
259                .filter(|step_name| {
260                    let step = plan.steps.iter().find(|s| &s.name == *step_name).unwrap();
261                    let dependencies = step.get_dependencies();
262                    dependencies.iter().all(|dep| dep == "default_input" || completed_steps.contains(dep))
263                })
264                .cloned()
265                .collect();
266
267            if executable_steps.is_empty() {
268                return Err(GoblinError::engine_error(format!(
269                    "No executable steps found in wave {}. Possible circular dependency or missing dependencies. Remaining steps: {:?}",
270                    wave_count, remaining_steps
271                )));
272            }
273
274            info!("Wave {} executing steps: {:?}", wave_count, executable_steps);
275
276            // Execute all executable steps concurrently
277            let mut futures = Vec::new();
278            
279            for step_name in &executable_steps {
280                let step = plan.steps.iter().find(|s| &s.name == step_name).unwrap();
281                let context_clone = context.clone();
282                let executor_clone = self.executor.clone();
283                let step_clone = step.clone();
284                
285                // Get the script for this step
286                let script = self.get_script(&step.function)
287                    .ok_or_else(|| GoblinError::script_not_found(&step.function))?;
288
289                let future = async move {
290                    // Resolve inputs for this step
291                    let args = {
292                        let ctx = context_clone.read().unwrap();
293                        step_clone.resolve_inputs(&ctx.results)?
294                    };
295                    
296                    info!("Executing step '{}' with script '{}' and args: {:?}", 
297                          step_clone.name, step_clone.function, args);
298
299                    // Execute the script
300                    let result = executor_clone.execute_script(&script, &args).await?;
301                    
302                    // Store the result
303                    let output = result.get_output();
304                    {
305                        let mut ctx = context_clone.write().unwrap();
306                        ctx.add_result(step_clone.name.clone(), output.clone());
307                    }
308                    
309                    info!("Step '{}' completed successfully. Output: '{}'", step_clone.name, output);
310                    
311                    Ok::<String, GoblinError>(step_clone.name)
312                };
313
314                futures.push(future);
315            }
316
317            // Wait for all steps in this wave to complete
318            let results = join_all(futures).await;
319            
320            // Check for any failures and collect completed step names
321            let mut wave_completed = Vec::new();
322            for result in results {
323                match result {
324                    Ok(step_name) => wave_completed.push(step_name),
325                    Err(e) => return Err(e),
326                }
327            }
328
329            // Update tracking sets
330            for step_name in wave_completed {
331                remaining_steps.remove(&step_name);
332                completed_steps.insert(step_name);
333            }
334
335            info!("Wave {} completed. Remaining steps: {}", wave_count, remaining_steps.len());
336        }
337
338        info!("All waves completed successfully in {} waves", wave_count);
339        Ok(())
340    }
341
342    /// Get execution statistics
343    pub fn get_stats(&self) -> (usize, usize) {
344        (self.scripts.len(), self.plans.len())
345    }
346
347    /// Clear all loaded scripts and plans
348    pub fn clear(&self) {
349        self.scripts.clear();
350        self.plans.clear();
351    }
352
353    /// Validate all loaded plans against available scripts
354    pub fn validate_all_plans(&self) -> Result<()> {
355        for plan_entry in self.plans.iter() {
356            let plan = plan_entry.value();
357            
358            // Check that all required scripts exist
359            for script_name in plan.get_required_scripts() {
360                if !self.scripts.contains_key(&script_name) {
361                    return Err(GoblinError::script_not_found(&script_name));
362                }
363            }
364            
365            // Validate the plan itself
366            plan.validate()?;
367        }
368        Ok(())
369    }
370}
371
372impl Default for Engine {
373    fn default() -> Self {
374        Self::new()
375    }
376}
377
378/// A pool of engine instances for concurrent execution with memory management
379pub struct EnginePool {
380    instances: Arc<RwLock<Vec<Engine>>>,
381    semaphore: Arc<Semaphore>,
382    pool_size: usize,
383}
384
385impl EnginePool {
386    /// Create a new engine pool with the specified size
387    pub async fn new(pool_size: usize) -> Result<Self> {
388        if pool_size == 0 {
389            return Err(GoblinError::config_error("Pool size must be greater than 0"));
390        }
391
392        let instances = Arc::new(RwLock::new(Vec::with_capacity(pool_size)));
393        
394        // Create the specified number of engine instances
395        for _ in 0..pool_size {
396            let instance = Engine::new();
397            instances.write().unwrap().push(instance);
398        }
399
400        Ok(Self {
401            instances,
402            semaphore: Arc::new(Semaphore::new(pool_size)),
403            pool_size,
404        })
405    }
406
407    /// Create a new engine pool with custom configuration for all instances
408    pub async fn with_config(pool_size: usize, scripts_dir: Option<PathBuf>) -> Result<Self> {
409        if pool_size == 0 {
410            return Err(GoblinError::config_error("Pool size must be greater than 0"));
411        }
412
413        let instances = Arc::new(RwLock::new(Vec::with_capacity(pool_size)));
414        
415        // Create and configure engine instances
416        for _ in 0..pool_size {
417            let mut instance = Engine::new();
418            
419            if let Some(ref dir) = scripts_dir {
420                instance = instance.with_scripts_dir(dir.clone());
421                // Auto-discover scripts for each instance
422                instance.auto_discover_scripts()?;
423            }
424            
425            instances.write().unwrap().push(instance);
426        }
427
428        info!("Created engine pool with {} instances", pool_size);
429        Ok(Self {
430            instances,
431            semaphore: Arc::new(Semaphore::new(pool_size)),
432            pool_size,
433        })
434    }
435
436    /// Get pool statistics
437    pub fn get_pool_stats(&self) -> PoolStats {
438        let available_permits = self.semaphore.available_permits();
439        PoolStats {
440            total_instances: self.pool_size,
441            available_instances: available_permits,
442            busy_instances: self.pool_size - available_permits,
443        }
444    }
445
446    /// Acquire an engine instance from the pool (blocks if none available)
447    pub async fn acquire(&self) -> Result<EngineGuard> {
448        // Acquire a permit from the semaphore
449        let permit = self.semaphore.clone().acquire_owned().await
450            .map_err(|e| GoblinError::engine_error(format!("Failed to acquire engine from pool: {}", e)))?;
451
452        // Get an engine instance
453        let engine = {
454            let mut instances = self.instances.write().unwrap();
455            instances.pop()
456                .ok_or_else(|| GoblinError::engine_error("No engine instances available (this should not happen)"))?
457        };
458
459        debug!("Acquired engine instance from pool");
460        Ok(EngineGuard {
461            engine: Some(engine),
462            pool_instances: self.instances.clone(),
463            _permit: permit,
464        })
465    }
466
467    /// Try to acquire an engine instance immediately (non-blocking)
468    pub fn try_acquire(&self) -> Result<Option<EngineGuard>> {
469        // Try to acquire a permit from the semaphore
470        let permit = match self.semaphore.clone().try_acquire_owned() {
471            Ok(permit) => permit,
472            Err(_) => return Ok(None), // No instances available
473        };
474
475        // Get an engine instance
476        let engine = {
477            let mut instances = self.instances.write().unwrap();
478            instances.pop()
479                .ok_or_else(|| GoblinError::engine_error("No engine instances available (this should not happen)"))?
480        };
481
482        debug!("Acquired engine instance from pool (non-blocking)");
483        Ok(Some(EngineGuard {
484            engine: Some(engine),
485            pool_instances: self.instances.clone(),
486            _permit: permit,
487        }))
488    }
489
490    /// Load scripts on all instances in the pool
491    pub async fn load_scripts_on_all(&self, scripts_dir: &PathBuf) -> Result<()> {
492        let mut instances = self.instances.write().unwrap();
493        
494        for instance in instances.iter_mut() {
495            instance.scripts_dir = Some(scripts_dir.clone());
496            instance.auto_discover_scripts()?;
497        }
498
499        info!("Loaded scripts on all {} instances in pool", self.pool_size);
500        Ok(())
501    }
502
503    /// Load a plan on all instances in the pool
504    pub async fn load_plan_on_all(&self, plan_path: PathBuf) -> Result<()> {
505        let mut instances = self.instances.write().unwrap();
506        
507        for instance in instances.iter_mut() {
508            instance.load_plan(plan_path.clone())?;
509        }
510
511        info!("Loaded plan on all {} instances in pool", self.pool_size);
512        Ok(())
513    }
514}
515
/// Point-in-time statistics about an [`EnginePool`].
///
/// Derives `PartialEq`/`Eq` so snapshots can be compared directly (e.g. in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PoolStats {
    /// Number of engines the pool was created with.
    pub total_instances: usize,
    /// Engines currently free to acquire (available semaphore permits).
    pub available_instances: usize,
    /// Engines currently checked out.
    pub busy_instances: usize,
}
523
524/// A guard that manages access to an engine instance from the pool
525pub struct EngineGuard {
526    engine: Option<Engine>,
527    pool_instances: Arc<RwLock<Vec<Engine>>>,
528    _permit: tokio::sync::OwnedSemaphorePermit,
529}
530
531impl EngineGuard {
532    /// Get a reference to the engine
533    pub fn engine(&self) -> &Engine {
534        self.engine.as_ref().unwrap()
535    }
536
537    /// Get a mutable reference to the engine
538    pub fn engine_mut(&mut self) -> &mut Engine {
539        self.engine.as_mut().unwrap()
540    }
541
542    /// Execute a plan and automatically reset the engine state afterwards
543    pub async fn execute_plan_with_reset(
544        &mut self,
545        plan_name: &str,
546        default_input: Option<String>,
547    ) -> Result<ExecutionContext> {
548        let result = self.engine().execute_plan(plan_name, default_input).await;
549        
550        // Reset the engine state regardless of success/failure
551        self.reset_execution_state();
552        
553        result
554    }
555
556    /// Reset the execution state (clear any cached results/contexts)
557    pub fn reset_execution_state(&mut self) {
558        if let Some(_engine) = &mut self.engine {
559            // Clear any execution-related state but keep scripts and plans loaded
560            // The ExecutionContext is already separate per execution, so no persistent state to clear
561            debug!("Reset execution state for engine instance");
562        }
563    }
564}
565
566impl std::ops::Deref for EngineGuard {
567    type Target = Engine;
568
569    fn deref(&self) -> &Self::Target {
570        self.engine.as_ref().unwrap()
571    }
572}
573
574impl std::ops::DerefMut for EngineGuard {
575    fn deref_mut(&mut self) -> &mut Self::Target {
576        self.engine.as_mut().unwrap()
577    }
578}
579
580impl Drop for EngineGuard {
581    fn drop(&mut self) {
582        // Return the engine to the pool when the guard is dropped
583        if let Some(engine) = self.engine.take() {
584            let mut instances = self.pool_instances.write().unwrap();
585            instances.push(engine);
586            debug!("Returned engine instance to pool");
587        }
588        // The permit is automatically released when _permit is dropped
589    }
590}
591
#[cfg(test)]
mod tests {
    use super::*;
    use crate::executor::{ExecutionResult, MockExecutor};
    use crate::script::{Script, ScriptConfig};
    use tokio::time::Duration;

    /// Config for the `echo hello` script shared by the tests below.
    fn echo_script_config() -> ScriptConfig {
        ScriptConfig {
            name: "test_script".to_string(),
            command: "echo hello".to_string(),
            timeout: 30,
            test_command: None,
            require_test: false,
        }
    }

    #[tokio::test]
    async fn test_engine_basic_operations() {
        let engine = Engine::new();

        // Register the script rooted in the (existing) system temp directory.
        let script = Script::new(echo_script_config(), std::env::temp_dir());
        engine.add_script(script).unwrap();

        assert_eq!(engine.list_scripts(), vec!["test_script"]);
        assert!(engine.get_script("test_script").is_some());
        assert!(engine.get_script("nonexistent").is_none());
    }

    #[tokio::test]
    async fn test_engine_with_mock_executor() {
        // The mock answers "test_script" with this canned result.
        let canned = ExecutionResult {
            script_name: "test_script".to_string(),
            stdout: "Hello, World!".to_string(),
            stderr: String::new(),
            exit_code: 0,
            duration: Duration::from_millis(100),
        };
        let mut executor = MockExecutor::new();
        executor.add_result("test_script".to_string(), canned);

        let engine = Engine::with_executor(Arc::new(executor));
        engine
            .add_script(Script::new(echo_script_config(), std::env::temp_dir()))
            .unwrap();

        let outcome = engine.execute_script("test_script", vec![]).await.unwrap();
        assert_eq!(outcome.stdout, "Hello, World!");
        assert!(outcome.is_success());
    }
}