pub struct BoxStatefulConsumer<T> { /* private fields */ }Expand description
BoxStatefulConsumer struct
Consumer implementation based on Box<dyn FnMut(&T)> for single ownership
scenarios. When sharing is not needed, this is the simplest and most
efficient consumer type.
§Features
- Single Ownership: Not cloneable, transfers ownership when used
- Zero Overhead: No reference counting or lock overhead
- Mutable State: Can modify captured environment through
FnMut - Builder Pattern: Method chaining naturally consumes
self
§Use Cases
Choose BoxStatefulConsumer when:
- Consumer is used only once or in a linear flow
- Building pipelines where ownership flows naturally
- No need to share consumers across contexts
- Performance critical and cannot accept sharing overhead
§Performance
BoxStatefulConsumer has the best performance among the three consumer types:
- No reference counting overhead
- No lock acquisition or runtime borrowing checks
- Direct function calls through vtable
- Minimal memory footprint (single pointer)
§Examples
use qubit_function::{Consumer, StatefulConsumer, BoxStatefulConsumer};
use std::sync::{Arc, Mutex};
let log = Arc::new(Mutex::new(Vec::new()));
let l = log.clone();
let mut consumer = BoxStatefulConsumer::new(move |x: &i32| {
l.lock().unwrap().push(*x);
});
consumer.accept(&5);
assert_eq!(*log.lock().unwrap(), vec![5]);Implementations§
Source§impl<T> BoxStatefulConsumer<T>
impl<T> BoxStatefulConsumer<T>
Sourcepub fn new<F>(f: F) -> Self
pub fn new<F>(f: F) -> Self
Creates a new consumer.
Wraps the provided closure in the appropriate smart pointer type for this consumer implementation.
Examples found in repository?
34fn main() {
35 println!("=== Consumer Examples ===\n");
36 println!("Note: Consumer only reads values, does not modify the original value");
37 println!("If you need to modify values, please refer to mutator_demo.rs\n");
38
39 // ========================================================================
40 // Example 1: BoxConsumer basic usage
41 // ========================================================================
42 println!("Example 1: BoxConsumer basic usage");
43 println!("{}", "-".repeat(50));
44
45 let consumer = BoxConsumer::new(|x: &i32| {
46 println!("Read and calculate: {} * 2 = {}", x, x * 2);
47 });
48 let value = 5;
49 println!("Value: {}", value);
50 consumer.accept(&value);
51 println!("Value remains: {} (not modified)\n", value);
52
53 // ========================================================================
54 // Example 2: BoxConsumer method chaining
55 // ========================================================================
56 println!("Example 2: BoxConsumer method chaining");
57 println!("{}", "-".repeat(50));
58
59 let results = Arc::new(Mutex::new(Vec::new()));
60 let r1 = results.clone();
61 let r2 = results.clone();
62 let r3 = results.clone();
63
64 let chained = BoxConsumer::new(move |x: &i32| {
65 r1.lock().unwrap().push(*x * 2);
66 })
67 .and_then(move |x: &i32| {
68 r2.lock().unwrap().push(*x + 10);
69 })
70 .and_then(move |x: &i32| {
71 r3.lock().unwrap().push(*x);
72 println!("Processing value: {}", x);
73 });
74
75 let value = 5;
76 println!("Initial value: {}", value);
77 chained.accept(&value);
78 println!("Collected results: {:?}", *results.lock().unwrap());
79 println!("Original value: {} (not modified)\n", value);
80
81 // ========================================================================
82 // Example 3: Closure extension methods
83 // ========================================================================
84 println!("Example 3: Direct use of extension methods on closures");
85 println!("{}", "-".repeat(50));
86
87 let result = Arc::new(Mutex::new(0));
88 let r1 = result.clone();
89 let r2 = result.clone();
90
91 let closure_chain = (move |x: &i32| {
92 *r1.lock().unwrap() = *x * 2;
93 })
94 .and_then(move |_x: &i32| {
95 *r2.lock().unwrap() += 10;
96 });
97
98 let value = 5;
99 println!("Initial value: {}", value);
100 closure_chain.accept(&value);
101 println!("Calculation result: {}", *result.lock().unwrap());
102 println!("Original value: {} (not modified)\n", value);
103
104 // ========================================================================
105 // Example 4: BoxConsumer factory methods
106 // ========================================================================
107 println!("Example 4: BoxConsumer factory methods");
108 println!("{}", "-".repeat(50));
109
110 // noop
111 println!("noop - does nothing:");
112 let noop = BoxConsumer::<i32>::noop();
113 let value = 42;
114 noop.accept(&value);
115 println!("Value: {}\n", value);
116
117 // print
118 print!("print - prints value: ");
119 let print = BoxConsumer::new(|x: &i32| println!("{}", x));
120 let value = 42;
121 print.accept(&value);
122 println!();
123
124 // print with prefix
125 let print_with = BoxConsumer::new(|x: &i32| println!("Value is: {}", x));
126 let value = 42;
127 print_with.accept(&value);
128 println!();
129
130 // ========================================================================
131 // Example 5: Conditional Consumer
132 // ========================================================================
133 println!("Example 5: Conditional Consumer");
134 println!("{}", "-".repeat(50));
135
136 // when
137 let mut check_positive =
138 BoxStatefulConsumer::new(|x: &i32| println!("Positive: {}", x)).when(|x: &i32| *x > 0);
139
140 let positive = 5;
141 let negative = -5;
142 print!("Check {}: ", positive);
143 check_positive.accept(&positive);
144 print!("Check {}: ", negative);
145 check_positive.accept(&negative);
146 println!("(negative numbers not printed)\n");
147
148 // when().or_else()
149 let mut categorize = BoxStatefulConsumer::new(|x: &i32| println!("Positive: {}", x))
150 .when(|x: &i32| *x > 0)
151 .or_else(|x: &i32| println!("Non-positive: {}", x));
152
153 let positive = 10;
154 let negative = -10;
155 categorize.accept(&positive);
156 categorize.accept(&negative);
157 println!();
158
159 // ========================================================================
160 // Example 6: ArcConsumer - multi-threaded sharing
161 // ========================================================================
162 println!("Example 6: ArcConsumer - multi-threaded sharing");
163 println!("{}", "-".repeat(50));
164
165 let shared = ArcConsumer::new(|x: &i32| println!("Processing value: {}", x * 2));
166
167 // Clone for another thread
168 let shared_clone = shared.clone();
169 let handle = thread::spawn(move || {
170 let value = 5;
171 let consumer = shared_clone;
172 consumer.accept(&value);
173 value
174 });
175
176 // Use in main thread
177 let value = 3;
178 let consumer = shared;
179 consumer.accept(&value);
180
181 let thread_result = handle.join().unwrap();
182 println!("Thread result: {}\n", thread_result);
183
184 // ========================================================================
185 // Example 7: ArcConsumer composition (does not consume original consumer)
186 // ========================================================================
187 println!("Example 7: ArcConsumer composition (borrowing &self)");
188 println!("{}", "-".repeat(50));
189
190 let double = ArcConsumer::new(|x: &i32| println!("double: {}", x * 2));
191 let add_ten = ArcConsumer::new(|x: &i32| println!("add_ten: {}", x + 10));
192
193 // Composition does not consume original consumer
194 let pipeline1 = double.and_then(add_ten.clone());
195 let pipeline2 = add_ten.and_then(double.clone());
196
197 let value1 = 5;
198 let p1 = pipeline1;
199 print!("pipeline1 processing 5: ");
200 p1.accept(&value1);
201
202 let value2 = 5;
203 let p2 = pipeline2;
204 print!("pipeline2 processing 5: ");
205 p2.accept(&value2);
206
207 // double and add_ten are still available
208 let value3 = 10;
209 let d = double;
210 print!("Original double still available, processing 10: ");
211 d.accept(&value3);
212 println!();
213
214 // ========================================================================
215 // Example 8: RcConsumer - single-threaded sharing
216 // ========================================================================
217 println!("Example 8: RcConsumer - single-threaded sharing");
218 println!("{}", "-".repeat(50));
219
220 let rc_consumer = RcConsumer::new(|x: &i32| println!("Processing: {}", x * 2));
221
222 // Clone multiple copies
223 let clone1 = rc_consumer.clone();
224 let clone2 = rc_consumer.clone();
225
226 let value1 = 5;
227 let c1 = clone1;
228 print!("clone1 processing 5: ");
229 c1.accept(&value1);
230
231 let value2 = 3;
232 let c2 = clone2;
233 print!("clone2 processing 3: ");
234 c2.accept(&value2);
235
236 let value3 = 7;
237 let c3 = rc_consumer;
238 print!("Original processing 7: ");
239 c3.accept(&value3);
240 println!();
241
242 // ========================================================================
243 // Example 9: RcConsumer composition (borrowing &self)
244 // ========================================================================
245 println!("Example 9: RcConsumer composition (borrowing &self)");
246 println!("{}", "-".repeat(50));
247
248 let double = RcConsumer::new(|x: &i32| println!("double: {}", x * 2));
249 let add_ten = RcConsumer::new(|x: &i32| println!("add_ten: {}", x + 10));
250
251 let pipeline1 = double.and_then(add_ten.clone());
252 let pipeline2 = add_ten.and_then(double.clone());
253
254 let value1 = 5;
255 let p1 = pipeline1;
256 print!("pipeline1 processing 5: ");
257 p1.accept(&value1);
258
259 let value2 = 5;
260 let p2 = pipeline2;
261 print!("pipeline2 processing 5: ");
262 p2.accept(&value2);
263 println!();
264
265 // ========================================================================
266 // Example 10: Unified Consumer trait
267 // ========================================================================
268 println!("Example 10: Unified Consumer trait");
269 println!("{}", "-".repeat(50));
270
271 fn log_all<C: Consumer<i32>>(consumer: &mut C, values: &[i32]) {
272 for value in values.iter() {
273 consumer.accept(value);
274 }
275 }
276
277 let values = vec![1, 2, 3, 4, 5];
278
279 let mut box_con = BoxConsumer::new(|x: &i32| print!("{} ", x * 2));
280 print!("BoxConsumer processing {:?}: ", values);
281 log_all(&mut box_con, &values);
282 println!();
283
284 let mut arc_con = ArcConsumer::new(|x: &i32| print!("{} ", x * 2));
285 print!("ArcConsumer processing {:?}: ", values);
286 log_all(&mut arc_con, &values);
287 println!();
288
289 let mut rc_con = RcConsumer::new(|x: &i32| print!("{} ", x * 2));
290 print!("RcConsumer processing {:?}: ", values);
291 log_all(&mut rc_con, &values);
292 println!();
293
294 let mut closure = |x: &i32| print!("{} ", x * 2);
295 print!("Closure processing {:?}: ", values);
296 log_all(&mut closure, &values);
297 println!("\n");
298
299 // ========================================================================
300 // Example 11: Data validation and logging
301 // ========================================================================
302 println!("Example 11: Data validation and logging");
303 println!("{}", "-".repeat(50));
304
305 let validator = BoxConsumer::new(|x: &i32| {
306 let status = if *x >= 0 && *x <= 100 {
307 "valid"
308 } else {
309 "out of range"
310 };
311 println!("Validate {}: {}", x, status);
312 });
313
314 let logger = BoxConsumer::new(|x: &i32| {
315 println!("Log to file: value={}, square={}", x, x * x);
316 });
317
318 let pipeline = validator.and_then(logger);
319
320 let test_values = vec![-50, 30, 200];
321 for value in test_values {
322 pipeline.accept(&value);
323 }
324 println!();
325
326 // ========================================================================
327 // Example 12: String analysis
328 // ========================================================================
329 println!("Example 12: String analysis");
330 println!("{}", "-".repeat(50));
331
332 let string_analyzer = BoxConsumer::new(|s: &String| {
333 println!("Length: {}", s.len());
334 })
335 .and_then(|s: &String| {
336 println!("Lowercase: {}", s.to_lowercase());
337 })
338 .and_then(|s: &String| {
339 println!("Uppercase: {}", s.to_uppercase());
340 })
341 .and_then(|s: &String| {
342 let word_count = s.split_whitespace().count();
343 println!("Word count: {}", word_count);
344 });
345
346 let text = String::from("Hello World");
347 println!("Analyzing text: \"{}\"", text);
348 string_analyzer.accept(&text);
349 println!("Original text: \"{}\" (not modified)\n", text);
350
351 // ========================================================================
352 // Example 13: Type conversion
353 // ========================================================================
354 println!("Example 13: Type conversion");
355 println!("{}", "-".repeat(50));
356
357 // Closure -> BoxConsumer
358 let closure = |x: &i32| print!("Processing: {} ", x * 2);
359 let box_con = Consumer::into_box(closure);
360 let value = 5;
361 print!("Closure -> BoxConsumer: ");
362 box_con.accept(&value);
363 println!();
364
365 // Closure -> RcConsumer
366 let closure = |x: &i32| print!("Processing: {} ", x * 2);
367 let rc_con = Consumer::into_rc(closure);
368 let value = 5;
369 print!("Closure -> RcConsumer: ");
370 rc_con.accept(&value);
371 println!();
372
373 // Closure -> ArcConsumer
374 let closure = |x: &i32| print!("Processing: {} ", x * 2);
375 let arc_con = Consumer::into_arc(closure);
376 let value = 5;
377 print!("Closure -> ArcConsumer: ");
378 arc_con.accept(&value);
379 println!();
380
381 // BoxConsumer -> RcConsumer
382 let box_con = BoxConsumer::new(|x: &i32| print!("Processing: {} ", x * 2));
383 let rc_con = box_con.into_rc();
384 let value = 5;
385 print!("BoxConsumer -> RcConsumer: ");
386 rc_con.accept(&value);
387 println!();
388
389 // RcConsumer -> BoxConsumer
390 let rc_con = RcConsumer::new(|x: &i32| print!("Processing: {} ", x * 2));
391 let box_con = rc_con.into_box();
392 let value = 5;
393 print!("RcConsumer -> BoxConsumer: ");
394 box_con.accept(&value);
395 println!("\n");
396
397 // ========================================================================
398 // Example 14: Custom types
399 // ========================================================================
400 println!("Example 14: Custom types");
401 println!("{}", "-".repeat(50));
402
403 #[derive(Debug, Clone)]
404 struct Point {
405 x: i32,
406 y: i32,
407 }
408
409 let analyzer = BoxConsumer::new(|p: &Point| {
410 println!("Point coordinates: ({}, {})", p.x, p.y);
411 })
412 .and_then(|p: &Point| {
413 let distance = ((p.x * p.x + p.y * p.y) as f64).sqrt();
414 println!("Distance from origin: {:.2}", distance);
415 })
416 .and_then(|p: &Point| {
417 let quadrant = match (p.x >= 0, p.y >= 0) {
418 (true, true) => "First quadrant",
419 (false, true) => "Second quadrant",
420 (false, false) => "Third quadrant",
421 (true, false) => "Fourth quadrant",
422 };
423 println!("Quadrant: {}", quadrant);
424 });
425
426 let point = Point { x: 3, y: 4 };
427 println!("Analyzing point: {:?}", point);
428 analyzer.accept(&point);
429 println!("Original point: {:?} (not modified)\n", point);
430
431 // ========================================================================
432 // Example 15: Data collection and statistics
433 // ========================================================================
434 println!("Example 15: Data collection and statistics");
435 println!("{}", "-".repeat(50));
436
437 let sum = Arc::new(Mutex::new(0));
438 let count = Arc::new(Mutex::new(0));
439 let sum_clone = sum.clone();
440 let count_clone = count.clone();
441
442 let collector = BoxConsumer::new(move |x: &i32| {
443 *sum_clone.lock().unwrap() += *x;
444 *count_clone.lock().unwrap() += 1;
445 });
446
447 let numbers = vec![10, 20, 30, 40, 50];
448 println!("Numbers: {:?}", numbers);
449 for num in &numbers {
450 collector.accept(num);
451 }
452
453 let total = *sum.lock().unwrap();
454 let cnt = *count.lock().unwrap();
455 println!("Sum: {}", total);
456 println!("Count: {}", cnt);
457 println!("Average: {:.2}\n", total as f64 / cnt as f64);
458
459 println!("=== All examples completed ===");
460 println!("\nTip: For value modification functionality, please refer to mutator_demo.rs");
461}Sourcepub fn new_with_name<F>(name: &str, f: F) -> Self
pub fn new_with_name<F>(name: &str, f: F) -> Self
Creates a new named consumer.
Wraps the provided closure and assigns it a name, which is useful for debugging and logging purposes.
Sourcepub fn new_with_optional_name<F>(f: F, name: Option<String>) -> Self
pub fn new_with_optional_name<F>(f: F, name: Option<String>) -> Self
Creates a new named consumer with an optional name.
Wraps the provided closure and assigns it an optional name.
Sourcepub fn clear_name(&mut self)
pub fn clear_name(&mut self)
Clears the name of this consumer.
Sourcepub fn noop() -> Self
pub fn noop() -> Self
Creates a no-operation consumer.
Creates a consumer that does nothing when called. Useful for default values or placeholder implementations.
§Returns
Returns a new consumer instance that performs no operation.
Sourcepub fn when<P>(self, predicate: P) -> BoxConditionalStatefulConsumer<T>where
T: 'static,
P: Predicate<T> + 'static,
pub fn when<P>(self, predicate: P) -> BoxConditionalStatefulConsumer<T>where
T: 'static,
P: Predicate<T> + 'static,
Creates a conditional consumer that executes based on predicate result.
§Parameters
predicate- The predicate to determine whether to execute the consumption operation
§Returns
Returns a conditional consumer that only executes when the
predicate returns true.
§Examples
use std::sync::Arc;
use std::sync::atomic::{AtomicI32, Ordering};
use qubit_function::consumers::*;
let counter = Arc::new(AtomicI32::new(0));
let consumer = BoxConsumer::new({
let counter = Arc::clone(&counter);
move |value: &i32| {
counter.fetch_add(*value, Ordering::SeqCst);
}
});
let conditional = consumer.when(|value: &i32| *value > 0);
conditional.accept(&1); // counter = 1
conditional.accept(&-1); // not executed, counter remains 1Examples found in repository?
34fn main() {
35 println!("=== Consumer Examples ===\n");
36 println!("Note: Consumer only reads values, does not modify the original value");
37 println!("If you need to modify values, please refer to mutator_demo.rs\n");
38
39 // ========================================================================
40 // Example 1: BoxConsumer basic usage
41 // ========================================================================
42 println!("Example 1: BoxConsumer basic usage");
43 println!("{}", "-".repeat(50));
44
45 let consumer = BoxConsumer::new(|x: &i32| {
46 println!("Read and calculate: {} * 2 = {}", x, x * 2);
47 });
48 let value = 5;
49 println!("Value: {}", value);
50 consumer.accept(&value);
51 println!("Value remains: {} (not modified)\n", value);
52
53 // ========================================================================
54 // Example 2: BoxConsumer method chaining
55 // ========================================================================
56 println!("Example 2: BoxConsumer method chaining");
57 println!("{}", "-".repeat(50));
58
59 let results = Arc::new(Mutex::new(Vec::new()));
60 let r1 = results.clone();
61 let r2 = results.clone();
62 let r3 = results.clone();
63
64 let chained = BoxConsumer::new(move |x: &i32| {
65 r1.lock().unwrap().push(*x * 2);
66 })
67 .and_then(move |x: &i32| {
68 r2.lock().unwrap().push(*x + 10);
69 })
70 .and_then(move |x: &i32| {
71 r3.lock().unwrap().push(*x);
72 println!("Processing value: {}", x);
73 });
74
75 let value = 5;
76 println!("Initial value: {}", value);
77 chained.accept(&value);
78 println!("Collected results: {:?}", *results.lock().unwrap());
79 println!("Original value: {} (not modified)\n", value);
80
81 // ========================================================================
82 // Example 3: Closure extension methods
83 // ========================================================================
84 println!("Example 3: Direct use of extension methods on closures");
85 println!("{}", "-".repeat(50));
86
87 let result = Arc::new(Mutex::new(0));
88 let r1 = result.clone();
89 let r2 = result.clone();
90
91 let closure_chain = (move |x: &i32| {
92 *r1.lock().unwrap() = *x * 2;
93 })
94 .and_then(move |_x: &i32| {
95 *r2.lock().unwrap() += 10;
96 });
97
98 let value = 5;
99 println!("Initial value: {}", value);
100 closure_chain.accept(&value);
101 println!("Calculation result: {}", *result.lock().unwrap());
102 println!("Original value: {} (not modified)\n", value);
103
104 // ========================================================================
105 // Example 4: BoxConsumer factory methods
106 // ========================================================================
107 println!("Example 4: BoxConsumer factory methods");
108 println!("{}", "-".repeat(50));
109
110 // noop
111 println!("noop - does nothing:");
112 let noop = BoxConsumer::<i32>::noop();
113 let value = 42;
114 noop.accept(&value);
115 println!("Value: {}\n", value);
116
117 // print
118 print!("print - prints value: ");
119 let print = BoxConsumer::new(|x: &i32| println!("{}", x));
120 let value = 42;
121 print.accept(&value);
122 println!();
123
124 // print with prefix
125 let print_with = BoxConsumer::new(|x: &i32| println!("Value is: {}", x));
126 let value = 42;
127 print_with.accept(&value);
128 println!();
129
130 // ========================================================================
131 // Example 5: Conditional Consumer
132 // ========================================================================
133 println!("Example 5: Conditional Consumer");
134 println!("{}", "-".repeat(50));
135
136 // when
137 let mut check_positive =
138 BoxStatefulConsumer::new(|x: &i32| println!("Positive: {}", x)).when(|x: &i32| *x > 0);
139
140 let positive = 5;
141 let negative = -5;
142 print!("Check {}: ", positive);
143 check_positive.accept(&positive);
144 print!("Check {}: ", negative);
145 check_positive.accept(&negative);
146 println!("(negative numbers not printed)\n");
147
148 // when().or_else()
149 let mut categorize = BoxStatefulConsumer::new(|x: &i32| println!("Positive: {}", x))
150 .when(|x: &i32| *x > 0)
151 .or_else(|x: &i32| println!("Non-positive: {}", x));
152
153 let positive = 10;
154 let negative = -10;
155 categorize.accept(&positive);
156 categorize.accept(&negative);
157 println!();
158
159 // ========================================================================
160 // Example 6: ArcConsumer - multi-threaded sharing
161 // ========================================================================
162 println!("Example 6: ArcConsumer - multi-threaded sharing");
163 println!("{}", "-".repeat(50));
164
165 let shared = ArcConsumer::new(|x: &i32| println!("Processing value: {}", x * 2));
166
167 // Clone for another thread
168 let shared_clone = shared.clone();
169 let handle = thread::spawn(move || {
170 let value = 5;
171 let consumer = shared_clone;
172 consumer.accept(&value);
173 value
174 });
175
176 // Use in main thread
177 let value = 3;
178 let consumer = shared;
179 consumer.accept(&value);
180
181 let thread_result = handle.join().unwrap();
182 println!("Thread result: {}\n", thread_result);
183
184 // ========================================================================
185 // Example 7: ArcConsumer composition (does not consume original consumer)
186 // ========================================================================
187 println!("Example 7: ArcConsumer composition (borrowing &self)");
188 println!("{}", "-".repeat(50));
189
190 let double = ArcConsumer::new(|x: &i32| println!("double: {}", x * 2));
191 let add_ten = ArcConsumer::new(|x: &i32| println!("add_ten: {}", x + 10));
192
193 // Composition does not consume original consumer
194 let pipeline1 = double.and_then(add_ten.clone());
195 let pipeline2 = add_ten.and_then(double.clone());
196
197 let value1 = 5;
198 let p1 = pipeline1;
199 print!("pipeline1 processing 5: ");
200 p1.accept(&value1);
201
202 let value2 = 5;
203 let p2 = pipeline2;
204 print!("pipeline2 processing 5: ");
205 p2.accept(&value2);
206
207 // double and add_ten are still available
208 let value3 = 10;
209 let d = double;
210 print!("Original double still available, processing 10: ");
211 d.accept(&value3);
212 println!();
213
214 // ========================================================================
215 // Example 8: RcConsumer - single-threaded sharing
216 // ========================================================================
217 println!("Example 8: RcConsumer - single-threaded sharing");
218 println!("{}", "-".repeat(50));
219
220 let rc_consumer = RcConsumer::new(|x: &i32| println!("Processing: {}", x * 2));
221
222 // Clone multiple copies
223 let clone1 = rc_consumer.clone();
224 let clone2 = rc_consumer.clone();
225
226 let value1 = 5;
227 let c1 = clone1;
228 print!("clone1 processing 5: ");
229 c1.accept(&value1);
230
231 let value2 = 3;
232 let c2 = clone2;
233 print!("clone2 processing 3: ");
234 c2.accept(&value2);
235
236 let value3 = 7;
237 let c3 = rc_consumer;
238 print!("Original processing 7: ");
239 c3.accept(&value3);
240 println!();
241
242 // ========================================================================
243 // Example 9: RcConsumer composition (borrowing &self)
244 // ========================================================================
245 println!("Example 9: RcConsumer composition (borrowing &self)");
246 println!("{}", "-".repeat(50));
247
248 let double = RcConsumer::new(|x: &i32| println!("double: {}", x * 2));
249 let add_ten = RcConsumer::new(|x: &i32| println!("add_ten: {}", x + 10));
250
251 let pipeline1 = double.and_then(add_ten.clone());
252 let pipeline2 = add_ten.and_then(double.clone());
253
254 let value1 = 5;
255 let p1 = pipeline1;
256 print!("pipeline1 processing 5: ");
257 p1.accept(&value1);
258
259 let value2 = 5;
260 let p2 = pipeline2;
261 print!("pipeline2 processing 5: ");
262 p2.accept(&value2);
263 println!();
264
265 // ========================================================================
266 // Example 10: Unified Consumer trait
267 // ========================================================================
268 println!("Example 10: Unified Consumer trait");
269 println!("{}", "-".repeat(50));
270
271 fn log_all<C: Consumer<i32>>(consumer: &mut C, values: &[i32]) {
272 for value in values.iter() {
273 consumer.accept(value);
274 }
275 }
276
277 let values = vec![1, 2, 3, 4, 5];
278
279 let mut box_con = BoxConsumer::new(|x: &i32| print!("{} ", x * 2));
280 print!("BoxConsumer processing {:?}: ", values);
281 log_all(&mut box_con, &values);
282 println!();
283
284 let mut arc_con = ArcConsumer::new(|x: &i32| print!("{} ", x * 2));
285 print!("ArcConsumer processing {:?}: ", values);
286 log_all(&mut arc_con, &values);
287 println!();
288
289 let mut rc_con = RcConsumer::new(|x: &i32| print!("{} ", x * 2));
290 print!("RcConsumer processing {:?}: ", values);
291 log_all(&mut rc_con, &values);
292 println!();
293
294 let mut closure = |x: &i32| print!("{} ", x * 2);
295 print!("Closure processing {:?}: ", values);
296 log_all(&mut closure, &values);
297 println!("\n");
298
299 // ========================================================================
300 // Example 11: Data validation and logging
301 // ========================================================================
302 println!("Example 11: Data validation and logging");
303 println!("{}", "-".repeat(50));
304
305 let validator = BoxConsumer::new(|x: &i32| {
306 let status = if *x >= 0 && *x <= 100 {
307 "valid"
308 } else {
309 "out of range"
310 };
311 println!("Validate {}: {}", x, status);
312 });
313
314 let logger = BoxConsumer::new(|x: &i32| {
315 println!("Log to file: value={}, square={}", x, x * x);
316 });
317
318 let pipeline = validator.and_then(logger);
319
320 let test_values = vec![-50, 30, 200];
321 for value in test_values {
322 pipeline.accept(&value);
323 }
324 println!();
325
326 // ========================================================================
327 // Example 12: String analysis
328 // ========================================================================
329 println!("Example 12: String analysis");
330 println!("{}", "-".repeat(50));
331
332 let string_analyzer = BoxConsumer::new(|s: &String| {
333 println!("Length: {}", s.len());
334 })
335 .and_then(|s: &String| {
336 println!("Lowercase: {}", s.to_lowercase());
337 })
338 .and_then(|s: &String| {
339 println!("Uppercase: {}", s.to_uppercase());
340 })
341 .and_then(|s: &String| {
342 let word_count = s.split_whitespace().count();
343 println!("Word count: {}", word_count);
344 });
345
346 let text = String::from("Hello World");
347 println!("Analyzing text: \"{}\"", text);
348 string_analyzer.accept(&text);
349 println!("Original text: \"{}\" (not modified)\n", text);
350
351 // ========================================================================
352 // Example 13: Type conversion
353 // ========================================================================
354 println!("Example 13: Type conversion");
355 println!("{}", "-".repeat(50));
356
357 // Closure -> BoxConsumer
358 let closure = |x: &i32| print!("Processing: {} ", x * 2);
359 let box_con = Consumer::into_box(closure);
360 let value = 5;
361 print!("Closure -> BoxConsumer: ");
362 box_con.accept(&value);
363 println!();
364
365 // Closure -> RcConsumer
366 let closure = |x: &i32| print!("Processing: {} ", x * 2);
367 let rc_con = Consumer::into_rc(closure);
368 let value = 5;
369 print!("Closure -> RcConsumer: ");
370 rc_con.accept(&value);
371 println!();
372
373 // Closure -> ArcConsumer
374 let closure = |x: &i32| print!("Processing: {} ", x * 2);
375 let arc_con = Consumer::into_arc(closure);
376 let value = 5;
377 print!("Closure -> ArcConsumer: ");
378 arc_con.accept(&value);
379 println!();
380
381 // BoxConsumer -> RcConsumer
382 let box_con = BoxConsumer::new(|x: &i32| print!("Processing: {} ", x * 2));
383 let rc_con = box_con.into_rc();
384 let value = 5;
385 print!("BoxConsumer -> RcConsumer: ");
386 rc_con.accept(&value);
387 println!();
388
389 // RcConsumer -> BoxConsumer
390 let rc_con = RcConsumer::new(|x: &i32| print!("Processing: {} ", x * 2));
391 let box_con = rc_con.into_box();
392 let value = 5;
393 print!("RcConsumer -> BoxConsumer: ");
394 box_con.accept(&value);
395 println!("\n");
396
397 // ========================================================================
398 // Example 14: Custom types
399 // ========================================================================
400 println!("Example 14: Custom types");
401 println!("{}", "-".repeat(50));
402
403 #[derive(Debug, Clone)]
404 struct Point {
405 x: i32,
406 y: i32,
407 }
408
409 let analyzer = BoxConsumer::new(|p: &Point| {
410 println!("Point coordinates: ({}, {})", p.x, p.y);
411 })
412 .and_then(|p: &Point| {
413 let distance = ((p.x * p.x + p.y * p.y) as f64).sqrt();
414 println!("Distance from origin: {:.2}", distance);
415 })
416 .and_then(|p: &Point| {
417 let quadrant = match (p.x >= 0, p.y >= 0) {
418 (true, true) => "First quadrant",
419 (false, true) => "Second quadrant",
420 (false, false) => "Third quadrant",
421 (true, false) => "Fourth quadrant",
422 };
423 println!("Quadrant: {}", quadrant);
424 });
425
426 let point = Point { x: 3, y: 4 };
427 println!("Analyzing point: {:?}", point);
428 analyzer.accept(&point);
429 println!("Original point: {:?} (not modified)\n", point);
430
431 // ========================================================================
432 // Example 15: Data collection and statistics
433 // ========================================================================
434 println!("Example 15: Data collection and statistics");
435 println!("{}", "-".repeat(50));
436
437 let sum = Arc::new(Mutex::new(0));
438 let count = Arc::new(Mutex::new(0));
439 let sum_clone = sum.clone();
440 let count_clone = count.clone();
441
442 let collector = BoxConsumer::new(move |x: &i32| {
443 *sum_clone.lock().unwrap() += *x;
444 *count_clone.lock().unwrap() += 1;
445 });
446
447 let numbers = vec![10, 20, 30, 40, 50];
448 println!("Numbers: {:?}", numbers);
449 for num in &numbers {
450 collector.accept(num);
451 }
452
453 let total = *sum.lock().unwrap();
454 let cnt = *count.lock().unwrap();
455 println!("Sum: {}", total);
456 println!("Count: {}", cnt);
457 println!("Average: {:.2}\n", total as f64 / cnt as f64);
458
459 println!("=== All examples completed ===");
460 println!("\nTip: For value modification functionality, please refer to mutator_demo.rs");
461}Sourcepub fn and_then<C>(self, after: C) -> BoxStatefulConsumer<T>where
Self: Sized + 'static,
T: 'static,
C: StatefulConsumer<T> + 'static,
pub fn and_then<C>(self, after: C) -> BoxStatefulConsumer<T>where
Self: Sized + 'static,
T: 'static,
C: StatefulConsumer<T> + 'static,
Chains execution with another consumer, executing the current consumer first, then the subsequent consumer.
§Parameters
after- The subsequent consumer to execute after the current consumer completes
§Returns
Returns a new consumer that executes the current consumer and the subsequent consumer in sequence.
§Examples
use std::sync::Arc;
use std::sync::atomic::{AtomicI32, Ordering};
use qubit_function::consumers::*;
let counter1 = Arc::new(AtomicI32::new(0));
let counter2 = Arc::new(AtomicI32::new(0));
let consumer1 = BoxConsumer::new({
let counter = Arc::clone(&counter1);
move |value: &i32| {
counter.fetch_add(*value, Ordering::SeqCst);
}
});
let consumer2 = BoxConsumer::new({
let counter = Arc::clone(&counter2);
move |value: &i32| {
counter.fetch_add(*value * 2, Ordering::SeqCst);
}
});
let chained = consumer1.and_then(consumer2);
chained.accept(&5);
// counter1 = 5, counter2 = 10