1#[allow(dead_code)]
8use crate::coordination::monitoring::anomaly_detection::{AnomalyConfig, AnomalyResult};
9use crate::coordination::monitoring::performance_tracking::{
10 DashboardConfiguration, TrackerConfiguration,
11};
12use crate::coordination::orchestration::pipeline_orchestrator::OrchestratorConfiguration;
13use crate::coordination::scheduling::task_scheduler::SchedulerConfig;
14use crate::research::experiments::ResourceUsage;
15use scirs2_core::numeric::Float;
16use std::collections::HashMap;
17use std::fmt::Debug;
18use std::marker::PhantomData;
19use std::sync::{Arc, Mutex};
20use std::time::{Duration, Instant};
21
22pub mod monitoring;
24pub mod orchestration;
25pub mod scheduling;
26
27pub use scheduling::{
29 PriorityLevel, PriorityManager, PriorityQueue, PriorityUpdateStrategy,
30 ResourceAllocationStrategy, ResourceAllocationTracker, ResourceManager,
31 ResourceOptimizationEngine, ResourcePool, ScheduledTask, SchedulingStrategy,
32 StaticPriorityStrategy, TaskPriority, TaskScheduler,
33};
34
35pub type OptimizationTask<T> = ScheduledTask<T>;
37
38pub use orchestration::{
39 AlertConfiguration, Checkpoint, CheckpointConfiguration, CheckpointManager, CheckpointMetadata,
40 Experiment, ExperimentConfiguration, ExperimentExecution, ExperimentManager, ExperimentResult,
41 ExperimentStatus, MonitoringConfiguration, OptimizationPipeline, PipelineConfiguration,
42 PipelineExecution, PipelineOrchestrator, PipelineStage, RecoveryManager, RecoveryStrategy,
43 ResourceLimits, StageResult, StorageConfiguration, TimeoutSettings,
44};
45
46pub use monitoring::{
47 AlertManager, AnomalyAlert, AnomalyAnalyzer, AnomalyClassifier, AnomalyDetector,
48 AnomalyReporter, ConvergenceAnalyzer, ConvergenceCriteria, ConvergenceDetector,
49 ConvergenceIndicator, ConvergenceMonitor, ConvergenceResult, MetricAggregator, MetricCollector,
50 OutlierDetector, PerformanceAlert, PerformanceMetrics, PerformanceTracker,
51};
52
53pub struct OptimizationCoordinator<T: Float + Debug + Send + Sync + 'static> {
55 scheduler: TaskScheduler<T>,
56 orchestrator: PipelineOrchestrator<T>,
57 performance_tracker: PerformanceTracker<T>,
58 convergence_detector: ConvergenceDetector<T>,
59 anomaly_detector: AnomalyDetector<T>,
60 config: CoordinatorConfig<T>,
61 state: CoordinatorState<T>,
62 metrics: CoordinatorMetrics<T>,
63 _phantom: PhantomData<T>,
64}
65
66#[derive(Debug)]
68pub struct CoordinatorConfig<T: Float + Debug + Send + Sync + 'static> {
69 pub max_concurrent_tasks: usize,
70 pub default_timeout: Duration,
71 pub monitoring_interval: Duration,
72 pub checkpoint_interval: Duration,
73 pub resource_allocation_strategy: ResourceAllocationStrategy,
74 pub priority_strategy: Box<dyn PriorityUpdateStrategy<T>>,
75 pub convergence_criteria: ConvergenceCriteria<T>,
76 pub enable_anomaly_detection: bool,
77 pub enable_auto_scaling: bool,
78 pub enable_fault_tolerance: bool,
79 pub performance_threshold: T,
80}
81
82impl<T: Float + Debug + Send + Sync + 'static> Default for CoordinatorConfig<T> {
83 fn default() -> Self {
84 Self {
85 max_concurrent_tasks: 10,
86 default_timeout: Duration::from_secs(3600),
87 monitoring_interval: Duration::from_secs(10),
88 checkpoint_interval: Duration::from_secs(300),
89 resource_allocation_strategy: ResourceAllocationStrategy::FairShare,
90 priority_strategy: Box::new(StaticPriorityStrategy),
91 convergence_criteria: ConvergenceCriteria::default(),
92 enable_anomaly_detection: true,
93 enable_auto_scaling: true,
94 enable_fault_tolerance: true,
95 performance_threshold: T::from(0.01).unwrap_or_else(|| T::zero()),
96 }
97 }
98}
99
100#[derive(Debug)]
102pub struct CoordinatorState<T: Float + Debug + Send + Sync + 'static> {
103 pub active_tasks: HashMap<String, OptimizationTask<T>>,
104 pub active_pipelines: HashMap<String, OptimizationPipeline<T>>,
105 pub active_experiments: HashMap<String, Experiment<T>>,
106 pub resource_usage: ResourceUsage,
107 pub last_checkpoint: Option<Instant>,
108 pub last_monitoring_update: Option<Instant>,
109 pub coordination_start_time: Instant,
110 pub total_tasks_processed: usize,
111 pub total_experiments_completed: usize,
112}
113
114impl<T: Float + Debug + Send + Sync + 'static> Default for CoordinatorState<T> {
115 fn default() -> Self {
116 Self::new()
117 }
118}
119
120impl<T: Float + Debug + Send + Sync + 'static> CoordinatorState<T> {
121 pub fn new() -> Self {
122 Self {
123 active_tasks: HashMap::new(),
124 active_pipelines: HashMap::new(),
125 active_experiments: HashMap::new(),
126 resource_usage: ResourceUsage::default(),
127 last_checkpoint: None,
128 last_monitoring_update: None,
129 coordination_start_time: Instant::now(),
130 total_tasks_processed: 0,
131 total_experiments_completed: 0,
132 }
133 }
134}
135
136#[derive(Debug, Clone)]
138pub struct CoordinatorMetrics<T: Float + Debug + Send + Sync + 'static> {
139 pub average_task_completion_time: T,
140 pub throughput: T,
141 pub resource_utilization: T,
142 pub error_rate: T,
143 pub convergence_rate: T,
144 pub anomaly_detection_rate: T,
145 pub uptime: Duration,
146 pub total_processed_tasks: usize,
147}
148
149impl<T: Float + Debug + Send + Sync + 'static> Default for CoordinatorMetrics<T> {
150 fn default() -> Self {
151 Self {
152 average_task_completion_time: T::zero(),
153 throughput: T::zero(),
154 resource_utilization: T::zero(),
155 error_rate: T::zero(),
156 convergence_rate: T::zero(),
157 anomaly_detection_rate: T::zero(),
158 uptime: Duration::new(0, 0),
159 total_processed_tasks: 0,
160 }
161 }
162}
163
164#[derive(Debug, Clone)]
166pub struct CoordinationResult<T: Float + Debug + Send + Sync + 'static> {
167 pub success: bool,
168 pub task_id: String,
169 pub execution_time: Duration,
170 pub resource_usage: ResourceUsage,
171 pub performance_metrics: PerformanceMetrics<T>,
172 pub convergence_result: Option<ConvergenceResult<T>>,
173 pub anomaly_alerts: Vec<AnomalyAlert<T>>,
174 pub errors: Vec<String>,
175}
176
177impl<T: Float + Debug + Send + Sync + 'static + Default> OptimizationCoordinator<T> {
178 pub fn new(config: CoordinatorConfig<T>) -> Self {
180 let scheduler = TaskScheduler::new(SchedulerConfig {
181 max_concurrent_tasks: config.max_concurrent_tasks,
182 queue_size_limit: 1000,
183 task_timeout: config.default_timeout,
184 priority_update_interval: Duration::from_secs(5),
185 load_balance_interval: Duration::from_secs(10),
186 estimation_threshold: T::from(0.9).unwrap(),
187 enable_adaptive_scheduling: true,
188 enable_performance_learning: true,
189 })
190 .expect("Failed to create task scheduler");
191
192 let orchestrator = PipelineOrchestrator::new(OrchestratorConfiguration {
193 max_concurrent_pipelines: config.max_concurrent_tasks,
194 default_resource_limits: ResourceLimits::default(),
195 default_timeouts: TimeoutSettings::default(),
196 monitoring: MonitoringConfiguration::default(),
197 })
198 .expect("Failed to create pipeline orchestrator");
199
200 let performance_tracker = PerformanceTracker::new(TrackerConfiguration {
201 collection_interval: config.monitoring_interval,
202 enabled_collectors: vec!["default".to_string()],
203 enabled_analyzers: vec!["default".to_string()],
204 storage_config: StorageConfiguration::default(),
205 alert_config: AlertConfiguration::default(),
206 dashboard_config: DashboardConfiguration {
207 theme: String::from("default"),
208 auto_refresh: true,
209 default_time_range: Duration::from_secs(3600),
210 custom_params: HashMap::new(),
211 },
212 })
213 .expect("Failed to create performance tracker");
214
215 let convergence_detector = ConvergenceDetector::new(config.convergence_criteria.clone());
216
217 let anomaly_detector = AnomalyDetector::new(AnomalyConfig::default());
218
219 Self {
220 scheduler,
221 orchestrator,
222 performance_tracker,
223 convergence_detector,
224 anomaly_detector,
225 state: CoordinatorState::new(),
226 metrics: CoordinatorMetrics::default(),
227 config,
228 _phantom: PhantomData,
229 }
230 }
231
232 pub fn submit_task(&mut self, mut task: OptimizationTask<T>) -> Result<String, String> {
234 let task_id = format!(
236 "task_{}_{}",
237 self.state.total_tasks_processed,
238 Instant::now().elapsed().as_nanos()
239 );
240 task.task_id = task_id.clone();
241
242 match self.scheduler.submit_task(task.clone()) {
244 Ok(scheduling_result) => {
245 self.state.active_tasks.insert(task_id.clone(), task);
246 self.state.total_tasks_processed += 1;
247
248 Ok(task_id)
252 }
253 Err(e) => Err(format!("Failed to schedule task: {}", e)),
254 }
255 }
256
257 pub fn submit_pipeline(
259 &mut self,
260 mut pipeline: OptimizationPipeline<T>,
261 ) -> Result<String, String> {
262 let pipeline_id = format!(
263 "pipeline_{}_{}",
264 self.state.active_pipelines.len(),
265 Instant::now().elapsed().as_nanos()
266 );
267
268 pipeline.pipeline_id = pipeline_id.clone();
269
270 let execution_result: Result<(), String> = Ok(());
272 match execution_result {
273 Ok(_) => {
274 self.state
275 .active_pipelines
276 .insert(pipeline_id.clone(), pipeline);
277 Ok(pipeline_id)
278 }
279 Err(e) => Err(format!("Failed to execute pipeline: {}", e)),
280 }
281 }
282
283 pub fn submit_experiment(&mut self, mut experiment: Experiment<T>) -> Result<String, String> {
285 let experiment_id = format!(
286 "experiment_{}_{}",
287 self.state.active_experiments.len(),
288 Instant::now().elapsed().as_nanos()
289 );
290
291 experiment.experiment_id = experiment_id.clone();
292
293 let pipeline = self.experiment_to_pipeline(&experiment)?;
295 let pipeline_id = self.submit_pipeline(pipeline)?;
296
297 self.state
298 .active_experiments
299 .insert(experiment_id.clone(), experiment);
300
301 Ok(experiment_id)
302 }
303
304 pub fn execute_cycle(&mut self) -> Vec<CoordinationResult<T>> {
306 let mut results = Vec::new();
307
308 self.update_monitoring();
310
311 let task_results = self.process_scheduled_tasks();
313 results.extend(task_results);
314
315 self.update_pipeline_executions();
317
318 self.perform_maintenance();
320
321 self.update_metrics();
323
324 results
325 }
326
327 pub fn monitor_optimization_value(&mut self, task_id: &str, value: T) -> MonitoringResult<T> {
329 let mut alerts = Vec::new();
330
331 let _ = self.performance_tracker.collect_metrics();
333
334 let convergence_result = self.convergence_detector.check_convergence(value);
336
337 let anomaly_result = if self.config.enable_anomaly_detection {
339 Some(self.anomaly_detector.detect_anomaly(value))
340 } else {
341 None
342 };
343
344 if let Some(ref anomaly) = anomaly_result {
346 if anomaly.is_anomaly {
347 alerts.push(MonitoringAlert::Anomaly(anomaly.clone()));
348 }
349 }
350
351 if convergence_result.converged {
352 alerts.push(MonitoringAlert::Convergence(convergence_result.clone()));
353 }
354
355 MonitoringResult {
356 task_id: task_id.to_string(),
357 value,
358 convergence_result,
359 anomaly_result,
360 alerts,
361 timestamp: Instant::now(),
362 }
363 }
364
365 pub fn get_status(&self) -> CoordinationStatus<T> {
367 CoordinationStatus {
368 active_tasks: self.state.active_tasks.len(),
369 active_pipelines: self.state.active_pipelines.len(),
370 active_experiments: self.state.active_experiments.len(),
371 resource_utilization: self.state.resource_usage.clone(),
372 metrics: self.metrics.clone(),
373 uptime: self.state.coordination_start_time.elapsed(),
374 health_status: self.assess_health_status(),
375 }
376 }
377
378 pub fn update_resource_allocation(&mut self, allocation: ResourceUsage) -> Result<(), String> {
380 self.state.resource_usage = allocation;
381
382 Ok(())
391 }
392
393 pub fn shutdown(&mut self) -> Result<(), String> {
395 self.complete_active_tasks()?;
397
398 self.save_final_checkpoints()?;
400
401 let report = self.generate_final_report();
403 println!("Coordination shutdown report:\n{}", report);
404
405 Ok(())
406 }
407
408 fn update_monitoring(&mut self) {
411 let now = Instant::now();
412
413 if let Some(last_update) = self.state.last_monitoring_update {
414 if now.duration_since(last_update) < self.config.monitoring_interval {
415 return;
416 }
417 }
418
419 let _ = self.performance_tracker.collect_metrics();
421
422 if self.config.enable_anomaly_detection {
424 let system_metrics = self.collect_system_metrics();
425 for metric in system_metrics {
426 let _ = self.anomaly_detector.detect_anomaly(metric);
427 }
428 }
429
430 self.state.last_monitoring_update = Some(now);
431 }
432
433 fn process_scheduled_tasks(&mut self) -> Vec<CoordinationResult<T>> {
434 let mut results = Vec::new();
435 let ready_tasks = Vec::new(); for task in ready_tasks {
438 match self.execute_task(&task) {
439 Ok(result) => {
440 results.push(result);
441 self.state.active_tasks.remove(&task.task_id);
442 }
443 Err(e) => {
444 results.push(CoordinationResult {
445 success: false,
446 task_id: task.task_id.clone(),
447 execution_time: Duration::new(0, 0),
448 resource_usage: ResourceUsage::default(),
449 performance_metrics: PerformanceMetrics::default(),
450 convergence_result: None,
451 anomaly_alerts: Vec::new(),
452 errors: vec![e],
453 });
454 }
455 }
456 }
457
458 results
459 }
460
461 fn execute_task(
462 &mut self,
463 task: &OptimizationTask<T>,
464 ) -> Result<CoordinationResult<T>, String> {
465 let start_time = Instant::now();
466 let task_id = task.task_id.clone();
467
468 let performance_metrics = self
470 .performance_tracker
471 .collect_metrics()
472 .unwrap_or_else(|_| PerformanceMetrics::default());
473
474 let execution_time = start_time.elapsed();
475
476 Ok(CoordinationResult {
477 success: true,
478 task_id,
479 execution_time,
480 resource_usage: self.state.resource_usage.clone(),
481 performance_metrics,
482 convergence_result: None,
483 anomaly_alerts: Vec::new(),
484 errors: Vec::new(),
485 })
486 }
487
488 fn update_pipeline_executions(&mut self) {
489 }
498
499 fn perform_maintenance(&mut self) {
500 let now = Instant::now();
501
502 if let Some(last_checkpoint) = self.state.last_checkpoint {
504 if now.duration_since(last_checkpoint) >= self.config.checkpoint_interval {
505 let _ = self.create_checkpoint();
506 }
507 } else {
508 let _ = self.create_checkpoint();
509 }
510
511 self.cleanup_completed_items();
513
514 if self.config.enable_auto_scaling {
516 self.update_adaptive_parameters();
517 }
518 }
519
520 fn create_checkpoint(&mut self) -> Result<(), String> {
521 self.state.last_checkpoint = Some(Instant::now());
524 Ok(())
525 }
526
527 fn cleanup_completed_items(&mut self) {
528 let threshold = Duration::from_secs(3600); let now = Instant::now();
531
532 self.state.active_tasks.retain(|_, _task| {
533 true });
540 }
541
542 fn update_adaptive_parameters(&mut self) {
543 let current_metrics = &self.metrics;
545
546 if current_metrics.resource_utilization > T::from(0.9).unwrap_or_else(|| T::zero()) {
547 let _ = self.request_additional_resources();
549 } else if current_metrics.resource_utilization < T::from(0.3).unwrap_or_else(|| T::zero()) {
550 let _ = self.release_excess_resources();
552 }
553 }
554
555 fn request_additional_resources(&mut self) -> Result<(), String> {
556 Ok(())
558 }
559
560 fn release_excess_resources(&mut self) -> Result<(), String> {
561 Ok(())
563 }
564
565 fn update_metrics(&mut self) {
566 let current_time = Instant::now();
567 let uptime = current_time.duration_since(self.state.coordination_start_time);
568
569 self.metrics.uptime = uptime;
571 self.metrics.total_processed_tasks = self.state.total_tasks_processed;
572
573 if uptime.as_secs() > 0 {
575 self.metrics.throughput = T::from(self.state.total_tasks_processed)
576 .unwrap_or_else(|| T::zero())
577 / T::from(uptime.as_secs()).unwrap();
578 }
579
580 self.metrics.resource_utilization = T::zero(); self.metrics.convergence_rate = self.calculate_convergence_rate();
585 self.metrics.anomaly_detection_rate = self.calculate_anomaly_rate();
586 }
587
588 fn calculate_convergence_rate(&self) -> T {
589 T::from(0.85).unwrap_or_else(|| T::zero()) }
592
593 fn calculate_anomaly_rate(&self) -> T {
594 T::from(0.05).unwrap_or_else(|| T::zero()) }
597
598 fn collect_system_metrics(&self) -> Vec<T> {
599 vec![
601 self.metrics.resource_utilization,
602 self.metrics.throughput,
603 T::from(self.state.active_tasks.len()).unwrap(),
604 T::from(self.state.active_pipelines.len()).unwrap(),
605 ]
606 }
607
608 fn experiment_to_pipeline(
609 &self,
610 experiment: &Experiment<T>,
611 ) -> Result<OptimizationPipeline<T>, String> {
612 let pipeline = OptimizationPipeline {
614 pipeline_id: experiment.experiment_id.clone(),
615 name: format!("Experiment {}", experiment.experiment_id),
616 description: "Auto-generated pipeline from experiment".to_string(),
617 stages: Vec::new(), dependencies: HashMap::new(),
619 configuration: PipelineConfiguration::default(),
620 global_parameters: HashMap::new(),
621 metadata: crate::coordination::orchestration::pipeline_orchestrator::PipelineMetadata {
622 created_by: "system".to_string(),
623 created_at: std::time::SystemTime::now(),
624 updated_at: std::time::SystemTime::now(),
625 tags: vec!["experiment".to_string()],
626 description: "Pipeline from experiment".to_string(),
627 },
628 version: "1.0.0".to_string(),
629 };
630 Ok(pipeline)
631 }
632
633 fn assess_health_status(&self) -> HealthStatus {
634 let error_rate = self.metrics.error_rate;
635 let resource_utilization = self.metrics.resource_utilization;
636
637 if error_rate > T::from(0.1).unwrap_or_else(|| T::zero()) {
638 HealthStatus::Unhealthy
639 } else if resource_utilization > T::from(0.95).unwrap_or_else(|| T::zero()) {
640 HealthStatus::Degraded
641 } else if error_rate > T::from(0.05).unwrap_or_else(|| T::zero())
642 || resource_utilization > T::from(0.8).unwrap_or_else(|| T::zero())
643 {
644 HealthStatus::Warning
645 } else {
646 HealthStatus::Healthy
647 }
648 }
649
650 fn complete_active_tasks(&mut self) -> Result<(), String> {
651 let timeout = Duration::from_secs(60);
653 let start = Instant::now();
654
655 while !self.state.active_tasks.is_empty() && start.elapsed() < timeout {
656 let results = self.process_scheduled_tasks();
657 if results.is_empty() {
658 std::thread::sleep(Duration::from_millis(100));
659 }
660 }
661
662 if !self.state.active_tasks.is_empty() {
663 return Err(format!(
664 "Timeout waiting for {} tasks to complete",
665 self.state.active_tasks.len()
666 ));
667 }
668
669 Ok(())
670 }
671
672 fn save_final_checkpoints(&mut self) -> Result<(), String> {
673 self.create_checkpoint()
674 }
675
676 fn generate_final_report(&self) -> String {
677 format!(
678 "Optimization Coordination Final Report:\n\
679 - Total Uptime: {:?}\n\
680 - Tasks Processed: {}\n\
681 - Experiments Completed: {}\n\
682 - Average Throughput: {:.2}\n\
683 - Resource Utilization: {:.2}%\n\
684 - Error Rate: {:.2}%\n\
685 - Convergence Rate: {:.2}%\n\
686 - Health Status: {:?}",
687 self.metrics.uptime,
688 self.metrics.total_processed_tasks,
689 self.state.total_experiments_completed,
690 self.metrics.throughput.to_f64().unwrap_or(0.0),
691 (self.metrics.resource_utilization * T::from(100.0).unwrap_or_else(|| T::zero()))
692 .to_f64()
693 .unwrap_or(0.0),
694 (self.metrics.error_rate * T::from(100.0).unwrap_or_else(|| T::zero()))
695 .to_f64()
696 .unwrap_or(0.0),
697 (self.metrics.convergence_rate * T::from(100.0).unwrap_or_else(|| T::zero()))
698 .to_f64()
699 .unwrap_or(0.0),
700 self.assess_health_status(),
701 )
702 }
703}
704
705#[derive(Debug, Clone)]
707pub struct MonitoringResult<T: Float + Debug + Send + Sync + 'static> {
708 pub task_id: String,
709 pub value: T,
710 pub convergence_result: ConvergenceResult<T>,
711 pub anomaly_result: Option<AnomalyResult<T>>,
712 pub alerts: Vec<MonitoringAlert<T>>,
713 pub timestamp: Instant,
714}
715
716#[derive(Debug, Clone)]
718pub enum MonitoringAlert<T: Float + Debug + Send + Sync + 'static> {
719 Convergence(ConvergenceResult<T>),
720 Anomaly(AnomalyResult<T>),
721 Performance(Box<PerformanceAlert<T>>),
722 Resource(String),
723}
724
725#[derive(Debug, Clone)]
727pub struct CoordinationStatus<T: Float + Debug + Send + Sync + 'static> {
728 pub active_tasks: usize,
729 pub active_pipelines: usize,
730 pub active_experiments: usize,
731 pub resource_utilization: ResourceUsage,
732 pub metrics: CoordinatorMetrics<T>,
733 pub uptime: Duration,
734 pub health_status: HealthStatus,
735}
736
737#[derive(Debug, Clone, PartialEq)]
739pub enum HealthStatus {
740 Healthy,
741 Warning,
742 Degraded,
743 Unhealthy,
744}
745
746pub struct CoordinatorBuilder<T: Float + Debug + Send + Sync + 'static> {
748 config: CoordinatorConfig<T>,
749}
750
751impl<T: Float + Debug + Send + Sync + 'static + Default> CoordinatorBuilder<T> {
752 pub fn new() -> Self {
753 Self {
754 config: CoordinatorConfig::default(),
755 }
756 }
757
758 pub fn max_concurrent_tasks(mut self, max: usize) -> Self {
759 self.config.max_concurrent_tasks = max;
760 self
761 }
762
763 pub fn monitoring_interval(mut self, interval: Duration) -> Self {
764 self.config.monitoring_interval = interval;
765 self
766 }
767
768 pub fn enable_anomaly_detection(mut self, enable: bool) -> Self {
769 self.config.enable_anomaly_detection = enable;
770 self
771 }
772
773 pub fn enable_fault_tolerance(mut self, enable: bool) -> Self {
774 self.config.enable_fault_tolerance = enable;
775 self
776 }
777
778 pub fn convergence_criteria(mut self, criteria: ConvergenceCriteria<T>) -> Self {
779 self.config.convergence_criteria = criteria;
780 self
781 }
782
783 pub fn build(self) -> OptimizationCoordinator<T> {
784 OptimizationCoordinator::new(self.config)
785 }
786}
787
788impl<T: Float + Debug + Send + Sync + 'static + Default> Default for CoordinatorBuilder<T> {
789 fn default() -> Self {
790 Self::new()
791 }
792}
793
794#[cfg(test)]
795mod tests {
796 use super::*;
797
798 #[test]
799 fn test_coordinator_creation() {
800 let coordinator = CoordinatorBuilder::<f64>::new()
801 .max_concurrent_tasks(5)
802 .enable_anomaly_detection(true)
803 .build();
804
805 let status = coordinator.get_status();
806 assert_eq!(status.active_tasks, 0);
807 assert_eq!(status.health_status, HealthStatus::Healthy);
808 }
809
810 #[test]
811 fn test_task_submission() {
812 let mut coordinator = OptimizationCoordinator::<f64>::new(CoordinatorConfig::default());
813 let task = OptimizationTask::new("test_task".to_string());
814
815 let task_id = coordinator.submit_task(task).unwrap();
816 assert!(!task_id.is_empty());
817
818 let status = coordinator.get_status();
819 assert!(status.active_tasks > 0);
820 }
821
822 #[test]
823 fn test_monitoring() {
824 let mut coordinator = OptimizationCoordinator::<f64>::new(CoordinatorConfig::default());
825 let result = coordinator.monitor_optimization_value("test_task", 1.0);
826
827 assert_eq!(result.task_id, "test_task");
828 assert_eq!(result.value, 1.0);
829 }
830}