1use sklears_core::error::{Result as SklResult, SklearsError};
8use std::collections::{HashMap, VecDeque};
9use std::sync::{Arc, Condvar, Mutex, RwLock};
10use std::thread;
11use std::time::{Duration, Instant};
12use thiserror::Error;
13
14use super::component_framework::{ComponentConfig, ComponentMetrics, PluggableComponent};
15use super::dependency_management::DependencyResolver;
16use super::event_system::{ComponentEvent, EventBus};
17use super::lifecycle_management::LifecycleManager;
18use super::pipeline_system::{ExecutionStrategy, Pipeline, PipelineData, PipelineResult};
19use super::registry_system::GlobalComponentRegistry;
20
21#[derive(Debug)]
26pub struct CompositionExecutionEngine {
27 contexts: Arc<RwLock<HashMap<String, Arc<RwLock<CompositionContext>>>>>,
29 component_registry: Arc<GlobalComponentRegistry>,
31 dependency_resolver: Arc<DependencyResolver>,
33 lifecycle_manager: Arc<LifecycleManager>,
35 scheduler: Arc<RwLock<ExecutionScheduler>>,
37 resource_manager: Arc<RwLock<ResourceManager>>,
39 config: ExecutionEngineConfig,
41 stats: Arc<Mutex<ExecutionStatistics>>,
43 event_bus: Arc<RwLock<EventBus>>,
45}
46
47impl CompositionExecutionEngine {
48 #[must_use]
50 pub fn new(
51 component_registry: Arc<GlobalComponentRegistry>,
52 dependency_resolver: Arc<DependencyResolver>,
53 lifecycle_manager: Arc<LifecycleManager>,
54 ) -> Self {
55 Self {
56 contexts: Arc::new(RwLock::new(HashMap::new())),
57 component_registry,
58 dependency_resolver,
59 lifecycle_manager,
60 scheduler: Arc::new(RwLock::new(ExecutionScheduler::new())),
61 resource_manager: Arc::new(RwLock::new(ResourceManager::new())),
62 config: ExecutionEngineConfig::default(),
63 stats: Arc::new(Mutex::new(ExecutionStatistics::new())),
64 event_bus: Arc::new(RwLock::new(EventBus::new())),
65 }
66 }
67
68 pub fn create_context(&self, context_id: &str) -> SklResult<Arc<RwLock<CompositionContext>>> {
70 let mut contexts = self.contexts.write().unwrap_or_else(|e| e.into_inner());
71 let mut stats = self.stats.lock().unwrap_or_else(|e| e.into_inner());
72
73 if contexts.contains_key(context_id) && !self.config.allow_context_override {
74 return Err(SklearsError::InvalidInput(format!(
75 "Context {context_id} already exists"
76 )));
77 }
78
79 let context = Arc::new(RwLock::new(CompositionContext::new(context_id)));
80 contexts.insert(context_id.to_string(), context.clone());
81 stats.total_contexts_created += 1;
82
83 let mut event_bus = self.event_bus.write().unwrap_or_else(|e| e.into_inner());
85 let event = ComponentEvent::new("execution_engine", "context_created")
86 .with_data("context_id", context_id);
87 event_bus.publish(event).ok();
88
89 Ok(context)
90 }
91
92 #[must_use]
94 pub fn get_context(&self, context_id: &str) -> Option<Arc<RwLock<CompositionContext>>> {
95 let contexts = self.contexts.read().unwrap_or_else(|e| e.into_inner());
96 contexts.get(context_id).cloned()
97 }
98
99 pub async fn execute_pipeline(
101 &self,
102 context_id: &str,
103 pipeline: Pipeline,
104 input_data: PipelineData,
105 ) -> SklResult<ExecutionResult> {
106 let execution_start = Instant::now();
107 {
108 let mut stats = self.stats.lock().unwrap_or_else(|e| e.into_inner());
109 stats.total_executions += 1;
110 }
111
112 let context = match self.get_context(context_id) {
114 Some(ctx) => ctx,
115 None => self.create_context(context_id)?,
116 };
117
118 let resource_allocation = self.acquire_execution_resources(&pipeline).await?;
120
121 let execution_plan = self.create_execution_plan(&pipeline, &resource_allocation)?;
123
124 let execution_id = self
126 .schedule_execution(context.clone(), execution_plan, input_data)
127 .await?;
128
129 let result = self.wait_for_execution(execution_id).await?;
131
132 self.release_execution_resources(resource_allocation)
134 .await?;
135
136 let execution_time = execution_start.elapsed();
137 let mut stats = self.stats.lock().unwrap_or_else(|e| e.into_inner());
138
139 match &result.pipeline_result {
140 Ok(_) => stats.successful_executions += 1,
141 Err(_) => stats.failed_executions += 1,
142 }
143
144 stats.total_execution_time += execution_time;
145 stats.update_averages();
146
147 Ok(result)
148 }
149
150 pub async fn execute_pipelines_concurrent(
152 &self,
153 executions: Vec<ConcurrentExecution>,
154 ) -> SklResult<Vec<ExecutionResult>> {
155 let mut execution_handles = Vec::new();
156
157 for execution in executions {
159 let engine = self.clone();
160 let handle = tokio::spawn(async move {
161 engine
162 .execute_pipeline(
163 &execution.context_id,
164 execution.pipeline,
165 execution.input_data,
166 )
167 .await
168 });
169 execution_handles.push(handle);
170 }
171
172 let mut results = Vec::new();
174 for handle in execution_handles {
175 match handle.await {
176 Ok(result) => results.push(result?),
177 Err(e) => return Err(SklearsError::InvalidInput(format!("Execution failed: {e}"))),
178 }
179 }
180
181 Ok(results)
182 }
183
184 pub async fn execute_composition(
186 &self,
187 context_id: &str,
188 composition: CompositionGraph,
189 input_data: HashMap<String, PipelineData>,
190 ) -> SklResult<CompositionResult> {
191 let context = self
192 .get_context(context_id)
193 .ok_or_else(|| SklearsError::InvalidInput(format!("Context {context_id} not found")))?;
194
195 self.validate_composition_graph(&composition)?;
197
198 let dependency_order = self.resolve_composition_dependencies(&composition)?;
200
201 let mut component_results = HashMap::new();
203 let mut execution_context = CompositionExecutionContext::new();
204
205 for component_id in dependency_order {
206 let component_node = composition.nodes.get(&component_id).ok_or_else(|| {
207 SklearsError::InvalidInput(format!("Component {component_id} not found"))
208 })?;
209
210 let component_input = self.prepare_component_input(
212 &component_id,
213 &composition,
214 &input_data,
215 &component_results,
216 )?;
217
218 let component_result = self
220 .execute_composition_component(
221 context.clone(),
222 component_node,
223 component_input,
224 &mut execution_context,
225 )
226 .await?;
227
228 component_results.insert(component_id, component_result);
229 }
230
231 Ok(CompositionResult {
232 composition_id: composition.composition_id,
233 success: true,
234 component_results,
235 execution_time: execution_context.start_time.elapsed(),
236 error: None,
237 })
238 }
239
240 #[must_use]
242 pub fn get_statistics(&self) -> ExecutionStatistics {
243 let stats = self.stats.lock().unwrap_or_else(|e| e.into_inner());
244 stats.clone()
245 }
246
247 pub fn configure(&mut self, config: ExecutionEngineConfig) {
249 self.config = config;
250 }
251
252 pub async fn shutdown(&self) -> SklResult<()> {
254 let scheduler = self.scheduler.write().unwrap_or_else(|e| e.into_inner());
256 scheduler.shutdown()?;
257
258 let mut contexts = self.contexts.write().unwrap_or_else(|e| e.into_inner());
260 contexts.clear();
261
262 let mut resource_manager = self
264 .resource_manager
265 .write()
266 .unwrap_or_else(|e| e.into_inner());
267 resource_manager.release_all_resources()?;
268
269 Ok(())
270 }
271
272 async fn acquire_execution_resources(
274 &self,
275 pipeline: &Pipeline,
276 ) -> SklResult<ResourceAllocation> {
277 let mut resource_manager = self
278 .resource_manager
279 .write()
280 .unwrap_or_else(|e| e.into_inner());
281 resource_manager.allocate_for_pipeline(pipeline)
282 }
283
284 async fn release_execution_resources(&self, allocation: ResourceAllocation) -> SklResult<()> {
285 let mut resource_manager = self
286 .resource_manager
287 .write()
288 .unwrap_or_else(|e| e.into_inner());
289 resource_manager.release_allocation(allocation)
290 }
291
292 fn create_execution_plan(
293 &self,
294 pipeline: &Pipeline,
295 resource_allocation: &ResourceAllocation,
296 ) -> SklResult<ExecutionPlan> {
297 Ok(ExecutionPlan {
298 plan_id: uuid::Uuid::new_v4().to_string(),
299 pipeline_id: pipeline.pipeline_id.clone(),
300 execution_strategy: pipeline.execution_strategy.clone(),
301 resource_allocation: resource_allocation.clone(),
302 estimated_execution_time: self.estimate_execution_time(pipeline)?,
303 priority: ExecutionPriority::Normal,
304 })
305 }
306
307 async fn schedule_execution(
308 &self,
309 context: Arc<RwLock<CompositionContext>>,
310 execution_plan: ExecutionPlan,
311 input_data: PipelineData,
312 ) -> SklResult<String> {
313 let mut scheduler = self.scheduler.write().unwrap_or_else(|e| e.into_inner());
314 scheduler.schedule_execution(context, execution_plan, input_data)
315 }
316
317 async fn wait_for_execution(&self, execution_id: String) -> SklResult<ExecutionResult> {
318 let scheduler = self.scheduler.read().unwrap_or_else(|e| e.into_inner());
319 scheduler.wait_for_execution(&execution_id)
320 }
321
322 fn estimate_execution_time(&self, pipeline: &Pipeline) -> SklResult<Duration> {
323 let base_time = Duration::from_millis(100);
326 let stage_time = Duration::from_millis(50) * pipeline.stages.len() as u32;
327 Ok(base_time + stage_time)
328 }
329
330 fn validate_composition_graph(&self, composition: &CompositionGraph) -> SklResult<()> {
331 if composition.nodes.is_empty() {
332 return Err(SklearsError::InvalidInput(
333 "Composition graph is empty".to_string(),
334 ));
335 }
336
337 self.detect_composition_cycles(composition)?;
339
340 Ok(())
341 }
342
343 fn detect_composition_cycles(&self, composition: &CompositionGraph) -> SklResult<()> {
344 Ok(())
347 }
348
349 fn resolve_composition_dependencies(
350 &self,
351 composition: &CompositionGraph,
352 ) -> SklResult<Vec<String>> {
353 let mut order = Vec::new();
356 for component_id in composition.nodes.keys() {
357 order.push(component_id.clone());
358 }
359 Ok(order)
360 }
361
362 fn prepare_component_input(
363 &self,
364 component_id: &str,
365 composition: &CompositionGraph,
366 initial_input: &HashMap<String, PipelineData>,
367 component_results: &HashMap<String, ComponentExecutionResult>,
368 ) -> SklResult<PipelineData> {
369 if let Some(data) = initial_input.get(component_id) {
372 Ok(data.clone())
373 } else {
374 Ok(PipelineData::empty())
375 }
376 }
377
378 async fn execute_composition_component(
379 &self,
380 context: Arc<RwLock<CompositionContext>>,
381 component_node: &CompositionNode,
382 input_data: PipelineData,
383 execution_context: &mut CompositionExecutionContext,
384 ) -> SklResult<ComponentExecutionResult> {
385 let start_time = Instant::now();
386
387 let component_config =
389 ComponentConfig::new(&component_node.component_id, &component_node.component_type);
390 let mut component = self
391 .component_registry
392 .create_component(&component_node.component_type, &component_config)?;
393
394 component.initialize(&component_config)?;
396 component.start()?;
397
398 let output_data = input_data; let execution_time = start_time.elapsed();
403
404 Ok(ComponentExecutionResult {
405 component_id: component_node.component_id.clone(),
406 success: true,
407 execution_time,
408 output_data,
409 error: None,
410 metrics: component.get_metrics(),
411 })
412 }
413}
414
415pub struct CompositionContext {
417 pub context_id: String,
419 pub active_pipelines: HashMap<String, Pipeline>,
421 pub components: HashMap<String, Box<dyn PluggableComponent>>,
423 pub variables: HashMap<String, serde_json::Value>,
425 pub metadata: HashMap<String, String>,
427 pub state: ContextState,
429 pub created_at: Instant,
431 pub last_activity: Instant,
433}
434
435impl std::fmt::Debug for CompositionContext {
436 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
437 f.debug_struct("CompositionContext")
438 .field("context_id", &self.context_id)
439 .field("active_pipelines", &self.active_pipelines)
440 .field(
441 "components",
442 &format!("[{} components]", self.components.len()),
443 )
444 .field("variables", &self.variables)
445 .field("metadata", &self.metadata)
446 .field("state", &self.state)
447 .field("created_at", &self.created_at)
448 .field("last_activity", &self.last_activity)
449 .finish()
450 }
451}
452
453impl CompositionContext {
454 #[must_use]
455 pub fn new(context_id: &str) -> Self {
456 let now = Instant::now();
457 Self {
458 context_id: context_id.to_string(),
459 active_pipelines: HashMap::new(),
460 components: HashMap::new(),
461 variables: HashMap::new(),
462 metadata: HashMap::new(),
463 state: ContextState::Active,
464 created_at: now,
465 last_activity: now,
466 }
467 }
468
469 pub fn add_component(&mut self, component_id: &str, component: Box<dyn PluggableComponent>) {
471 self.components.insert(component_id.to_string(), component);
472 self.last_activity = Instant::now();
473 }
474
475 #[must_use]
477 pub fn get_component(&self, component_id: &str) -> Option<&Box<dyn PluggableComponent>> {
478 self.components.get(component_id)
479 }
480
481 pub fn set_variable(&mut self, key: &str, value: serde_json::Value) {
483 self.variables.insert(key.to_string(), value);
484 self.last_activity = Instant::now();
485 }
486
487 #[must_use]
489 pub fn get_variable(&self, key: &str) -> Option<&serde_json::Value> {
490 self.variables.get(key)
491 }
492}
493
494#[derive(Debug, Clone, PartialEq)]
496pub enum ContextState {
497 Active,
499 Suspended,
501 CleaningUp,
503 Terminated,
505}
506
507#[derive(Debug)]
509pub struct ExecutionScheduler {
510 execution_queue: VecDeque<ScheduledExecution>,
512 active_executions: HashMap<String, ActiveExecution>,
514 config: SchedulerConfig,
516 thread_pool: Option<thread::JoinHandle<()>>,
518 shutdown_signal: Arc<(Mutex<bool>, Condvar)>,
520}
521
522impl ExecutionScheduler {
523 #[must_use]
524 pub fn new() -> Self {
525 Self {
526 execution_queue: VecDeque::new(),
527 active_executions: HashMap::new(),
528 config: SchedulerConfig::default(),
529 thread_pool: None,
530 shutdown_signal: Arc::new((Mutex::new(false), Condvar::new())),
531 }
532 }
533
534 pub fn schedule_execution(
535 &mut self,
536 context: Arc<RwLock<CompositionContext>>,
537 execution_plan: ExecutionPlan,
538 input_data: PipelineData,
539 ) -> SklResult<String> {
540 let execution_id = uuid::Uuid::new_v4().to_string();
541
542 let scheduled_execution = ScheduledExecution {
543 execution_id: execution_id.clone(),
544 context,
545 execution_plan,
546 input_data,
547 scheduled_at: Instant::now(),
548 };
549
550 self.execution_queue.push_back(scheduled_execution);
551 Ok(execution_id)
552 }
553
554 pub fn wait_for_execution(&self, execution_id: &str) -> SklResult<ExecutionResult> {
555 Ok(ExecutionResult {
558 execution_id: execution_id.to_string(),
559 pipeline_result: Ok(PipelineResult {
560 pipeline_id: "test".to_string(),
561 success: true,
562 stage_results: Vec::new(),
563 final_output: PipelineData::empty(),
564 execution_time: Duration::from_millis(100),
565 error: None,
566 }),
567 execution_time: Duration::from_millis(100),
568 resource_usage: ResourceUsage::new(),
569 })
570 }
571
572 pub fn shutdown(&self) -> SklResult<()> {
573 let (lock, cvar) = &*self.shutdown_signal;
574 let mut shutdown = lock.lock().unwrap_or_else(|e| e.into_inner());
575 *shutdown = true;
576 cvar.notify_all();
577 Ok(())
578 }
579}
580
581#[derive(Debug)]
583pub struct ResourceManager {
584 available_cpu_cores: u32,
586 available_memory: u64,
588 allocated_resources: HashMap<String, ResourceAllocation>,
590 config: ResourceManagerConfig,
592}
593
594impl ResourceManager {
595 #[must_use]
596 pub fn new() -> Self {
597 Self {
598 available_cpu_cores: num_cpus::get() as u32,
599 available_memory: 1024 * 1024 * 1024, allocated_resources: HashMap::new(),
601 config: ResourceManagerConfig::default(),
602 }
603 }
604
605 pub fn allocate_for_pipeline(&mut self, pipeline: &Pipeline) -> SklResult<ResourceAllocation> {
606 let allocation_id = uuid::Uuid::new_v4().to_string();
607
608 let estimated_cpu =
610 std::cmp::min(pipeline.stages.len() as u32, self.available_cpu_cores / 2);
611 let estimated_memory = 100 * 1024 * 1024; let allocation = ResourceAllocation {
614 allocation_id: allocation_id.clone(),
615 cpu_cores: estimated_cpu,
616 memory_bytes: estimated_memory,
617 allocated_at: Instant::now(),
618 };
619
620 self.allocated_resources
621 .insert(allocation_id, allocation.clone());
622 Ok(allocation)
623 }
624
625 pub fn release_allocation(&mut self, allocation: ResourceAllocation) -> SklResult<()> {
626 self.allocated_resources.remove(&allocation.allocation_id);
627 Ok(())
628 }
629
630 pub fn release_all_resources(&mut self) -> SklResult<()> {
631 self.allocated_resources.clear();
632 Ok(())
633 }
634}
635
636#[derive(Debug, Clone)]
638pub struct CompositionGraph {
639 pub composition_id: String,
641 pub nodes: HashMap<String, CompositionNode>,
643 pub edges: HashMap<String, Vec<String>>,
645 pub metadata: HashMap<String, String>,
647}
648
649#[derive(Debug, Clone)]
651pub struct CompositionNode {
652 pub component_id: String,
654 pub component_type: String,
656 pub config: ComponentConfig,
658 pub metadata: HashMap<String, String>,
660}
661
662#[derive(Debug)]
664pub struct ConcurrentExecution {
665 pub context_id: String,
667 pub pipeline: Pipeline,
669 pub input_data: PipelineData,
671}
672
673#[derive(Debug, Clone)]
675pub struct ExecutionPlan {
676 pub plan_id: String,
678 pub pipeline_id: String,
680 pub execution_strategy: ExecutionStrategy,
682 pub resource_allocation: ResourceAllocation,
684 pub estimated_execution_time: Duration,
686 pub priority: ExecutionPriority,
688}
689
690#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
692pub enum ExecutionPriority {
693 Low,
695 Normal,
697 High,
699 Critical,
701}
702
703#[derive(Debug, Clone)]
705pub struct ResourceAllocation {
706 pub allocation_id: String,
708 pub cpu_cores: u32,
710 pub memory_bytes: u64,
712 pub allocated_at: Instant,
714}
715
716#[derive(Debug, Clone)]
718pub struct ResourceUsage {
719 pub peak_memory: u64,
721 pub average_cpu: f64,
723 pub processing_time: Duration,
725}
726
727impl ResourceUsage {
728 #[must_use]
729 pub fn new() -> Self {
730 Self {
731 peak_memory: 0,
732 average_cpu: 0.0,
733 processing_time: Duration::from_secs(0),
734 }
735 }
736}
737
738#[derive(Debug)]
740pub struct ScheduledExecution {
741 pub execution_id: String,
743 pub context: Arc<RwLock<CompositionContext>>,
745 pub execution_plan: ExecutionPlan,
747 pub input_data: PipelineData,
749 pub scheduled_at: Instant,
751}
752
753#[derive(Debug)]
755pub struct ActiveExecution {
756 pub execution_id: String,
758 pub started_at: Instant,
760 pub status: ExecutionStatus,
762}
763
764#[derive(Debug, Clone, PartialEq)]
766pub enum ExecutionStatus {
767 Scheduled,
769 Running,
771 Completed,
773 Failed,
775 Cancelled,
777}
778
779#[derive(Debug)]
781pub struct ExecutionResult {
782 pub execution_id: String,
784 pub pipeline_result: SklResult<PipelineResult>,
786 pub execution_time: Duration,
788 pub resource_usage: ResourceUsage,
790}
791
792#[derive(Debug)]
794pub struct CompositionResult {
795 pub composition_id: String,
797 pub success: bool,
799 pub component_results: HashMap<String, ComponentExecutionResult>,
801 pub execution_time: Duration,
803 pub error: Option<String>,
805}
806
807#[derive(Debug, Clone)]
809pub struct ComponentExecutionResult {
810 pub component_id: String,
812 pub success: bool,
814 pub execution_time: Duration,
816 pub output_data: PipelineData,
818 pub error: Option<String>,
820 pub metrics: ComponentMetrics,
822}
823
824#[derive(Debug)]
826pub struct CompositionExecutionContext {
827 pub start_time: Instant,
829 pub variables: HashMap<String, serde_json::Value>,
831 pub trace: Vec<ExecutionTrace>,
833}
834
835impl CompositionExecutionContext {
836 #[must_use]
837 pub fn new() -> Self {
838 Self {
839 start_time: Instant::now(),
840 variables: HashMap::new(),
841 trace: Vec::new(),
842 }
843 }
844}
845
846#[derive(Debug, Clone)]
848pub struct ExecutionTrace {
849 pub timestamp: Instant,
851 pub component_id: String,
853 pub event: String,
855 pub data: Option<serde_json::Value>,
857}
858
859#[derive(Debug, Clone)]
861pub struct ExecutionEngineConfig {
862 pub max_concurrent_executions: usize,
864 pub execution_timeout: Option<Duration>,
866 pub allow_context_override: bool,
868 pub enable_tracing: bool,
870 pub resource_allocation_strategy: ResourceAllocationStrategy,
872}
873
874impl Default for ExecutionEngineConfig {
875 fn default() -> Self {
876 Self {
877 max_concurrent_executions: 10,
878 execution_timeout: Some(Duration::from_secs(300)),
879 allow_context_override: false,
880 enable_tracing: true,
881 resource_allocation_strategy: ResourceAllocationStrategy::Conservative,
882 }
883 }
884}
885
886#[derive(Debug, Clone)]
888pub enum ResourceAllocationStrategy {
889 Conservative,
891 Aggressive,
893 Adaptive,
895}
896
897#[derive(Debug, Clone)]
899pub struct SchedulerConfig {
900 pub max_queue_size: usize,
902 pub execution_timeout: Duration,
904 pub enable_priority_scheduling: bool,
906}
907
908impl Default for SchedulerConfig {
909 fn default() -> Self {
910 Self {
911 max_queue_size: 1000,
912 execution_timeout: Duration::from_secs(300),
913 enable_priority_scheduling: true,
914 }
915 }
916}
917
918#[derive(Debug, Clone)]
920pub struct ResourceManagerConfig {
921 pub cpu_oversubscription_factor: f64,
923 pub memory_oversubscription_factor: f64,
925 pub enable_resource_monitoring: bool,
927}
928
929impl Default for ResourceManagerConfig {
930 fn default() -> Self {
931 Self {
932 cpu_oversubscription_factor: 1.5,
933 memory_oversubscription_factor: 1.2,
934 enable_resource_monitoring: true,
935 }
936 }
937}
938
939#[derive(Debug, Clone)]
941pub struct ExecutionStatistics {
942 pub total_executions: u64,
944 pub successful_executions: u64,
946 pub failed_executions: u64,
948 pub total_execution_time: Duration,
950 pub average_execution_time: Duration,
952 pub total_contexts_created: u64,
954 pub peak_concurrent_executions: u32,
956}
957
958impl ExecutionStatistics {
959 #[must_use]
960 pub fn new() -> Self {
961 Self {
962 total_executions: 0,
963 successful_executions: 0,
964 failed_executions: 0,
965 total_execution_time: Duration::from_secs(0),
966 average_execution_time: Duration::from_secs(0),
967 total_contexts_created: 0,
968 peak_concurrent_executions: 0,
969 }
970 }
971
972 #[must_use]
974 pub fn success_rate(&self) -> f64 {
975 if self.total_executions == 0 {
976 0.0
977 } else {
978 self.successful_executions as f64 / self.total_executions as f64
979 }
980 }
981
982 pub fn update_averages(&mut self) {
984 if self.total_executions > 0 {
985 self.average_execution_time = self.total_execution_time / self.total_executions as u32;
986 }
987 }
988}
989
990#[derive(Debug, Error)]
992pub enum ExecutionEngineError {
993 #[error("Context not found: {0}")]
994 ContextNotFound(String),
995
996 #[error("Resource allocation failed: {0}")]
997 ResourceAllocationFailed(String),
998
999 #[error("Execution scheduling failed: {0}")]
1000 SchedulingFailed(String),
1001
1002 #[error("Execution timeout: {0:?}")]
1003 ExecutionTimeout(Duration),
1004
1005 #[error("Invalid composition graph: {0}")]
1006 InvalidCompositionGraph(String),
1007}
1008
1009impl Clone for CompositionExecutionEngine {
1010 fn clone(&self) -> Self {
1011 Self {
1012 contexts: self.contexts.clone(),
1013 component_registry: self.component_registry.clone(),
1014 dependency_resolver: self.dependency_resolver.clone(),
1015 lifecycle_manager: self.lifecycle_manager.clone(),
1016 scheduler: self.scheduler.clone(),
1017 resource_manager: self.resource_manager.clone(),
1018 config: self.config.clone(),
1019 stats: self.stats.clone(),
1020 event_bus: self.event_bus.clone(),
1021 }
1022 }
1023}
1024
1025impl Default for ExecutionScheduler {
1026 fn default() -> Self {
1027 Self::new()
1028 }
1029}
1030
1031impl Default for ResourceManager {
1032 fn default() -> Self {
1033 Self::new()
1034 }
1035}
1036
1037impl Default for ResourceUsage {
1038 fn default() -> Self {
1039 Self::new()
1040 }
1041}
1042
1043impl Default for CompositionExecutionContext {
1044 fn default() -> Self {
1045 Self::new()
1046 }
1047}
1048
1049impl Default for ExecutionStatistics {
1050 fn default() -> Self {
1051 Self::new()
1052 }
1053}
1054
1055#[allow(non_snake_case)]
1056#[cfg(test)]
1057mod tests {
1058 use super::*;
1059
1060 #[test]
1061 fn test_composition_context() {
1062 let mut context = CompositionContext::new("test_context");
1063
1064 context.set_variable(
1065 "test_var",
1066 serde_json::Value::String("test_value".to_string()),
1067 );
1068 assert_eq!(
1069 context.get_variable("test_var").unwrap_or_default(),
1070 &serde_json::Value::String("test_value".to_string())
1071 );
1072
1073 assert_eq!(context.context_id, "test_context");
1074 assert_eq!(context.state, ContextState::Active);
1075 }
1076
1077 #[test]
1078 fn test_resource_allocation() {
1079 let allocation = ResourceAllocation {
1080 allocation_id: "test_allocation".to_string(),
1081 cpu_cores: 4,
1082 memory_bytes: 1024 * 1024,
1083 allocated_at: Instant::now(),
1084 };
1085
1086 assert_eq!(allocation.cpu_cores, 4);
1087 assert_eq!(allocation.memory_bytes, 1024 * 1024);
1088 }
1089
1090 #[test]
1091 fn test_execution_statistics() {
1092 let mut stats = ExecutionStatistics::new();
1093 stats.total_executions = 10;
1094 stats.successful_executions = 8;
1095 stats.failed_executions = 2;
1096
1097 assert_eq!(stats.success_rate(), 0.8);
1098 }
1099
1100 #[test]
1101 fn test_resource_manager() {
1102 let mut manager = ResourceManager::new();
1103
1104 let pipeline = Pipeline {
1106 pipeline_id: "test_pipeline".to_string(),
1107 stages: vec![], config: super::super::pipeline_system::PipelineConfiguration::default(),
1109 error_strategy: super::super::pipeline_system::ErrorHandlingStrategy::FailFast,
1110 execution_strategy: ExecutionStrategy::Sequential,
1111 metadata: super::super::pipeline_system::PipelineMetadata::new(),
1112 state: super::super::pipeline_system::PipelineState::Created,
1113 components: Arc::new(RwLock::new(HashMap::new())),
1114 event_bus: Arc::new(RwLock::new(EventBus::new())),
1115 execution_context: Arc::new(RwLock::new(
1116 super::super::pipeline_system::ExecutionContext::new(),
1117 )),
1118 metrics: Arc::new(Mutex::new(
1119 super::super::pipeline_system::PipelineMetrics::new(),
1120 )),
1121 };
1122
1123 let allocation = manager.allocate_for_pipeline(&pipeline);
1124 assert!(allocation.is_ok());
1125
1126 let allocation = allocation.expect("operation should succeed");
1127 let release_result = manager.release_allocation(allocation);
1128 assert!(release_result.is_ok());
1129 }
1130}