1use serde::{Deserialize, Serialize};
8use sklears_core::error::{Result as SklResult, SklearsError};
9use std::collections::HashMap;
10use std::sync::{Arc, Mutex, RwLock};
11use std::time::{Duration, Instant};
12use thiserror::Error;
13
14use super::component_framework::{ComponentConfig, PluggableComponent};
15use super::event_system::EventBus;
16
/// Fluent builder for assembling a [`Pipeline`] out of stages together
/// with error-handling/execution strategies and custom metadata.
#[derive(Debug)]
pub struct PipelineBuilder {
    // Ordered list of stages; executed in insertion order.
    stages: Vec<PipelineStage>,
    // Pipeline-wide configuration (timeouts, retry, feature flags).
    config: PipelineConfiguration,
    // How stage errors are handled (defaults to fail-fast).
    error_strategy: ErrorHandlingStrategy,
    // How stages are scheduled (defaults to sequential).
    execution_strategy: ExecutionStrategy,
    // Descriptive metadata attached to the built pipeline.
    metadata: PipelineMetadata,
}
34
35impl PipelineBuilder {
36 #[must_use]
38 pub fn new() -> Self {
39 Self {
40 stages: Vec::new(),
41 config: PipelineConfiguration::default(),
42 error_strategy: ErrorHandlingStrategy::FailFast,
43 execution_strategy: ExecutionStrategy::Sequential,
44 metadata: PipelineMetadata::new(),
45 }
46 }
47
48 #[must_use]
50 pub fn add_stage(mut self, component_type: &str, config: ComponentConfig) -> Self {
51 let stage = PipelineStage {
52 stage_id: format!("stage_{}", self.stages.len()),
53 component_type: component_type.to_string(),
54 component_config: config,
55 stage_type: StageType::Component,
56 parallel_branches: Vec::new(),
57 conditional_execution: None,
58 retry_config: None,
59 timeout: None,
60 };
61
62 self.stages.push(stage);
63 self
64 }
65
66 #[must_use]
68 pub fn add_parallel_stage(mut self, branches: Vec<ParallelBranch>) -> Self {
69 let stage = PipelineStage {
70 stage_id: format!("parallel_stage_{}", self.stages.len()),
71 component_type: "parallel".to_string(),
72 component_config: ComponentConfig::new("parallel", "parallel"),
73 stage_type: StageType::Parallel,
74 parallel_branches: branches,
75 conditional_execution: None,
76 retry_config: None,
77 timeout: None,
78 };
79
80 self.stages.push(stage);
81 self
82 }
83
84 #[must_use]
86 pub fn add_conditional_stage(
87 mut self,
88 component_type: &str,
89 config: ComponentConfig,
90 condition: Box<dyn ConditionalExecution>,
91 ) -> Self {
92 let stage = PipelineStage {
93 stage_id: format!("conditional_stage_{}", self.stages.len()),
94 component_type: component_type.to_string(),
95 component_config: config,
96 stage_type: StageType::Conditional,
97 parallel_branches: Vec::new(),
98 conditional_execution: Some(condition),
99 retry_config: None,
100 timeout: None,
101 };
102
103 self.stages.push(stage);
104 self
105 }
106
107 #[must_use]
109 pub fn with_error_strategy(mut self, strategy: ErrorHandlingStrategy) -> Self {
110 self.error_strategy = strategy;
111 self
112 }
113
114 #[must_use]
116 pub fn with_execution_strategy(mut self, strategy: ExecutionStrategy) -> Self {
117 self.execution_strategy = strategy;
118 self
119 }
120
121 #[must_use]
123 pub fn with_timeout(mut self, timeout: Duration) -> Self {
124 self.config.pipeline_timeout = Some(timeout);
125 self
126 }
127
128 #[must_use]
130 pub fn with_retry_config(mut self, config: RetryConfiguration) -> Self {
131 self.config.retry_config = Some(config);
132 self
133 }
134
135 #[must_use]
137 pub fn with_metadata(mut self, key: &str, value: &str) -> Self {
138 self.metadata
139 .custom_metadata
140 .insert(key.to_string(), value.to_string());
141 self
142 }
143
144 pub fn build(self) -> SklResult<Pipeline> {
146 self.validate_pipeline()?;
147
148 Ok(Pipeline {
149 pipeline_id: uuid::Uuid::new_v4().to_string(),
150 stages: self.stages,
151 config: self.config,
152 error_strategy: self.error_strategy,
153 execution_strategy: self.execution_strategy,
154 metadata: self.metadata,
155 state: PipelineState::Created,
156 components: Arc::new(RwLock::new(HashMap::new())),
157 event_bus: Arc::new(RwLock::new(EventBus::new())),
158 execution_context: Arc::new(RwLock::new(ExecutionContext::new())),
159 metrics: Arc::new(Mutex::new(PipelineMetrics::new())),
160 })
161 }
162
163 fn validate_pipeline(&self) -> SklResult<()> {
165 if self.stages.is_empty() {
166 return Err(SklearsError::InvalidInput(
167 "Pipeline must have at least one stage".to_string(),
168 ));
169 }
170
171 for stage in &self.stages {
173 if stage.component_type.is_empty() {
174 return Err(SklearsError::InvalidInput(
175 "Stage component type cannot be empty".to_string(),
176 ));
177 }
178 }
179
180 Ok(())
181 }
182
183 #[must_use]
185 pub fn is_empty(&self) -> bool {
186 self.stages.is_empty()
187 }
188}
189
/// An executable pipeline produced by [`PipelineBuilder::build`], holding
/// its stages, runtime strategies, shared runtime state, and metrics.
pub struct Pipeline {
    /// Unique id (UUID v4) assigned at build time.
    pub pipeline_id: String,
    /// Ordered stages to execute.
    pub stages: Vec<PipelineStage>,
    /// Pipeline-wide configuration (timeouts, retry, flags).
    pub config: PipelineConfiguration,
    /// How stage errors map to the pipeline outcome.
    pub error_strategy: ErrorHandlingStrategy,
    /// How stages are scheduled.
    pub execution_strategy: ExecutionStrategy,
    /// Descriptive metadata.
    pub metadata: PipelineMetadata,
    /// Current lifecycle state.
    pub state: PipelineState,
    /// Registry of pluggable components, keyed by name.
    /// NOTE(review): populated/read nowhere in this file yet — confirm.
    pub components: Arc<RwLock<HashMap<String, Box<dyn PluggableComponent>>>>,
    /// Event bus shared with components.
    pub event_bus: Arc<RwLock<EventBus>>,
    /// Mutable per-run context (input, trace, variables).
    pub execution_context: Arc<RwLock<ExecutionContext>>,
    /// Aggregated execution metrics.
    pub metrics: Arc<Mutex<PipelineMetrics>>,
}
215
216impl std::fmt::Debug for Pipeline {
217 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
218 f.debug_struct("Pipeline")
219 .field("pipeline_id", &self.pipeline_id)
220 .field("stages", &self.stages)
221 .field("config", &self.config)
222 .field("error_strategy", &self.error_strategy)
223 .field("execution_strategy", &self.execution_strategy)
224 .field("metadata", &self.metadata)
225 .field("state", &self.state)
226 .field("components", &"[components: <RwLock>]".to_string())
227 .field("event_bus", &"[event_bus: <RwLock>]".to_string())
228 .field(
229 "execution_context",
230 &"[execution_context: <RwLock>]".to_string(),
231 )
232 .field("metrics", &"[metrics: <Mutex>]".to_string())
233 .finish()
234 }
235}
236
237impl Pipeline {
238 pub async fn execute(&mut self, input_data: PipelineData) -> SklResult<PipelineResult> {
240 let start_time = Instant::now();
241 self.state = PipelineState::Running;
242
243 let mut metrics = self.metrics.lock().unwrap_or_else(|e| e.into_inner());
244 metrics.execution_count += 1;
245 metrics.last_execution_start = Some(start_time);
246 drop(metrics);
247
248 {
250 let mut context = self
251 .execution_context
252 .write()
253 .unwrap_or_else(|e| e.into_inner());
254 context.input_data = input_data;
255 context.execution_id = uuid::Uuid::new_v4().to_string();
256 context.start_time = start_time;
257 }
258
259 let execution_result = match self.execution_strategy {
260 ExecutionStrategy::Sequential => self.execute_sequential().await,
261 ExecutionStrategy::Parallel => self.execute_parallel().await,
262 ExecutionStrategy::Adaptive => self.execute_adaptive().await,
263 };
264
265 let end_time = Instant::now();
266 let execution_duration = end_time.duration_since(start_time);
267
268 let mut metrics = self.metrics.lock().unwrap_or_else(|e| e.into_inner());
270 metrics.last_execution_end = Some(end_time);
271 metrics.total_execution_time += execution_duration;
272
273 if execution_result.is_ok() {
274 metrics.successful_executions += 1;
275 self.state = PipelineState::Completed;
276 } else {
277 metrics.failed_executions += 1;
278 self.state = PipelineState::Failed;
279 }
280
281 execution_result
282 }
283
284 async fn execute_sequential(&mut self) -> SklResult<PipelineResult> {
286 let mut stage_results = Vec::new();
287 let mut current_data = {
288 let context = self
289 .execution_context
290 .read()
291 .unwrap_or_else(|e| e.into_inner());
292 context.input_data.clone()
293 };
294
295 let stages = self.stages.clone(); for (index, stage) in stages.iter().enumerate() {
297 let stage_start = Instant::now();
298
299 match self.execute_stage(stage, ¤t_data).await {
300 Ok(stage_result) => {
301 current_data = stage_result.output_data.clone();
302 stage_results.push(stage_result);
303 }
304 Err(error) => {
305 let stage_result = StageResult {
306 stage_id: stage.stage_id.clone(),
307 success: false,
308 execution_time: stage_start.elapsed(),
309 error: Some(error.to_string()),
310 output_data: PipelineData::empty(),
311 metrics: StageMetrics::new(),
312 };
313 stage_results.push(stage_result);
314
315 return self.handle_stage_error(index, error);
316 }
317 }
318 }
319
320 Ok(PipelineResult {
321 pipeline_id: self.pipeline_id.clone(),
322 success: true,
323 stage_results,
324 final_output: current_data,
325 execution_time: {
326 let context = self
327 .execution_context
328 .read()
329 .unwrap_or_else(|e| e.into_inner());
330 context.start_time.elapsed()
331 },
332 error: None,
333 })
334 }
335
336 async fn execute_parallel(&mut self) -> SklResult<PipelineResult> {
338 self.execute_sequential().await
342 }
343
344 async fn execute_adaptive(&mut self) -> SklResult<PipelineResult> {
346 if self.stages.len() > 3 {
349 self.execute_parallel().await
350 } else {
351 self.execute_sequential().await
352 }
353 }
354
355 async fn execute_stage(
357 &mut self,
358 stage: &PipelineStage,
359 input_data: &PipelineData,
360 ) -> SklResult<StageResult> {
361 let stage_start = Instant::now();
362
363 if let Some(condition) = &stage.conditional_execution {
365 if !condition.should_execute(input_data)? {
366 return Ok(StageResult {
367 stage_id: stage.stage_id.clone(),
368 success: true,
369 execution_time: stage_start.elapsed(),
370 error: None,
371 output_data: input_data.clone(),
372 metrics: StageMetrics::new(),
373 });
374 }
375 }
376
377 let execution_result = match stage.stage_type {
378 StageType::Component => self.execute_component_stage(stage, input_data).await,
379 StageType::Parallel => self.execute_parallel_stage(stage, input_data).await,
380 StageType::Conditional => self.execute_component_stage(stage, input_data).await,
381 };
382
383 let mut stage_result = match execution_result {
384 Ok(result) => result,
385 Err(error) => StageResult {
386 stage_id: stage.stage_id.clone(),
387 success: false,
388 execution_time: stage_start.elapsed(),
389 error: Some(error.to_string()),
390 output_data: PipelineData::empty(),
391 metrics: StageMetrics::new(),
392 },
393 };
394
395 stage_result.execution_time = stage_start.elapsed();
396 Ok(stage_result)
397 }
398
399 async fn execute_component_stage(
401 &mut self,
402 stage: &PipelineStage,
403 input_data: &PipelineData,
404 ) -> SklResult<StageResult> {
405 let component_key = format!("{}_{}", stage.component_type, stage.stage_id);
407
408 Ok(StageResult {
416 stage_id: stage.stage_id.clone(),
417 success: true,
418 execution_time: Duration::from_millis(10),
419 error: None,
420 output_data: input_data.clone(),
421 metrics: StageMetrics::new(),
422 })
423 }
424
425 async fn execute_parallel_stage(
427 &mut self,
428 stage: &PipelineStage,
429 input_data: &PipelineData,
430 ) -> SklResult<StageResult> {
431 let mut branch_results = Vec::new();
432
433 for branch in &stage.parallel_branches {
435 let branch_result = self.execute_parallel_branch(branch, input_data).await?;
437 branch_results.push(branch_result);
438 }
439
440 let combined_output = self.combine_parallel_results(&branch_results)?;
442
443 Ok(StageResult {
444 stage_id: stage.stage_id.clone(),
445 success: true,
446 execution_time: Duration::from_millis(20),
447 error: None,
448 output_data: combined_output,
449 metrics: StageMetrics::new(),
450 })
451 }
452
453 async fn execute_parallel_branch(
455 &mut self,
456 branch: &ParallelBranch,
457 input_data: &PipelineData,
458 ) -> SklResult<PipelineData> {
459 Ok(input_data.clone())
461 }
462
463 fn combine_parallel_results(&self, results: &[PipelineData]) -> SklResult<PipelineData> {
465 if let Some(first) = results.first() {
467 Ok(first.clone())
468 } else {
469 Ok(PipelineData::empty())
470 }
471 }
472
473 fn handle_stage_error(
475 &self,
476 stage_index: usize,
477 error: SklearsError,
478 ) -> SklResult<PipelineResult> {
479 match self.error_strategy {
480 ErrorHandlingStrategy::FailFast => Err(error),
481 ErrorHandlingStrategy::ContinueOnError => {
482 Ok(PipelineResult {
484 pipeline_id: self.pipeline_id.clone(),
485 success: false,
486 stage_results: Vec::new(),
487 final_output: PipelineData::empty(),
488 execution_time: Duration::from_secs(0),
489 error: Some(error.to_string()),
490 })
491 }
492 ErrorHandlingStrategy::Retry => {
493 Err(error)
495 }
496 }
497 }
498
499 #[must_use]
501 pub fn get_metrics(&self) -> PipelineMetrics {
502 let metrics = self.metrics.lock().unwrap_or_else(|e| e.into_inner());
503 metrics.clone()
504 }
505
506 #[must_use]
508 pub fn get_state(&self) -> PipelineState {
509 self.state.clone()
510 }
511}
512
/// A single unit of work within a [`Pipeline`].
#[derive(Debug)]
pub struct PipelineStage {
    /// Identifier, unique within the owning pipeline.
    pub stage_id: String,
    /// Name of the component type this stage runs.
    pub component_type: String,
    /// Configuration passed to the component.
    pub component_config: ComponentConfig,
    /// Whether this is a plain, parallel, or conditional stage.
    pub stage_type: StageType,
    /// Branches to fan out over when `stage_type` is `Parallel`.
    pub parallel_branches: Vec<ParallelBranch>,
    /// Optional gate deciding whether the stage runs at all.
    pub conditional_execution: Option<Box<dyn ConditionalExecution>>,
    /// Per-stage retry settings, if any.
    pub retry_config: Option<RetryConfiguration>,
    /// Per-stage timeout, if any.
    pub timeout: Option<Duration>,
}
533
534impl Clone for PipelineStage {
535 fn clone(&self) -> Self {
536 Self {
537 stage_id: self.stage_id.clone(),
538 component_type: self.component_type.clone(),
539 component_config: self.component_config.clone(),
540 stage_type: self.stage_type.clone(),
541 parallel_branches: self.parallel_branches.clone(),
542 conditional_execution: None, retry_config: self.retry_config.clone(),
544 timeout: self.timeout,
545 }
546 }
547}
548
/// Discriminates how a [`PipelineStage`] is executed.
#[derive(Debug, Clone, PartialEq)]
pub enum StageType {
    /// A single component invocation.
    Component,
    /// A fan-out over the stage's `parallel_branches`.
    Parallel,
    /// A component invocation gated by a runtime condition.
    Conditional,
}
559
/// One branch of a parallel stage.
#[derive(Debug, Clone)]
pub struct ParallelBranch {
    /// Identifier of this branch.
    pub branch_id: String,
    /// Component type the branch runs.
    pub component_type: String,
    /// Configuration for that component.
    pub config: ComponentConfig,
    /// Relative weight when combining branch outputs.
    /// NOTE(review): not consulted by the current result combiner.
    pub weight: f64,
}
572
/// Runtime predicate that gates execution of a conditional stage.
pub trait ConditionalExecution: Send + Sync + std::fmt::Debug {
    /// Returns `Ok(true)` when the stage should run for `input_data`.
    ///
    /// # Errors
    ///
    /// Implementations may fail when the input cannot be evaluated.
    fn should_execute(&self, input_data: &PipelineData) -> SklResult<bool>;

    /// Human-readable description of the condition.
    fn description(&self) -> String;
}
581
/// How a stage failure affects the rest of the pipeline run.
#[derive(Debug, Clone, PartialEq, Default)]
pub enum ErrorHandlingStrategy {
    /// Abort immediately and surface the first error (the default).
    #[default]
    FailFast,
    /// Report failure in the result but do not propagate the error.
    ContinueOnError,
    /// Retry failing stages.
    /// NOTE(review): unimplemented — the executor treats this like
    /// `FailFast`.
    Retry,
}
593
/// How stages are scheduled relative to one another.
#[derive(Debug, Clone, PartialEq, Default)]
pub enum ExecutionStrategy {
    /// One stage at a time, in insertion order (the default).
    #[default]
    Sequential,
    /// Parallel scheduling.
    /// NOTE(review): currently delegates to the sequential executor.
    Parallel,
    /// Chooses between the two based on pipeline size.
    Adaptive,
}
605
/// Lifecycle state of a [`Pipeline`].
#[derive(Debug, Clone, PartialEq)]
pub enum PipelineState {
    /// Built but never executed.
    Created,
    /// An execution is in progress.
    Running,
    /// The last execution finished successfully.
    Completed,
    /// The last execution returned an error.
    Failed,
    /// Execution was cancelled.
    /// NOTE(review): never set by code in this file.
    Cancelled,
    /// Execution is paused.
    /// NOTE(review): never set by code in this file.
    Paused,
}
622
/// The payload flowing between stages: a JSON-valued data map plus
/// string metadata and an optional RFC 3339 creation timestamp.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineData {
    /// Named JSON values carried through the pipeline.
    pub data: HashMap<String, serde_json::Value>,
    /// Free-form string annotations.
    pub metadata: HashMap<String, String>,
    /// RFC 3339 creation time; `None` for `empty()` payloads.
    pub timestamp: Option<String>,
}
633
634impl PipelineData {
635 #[must_use]
637 pub fn empty() -> Self {
638 Self {
639 data: HashMap::new(),
640 metadata: HashMap::new(),
641 timestamp: None,
642 }
643 }
644
645 #[must_use]
647 pub fn new(data: HashMap<String, serde_json::Value>) -> Self {
648 Self {
649 data,
650 metadata: HashMap::new(),
651 timestamp: Some(chrono::Utc::now().to_rfc3339()),
652 }
653 }
654
655 #[must_use]
657 pub fn with_data(mut self, key: &str, value: serde_json::Value) -> Self {
658 self.data.insert(key.to_string(), value);
659 self
660 }
661
662 #[must_use]
664 pub fn with_metadata(mut self, key: &str, value: &str) -> Self {
665 self.metadata.insert(key.to_string(), value.to_string());
666 self
667 }
668
669 #[must_use]
671 pub fn get_data(&self, key: &str) -> Option<&serde_json::Value> {
672 self.data.get(key)
673 }
674
675 pub fn get_metadata(&self, key: &str) -> Option<&str> {
677 self.metadata.get(key).map(String::as_str)
678 }
679}
680
/// Outcome of a full pipeline execution.
#[derive(Debug, Clone)]
pub struct PipelineResult {
    /// Id of the pipeline that produced this result.
    pub pipeline_id: String,
    /// Whether every executed stage succeeded.
    pub success: bool,
    /// Per-stage results, in execution order.
    pub stage_results: Vec<StageResult>,
    /// Output of the last stage (empty on some failure paths).
    pub final_output: PipelineData,
    /// Wall-clock duration of the run.
    pub execution_time: Duration,
    /// Stringified error when the run failed.
    pub error: Option<String>,
}
697
/// Outcome of a single stage execution.
#[derive(Debug, Clone)]
pub struct StageResult {
    /// Id of the stage this result belongs to.
    pub stage_id: String,
    /// Whether the stage (or its skip-on-condition path) succeeded.
    pub success: bool,
    /// Wall-clock time spent in the stage.
    pub execution_time: Duration,
    /// Stringified error when the stage failed.
    pub error: Option<String>,
    /// Data produced by the stage (input passthrough when skipped).
    pub output_data: PipelineData,
    /// Per-stage resource metrics.
    pub metrics: StageMetrics,
}
714
/// Resource/throughput metrics for a single stage execution.
/// NOTE(review): the executor currently only ever zero-initializes these.
#[derive(Debug, Clone)]
pub struct StageMetrics {
    /// Memory used (units unspecified here — TODO confirm bytes).
    pub memory_usage: u64,
    /// CPU usage (scale unspecified here — TODO confirm fraction vs %).
    pub cpu_usage: f64,
    /// Number of items processed by the stage.
    pub processed_items: u64,
    /// Arbitrary named numeric metrics.
    pub custom_metrics: HashMap<String, f64>,
}
727
728impl StageMetrics {
729 #[must_use]
730 pub fn new() -> Self {
731 Self {
732 memory_usage: 0,
733 cpu_usage: 0.0,
734 processed_items: 0,
735 custom_metrics: HashMap::new(),
736 }
737 }
738}
739
/// Pipeline-wide execution settings.
/// NOTE(review): these settings are not yet consulted by the executor in
/// this file.
#[derive(Debug, Clone)]
pub struct PipelineConfiguration {
    /// Overall deadline for a run, if any.
    pub pipeline_timeout: Option<Duration>,
    /// Upper bound on concurrently running stages.
    pub max_parallel_stages: usize,
    /// Default retry settings, if any.
    pub retry_config: Option<RetryConfiguration>,
    /// Whether execution metrics are collected.
    pub enable_metrics: bool,
    /// Whether detailed profiling is collected.
    pub enable_profiling: bool,
}
754
/// Defaults: no timeout, no retries, up to 4 parallel stages, metrics
/// enabled, profiling disabled.
impl Default for PipelineConfiguration {
    fn default() -> Self {
        Self {
            pipeline_timeout: None,
            max_parallel_stages: 4,
            retry_config: None,
            enable_metrics: true,
            enable_profiling: false,
        }
    }
}
766
/// Retry behaviour with backoff parameters.
#[derive(Debug, Clone)]
pub struct RetryConfiguration {
    /// Maximum attempts — TODO confirm whether this counts the first try.
    pub max_attempts: u32,
    /// Delay before the first retry.
    pub base_delay: Duration,
    /// Multiplier applied to the delay between attempts.
    pub backoff_multiplier: f64,
    /// Cap on the computed delay.
    pub max_delay: Duration,
}
779
/// Descriptive, non-functional information about a pipeline.
#[derive(Debug, Clone)]
pub struct PipelineMetadata {
    /// Optional human-readable name.
    pub name: Option<String>,
    /// Optional free-form description.
    pub description: Option<String>,
    /// Optional version string.
    pub version: Option<String>,
    /// Optional author.
    pub author: Option<String>,
    /// Instant this metadata was created.
    pub created_at: Instant,
    /// Arbitrary key/value annotations (see `with_metadata`).
    pub custom_metadata: HashMap<String, String>,
}
796
797impl PipelineMetadata {
798 #[must_use]
799 pub fn new() -> Self {
800 Self {
801 name: None,
802 description: None,
803 version: None,
804 author: None,
805 created_at: Instant::now(),
806 custom_metadata: HashMap::new(),
807 }
808 }
809}
810
/// Mutable per-run state shared across stages.
#[derive(Debug)]
pub struct ExecutionContext {
    /// UUID of the current run; empty until a run starts.
    pub execution_id: String,
    /// Input supplied to the current run.
    pub input_data: PipelineData,
    /// Instant the current run started.
    pub start_time: Instant,
    /// Scratch variables available during execution.
    /// NOTE(review): not read or written by code in this file yet.
    pub variables: HashMap<String, serde_json::Value>,
    /// Chronological trace of execution events.
    pub trace: Vec<ExecutionTrace>,
}
825
826impl ExecutionContext {
827 #[must_use]
828 pub fn new() -> Self {
829 Self {
830 execution_id: String::new(),
831 input_data: PipelineData::empty(),
832 start_time: Instant::now(),
833 variables: HashMap::new(),
834 trace: Vec::new(),
835 }
836 }
837
838 pub fn add_trace(&mut self, stage_id: &str, event: &str, data: Option<serde_json::Value>) {
840 self.trace.push(ExecutionTrace {
841 timestamp: Instant::now(),
842 stage_id: stage_id.to_string(),
843 event: event.to_string(),
844 data,
845 });
846 }
847}
848
/// One timestamped event recorded during execution.
#[derive(Debug, Clone)]
pub struct ExecutionTrace {
    /// When the event occurred.
    pub timestamp: Instant,
    /// Stage the event belongs to.
    pub stage_id: String,
    /// Short event name (e.g. "started", "completed").
    pub event: String,
    /// Optional structured payload.
    pub data: Option<serde_json::Value>,
}
861
/// Aggregated execution statistics for a pipeline.
#[derive(Debug, Clone)]
pub struct PipelineMetrics {
    /// Total number of runs started.
    pub execution_count: u64,
    /// Runs that completed successfully.
    pub successful_executions: u64,
    /// Runs that returned an error.
    pub failed_executions: u64,
    /// Sum of all run durations.
    pub total_execution_time: Duration,
    /// Start instant of the most recent run.
    pub last_execution_start: Option<Instant>,
    /// End instant of the most recent run.
    pub last_execution_end: Option<Instant>,
    /// Mean run duration, maintained by
    /// [`PipelineMetrics::update_average`].
    pub average_execution_time: Duration,
}
880
881impl PipelineMetrics {
882 #[must_use]
883 pub fn new() -> Self {
884 Self {
885 execution_count: 0,
886 successful_executions: 0,
887 failed_executions: 0,
888 total_execution_time: Duration::from_secs(0),
889 last_execution_start: None,
890 last_execution_end: None,
891 average_execution_time: Duration::from_secs(0),
892 }
893 }
894
895 #[must_use]
897 pub fn success_rate(&self) -> f64 {
898 if self.execution_count == 0 {
899 0.0
900 } else {
901 self.successful_executions as f64 / self.execution_count as f64
902 }
903 }
904
905 pub fn update_average(&mut self) {
907 if self.execution_count > 0 {
908 self.average_execution_time = self.total_execution_time / self.execution_count as u32;
909 }
910 }
911}
912
/// Errors specific to pipeline construction and execution.
/// NOTE(review): the executor in this file currently returns
/// `SklearsError` instead — confirm where this type is meant to be used.
#[derive(Debug, Error)]
pub enum PipelineError {
    /// The pipeline definition failed validation.
    #[error("Pipeline validation failed: {0}")]
    ValidationFailed(String),

    /// A stage failed during execution.
    #[error("Stage execution failed: {stage_id}: {error}")]
    StageExecutionFailed { stage_id: String, error: String },

    /// The overall pipeline deadline elapsed.
    #[error("Pipeline timeout exceeded: {0:?}")]
    TimeoutExceeded(Duration),

    /// An operation was attempted in an incompatible lifecycle state.
    #[error("Invalid pipeline state: {0}")]
    InvalidState(String),

    /// A referenced component is not registered.
    #[error("Component not found: {0}")]
    ComponentNotFound(String),
}
931
/// Delegates to [`PipelineBuilder::new`].
impl Default for PipelineBuilder {
    fn default() -> Self {
        Self::new()
    }
}
937
/// Delegates to [`PipelineMetadata::new`].
impl Default for PipelineMetadata {
    fn default() -> Self {
        Self::new()
    }
}
943
/// Delegates to [`ExecutionContext::new`].
impl Default for ExecutionContext {
    fn default() -> Self {
        Self::new()
    }
}
949
/// Delegates to [`PipelineMetrics::new`].
impl Default for PipelineMetrics {
    fn default() -> Self {
        Self::new()
    }
}
955
/// Delegates to [`StageMetrics::new`].
impl Default for StageMetrics {
    fn default() -> Self {
        Self::new()
    }
}
961
/// Alternative pipeline representation assembled from [`PipelineStep`]s.
/// `Clone` is derived, so clones share the same `execution_context`
/// through the `Arc`.
#[derive(Debug, Clone)]
pub struct ModularPipeline {
    /// Pipeline identifier.
    pub id: String,
    /// Stages converted from added steps.
    pub stages: Vec<PipelineStage>,
    /// Serializable configuration.
    pub config: PipelineConfig,
    /// Descriptive metadata.
    pub metadata: PipelineMetadata,
    /// Shared mutable execution state.
    pub execution_context: Arc<Mutex<ExecutionContext>>,
}
978
/// Alias so modular-pipeline callers can use [`PipelineBuilder`] under a
/// matching name.
pub type ModularPipelineBuilder = PipelineBuilder;
981
/// Serializable pipeline configuration for [`ModularPipeline`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineConfig {
    /// Pipeline name.
    pub name: String,
    /// Optional description.
    pub description: Option<String>,
    /// Scheduling strategy; `#[serde(skip)]` means deserialized configs
    /// fall back to `ExecutionStrategy::default()` (Sequential).
    #[serde(skip)]
    pub execution_strategy: ExecutionStrategy,
    /// Error strategy; `#[serde(skip)]` means deserialized configs fall
    /// back to `ErrorHandlingStrategy::default()` (FailFast).
    #[serde(skip)]
    pub error_handling: ErrorHandlingStrategy,
    /// Optional resource limits.
    pub resource_constraints: Option<ResourceConstraints>,
    /// Optional timeout settings.
    pub timeout_config: Option<TimeoutConfig>,
    /// Optional retry settings.
    pub retry_config: Option<RetryConfig>,
}
1002
/// Declarative description of one step in a [`ModularPipeline`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineStep {
    /// Unique step id (becomes the stage id).
    pub id: String,
    /// Display name.
    /// NOTE(review): discarded by `ModularPipeline::add_step`.
    pub name: String,
    /// Component type to run.
    pub component_type: String,
    /// Component configuration.
    pub config: ComponentConfig,
    /// Ids of steps this one depends on.
    /// NOTE(review): discarded by `add_step`.
    pub dependencies: Vec<String>,
    /// Optional condition expression.
    /// NOTE(review): discarded by `add_step`.
    pub condition: Option<String>,
    /// Optional retry policy.
    /// NOTE(review): discarded by `add_step`.
    pub retry_policy: Option<RetryPolicy>,
}
1021
/// Optional resource limits for a modular pipeline run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceConstraints {
    /// Maximum memory, in megabytes.
    pub max_memory_mb: Option<usize>,
    /// Maximum CPU cores to use.
    pub max_cpu_cores: Option<usize>,
    /// Maximum wall-clock execution time, in seconds.
    pub max_execution_time_sec: Option<u64>,
    /// Maximum steps running concurrently.
    pub max_concurrent_steps: Option<usize>,
}
1034
/// Timeout settings for steps and the pipeline as a whole.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeoutConfig {
    /// Per-step deadline, in seconds.
    pub step_timeout_sec: u64,
    /// Whole-pipeline deadline, in seconds.
    pub pipeline_timeout_sec: u64,
    /// What to do when a deadline is exceeded.
    pub timeout_action: TimeoutAction,
}
1045
/// Reaction to an exceeded timeout.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum TimeoutAction {
    /// Fail the step/pipeline (the default).
    Fail,
    /// Skip the timed-out step and continue.
    Skip,
    /// Retry the timed-out step.
    Retry,
    /// Substitute a default result and continue.
    UseDefault,
}
1058
/// Serializable retry settings for modular pipelines.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryConfig {
    /// Maximum attempts — TODO confirm whether this counts the first try.
    pub max_attempts: u32,
    /// Base delay between attempts, in milliseconds.
    pub retry_delay_ms: u64,
    /// How the delay grows between attempts.
    pub backoff_strategy: BackoffStrategy,
    /// Error identifiers that are eligible for retry.
    pub retryable_errors: Vec<String>,
}
1071
/// Growth pattern for retry delays.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum BackoffStrategy {
    /// Constant delay between attempts.
    Fixed,
    /// Delay grows linearly with the attempt number.
    Linear,
    /// Delay grows exponentially (the default).
    Exponential,
    /// Named custom strategy resolved elsewhere.
    /// NOTE(review): not interpreted by code in this file.
    Custom(String),
}
1084
/// Per-step retry policy for [`PipelineStep`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryPolicy {
    /// Whether retries are enabled for the step.
    pub enabled: bool,
    /// Maximum attempts — TODO confirm whether this counts the first try.
    pub max_attempts: u32,
    /// Base delay between attempts, in milliseconds.
    pub delay_ms: u64,
    /// Multiplier applied to the delay between attempts.
    pub backoff_multiplier: f64,
}
1097
1098impl ModularPipeline {
1099 #[must_use]
1101 pub fn new(id: String, config: PipelineConfig) -> Self {
1102 Self {
1103 id,
1104 stages: Vec::new(),
1105 config,
1106 metadata: PipelineMetadata::new(),
1107 execution_context: Arc::new(Mutex::new(ExecutionContext::new())),
1108 }
1109 }
1110
1111 pub fn add_step(&mut self, step: PipelineStep) {
1113 let stage = PipelineStage {
1115 stage_id: step.id.clone(),
1116 component_type: step.component_type,
1117 component_config: step.config,
1118 stage_type: StageType::Component,
1119 parallel_branches: Vec::new(),
1120 conditional_execution: None,
1121 retry_config: None,
1122 timeout: None,
1123 };
1124 self.stages.push(stage);
1125 }
1126
1127 #[must_use]
1129 pub fn get_step(&self, step_id: &str) -> Option<PipelineStep> {
1130 self.stages
1131 .iter()
1132 .find(|stage| stage.stage_id == step_id)
1133 .map(|stage| {
1134 PipelineStep {
1136 id: stage.stage_id.clone(),
1137 name: stage.stage_id.clone(), component_type: stage.component_type.clone(),
1139 config: stage.component_config.clone(),
1140 dependencies: Vec::new(), condition: None,
1142 retry_policy: None,
1143 }
1144 })
1145 }
1146}
1147
/// Defaults: named "DefaultPipeline", sequential execution, fail-fast
/// error handling, and no optional constraints.
impl Default for PipelineConfig {
    fn default() -> Self {
        Self {
            name: "DefaultPipeline".to_string(),
            description: None,
            execution_strategy: ExecutionStrategy::Sequential,
            error_handling: ErrorHandlingStrategy::FailFast,
            resource_constraints: None,
            timeout_config: None,
            retry_config: None,
        }
    }
}
1161
/// The default timeout reaction is to fail.
impl Default for TimeoutAction {
    fn default() -> Self {
        Self::Fail
    }
}
1167
/// The default backoff is exponential.
impl Default for BackoffStrategy {
    fn default() -> Self {
        Self::Exponential
    }
}
1173
#[allow(non_snake_case)]
#[cfg(test)]
mod tests {
    use super::*;

    /// Building with one stage and explicit strategies succeeds and
    /// records the chosen settings.
    #[test]
    fn test_pipeline_builder() {
        let built = PipelineBuilder::new()
            .add_stage("test_component", ComponentConfig::new("test", "test_type"))
            .with_error_strategy(ErrorHandlingStrategy::FailFast)
            .with_execution_strategy(ExecutionStrategy::Sequential)
            .build();

        assert!(built.is_ok());
        let pipeline = built.expect("operation should succeed");
        assert_eq!(pipeline.stages.len(), 1);
        assert_eq!(pipeline.error_strategy, ErrorHandlingStrategy::FailFast);
    }

    /// Data and metadata inserted through the builder API are retrievable.
    #[test]
    fn test_pipeline_data() {
        let mut initial = HashMap::new();
        initial.insert(
            "key1".to_string(),
            serde_json::Value::String("value1".to_string()),
        );

        let payload = PipelineData::new(initial)
            .with_metadata("source", "test")
            .with_data(
                "key2",
                serde_json::Value::Number(serde_json::Number::from(42)),
            );

        assert_eq!(payload.get_metadata("source"), Some("test"));
        assert!(payload.get_data("key1").is_some());
        assert!(payload.get_data("key2").is_some());
    }

    /// `success_rate` divides successes by total executions.
    #[test]
    fn test_pipeline_metrics() {
        let mut metrics = PipelineMetrics::new();
        metrics.execution_count = 10;
        metrics.successful_executions = 8;
        metrics.failed_executions = 2;

        assert_eq!(metrics.success_rate(), 0.8);
    }

    /// Trace entries are appended in call order with fields intact.
    #[test]
    fn test_execution_context() {
        let mut context = ExecutionContext::new();
        context.add_trace("stage1", "started", None);
        context.add_trace(
            "stage1",
            "completed",
            Some(serde_json::Value::String("success".to_string())),
        );

        assert_eq!(context.trace.len(), 2);
        assert_eq!(context.trace[0].stage_id, "stage1");
        assert_eq!(context.trace[0].event, "started");
    }

    /// A stage-less pipeline must fail validation at build time.
    #[test]
    fn test_empty_pipeline_validation() {
        assert!(PipelineBuilder::new().build().is_err());
    }
}