1use nexus_rt::{Handler, PipelineStart, ResMut, WorldBuilder};
21
22struct PriceCache {
23 latest: f64,
24 updates: u64,
25}
26
27impl PriceCache {
28 fn new() -> Self {
29 Self {
30 latest: 0.0,
31 updates: 0,
32 }
33 }
34}
35
36struct MarketTick {
37 symbol: &'static str,
38 price: f64,
39}
40
41#[derive(Debug)]
42enum ProcessError {
43 InvalidPrice,
44 UnknownSymbol,
45}
46
47fn validate(tick: MarketTick) -> Result<MarketTick, ProcessError> {
50 if tick.price <= 0.0 {
51 Err(ProcessError::InvalidPrice)
52 } else {
53 Ok(tick)
54 }
55}
56
57fn check_known(tick: MarketTick) -> Result<MarketTick, ProcessError> {
58 if tick.symbol == "BTC" || tick.symbol == "ETH" {
59 Ok(tick)
60 } else {
61 Err(ProcessError::UnknownSymbol)
62 }
63}
64
65fn count_error(mut errors: ResMut<u64>, err: ProcessError) {
66 println!(" [catch] {err:?}");
67 *errors += 1;
68}
69
70fn store_price(mut cache: ResMut<PriceCache>, tick: MarketTick) {
71 println!(" [ok] {} @ {:.2}", tick.symbol, tick.price);
72 cache.latest = tick.price;
73 cache.updates += 1;
74}
75
76fn accumulate(mut total: ResMut<u64>, x: u32) {
77 *total += u64::from(x);
78}
79
80fn main() {
81 println!("=== Bare Value Pipeline ===\n");
84
85 let mut world = WorldBuilder::new().build();
86 let r = world.registry_mut();
87
88 let mut bare_pipeline = PipelineStart::<u32>::new()
89 .stage(|x: u32| x * 2, r)
90 .stage(|x: u32| x + 1, r);
91
92 println!(" 5 → {}", bare_pipeline.run(&mut world, 5));
93 println!(" 10 → {}", bare_pipeline.run(&mut world, 10));
94
95 println!("\n=== Option Pipeline ===\n");
98
99 let mut wb = WorldBuilder::new();
100 wb.register(PriceCache::new());
101 let mut world = wb.build();
102 let r = world.registry_mut();
103
104 let mut option_pipeline = PipelineStart::<MarketTick>::new()
105 .stage(
106 |tick: MarketTick| -> Option<MarketTick> {
107 if tick.price > 0.0 { Some(tick) } else { None }
108 },
109 r,
110 )
111 .filter(|_w, tick| tick.symbol == "BTC")
112 .inspect(|_w, tick| {
113 println!(" [inspect] {} @ {:.2}", tick.symbol, tick.price);
114 })
115 .map(store_price, r);
116
117 let ticks = [
118 MarketTick {
119 symbol: "BTC",
120 price: 50_000.0,
121 },
122 MarketTick {
123 symbol: "ETH",
124 price: 3_000.0,
125 }, MarketTick {
127 symbol: "BTC",
128 price: -1.0,
129 }, MarketTick {
131 symbol: "BTC",
132 price: 51_000.0,
133 },
134 ];
135
136 for tick in ticks {
137 option_pipeline.run(&mut world, tick);
138 }
139
140 let cache = world.resource::<PriceCache>();
141 println!(
142 "\n PriceCache: latest={:.2}, updates={}",
143 cache.latest, cache.updates
144 );
145 assert_eq!(cache.updates, 2);
146 assert_eq!(cache.latest, 51_000.0);
147
148 println!("\n=== Result Pipeline with catch ===\n");
151
152 let mut wb = WorldBuilder::new();
153 wb.register(PriceCache::new());
154 wb.register::<u64>(0); let mut world = wb.build();
156 let r = world.registry_mut();
157
158 let mut result_pipeline = PipelineStart::<MarketTick>::new()
159 .stage(validate, r)
160 .and_then(check_known, r)
161 .catch(count_error, r)
162 .map(store_price, r);
163
164 let ticks = [
165 MarketTick {
166 symbol: "BTC",
167 price: 52_000.0,
168 },
169 MarketTick {
170 symbol: "XYZ",
171 price: 100.0,
172 }, MarketTick {
174 symbol: "ETH",
175 price: -5.0,
176 }, MarketTick {
178 symbol: "ETH",
179 price: 3_500.0,
180 },
181 ];
182
183 for tick in ticks {
184 result_pipeline.run(&mut world, tick);
185 }
186
187 let errors = *world.resource::<u64>();
188 println!("\n Errors: {errors}");
189 assert_eq!(errors, 2);
190
191 println!("\n=== Pipeline as Handler ===\n");
194
195 let mut wb = WorldBuilder::new();
196 wb.register::<u64>(0);
197 let mut world = wb.build();
198 let r = world.registry_mut();
199
200 let mut pipeline = PipelineStart::<u32>::new().stage(accumulate, r).build();
201
202 pipeline.run(&mut world, 10);
203 pipeline.run(&mut world, 20);
204 pipeline.run(&mut world, 30);
205
206 let total = *world.resource::<u64>();
207 println!(" Total: {total}");
208 assert_eq!(total, 60);
209
210 println!("\nDone.");
211}