1use sklears_core::error::{Result as SklResult, SklearsError};
8use std::collections::{HashMap, VecDeque};
9use std::sync::{Arc, Condvar, Mutex, RwLock};
10use std::thread;
11use std::time::{Duration, Instant};
12use thiserror::Error;
13
14use super::component_framework::{ComponentConfig, ComponentMetrics, PluggableComponent};
15use super::dependency_management::DependencyResolver;
16use super::event_system::{ComponentEvent, EventBus};
17use super::lifecycle_management::LifecycleManager;
18use super::pipeline_system::{ExecutionStrategy, Pipeline, PipelineData, PipelineResult};
19use super::registry_system::GlobalComponentRegistry;
20
21#[derive(Debug)]
26pub struct CompositionExecutionEngine {
27 contexts: Arc<RwLock<HashMap<String, Arc<RwLock<CompositionContext>>>>>,
29 component_registry: Arc<GlobalComponentRegistry>,
31 dependency_resolver: Arc<DependencyResolver>,
33 lifecycle_manager: Arc<LifecycleManager>,
35 scheduler: Arc<RwLock<ExecutionScheduler>>,
37 resource_manager: Arc<RwLock<ResourceManager>>,
39 config: ExecutionEngineConfig,
41 stats: Arc<Mutex<ExecutionStatistics>>,
43 event_bus: Arc<RwLock<EventBus>>,
45}
46
47impl CompositionExecutionEngine {
48 #[must_use]
50 pub fn new(
51 component_registry: Arc<GlobalComponentRegistry>,
52 dependency_resolver: Arc<DependencyResolver>,
53 lifecycle_manager: Arc<LifecycleManager>,
54 ) -> Self {
55 Self {
56 contexts: Arc::new(RwLock::new(HashMap::new())),
57 component_registry,
58 dependency_resolver,
59 lifecycle_manager,
60 scheduler: Arc::new(RwLock::new(ExecutionScheduler::new())),
61 resource_manager: Arc::new(RwLock::new(ResourceManager::new())),
62 config: ExecutionEngineConfig::default(),
63 stats: Arc::new(Mutex::new(ExecutionStatistics::new())),
64 event_bus: Arc::new(RwLock::new(EventBus::new())),
65 }
66 }
67
68 pub fn create_context(&self, context_id: &str) -> SklResult<Arc<RwLock<CompositionContext>>> {
70 let mut contexts = self.contexts.write().unwrap();
71 let mut stats = self.stats.lock().unwrap();
72
73 if contexts.contains_key(context_id) && !self.config.allow_context_override {
74 return Err(SklearsError::InvalidInput(format!(
75 "Context {context_id} already exists"
76 )));
77 }
78
79 let context = Arc::new(RwLock::new(CompositionContext::new(context_id)));
80 contexts.insert(context_id.to_string(), context.clone());
81 stats.total_contexts_created += 1;
82
83 let mut event_bus = self.event_bus.write().unwrap();
85 let event = ComponentEvent::new("execution_engine", "context_created")
86 .with_data("context_id", context_id);
87 event_bus.publish(event).ok();
88
89 Ok(context)
90 }
91
92 #[must_use]
94 pub fn get_context(&self, context_id: &str) -> Option<Arc<RwLock<CompositionContext>>> {
95 let contexts = self.contexts.read().unwrap();
96 contexts.get(context_id).cloned()
97 }
98
99 pub async fn execute_pipeline(
101 &self,
102 context_id: &str,
103 pipeline: Pipeline,
104 input_data: PipelineData,
105 ) -> SklResult<ExecutionResult> {
106 let execution_start = Instant::now();
107 {
108 let mut stats = self.stats.lock().unwrap();
109 stats.total_executions += 1;
110 }
111
112 let context = match self.get_context(context_id) {
114 Some(ctx) => ctx,
115 None => self.create_context(context_id)?,
116 };
117
118 let resource_allocation = self.acquire_execution_resources(&pipeline).await?;
120
121 let execution_plan = self.create_execution_plan(&pipeline, &resource_allocation)?;
123
124 let execution_id = self
126 .schedule_execution(context.clone(), execution_plan, input_data)
127 .await?;
128
129 let result = self.wait_for_execution(execution_id).await?;
131
132 self.release_execution_resources(resource_allocation)
134 .await?;
135
136 let execution_time = execution_start.elapsed();
137 let mut stats = self.stats.lock().unwrap();
138
139 match &result.pipeline_result {
140 Ok(_) => stats.successful_executions += 1,
141 Err(_) => stats.failed_executions += 1,
142 }
143
144 stats.total_execution_time += execution_time;
145 stats.update_averages();
146
147 Ok(result)
148 }
149
150 pub async fn execute_pipelines_concurrent(
152 &self,
153 executions: Vec<ConcurrentExecution>,
154 ) -> SklResult<Vec<ExecutionResult>> {
155 let mut execution_handles = Vec::new();
156
157 for execution in executions {
159 let engine = self.clone();
160 let handle = tokio::spawn(async move {
161 engine
162 .execute_pipeline(
163 &execution.context_id,
164 execution.pipeline,
165 execution.input_data,
166 )
167 .await
168 });
169 execution_handles.push(handle);
170 }
171
172 let mut results = Vec::new();
174 for handle in execution_handles {
175 match handle.await {
176 Ok(result) => results.push(result?),
177 Err(e) => return Err(SklearsError::InvalidInput(format!("Execution failed: {e}"))),
178 }
179 }
180
181 Ok(results)
182 }
183
184 pub async fn execute_composition(
186 &self,
187 context_id: &str,
188 composition: CompositionGraph,
189 input_data: HashMap<String, PipelineData>,
190 ) -> SklResult<CompositionResult> {
191 let context = self
192 .get_context(context_id)
193 .ok_or_else(|| SklearsError::InvalidInput(format!("Context {context_id} not found")))?;
194
195 self.validate_composition_graph(&composition)?;
197
198 let dependency_order = self.resolve_composition_dependencies(&composition)?;
200
201 let mut component_results = HashMap::new();
203 let mut execution_context = CompositionExecutionContext::new();
204
205 for component_id in dependency_order {
206 let component_node = composition.nodes.get(&component_id).ok_or_else(|| {
207 SklearsError::InvalidInput(format!("Component {component_id} not found"))
208 })?;
209
210 let component_input = self.prepare_component_input(
212 &component_id,
213 &composition,
214 &input_data,
215 &component_results,
216 )?;
217
218 let component_result = self
220 .execute_composition_component(
221 context.clone(),
222 component_node,
223 component_input,
224 &mut execution_context,
225 )
226 .await?;
227
228 component_results.insert(component_id, component_result);
229 }
230
231 Ok(CompositionResult {
232 composition_id: composition.composition_id,
233 success: true,
234 component_results,
235 execution_time: execution_context.start_time.elapsed(),
236 error: None,
237 })
238 }
239
240 #[must_use]
242 pub fn get_statistics(&self) -> ExecutionStatistics {
243 let stats = self.stats.lock().unwrap();
244 stats.clone()
245 }
246
247 pub fn configure(&mut self, config: ExecutionEngineConfig) {
249 self.config = config;
250 }
251
252 pub async fn shutdown(&self) -> SklResult<()> {
254 let scheduler = self.scheduler.write().unwrap();
256 scheduler.shutdown()?;
257
258 let mut contexts = self.contexts.write().unwrap();
260 contexts.clear();
261
262 let mut resource_manager = self.resource_manager.write().unwrap();
264 resource_manager.release_all_resources()?;
265
266 Ok(())
267 }
268
269 async fn acquire_execution_resources(
271 &self,
272 pipeline: &Pipeline,
273 ) -> SklResult<ResourceAllocation> {
274 let mut resource_manager = self.resource_manager.write().unwrap();
275 resource_manager.allocate_for_pipeline(pipeline)
276 }
277
278 async fn release_execution_resources(&self, allocation: ResourceAllocation) -> SklResult<()> {
279 let mut resource_manager = self.resource_manager.write().unwrap();
280 resource_manager.release_allocation(allocation)
281 }
282
283 fn create_execution_plan(
284 &self,
285 pipeline: &Pipeline,
286 resource_allocation: &ResourceAllocation,
287 ) -> SklResult<ExecutionPlan> {
288 Ok(ExecutionPlan {
289 plan_id: uuid::Uuid::new_v4().to_string(),
290 pipeline_id: pipeline.pipeline_id.clone(),
291 execution_strategy: pipeline.execution_strategy.clone(),
292 resource_allocation: resource_allocation.clone(),
293 estimated_execution_time: self.estimate_execution_time(pipeline)?,
294 priority: ExecutionPriority::Normal,
295 })
296 }
297
298 async fn schedule_execution(
299 &self,
300 context: Arc<RwLock<CompositionContext>>,
301 execution_plan: ExecutionPlan,
302 input_data: PipelineData,
303 ) -> SklResult<String> {
304 let mut scheduler = self.scheduler.write().unwrap();
305 scheduler.schedule_execution(context, execution_plan, input_data)
306 }
307
308 async fn wait_for_execution(&self, execution_id: String) -> SklResult<ExecutionResult> {
309 let scheduler = self.scheduler.read().unwrap();
310 scheduler.wait_for_execution(&execution_id)
311 }
312
313 fn estimate_execution_time(&self, pipeline: &Pipeline) -> SklResult<Duration> {
314 let base_time = Duration::from_millis(100);
317 let stage_time = Duration::from_millis(50) * pipeline.stages.len() as u32;
318 Ok(base_time + stage_time)
319 }
320
321 fn validate_composition_graph(&self, composition: &CompositionGraph) -> SklResult<()> {
322 if composition.nodes.is_empty() {
323 return Err(SklearsError::InvalidInput(
324 "Composition graph is empty".to_string(),
325 ));
326 }
327
328 self.detect_composition_cycles(composition)?;
330
331 Ok(())
332 }
333
334 fn detect_composition_cycles(&self, composition: &CompositionGraph) -> SklResult<()> {
335 Ok(())
338 }
339
340 fn resolve_composition_dependencies(
341 &self,
342 composition: &CompositionGraph,
343 ) -> SklResult<Vec<String>> {
344 let mut order = Vec::new();
347 for component_id in composition.nodes.keys() {
348 order.push(component_id.clone());
349 }
350 Ok(order)
351 }
352
353 fn prepare_component_input(
354 &self,
355 component_id: &str,
356 composition: &CompositionGraph,
357 initial_input: &HashMap<String, PipelineData>,
358 component_results: &HashMap<String, ComponentExecutionResult>,
359 ) -> SklResult<PipelineData> {
360 if let Some(data) = initial_input.get(component_id) {
363 Ok(data.clone())
364 } else {
365 Ok(PipelineData::empty())
366 }
367 }
368
369 async fn execute_composition_component(
370 &self,
371 context: Arc<RwLock<CompositionContext>>,
372 component_node: &CompositionNode,
373 input_data: PipelineData,
374 execution_context: &mut CompositionExecutionContext,
375 ) -> SklResult<ComponentExecutionResult> {
376 let start_time = Instant::now();
377
378 let component_config =
380 ComponentConfig::new(&component_node.component_id, &component_node.component_type);
381 let mut component = self
382 .component_registry
383 .create_component(&component_node.component_type, &component_config)?;
384
385 component.initialize(&component_config)?;
387 component.start()?;
388
389 let output_data = input_data; let execution_time = start_time.elapsed();
394
395 Ok(ComponentExecutionResult {
396 component_id: component_node.component_id.clone(),
397 success: true,
398 execution_time,
399 output_data,
400 error: None,
401 metrics: component.get_metrics(),
402 })
403 }
404}
405
406pub struct CompositionContext {
408 pub context_id: String,
410 pub active_pipelines: HashMap<String, Pipeline>,
412 pub components: HashMap<String, Box<dyn PluggableComponent>>,
414 pub variables: HashMap<String, serde_json::Value>,
416 pub metadata: HashMap<String, String>,
418 pub state: ContextState,
420 pub created_at: Instant,
422 pub last_activity: Instant,
424}
425
426impl std::fmt::Debug for CompositionContext {
427 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
428 f.debug_struct("CompositionContext")
429 .field("context_id", &self.context_id)
430 .field("active_pipelines", &self.active_pipelines)
431 .field(
432 "components",
433 &format!("[{} components]", self.components.len()),
434 )
435 .field("variables", &self.variables)
436 .field("metadata", &self.metadata)
437 .field("state", &self.state)
438 .field("created_at", &self.created_at)
439 .field("last_activity", &self.last_activity)
440 .finish()
441 }
442}
443
444impl CompositionContext {
445 #[must_use]
446 pub fn new(context_id: &str) -> Self {
447 let now = Instant::now();
448 Self {
449 context_id: context_id.to_string(),
450 active_pipelines: HashMap::new(),
451 components: HashMap::new(),
452 variables: HashMap::new(),
453 metadata: HashMap::new(),
454 state: ContextState::Active,
455 created_at: now,
456 last_activity: now,
457 }
458 }
459
460 pub fn add_component(&mut self, component_id: &str, component: Box<dyn PluggableComponent>) {
462 self.components.insert(component_id.to_string(), component);
463 self.last_activity = Instant::now();
464 }
465
466 #[must_use]
468 pub fn get_component(&self, component_id: &str) -> Option<&Box<dyn PluggableComponent>> {
469 self.components.get(component_id)
470 }
471
472 pub fn set_variable(&mut self, key: &str, value: serde_json::Value) {
474 self.variables.insert(key.to_string(), value);
475 self.last_activity = Instant::now();
476 }
477
478 #[must_use]
480 pub fn get_variable(&self, key: &str) -> Option<&serde_json::Value> {
481 self.variables.get(key)
482 }
483}
484
/// Lifecycle state of a composition context.
///
/// New contexts start as [`ContextState::Active`].
#[derive(Debug, Clone, PartialEq)]
pub enum ContextState {
    /// The context is in use.
    Active,
    /// The context is suspended.
    Suspended,
    /// The context is being cleaned up.
    CleaningUp,
    /// The context has been terminated.
    Terminated,
}
497
498#[derive(Debug)]
500pub struct ExecutionScheduler {
501 execution_queue: VecDeque<ScheduledExecution>,
503 active_executions: HashMap<String, ActiveExecution>,
505 config: SchedulerConfig,
507 thread_pool: Option<thread::JoinHandle<()>>,
509 shutdown_signal: Arc<(Mutex<bool>, Condvar)>,
511}
512
513impl ExecutionScheduler {
514 #[must_use]
515 pub fn new() -> Self {
516 Self {
517 execution_queue: VecDeque::new(),
518 active_executions: HashMap::new(),
519 config: SchedulerConfig::default(),
520 thread_pool: None,
521 shutdown_signal: Arc::new((Mutex::new(false), Condvar::new())),
522 }
523 }
524
525 pub fn schedule_execution(
526 &mut self,
527 context: Arc<RwLock<CompositionContext>>,
528 execution_plan: ExecutionPlan,
529 input_data: PipelineData,
530 ) -> SklResult<String> {
531 let execution_id = uuid::Uuid::new_v4().to_string();
532
533 let scheduled_execution = ScheduledExecution {
534 execution_id: execution_id.clone(),
535 context,
536 execution_plan,
537 input_data,
538 scheduled_at: Instant::now(),
539 };
540
541 self.execution_queue.push_back(scheduled_execution);
542 Ok(execution_id)
543 }
544
545 pub fn wait_for_execution(&self, execution_id: &str) -> SklResult<ExecutionResult> {
546 Ok(ExecutionResult {
549 execution_id: execution_id.to_string(),
550 pipeline_result: Ok(PipelineResult {
551 pipeline_id: "test".to_string(),
552 success: true,
553 stage_results: Vec::new(),
554 final_output: PipelineData::empty(),
555 execution_time: Duration::from_millis(100),
556 error: None,
557 }),
558 execution_time: Duration::from_millis(100),
559 resource_usage: ResourceUsage::new(),
560 })
561 }
562
563 pub fn shutdown(&self) -> SklResult<()> {
564 let (lock, cvar) = &*self.shutdown_signal;
565 let mut shutdown = lock.lock().unwrap();
566 *shutdown = true;
567 cvar.notify_all();
568 Ok(())
569 }
570}
571
572#[derive(Debug)]
574pub struct ResourceManager {
575 available_cpu_cores: u32,
577 available_memory: u64,
579 allocated_resources: HashMap<String, ResourceAllocation>,
581 config: ResourceManagerConfig,
583}
584
585impl ResourceManager {
586 #[must_use]
587 pub fn new() -> Self {
588 Self {
589 available_cpu_cores: num_cpus::get() as u32,
590 available_memory: 1024 * 1024 * 1024, allocated_resources: HashMap::new(),
592 config: ResourceManagerConfig::default(),
593 }
594 }
595
596 pub fn allocate_for_pipeline(&mut self, pipeline: &Pipeline) -> SklResult<ResourceAllocation> {
597 let allocation_id = uuid::Uuid::new_v4().to_string();
598
599 let estimated_cpu =
601 std::cmp::min(pipeline.stages.len() as u32, self.available_cpu_cores / 2);
602 let estimated_memory = 100 * 1024 * 1024; let allocation = ResourceAllocation {
605 allocation_id: allocation_id.clone(),
606 cpu_cores: estimated_cpu,
607 memory_bytes: estimated_memory,
608 allocated_at: Instant::now(),
609 };
610
611 self.allocated_resources
612 .insert(allocation_id, allocation.clone());
613 Ok(allocation)
614 }
615
616 pub fn release_allocation(&mut self, allocation: ResourceAllocation) -> SklResult<()> {
617 self.allocated_resources.remove(&allocation.allocation_id);
618 Ok(())
619 }
620
621 pub fn release_all_resources(&mut self) -> SklResult<()> {
622 self.allocated_resources.clear();
623 Ok(())
624 }
625}
626
627#[derive(Debug, Clone)]
629pub struct CompositionGraph {
630 pub composition_id: String,
632 pub nodes: HashMap<String, CompositionNode>,
634 pub edges: HashMap<String, Vec<String>>,
636 pub metadata: HashMap<String, String>,
638}
639
640#[derive(Debug, Clone)]
642pub struct CompositionNode {
643 pub component_id: String,
645 pub component_type: String,
647 pub config: ComponentConfig,
649 pub metadata: HashMap<String, String>,
651}
652
653#[derive(Debug)]
655pub struct ConcurrentExecution {
656 pub context_id: String,
658 pub pipeline: Pipeline,
660 pub input_data: PipelineData,
662}
663
664#[derive(Debug, Clone)]
666pub struct ExecutionPlan {
667 pub plan_id: String,
669 pub pipeline_id: String,
671 pub execution_strategy: ExecutionStrategy,
673 pub resource_allocation: ResourceAllocation,
675 pub estimated_execution_time: Duration,
677 pub priority: ExecutionPriority,
679}
680
/// Priority of an execution.
///
/// The derived ordering follows declaration order, so
/// `Low < Normal < High < Critical`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum ExecutionPriority {
    /// Lowest priority.
    Low,
    /// Priority assigned to plans built by the engine.
    Normal,
    /// Above-normal priority.
    High,
    /// Highest priority.
    Critical,
}
693
/// A grant of CPU cores and memory made by the resource manager.
#[derive(Debug, Clone)]
pub struct ResourceAllocation {
    /// Unique id under which the allocation is tracked.
    pub allocation_id: String,
    /// Number of CPU cores granted.
    pub cpu_cores: u32,
    /// Amount of memory granted, in bytes.
    pub memory_bytes: u64,
    /// Time at which the allocation was made.
    pub allocated_at: Instant,
}
706
/// Resource consumption figures reported for an execution.
#[derive(Debug, Clone)]
pub struct ResourceUsage {
    /// Peak memory figure (unit not established in this file section).
    pub peak_memory: u64,
    /// Average CPU figure.
    pub average_cpu: f64,
    /// Time spent processing.
    pub processing_time: Duration,
}

impl ResourceUsage {
    /// Creates a usage record with every figure set to zero.
    #[must_use]
    pub fn new() -> Self {
        let processing_time = Duration::ZERO;
        Self {
            peak_memory: 0,
            average_cpu: 0.0,
            processing_time,
        }
    }
}
728
729#[derive(Debug)]
731pub struct ScheduledExecution {
732 pub execution_id: String,
734 pub context: Arc<RwLock<CompositionContext>>,
736 pub execution_plan: ExecutionPlan,
738 pub input_data: PipelineData,
740 pub scheduled_at: Instant,
742}
743
744#[derive(Debug)]
746pub struct ActiveExecution {
747 pub execution_id: String,
749 pub started_at: Instant,
751 pub status: ExecutionStatus,
753}
754
/// Status of an execution.
#[derive(Debug, Clone, PartialEq)]
pub enum ExecutionStatus {
    /// Queued, not yet started.
    Scheduled,
    /// Currently running.
    Running,
    /// Finished successfully.
    Completed,
    /// Finished with an error.
    Failed,
    /// Cancelled before completion.
    Cancelled,
}
769
770#[derive(Debug)]
772pub struct ExecutionResult {
773 pub execution_id: String,
775 pub pipeline_result: SklResult<PipelineResult>,
777 pub execution_time: Duration,
779 pub resource_usage: ResourceUsage,
781}
782
783#[derive(Debug)]
785pub struct CompositionResult {
786 pub composition_id: String,
788 pub success: bool,
790 pub component_results: HashMap<String, ComponentExecutionResult>,
792 pub execution_time: Duration,
794 pub error: Option<String>,
796}
797
798#[derive(Debug, Clone)]
800pub struct ComponentExecutionResult {
801 pub component_id: String,
803 pub success: bool,
805 pub execution_time: Duration,
807 pub output_data: PipelineData,
809 pub error: Option<String>,
811 pub metrics: ComponentMetrics,
813}
814
815#[derive(Debug)]
817pub struct CompositionExecutionContext {
818 pub start_time: Instant,
820 pub variables: HashMap<String, serde_json::Value>,
822 pub trace: Vec<ExecutionTrace>,
824}
825
826impl CompositionExecutionContext {
827 #[must_use]
828 pub fn new() -> Self {
829 Self {
830 start_time: Instant::now(),
831 variables: HashMap::new(),
832 trace: Vec::new(),
833 }
834 }
835}
836
837#[derive(Debug, Clone)]
839pub struct ExecutionTrace {
840 pub timestamp: Instant,
842 pub component_id: String,
844 pub event: String,
846 pub data: Option<serde_json::Value>,
848}
849
850#[derive(Debug, Clone)]
852pub struct ExecutionEngineConfig {
853 pub max_concurrent_executions: usize,
855 pub execution_timeout: Option<Duration>,
857 pub allow_context_override: bool,
859 pub enable_tracing: bool,
861 pub resource_allocation_strategy: ResourceAllocationStrategy,
863}
864
865impl Default for ExecutionEngineConfig {
866 fn default() -> Self {
867 Self {
868 max_concurrent_executions: 10,
869 execution_timeout: Some(Duration::from_secs(300)),
870 allow_context_override: false,
871 enable_tracing: true,
872 resource_allocation_strategy: ResourceAllocationStrategy::Conservative,
873 }
874 }
875}
876
/// Strategy selector for resource allocation.
#[derive(Debug, Clone)]
pub enum ResourceAllocationStrategy {
    /// Conservative allocation; the engine's default.
    Conservative,
    /// Aggressive allocation.
    Aggressive,
    /// Adaptive allocation.
    Adaptive,
}
887
/// Settings of the execution scheduler.
#[derive(Debug, Clone)]
pub struct SchedulerConfig {
    /// Maximum queue length (default 1000).
    pub max_queue_size: usize,
    /// Execution timeout (default 300 seconds).
    pub execution_timeout: Duration,
    /// Whether priority scheduling is enabled (default `true`).
    pub enable_priority_scheduling: bool,
}

impl Default for SchedulerConfig {
    /// A queue of up to 1000 entries, a 300 second timeout and priority
    /// scheduling enabled.
    fn default() -> Self {
        let execution_timeout = Duration::from_secs(300);
        Self {
            max_queue_size: 1_000,
            execution_timeout,
            enable_priority_scheduling: true,
        }
    }
}
908
/// Settings of the resource manager.
#[derive(Debug, Clone)]
pub struct ResourceManagerConfig {
    /// CPU oversubscription factor (default 1.5).
    pub cpu_oversubscription_factor: f64,
    /// Memory oversubscription factor (default 1.2).
    pub memory_oversubscription_factor: f64,
    /// Whether resource monitoring is enabled (default `true`).
    pub enable_resource_monitoring: bool,
}

impl Default for ResourceManagerConfig {
    /// Oversubscription factors of 1.5 (CPU) and 1.2 (memory), with
    /// monitoring enabled.
    fn default() -> Self {
        let (cpu_oversubscription_factor, memory_oversubscription_factor) = (1.5, 1.2);
        Self {
            cpu_oversubscription_factor,
            memory_oversubscription_factor,
            enable_resource_monitoring: true,
        }
    }
}
929
/// Counters and timings describing what the engine has executed so far.
#[derive(Debug, Clone)]
pub struct ExecutionStatistics {
    /// Number of executions started.
    pub total_executions: u64,
    /// Number of executions that produced a successful pipeline result.
    pub successful_executions: u64,
    /// Number of executions that failed.
    pub failed_executions: u64,
    /// Sum of the recorded execution times.
    pub total_execution_time: Duration,
    /// `total_execution_time / total_executions`, refreshed by
    /// [`ExecutionStatistics::update_averages`].
    pub average_execution_time: Duration,
    /// Number of contexts created.
    pub total_contexts_created: u64,
    /// Highest number of executions observed running at the same time.
    pub peak_concurrent_executions: u32,
}

impl ExecutionStatistics {
    /// Creates statistics with every counter and duration at zero.
    #[must_use]
    pub fn new() -> Self {
        Self {
            total_executions: 0,
            successful_executions: 0,
            failed_executions: 0,
            total_execution_time: Duration::from_secs(0),
            average_execution_time: Duration::from_secs(0),
            total_contexts_created: 0,
            peak_concurrent_executions: 0,
        }
    }

    /// Fraction of executions that succeeded, in `0.0..=1.0`.
    ///
    /// Returns `0.0` when nothing has been executed yet.
    #[must_use]
    pub fn success_rate(&self) -> f64 {
        if self.total_executions == 0 {
            0.0
        } else {
            self.successful_executions as f64 / self.total_executions as f64
        }
    }

    /// Recomputes `average_execution_time` from the totals.
    ///
    /// Does nothing while `total_executions` is zero. The division is done
    /// on the full 64-bit count in nanoseconds, so it gives the same result
    /// as `Duration / u32` for small counts, but neither truncates the count
    /// nor divides by zero once it reaches 2^32.
    pub fn update_averages(&mut self) {
        const NANOS_PER_SEC: u128 = 1_000_000_000;

        if self.total_executions > 0 {
            let average_nanos =
                self.total_execution_time.as_nanos() / u128::from(self.total_executions);

            // The average can never exceed the total, whose whole seconds
            // fit in `u64`; the sub-second remainder is below 10^9 and fits
            // in `u32`. Both casts are therefore lossless.
            self.average_execution_time = Duration::new(
                (average_nanos / NANOS_PER_SEC) as u64,
                (average_nanos % NANOS_PER_SEC) as u32,
            );
        }
    }
}
980
981#[derive(Debug, Error)]
983pub enum ExecutionEngineError {
984 #[error("Context not found: {0}")]
985 ContextNotFound(String),
986
987 #[error("Resource allocation failed: {0}")]
988 ResourceAllocationFailed(String),
989
990 #[error("Execution scheduling failed: {0}")]
991 SchedulingFailed(String),
992
993 #[error("Execution timeout: {0:?}")]
994 ExecutionTimeout(Duration),
995
996 #[error("Invalid composition graph: {0}")]
997 InvalidCompositionGraph(String),
998}
999
1000impl Clone for CompositionExecutionEngine {
1001 fn clone(&self) -> Self {
1002 Self {
1003 contexts: self.contexts.clone(),
1004 component_registry: self.component_registry.clone(),
1005 dependency_resolver: self.dependency_resolver.clone(),
1006 lifecycle_manager: self.lifecycle_manager.clone(),
1007 scheduler: self.scheduler.clone(),
1008 resource_manager: self.resource_manager.clone(),
1009 config: self.config.clone(),
1010 stats: self.stats.clone(),
1011 event_bus: self.event_bus.clone(),
1012 }
1013 }
1014}
1015
1016impl Default for ExecutionScheduler {
1017 fn default() -> Self {
1018 Self::new()
1019 }
1020}
1021
1022impl Default for ResourceManager {
1023 fn default() -> Self {
1024 Self::new()
1025 }
1026}
1027
1028impl Default for ResourceUsage {
1029 fn default() -> Self {
1030 Self::new()
1031 }
1032}
1033
1034impl Default for CompositionExecutionContext {
1035 fn default() -> Self {
1036 Self::new()
1037 }
1038}
1039
1040impl Default for ExecutionStatistics {
1041 fn default() -> Self {
1042 Self::new()
1043 }
1044}
1045
#[allow(non_snake_case)]
#[cfg(test)]
mod tests {
    use super::super::pipeline_system::{
        ErrorHandlingStrategy, ExecutionContext, PipelineConfiguration, PipelineMetadata,
        PipelineMetrics, PipelineState,
    };
    use super::*;

    /// A variable written to a fresh context can be read back, and the
    /// context starts out active under the id it was created with.
    #[test]
    fn test_composition_context() {
        let mut context = CompositionContext::new("test_context");
        let expected = serde_json::Value::String("test_value".to_string());

        context.set_variable("test_var", expected.clone());

        assert_eq!(context.get_variable("test_var"), Some(&expected));
        assert_eq!(context.context_id, "test_context");
        assert_eq!(context.state, ContextState::Active);
    }

    /// `ResourceAllocation` stores the values it is built from.
    #[test]
    fn test_resource_allocation() {
        let one_mebibyte = 1024 * 1024;
        let allocation = ResourceAllocation {
            allocation_id: "test_allocation".to_string(),
            cpu_cores: 4,
            memory_bytes: one_mebibyte,
            allocated_at: Instant::now(),
        };

        assert_eq!(allocation.cpu_cores, 4);
        assert_eq!(allocation.memory_bytes, one_mebibyte);
    }

    /// 8 successes out of 10 executions give a success rate of 0.8.
    #[test]
    fn test_execution_statistics() {
        let mut stats = ExecutionStatistics::new();
        stats.total_executions = 10;
        stats.successful_executions = 8;
        stats.failed_executions = 2;

        assert_eq!(stats.success_rate(), 0.8);
    }

    /// An allocation for an empty pipeline can be made and released again.
    #[test]
    fn test_resource_manager() {
        let mut manager = ResourceManager::new();

        let pipeline = Pipeline {
            pipeline_id: "test_pipeline".to_string(),
            stages: vec![],
            config: PipelineConfiguration::default(),
            error_strategy: ErrorHandlingStrategy::FailFast,
            execution_strategy: ExecutionStrategy::Sequential,
            metadata: PipelineMetadata::new(),
            state: PipelineState::Created,
            components: Arc::new(RwLock::new(HashMap::new())),
            event_bus: Arc::new(RwLock::new(EventBus::new())),
            execution_context: Arc::new(RwLock::new(ExecutionContext::new())),
            metrics: Arc::new(Mutex::new(PipelineMetrics::new())),
        };

        let allocation = manager.allocate_for_pipeline(&pipeline);
        assert!(allocation.is_ok());

        let released = manager.release_allocation(allocation.unwrap());
        assert!(released.is_ok());
    }
}