Struct BoxStatefulConsumer 

pub struct BoxStatefulConsumer<T> { /* private fields */ }

BoxStatefulConsumer struct

A consumer backed by Box<dyn FnMut(&T)> for single-ownership scenarios. When the consumer does not need to be shared, this is the simplest consumer type and the one with the least overhead.

§Features

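  • Single Ownership: Not cloneable; combinators such as when take self by value and move the consumer into the result
  • Low Overhead: No reference counting or locking; a call costs one dynamic dispatch through the boxed closure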
  • Mutable State: Can modify captured environment through FnMut
  • Builder Pattern: Method chaining naturally consumes self
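
The features above can be illustrated with a minimal sketch. SketchConsumer is a hypothetical stand-in written for this page, not the crate's actual implementation; its and_then method is an assumed combinator used only to show chaining by value, and count_and_sum is an illustrative helper.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical stand-in for a Box-based stateful consumer (illustration only).
struct SketchConsumer<T> {
    func: Box<dyn FnMut(&T)>,
}

impl<T: 'static> SketchConsumer<T> {
    /// Boxes the closure; the wrapper is its only owner.
    fn new<F>(f: F) -> Self
    where
        F: FnMut(&T) + 'static,
    {
        SketchConsumer { func: Box::new(f) }
    }

    /// Needs `&mut self` because the boxed closure is `FnMut`.
    fn accept(&mut self, value: &T) {
        (self.func)(value);
    }

    /// Assumed combinator: takes `self` by value, so the original is moved.
    fn and_then<F>(self, mut next: F) -> Self
    where
        F: FnMut(&T) + 'static,
    {
        let mut first = self.func;
        SketchConsumer::new(move |value: &T| {
            first(value);
            next(value);
        })
    }
}

/// Feeds 1, 2, 3 through a counting step chained with a summing step.
fn count_and_sum() -> (usize, i32) {
    let count = Rc::new(Cell::new(0usize));
    let sum = Rc::new(Cell::new(0i32));
    let (count_out, sum_out) = (Rc::clone(&count), Rc::clone(&sum));

    // `seen` is state owned by the closure and mutated on every call.
    let mut seen = 0usize;
    let mut consumer = SketchConsumer::new(move |_value: &i32| {
        seen += 1;
        count_out.set(seen);
    })
    .and_then(move |value: &i32| sum_out.set(sum_out.get() + *value));

    for value in &[1, 2, 3] {
        consumer.accept(value);
    }
    (count.get(), sum.get())
}
```

Because and_then takes self, the first consumer cannot be used after chaining; the compiler enforces the linear flow that the Builder Pattern bullet describes.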

§Use Cases

Choose BoxStatefulConsumer when:

  • Consumer is used only once or in a linear flow
  • Building pipelines where ownership flows naturally
  • No need to share consumers across contexts
  • Performance critical and cannot accept sharing overhead

§Performance

BoxStatefulConsumer has the lowest overhead of the three consumer types (Box-, Rc-, and Arc-based):

  • No reference counting overhead
  • No lock acquisition or runtime borrowing checks
  • One dynamic dispatch per call through the closure's vtable
  • Small memory footprint: the closure is held in a single owned Box<dyn FnMut(&T)>, with no shared control block
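
On the memory-footprint point, a boxed closure is a trait object, so its handle is a fat pointer (data pointer plus vtable pointer) rather than one machine word. The sketch below uses only the standard library to compare it with a thin Box; the helper names words and handle_words are made up for this illustration, and the struct's private fields (such as the optional name) are not modelled.

```rust
use std::mem::size_of;

/// Size of `T` expressed in machine words.
fn words<T>() -> usize {
    size_of::<T>() / size_of::<usize>()
}

/// Returns the handle sizes of a thin `Box` and a boxed closure trait object.
fn handle_words() -> (usize, usize) {
    (
        words::<Box<i32>>(),              // thin pointer: one word
        words::<Box<dyn FnMut(&i32)>>(),  // fat pointer: data + vtable
    )
}
```

Each call through the boxed closure looks up the function in that vtable, which is the single indirection the list above refers to.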

§Examples

use qubit_function::{Consumer, StatefulConsumer, BoxStatefulConsumer};
use std::sync::{Arc, Mutex};

let log = Arc::new(Mutex::new(Vec::new()));
let l = log.clone();
let mut consumer = BoxStatefulConsumer::new(move |x: &i32| {
    l.lock().expect("mutex should not be poisoned").push(*x);
});
consumer.accept(&5);
assert_eq!(*log.lock().expect("mutex should not be poisoned"), vec![5]);

§Implementations

impl<T> BoxStatefulConsumer<T>

pub fn new<F>(f: F) -> Self
where F: FnMut(&T) + 'static,

Creates a new consumer.

Wraps the provided closure in a Box, the smart pointer type used by this consumer implementation.

Examples found in repository?
examples/consumers/consumer_demo.rs (line 143, excerpt)
136    // ========================================================================
137    // Example 5: Conditional Consumer
138    // ========================================================================
139    println!("Example 5: Conditional Consumer");
140    println!("{}", "-".repeat(50));
141
142    // when
143    let mut check_positive = BoxStatefulConsumer::new(|x: &i32| println!("Positive: {}", x)).when(|x: &i32| *x > 0);
144
145    let positive = 5;
146    let negative = -5;
147    print!("Check {}: ", positive);
148    check_positive.accept(&positive);
149    print!("Check {}: ", negative);
150    check_positive.accept(&negative);
151    println!("(negative numbers not printed)\n");
152
153    // when().or_else()
154    let mut categorize = BoxStatefulConsumer::new(|x: &i32| println!("Positive: {}", x))
155        .when(|x: &i32| *x > 0)
156        .or_else(|x: &i32| println!("Non-positive: {}", x));
157
158    let positive = 10;
159    let negative = -10;
160    categorize.accept(&positive);
161    categorize.accept(&negative);
162    println!();

pub fn new_with_name<F>(name: &str, f: F) -> Self
where F: FnMut(&T) + 'static,

Creates a new named consumer.

Wraps the provided closure and assigns it a name, which is useful for debugging and logging purposes.

pub fn new_with_optional_name<F>(f: F, name: Option<String>) -> Self
where F: FnMut(&T) + 'static,

Creates a new consumer with an optional name.

Wraps the provided closure and assigns it the given name, if any.

pub fn name(&self) -> Option<&str>

Gets the name of this consumer.

§Returns

Returns Some(&str) if a name was set, None otherwise.

pub fn set_name(&mut self, name: &str)

Sets the name of this consumer.

§Parameters

  • name - The name to set for this consumer

pub fn clear_name(&mut self)

Clears the name of this consumer.
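
The naming methods above can be pictured with a small stand-in. NamedSketch is a hypothetical type that mirrors the documented signatures (new_with_name, new_with_optional_name, name, set_name, clear_name); storing the name as an Option<String> is an assumption made for this sketch, since the real fields are private.

```rust
/// Hypothetical stand-in mirroring the documented naming API (illustration only).
struct NamedSketch<T> {
    func: Box<dyn FnMut(&T)>,
    name: Option<String>, // assumed storage; the real fields are private
}

impl<T> NamedSketch<T> {
    fn new_with_optional_name<F>(f: F, name: Option<String>) -> Self
    where
        F: FnMut(&T) + 'static,
    {
        NamedSketch { func: Box::new(f), name }
    }

    fn new_with_name<F>(name: &str, f: F) -> Self
    where
        F: FnMut(&T) + 'static,
    {
        Self::new_with_optional_name(f, Some(name.to_string()))
    }

    fn name(&self) -> Option<&str> {
        self.name.as_deref()
    }

    fn set_name(&mut self, name: &str) {
        self.name = Some(name.to_string());
    }

    fn clear_name(&mut self) {
        self.name = None;
    }

    fn accept(&mut self, value: &T) {
        (self.func)(value);
    }
}

/// Records the name after construction, after renaming, and after clearing.
fn name_lifecycle() -> Vec<Option<String>> {
    let mut consumer = NamedSketch::new_with_name("audit", |_value: &i32| {});
    consumer.accept(&1); // the name does not affect how the closure runs

    let mut seen = vec![consumer.name().map(String::from)];
    consumer.set_name("metrics");
    seen.push(consumer.name().map(String::from));
    consumer.clear_name();
    seen.push(consumer.name().map(String::from));
    seen
}
```

In this sketch the name is metadata only: it can be read in log messages or debug output without touching the wrapped closure.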

pub fn noop() -> Self

Creates a no-operation consumer.

Creates a consumer that does nothing when called. Useful for default values or placeholder implementations.

§Returns

Returns a new consumer instance that performs no operation.
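
One way a no-operation consumer serves as a default is sketched below with plain boxed closures rather than the crate's types. Callback, noop, process, and noop_demo are names invented for this illustration; the point is only that callers can omit a callback without the processing code needing a separate branch.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Illustrative alias for a boxed, stateful callback.
type Callback<T> = Box<dyn FnMut(&T)>;

/// A callback that ignores its input.
fn noop<T: 'static>() -> Callback<T> {
    Box::new(|_value: &T| {})
}

/// Invokes the callback for each value, falling back to `noop` when none is given.
fn process(values: &[i32], on_item: Option<Callback<i32>>) {
    let mut callback = on_item.unwrap_or_else(noop::<i32>);
    for value in values {
        callback(value);
    }
}

/// Returns the hit count after a counting run and again after a no-op run.
fn noop_demo() -> (usize, usize) {
    let hits = Rc::new(Cell::new(0usize));
    let hits_in = Rc::clone(&hits);
    let counting: Callback<i32> =
        Box::new(move |_value: &i32| hits_in.set(hits_in.get() + 1));

    process(&[1, 2, 3], Some(counting));
    let after_counting = hits.get();

    process(&[4, 5], None); // the no-op default leaves the count unchanged
    (after_counting, hits.get())
}
```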

pub fn when<P>(self, predicate: P) -> BoxConditionalStatefulConsumer<T>
where T: 'static, P: Predicate<T> + 'static,

Creates a conditional consumer that executes based on predicate result.

§Parameters

  • predicate - The predicate to determine whether to execute the consumption operation

§Returns

Returns a conditional consumer that only executes when the predicate returns true.

§Examples

use std::sync::Arc;
use std::sync::atomic::{AtomicI32, Ordering};
use qubit_function::{BoxStatefulConsumer, StatefulConsumer};

let counter = Arc::new(AtomicI32::new(0));
let consumer = BoxStatefulConsumer::new({
    let counter = Arc::clone(&counter);
    move |value: &i32| {
        counter.fetch_add(*value, Ordering::SeqCst);
    }
});

// A stateful consumer wraps an FnMut closure, so the binding is declared mutable.
let mut conditional = consumer.when(|value: &i32| *value > 0);
conditional.accept(&1);  // counter = 1
conditional.accept(&-1); // not executed, counter remains 1
assert_eq!(counter.load(Ordering::SeqCst), 1);

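
The gating behaviour of when, and of the when().or_else() chain used in the repository example, can be sketched with plain closures. The free functions when and when_or_else below, along with count_positive and split_by_sign, are hypothetical helpers written for this illustration; they are not the crate's BoxConditionalStatefulConsumer.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Runs `consumer` only for values that satisfy `predicate`.
fn when<T>(
    mut consumer: impl FnMut(&T),
    predicate: impl Fn(&T) -> bool,
) -> impl FnMut(&T) {
    move |value: &T| {
        if predicate(value) {
            consumer(value);
        }
    }
}

/// Runs `consumer` when `predicate` holds and `otherwise` when it does not.
fn when_or_else<T>(
    mut consumer: impl FnMut(&T),
    predicate: impl Fn(&T) -> bool,
    mut otherwise: impl FnMut(&T),
) -> impl FnMut(&T) {
    move |value: &T| {
        if predicate(value) {
            consumer(value);
        } else {
            otherwise(value);
        }
    }
}

/// Counts the strictly positive values using the gated consumer.
fn count_positive(values: &[i32]) -> usize {
    let mut count = 0usize;
    {
        let mut counter = when(|_x: &i32| count += 1, |x: &i32| *x > 0);
        for value in values {
            counter(value);
        }
    }
    count
}

/// Splits values into (positive, non-positive) using the two-branch form.
fn split_by_sign(values: &[i32]) -> (Vec<i32>, Vec<i32>) {
    let positives = Rc::new(RefCell::new(Vec::new()));
    let others = Rc::new(RefCell::new(Vec::new()));
    let (pos_in, others_in) = (Rc::clone(&positives), Rc::clone(&others));

    let mut categorize = when_or_else(
        move |x: &i32| pos_in.borrow_mut().push(*x),
        |x: &i32| *x > 0,
        move |x: &i32| others_in.borrow_mut().push(*x),
    );
    for value in values {
        categorize(value);
    }

    let pos = positives.borrow().clone();
    let rest = others.borrow().clone();
    (pos, rest)
}
```

The predicate only reads the value (Fn), while both branches may mutate captured state (FnMut), which matches the split between a predicate and a stateful consumer.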
Examples found in repository?
examples/consumers/consumer_demo.rs (line 143)
34fn main() {
35    println!("=== Consumer Examples ===\n");
36    println!("Note: Consumer only reads values, does not modify the original value");
37    println!("If you need to modify values, please refer to mutator_demo.rs\n");
38
39    // ========================================================================
40    // Example 1: BoxConsumer basic usage
41    // ========================================================================
42    println!("Example 1: BoxConsumer basic usage");
43    println!("{}", "-".repeat(50));
44
45    let consumer = BoxConsumer::new(|x: &i32| {
46        println!("Read and calculate: {} * 2 = {}", x, x * 2);
47    });
48    let value = 5;
49    println!("Value: {}", value);
50    consumer.accept(&value);
51    println!("Value remains: {} (not modified)\n", value);
52
53    // ========================================================================
54    // Example 2: BoxConsumer method chaining
55    // ========================================================================
56    println!("Example 2: BoxConsumer method chaining");
57    println!("{}", "-".repeat(50));
58
59    let results = Arc::new(Mutex::new(Vec::new()));
60    let r1 = results.clone();
61    let r2 = results.clone();
62    let r3 = results.clone();
63
64    let chained = BoxConsumer::new(move |x: &i32| {
65        r1.lock().expect("mutex should not be poisoned").push(*x * 2);
66    })
67    .and_then(move |x: &i32| {
68        r2.lock().expect("mutex should not be poisoned").push(*x + 10);
69    })
70    .and_then(move |x: &i32| {
71        r3.lock().expect("mutex should not be poisoned").push(*x);
72        println!("Processing value: {}", x);
73    });
74
75    let value = 5;
76    println!("Initial value: {}", value);
77    chained.accept(&value);
78    println!(
79        "Collected results: {:?}",
80        *results.lock().expect("mutex should not be poisoned")
81    );
82    println!("Original value: {} (not modified)\n", value);
83
84    // ========================================================================
85    // Example 3: Closure extension methods
86    // ========================================================================
87    println!("Example 3: Direct use of extension methods on closures");
88    println!("{}", "-".repeat(50));
89
90    let result = Arc::new(Mutex::new(0));
91    let r1 = result.clone();
92    let r2 = result.clone();
93
94    let closure_chain = (move |x: &i32| {
95        *r1.lock().expect("mutex should not be poisoned") = *x * 2;
96    })
97    .and_then(move |_x: &i32| {
98        *r2.lock().expect("mutex should not be poisoned") += 10;
99    });
100
101    let value = 5;
102    println!("Initial value: {}", value);
103    closure_chain.accept(&value);
104    println!(
105        "Calculation result: {}",
106        *result.lock().expect("mutex should not be poisoned")
107    );
108    println!("Original value: {} (not modified)\n", value);
109
110    // ========================================================================
111    // Example 4: BoxConsumer factory methods
112    // ========================================================================
113    println!("Example 4: BoxConsumer factory methods");
114    println!("{}", "-".repeat(50));
115
116    // noop
117    println!("noop - does nothing:");
118    let noop = BoxConsumer::<i32>::noop();
119    let value = 42;
120    noop.accept(&value);
121    println!("Value: {}\n", value);
122
123    // print
124    print!("print - prints value: ");
125    let print = BoxConsumer::new(|x: &i32| println!("{}", x));
126    let value = 42;
127    print.accept(&value);
128    println!();
129
130    // print with prefix
131    let print_with = BoxConsumer::new(|x: &i32| println!("Value is: {}", x));
132    let value = 42;
133    print_with.accept(&value);
134    println!();
135
136    // ========================================================================
137    // Example 5: Conditional Consumer
138    // ========================================================================
139    println!("Example 5: Conditional Consumer");
140    println!("{}", "-".repeat(50));
141
142    // when
143    let mut check_positive = BoxStatefulConsumer::new(|x: &i32| println!("Positive: {}", x)).when(|x: &i32| *x > 0);
144
145    let positive = 5;
146    let negative = -5;
147    print!("Check {}: ", positive);
148    check_positive.accept(&positive);
149    print!("Check {}: ", negative);
150    check_positive.accept(&negative);
151    println!("(negative numbers not printed)\n");
152
153    // when().or_else()
154    let mut categorize = BoxStatefulConsumer::new(|x: &i32| println!("Positive: {}", x))
155        .when(|x: &i32| *x > 0)
156        .or_else(|x: &i32| println!("Non-positive: {}", x));
157
158    let positive = 10;
159    let negative = -10;
160    categorize.accept(&positive);
161    categorize.accept(&negative);
162    println!();
163
164    // ========================================================================
165    // Example 6: ArcConsumer - multi-threaded sharing
166    // ========================================================================
167    println!("Example 6: ArcConsumer - multi-threaded sharing");
168    println!("{}", "-".repeat(50));
169
170    let shared = ArcConsumer::new(|x: &i32| println!("Processing value: {}", x * 2));
171
172    // Clone for another thread
173    let shared_clone = shared.clone();
174    let handle = thread::spawn(move || {
175        let value = 5;
176        let consumer = shared_clone;
177        consumer.accept(&value);
178        value
179    });
180
181    // Use in main thread
182    let value = 3;
183    let consumer = shared;
184    consumer.accept(&value);
185
186    let thread_result = handle.join().expect("thread should not panic");
187    println!("Thread result: {}\n", thread_result);
188
189    // ========================================================================
190    // Example 7: ArcConsumer composition (does not consume original consumer)
191    // ========================================================================
192    println!("Example 7: ArcConsumer composition (borrowing &self)");
193    println!("{}", "-".repeat(50));
194
195    let double = ArcConsumer::new(|x: &i32| println!("double: {}", x * 2));
196    let add_ten = ArcConsumer::new(|x: &i32| println!("add_ten: {}", x + 10));
197
198    // Composition does not consume original consumer
199    let pipeline1 = double.and_then(add_ten.clone());
200    let pipeline2 = add_ten.and_then(double.clone());
201
202    let value1 = 5;
203    let p1 = pipeline1;
204    print!("pipeline1 processing 5: ");
205    p1.accept(&value1);
206
207    let value2 = 5;
208    let p2 = pipeline2;
209    print!("pipeline2 processing 5: ");
210    p2.accept(&value2);
211
212    // double and add_ten are still available
213    let value3 = 10;
214    let d = double;
215    print!("Original double still available, processing 10: ");
216    d.accept(&value3);
217    println!();
218
219    // ========================================================================
220    // Example 8: RcConsumer - single-threaded sharing
221    // ========================================================================
222    println!("Example 8: RcConsumer - single-threaded sharing");
223    println!("{}", "-".repeat(50));
224
225    let rc_consumer = RcConsumer::new(|x: &i32| println!("Processing: {}", x * 2));
226
227    // Clone multiple copies
228    let clone1 = rc_consumer.clone();
229    let clone2 = rc_consumer.clone();
230
231    let value1 = 5;
232    let c1 = clone1;
233    print!("clone1 processing 5: ");
234    c1.accept(&value1);
235
236    let value2 = 3;
237    let c2 = clone2;
238    print!("clone2 processing 3: ");
239    c2.accept(&value2);
240
241    let value3 = 7;
242    let c3 = rc_consumer;
243    print!("Original processing 7: ");
244    c3.accept(&value3);
245    println!();
246
247    // ========================================================================
248    // Example 9: RcConsumer composition (borrowing &self)
249    // ========================================================================
250    println!("Example 9: RcConsumer composition (borrowing &self)");
251    println!("{}", "-".repeat(50));
252
253    let double = RcConsumer::new(|x: &i32| println!("double: {}", x * 2));
254    let add_ten = RcConsumer::new(|x: &i32| println!("add_ten: {}", x + 10));
255
256    let pipeline1 = double.and_then(add_ten.clone());
257    let pipeline2 = add_ten.and_then(double.clone());
258
259    let value1 = 5;
260    let p1 = pipeline1;
261    print!("pipeline1 processing 5: ");
262    p1.accept(&value1);
263
264    let value2 = 5;
265    let p2 = pipeline2;
266    print!("pipeline2 processing 5: ");
267    p2.accept(&value2);
268    println!();
269
270    // ========================================================================
271    // Example 10: Unified Consumer trait
272    // ========================================================================
273    println!("Example 10: Unified Consumer trait");
274    println!("{}", "-".repeat(50));
275
276    fn log_all<C: Consumer<i32>>(consumer: &mut C, values: &[i32]) {
277        for value in values.iter() {
278            consumer.accept(value);
279        }
280    }
281
282    let values = vec![1, 2, 3, 4, 5];
283
284    let mut box_con = BoxConsumer::new(|x: &i32| print!("{} ", x * 2));
285    print!("BoxConsumer processing {:?}: ", values);
286    log_all(&mut box_con, &values);
287    println!();
288
289    let mut arc_con = ArcConsumer::new(|x: &i32| print!("{} ", x * 2));
290    print!("ArcConsumer processing {:?}: ", values);
291    log_all(&mut arc_con, &values);
292    println!();
293
294    let mut rc_con = RcConsumer::new(|x: &i32| print!("{} ", x * 2));
295    print!("RcConsumer processing {:?}: ", values);
296    log_all(&mut rc_con, &values);
297    println!();
298
299    let mut closure = |x: &i32| print!("{} ", x * 2);
300    print!("Closure processing {:?}: ", values);
301    log_all(&mut closure, &values);
302    println!("\n");
303
304    // ========================================================================
305    // Example 11: Data validation and logging
306    // ========================================================================
307    println!("Example 11: Data validation and logging");
308    println!("{}", "-".repeat(50));
309
310    let validator = BoxConsumer::new(|x: &i32| {
311        let status = if *x >= 0 && *x <= 100 { "valid" } else { "out of range" };
312        println!("Validate {}: {}", x, status);
313    });
314
315    let logger = BoxConsumer::new(|x: &i32| {
316        println!("Log to file: value={}, square={}", x, x * x);
317    });
318
319    let pipeline = validator.and_then(logger);
320
321    let test_values = vec![-50, 30, 200];
322    for value in test_values {
323        pipeline.accept(&value);
324    }
325    println!();
326
327    // ========================================================================
328    // Example 12: String analysis
329    // ========================================================================
330    println!("Example 12: String analysis");
331    println!("{}", "-".repeat(50));
332
333    let string_analyzer = BoxConsumer::new(|s: &String| {
334        println!("Length: {}", s.len());
335    })
336    .and_then(|s: &String| {
337        println!("Lowercase: {}", s.to_lowercase());
338    })
339    .and_then(|s: &String| {
340        println!("Uppercase: {}", s.to_uppercase());
341    })
342    .and_then(|s: &String| {
343        let word_count = s.split_whitespace().count();
344        println!("Word count: {}", word_count);
345    });
346
347    let text = String::from("Hello World");
348    println!("Analyzing text: \"{}\"", text);
349    string_analyzer.accept(&text);
350    println!("Original text: \"{}\" (not modified)\n", text);
351
352    // ========================================================================
353    // Example 13: Type conversion
354    // ========================================================================
355    println!("Example 13: Type conversion");
356    println!("{}", "-".repeat(50));
357
358    // Closure -> BoxConsumer
359    let closure = |x: &i32| print!("Processing: {} ", x * 2);
360    let box_con = Consumer::into_box(closure);
361    let value = 5;
362    print!("Closure -> BoxConsumer: ");
363    box_con.accept(&value);
364    println!();
365
366    // Closure -> RcConsumer
367    let closure = |x: &i32| print!("Processing: {} ", x * 2);
368    let rc_con = Consumer::into_rc(closure);
369    let value = 5;
370    print!("Closure -> RcConsumer: ");
371    rc_con.accept(&value);
372    println!();
373
374    // Closure -> ArcConsumer
375    let closure = |x: &i32| print!("Processing: {} ", x * 2);
376    let arc_con = Consumer::into_arc(closure);
377    let value = 5;
378    print!("Closure -> ArcConsumer: ");
379    arc_con.accept(&value);
380    println!();
381
382    // BoxConsumer -> RcConsumer
383    let box_con = BoxConsumer::new(|x: &i32| print!("Processing: {} ", x * 2));
384    let rc_con = box_con.into_rc();
385    let value = 5;
386    print!("BoxConsumer -> RcConsumer: ");
387    rc_con.accept(&value);
388    println!();
389
390    // RcConsumer -> BoxConsumer
391    let rc_con = RcConsumer::new(|x: &i32| print!("Processing: {} ", x * 2));
392    let box_con = rc_con.into_box();
393    let value = 5;
394    print!("RcConsumer -> BoxConsumer: ");
395    box_con.accept(&value);
396    println!("\n");
397
398    // ========================================================================
399    // Example 14: Custom types
400    // ========================================================================
401    println!("Example 14: Custom types");
402    println!("{}", "-".repeat(50));
403
404    #[derive(Debug, Clone)]
405    struct Point {
406        x: i32,
407        y: i32,
408    }
409
410    let analyzer = BoxConsumer::new(|p: &Point| {
411        println!("Point coordinates: ({}, {})", p.x, p.y);
412    })
413    .and_then(|p: &Point| {
414        let distance = ((p.x * p.x + p.y * p.y) as f64).sqrt();
415        println!("Distance from origin: {:.2}", distance);
416    })
417    .and_then(|p: &Point| {
418        let quadrant = match (p.x >= 0, p.y >= 0) {
419            (true, true) => "First quadrant",
420            (false, true) => "Second quadrant",
421            (false, false) => "Third quadrant",
422            (true, false) => "Fourth quadrant",
423        };
424        println!("Quadrant: {}", quadrant);
425    });
426
427    let point = Point { x: 3, y: 4 };
428    println!("Analyzing point: {:?}", point);
429    analyzer.accept(&point);
430    println!("Original point: {:?} (not modified)\n", point);
431
432    // ========================================================================
433    // Example 15: Data collection and statistics
434    // ========================================================================
435    println!("Example 15: Data collection and statistics");
436    println!("{}", "-".repeat(50));
437
438    let sum = Arc::new(Mutex::new(0));
439    let count = Arc::new(Mutex::new(0));
440    let sum_clone = sum.clone();
441    let count_clone = count.clone();
442
443    let collector = BoxConsumer::new(move |x: &i32| {
444        *sum_clone.lock().expect("mutex should not be poisoned") += *x;
445        *count_clone.lock().expect("mutex should not be poisoned") += 1;
446    });
447
448    let numbers = vec![10, 20, 30, 40, 50];
449    println!("Numbers: {:?}", numbers);
450    for num in &numbers {
451        collector.accept(num);
452    }
453
454    let total = *sum.lock().expect("mutex should not be poisoned");
455    let cnt = *count.lock().expect("mutex should not be poisoned");
456    println!("Sum: {}", total);
457    println!("Count: {}", cnt);
458    println!("Average: {:.2}\n", total as f64 / cnt as f64);
459
460    println!("=== All examples completed ===");
461    println!("\nTip: For value modification functionality, please refer to mutator_demo.rs");
462}

pub fn and_then<C>(self, after: C) -> BoxStatefulConsumer<T>
where Self: Sized + 'static, T: 'static, C: StatefulConsumer<T> + 'static,

Chains execution with another consumer, executing the current consumer first, then the subsequent consumer.

§Parameters
  • after - The subsequent consumer to execute after the current consumer completes
§Returns

Returns a new consumer that executes the current consumer and the subsequent consumer in sequence.

§Examples
use std::sync::Arc;
use std::sync::atomic::{AtomicI32, Ordering};
use qubit_function::consumers::*;

let counter1 = Arc::new(AtomicI32::new(0));
let counter2 = Arc::new(AtomicI32::new(0));

let consumer1 = BoxStatefulConsumer::new({
    let counter = Arc::clone(&counter1);
    move |value: &i32| {
        counter.fetch_add(*value, Ordering::SeqCst);
    }
});

let consumer2 = BoxStatefulConsumer::new({
    let counter = Arc::clone(&counter2);
    move |value: &i32| {
        counter.fetch_add(*value * 2, Ordering::SeqCst);
    }
});

// `accept` takes `&mut self`, so the chained consumer must be bound mutably.
let mut chained = consumer1.and_then(consumer2);
chained.accept(&5);
assert_eq!(counter1.load(Ordering::SeqCst), 5);
assert_eq!(counter2.load(Ordering::SeqCst), 10);
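
How this kind of chaining can work over a boxed `FnMut` closure is sketched below. It is a minimal, self-contained illustration that uses only the standard library, not the crate's implementation: `SketchConsumer` and `run_chain` are hypothetical names introduced here.

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Hypothetical stand-in for a boxed stateful consumer (not the crate's type).
struct SketchConsumer<T> {
    f: Box<dyn FnMut(&T)>,
}

impl<T: 'static> SketchConsumer<T> {
    fn new<F>(f: F) -> Self
    where
        F: FnMut(&T) + 'static,
    {
        SketchConsumer { f: Box::new(f) }
    }

    // Calling an `FnMut` needs mutable access, hence `&mut self`.
    fn accept(&mut self, value: &T) {
        (self.f)(value)
    }

    // Consumes both consumers and returns one that runs them in order.
    fn and_then(self, after: SketchConsumer<T>) -> SketchConsumer<T> {
        let mut first = self.f;
        let mut second = after.f;
        SketchConsumer::new(move |value: &T| {
            first(value);
            second(value);
        })
    }
}

// Feeds `inputs` through a two-stage chain and returns the recorded call order.
fn run_chain(inputs: &[i32]) -> Vec<String> {
    let log: Rc<RefCell<Vec<String>>> = Rc::new(RefCell::new(Vec::new()));
    let (l1, l2) = (Rc::clone(&log), Rc::clone(&log));

    let mut calls = 0; // state owned and mutated by the first closure
    let first = SketchConsumer::new(move |x: &i32| {
        calls += 1;
        l1.borrow_mut().push(format!("first#{}:{}", calls, x));
    });
    let second = SketchConsumer::new(move |x: &i32| {
        l2.borrow_mut().push(format!("second:{}", x * 2));
    });

    let mut chained = first.and_then(second);
    for x in inputs {
        chained.accept(x);
    }
    let result = log.borrow().clone();
    result
}

fn main() {
    println!("{:?}", run_chain(&[5, 7]));
}
```

Because `and_then` takes `self` by value, both original consumers are moved into the combined closure and cannot be used afterwards, which matches the single-ownership design described for this type.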

Trait Implementations§

impl<T> Debug for BoxStatefulConsumer<T>

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl<T> Display for BoxStatefulConsumer<T>

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl<T> StatefulConsumer<T> for BoxStatefulConsumer<T>

fn accept(&mut self, value: &T)

Executes the consumption operation.

fn into_box(self) -> BoxStatefulConsumer<T>

Converts to BoxStatefulConsumer.

fn into_rc(self) -> RcStatefulConsumer<T>
where Self: 'static,

Converts to RcStatefulConsumer.

fn into_fn(self) -> impl FnMut(&T)

Converts to a closure.

fn into_once(self) -> BoxConsumerOnce<T>
where Self: 'static,

Converts to ConsumerOnce.

fn into_arc(self) -> ArcStatefulConsumer<T>
where Self: Sized + Send + 'static,

Converts to ArcStatefulConsumer.
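
The `Send` bound on `into_arc` is what lets the resulting consumer cross thread boundaries. The sketch below illustrates the idea with the standard library only, assuming a shared form of `Arc<Mutex<dyn FnMut(&T) + Send>>`. The actual layout of `ArcStatefulConsumer` is not shown on this page, and `SharedFn`, `share`, and `sum_across_threads` are hypothetical names.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Assumed shared representation; the crate's real type may differ.
type SharedFn<T> = Arc<Mutex<dyn FnMut(&T) + Send>>;

// Wraps a closure so it can be cloned and called from several threads.
// Without `Send` on `F`, the result could not be moved into `thread::spawn`.
fn share<T, F>(f: F) -> SharedFn<T>
where
    F: FnMut(&T) + Send + 'static,
{
    Arc::new(Mutex::new(f))
}

// Calls one shared consumer from a separate thread per value and
// returns the total it accumulated.
fn sum_across_threads(values: Vec<i32>) -> i32 {
    let total = Arc::new(Mutex::new(0));
    let t = Arc::clone(&total);
    let consumer: SharedFn<i32> = share(move |x: &i32| {
        *t.lock().expect("mutex should not be poisoned") += *x;
    });

    let handles: Vec<_> = values
        .into_iter()
        .map(|v| {
            let c = Arc::clone(&consumer);
            thread::spawn(move || {
                let mut guard = c.lock().expect("mutex should not be poisoned");
                let f = &mut *guard; // reborrow the closure behind the lock
                f(&v);
            })
        })
        .collect();

    for h in handles {
        h.join().expect("worker thread should not panic");
    }
    let result = *total.lock().expect("mutex should not be poisoned");
    result
}

fn main() {
    println!("{}", sum_across_threads(vec![1, 2, 3, 4]));
}
```

Every call here pays for a lock acquisition, which is the sharing overhead that the `Box`-based consumer avoids.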

Auto Trait Implementations§

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> ToString for T
where T: Display + ?Sized,

fn to_string(&self) -> String

Converts the given value to a String.
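
Because `BoxStatefulConsumer<T>` implements `Display`, the blanket impl above gives it `to_string()` without any extra code. The mechanism is sketched here with a hypothetical `Named` type, since the text that `BoxStatefulConsumer` actually prints is not shown on this page.

```rust
use std::fmt;

// Hypothetical type used only to illustrate the blanket impl.
struct Named {
    name: &'static str,
}

impl fmt::Display for Named {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Named({})", self.name)
    }
}

// `to_string` is not defined on `Named`; it comes from
// `impl<T: Display + ?Sized> ToString for T` in the standard library.
fn describe(n: &Named) -> String {
    n.to_string()
}

fn main() {
    println!("{}", describe(&Named { name: "logger" }));
}
```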

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.