nexrad_process/pipeline.rs
1use crate::result::Result;
2use crate::SweepProcessor;
3use nexrad_model::data::SweepField;
4
5/// A composable pipeline of sweep processing steps.
6///
7/// Steps are executed in order, with the output of each step becoming the input
8/// to the next. The pipeline itself implements [`SweepProcessor`], so pipelines
9/// can be nested.
10///
11/// # Example
12///
13/// ```ignore
14/// use nexrad_process::{SweepPipeline, filter::ThresholdFilter};
15///
16/// let pipeline = SweepPipeline::new()
17/// .then(ThresholdFilter { min: Some(5.0), max: None })
18/// .then(ThresholdFilter { min: None, max: Some(75.0) });
19///
20/// let output = pipeline.execute(&input_field)?;
21/// ```
22pub struct SweepPipeline {
23 steps: Vec<Box<dyn SweepProcessor>>,
24}
25
26impl SweepPipeline {
27 /// Create a new empty pipeline.
28 pub fn new() -> Self {
29 Self { steps: Vec::new() }
30 }
31
32 /// Append a processing step to the pipeline.
33 pub fn then(mut self, processor: impl SweepProcessor + 'static) -> Self {
34 self.steps.push(Box::new(processor));
35 self
36 }
37
38 /// Execute the pipeline, returning the final processed field.
39 ///
40 /// If the pipeline has no steps, the input is cloned and returned as-is.
41 pub fn execute(&self, input: &SweepField) -> Result<SweepField> {
42 if self.steps.is_empty() {
43 return Ok(input.clone());
44 }
45
46 let mut current = self.steps[0].process(input)?;
47 for step in &self.steps[1..] {
48 current = step.process(¤t)?;
49 }
50 Ok(current)
51 }
52}
53
54impl Default for SweepPipeline {
55 fn default() -> Self {
56 Self::new()
57 }
58}
59
60impl SweepProcessor for SweepPipeline {
61 fn name(&self) -> &str {
62 "Pipeline"
63 }
64
65 fn process(&self, input: &SweepField) -> Result<SweepField> {
66 self.execute(input)
67 }
68}