1use std::any::Any;
7use std::collections::{HashMap, VecDeque};
8use std::fmt::Debug;
9use std::sync::Arc;
10use tracing::{debug, info};
11
12use super::extensible::{PipelineStage, StageContext, StageData, StageDependency, StageId};
13use super::types::StageResult;
14use crate::core::OCRError;
15use crate::core::config::{ConfigError, ConfigValidator};
16
17pub trait ErasedConfig: Any + Send + Sync + Debug {
19 fn validate_erased(&self) -> Result<(), ConfigError>;
21 fn default_erased() -> Box<dyn ErasedConfig>
23 where
24 Self: Sized;
25 fn clone_erased(&self) -> Box<dyn ErasedConfig>;
27 fn as_any(&self) -> &dyn Any;
29}
30
31impl<T> ErasedConfig for T
32where
33 T: ConfigValidator + Default + Clone + Any + Send + Sync + Debug,
34{
35 fn validate_erased(&self) -> Result<(), ConfigError> {
36 self.validate()
37 }
38
39 fn default_erased() -> Box<dyn ErasedConfig> {
40 Box::new(T::get_defaults())
41 }
42
43 fn clone_erased(&self) -> Box<dyn ErasedConfig> {
44 Box::new(self.clone())
45 }
46
47 fn as_any(&self) -> &dyn Any {
48 self
49 }
50}
51
/// Sentinel configuration that always fails validation.
///
/// It is what `BoxedConfig::default()` and `BoxedConfig::get_defaults()` hand
/// out, so that a boxed config created without a concrete type is rejected as
/// soon as it is validated.
#[allow(dead_code)]
#[derive(Debug, Clone)]
struct InvalidBoxedConfig;
58
59impl ErasedConfig for InvalidBoxedConfig {
60 fn validate_erased(&self) -> Result<(), ConfigError> {
61 Err(ConfigError::InvalidConfig {
62 message: "Invalid BoxedConfig: This configuration was created through BoxedConfig::default() or BoxedConfig::get_defaults(), which should never be called directly. Use ErasedConfig::default_erased() or box a concrete config type instead.".to_string(),
63 })
64 }
65
66 fn default_erased() -> Box<dyn ErasedConfig> {
67 Box::new(InvalidBoxedConfig)
68 }
69
70 fn clone_erased(&self) -> Box<dyn ErasedConfig> {
71 Box::new(self.clone())
72 }
73
74 fn as_any(&self) -> &dyn Any {
75 self
76 }
77}
78
79type BoxedConfig = Box<dyn ErasedConfig>;
81
82impl ConfigValidator for BoxedConfig {
83 fn validate(&self) -> Result<(), ConfigError> {
84 self.validate_erased()
85 }
86
87 fn get_defaults() -> Self {
88 Box::new(InvalidBoxedConfig)
101 }
102}
103
104impl Default for BoxedConfig {
105 fn default() -> Self {
106 Box::new(InvalidBoxedConfig)
119 }
120}
121
/// Extension trait adding typed access to a type-erased configuration.
pub trait BoxedConfigExt {
    /// Returns a reference to the underlying value if it is a `T`,
    /// otherwise `None`.
    fn downcast_ref<T: 'static>(&self) -> Option<&T>;
}
127
128impl BoxedConfigExt for BoxedConfig {
129 fn downcast_ref<T: 'static>(&self) -> Option<&T> {
130 self.as_any().downcast_ref::<T>()
131 }
132}
133
/// Type-erased stage output; callers recover the concrete type by downcasting.
type BoxedResult = Box<dyn Any + Send + Sync>;
136
137type RegisteredStage = Arc<dyn PipelineStage<Config = BoxedConfig, Result = BoxedResult>>;
139
140#[derive(Debug)]
145pub struct StageRegistry {
146 stages: HashMap<StageId, RegisteredStage>,
148 configs: HashMap<StageId, BoxedConfig>,
150 execution_order: Option<Vec<StageId>>,
152}
153
154impl StageRegistry {
155 pub fn new() -> Self {
157 Self {
158 stages: HashMap::new(),
159 configs: HashMap::new(),
160 execution_order: None,
161 }
162 }
163
164 pub fn register_stage<S, C>(&mut self, stage: S, config: Option<C>) -> Result<(), OCRError>
171 where
172 S: PipelineStage<Config = C> + 'static,
173 C: Send + Sync + Debug + Clone + ConfigValidator + Default + 'static,
174 {
175 let stage_id = stage.stage_id();
176
177 if let Some(ref cfg) = config {
179 stage.validate_config(cfg)?;
180 }
181
182 let erased_stage = Arc::new(TypeErasedStage::new(stage));
184
185 self.stages.insert(stage_id.clone(), erased_stage);
187
188 if let Some(cfg) = config {
190 self.configs.insert(stage_id, Box::new(cfg));
191 }
192
193 self.execution_order = None;
195
196 Ok(())
197 }
198
199 pub fn get_stage(&self, stage_id: &StageId) -> Option<&RegisteredStage> {
201 self.stages.get(stage_id)
202 }
203
204 pub fn get_config(&self, stage_id: &StageId) -> Option<&BoxedConfig> {
206 self.configs.get(stage_id)
207 }
208
209 #[allow(dead_code)]
211 pub fn stage_ids(&self) -> Vec<StageId> {
212 self.stages.keys().cloned().collect()
213 }
214
215 pub fn resolve_execution_order(&mut self) -> Result<Vec<StageId>, OCRError> {
217 if let Some(ref order) = self.execution_order {
218 return Ok(order.clone());
219 }
220
221 let order = self.topological_sort()?;
222 self.execution_order = Some(order.clone());
223 Ok(order)
224 }
225
226 fn topological_sort(&self) -> Result<Vec<StageId>, OCRError> {
228 let mut in_degree: HashMap<StageId, usize> = HashMap::new();
229 let mut graph: HashMap<StageId, Vec<StageId>> = HashMap::new();
230
231 for stage_id in self.stages.keys() {
233 in_degree.insert(stage_id.clone(), 0);
234 graph.insert(stage_id.clone(), Vec::new());
235 }
236
237 for (stage_id, stage) in &self.stages {
239 for dependency in stage.dependencies() {
240 match dependency {
241 StageDependency::After(dep_id) | StageDependency::Requires(dep_id) => {
242 if self.stages.contains_key(&dep_id) {
243 graph.get_mut(&dep_id).unwrap().push(stage_id.clone());
244 *in_degree.get_mut(stage_id).unwrap() += 1;
245 }
246 }
247 StageDependency::Before(dep_id) | StageDependency::Provides(dep_id) => {
248 if self.stages.contains_key(&dep_id) {
249 graph.get_mut(stage_id).unwrap().push(dep_id.clone());
250 *in_degree.get_mut(&dep_id).unwrap() += 1;
251 }
252 }
253 }
254 }
255 }
256
257 let mut queue: VecDeque<StageId> = VecDeque::new();
259 let mut result: Vec<StageId> = Vec::new();
260
261 for (stage_id, °ree) in &in_degree {
263 if degree == 0 {
264 queue.push_back(stage_id.clone());
265 }
266 }
267
268 while let Some(stage_id) = queue.pop_front() {
269 result.push(stage_id.clone());
270
271 if let Some(neighbors) = graph.get(&stage_id) {
273 for neighbor in neighbors {
274 let degree = in_degree.get_mut(neighbor).unwrap();
275 *degree -= 1;
276 if *degree == 0 {
277 queue.push_back(neighbor.clone());
278 }
279 }
280 }
281 }
282
283 if result.len() != self.stages.len() {
285 return Err(OCRError::ConfigError {
286 message: "Circular dependency detected in pipeline stages".to_string(),
287 });
288 }
289
290 Ok(result)
291 }
292}
293
294impl Default for StageRegistry {
295 fn default() -> Self {
296 Self::new()
297 }
298}
299
/// Adapter that wraps a concrete stage so it can be stored behind the
/// type-erased [`RegisteredStage`] handle.
#[derive(Debug)]
struct TypeErasedStage<S> {
    /// The wrapped, strongly typed stage.
    inner: S,
}
305
306impl<S> TypeErasedStage<S>
307where
308 S: PipelineStage,
309{
310 fn new(stage: S) -> Self {
311 Self { inner: stage }
312 }
313}
314
315impl<S> PipelineStage for TypeErasedStage<S>
316where
317 S: PipelineStage + 'static,
318 S::Config: Clone + 'static,
319 S::Result: 'static,
320{
321 type Config = BoxedConfig;
322 type Result = Box<dyn Any + Send + Sync>;
323
324 fn stage_id(&self) -> StageId {
325 self.inner.stage_id()
326 }
327
328 fn stage_name(&self) -> &str {
329 self.inner.stage_name()
330 }
331
332 fn dependencies(&self) -> Vec<StageDependency> {
333 self.inner.dependencies()
334 }
335
336 fn is_enabled(&self, context: &StageContext, config: Option<&Self::Config>) -> bool {
337 let typed_config = config.and_then(|c| BoxedConfigExt::downcast_ref::<S::Config>(c));
338 self.inner.is_enabled(context, typed_config)
339 }
340
341 fn process(
342 &self,
343 context: &mut StageContext,
344 data: StageData,
345 config: Option<&Self::Config>,
346 ) -> Result<StageResult<Self::Result>, OCRError> {
347 let typed_config = config.and_then(|c| BoxedConfigExt::downcast_ref::<S::Config>(c));
348
349 let result = self.inner.process(context, data, typed_config)?;
350
351 Ok(StageResult::new(
352 Box::new(result.data) as Box<dyn Any + Send + Sync>,
353 result.metrics,
354 ))
355 }
356
357 fn validate_config(&self, config: &Self::Config) -> Result<(), OCRError> {
358 if let Some(typed_config) = BoxedConfigExt::downcast_ref::<S::Config>(config) {
359 self.inner.validate_config(typed_config)
360 } else {
361 Err(OCRError::pipeline_stage_error(
362 self.stage_name(),
363 &self.stage_id().to_string(),
364 1, "validate_config",
366 crate::core::errors::SimpleError::new("Invalid configuration type for stage"),
367 ))
368 }
369 }
370
371 fn default_config(&self) -> Self::Config {
372 Box::new(self.inner.default_config())
373 }
374}
375
376pub struct ExtensiblePipeline {
378 registry: StageRegistry,
379}
380
381impl ExtensiblePipeline {
382 pub fn new() -> Self {
384 Self {
385 registry: StageRegistry::new(),
386 }
387 }
388
389 pub fn register_stage<S, C>(&mut self, stage: S, config: Option<C>) -> Result<(), OCRError>
391 where
392 S: PipelineStage<Config = C> + 'static,
393 C: Send + Sync + Debug + Clone + ConfigValidator + Default + 'static,
394 {
395 self.registry.register_stage(stage, config)
396 }
397
398 #[allow(dead_code)]
400 pub fn execute(&mut self, context: &mut StageContext) -> Result<(), OCRError> {
401 let execution_order = self.registry.resolve_execution_order()?;
402
403 for stage_id in execution_order {
404 let stage =
405 self.registry
406 .get_stage(&stage_id)
407 .ok_or_else(|| OCRError::ConfigError {
408 message: format!("Stage not found: {}", stage_id.as_str()),
409 })?;
410
411 let config = self.registry.get_config(&stage_id);
412 let stage_data = StageData::new((*context.original_image).clone());
413 let result = stage.process(context, stage_data, config)?;
414 context.set_stage_result(stage_id, result);
415 }
416
417 Ok(())
418 }
419
420 #[allow(dead_code)]
422 pub fn registry(&self) -> &StageRegistry {
423 &self.registry
424 }
425
426 #[allow(dead_code)]
428 pub fn registry_mut(&mut self) -> &mut StageRegistry {
429 &mut self.registry
430 }
431}
432
433impl Default for ExtensiblePipeline {
434 fn default() -> Self {
435 Self::new()
436 }
437}
438
/// Stateless driver that runs an [`ExtensiblePipeline`]; all functionality is
/// exposed through associated functions.
pub struct PipelineExecutor;
441
442impl PipelineExecutor {
443 pub fn execute(
445 pipeline: &mut ExtensiblePipeline,
446 mut context: StageContext,
447 initial_data: StageData,
448 ) -> Result<StageData, OCRError> {
449 let execution_order = pipeline.registry.resolve_execution_order()?;
450 let mut current_data = initial_data;
451
452 info!("Executing pipeline with {} stages", execution_order.len());
453
454 for stage_id in execution_order {
455 let stage =
456 pipeline
457 .registry
458 .get_stage(&stage_id)
459 .ok_or_else(|| OCRError::ConfigError {
460 message: format!("Stage not found: {}", stage_id.as_str()),
461 })?;
462
463 let config = pipeline.registry.get_config(&stage_id);
464
465 if !stage.is_enabled(&context, config) {
467 debug!("Skipping disabled stage: {}", stage.stage_name());
468 continue;
469 }
470
471 debug!("Executing stage: {}", stage.stage_name());
472
473 let stage_result = stage.process(&mut context, current_data, config)?;
475
476 context.set_stage_result(stage_id.clone(), stage_result.data);
478
479 current_data = StageData::new(context.current_image.as_ref().clone());
482
483 debug!(
484 "Stage {} completed in {:?}",
485 stage.stage_name(),
486 stage_result.metrics.processing_time
487 );
488 }
489
490 Ok(current_data)
491 }
492}
493
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `config` fails validation with the sentinel's message.
    fn assert_rejected_as_invalid_sentinel(config: &BoxedConfig) {
        let outcome = ConfigValidator::validate(config);
        assert!(outcome.is_err());

        let rendered = outcome.unwrap_err().to_string();
        assert!(rendered.contains("Invalid BoxedConfig"));
        assert!(rendered.contains("should never be called directly"));
    }

    /// `BoxedConfig::default()` must hand out a config that fails validation.
    #[test]
    fn test_boxed_config_default_returns_invalid_config() {
        let config = BoxedConfig::default();
        assert_rejected_as_invalid_sentinel(&config);
    }

    /// `BoxedConfig::get_defaults()` must hand out a config that fails validation.
    #[test]
    fn test_boxed_config_get_defaults_returns_invalid_config() {
        let config = BoxedConfig::get_defaults();
        assert_rejected_as_invalid_sentinel(&config);
    }
}