1use crate::error::{GoblinError, Result};
2use crate::executor::{DefaultExecutor, ExecutionResult, Executor};
3use crate::plan::Plan;
4use crate::script::Script;
5use dashmap::DashMap;
6use std::collections::{HashMap, HashSet};
7use std::path::PathBuf;
8use std::sync::{Arc, RwLock};
9use tracing::{debug, info, warn};
10use uuid::Uuid;
11use tokio::sync::Semaphore;
12use futures::future::join_all;
13
14#[derive(Debug)]
16pub struct ExecutionContext {
17 pub id: Uuid,
18 pub plan_name: String,
19 pub results: HashMap<String, String>,
20 pub start_time: std::time::Instant,
21}
22
23impl ExecutionContext {
24 pub fn new(plan_name: String) -> Self {
25 Self {
26 id: Uuid::new_v4(),
27 plan_name,
28 results: HashMap::new(),
29 start_time: std::time::Instant::now(),
30 }
31 }
32
33 pub fn add_result(&mut self, step_name: String, result: String) {
34 self.results.insert(step_name, result);
35 }
36
37 pub fn get_result(&self, step_name: &str) -> Option<&String> {
38 self.results.get(step_name)
39 }
40
41 pub fn elapsed(&self) -> std::time::Duration {
42 self.start_time.elapsed()
43 }
44}
45
46pub struct Engine {
48 scripts: DashMap<String, Script>,
50 plans: DashMap<String, Plan>,
52 executor: Arc<dyn Executor + Send + Sync>,
54 scripts_dir: Option<PathBuf>,
56}
57
58impl Engine {
59 pub fn new() -> Self {
61 Self {
62 scripts: DashMap::new(),
63 plans: DashMap::new(),
64 executor: Arc::new(DefaultExecutor::new()),
65 scripts_dir: None,
66 }
67 }
68
69 pub fn with_executor(executor: Arc<dyn Executor + Send + Sync>) -> Self {
71 Self {
72 scripts: DashMap::new(),
73 plans: DashMap::new(),
74 executor,
75 scripts_dir: None,
76 }
77 }
78
79 pub fn with_scripts_dir(mut self, scripts_dir: PathBuf) -> Self {
81 self.scripts_dir = Some(scripts_dir);
82 self
83 }
84
85 pub fn auto_discover_scripts(&self) -> Result<usize> {
87 let scripts_dir = match &self.scripts_dir {
88 Some(dir) => dir.clone(),
89 None => {
90 warn!("Scripts directory not set, skipping auto-discovery");
91 return Ok(0);
92 }
93 };
94
95 if !scripts_dir.exists() {
96 warn!("Scripts directory does not exist: {}", scripts_dir.display());
97 return Ok(0);
98 }
99
100 let mut discovered = 0;
101
102 for entry in std::fs::read_dir(&scripts_dir)? {
103 let entry = entry?;
104 let path = entry.path();
105
106 if path.is_dir() {
107 let goblin_toml = path.join("goblin.toml");
108 if goblin_toml.exists() {
109 match Script::from_toml_file(&path) {
110 Ok(script) => {
111 info!("Discovered script: {} at {}", script.name, path.display());
112 self.scripts.insert(script.name.clone(), script);
113 discovered += 1;
114 }
115 Err(e) => {
116 warn!("Failed to load script from {}: {}", path.display(), e);
117 }
118 }
119 }
120 }
121 }
122
123 info!("Auto-discovered {} scripts", discovered);
124 Ok(discovered)
125 }
126
127 pub fn load_script(&self, script_dir: PathBuf) -> Result<()> {
129 let script = Script::from_toml_file(&script_dir)?;
130 self.scripts.insert(script.name.clone(), script);
131 Ok(())
132 }
133
134 pub fn add_script(&self, script: Script) -> Result<()> {
136 script.validate()?;
137 self.scripts.insert(script.name.clone(), script);
138 Ok(())
139 }
140
141 pub fn get_script(&self, name: &str) -> Option<Script> {
143 self.scripts.get(name).map(|entry| entry.value().clone())
144 }
145
146 pub fn list_scripts(&self) -> Vec<String> {
148 let mut names: Vec<String> = self.scripts.iter().map(|entry| entry.key().clone()).collect();
149 names.sort();
150 names
151 }
152
153 pub fn load_plan(&self, plan_path: PathBuf) -> Result<()> {
155 let plan = Plan::from_toml_file(&plan_path)?;
156
157 let required_scripts = plan.get_required_scripts();
159 for script_name in &required_scripts {
160 if !self.scripts.contains_key(script_name) {
161 return Err(GoblinError::script_not_found(script_name));
162 }
163 }
164
165 plan.validate()?;
166 self.plans.insert(plan.name.clone(), plan);
167 Ok(())
168 }
169
170 pub fn load_plan_from_str(&self, toml_str: &str) -> Result<()> {
172 let plan = Plan::from_toml_str(toml_str)?;
173
174 let required_scripts = plan.get_required_scripts();
176 for script_name in &required_scripts {
177 if !self.scripts.contains_key(script_name) {
178 return Err(GoblinError::script_not_found(script_name));
179 }
180 }
181
182 plan.validate()?;
183 self.plans.insert(plan.name.clone(), plan);
184 Ok(())
185 }
186
187 pub fn get_plan(&self, name: &str) -> Option<Plan> {
189 self.plans.get(name).map(|entry| entry.value().clone())
190 }
191
192 pub fn list_plans(&self) -> Vec<String> {
194 let mut names: Vec<String> = self.plans.iter().map(|entry| entry.key().clone()).collect();
195 names.sort();
196 names
197 }
198
199 pub async fn execute_script(&self, script_name: &str, args: Vec<String>) -> Result<ExecutionResult> {
201 let script = self.get_script(script_name)
202 .ok_or_else(|| GoblinError::script_not_found(script_name))?;
203
204 self.executor.execute_script(&script, &args).await
205 }
206
207 pub async fn execute_plan(&self, plan_name: &str, default_input: Option<String>) -> Result<ExecutionContext> {
209 let plan = self.get_plan(plan_name)
210 .ok_or_else(|| GoblinError::plan_not_found(plan_name))?;
211
212 info!("Starting parallel execution of plan: {}", plan_name);
213
214 let context = Arc::new(RwLock::new(ExecutionContext::new(plan_name.to_string())));
215
216 if let Some(input) = default_input {
218 context.write().unwrap().add_result("default_input".to_string(), input);
219 }
220
221 self.execute_plan_parallel(&plan, context.clone()).await?;
223
224 let final_context = Arc::try_unwrap(context)
225 .map_err(|_| GoblinError::engine_error("Failed to unwrap execution context"))?
226 .into_inner()
227 .map_err(|_| GoblinError::engine_error("Failed to acquire context lock"))?;
228
229 info!("Plan '{}' completed successfully in {:?}", plan_name, final_context.elapsed());
230 Ok(final_context)
231 }
232
233 async fn execute_plan_parallel(
235 &self,
236 plan: &Plan,
237 context: Arc<RwLock<ExecutionContext>>,
238 ) -> Result<()> {
239 let mut remaining_steps: HashSet<String> = plan.steps.iter().map(|s| s.name.clone()).collect();
240 let mut completed_steps: HashSet<String> = HashSet::new();
241
242 {
244 let ctx = context.read().unwrap();
245 if ctx.results.contains_key("default_input") {
246 completed_steps.insert("default_input".to_string());
247 }
248 }
249
250 let mut wave_count = 0;
251
252 while !remaining_steps.is_empty() {
253 wave_count += 1;
254 info!("Starting execution wave {}", wave_count);
255
256 let executable_steps: Vec<String> = remaining_steps
258 .iter()
259 .filter(|step_name| {
260 let step = plan.steps.iter().find(|s| &s.name == *step_name).unwrap();
261 let dependencies = step.get_dependencies();
262 dependencies.iter().all(|dep| dep == "default_input" || completed_steps.contains(dep))
263 })
264 .cloned()
265 .collect();
266
267 if executable_steps.is_empty() {
268 return Err(GoblinError::engine_error(format!(
269 "No executable steps found in wave {}. Possible circular dependency or missing dependencies. Remaining steps: {:?}",
270 wave_count, remaining_steps
271 )));
272 }
273
274 info!("Wave {} executing steps: {:?}", wave_count, executable_steps);
275
276 let mut futures = Vec::new();
278
279 for step_name in &executable_steps {
280 let step = plan.steps.iter().find(|s| &s.name == step_name).unwrap();
281 let context_clone = context.clone();
282 let executor_clone = self.executor.clone();
283 let step_clone = step.clone();
284
285 let script = self.get_script(&step.function)
287 .ok_or_else(|| GoblinError::script_not_found(&step.function))?;
288
289 let future = async move {
290 let args = {
292 let ctx = context_clone.read().unwrap();
293 step_clone.resolve_inputs(&ctx.results)?
294 };
295
296 info!("Executing step '{}' with script '{}' and args: {:?}",
297 step_clone.name, step_clone.function, args);
298
299 let result = executor_clone.execute_script(&script, &args).await?;
301
302 let output = result.get_output();
304 {
305 let mut ctx = context_clone.write().unwrap();
306 ctx.add_result(step_clone.name.clone(), output.clone());
307 }
308
309 info!("Step '{}' completed successfully. Output: '{}'", step_clone.name, output);
310
311 Ok::<String, GoblinError>(step_clone.name)
312 };
313
314 futures.push(future);
315 }
316
317 let results = join_all(futures).await;
319
320 let mut wave_completed = Vec::new();
322 for result in results {
323 match result {
324 Ok(step_name) => wave_completed.push(step_name),
325 Err(e) => return Err(e),
326 }
327 }
328
329 for step_name in wave_completed {
331 remaining_steps.remove(&step_name);
332 completed_steps.insert(step_name);
333 }
334
335 info!("Wave {} completed. Remaining steps: {}", wave_count, remaining_steps.len());
336 }
337
338 info!("All waves completed successfully in {} waves", wave_count);
339 Ok(())
340 }
341
342 pub fn get_stats(&self) -> (usize, usize) {
344 (self.scripts.len(), self.plans.len())
345 }
346
347 pub fn clear(&self) {
349 self.scripts.clear();
350 self.plans.clear();
351 }
352
353 pub fn validate_all_plans(&self) -> Result<()> {
355 for plan_entry in self.plans.iter() {
356 let plan = plan_entry.value();
357
358 for script_name in plan.get_required_scripts() {
360 if !self.scripts.contains_key(&script_name) {
361 return Err(GoblinError::script_not_found(&script_name));
362 }
363 }
364
365 plan.validate()?;
367 }
368 Ok(())
369 }
370}
371
372impl Default for Engine {
373 fn default() -> Self {
374 Self::new()
375 }
376}
377
378pub struct EnginePool {
380 instances: Arc<RwLock<Vec<Engine>>>,
381 semaphore: Arc<Semaphore>,
382 pool_size: usize,
383}
384
385impl EnginePool {
386 pub async fn new(pool_size: usize) -> Result<Self> {
388 if pool_size == 0 {
389 return Err(GoblinError::config_error("Pool size must be greater than 0"));
390 }
391
392 let instances = Arc::new(RwLock::new(Vec::with_capacity(pool_size)));
393
394 for _ in 0..pool_size {
396 let instance = Engine::new();
397 instances.write().unwrap().push(instance);
398 }
399
400 Ok(Self {
401 instances,
402 semaphore: Arc::new(Semaphore::new(pool_size)),
403 pool_size,
404 })
405 }
406
407 pub async fn with_config(pool_size: usize, scripts_dir: Option<PathBuf>) -> Result<Self> {
409 if pool_size == 0 {
410 return Err(GoblinError::config_error("Pool size must be greater than 0"));
411 }
412
413 let instances = Arc::new(RwLock::new(Vec::with_capacity(pool_size)));
414
415 for _ in 0..pool_size {
417 let mut instance = Engine::new();
418
419 if let Some(ref dir) = scripts_dir {
420 instance = instance.with_scripts_dir(dir.clone());
421 instance.auto_discover_scripts()?;
423 }
424
425 instances.write().unwrap().push(instance);
426 }
427
428 info!("Created engine pool with {} instances", pool_size);
429 Ok(Self {
430 instances,
431 semaphore: Arc::new(Semaphore::new(pool_size)),
432 pool_size,
433 })
434 }
435
436 pub fn get_pool_stats(&self) -> PoolStats {
438 let available_permits = self.semaphore.available_permits();
439 PoolStats {
440 total_instances: self.pool_size,
441 available_instances: available_permits,
442 busy_instances: self.pool_size - available_permits,
443 }
444 }
445
446 pub async fn acquire(&self) -> Result<EngineGuard> {
448 let permit = self.semaphore.clone().acquire_owned().await
450 .map_err(|e| GoblinError::engine_error(format!("Failed to acquire engine from pool: {}", e)))?;
451
452 let engine = {
454 let mut instances = self.instances.write().unwrap();
455 instances.pop()
456 .ok_or_else(|| GoblinError::engine_error("No engine instances available (this should not happen)"))?
457 };
458
459 debug!("Acquired engine instance from pool");
460 Ok(EngineGuard {
461 engine: Some(engine),
462 pool_instances: self.instances.clone(),
463 _permit: permit,
464 })
465 }
466
467 pub fn try_acquire(&self) -> Result<Option<EngineGuard>> {
469 let permit = match self.semaphore.clone().try_acquire_owned() {
471 Ok(permit) => permit,
472 Err(_) => return Ok(None), };
474
475 let engine = {
477 let mut instances = self.instances.write().unwrap();
478 instances.pop()
479 .ok_or_else(|| GoblinError::engine_error("No engine instances available (this should not happen)"))?
480 };
481
482 debug!("Acquired engine instance from pool (non-blocking)");
483 Ok(Some(EngineGuard {
484 engine: Some(engine),
485 pool_instances: self.instances.clone(),
486 _permit: permit,
487 }))
488 }
489
490 pub async fn load_scripts_on_all(&self, scripts_dir: &PathBuf) -> Result<()> {
492 let mut instances = self.instances.write().unwrap();
493
494 for instance in instances.iter_mut() {
495 instance.scripts_dir = Some(scripts_dir.clone());
496 instance.auto_discover_scripts()?;
497 }
498
499 info!("Loaded scripts on all {} instances in pool", self.pool_size);
500 Ok(())
501 }
502
503 pub async fn load_plan_on_all(&self, plan_path: PathBuf) -> Result<()> {
505 let mut instances = self.instances.write().unwrap();
506
507 for instance in instances.iter_mut() {
508 instance.load_plan(plan_path.clone())?;
509 }
510
511 info!("Loaded plan on all {} instances in pool", self.pool_size);
512 Ok(())
513 }
514}
515
516#[derive(Debug, Clone)]
518pub struct PoolStats {
519 pub total_instances: usize,
520 pub available_instances: usize,
521 pub busy_instances: usize,
522}
523
524pub struct EngineGuard {
526 engine: Option<Engine>,
527 pool_instances: Arc<RwLock<Vec<Engine>>>,
528 _permit: tokio::sync::OwnedSemaphorePermit,
529}
530
531impl EngineGuard {
532 pub fn engine(&self) -> &Engine {
534 self.engine.as_ref().unwrap()
535 }
536
537 pub fn engine_mut(&mut self) -> &mut Engine {
539 self.engine.as_mut().unwrap()
540 }
541
542 pub async fn execute_plan_with_reset(
544 &mut self,
545 plan_name: &str,
546 default_input: Option<String>,
547 ) -> Result<ExecutionContext> {
548 let result = self.engine().execute_plan(plan_name, default_input).await;
549
550 self.reset_execution_state();
552
553 result
554 }
555
556 pub fn reset_execution_state(&mut self) {
558 if let Some(_engine) = &mut self.engine {
559 debug!("Reset execution state for engine instance");
562 }
563 }
564}
565
566impl std::ops::Deref for EngineGuard {
567 type Target = Engine;
568
569 fn deref(&self) -> &Self::Target {
570 self.engine.as_ref().unwrap()
571 }
572}
573
574impl std::ops::DerefMut for EngineGuard {
575 fn deref_mut(&mut self) -> &mut Self::Target {
576 self.engine.as_mut().unwrap()
577 }
578}
579
580impl Drop for EngineGuard {
581 fn drop(&mut self) {
582 if let Some(engine) = self.engine.take() {
584 let mut instances = self.pool_instances.write().unwrap();
585 instances.push(engine);
586 debug!("Returned engine instance to pool");
587 }
588 }
590}
591
592#[cfg(test)]
593mod tests {
594 use super::*;
595 use crate::executor::{ExecutionResult, MockExecutor};
596 use crate::script::{Script, ScriptConfig};
597 use tokio::time::Duration;
598
599 #[tokio::test]
600 async fn test_engine_basic_operations() {
601 let engine = Engine::new();
602
603 let temp_dir = std::env::temp_dir();
605 let config = ScriptConfig {
606 name: "test_script".to_string(),
607 command: "echo hello".to_string(),
608 timeout: 30,
609 test_command: None,
610 require_test: false,
611 };
612 let script = Script::new(config, temp_dir);
613 engine.add_script(script).unwrap();
614
615 assert_eq!(engine.list_scripts(), vec!["test_script"]);
617 assert!(engine.get_script("test_script").is_some());
618 assert!(engine.get_script("nonexistent").is_none());
619 }
620
621 #[tokio::test]
622 async fn test_engine_with_mock_executor() {
623 let mut mock_executor = MockExecutor::new();
624
625 let expected_result = ExecutionResult {
627 script_name: "test_script".to_string(),
628 stdout: "Hello, World!".to_string(),
629 stderr: String::new(),
630 exit_code: 0,
631 duration: Duration::from_millis(100),
632 };
633 mock_executor.add_result("test_script".to_string(), expected_result.clone());
634
635 let engine = Engine::with_executor(Arc::new(mock_executor));
636
637 let config = ScriptConfig {
639 name: "test_script".to_string(),
640 command: "echo hello".to_string(),
641 timeout: 30,
642 test_command: None,
643 require_test: false,
644 };
645 let script = Script::new(config, std::env::temp_dir());
646 engine.add_script(script).unwrap();
647
648 let result = engine.execute_script("test_script", vec![]).await.unwrap();
650 assert_eq!(result.stdout, "Hello, World!");
651 assert!(result.is_success());
652 }
653}