1use serde::{Deserialize, Serialize};
8use sklears_core::error::{Result as SklResult, SklearsError};
9use std::collections::HashMap;
10use std::sync::{Arc, Mutex, RwLock};
11use std::time::{Duration, Instant};
12use thiserror::Error;
13
14use super::component_framework::{ComponentConfig, PluggableComponent};
15use super::event_system::EventBus;
16
/// Fluent builder that assembles a [`Pipeline`] from ordered stages plus
/// execution and error-handling policy.
#[derive(Debug)]
pub struct PipelineBuilder {
    // Stages in the order they will be executed.
    stages: Vec<PipelineStage>,
    // Timeouts, retry defaults, and feature toggles for the built pipeline.
    config: PipelineConfiguration,
    // Policy applied when a stage fails.
    error_strategy: ErrorHandlingStrategy,
    // Sequential, parallel, or adaptive scheduling.
    execution_strategy: ExecutionStrategy,
    // Descriptive metadata attached to the built pipeline.
    metadata: PipelineMetadata,
}
34
35impl PipelineBuilder {
36 #[must_use]
38 pub fn new() -> Self {
39 Self {
40 stages: Vec::new(),
41 config: PipelineConfiguration::default(),
42 error_strategy: ErrorHandlingStrategy::FailFast,
43 execution_strategy: ExecutionStrategy::Sequential,
44 metadata: PipelineMetadata::new(),
45 }
46 }
47
48 #[must_use]
50 pub fn add_stage(mut self, component_type: &str, config: ComponentConfig) -> Self {
51 let stage = PipelineStage {
52 stage_id: format!("stage_{}", self.stages.len()),
53 component_type: component_type.to_string(),
54 component_config: config,
55 stage_type: StageType::Component,
56 parallel_branches: Vec::new(),
57 conditional_execution: None,
58 retry_config: None,
59 timeout: None,
60 };
61
62 self.stages.push(stage);
63 self
64 }
65
66 #[must_use]
68 pub fn add_parallel_stage(mut self, branches: Vec<ParallelBranch>) -> Self {
69 let stage = PipelineStage {
70 stage_id: format!("parallel_stage_{}", self.stages.len()),
71 component_type: "parallel".to_string(),
72 component_config: ComponentConfig::new("parallel", "parallel"),
73 stage_type: StageType::Parallel,
74 parallel_branches: branches,
75 conditional_execution: None,
76 retry_config: None,
77 timeout: None,
78 };
79
80 self.stages.push(stage);
81 self
82 }
83
84 #[must_use]
86 pub fn add_conditional_stage(
87 mut self,
88 component_type: &str,
89 config: ComponentConfig,
90 condition: Box<dyn ConditionalExecution>,
91 ) -> Self {
92 let stage = PipelineStage {
93 stage_id: format!("conditional_stage_{}", self.stages.len()),
94 component_type: component_type.to_string(),
95 component_config: config,
96 stage_type: StageType::Conditional,
97 parallel_branches: Vec::new(),
98 conditional_execution: Some(condition),
99 retry_config: None,
100 timeout: None,
101 };
102
103 self.stages.push(stage);
104 self
105 }
106
107 #[must_use]
109 pub fn with_error_strategy(mut self, strategy: ErrorHandlingStrategy) -> Self {
110 self.error_strategy = strategy;
111 self
112 }
113
114 #[must_use]
116 pub fn with_execution_strategy(mut self, strategy: ExecutionStrategy) -> Self {
117 self.execution_strategy = strategy;
118 self
119 }
120
121 #[must_use]
123 pub fn with_timeout(mut self, timeout: Duration) -> Self {
124 self.config.pipeline_timeout = Some(timeout);
125 self
126 }
127
128 #[must_use]
130 pub fn with_retry_config(mut self, config: RetryConfiguration) -> Self {
131 self.config.retry_config = Some(config);
132 self
133 }
134
135 #[must_use]
137 pub fn with_metadata(mut self, key: &str, value: &str) -> Self {
138 self.metadata
139 .custom_metadata
140 .insert(key.to_string(), value.to_string());
141 self
142 }
143
144 pub fn build(self) -> SklResult<Pipeline> {
146 self.validate_pipeline()?;
147
148 Ok(Pipeline {
149 pipeline_id: uuid::Uuid::new_v4().to_string(),
150 stages: self.stages,
151 config: self.config,
152 error_strategy: self.error_strategy,
153 execution_strategy: self.execution_strategy,
154 metadata: self.metadata,
155 state: PipelineState::Created,
156 components: Arc::new(RwLock::new(HashMap::new())),
157 event_bus: Arc::new(RwLock::new(EventBus::new())),
158 execution_context: Arc::new(RwLock::new(ExecutionContext::new())),
159 metrics: Arc::new(Mutex::new(PipelineMetrics::new())),
160 })
161 }
162
163 fn validate_pipeline(&self) -> SklResult<()> {
165 if self.stages.is_empty() {
166 return Err(SklearsError::InvalidInput(
167 "Pipeline must have at least one stage".to_string(),
168 ));
169 }
170
171 for stage in &self.stages {
173 if stage.component_type.is_empty() {
174 return Err(SklearsError::InvalidInput(
175 "Stage component type cannot be empty".to_string(),
176 ));
177 }
178 }
179
180 Ok(())
181 }
182
183 #[must_use]
185 pub fn is_empty(&self) -> bool {
186 self.stages.is_empty()
187 }
188}
189
/// A runnable pipeline assembled by [`PipelineBuilder`].
pub struct Pipeline {
    /// Unique id generated at build time (UUID v4).
    pub pipeline_id: String,
    /// Stages in execution order.
    pub stages: Vec<PipelineStage>,
    /// Timeouts, retry defaults, and feature toggles.
    pub config: PipelineConfiguration,
    /// Policy applied when a stage fails.
    pub error_strategy: ErrorHandlingStrategy,
    /// Sequential, parallel, or adaptive scheduling.
    pub execution_strategy: ExecutionStrategy,
    /// Descriptive metadata.
    pub metadata: PipelineMetadata,
    /// Current lifecycle state; updated by `execute`.
    pub state: PipelineState,
    /// Registered components, shared behind a lock.
    /// NOTE(review): not consulted by the current stub execution paths.
    pub components: Arc<RwLock<HashMap<String, Box<dyn PluggableComponent>>>>,
    /// Event bus for pipeline events.
    pub event_bus: Arc<RwLock<EventBus>>,
    /// Per-run context (input data, execution id, trace).
    pub execution_context: Arc<RwLock<ExecutionContext>>,
    /// Aggregate execution statistics.
    pub metrics: Arc<Mutex<PipelineMetrics>>,
}
215
216impl std::fmt::Debug for Pipeline {
217 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
218 f.debug_struct("Pipeline")
219 .field("pipeline_id", &self.pipeline_id)
220 .field("stages", &self.stages)
221 .field("config", &self.config)
222 .field("error_strategy", &self.error_strategy)
223 .field("execution_strategy", &self.execution_strategy)
224 .field("metadata", &self.metadata)
225 .field("state", &self.state)
226 .field("components", &"[components: <RwLock>]".to_string())
227 .field("event_bus", &"[event_bus: <RwLock>]".to_string())
228 .field(
229 "execution_context",
230 &"[execution_context: <RwLock>]".to_string(),
231 )
232 .field("metrics", &"[metrics: <Mutex>]".to_string())
233 .finish()
234 }
235}
236
237impl Pipeline {
238 pub async fn execute(&mut self, input_data: PipelineData) -> SklResult<PipelineResult> {
240 let start_time = Instant::now();
241 self.state = PipelineState::Running;
242
243 let mut metrics = self.metrics.lock().unwrap();
244 metrics.execution_count += 1;
245 metrics.last_execution_start = Some(start_time);
246 drop(metrics);
247
248 {
250 let mut context = self.execution_context.write().unwrap();
251 context.input_data = input_data;
252 context.execution_id = uuid::Uuid::new_v4().to_string();
253 context.start_time = start_time;
254 }
255
256 let execution_result = match self.execution_strategy {
257 ExecutionStrategy::Sequential => self.execute_sequential().await,
258 ExecutionStrategy::Parallel => self.execute_parallel().await,
259 ExecutionStrategy::Adaptive => self.execute_adaptive().await,
260 };
261
262 let end_time = Instant::now();
263 let execution_duration = end_time.duration_since(start_time);
264
265 let mut metrics = self.metrics.lock().unwrap();
267 metrics.last_execution_end = Some(end_time);
268 metrics.total_execution_time += execution_duration;
269
270 if execution_result.is_ok() {
271 metrics.successful_executions += 1;
272 self.state = PipelineState::Completed;
273 } else {
274 metrics.failed_executions += 1;
275 self.state = PipelineState::Failed;
276 }
277
278 execution_result
279 }
280
281 async fn execute_sequential(&mut self) -> SklResult<PipelineResult> {
283 let mut stage_results = Vec::new();
284 let mut current_data = {
285 let context = self.execution_context.read().unwrap();
286 context.input_data.clone()
287 };
288
289 let stages = self.stages.clone(); for (index, stage) in stages.iter().enumerate() {
291 let stage_start = Instant::now();
292
293 match self.execute_stage(stage, ¤t_data).await {
294 Ok(stage_result) => {
295 current_data = stage_result.output_data.clone();
296 stage_results.push(stage_result);
297 }
298 Err(error) => {
299 let stage_result = StageResult {
300 stage_id: stage.stage_id.clone(),
301 success: false,
302 execution_time: stage_start.elapsed(),
303 error: Some(error.to_string()),
304 output_data: PipelineData::empty(),
305 metrics: StageMetrics::new(),
306 };
307 stage_results.push(stage_result);
308
309 return self.handle_stage_error(index, error);
310 }
311 }
312 }
313
314 Ok(PipelineResult {
315 pipeline_id: self.pipeline_id.clone(),
316 success: true,
317 stage_results,
318 final_output: current_data,
319 execution_time: {
320 let context = self.execution_context.read().unwrap();
321 context.start_time.elapsed()
322 },
323 error: None,
324 })
325 }
326
327 async fn execute_parallel(&mut self) -> SklResult<PipelineResult> {
329 self.execute_sequential().await
333 }
334
335 async fn execute_adaptive(&mut self) -> SklResult<PipelineResult> {
337 if self.stages.len() > 3 {
340 self.execute_parallel().await
341 } else {
342 self.execute_sequential().await
343 }
344 }
345
346 async fn execute_stage(
348 &mut self,
349 stage: &PipelineStage,
350 input_data: &PipelineData,
351 ) -> SklResult<StageResult> {
352 let stage_start = Instant::now();
353
354 if let Some(condition) = &stage.conditional_execution {
356 if !condition.should_execute(input_data)? {
357 return Ok(StageResult {
358 stage_id: stage.stage_id.clone(),
359 success: true,
360 execution_time: stage_start.elapsed(),
361 error: None,
362 output_data: input_data.clone(),
363 metrics: StageMetrics::new(),
364 });
365 }
366 }
367
368 let execution_result = match stage.stage_type {
369 StageType::Component => self.execute_component_stage(stage, input_data).await,
370 StageType::Parallel => self.execute_parallel_stage(stage, input_data).await,
371 StageType::Conditional => self.execute_component_stage(stage, input_data).await,
372 };
373
374 let mut stage_result = match execution_result {
375 Ok(result) => result,
376 Err(error) => StageResult {
377 stage_id: stage.stage_id.clone(),
378 success: false,
379 execution_time: stage_start.elapsed(),
380 error: Some(error.to_string()),
381 output_data: PipelineData::empty(),
382 metrics: StageMetrics::new(),
383 },
384 };
385
386 stage_result.execution_time = stage_start.elapsed();
387 Ok(stage_result)
388 }
389
390 async fn execute_component_stage(
392 &mut self,
393 stage: &PipelineStage,
394 input_data: &PipelineData,
395 ) -> SklResult<StageResult> {
396 let component_key = format!("{}_{}", stage.component_type, stage.stage_id);
398
399 Ok(StageResult {
407 stage_id: stage.stage_id.clone(),
408 success: true,
409 execution_time: Duration::from_millis(10),
410 error: None,
411 output_data: input_data.clone(),
412 metrics: StageMetrics::new(),
413 })
414 }
415
416 async fn execute_parallel_stage(
418 &mut self,
419 stage: &PipelineStage,
420 input_data: &PipelineData,
421 ) -> SklResult<StageResult> {
422 let mut branch_results = Vec::new();
423
424 for branch in &stage.parallel_branches {
426 let branch_result = self.execute_parallel_branch(branch, input_data).await?;
428 branch_results.push(branch_result);
429 }
430
431 let combined_output = self.combine_parallel_results(&branch_results)?;
433
434 Ok(StageResult {
435 stage_id: stage.stage_id.clone(),
436 success: true,
437 execution_time: Duration::from_millis(20),
438 error: None,
439 output_data: combined_output,
440 metrics: StageMetrics::new(),
441 })
442 }
443
444 async fn execute_parallel_branch(
446 &mut self,
447 branch: &ParallelBranch,
448 input_data: &PipelineData,
449 ) -> SklResult<PipelineData> {
450 Ok(input_data.clone())
452 }
453
454 fn combine_parallel_results(&self, results: &[PipelineData]) -> SklResult<PipelineData> {
456 if let Some(first) = results.first() {
458 Ok(first.clone())
459 } else {
460 Ok(PipelineData::empty())
461 }
462 }
463
464 fn handle_stage_error(
466 &self,
467 stage_index: usize,
468 error: SklearsError,
469 ) -> SklResult<PipelineResult> {
470 match self.error_strategy {
471 ErrorHandlingStrategy::FailFast => Err(error),
472 ErrorHandlingStrategy::ContinueOnError => {
473 Ok(PipelineResult {
475 pipeline_id: self.pipeline_id.clone(),
476 success: false,
477 stage_results: Vec::new(),
478 final_output: PipelineData::empty(),
479 execution_time: Duration::from_secs(0),
480 error: Some(error.to_string()),
481 })
482 }
483 ErrorHandlingStrategy::Retry => {
484 Err(error)
486 }
487 }
488 }
489
490 #[must_use]
492 pub fn get_metrics(&self) -> PipelineMetrics {
493 let metrics = self.metrics.lock().unwrap();
494 metrics.clone()
495 }
496
497 #[must_use]
499 pub fn get_state(&self) -> PipelineState {
500 self.state.clone()
501 }
502}
503
/// One unit of work in a pipeline: a component invocation, a parallel
/// fan-out, or a conditionally-executed component.
#[derive(Debug)]
pub struct PipelineStage {
    /// Unique identifier within the pipeline.
    pub stage_id: String,
    /// Component type this stage invokes.
    pub component_type: String,
    /// Configuration handed to the component.
    pub component_config: ComponentConfig,
    /// Discriminates how this stage is executed.
    pub stage_type: StageType,
    /// Branches executed by a `StageType::Parallel` stage; empty otherwise.
    pub parallel_branches: Vec<ParallelBranch>,
    /// Run-time predicate gating a `StageType::Conditional` stage.
    /// WARNING: this field is dropped by `Clone` (see the manual impl).
    pub conditional_execution: Option<Box<dyn ConditionalExecution>>,
    /// Optional per-stage retry policy.
    pub retry_config: Option<RetryConfiguration>,
    /// Optional per-stage time limit.
    pub timeout: Option<Duration>,
}
524
/// Manual `Clone` because `Box<dyn ConditionalExecution>` cannot be cloned
/// without a clone-box hook on the trait.
///
/// WARNING: this clone is lossy — `conditional_execution` is reset to
/// `None`, so a cloned conditional stage runs unconditionally. Avoid cloning
/// stages that carry a condition.
impl Clone for PipelineStage {
    fn clone(&self) -> Self {
        Self {
            stage_id: self.stage_id.clone(),
            component_type: self.component_type.clone(),
            component_config: self.component_config.clone(),
            stage_type: self.stage_type.clone(),
            parallel_branches: self.parallel_branches.clone(),
            // The trait object cannot be cloned; the condition is dropped.
            conditional_execution: None, retry_config: self.retry_config.clone(),
            timeout: self.timeout,
        }
    }
}
539
/// Discriminates how a [`PipelineStage`] is executed.
#[derive(Debug, Clone, PartialEq)]
pub enum StageType {
    /// A single component invocation.
    Component,
    /// A fan-out over the stage's `parallel_branches`.
    Parallel,
    /// A component invocation gated by a `ConditionalExecution` predicate.
    Conditional,
}
550
/// One branch of a parallel fan-out stage.
#[derive(Debug, Clone)]
pub struct ParallelBranch {
    /// Identifier of this branch.
    pub branch_id: String,
    /// Component type the branch runs.
    pub component_type: String,
    /// Configuration for the branch component.
    pub config: ComponentConfig,
    /// Relative branch weight.
    /// NOTE(review): not consumed anywhere in this file yet.
    pub weight: f64,
}
563
/// Run-time predicate deciding whether a conditional stage should execute.
pub trait ConditionalExecution: Send + Sync + std::fmt::Debug {
    /// Returns `Ok(true)` when the stage should run for `input_data`;
    /// `Ok(false)` skips the stage and passes the input through unchanged.
    fn should_execute(&self, input_data: &PipelineData) -> SklResult<bool>;

    /// Human-readable description of the condition.
    fn description(&self) -> String;
}
572
/// Policy applied when a stage returns an error.
#[derive(Debug, Clone, PartialEq, Default)]
pub enum ErrorHandlingStrategy {
    /// Abort the run and propagate the first stage error (the default).
    #[default]
    FailFast,
    /// Report the failure in the pipeline result instead of propagating it.
    ContinueOnError,
    /// Retry failed stages.
    /// NOTE(review): not implemented — currently behaves like `FailFast`.
    Retry,
}
584
/// How stages are scheduled across the pipeline.
#[derive(Debug, Clone, PartialEq, Default)]
pub enum ExecutionStrategy {
    /// Run stages one after another (the default).
    #[default]
    Sequential,
    /// Run stages concurrently.
    /// NOTE(review): currently falls back to sequential execution.
    Parallel,
    /// Pick parallel or sequential based on the number of stages.
    Adaptive,
}
596
/// Lifecycle state of a [`Pipeline`].
#[derive(Debug, Clone, PartialEq)]
pub enum PipelineState {
    /// Built but never executed.
    Created,
    /// An execution is in progress.
    Running,
    /// The most recent execution finished successfully.
    Completed,
    /// The most recent execution returned an error.
    Failed,
    /// Execution was cancelled.
    /// NOTE(review): never set anywhere in this file.
    Cancelled,
    /// Execution is paused.
    /// NOTE(review): never set anywhere in this file.
    Paused,
}
613
/// Payload passed between stages: a JSON-valued map plus string metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineData {
    /// Named JSON values carried through the pipeline.
    pub data: HashMap<String, serde_json::Value>,
    /// String key/value annotations about the payload.
    pub metadata: HashMap<String, String>,
    /// RFC 3339 creation timestamp; `None` for `PipelineData::empty()`.
    pub timestamp: Option<String>,
}
624
625impl PipelineData {
626 #[must_use]
628 pub fn empty() -> Self {
629 Self {
630 data: HashMap::new(),
631 metadata: HashMap::new(),
632 timestamp: None,
633 }
634 }
635
636 #[must_use]
638 pub fn new(data: HashMap<String, serde_json::Value>) -> Self {
639 Self {
640 data,
641 metadata: HashMap::new(),
642 timestamp: Some(chrono::Utc::now().to_rfc3339()),
643 }
644 }
645
646 #[must_use]
648 pub fn with_data(mut self, key: &str, value: serde_json::Value) -> Self {
649 self.data.insert(key.to_string(), value);
650 self
651 }
652
653 #[must_use]
655 pub fn with_metadata(mut self, key: &str, value: &str) -> Self {
656 self.metadata.insert(key.to_string(), value.to_string());
657 self
658 }
659
660 #[must_use]
662 pub fn get_data(&self, key: &str) -> Option<&serde_json::Value> {
663 self.data.get(key)
664 }
665
666 pub fn get_metadata(&self, key: &str) -> Option<&str> {
668 self.metadata.get(key).map(String::as_str)
669 }
670}
671
/// Outcome of a full pipeline execution.
#[derive(Debug, Clone)]
pub struct PipelineResult {
    /// Id of the pipeline that produced this result.
    pub pipeline_id: String,
    /// Whether the run completed without error.
    pub success: bool,
    /// Per-stage results in execution order.
    pub stage_results: Vec<StageResult>,
    /// Output of the last executed stage.
    pub final_output: PipelineData,
    /// Total wall-clock duration of the run.
    pub execution_time: Duration,
    /// Error message when `success` is `false`.
    pub error: Option<String>,
}
688
/// Outcome of a single stage execution.
#[derive(Debug, Clone)]
pub struct StageResult {
    /// Id of the stage this result belongs to.
    pub stage_id: String,
    /// Whether the stage completed without error.
    pub success: bool,
    /// Measured wall-clock duration of the stage.
    pub execution_time: Duration,
    /// Error message when `success` is `false`.
    pub error: Option<String>,
    /// Data produced by the stage (empty on failure).
    pub output_data: PipelineData,
    /// Resource/throughput measurements for the stage.
    pub metrics: StageMetrics,
}
705
/// Resource and throughput measurements for a single stage run.
#[derive(Debug, Clone)]
pub struct StageMetrics {
    /// Memory consumed by the stage (units unspecified here — presumably
    /// bytes; confirm against the producers).
    pub memory_usage: u64,
    /// CPU usage figure reported for the stage.
    pub cpu_usage: f64,
    /// Number of items the stage processed.
    pub processed_items: u64,
    /// Stage-specific named measurements.
    pub custom_metrics: HashMap<String, f64>,
}

impl StageMetrics {
    /// Returns a zeroed metrics record with no custom entries.
    #[must_use]
    pub fn new() -> Self {
        StageMetrics {
            custom_metrics: HashMap::new(),
            processed_items: 0,
            cpu_usage: 0.0,
            memory_usage: 0,
        }
    }
}
730
731#[derive(Debug, Clone)]
733pub struct PipelineConfiguration {
734 pub pipeline_timeout: Option<Duration>,
736 pub max_parallel_stages: usize,
738 pub retry_config: Option<RetryConfiguration>,
740 pub enable_metrics: bool,
742 pub enable_profiling: bool,
744}
745
746impl Default for PipelineConfiguration {
747 fn default() -> Self {
748 Self {
749 pipeline_timeout: None,
750 max_parallel_stages: 4,
751 retry_config: None,
752 enable_metrics: true,
753 enable_profiling: false,
754 }
755 }
756}
757
/// Retry policy usable at pipeline or stage level.
#[derive(Debug, Clone)]
pub struct RetryConfiguration {
    /// Maximum number of attempts.
    pub max_attempts: u32,
    /// Delay before the first retry.
    pub base_delay: Duration,
    /// Multiplier applied to the delay between attempts.
    pub backoff_multiplier: f64,
    /// Upper bound on the delay between attempts.
    pub max_delay: Duration,
}
770
/// Descriptive information attached to a pipeline.
#[derive(Debug, Clone)]
pub struct PipelineMetadata {
    /// Optional human-readable pipeline name.
    pub name: Option<String>,
    /// Optional free-text description.
    pub description: Option<String>,
    /// Optional version string.
    pub version: Option<String>,
    /// Optional author string.
    pub author: Option<String>,
    /// Instant this metadata record was created.
    pub created_at: Instant,
    /// Free-form key/value annotations.
    pub custom_metadata: HashMap<String, String>,
}

impl PipelineMetadata {
    /// Creates metadata with all descriptive fields unset and
    /// `created_at` stamped to now.
    #[must_use]
    pub fn new() -> Self {
        PipelineMetadata {
            created_at: Instant::now(),
            custom_metadata: HashMap::new(),
            name: None,
            description: None,
            version: None,
            author: None,
        }
    }
}
801
802#[derive(Debug)]
804pub struct ExecutionContext {
805 pub execution_id: String,
807 pub input_data: PipelineData,
809 pub start_time: Instant,
811 pub variables: HashMap<String, serde_json::Value>,
813 pub trace: Vec<ExecutionTrace>,
815}
816
817impl ExecutionContext {
818 #[must_use]
819 pub fn new() -> Self {
820 Self {
821 execution_id: String::new(),
822 input_data: PipelineData::empty(),
823 start_time: Instant::now(),
824 variables: HashMap::new(),
825 trace: Vec::new(),
826 }
827 }
828
829 pub fn add_trace(&mut self, stage_id: &str, event: &str, data: Option<serde_json::Value>) {
831 self.trace.push(ExecutionTrace {
832 timestamp: Instant::now(),
833 stage_id: stage_id.to_string(),
834 event: event.to_string(),
835 data,
836 });
837 }
838}
839
/// One recorded event in an execution trace.
#[derive(Debug, Clone)]
pub struct ExecutionTrace {
    /// When the event was recorded.
    pub timestamp: Instant,
    /// Stage the event belongs to.
    pub stage_id: String,
    /// Event name (e.g. "started", "completed").
    pub event: String,
    /// Optional structured payload attached to the event.
    pub data: Option<serde_json::Value>,
}
852
/// Aggregate execution statistics for a [`Pipeline`].
#[derive(Debug, Clone)]
pub struct PipelineMetrics {
    /// Total number of `execute` calls observed.
    pub execution_count: u64,
    /// Runs that completed without error.
    pub successful_executions: u64,
    /// Runs that returned an error.
    pub failed_executions: u64,
    /// Sum of wall-clock durations over all runs.
    pub total_execution_time: Duration,
    /// Start instant of the most recent run, if any.
    pub last_execution_start: Option<Instant>,
    /// End instant of the most recent run, if any.
    pub last_execution_end: Option<Instant>,
    /// Mean wall-clock duration; refreshed via [`Self::update_average`].
    pub average_execution_time: Duration,
}

impl PipelineMetrics {
    /// Creates an all-zero metrics record.
    #[must_use]
    pub fn new() -> Self {
        Self {
            execution_count: 0,
            successful_executions: 0,
            failed_executions: 0,
            total_execution_time: Duration::from_secs(0),
            last_execution_start: None,
            last_execution_end: None,
            average_execution_time: Duration::from_secs(0),
        }
    }

    /// Fraction of runs that succeeded, or `0.0` before any run.
    #[must_use]
    pub fn success_rate(&self) -> f64 {
        if self.execution_count == 0 {
            0.0
        } else {
            self.successful_executions as f64 / self.execution_count as f64
        }
    }

    /// Recomputes `average_execution_time` from the running totals.
    pub fn update_average(&mut self) {
        if self.execution_count == 0 {
            return;
        }
        // `Duration / u32` is the only built-in division; the previous
        // `execution_count as u32` cast silently truncated large counts.
        // Fall back to nanosecond arithmetic when the count exceeds u32.
        self.average_execution_time = if self.execution_count <= u64::from(u32::MAX) {
            self.total_execution_time / (self.execution_count as u32)
        } else {
            let nanos = self.total_execution_time.as_nanos() / u128::from(self.execution_count);
            Duration::from_nanos(nanos as u64)
        };
    }
}
903
/// Errors specific to pipeline construction and execution.
///
/// NOTE(review): the execution paths in this file currently surface
/// `SklearsError` instead of these variants; this enum appears reserved for
/// future use.
#[derive(Debug, Error)]
pub enum PipelineError {
    /// The pipeline failed builder-time validation.
    #[error("Pipeline validation failed: {0}")]
    ValidationFailed(String),

    /// A specific stage failed during execution.
    #[error("Stage execution failed: {stage_id}: {error}")]
    StageExecutionFailed { stage_id: String, error: String },

    /// The pipeline ran longer than its configured timeout.
    #[error("Pipeline timeout exceeded: {0:?}")]
    TimeoutExceeded(Duration),

    /// An operation was attempted in an incompatible lifecycle state.
    #[error("Invalid pipeline state: {0}")]
    InvalidState(String),

    /// A referenced component is not registered.
    #[error("Component not found: {0}")]
    ComponentNotFound(String),
}
922
// The `Default` implementations below all delegate to the type's `new()`.

impl Default for PipelineBuilder {
    fn default() -> Self {
        Self::new()
    }
}

impl Default for PipelineMetadata {
    fn default() -> Self {
        Self::new()
    }
}

impl Default for ExecutionContext {
    fn default() -> Self {
        Self::new()
    }
}

impl Default for PipelineMetrics {
    fn default() -> Self {
        Self::new()
    }
}

impl Default for StageMetrics {
    fn default() -> Self {
        Self::new()
    }
}
952
/// Alternative pipeline facade built from [`PipelineStep`] descriptions.
///
/// Cloning shares `execution_context` (it is behind an `Arc`); note also
/// that cloning `stages` drops any stage conditions (see
/// `impl Clone for PipelineStage`).
#[derive(Debug, Clone)]
pub struct ModularPipeline {
    /// Caller-supplied pipeline identifier.
    pub id: String,
    /// Stages converted from added steps, in insertion order.
    pub stages: Vec<PipelineStage>,
    /// Serializable pipeline configuration.
    pub config: PipelineConfig,
    /// Descriptive metadata.
    pub metadata: PipelineMetadata,
    /// Shared per-run state.
    pub execution_context: Arc<Mutex<ExecutionContext>>,
}
969
970pub type ModularPipelineBuilder = PipelineBuilder;
972
/// Serializable pipeline-level configuration for [`ModularPipeline`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineConfig {
    /// Human-readable pipeline name.
    pub name: String,
    /// Optional free-text description.
    pub description: Option<String>,
    /// Scheduling strategy; not serialized — deserializes to its `Default`.
    #[serde(skip)]
    pub execution_strategy: ExecutionStrategy,
    /// Failure policy; not serialized — deserializes to its `Default`.
    #[serde(skip)]
    pub error_handling: ErrorHandlingStrategy,
    /// Optional caps on memory/CPU/time/concurrency.
    pub resource_constraints: Option<ResourceConstraints>,
    /// Optional step/pipeline timeout settings.
    pub timeout_config: Option<TimeoutConfig>,
    /// Optional retry policy.
    pub retry_config: Option<RetryConfig>,
}
993
/// Serializable description of one step in a [`ModularPipeline`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineStep {
    /// Unique step identifier.
    pub id: String,
    /// Human-readable step name.
    /// NOTE(review): not persisted by `ModularPipeline::add_step`.
    pub name: String,
    /// Component type the step invokes.
    pub component_type: String,
    /// Configuration for the component.
    pub config: ComponentConfig,
    /// Ids of steps this one depends on.
    /// NOTE(review): currently discarded by `ModularPipeline::add_step`.
    pub dependencies: Vec<String>,
    /// Optional condition expression.
    /// NOTE(review): currently discarded by `ModularPipeline::add_step`.
    pub condition: Option<String>,
    /// Optional per-step retry policy.
    /// NOTE(review): currently discarded by `ModularPipeline::add_step`.
    pub retry_policy: Option<RetryPolicy>,
}
1012
/// Optional resource caps for a pipeline run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceConstraints {
    /// Maximum memory, in megabytes.
    pub max_memory_mb: Option<usize>,
    /// Maximum number of CPU cores.
    pub max_cpu_cores: Option<usize>,
    /// Maximum total execution time, in seconds.
    pub max_execution_time_sec: Option<u64>,
    /// Maximum number of steps running concurrently.
    pub max_concurrent_steps: Option<usize>,
}
1025
/// Timeout settings at step and pipeline granularity.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeoutConfig {
    /// Per-step time limit, in seconds.
    pub step_timeout_sec: u64,
    /// Whole-pipeline time limit, in seconds.
    pub pipeline_timeout_sec: u64,
    /// What to do when a timeout fires.
    pub timeout_action: TimeoutAction,
}
1036
/// Action taken when a timeout fires (default: `Fail`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum TimeoutAction {
    /// Abort with an error.
    Fail,
    /// Skip the timed-out step.
    Skip,
    /// Retry the timed-out step.
    Retry,
    /// Substitute a default result.
    UseDefault,
}
1049
/// Serializable retry policy for [`PipelineConfig`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryConfig {
    /// Maximum number of attempts.
    pub max_attempts: u32,
    /// Delay between attempts, in milliseconds.
    pub retry_delay_ms: u64,
    /// How the delay grows between attempts.
    pub backoff_strategy: BackoffStrategy,
    /// Error identifiers that are eligible for retry.
    pub retryable_errors: Vec<String>,
}
1062
/// How the retry delay grows between attempts (default: `Exponential`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum BackoffStrategy {
    /// Same delay every attempt.
    Fixed,
    /// Delay grows linearly with the attempt number.
    Linear,
    /// Delay grows exponentially with the attempt number.
    Exponential,
    /// Named custom strategy resolved elsewhere.
    Custom(String),
}
1075
/// Per-step retry policy for [`PipelineStep`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryPolicy {
    /// Whether retries are enabled for the step.
    pub enabled: bool,
    /// Maximum number of attempts.
    pub max_attempts: u32,
    /// Base delay between attempts, in milliseconds.
    pub delay_ms: u64,
    /// Multiplier applied to the delay after each attempt.
    pub backoff_multiplier: f64,
}
1088
1089impl ModularPipeline {
1090 #[must_use]
1092 pub fn new(id: String, config: PipelineConfig) -> Self {
1093 Self {
1094 id,
1095 stages: Vec::new(),
1096 config,
1097 metadata: PipelineMetadata::new(),
1098 execution_context: Arc::new(Mutex::new(ExecutionContext::new())),
1099 }
1100 }
1101
1102 pub fn add_step(&mut self, step: PipelineStep) {
1104 let stage = PipelineStage {
1106 stage_id: step.id.clone(),
1107 component_type: step.component_type,
1108 component_config: step.config,
1109 stage_type: StageType::Component,
1110 parallel_branches: Vec::new(),
1111 conditional_execution: None,
1112 retry_config: None,
1113 timeout: None,
1114 };
1115 self.stages.push(stage);
1116 }
1117
1118 #[must_use]
1120 pub fn get_step(&self, step_id: &str) -> Option<PipelineStep> {
1121 self.stages
1122 .iter()
1123 .find(|stage| stage.stage_id == step_id)
1124 .map(|stage| {
1125 PipelineStep {
1127 id: stage.stage_id.clone(),
1128 name: stage.stage_id.clone(), component_type: stage.component_type.clone(),
1130 config: stage.component_config.clone(),
1131 dependencies: Vec::new(), condition: None,
1133 retry_policy: None,
1134 }
1135 })
1136 }
1137}
1138
impl Default for PipelineConfig {
    // Named "DefaultPipeline" with fail-fast sequential execution and no
    // constraints, timeouts, or retries.
    fn default() -> Self {
        Self {
            name: "DefaultPipeline".to_string(),
            description: None,
            execution_strategy: ExecutionStrategy::Sequential,
            error_handling: ErrorHandlingStrategy::FailFast,
            resource_constraints: None,
            timeout_config: None,
            retry_config: None,
        }
    }
}

impl Default for TimeoutAction {
    // Timeouts abort by default.
    fn default() -> Self {
        Self::Fail
    }
}

impl Default for BackoffStrategy {
    // Exponential backoff by default.
    fn default() -> Self {
        Self::Exponential
    }
}
1164
#[allow(non_snake_case)]
#[cfg(test)]
mod tests {
    use super::*;

    // Builder happy path: one stage plus explicit strategies should produce
    // a valid one-stage pipeline.
    #[test]
    fn test_pipeline_builder() {
        let pipeline = PipelineBuilder::new()
            .add_stage("test_component", ComponentConfig::new("test", "test_type"))
            .with_error_strategy(ErrorHandlingStrategy::FailFast)
            .with_execution_strategy(ExecutionStrategy::Sequential)
            .build();

        assert!(pipeline.is_ok());
        let pipeline = pipeline.unwrap();
        assert_eq!(pipeline.stages.len(), 1);
        assert_eq!(pipeline.error_strategy, ErrorHandlingStrategy::FailFast);
    }

    // PipelineData builder-style accessors round-trip data and metadata.
    #[test]
    fn test_pipeline_data() {
        let mut data = HashMap::new();
        data.insert(
            "key1".to_string(),
            serde_json::Value::String("value1".to_string()),
        );

        let pipeline_data = PipelineData::new(data)
            .with_metadata("source", "test")
            .with_data(
                "key2",
                serde_json::Value::Number(serde_json::Number::from(42)),
            );

        assert_eq!(pipeline_data.get_metadata("source"), Some("test"));
        assert!(pipeline_data.get_data("key1").is_some());
        assert!(pipeline_data.get_data("key2").is_some());
    }

    // success_rate is successful / total.
    #[test]
    fn test_pipeline_metrics() {
        let mut metrics = PipelineMetrics::new();
        metrics.execution_count = 10;
        metrics.successful_executions = 8;
        metrics.failed_executions = 2;

        assert_eq!(metrics.success_rate(), 0.8);
    }

    // add_trace appends entries in order with the supplied fields.
    #[test]
    fn test_execution_context() {
        let mut context = ExecutionContext::new();
        context.add_trace("stage1", "started", None);
        context.add_trace(
            "stage1",
            "completed",
            Some(serde_json::Value::String("success".to_string())),
        );

        assert_eq!(context.trace.len(), 2);
        assert_eq!(context.trace[0].stage_id, "stage1");
        assert_eq!(context.trace[0].event, "started");
    }

    // A pipeline with no stages must fail validation at build time.
    #[test]
    fn test_empty_pipeline_validation() {
        let pipeline = PipelineBuilder::new().build();
        assert!(pipeline.is_err());
    }
}