Skip to main content

pipeline/
pipeline.rs

//! Pre-resolved pipeline composition using SystemParam stages.
//!
//! Pipeline stages are named functions whose SystemParam dependencies are
//! resolved at build time. Arity-0 stages (no SystemParams) also accept
//! closures.
//!
//! IntoStage-based methods (`.stage()`, `.map()`, `.and_then()`, `.catch()`)
//! resolve params from the registry. Closure-based methods (`.filter()`,
//! `.inspect()`, `.on_none()`, etc.) take `&mut World` for cold-path use.
//!
//! Two dispatch modes:
//! - `run()` — direct call, no boxing, works with borrowed inputs
//! - `build()` — box into `Pipeline<In>`, implements `Handler<In>`
//!
//! Run with:
//! ```bash
//! cargo run -p nexus-rt --example pipeline
//! ```
19
20use nexus_rt::{Handler, PipelineStart, ResMut, WorldBuilder};
21
/// Most recent price seen by the example pipelines, plus a count of how many
/// prices have been stored.
///
/// Derives `Debug`, `Clone` and `PartialEq` so the cache can be printed,
/// snapshotted and compared in assertions, in line with `ProcessError`
/// already deriving `Debug`.
#[derive(Debug, Clone, PartialEq)]
struct PriceCache {
    /// Price of the last tick that was stored.
    latest: f64,
    /// Number of ticks stored so far.
    updates: u64,
}
26
27impl PriceCache {
28    fn new() -> Self {
29        Self {
30            latest: 0.0,
31            updates: 0,
32        }
33    }
34}
35
/// A single price observation for one instrument; the input type fed into the
/// option and result pipelines in `main`.
///
/// Derives `Debug`, `Clone` and `PartialEq` so ticks can be printed and
/// compared in assertions, in line with `ProcessError` already deriving
/// `Debug`.
#[derive(Debug, Clone, PartialEq)]
struct MarketTick {
    /// Instrument identifier, e.g. `"BTC"`.
    symbol: &'static str,
    /// Observed price for `symbol`.
    price: f64,
}
40
/// Reasons a tick can be rejected by the validation stages.
///
/// Implements `Display` and `std::error::Error` so the type can be used with
/// `?` into `Box<dyn Error>` and similar error plumbing. The `Debug` output is
/// unchanged.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProcessError {
    /// The tick's price failed validation.
    InvalidPrice,
    /// The tick's symbol is not one the pipeline recognises.
    UnknownSymbol,
}

impl std::fmt::Display for ProcessError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive on purpose: adding a variant must force a message here.
        let message = match self {
            Self::InvalidPrice => "invalid price",
            Self::UnknownSymbol => "unknown symbol",
        };
        f.write_str(message)
    }
}

impl std::error::Error for ProcessError {}
46
// --- Stage functions: SystemParams first, stage input last ---
48
49fn validate(tick: MarketTick) -> Result<MarketTick, ProcessError> {
50    if tick.price <= 0.0 {
51        Err(ProcessError::InvalidPrice)
52    } else {
53        Ok(tick)
54    }
55}
56
57fn check_known(tick: MarketTick) -> Result<MarketTick, ProcessError> {
58    if tick.symbol == "BTC" || tick.symbol == "ETH" {
59        Ok(tick)
60    } else {
61        Err(ProcessError::UnknownSymbol)
62    }
63}
64
65fn count_error(mut errors: ResMut<u64>, err: ProcessError) {
66    println!("  [catch] {err:?}");
67    *errors += 1;
68}
69
70fn store_price(mut cache: ResMut<PriceCache>, tick: MarketTick) {
71    println!("  [ok] {} @ {:.2}", tick.symbol, tick.price);
72    cache.latest = tick.price;
73    cache.updates += 1;
74}
75
76fn accumulate(mut total: ResMut<u64>, x: u32) {
77    *total += u64::from(x);
78}
79
80fn main() {
81    // --- Bare value pipeline: arity-0 closure stages ---
82
83    println!("=== Bare Value Pipeline ===\n");
84
85    let mut world = WorldBuilder::new().build();
86    let r = world.registry_mut();
87
88    let mut bare_pipeline = PipelineStart::<u32>::new()
89        .stage(|x: u32| x * 2, r)
90        .stage(|x: u32| x + 1, r);
91
92    println!("  5 → {}", bare_pipeline.run(&mut world, 5));
93    println!("  10 → {}", bare_pipeline.run(&mut world, 10));
94
95    // --- Option pipeline: filter + inspect (cold path), map (hot path) ---
96
97    println!("\n=== Option Pipeline ===\n");
98
99    let mut wb = WorldBuilder::new();
100    wb.register(PriceCache::new());
101    let mut world = wb.build();
102    let r = world.registry_mut();
103
104    let mut option_pipeline = PipelineStart::<MarketTick>::new()
105        .stage(
106            |tick: MarketTick| -> Option<MarketTick> {
107                if tick.price > 0.0 { Some(tick) } else { None }
108            },
109            r,
110        )
111        .filter(|_w, tick| tick.symbol == "BTC")
112        .inspect(|_w, tick| {
113            println!("  [inspect] {} @ {:.2}", tick.symbol, tick.price);
114        })
115        .map(store_price, r);
116
117    let ticks = [
118        MarketTick {
119            symbol: "BTC",
120            price: 50_000.0,
121        },
122        MarketTick {
123            symbol: "ETH",
124            price: 3_000.0,
125        }, // filtered: not BTC
126        MarketTick {
127            symbol: "BTC",
128            price: -1.0,
129        }, // filtered: negative
130        MarketTick {
131            symbol: "BTC",
132            price: 51_000.0,
133        },
134    ];
135
136    for tick in ticks {
137        option_pipeline.run(&mut world, tick);
138    }
139
140    let cache = world.resource::<PriceCache>();
141    println!(
142        "\n  PriceCache: latest={:.2}, updates={}",
143        cache.latest, cache.updates
144    );
145    assert_eq!(cache.updates, 2);
146    assert_eq!(cache.latest, 51_000.0);
147
148    // --- Result pipeline: validate → check → catch → store ---
149
150    println!("\n=== Result Pipeline with catch ===\n");
151
152    let mut wb = WorldBuilder::new();
153    wb.register(PriceCache::new());
154    wb.register::<u64>(0); // error counter
155    let mut world = wb.build();
156    let r = world.registry_mut();
157
158    let mut result_pipeline = PipelineStart::<MarketTick>::new()
159        .stage(validate, r)
160        .and_then(check_known, r)
161        .catch(count_error, r)
162        .map(store_price, r);
163
164    let ticks = [
165        MarketTick {
166            symbol: "BTC",
167            price: 52_000.0,
168        },
169        MarketTick {
170            symbol: "XYZ",
171            price: 100.0,
172        }, // unknown symbol → catch
173        MarketTick {
174            symbol: "ETH",
175            price: -5.0,
176        }, // invalid price → catch
177        MarketTick {
178            symbol: "ETH",
179            price: 3_500.0,
180        },
181    ];
182
183    for tick in ticks {
184        result_pipeline.run(&mut world, tick);
185    }
186
187    let errors = *world.resource::<u64>();
188    println!("\n  Errors: {errors}");
189    assert_eq!(errors, 2);
190
191    // --- Build into Handler ---
192
193    println!("\n=== Pipeline as Handler ===\n");
194
195    let mut wb = WorldBuilder::new();
196    wb.register::<u64>(0);
197    let mut world = wb.build();
198    let r = world.registry_mut();
199
200    let mut pipeline = PipelineStart::<u32>::new().stage(accumulate, r).build();
201
202    pipeline.run(&mut world, 10);
203    pipeline.run(&mut world, 20);
204    pipeline.run(&mut world, 30);
205
206    let total = *world.resource::<u64>();
207    println!("  Total: {total}");
208    assert_eq!(total, 60);
209
210    println!("\nDone.");
211}