1use graph_sp::Graph;
10use std::collections::HashMap;
11
12fn main() {
13 println!("═══════════════════════════════════════════════════════════");
14 println!(" Output Access Demo");
15 println!(" How to retrieve execution results");
16 println!("═══════════════════════════════════════════════════════════\n");
17
18 demo_simple_output_access();
19 demo_branch_output_access();
20 demo_variant_output_access();
21 demo_multiple_outputs();
22}
23
24fn demo_simple_output_access() {
25 println!("─────────────────────────────────────────────────────────");
26 println!("Demo 1: Simple Pipeline Output Access");
27 println!("─────────────────────────────────────────────────────────\n");
28
29 let mut graph = Graph::new();
30
31 graph.add(
33 |_: &HashMap<String, String>, _| {
34 let mut result = HashMap::new();
35 result.insert("initial_value".to_string(), "100".to_string());
36 result
37 },
38 Some("Source"),
39 None,
40 Some(vec![("initial_value", "raw_data")]) );
42
43 graph.add(
45 |inputs: &HashMap<String, String>, _| {
46 let value = inputs.get("input").unwrap().parse::<i32>().unwrap();
47 let mut result = HashMap::new();
48 result.insert("processed".to_string(), (value * 2).to_string());
49 result
50 },
51 Some("Process"),
52 Some(vec![("raw_data", "input")]), Some(vec![("processed", "final_result")]) );
55
56 let dag = graph.build();
58 let context = dag.execute(false, None);
59
60 println!("📦 Execution Context (all variables):");
61 for (key, value) in &context {
62 println!(" {} = {}", key, value);
63 }
64
65 println!("\n🎯 Accessing specific outputs:");
66
67 if let Some(raw_data) = context.get("raw_data") {
69 println!(" raw_data: {}", raw_data);
70 }
71
72 if let Some(final_result) = context.get("final_result") {
73 println!(" final_result: {}", final_result);
74 }
75
76 if context.contains_key("final_result") {
78 println!("\n✅ Final result is available in context");
79 }
80
81 println!();
82}
83
84fn demo_branch_output_access() {
85 println!("─────────────────────────────────────────────────────────");
86 println!("Demo 2: Branch Output Access (Parallel Paths)");
87 println!("─────────────────────────────────────────────────────────\n");
88
89 let mut graph = Graph::new();
90
91 graph.add(
93 |_: &HashMap<String, String>, _| {
94 let mut result = HashMap::new();
95 result.insert("value".to_string(), "50".to_string());
96 result
97 },
98 Some("Source"),
99 None,
100 Some(vec![("value", "shared_data")])
101 );
102
103 let mut branch_a = Graph::new();
105 branch_a.add(
106 |inputs: &HashMap<String, String>, _| {
107 let val = inputs.get("data").unwrap();
108 let mut result = HashMap::new();
109 result.insert("stats_output".to_string(), format!("Stats of {}", val));
110 result
111 },
112 Some("Stats"),
113 Some(vec![("shared_data", "data")]),
114 Some(vec![("stats_output", "statistics")]) );
116
117 let mut branch_b = Graph::new();
119 branch_b.add(
120 |inputs: &HashMap<String, String>, _| {
121 let val = inputs.get("data").unwrap();
122 let mut result = HashMap::new();
123 result.insert("model_output".to_string(), format!("Model trained on {}", val));
124 result
125 },
126 Some("Train"),
127 Some(vec![("shared_data", "data")]),
128 Some(vec![("model_output", "model")]) );
130
131 let mut branch_c = Graph::new();
133 branch_c.add(
134 |inputs: &HashMap<String, String>, _| {
135 let val = inputs.get("data").unwrap();
136 let mut result = HashMap::new();
137 result.insert("viz_output".to_string(), format!("Plot of {}", val));
138 result
139 },
140 Some("Visualize"),
141 Some(vec![("shared_data", "data")]),
142 Some(vec![("viz_output", "visualization")]) );
144
145 graph.branch(branch_a);
147 graph.branch(branch_b);
148 graph.branch(branch_c);
149
150 let dag = graph.build();
152 let context = dag.execute(false, None);
153
154 println!("📦 All outputs from parallel branches:");
155
156 if let Some(stats) = context.get("statistics") {
158 println!(" Branch A (Statistics): {}", stats);
159 }
160
161 if let Some(model) = context.get("model") {
162 println!(" Branch B (Model): {}", model);
163 }
164
165 if let Some(viz) = context.get("visualization") {
166 println!(" Branch C (Visualization): {}", viz);
167 }
168
169 println!("\n🔍 All variables in context:");
170 for (key, value) in &context {
171 println!(" {} = {}", key, value);
172 }
173
174 println!();
175}
176
177fn demo_variant_output_access() {
178 println!("─────────────────────────────────────────────────────────");
179 println!("Demo 3: Variant Output Access (Parameter Sweep)");
180 println!("─────────────────────────────────────────────────────────\n");
181
182 let mut graph = Graph::new();
183
184 graph.add(
186 |_: &HashMap<String, String>, _| {
187 let mut result = HashMap::new();
188 result.insert("value".to_string(), "10.0".to_string());
189 result
190 },
191 Some("DataSource"),
192 None,
193 Some(vec![("value", "input_data")])
194 );
195
196 fn make_scaler(factor: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> {
199 move |inputs: &HashMap<String, String>, _| {
200 let value = inputs.get("data").unwrap().parse::<f64>().unwrap();
201 let mut result = HashMap::new();
202 result.insert("scaled".to_string(), (value * factor).to_string());
203 result
204 }
205 }
206
207 graph.variant(
208 make_scaler,
209 vec![2.0, 3.0, 5.0],
210 Some("Scale"),
211 Some(vec![("input_data", "data")]),
212 Some(vec![("scaled", "result")]) );
214
215 let dag = graph.build();
217 let context = dag.execute(false, None);
218
219 println!("📦 Variant outputs:");
220 println!(" Note: Variants with same output name overwrite each other");
221 println!(" The last variant (factor=5.0) writes to 'result'\n");
222
223 if let Some(result) = context.get("result") {
225 println!(" result = {} (from last variant: 10.0 * 5.0)", result);
226 }
227
228 println!("\n💡 Tip: To preserve all variant outputs, use unique output names:");
229 println!(" Option 1: Map each variant to a different broadcast variable");
230 println!(" Option 2: Collect results in merge node");
231 println!(" Option 3: Use variant_params or closure capture to distinguish");
232
233 let mut graph2 = Graph::new();
235
236 graph2.add(
237 |_: &HashMap<String, String>, _| {
238 let mut result = HashMap::new();
239 result.insert("value".to_string(), "10.0".to_string());
240 result
241 },
242 Some("DataSource"),
243 None,
244 Some(vec![("value", "input_data")])
245 );
246
247 fn make_scaler_unique(label: &str, factor: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> + '_ {
249 move |inputs: &HashMap<String, String>, _| {
250 let value = inputs.get("data").unwrap().parse::<f64>().unwrap();
251 let mut result = HashMap::new();
252 result.insert(label.to_string(), (value * factor).to_string());
253 result
254 }
255 }
256
257 graph2.variant(
258 |_x: &str| make_scaler_unique("scaled_2x", 2.0),
259 vec!["2x"],
260 Some("Scale2x"),
261 Some(vec![("input_data", "data")]),
262 Some(vec![("scaled_2x", "result_2x")])
263 );
264
265 graph2.variant(
266 |_x: &str| make_scaler_unique("scaled_3x", 3.0),
267 vec!["3x"],
268 Some("Scale3x"),
269 Some(vec![("input_data", "data")]),
270 Some(vec![("scaled_3x", "result_3x")])
271 );
272
273 let dag2 = graph2.build();
274 let context2 = dag2.execute(false, None);
275
276 println!("\n✅ Better approach - unique output names:");
277 if let Some(result_2x) = context2.get("result_2x") {
278 println!(" result_2x = {}", result_2x);
279 }
280 if let Some(result_3x) = context2.get("result_3x") {
281 println!(" result_3x = {}", result_3x);
282 }
283
284 println!();
285}
286
287fn demo_multiple_outputs() {
288 println!("─────────────────────────────────────────────────────────");
289 println!("Demo 4: Multiple Outputs from Single Node");
290 println!("─────────────────────────────────────────────────────────\n");
291
292 let mut graph = Graph::new();
293
294 graph.add(
296 |_: &HashMap<String, String>, _| {
297 let mut result = HashMap::new();
298 result.insert("mean".to_string(), "50.5".to_string());
299 result.insert("median".to_string(), "48.0".to_string());
300 result.insert("stddev".to_string(), "12.3".to_string());
301 result.insert("count".to_string(), "100".to_string());
302 result
303 },
304 Some("Statistics"),
305 None,
306 Some(vec![
307 ("mean", "stat_mean"),
308 ("median", "stat_median"),
309 ("stddev", "stat_stddev"),
310 ("count", "sample_count")
311 ])
312 );
313
314 let dag = graph.build();
316 let context = dag.execute(false, None);
317
318 println!("📊 Multiple outputs from single node:");
319
320 println!(" Mean: {}", context.get("stat_mean").unwrap());
322 println!(" Median: {}", context.get("stat_median").unwrap());
323 println!(" StdDev: {}", context.get("stat_stddev").unwrap());
324 println!(" Count: {}", context.get("sample_count").unwrap());
325
326 println!("\n📋 Complete execution context:");
327 for (key, value) in &context {
328 println!(" {} = {}", key, value);
329 }
330
331 println!("\n💡 Summary:");
332 println!(" ✓ dag.execute(false, None) returns HashMap<String, String>");
333 println!(" ✓ Keys are broadcast variable names (from output mappings)");
334 println!(" ✓ Use context.get(\"variable_name\") to access specific outputs");
335 println!(" ✓ All outputs accumulate in the context throughout execution");
336
337 println!();
338}