Skip to main content

nexrad_process/
pipeline.rs

1use crate::result::Result;
2use crate::SweepProcessor;
3use nexrad_model::data::SweepField;
4
5/// A composable pipeline of sweep processing steps.
6///
7/// Steps are executed in order, with the output of each step becoming the input
8/// to the next. The pipeline itself implements [`SweepProcessor`], so pipelines
9/// can be nested.
10///
11/// # Example
12///
13/// ```ignore
14/// use nexrad_process::{SweepPipeline, filter::ThresholdFilter};
15///
16/// let pipeline = SweepPipeline::new()
17///     .then(ThresholdFilter { min: Some(5.0), max: None })
18///     .then(ThresholdFilter { min: None, max: Some(75.0) });
19///
20/// let output = pipeline.execute(&input_field)?;
21/// ```
22pub struct SweepPipeline {
23    steps: Vec<Box<dyn SweepProcessor>>,
24}
25
26impl SweepPipeline {
27    /// Create a new empty pipeline.
28    pub fn new() -> Self {
29        Self { steps: Vec::new() }
30    }
31
32    /// Append a processing step to the pipeline.
33    pub fn then(mut self, processor: impl SweepProcessor + 'static) -> Self {
34        self.steps.push(Box::new(processor));
35        self
36    }
37
38    /// Execute the pipeline, returning the final processed field.
39    ///
40    /// If the pipeline has no steps, the input is cloned and returned as-is.
41    pub fn execute(&self, input: &SweepField) -> Result<SweepField> {
42        if self.steps.is_empty() {
43            return Ok(input.clone());
44        }
45
46        let mut current = self.steps[0].process(input)?;
47        for step in &self.steps[1..] {
48            current = step.process(&current)?;
49        }
50        Ok(current)
51    }
52}
53
54impl Default for SweepPipeline {
55    fn default() -> Self {
56        Self::new()
57    }
58}
59
60impl SweepProcessor for SweepPipeline {
61    fn name(&self) -> &str {
62        "Pipeline"
63    }
64
65    fn process(&self, input: &SweepField) -> Result<SweepField> {
66        self.execute(input)
67    }
68}