oar_ocr/pipeline/stages/
config.rs1use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
9use super::extensible::StageId;
10use crate::core::config::{ConfigError, ConfigValidator};
11
12#[derive(Debug, Clone, Serialize, Deserialize, Default)]
14pub struct ExtensiblePipelineConfig {
15 pub enabled: bool,
17 pub global_settings: GlobalPipelineSettings,
19 pub stage_configs: HashMap<String, serde_json::Value>,
21 pub stage_order: Option<Vec<String>>,
23 pub enabled_stages: Option<Vec<String>>,
25 pub disabled_stages: Option<Vec<String>>,
26}
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct GlobalPipelineSettings {
31 #[serde(default)]
33 pub parallel_policy: crate::core::config::ParallelPolicy,
34 pub continue_on_stage_failure: bool,
36 pub pipeline_timeout_seconds: Option<u64>,
38 pub collect_detailed_metrics: bool,
40}
41
42impl Default for GlobalPipelineSettings {
43 fn default() -> Self {
44 Self {
45 parallel_policy: crate::core::config::ParallelPolicy::default(),
46 continue_on_stage_failure: false,
47 pipeline_timeout_seconds: None,
48 collect_detailed_metrics: true,
49 }
50 }
51}
52
53impl GlobalPipelineSettings {
54 pub fn effective_parallel_policy(&self) -> crate::core::config::ParallelPolicy {
56 self.parallel_policy.clone()
57 }
58}
59
60impl ExtensiblePipelineConfig {}
62
63impl ExtensiblePipelineConfig {
65 pub fn get_stage_config<T>(&self, stage_id: &str) -> Option<T>
67 where
68 T: for<'de> Deserialize<'de>,
69 {
70 self.stage_configs
71 .get(stage_id)
72 .and_then(|value| serde_json::from_value(value.clone()).ok())
73 }
74
75 pub fn is_stage_enabled(&self, stage_id: &str) -> bool {
77 if let Some(ref enabled) = self.enabled_stages {
79 return enabled.contains(&stage_id.to_string());
80 }
81
82 if let Some(ref disabled) = self.disabled_stages {
84 return !disabled.contains(&stage_id.to_string());
85 }
86
87 true
89 }
90
91 pub fn get_stage_order(&self) -> Option<Vec<StageId>> {
93 self.stage_order
94 .as_ref()
95 .map(|order| order.iter().map(|s| StageId::new(s.clone())).collect())
96 }
97}
98
99impl ExtensiblePipelineConfig {
101 pub fn validate(&self) -> Result<(), ConfigError> {
103 if let (Some(enabled), Some(disabled)) = (&self.enabled_stages, &self.disabled_stages) {
105 for stage in enabled {
106 if disabled.contains(stage) {
107 return Err(ConfigError::ValidationFailed {
108 message: format!("Stage '{}' is both enabled and disabled", stage),
109 });
110 }
111 }
112 }
113
114 if let Some(timeout) = self.global_settings.pipeline_timeout_seconds {
116 #[allow(clippy::collapsible_if)]
117 if timeout == 0 {
118 return Err(ConfigError::InvalidConfig {
119 message: "Pipeline timeout must be greater than 0".to_string(),
120 });
121 }
122 }
123
124 let effective_policy = self.global_settings.effective_parallel_policy();
126 if let Some(threads) = effective_policy.max_threads
127 && threads == 0
128 {
129 return Err(ConfigError::InvalidConfig {
130 message: "Max parallel threads must be greater than 0".to_string(),
131 });
132 }
133
134 for (stage_id, config_value) in &self.stage_configs {
136 if config_value.is_null() {
137 return Err(ConfigError::InvalidConfig {
138 message: format!("Stage '{}' has null configuration", stage_id),
139 });
140 }
141 }
142
143 if let Some(ref stage_order) = self.stage_order {
145 if stage_order.is_empty() {
146 return Err(ConfigError::InvalidConfig {
147 message: "Stage order cannot be empty when specified".to_string(),
148 });
149 }
150
151 let mut seen_stages = std::collections::HashSet::new();
153 for stage_id in stage_order {
154 if !seen_stages.insert(stage_id) {
155 return Err(ConfigError::ValidationFailed {
156 message: format!("Duplicate stage '{}' in execution order", stage_id),
157 });
158 }
159 }
160 }
161
162 Ok(())
163 }
164}
165
166impl ConfigValidator for ExtensiblePipelineConfig {
168 fn validate(&self) -> Result<(), ConfigError> {
173 self.validate()
174 }
175
176 fn get_defaults() -> Self {
178 Self::default()
179 }
180}
181
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::config::ConfigValidator;

    /// Validates `config`, asserts that it is rejected with a message
    /// containing `expected_fragment`, and hands the error back so the caller
    /// can additionally pin the error variant.
    fn rejected_with(config: &ExtensiblePipelineConfig, expected_fragment: &str) -> ConfigError {
        let result = config.validate();
        assert!(result.is_err());

        let error = result.unwrap_err();
        assert!(error.to_string().contains(expected_fragment));
        error
    }

    #[test]
    fn test_validate_conflicting_enabled_disabled_stages() {
        let config = ExtensiblePipelineConfig {
            enabled_stages: Some(vec!["stage1".to_string(), "stage2".to_string()]),
            disabled_stages: Some(vec!["stage2".to_string(), "stage3".to_string()]),
            ..Default::default()
        };

        let error = rejected_with(&config, "Stage 'stage2' is both enabled and disabled");
        assert!(matches!(error, ConfigError::ValidationFailed { .. }));
    }

    #[test]
    fn test_validate_zero_pipeline_timeout() {
        let mut config = ExtensiblePipelineConfig::default();
        config.global_settings.pipeline_timeout_seconds = Some(0);

        let error = rejected_with(&config, "Pipeline timeout must be greater than 0");
        assert!(matches!(error, ConfigError::InvalidConfig { .. }));
    }

    #[test]
    fn test_validate_zero_max_threads() {
        let mut config = ExtensiblePipelineConfig::default();
        config.global_settings.parallel_policy.max_threads = Some(0);

        let error = rejected_with(&config, "Max parallel threads must be greater than 0");
        assert!(matches!(error, ConfigError::InvalidConfig { .. }));
    }

    #[test]
    fn test_validate_null_stage_config() {
        let mut config = ExtensiblePipelineConfig::default();
        config
            .stage_configs
            .insert("test_stage".to_string(), serde_json::Value::Null);

        let error = rejected_with(&config, "Stage 'test_stage' has null configuration");
        assert!(matches!(error, ConfigError::InvalidConfig { .. }));
    }

    #[test]
    fn test_validate_empty_stage_order() {
        let config = ExtensiblePipelineConfig {
            stage_order: Some(vec![]),
            ..Default::default()
        };

        let error = rejected_with(&config, "Stage order cannot be empty when specified");
        assert!(matches!(error, ConfigError::InvalidConfig { .. }));
    }

    #[test]
    fn test_validate_duplicate_stages_in_order() {
        let config = ExtensiblePipelineConfig {
            stage_order: Some(vec![
                "stage1".to_string(),
                "stage2".to_string(),
                // Repeats the first entry on purpose.
                "stage1".to_string(),
            ]),
            ..Default::default()
        };

        let error = rejected_with(&config, "Duplicate stage 'stage1' in execution order");
        assert!(matches!(error, ConfigError::ValidationFailed { .. }));
    }

    #[test]
    fn test_validate_valid_configuration() {
        let mut config = ExtensiblePipelineConfig {
            enabled_stages: Some(vec!["stage1".to_string(), "stage2".to_string()]),
            disabled_stages: Some(vec!["stage3".to_string()]),
            stage_order: Some(vec!["stage1".to_string(), "stage2".to_string()]),
            ..Default::default()
        };
        config.global_settings.pipeline_timeout_seconds = Some(60);
        config.global_settings.parallel_policy.max_threads = Some(4);
        config.stage_configs.insert(
            "test_stage".to_string(),
            serde_json::json!({"enabled": true}),
        );

        assert!(config.validate().is_ok());
    }

    #[test]
    fn test_config_validator_trait_implementation() {
        let config = ExtensiblePipelineConfig::default();

        // `config.validate()` would resolve to the inherent method, so the
        // trait is named explicitly to make sure its impl is what runs here.
        assert!(ConfigValidator::validate(&config).is_ok());

        let defaults = <ExtensiblePipelineConfig as ConfigValidator>::get_defaults();
        assert!(ConfigValidator::validate(&defaults).is_ok());
    }
}