pub struct BoxStatefulConsumer<T> { /* private fields */ }
Expand description
BoxStatefulConsumer struct
Consumer implementation based on Box<dyn FnMut(&T)> for single ownership
scenarios. When sharing is not needed, this is the simplest and most
efficient consumer type.
§Features
- Single Ownership: Not cloneable, transfers ownership when used
- Zero Overhead: No reference counting or lock overhead
- Mutable State: Can modify captured environment through FnMut
- Builder Pattern: Method chaining naturally consumes self
§Use Cases
Choose BoxStatefulConsumer when:
- Consumer is used only once or in a linear flow
- Building pipelines where ownership flows naturally
- No need to share consumers across contexts
- Performance critical and cannot accept sharing overhead
§Performance
BoxStatefulConsumer has the best performance among the three consumer types:
- No reference counting overhead
- No lock acquisition or runtime borrowing checks
- Direct function calls through vtable
- Minimal memory footprint (single pointer)
§Examples
use qubit_function::{Consumer, StatefulConsumer, BoxStatefulConsumer};
use std::sync::{Arc, Mutex};
let log = Arc::new(Mutex::new(Vec::new()));
let l = log.clone();
let mut consumer = BoxStatefulConsumer::new(move |x: &i32| {
l.lock().unwrap().push(*x);
});
consumer.accept(&5);
assert_eq!(*log.lock().unwrap(), vec![5]);
§Author
Haixing Hu
Implementations§
Source§impl<T> BoxStatefulConsumer<T>
impl<T> BoxStatefulConsumer<T>
Sourcepub fn new<F>(f: F) -> Self
pub fn new<F>(f: F) -> Self
Creates a new consumer.
Wraps the provided closure in the appropriate smart pointer type for this consumer implementation.
Examples found in repository?
33fn main() {
34 println!("=== Consumer Examples ===\n");
35 println!("Note: Consumer only reads values, does not modify the original value");
36 println!("If you need to modify values, please refer to mutator_demo.rs\n");
37
38 // ========================================================================
39 // Example 1: BoxConsumer basic usage
40 // ========================================================================
41 println!("Example 1: BoxConsumer basic usage");
42 println!("{}", "-".repeat(50));
43
44 let consumer = BoxConsumer::new(|x: &i32| {
45 println!("Read and calculate: {} * 2 = {}", x, x * 2);
46 });
47 let value = 5;
48 println!("Value: {}", value);
49 consumer.accept(&value);
50 println!("Value remains: {} (not modified)\n", value);
51
52 // ========================================================================
53 // Example 2: BoxConsumer method chaining
54 // ========================================================================
55 println!("Example 2: BoxConsumer method chaining");
56 println!("{}", "-".repeat(50));
57
58 let results = Arc::new(Mutex::new(Vec::new()));
59 let r1 = results.clone();
60 let r2 = results.clone();
61 let r3 = results.clone();
62
63 let chained = BoxConsumer::new(move |x: &i32| {
64 r1.lock().unwrap().push(*x * 2);
65 })
66 .and_then(move |x: &i32| {
67 r2.lock().unwrap().push(*x + 10);
68 })
69 .and_then(move |x: &i32| {
70 r3.lock().unwrap().push(*x);
71 println!("Processing value: {}", x);
72 });
73
74 let value = 5;
75 println!("Initial value: {}", value);
76 chained.accept(&value);
77 println!("Collected results: {:?}", *results.lock().unwrap());
78 println!("Original value: {} (not modified)\n", value);
79
80 // ========================================================================
81 // Example 3: Closure extension methods
82 // ========================================================================
83 println!("Example 3: Direct use of extension methods on closures");
84 println!("{}", "-".repeat(50));
85
86 let result = Arc::new(Mutex::new(0));
87 let r1 = result.clone();
88 let r2 = result.clone();
89
90 let closure_chain = (move |x: &i32| {
91 *r1.lock().unwrap() = *x * 2;
92 })
93 .and_then(move |_x: &i32| {
94 *r2.lock().unwrap() += 10;
95 });
96
97 let value = 5;
98 println!("Initial value: {}", value);
99 closure_chain.accept(&value);
100 println!("Calculation result: {}", *result.lock().unwrap());
101 println!("Original value: {} (not modified)\n", value);
102
103 // ========================================================================
104 // Example 4: BoxConsumer factory methods
105 // ========================================================================
106 println!("Example 4: BoxConsumer factory methods");
107 println!("{}", "-".repeat(50));
108
109 // noop
110 println!("noop - does nothing:");
111 let noop = BoxConsumer::<i32>::noop();
112 let value = 42;
113 noop.accept(&value);
114 println!("Value: {}\n", value);
115
116 // print
117 print!("print - prints value: ");
118 let print = BoxConsumer::new(|x: &i32| println!("{}", x));
119 let value = 42;
120 print.accept(&value);
121 println!();
122
123 // print with prefix
124 let print_with = BoxConsumer::new(|x: &i32| println!("Value is: {}", x));
125 let value = 42;
126 print_with.accept(&value);
127 println!();
128
129 // ========================================================================
130 // Example 5: Conditional Consumer
131 // ========================================================================
132 println!("Example 5: Conditional Consumer");
133 println!("{}", "-".repeat(50));
134
135 // when
136 let mut check_positive =
137 BoxStatefulConsumer::new(|x: &i32| println!("Positive: {}", x)).when(|x: &i32| *x > 0);
138
139 let positive = 5;
140 let negative = -5;
141 print!("Check {}: ", positive);
142 check_positive.accept(&positive);
143 print!("Check {}: ", negative);
144 check_positive.accept(&negative);
145 println!("(negative numbers not printed)\n");
146
147 // when().or_else()
148 let mut categorize = BoxStatefulConsumer::new(|x: &i32| println!("Positive: {}", x))
149 .when(|x: &i32| *x > 0)
150 .or_else(|x: &i32| println!("Non-positive: {}", x));
151
152 let positive = 10;
153 let negative = -10;
154 categorize.accept(&positive);
155 categorize.accept(&negative);
156 println!();
157
158 // ========================================================================
159 // Example 6: ArcConsumer - multi-threaded sharing
160 // ========================================================================
161 println!("Example 6: ArcConsumer - multi-threaded sharing");
162 println!("{}", "-".repeat(50));
163
164 let shared = ArcConsumer::new(|x: &i32| println!("Processing value: {}", x * 2));
165
166 // Clone for another thread
167 let shared_clone = shared.clone();
168 let handle = thread::spawn(move || {
169 let value = 5;
170 let consumer = shared_clone;
171 consumer.accept(&value);
172 value
173 });
174
175 // Use in main thread
176 let value = 3;
177 let consumer = shared;
178 consumer.accept(&value);
179
180 let thread_result = handle.join().unwrap();
181 println!("Thread result: {}\n", thread_result);
182
183 // ========================================================================
184 // Example 7: ArcConsumer composition (does not consume original consumer)
185 // ========================================================================
186 println!("Example 7: ArcConsumer composition (borrowing &self)");
187 println!("{}", "-".repeat(50));
188
189 let double = ArcConsumer::new(|x: &i32| println!("double: {}", x * 2));
190 let add_ten = ArcConsumer::new(|x: &i32| println!("add_ten: {}", x + 10));
191
192 // Composition does not consume original consumer
193 let pipeline1 = double.and_then(add_ten.clone());
194 let pipeline2 = add_ten.and_then(double.clone());
195
196 let value1 = 5;
197 let p1 = pipeline1;
198 print!("pipeline1 processing 5: ");
199 p1.accept(&value1);
200
201 let value2 = 5;
202 let p2 = pipeline2;
203 print!("pipeline2 processing 5: ");
204 p2.accept(&value2);
205
206 // double and add_ten are still available
207 let value3 = 10;
208 let d = double;
209 print!("Original double still available, processing 10: ");
210 d.accept(&value3);
211 println!();
212
213 // ========================================================================
214 // Example 8: RcConsumer - single-threaded sharing
215 // ========================================================================
216 println!("Example 8: RcConsumer - single-threaded sharing");
217 println!("{}", "-".repeat(50));
218
219 let rc_consumer = RcConsumer::new(|x: &i32| println!("Processing: {}", x * 2));
220
221 // Clone multiple copies
222 let clone1 = rc_consumer.clone();
223 let clone2 = rc_consumer.clone();
224
225 let value1 = 5;
226 let c1 = clone1;
227 print!("clone1 processing 5: ");
228 c1.accept(&value1);
229
230 let value2 = 3;
231 let c2 = clone2;
232 print!("clone2 processing 3: ");
233 c2.accept(&value2);
234
235 let value3 = 7;
236 let c3 = rc_consumer;
237 print!("Original processing 7: ");
238 c3.accept(&value3);
239 println!();
240
241 // ========================================================================
242 // Example 9: RcConsumer composition (borrowing &self)
243 // ========================================================================
244 println!("Example 9: RcConsumer composition (borrowing &self)");
245 println!("{}", "-".repeat(50));
246
247 let double = RcConsumer::new(|x: &i32| println!("double: {}", x * 2));
248 let add_ten = RcConsumer::new(|x: &i32| println!("add_ten: {}", x + 10));
249
250 let pipeline1 = double.and_then(add_ten.clone());
251 let pipeline2 = add_ten.and_then(double.clone());
252
253 let value1 = 5;
254 let p1 = pipeline1;
255 print!("pipeline1 processing 5: ");
256 p1.accept(&value1);
257
258 let value2 = 5;
259 let p2 = pipeline2;
260 print!("pipeline2 processing 5: ");
261 p2.accept(&value2);
262 println!();
263
264 // ========================================================================
265 // Example 10: Unified Consumer trait
266 // ========================================================================
267 println!("Example 10: Unified Consumer trait");
268 println!("{}", "-".repeat(50));
269
270 fn log_all<C: Consumer<i32>>(consumer: &mut C, values: &[i32]) {
271 for value in values.iter() {
272 consumer.accept(value);
273 }
274 }
275
276 let values = vec![1, 2, 3, 4, 5];
277
278 let mut box_con = BoxConsumer::new(|x: &i32| print!("{} ", x * 2));
279 print!("BoxConsumer processing {:?}: ", values);
280 log_all(&mut box_con, &values);
281 println!();
282
283 let mut arc_con = ArcConsumer::new(|x: &i32| print!("{} ", x * 2));
284 print!("ArcConsumer processing {:?}: ", values);
285 log_all(&mut arc_con, &values);
286 println!();
287
288 let mut rc_con = RcConsumer::new(|x: &i32| print!("{} ", x * 2));
289 print!("RcConsumer processing {:?}: ", values);
290 log_all(&mut rc_con, &values);
291 println!();
292
293 let mut closure = |x: &i32| print!("{} ", x * 2);
294 print!("Closure processing {:?}: ", values);
295 log_all(&mut closure, &values);
296 println!("\n");
297
298 // ========================================================================
299 // Example 11: Data validation and logging
300 // ========================================================================
301 println!("Example 11: Data validation and logging");
302 println!("{}", "-".repeat(50));
303
304 let validator = BoxConsumer::new(|x: &i32| {
305 let status = if *x >= 0 && *x <= 100 {
306 "valid"
307 } else {
308 "out of range"
309 };
310 println!("Validate {}: {}", x, status);
311 });
312
313 let logger = BoxConsumer::new(|x: &i32| {
314 println!("Log to file: value={}, square={}", x, x * x);
315 });
316
317 let pipeline = validator.and_then(logger);
318
319 let test_values = vec![-50, 30, 200];
320 for value in test_values {
321 pipeline.accept(&value);
322 }
323 println!();
324
325 // ========================================================================
326 // Example 12: String analysis
327 // ========================================================================
328 println!("Example 12: String analysis");
329 println!("{}", "-".repeat(50));
330
331 let string_analyzer = BoxConsumer::new(|s: &String| {
332 println!("Length: {}", s.len());
333 })
334 .and_then(|s: &String| {
335 println!("Lowercase: {}", s.to_lowercase());
336 })
337 .and_then(|s: &String| {
338 println!("Uppercase: {}", s.to_uppercase());
339 })
340 .and_then(|s: &String| {
341 let word_count = s.split_whitespace().count();
342 println!("Word count: {}", word_count);
343 });
344
345 let text = String::from("Hello World");
346 println!("Analyzing text: \"{}\"", text);
347 string_analyzer.accept(&text);
348 println!("Original text: \"{}\" (not modified)\n", text);
349
350 // ========================================================================
351 // Example 13: Type conversion
352 // ========================================================================
353 println!("Example 13: Type conversion");
354 println!("{}", "-".repeat(50));
355
356 // Closure -> BoxConsumer
357 let closure = |x: &i32| print!("Processing: {} ", x * 2);
358 let box_con = Consumer::into_box(closure);
359 let value = 5;
360 print!("Closure -> BoxConsumer: ");
361 box_con.accept(&value);
362 println!();
363
364 // Closure -> RcConsumer
365 let closure = |x: &i32| print!("Processing: {} ", x * 2);
366 let rc_con = Consumer::into_rc(closure);
367 let value = 5;
368 print!("Closure -> RcConsumer: ");
369 rc_con.accept(&value);
370 println!();
371
372 // Closure -> ArcConsumer
373 let closure = |x: &i32| print!("Processing: {} ", x * 2);
374 let arc_con = Consumer::into_arc(closure);
375 let value = 5;
376 print!("Closure -> ArcConsumer: ");
377 arc_con.accept(&value);
378 println!();
379
380 // BoxConsumer -> RcConsumer
381 let box_con = BoxConsumer::new(|x: &i32| print!("Processing: {} ", x * 2));
382 let rc_con = box_con.into_rc();
383 let value = 5;
384 print!("BoxConsumer -> RcConsumer: ");
385 rc_con.accept(&value);
386 println!();
387
388 // RcConsumer -> BoxConsumer
389 let rc_con = RcConsumer::new(|x: &i32| print!("Processing: {} ", x * 2));
390 let box_con = rc_con.into_box();
391 let value = 5;
392 print!("RcConsumer -> BoxConsumer: ");
393 box_con.accept(&value);
394 println!("\n");
395
396 // ========================================================================
397 // Example 14: Custom types
398 // ========================================================================
399 println!("Example 14: Custom types");
400 println!("{}", "-".repeat(50));
401
402 #[derive(Debug, Clone)]
403 struct Point {
404 x: i32,
405 y: i32,
406 }
407
408 let analyzer = BoxConsumer::new(|p: &Point| {
409 println!("Point coordinates: ({}, {})", p.x, p.y);
410 })
411 .and_then(|p: &Point| {
412 let distance = ((p.x * p.x + p.y * p.y) as f64).sqrt();
413 println!("Distance from origin: {:.2}", distance);
414 })
415 .and_then(|p: &Point| {
416 let quadrant = match (p.x >= 0, p.y >= 0) {
417 (true, true) => "First quadrant",
418 (false, true) => "Second quadrant",
419 (false, false) => "Third quadrant",
420 (true, false) => "Fourth quadrant",
421 };
422 println!("Quadrant: {}", quadrant);
423 });
424
425 let point = Point { x: 3, y: 4 };
426 println!("Analyzing point: {:?}", point);
427 analyzer.accept(&point);
428 println!("Original point: {:?} (not modified)\n", point);
429
430 // ========================================================================
431 // Example 15: Data collection and statistics
432 // ========================================================================
433 println!("Example 15: Data collection and statistics");
434 println!("{}", "-".repeat(50));
435
436 let sum = Arc::new(Mutex::new(0));
437 let count = Arc::new(Mutex::new(0));
438 let sum_clone = sum.clone();
439 let count_clone = count.clone();
440
441 let collector = BoxConsumer::new(move |x: &i32| {
442 *sum_clone.lock().unwrap() += *x;
443 *count_clone.lock().unwrap() += 1;
444 });
445
446 let numbers = vec![10, 20, 30, 40, 50];
447 println!("Numbers: {:?}", numbers);
448 for num in &numbers {
449 collector.accept(num);
450 }
451
452 let total = *sum.lock().unwrap();
453 let cnt = *count.lock().unwrap();
454 println!("Sum: {}", total);
455 println!("Count: {}", cnt);
456 println!("Average: {:.2}\n", total as f64 / cnt as f64);
457
458 println!("=== All examples completed ===");
459 println!("\nTip: For value modification functionality, please refer to mutator_demo.rs");
460}
Source
pub fn new_with_name<F>(name: &str, f: F) -> Self
pub fn new_with_name<F>(name: &str, f: F) -> Self
Creates a new named consumer.
Wraps the provided closure and assigns it a name, which is useful for debugging and logging purposes.
Sourcepub fn new_with_optional_name<F>(f: F, name: Option<String>) -> Self
pub fn new_with_optional_name<F>(f: F, name: Option<String>) -> Self
Creates a new consumer with an optional name.
Wraps the provided closure and assigns it an optional name.
Sourcepub fn clear_name(&mut self)
pub fn clear_name(&mut self)
Clears the name of this consumer.
Sourcepub fn noop() -> Self
pub fn noop() -> Self
Creates a no-operation consumer.
Creates a consumer that does nothing when called. Useful for default values or placeholder implementations.
§Returns
Returns a new consumer instance that performs no operation.
Sourcepub fn when<P>(self, predicate: P) -> BoxConditionalStatefulConsumer<T>where
T: 'static,
P: Predicate<T> + 'static,
pub fn when<P>(self, predicate: P) -> BoxConditionalStatefulConsumer<T>where
T: 'static,
P: Predicate<T> + 'static,
Creates a conditional consumer that executes based on predicate result.
§Parameters
predicate - The predicate that determines whether the consumption operation is executed
§Returns
Returns a conditional consumer that only executes when the
predicate returns true.
§Examples
use std::sync::Arc;
use std::sync::atomic::{AtomicI32, Ordering};
use qubit_function::consumers::*;
let counter = Arc::new(AtomicI32::new(0));
let consumer = BoxStatefulConsumer::new({
let counter = Arc::clone(&counter);
move |value: &i32| {
counter.fetch_add(*value, Ordering::SeqCst);
}
});
let mut conditional = consumer.when(|value: &i32| *value > 0);
conditional.accept(&1); // counter = 1
conditional.accept(&-1); // not executed, counter remains 1
Examples found in repository?
33fn main() {
34 println!("=== Consumer Examples ===\n");
35 println!("Note: Consumer only reads values, does not modify the original value");
36 println!("If you need to modify values, please refer to mutator_demo.rs\n");
37
38 // ========================================================================
39 // Example 1: BoxConsumer basic usage
40 // ========================================================================
41 println!("Example 1: BoxConsumer basic usage");
42 println!("{}", "-".repeat(50));
43
44 let consumer = BoxConsumer::new(|x: &i32| {
45 println!("Read and calculate: {} * 2 = {}", x, x * 2);
46 });
47 let value = 5;
48 println!("Value: {}", value);
49 consumer.accept(&value);
50 println!("Value remains: {} (not modified)\n", value);
51
52 // ========================================================================
53 // Example 2: BoxConsumer method chaining
54 // ========================================================================
55 println!("Example 2: BoxConsumer method chaining");
56 println!("{}", "-".repeat(50));
57
58 let results = Arc::new(Mutex::new(Vec::new()));
59 let r1 = results.clone();
60 let r2 = results.clone();
61 let r3 = results.clone();
62
63 let chained = BoxConsumer::new(move |x: &i32| {
64 r1.lock().unwrap().push(*x * 2);
65 })
66 .and_then(move |x: &i32| {
67 r2.lock().unwrap().push(*x + 10);
68 })
69 .and_then(move |x: &i32| {
70 r3.lock().unwrap().push(*x);
71 println!("Processing value: {}", x);
72 });
73
74 let value = 5;
75 println!("Initial value: {}", value);
76 chained.accept(&value);
77 println!("Collected results: {:?}", *results.lock().unwrap());
78 println!("Original value: {} (not modified)\n", value);
79
80 // ========================================================================
81 // Example 3: Closure extension methods
82 // ========================================================================
83 println!("Example 3: Direct use of extension methods on closures");
84 println!("{}", "-".repeat(50));
85
86 let result = Arc::new(Mutex::new(0));
87 let r1 = result.clone();
88 let r2 = result.clone();
89
90 let closure_chain = (move |x: &i32| {
91 *r1.lock().unwrap() = *x * 2;
92 })
93 .and_then(move |_x: &i32| {
94 *r2.lock().unwrap() += 10;
95 });
96
97 let value = 5;
98 println!("Initial value: {}", value);
99 closure_chain.accept(&value);
100 println!("Calculation result: {}", *result.lock().unwrap());
101 println!("Original value: {} (not modified)\n", value);
102
103 // ========================================================================
104 // Example 4: BoxConsumer factory methods
105 // ========================================================================
106 println!("Example 4: BoxConsumer factory methods");
107 println!("{}", "-".repeat(50));
108
109 // noop
110 println!("noop - does nothing:");
111 let noop = BoxConsumer::<i32>::noop();
112 let value = 42;
113 noop.accept(&value);
114 println!("Value: {}\n", value);
115
116 // print
117 print!("print - prints value: ");
118 let print = BoxConsumer::new(|x: &i32| println!("{}", x));
119 let value = 42;
120 print.accept(&value);
121 println!();
122
123 // print with prefix
124 let print_with = BoxConsumer::new(|x: &i32| println!("Value is: {}", x));
125 let value = 42;
126 print_with.accept(&value);
127 println!();
128
129 // ========================================================================
130 // Example 5: Conditional Consumer
131 // ========================================================================
132 println!("Example 5: Conditional Consumer");
133 println!("{}", "-".repeat(50));
134
135 // when
136 let mut check_positive =
137 BoxStatefulConsumer::new(|x: &i32| println!("Positive: {}", x)).when(|x: &i32| *x > 0);
138
139 let positive = 5;
140 let negative = -5;
141 print!("Check {}: ", positive);
142 check_positive.accept(&positive);
143 print!("Check {}: ", negative);
144 check_positive.accept(&negative);
145 println!("(negative numbers not printed)\n");
146
147 // when().or_else()
148 let mut categorize = BoxStatefulConsumer::new(|x: &i32| println!("Positive: {}", x))
149 .when(|x: &i32| *x > 0)
150 .or_else(|x: &i32| println!("Non-positive: {}", x));
151
152 let positive = 10;
153 let negative = -10;
154 categorize.accept(&positive);
155 categorize.accept(&negative);
156 println!();
157
158 // ========================================================================
159 // Example 6: ArcConsumer - multi-threaded sharing
160 // ========================================================================
161 println!("Example 6: ArcConsumer - multi-threaded sharing");
162 println!("{}", "-".repeat(50));
163
164 let shared = ArcConsumer::new(|x: &i32| println!("Processing value: {}", x * 2));
165
166 // Clone for another thread
167 let shared_clone = shared.clone();
168 let handle = thread::spawn(move || {
169 let value = 5;
170 let consumer = shared_clone;
171 consumer.accept(&value);
172 value
173 });
174
175 // Use in main thread
176 let value = 3;
177 let consumer = shared;
178 consumer.accept(&value);
179
180 let thread_result = handle.join().unwrap();
181 println!("Thread result: {}\n", thread_result);
182
183 // ========================================================================
184 // Example 7: ArcConsumer composition (does not consume original consumer)
185 // ========================================================================
186 println!("Example 7: ArcConsumer composition (borrowing &self)");
187 println!("{}", "-".repeat(50));
188
189 let double = ArcConsumer::new(|x: &i32| println!("double: {}", x * 2));
190 let add_ten = ArcConsumer::new(|x: &i32| println!("add_ten: {}", x + 10));
191
192 // Composition does not consume original consumer
193 let pipeline1 = double.and_then(add_ten.clone());
194 let pipeline2 = add_ten.and_then(double.clone());
195
196 let value1 = 5;
197 let p1 = pipeline1;
198 print!("pipeline1 processing 5: ");
199 p1.accept(&value1);
200
201 let value2 = 5;
202 let p2 = pipeline2;
203 print!("pipeline2 processing 5: ");
204 p2.accept(&value2);
205
206 // double and add_ten are still available
207 let value3 = 10;
208 let d = double;
209 print!("Original double still available, processing 10: ");
210 d.accept(&value3);
211 println!();
212
213 // ========================================================================
214 // Example 8: RcConsumer - single-threaded sharing
215 // ========================================================================
216 println!("Example 8: RcConsumer - single-threaded sharing");
217 println!("{}", "-".repeat(50));
218
219 let rc_consumer = RcConsumer::new(|x: &i32| println!("Processing: {}", x * 2));
220
221 // Clone multiple copies
222 let clone1 = rc_consumer.clone();
223 let clone2 = rc_consumer.clone();
224
225 let value1 = 5;
226 let c1 = clone1;
227 print!("clone1 processing 5: ");
228 c1.accept(&value1);
229
230 let value2 = 3;
231 let c2 = clone2;
232 print!("clone2 processing 3: ");
233 c2.accept(&value2);
234
235 let value3 = 7;
236 let c3 = rc_consumer;
237 print!("Original processing 7: ");
238 c3.accept(&value3);
239 println!();
240
241 // ========================================================================
242 // Example 9: RcConsumer composition (borrowing &self)
243 // ========================================================================
244 println!("Example 9: RcConsumer composition (borrowing &self)");
245 println!("{}", "-".repeat(50));
246
247 let double = RcConsumer::new(|x: &i32| println!("double: {}", x * 2));
248 let add_ten = RcConsumer::new(|x: &i32| println!("add_ten: {}", x + 10));
249
250 let pipeline1 = double.and_then(add_ten.clone());
251 let pipeline2 = add_ten.and_then(double.clone());
252
253 let value1 = 5;
254 let p1 = pipeline1;
255 print!("pipeline1 processing 5: ");
256 p1.accept(&value1);
257
258 let value2 = 5;
259 let p2 = pipeline2;
260 print!("pipeline2 processing 5: ");
261 p2.accept(&value2);
262 println!();
263
264 // ========================================================================
265 // Example 10: Unified Consumer trait
266 // ========================================================================
267 println!("Example 10: Unified Consumer trait");
268 println!("{}", "-".repeat(50));
269
270 fn log_all<C: Consumer<i32>>(consumer: &mut C, values: &[i32]) {
271 for value in values.iter() {
272 consumer.accept(value);
273 }
274 }
275
276 let values = vec![1, 2, 3, 4, 5];
277
278 let mut box_con = BoxConsumer::new(|x: &i32| print!("{} ", x * 2));
279 print!("BoxConsumer processing {:?}: ", values);
280 log_all(&mut box_con, &values);
281 println!();
282
283 let mut arc_con = ArcConsumer::new(|x: &i32| print!("{} ", x * 2));
284 print!("ArcConsumer processing {:?}: ", values);
285 log_all(&mut arc_con, &values);
286 println!();
287
288 let mut rc_con = RcConsumer::new(|x: &i32| print!("{} ", x * 2));
289 print!("RcConsumer processing {:?}: ", values);
290 log_all(&mut rc_con, &values);
291 println!();
292
293 let mut closure = |x: &i32| print!("{} ", x * 2);
294 print!("Closure processing {:?}: ", values);
295 log_all(&mut closure, &values);
296 println!("\n");
297
298 // ========================================================================
299 // Example 11: Data validation and logging
300 // ========================================================================
301 println!("Example 11: Data validation and logging");
302 println!("{}", "-".repeat(50));
303
304 let validator = BoxConsumer::new(|x: &i32| {
305 let status = if *x >= 0 && *x <= 100 {
306 "valid"
307 } else {
308 "out of range"
309 };
310 println!("Validate {}: {}", x, status);
311 });
312
313 let logger = BoxConsumer::new(|x: &i32| {
314 println!("Log to file: value={}, square={}", x, x * x);
315 });
316
317 let pipeline = validator.and_then(logger);
318
319 let test_values = vec![-50, 30, 200];
320 for value in test_values {
321 pipeline.accept(&value);
322 }
323 println!();
324
325 // ========================================================================
326 // Example 12: String analysis
327 // ========================================================================
328 println!("Example 12: String analysis");
329 println!("{}", "-".repeat(50));
330
331 let string_analyzer = BoxConsumer::new(|s: &String| {
332 println!("Length: {}", s.len());
333 })
334 .and_then(|s: &String| {
335 println!("Lowercase: {}", s.to_lowercase());
336 })
337 .and_then(|s: &String| {
338 println!("Uppercase: {}", s.to_uppercase());
339 })
340 .and_then(|s: &String| {
341 let word_count = s.split_whitespace().count();
342 println!("Word count: {}", word_count);
343 });
344
345 let text = String::from("Hello World");
346 println!("Analyzing text: \"{}\"", text);
347 string_analyzer.accept(&text);
348 println!("Original text: \"{}\" (not modified)\n", text);
349
350 // ========================================================================
351 // Example 13: Type conversion
352 // ========================================================================
353 println!("Example 13: Type conversion");
354 println!("{}", "-".repeat(50));
355
356 // Closure -> BoxConsumer
357 let closure = |x: &i32| print!("Processing: {} ", x * 2);
358 let box_con = Consumer::into_box(closure);
359 let value = 5;
360 print!("Closure -> BoxConsumer: ");
361 box_con.accept(&value);
362 println!();
363
364 // Closure -> RcConsumer
365 let closure = |x: &i32| print!("Processing: {} ", x * 2);
366 let rc_con = Consumer::into_rc(closure);
367 let value = 5;
368 print!("Closure -> RcConsumer: ");
369 rc_con.accept(&value);
370 println!();
371
372 // Closure -> ArcConsumer
373 let closure = |x: &i32| print!("Processing: {} ", x * 2);
374 let arc_con = Consumer::into_arc(closure);
375 let value = 5;
376 print!("Closure -> ArcConsumer: ");
377 arc_con.accept(&value);
378 println!();
379
380 // BoxConsumer -> RcConsumer
381 let box_con = BoxConsumer::new(|x: &i32| print!("Processing: {} ", x * 2));
382 let rc_con = box_con.into_rc();
383 let value = 5;
384 print!("BoxConsumer -> RcConsumer: ");
385 rc_con.accept(&value);
386 println!();
387
388 // RcConsumer -> BoxConsumer
389 let rc_con = RcConsumer::new(|x: &i32| print!("Processing: {} ", x * 2));
390 let box_con = rc_con.into_box();
391 let value = 5;
392 print!("RcConsumer -> BoxConsumer: ");
393 box_con.accept(&value);
394 println!("\n");
395
396 // ========================================================================
397 // Example 14: Custom types
398 // ========================================================================
399 println!("Example 14: Custom types");
400 println!("{}", "-".repeat(50));
401
402 #[derive(Debug, Clone)]
403 struct Point {
404 x: i32,
405 y: i32,
406 }
407
408 let analyzer = BoxConsumer::new(|p: &Point| {
409 println!("Point coordinates: ({}, {})", p.x, p.y);
410 })
411 .and_then(|p: &Point| {
412 let distance = ((p.x * p.x + p.y * p.y) as f64).sqrt();
413 println!("Distance from origin: {:.2}", distance);
414 })
415 .and_then(|p: &Point| {
416 let quadrant = match (p.x >= 0, p.y >= 0) {
417 (true, true) => "First quadrant",
418 (false, true) => "Second quadrant",
419 (false, false) => "Third quadrant",
420 (true, false) => "Fourth quadrant",
421 };
422 println!("Quadrant: {}", quadrant);
423 });
424
425 let point = Point { x: 3, y: 4 };
426 println!("Analyzing point: {:?}", point);
427 analyzer.accept(&point);
428 println!("Original point: {:?} (not modified)\n", point);
429
430 // ========================================================================
431 // Example 15: Data collection and statistics
432 // ========================================================================
433 println!("Example 15: Data collection and statistics");
434 println!("{}", "-".repeat(50));
435
436 let sum = Arc::new(Mutex::new(0));
437 let count = Arc::new(Mutex::new(0));
438 let sum_clone = sum.clone();
439 let count_clone = count.clone();
440
441 let collector = BoxConsumer::new(move |x: &i32| {
442 *sum_clone.lock().unwrap() += *x;
443 *count_clone.lock().unwrap() += 1;
444 });
445
446 let numbers = vec![10, 20, 30, 40, 50];
447 println!("Numbers: {:?}", numbers);
448 for num in &numbers {
449 collector.accept(num);
450 }
451
452 let total = *sum.lock().unwrap();
453 let cnt = *count.lock().unwrap();
454 println!("Sum: {}", total);
455 println!("Count: {}", cnt);
456 println!("Average: {:.2}\n", total as f64 / cnt as f64);
457
458 println!("=== All examples completed ===");
459 println!("\nTip: For value modification functionality, please refer to mutator_demo.rs");
460}
Source
pub fn and_then<C>(self, after: C) -> BoxStatefulConsumer<T> where
Self: Sized + 'static,
T: 'static,
C: StatefulConsumer<T> + 'static,
pub fn and_then<C>(self, after: C) -> BoxStatefulConsumer<T>where
Self: Sized + 'static,
T: 'static,
C: StatefulConsumer<T> + 'static,
Chains execution with another consumer, executing the current consumer first, then the subsequent consumer.
§Parameters
after - The subsequent consumer to execute after the current consumer completes
§Returns
Returns a new consumer that executes the current consumer and the subsequent consumer in sequence.
§Examples
use std::sync::Arc;
use std::sync::atomic::{AtomicI32, Ordering};
use qubit_function::consumers::*;
let counter1 = Arc::new(AtomicI32::new(0));
let counter2 = Arc::new(AtomicI32::new(0));
let consumer1 = BoxStatefulConsumer::new({
let counter = Arc::clone(&counter1);
move |value: &i32| {
counter.fetch_add(*value, Ordering::SeqCst);
}
});
let consumer2 = BoxStatefulConsumer::new({
let counter = Arc::clone(&counter2);
move |value: &i32| {
counter.fetch_add(*value * 2, Ordering::SeqCst);
}
});
let mut chained = consumer1.and_then(consumer2);
chained.accept(&5);
// counter1 = 5, counter2 = 10