// oar_ocr/pipeline/stages/registry.rs

//! Stage registry and extensible pipeline executor.
//!
//! This module provides the registry that manages pipeline stages and resolves
//! their execution order, together with the executor that runs extensible pipelines.
5
6use std::any::Any;
7use std::collections::{HashMap, VecDeque};
8use std::fmt::Debug;
9use std::sync::Arc;
10use tracing::{debug, info};
11
12use super::extensible::{PipelineStage, StageContext, StageData, StageDependency, StageId};
13use super::types::StageResult;
14use crate::core::OCRError;
15use crate::core::config::{ConfigError, ConfigValidator};
16
17/// Trait for type-erased configuration that can be validated.
18pub trait ErasedConfig: Any + Send + Sync + Debug {
19    /// Validate the configuration
20    fn validate_erased(&self) -> Result<(), ConfigError>;
21    /// Get the default configuration as a boxed Any
22    fn default_erased() -> Box<dyn ErasedConfig>
23    where
24        Self: Sized;
25    /// Clone the configuration
26    fn clone_erased(&self) -> Box<dyn ErasedConfig>;
27    /// Downcast to concrete type
28    fn as_any(&self) -> &dyn Any;
29}
30
31impl<T> ErasedConfig for T
32where
33    T: ConfigValidator + Default + Clone + Any + Send + Sync + Debug,
34{
35    fn validate_erased(&self) -> Result<(), ConfigError> {
36        self.validate()
37    }
38
39    fn default_erased() -> Box<dyn ErasedConfig> {
40        Box::new(T::get_defaults())
41    }
42
43    fn clone_erased(&self) -> Box<dyn ErasedConfig> {
44        Box::new(self.clone())
45    }
46
47    fn as_any(&self) -> &dyn Any {
48        self
49    }
50}
51
/// Placeholder configuration for a `BoxedConfig` that has no concrete type behind it.
///
/// `BoxedConfig::default()` and `BoxedConfig::get_defaults()` return this value.
/// Its validation always fails, so a configuration obtained that way is
/// rejected with an explanatory error instead of being used.
#[derive(Debug, Clone)]
#[allow(dead_code)] // This is intentionally only used in error cases
struct InvalidBoxedConfig;
58
59impl ErasedConfig for InvalidBoxedConfig {
60    fn validate_erased(&self) -> Result<(), ConfigError> {
61        Err(ConfigError::InvalidConfig {
62            message: "Invalid BoxedConfig: This configuration was created through BoxedConfig::default() or BoxedConfig::get_defaults(), which should never be called directly. Use ErasedConfig::default_erased() or box a concrete config type instead.".to_string(),
63        })
64    }
65
66    fn default_erased() -> Box<dyn ErasedConfig> {
67        Box::new(InvalidBoxedConfig)
68    }
69
70    fn clone_erased(&self) -> Box<dyn ErasedConfig> {
71        Box::new(self.clone())
72    }
73
74    fn as_any(&self) -> &dyn Any {
75        self
76    }
77}
78
79/// Type alias for a boxed stage configuration.
80type BoxedConfig = Box<dyn ErasedConfig>;
81
82impl ConfigValidator for BoxedConfig {
83    fn validate(&self) -> Result<(), ConfigError> {
84        self.validate_erased()
85    }
86
87    fn get_defaults() -> Self {
88        // This method should never be called directly on BoxedConfig.
89        // BoxedConfig instances should be created through ErasedConfig::default_erased()
90        // or by boxing concrete config types.
91        // Return an invalid config that will fail validation with a clear error message.
92        //
93        // DESIGN NOTE: This intentionally returns an InvalidBoxedConfig that will always
94        // fail validation. The reason for this design choice is that BoxedConfig is a
95        // type-erased trait object, and it doesn't make sense to create a "default"
96        // BoxedConfig without knowing the concrete type underneath. Instead, users should:
97        // 1. Create a default of a concrete config type and then box it
98        // 2. Use ErasedConfig::default_erased() on a concrete type
99        // This prevents runtime errors from using an incorrectly typed configuration.
100        Box::new(InvalidBoxedConfig)
101    }
102}
103
104impl Default for BoxedConfig {
105    fn default() -> Self {
106        // This method should never be called directly on BoxedConfig.
107        // BoxedConfig instances should be created through ErasedConfig::default_erased()
108        // or by boxing concrete config types.
109        // Return an invalid config that will fail validation with a clear error message.
110        //
111        // DESIGN NOTE: This intentionally returns an InvalidBoxedConfig that will always
112        // fail validation. The reason for this design choice is that BoxedConfig is a
113        // type-erased trait object, and it doesn't make sense to create a "default"
114        // BoxedConfig without knowing the concrete type underneath. Instead, users should:
115        // 1. Create a default of a concrete config type and then box it
116        // 2. Use ErasedConfig::default_erased() on a concrete type
117        // This prevents runtime errors from using an incorrectly typed configuration.
118        Box::new(InvalidBoxedConfig)
119    }
120}
121
/// Extension trait that adds typed downcasting to `BoxedConfig`.
pub trait BoxedConfigExt {
    /// Returns the configuration as `&T`, or `None` when it is not a `T`.
    fn downcast_ref<T>(&self) -> Option<&T>
    where
        T: 'static;
}
127
128impl BoxedConfigExt for BoxedConfig {
129    fn downcast_ref<T: 'static>(&self) -> Option<&T> {
130        self.as_any().downcast_ref::<T>()
131    }
132}
133
134/// Type alias for a boxed stage result.
135type BoxedResult = Box<dyn Any + Send + Sync>;
136
137/// Type alias for a registered pipeline stage.
138type RegisteredStage = Arc<dyn PipelineStage<Config = BoxedConfig, Result = BoxedResult>>;
139
140/// Registry for managing pipeline stages.
141///
142/// The registry allows dynamic registration of stages and provides
143/// dependency resolution and execution ordering.
144#[derive(Debug)]
145pub struct StageRegistry {
146    /// Registered stages
147    stages: HashMap<StageId, RegisteredStage>,
148    /// Stage configurations
149    configs: HashMap<StageId, BoxedConfig>,
150    /// Execution order cache
151    execution_order: Option<Vec<StageId>>,
152}
153
154impl StageRegistry {
155    /// Create a new empty stage registry.
156    pub fn new() -> Self {
157        Self {
158            stages: HashMap::new(),
159            configs: HashMap::new(),
160            execution_order: None,
161        }
162    }
163
164    /// Register a stage with the registry.
165    ///
166    /// # Arguments
167    ///
168    /// * `stage` - The stage to register
169    /// * `config` - Optional configuration for the stage
170    pub fn register_stage<S, C>(&mut self, stage: S, config: Option<C>) -> Result<(), OCRError>
171    where
172        S: PipelineStage<Config = C> + 'static,
173        C: Send + Sync + Debug + Clone + ConfigValidator + Default + 'static,
174    {
175        let stage_id = stage.stage_id();
176
177        // Validate configuration if provided
178        if let Some(ref cfg) = config {
179            stage.validate_config(cfg)?;
180        }
181
182        // Create type-erased wrappers
183        let erased_stage = Arc::new(TypeErasedStage::new(stage));
184
185        // Store the stage
186        self.stages.insert(stage_id.clone(), erased_stage);
187
188        // Store configuration if provided
189        if let Some(cfg) = config {
190            self.configs.insert(stage_id, Box::new(cfg));
191        }
192
193        // Invalidate execution order cache
194        self.execution_order = None;
195
196        Ok(())
197    }
198
199    /// Get a registered stage by ID.
200    pub fn get_stage(&self, stage_id: &StageId) -> Option<&RegisteredStage> {
201        self.stages.get(stage_id)
202    }
203
204    /// Get configuration for a stage.
205    pub fn get_config(&self, stage_id: &StageId) -> Option<&BoxedConfig> {
206        self.configs.get(stage_id)
207    }
208
209    /// Get all registered stage IDs.
210    #[allow(dead_code)]
211    pub fn stage_ids(&self) -> Vec<StageId> {
212        self.stages.keys().cloned().collect()
213    }
214
215    /// Resolve execution order based on dependencies.
216    pub fn resolve_execution_order(&mut self) -> Result<Vec<StageId>, OCRError> {
217        if let Some(ref order) = self.execution_order {
218            return Ok(order.clone());
219        }
220
221        let order = self.topological_sort()?;
222        self.execution_order = Some(order.clone());
223        Ok(order)
224    }
225
226    /// Perform topological sort to determine execution order.
227    fn topological_sort(&self) -> Result<Vec<StageId>, OCRError> {
228        let mut in_degree: HashMap<StageId, usize> = HashMap::new();
229        let mut graph: HashMap<StageId, Vec<StageId>> = HashMap::new();
230
231        // Initialize in-degree and graph
232        for stage_id in self.stages.keys() {
233            in_degree.insert(stage_id.clone(), 0);
234            graph.insert(stage_id.clone(), Vec::new());
235        }
236
237        // Build dependency graph
238        for (stage_id, stage) in &self.stages {
239            for dependency in stage.dependencies() {
240                match dependency {
241                    StageDependency::After(dep_id) | StageDependency::Requires(dep_id) => {
242                        if self.stages.contains_key(&dep_id) {
243                            graph.get_mut(&dep_id).unwrap().push(stage_id.clone());
244                            *in_degree.get_mut(stage_id).unwrap() += 1;
245                        }
246                    }
247                    StageDependency::Before(dep_id) | StageDependency::Provides(dep_id) => {
248                        if self.stages.contains_key(&dep_id) {
249                            graph.get_mut(stage_id).unwrap().push(dep_id.clone());
250                            *in_degree.get_mut(&dep_id).unwrap() += 1;
251                        }
252                    }
253                }
254            }
255        }
256
257        // Kahn's algorithm for topological sorting
258        let mut queue: VecDeque<StageId> = VecDeque::new();
259        let mut result: Vec<StageId> = Vec::new();
260
261        // Find all nodes with no incoming edges
262        for (stage_id, &degree) in &in_degree {
263            if degree == 0 {
264                queue.push_back(stage_id.clone());
265            }
266        }
267
268        while let Some(stage_id) = queue.pop_front() {
269            result.push(stage_id.clone());
270
271            // For each neighbor of the current stage
272            if let Some(neighbors) = graph.get(&stage_id) {
273                for neighbor in neighbors {
274                    let degree = in_degree.get_mut(neighbor).unwrap();
275                    *degree -= 1;
276                    if *degree == 0 {
277                        queue.push_back(neighbor.clone());
278                    }
279                }
280            }
281        }
282
283        // Check for cycles
284        if result.len() != self.stages.len() {
285            return Err(OCRError::ConfigError {
286                message: "Circular dependency detected in pipeline stages".to_string(),
287            });
288        }
289
290        Ok(result)
291    }
292}
293
294impl Default for StageRegistry {
295    fn default() -> Self {
296        Self::new()
297    }
298}
299
/// Adapter that exposes a concrete stage through the type-erased
/// `BoxedConfig` / boxed-`Any` interface used by the registry.
#[derive(Debug)]
struct TypeErasedStage<S> {
    /// The concrete stage that all calls are forwarded to.
    inner: S,
}
305
306impl<S> TypeErasedStage<S>
307where
308    S: PipelineStage,
309{
310    fn new(stage: S) -> Self {
311        Self { inner: stage }
312    }
313}
314
315impl<S> PipelineStage for TypeErasedStage<S>
316where
317    S: PipelineStage + 'static,
318    S::Config: Clone + 'static,
319    S::Result: 'static,
320{
321    type Config = BoxedConfig;
322    type Result = Box<dyn Any + Send + Sync>;
323
324    fn stage_id(&self) -> StageId {
325        self.inner.stage_id()
326    }
327
328    fn stage_name(&self) -> &str {
329        self.inner.stage_name()
330    }
331
332    fn dependencies(&self) -> Vec<StageDependency> {
333        self.inner.dependencies()
334    }
335
336    fn is_enabled(&self, context: &StageContext, config: Option<&Self::Config>) -> bool {
337        let typed_config = config.and_then(|c| BoxedConfigExt::downcast_ref::<S::Config>(c));
338        self.inner.is_enabled(context, typed_config)
339    }
340
341    fn process(
342        &self,
343        context: &mut StageContext,
344        data: StageData,
345        config: Option<&Self::Config>,
346    ) -> Result<StageResult<Self::Result>, OCRError> {
347        let typed_config = config.and_then(|c| BoxedConfigExt::downcast_ref::<S::Config>(c));
348
349        let result = self.inner.process(context, data, typed_config)?;
350
351        Ok(StageResult::new(
352            Box::new(result.data) as Box<dyn Any + Send + Sync>,
353            result.metrics,
354        ))
355    }
356
357    fn validate_config(&self, config: &Self::Config) -> Result<(), OCRError> {
358        if let Some(typed_config) = BoxedConfigExt::downcast_ref::<S::Config>(config) {
359            self.inner.validate_config(typed_config)
360        } else {
361            Err(OCRError::pipeline_stage_error(
362                self.stage_name(),
363                &self.stage_id().to_string(),
364                1, // config validation is for single config
365                "validate_config",
366                crate::core::errors::SimpleError::new("Invalid configuration type for stage"),
367            ))
368        }
369    }
370
371    fn default_config(&self) -> Self::Config {
372        Box::new(self.inner.default_config())
373    }
374}
375
376/// Extensible pipeline that uses the stage registry.
377pub struct ExtensiblePipeline {
378    registry: StageRegistry,
379}
380
381impl ExtensiblePipeline {
382    /// Create a new extensible pipeline.
383    pub fn new() -> Self {
384        Self {
385            registry: StageRegistry::new(),
386        }
387    }
388
389    /// Register a stage with the pipeline.
390    pub fn register_stage<S, C>(&mut self, stage: S, config: Option<C>) -> Result<(), OCRError>
391    where
392        S: PipelineStage<Config = C> + 'static,
393        C: Send + Sync + Debug + Clone + ConfigValidator + Default + 'static,
394    {
395        self.registry.register_stage(stage, config)
396    }
397
398    /// Execute the pipeline with the given context.
399    #[allow(dead_code)]
400    pub fn execute(&mut self, context: &mut StageContext) -> Result<(), OCRError> {
401        let execution_order = self.registry.resolve_execution_order()?;
402
403        for stage_id in execution_order {
404            let stage =
405                self.registry
406                    .get_stage(&stage_id)
407                    .ok_or_else(|| OCRError::ConfigError {
408                        message: format!("Stage not found: {}", stage_id.as_str()),
409                    })?;
410
411            let config = self.registry.get_config(&stage_id);
412            let stage_data = StageData::new((*context.original_image).clone());
413            let result = stage.process(context, stage_data, config)?;
414            context.set_stage_result(stage_id, result);
415        }
416
417        Ok(())
418    }
419
420    /// Get the stage registry.
421    #[allow(dead_code)]
422    pub fn registry(&self) -> &StageRegistry {
423        &self.registry
424    }
425
426    /// Get the stage registry mutably.
427    #[allow(dead_code)]
428    pub fn registry_mut(&mut self) -> &mut StageRegistry {
429        &mut self.registry
430    }
431}
432
433impl Default for ExtensiblePipeline {
434    fn default() -> Self {
435        Self::new()
436    }
437}
438
/// Stateless driver that runs an [`ExtensiblePipeline`] stage by stage.
pub struct PipelineExecutor;
441
442impl PipelineExecutor {
443    /// Execute a pipeline with the given context and data.
444    pub fn execute(
445        pipeline: &mut ExtensiblePipeline,
446        mut context: StageContext,
447        initial_data: StageData,
448    ) -> Result<StageData, OCRError> {
449        let execution_order = pipeline.registry.resolve_execution_order()?;
450        let mut current_data = initial_data;
451
452        info!("Executing pipeline with {} stages", execution_order.len());
453
454        for stage_id in execution_order {
455            let stage =
456                pipeline
457                    .registry
458                    .get_stage(&stage_id)
459                    .ok_or_else(|| OCRError::ConfigError {
460                        message: format!("Stage not found: {}", stage_id.as_str()),
461                    })?;
462
463            let config = pipeline.registry.get_config(&stage_id);
464
465            // Check if stage is enabled
466            if !stage.is_enabled(&context, config) {
467                debug!("Skipping disabled stage: {}", stage.stage_name());
468                continue;
469            }
470
471            debug!("Executing stage: {}", stage.stage_name());
472
473            // Execute the stage
474            let stage_result = stage.process(&mut context, current_data, config)?;
475
476            // Store the result in context for other stages
477            context.set_stage_result(stage_id.clone(), stage_result.data);
478
479            // Update current data - stages that modify the image should update the context
480            // For now, we'll keep the current image from the context
481            current_data = StageData::new(context.current_image.as_ref().clone());
482
483            debug!(
484                "Stage {} completed in {:?}",
485                stage.stage_name(),
486                stage_result.metrics.processing_time
487            );
488        }
489
490        Ok(current_data)
491    }
492}
493
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `config` fails validation with the placeholder's error message.
    fn assert_rejected_as_invalid(config: BoxedConfig) {
        let error_message = config
            .validate()
            .expect_err("a placeholder BoxedConfig must fail validation")
            .to_string();

        assert!(error_message.contains("Invalid BoxedConfig"));
        assert!(error_message.contains("should never be called directly"));
    }

    #[test]
    fn test_boxed_config_default_returns_invalid_config() {
        assert_rejected_as_invalid(BoxedConfig::default());
    }

    #[test]
    fn test_boxed_config_get_defaults_returns_invalid_config() {
        assert_rejected_as_invalid(BoxedConfig::get_defaults());
    }
}
517}