oar_ocr/pipeline/stages/
config.rs

1//! Configuration system for extensible pipeline stages.
2//!
3//! This module provides configuration structures and utilities for
4//! managing extensible pipeline stage configurations.
5
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
9use super::extensible::StageId;
10use crate::core::config::{ConfigError, ConfigValidator};
11
12/// Configuration for the extensible pipeline system.
13#[derive(Debug, Clone, Serialize, Deserialize, Default)]
14pub struct ExtensiblePipelineConfig {
15    /// Whether to use the extensible pipeline system
16    pub enabled: bool,
17    /// Global pipeline settings
18    pub global_settings: GlobalPipelineSettings,
19    /// Stage-specific configurations
20    pub stage_configs: HashMap<String, serde_json::Value>,
21    /// Stage execution order (if not specified, dependencies will determine order)
22    pub stage_order: Option<Vec<String>>,
23    /// Stages to enable/disable
24    pub enabled_stages: Option<Vec<String>>,
25    pub disabled_stages: Option<Vec<String>>,
26}
27
28/// Global settings that apply to the entire pipeline.
29#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct GlobalPipelineSettings {
31    /// Centralized parallel processing policy
32    #[serde(default)]
33    pub parallel_policy: crate::core::config::ParallelPolicy,
34    /// Whether to continue processing if a stage fails
35    pub continue_on_stage_failure: bool,
36    /// Global timeout for pipeline execution (in seconds)
37    pub pipeline_timeout_seconds: Option<u64>,
38    /// Whether to collect detailed metrics for each stage
39    pub collect_detailed_metrics: bool,
40}
41
42impl Default for GlobalPipelineSettings {
43    fn default() -> Self {
44        Self {
45            parallel_policy: crate::core::config::ParallelPolicy::default(),
46            continue_on_stage_failure: false,
47            pipeline_timeout_seconds: None,
48            collect_detailed_metrics: true,
49        }
50    }
51}
52
53impl GlobalPipelineSettings {
54    /// Get the effective parallel policy
55    pub fn effective_parallel_policy(&self) -> crate::core::config::ParallelPolicy {
56        self.parallel_policy.clone()
57    }
58}
59
60/// Utility functions for working with extensible pipeline configurations.
61impl ExtensiblePipelineConfig {}
62
63/// Utility functions for working with extensible pipeline configurations.
64impl ExtensiblePipelineConfig {
65    /// Get configuration for a specific stage.
66    pub fn get_stage_config<T>(&self, stage_id: &str) -> Option<T>
67    where
68        T: for<'de> Deserialize<'de>,
69    {
70        self.stage_configs
71            .get(stage_id)
72            .and_then(|value| serde_json::from_value(value.clone()).ok())
73    }
74
75    /// Check if a stage is enabled.
76    pub fn is_stage_enabled(&self, stage_id: &str) -> bool {
77        // If enabled_stages is specified, only those stages are enabled
78        if let Some(ref enabled) = self.enabled_stages {
79            return enabled.contains(&stage_id.to_string());
80        }
81
82        // If disabled_stages is specified, check if this stage is disabled
83        if let Some(ref disabled) = self.disabled_stages {
84            return !disabled.contains(&stage_id.to_string());
85        }
86
87        // By default, all stages are enabled
88        true
89    }
90
91    /// Get the configured stage execution order.
92    pub fn get_stage_order(&self) -> Option<Vec<StageId>> {
93        self.stage_order
94            .as_ref()
95            .map(|order| order.iter().map(|s| StageId::new(s.clone())).collect())
96    }
97}
98
99/// Configuration validation utilities.
100impl ExtensiblePipelineConfig {
101    /// Validate the configuration for consistency and correctness.
102    pub fn validate(&self) -> Result<(), ConfigError> {
103        // Check for conflicting enabled/disabled stage settings
104        if let (Some(enabled), Some(disabled)) = (&self.enabled_stages, &self.disabled_stages) {
105            for stage in enabled {
106                if disabled.contains(stage) {
107                    return Err(ConfigError::ValidationFailed {
108                        message: format!("Stage '{}' is both enabled and disabled", stage),
109                    });
110                }
111            }
112        }
113
114        // Validate global settings
115        if let Some(timeout) = self.global_settings.pipeline_timeout_seconds {
116            #[allow(clippy::collapsible_if)]
117            if timeout == 0 {
118                return Err(ConfigError::InvalidConfig {
119                    message: "Pipeline timeout must be greater than 0".to_string(),
120                });
121            }
122        }
123
124        // Validate parallel policy
125        let effective_policy = self.global_settings.effective_parallel_policy();
126        if let Some(threads) = effective_policy.max_threads
127            && threads == 0
128        {
129            return Err(ConfigError::InvalidConfig {
130                message: "Max parallel threads must be greater than 0".to_string(),
131            });
132        }
133
134        // Validate stage configurations for basic JSON validity
135        for (stage_id, config_value) in &self.stage_configs {
136            if config_value.is_null() {
137                return Err(ConfigError::InvalidConfig {
138                    message: format!("Stage '{}' has null configuration", stage_id),
139                });
140            }
141        }
142
143        // Validate stage order references
144        if let Some(ref stage_order) = self.stage_order {
145            if stage_order.is_empty() {
146                return Err(ConfigError::InvalidConfig {
147                    message: "Stage order cannot be empty when specified".to_string(),
148                });
149            }
150
151            // Check for duplicate stages in order
152            let mut seen_stages = std::collections::HashSet::new();
153            for stage_id in stage_order {
154                if !seen_stages.insert(stage_id) {
155                    return Err(ConfigError::ValidationFailed {
156                        message: format!("Duplicate stage '{}' in execution order", stage_id),
157                    });
158                }
159            }
160        }
161
162        Ok(())
163    }
164}
165
166/// Implementation of ConfigValidator trait for ExtensiblePipelineConfig.
167impl ConfigValidator for ExtensiblePipelineConfig {
168    /// Validate the configuration for consistency and correctness.
169    ///
170    /// This implementation delegates to the existing validate method to maintain
171    /// consistency with the structured error handling used throughout the codebase.
172    fn validate(&self) -> Result<(), ConfigError> {
173        self.validate()
174    }
175
176    /// Get default configuration.
177    fn get_defaults() -> Self {
178        Self::default()
179    }
180}
181
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::config::ConfigValidator;

    /// Runs the inherent `validate` on `config` and returns the error it is
    /// expected to produce; panics if validation unexpectedly succeeds.
    fn rejection(config: &ExtensiblePipelineConfig) -> ConfigError {
        config
            .validate()
            .expect_err("configuration should have been rejected")
    }

    /// Asserts that the rendered error message contains `fragment`.
    fn assert_message_contains(error: &ConfigError, fragment: &str) {
        let rendered = error.to_string();
        assert!(
            rendered.contains(fragment),
            "error message {rendered:?} does not contain {fragment:?}"
        );
    }

    #[test]
    fn test_validate_conflicting_enabled_disabled_stages() {
        let config = ExtensiblePipelineConfig {
            enabled_stages: Some(vec!["stage1".to_string(), "stage2".to_string()]),
            disabled_stages: Some(vec!["stage2".to_string(), "stage3".to_string()]),
            ..Default::default()
        };

        let error = rejection(&config);
        assert!(matches!(error, ConfigError::ValidationFailed { .. }));
        assert_message_contains(&error, "Stage 'stage2' is both enabled and disabled");
    }

    #[test]
    fn test_validate_zero_pipeline_timeout() {
        let mut config = ExtensiblePipelineConfig::default();
        config.global_settings.pipeline_timeout_seconds = Some(0);

        let error = rejection(&config);
        assert!(matches!(error, ConfigError::InvalidConfig { .. }));
        assert_message_contains(&error, "Pipeline timeout must be greater than 0");
    }

    #[test]
    fn test_validate_zero_max_threads() {
        let mut config = ExtensiblePipelineConfig::default();
        config.global_settings.parallel_policy.max_threads = Some(0);

        let error = rejection(&config);
        assert!(matches!(error, ConfigError::InvalidConfig { .. }));
        assert_message_contains(&error, "Max parallel threads must be greater than 0");
    }

    #[test]
    fn test_validate_null_stage_config() {
        let mut config = ExtensiblePipelineConfig::default();
        config
            .stage_configs
            .insert("test_stage".to_string(), serde_json::Value::Null);

        let error = rejection(&config);
        assert!(matches!(error, ConfigError::InvalidConfig { .. }));
        assert_message_contains(&error, "Stage 'test_stage' has null configuration");
    }

    #[test]
    fn test_validate_empty_stage_order() {
        let config = ExtensiblePipelineConfig {
            stage_order: Some(vec![]),
            ..Default::default()
        };

        let error = rejection(&config);
        assert!(matches!(error, ConfigError::InvalidConfig { .. }));
        assert_message_contains(&error, "Stage order cannot be empty when specified");
    }

    #[test]
    fn test_validate_duplicate_stages_in_order() {
        let config = ExtensiblePipelineConfig {
            stage_order: Some(vec![
                "stage1".to_string(),
                "stage2".to_string(),
                "stage1".to_string(), // Duplicate
            ]),
            ..Default::default()
        };

        let error = rejection(&config);
        assert!(matches!(error, ConfigError::ValidationFailed { .. }));
        assert_message_contains(&error, "Duplicate stage 'stage1' in execution order");
    }

    #[test]
    fn test_validate_valid_configuration() {
        let mut config = ExtensiblePipelineConfig {
            enabled_stages: Some(vec!["stage1".to_string(), "stage2".to_string()]),
            disabled_stages: Some(vec!["stage3".to_string()]),
            stage_order: Some(vec!["stage1".to_string(), "stage2".to_string()]),
            ..Default::default()
        };
        config.global_settings.pipeline_timeout_seconds = Some(60);
        config.global_settings.parallel_policy.max_threads = Some(4);
        config.stage_configs.insert(
            "test_stage".to_string(),
            serde_json::json!({"enabled": true}),
        );

        assert!(config.validate().is_ok());
    }

    #[test]
    fn test_config_validator_trait_implementation() {
        let config = ExtensiblePipelineConfig::default();

        // Plain `config.validate()` resolves to the inherent method, which
        // would leave the trait implementation untested. Fully-qualified
        // calls make sure the `ConfigValidator` methods are the ones that run.
        assert!(<ExtensiblePipelineConfig as ConfigValidator>::validate(&config).is_ok());

        let defaults = <ExtensiblePipelineConfig as ConfigValidator>::get_defaults();
        assert!(<ExtensiblePipelineConfig as ConfigValidator>::validate(&defaults).is_ok());
    }
}