prism3_function/consumer.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025.
4 * 3-Prism Co. Ltd.
5 *
6 * All rights reserved.
7 *
8 ******************************************************************************/
9//! # Consumer Types
10//!
11//! Provides implementations of consumer interfaces for executing operations
12//! that accept a single input parameter but return no result.
13//!
14//! This module provides a unified `Consumer` trait and three concrete
15//! implementations based on different ownership models:
16//!
17//! - **`BoxConsumer<T>`**: Box-based single ownership implementation for
18//! one-time use scenarios
19//! - **`ArcConsumer<T>`**: Thread-safe shared ownership implementation
20//! based on Arc<Mutex<>>
21//! - **`RcConsumer<T>`**: Single-threaded shared ownership implementation
22//! based on Rc<RefCell<>>
23//!
24//! # Design Philosophy
25//!
26//! Consumer uses `FnMut(&T)` semantics, allowing modification of its own state
27//! but not the input value. Suitable for statistics, accumulation, event
28//! handling, and other scenarios.
29//!
30//! # Author
31//!
32//! Hu Haixing
33
34use std::cell::RefCell;
35use std::fmt;
36use std::rc::Rc;
37use std::sync::{Arc, Mutex};
38
39use crate::predicate::{ArcPredicate, BoxPredicate, Predicate, RcPredicate};
40
41/// Type alias for consumer function to simplify complex types.
42///
43/// This type alias represents a mutable function that takes a reference and
44/// returns nothing. It is used to reduce type complexity in struct definitions.
45type ConsumerFn<T> = dyn FnMut(&T);
46
47/// Type alias for thread-safe consumer function to simplify complex types.
48///
49/// This type alias represents a mutable function that takes a reference and
50/// returns nothing, with Send bound for thread-safe usage. It is used to
51/// reduce type complexity in Arc-based struct definitions.
52type SendConsumerFn<T> = dyn FnMut(&T) + Send;
53
54// ============================================================================
55// 1. Consumer Trait - Unified Consumer Interface
56// ============================================================================
57
58/// Consumer trait - Unified consumer interface
59///
60/// Defines the core behavior of all consumer types. Similar to Java's
61/// `Consumer<T>` interface, executes operations that accept a value but return
62/// no result (side effects only).
63///
64/// Consumer can modify its own state (such as accumulation, counting), but
65/// should not modify the consumed value itself.
66///
67/// # Automatic Implementation
68///
69/// - All closures implementing `FnMut(&T)`
70/// - `BoxConsumer<T>`, `ArcConsumer<T>`, `RcConsumer<T>`
71///
72/// # Features
73///
74/// - **Unified Interface**: All consumer types share the same `accept` method
75/// signature
76/// - **Automatic Implementation**: Closures automatically implement this trait
77/// with zero overhead
78/// - **Type Conversion**: Easy conversion between different ownership models
79/// - **Generic Programming**: Write functions that work with any consumer type
80///
81/// # Examples
82///
83/// ```rust
84/// use prism3_function::{Consumer, BoxConsumer, ArcConsumer};
85/// use std::sync::{Arc, Mutex};
86///
87/// fn apply_consumer<C: Consumer<i32>>(consumer: &mut C, value: &i32) {
88/// consumer.accept(value);
89/// }
90///
91/// // Works with any consumer type
92/// let log = Arc::new(Mutex::new(Vec::new()));
93/// let l = log.clone();
94/// let mut box_con = BoxConsumer::new(move |x: &i32| {
95/// l.lock().unwrap().push(*x);
96/// });
97/// apply_consumer(&mut box_con, &5);
98/// assert_eq!(*log.lock().unwrap(), vec![5]);
99/// ```
100///
101/// # Author
102///
103/// Hu Haixing
104pub trait Consumer<T> {
105 /// Execute consumption operation
106 ///
107 /// Performs an operation on the given reference. The operation typically
108 /// reads the input value or produces side effects, but does not modify the
109 /// input value itself. Can modify the consumer's own state.
110 ///
111 /// # Parameters
112 ///
113 /// * `value` - Reference to the value to be consumed
114 ///
115 /// # Examples
116 ///
117 /// ```rust
118 /// use prism3_function::{Consumer, BoxConsumer};
119 ///
120 /// let mut consumer = BoxConsumer::new(|x: &i32| println!("{}", x));
121 /// let value = 5;
122 /// consumer.accept(&value);
123 /// ```
124 fn accept(&mut self, value: &T);
125
126 /// Convert to BoxConsumer
127 ///
128 /// **⚠️ Consumes `self`**: The original consumer will be unavailable after
129 /// calling this method.
130 ///
131 /// Converts the current consumer to `BoxConsumer<T>`.
132 ///
133 /// # Ownership
134 ///
135 /// This method **consumes** the consumer (takes ownership of `self`).
136 /// After calling this method, the original consumer is no longer available.
137 ///
138 /// **Tip**: For cloneable consumers ([`ArcConsumer`], [`RcConsumer`]),
139 /// if you need to preserve the original object, you can call `.clone()`
140 /// first.
141 ///
142 /// # Return Value
143 ///
144 /// Returns the wrapped `BoxConsumer<T>`
145 ///
146 /// # Examples
147 ///
148 /// ```rust
149 /// use prism3_function::Consumer;
150 /// use std::sync::{Arc, Mutex};
151 ///
152 /// let log = Arc::new(Mutex::new(Vec::new()));
153 /// let l = log.clone();
154 /// let closure = move |x: &i32| {
155 /// l.lock().unwrap().push(*x);
156 /// };
157 /// let mut box_consumer = closure.into_box();
158 /// box_consumer.accept(&5);
159 /// assert_eq!(*log.lock().unwrap(), vec![5]);
160 /// ```
161 fn into_box(self) -> BoxConsumer<T>
162 where
163 Self: Sized + 'static,
164 T: 'static,
165 {
166 let mut consumer = self;
167 BoxConsumer::new(move |t| consumer.accept(t))
168 }
169
170 /// Convert to RcConsumer
171 ///
172 /// **⚠️ Consumes `self`**: The original consumer will be unavailable after
173 /// calling this method.
174 ///
175 /// # Return Value
176 ///
177 /// Returns the wrapped `RcConsumer<T>`
178 fn into_rc(self) -> RcConsumer<T>
179 where
180 Self: Sized + 'static,
181 T: 'static,
182 {
183 let mut consumer = self;
184 RcConsumer::new(move |t| consumer.accept(t))
185 }
186
187 /// Convert to ArcConsumer
188 ///
189 /// **⚠️ Consumes `self`**: The original consumer will be unavailable after
190 /// calling this method.
191 ///
192 /// # Return Value
193 ///
194 /// Returns the wrapped `ArcConsumer<T>`
195 fn into_arc(self) -> ArcConsumer<T>
196 where
197 Self: Sized + Send + 'static,
198 T: Send + 'static,
199 {
200 let mut consumer = self;
201 ArcConsumer::new(move |t| consumer.accept(t))
202 }
203
204 /// Convert to closure
205 ///
206 /// **⚠️ Consumes `self`**: The original consumer will be unavailable after
207 /// calling this method.
208 ///
209 /// Converts the consumer to a closure that can be used directly in standard
210 /// library functions requiring `FnMut`.
211 ///
212 /// # Return Value
213 ///
214 /// Returns a closure implementing `FnMut(&T)`
215 ///
216 /// # Examples
217 ///
218 /// ```rust
219 /// use prism3_function::{Consumer, BoxConsumer};
220 /// use std::sync::{Arc, Mutex};
221 ///
222 /// let log = Arc::new(Mutex::new(Vec::new()));
223 /// let l = log.clone();
224 /// let consumer = BoxConsumer::new(move |x: &i32| {
225 /// l.lock().unwrap().push(*x);
226 /// });
227 /// let mut func = consumer.into_fn();
228 /// func(&5);
229 /// assert_eq!(*log.lock().unwrap(), vec![5]);
230 /// ```
231 fn into_fn(self) -> impl FnMut(&T)
232 where
233 Self: Sized + 'static,
234 T: 'static,
235 {
236 let mut consumer = self;
237 move |t| consumer.accept(t)
238 }
239
240 /// Convert to BoxConsumer
241 ///
242 /// **⚠️ Requires Clone**: The original consumer must implement Clone.
243 ///
244 /// Converts the current consumer to `BoxConsumer<T>` by cloning it first.
245 ///
246 /// # Ownership
247 ///
248 /// This method does **not consume** the consumer. It clones the consumer and
249 /// then converts the clone to `BoxConsumer<T>`. The original consumer remains
250 /// available after calling this method.
251 ///
252 /// # Return Value
253 ///
254 /// Returns the wrapped `BoxConsumer<T>` from the clone
255 ///
256 /// # Examples
257 ///
258 /// ```rust
259 /// use prism3_function::{Consumer, ArcConsumer};
260 /// use std::sync::{Arc, Mutex};
261 ///
262 /// let log = Arc::new(Mutex::new(Vec::new()));
263 /// let l = log.clone();
264 /// let consumer = ArcConsumer::new(move |x: &i32| {
265 /// l.lock().unwrap().push(*x);
266 /// });
267 /// let mut box_consumer = consumer.to_box();
268 /// box_consumer.accept(&5);
269 /// assert_eq!(*log.lock().unwrap(), vec![5]);
270 /// // Original consumer still usable
271 /// consumer.accept(&3);
272 /// assert_eq!(*log.lock().unwrap(), vec![5, 3]);
273 /// ```
274 fn to_box(&self) -> BoxConsumer<T>
275 where
276 Self: Sized + Clone + 'static,
277 T: 'static,
278 {
279 self.clone().into_box()
280 }
281
282 /// Convert to RcConsumer
283 ///
284 /// **⚠️ Requires Clone**: The original consumer must implement Clone.
285 ///
286 /// Converts the current consumer to `RcConsumer<T>` by cloning it first.
287 ///
288 /// # Ownership
289 ///
290 /// This method does **not consume** the consumer. It clones the consumer and
291 /// then converts the clone to `RcConsumer<T>`. The original consumer remains
292 /// available after calling this method.
293 ///
294 /// # Return Value
295 ///
296 /// Returns the wrapped `RcConsumer<T>` from the clone
297 ///
298 /// # Examples
299 ///
300 /// ```rust
301 /// use prism3_function::{Consumer, ArcConsumer};
302 /// use std::sync::{Arc, Mutex};
303 ///
304 /// let log = Arc::new(Mutex::new(Vec::new()));
305 /// let l = log.clone();
306 /// let consumer = ArcConsumer::new(move |x: &i32| {
307 /// l.lock().unwrap().push(*x);
308 /// });
309 /// let mut rc_consumer = consumer.to_rc();
310 /// rc_consumer.accept(&5);
311 /// assert_eq!(*log.lock().unwrap(), vec![5]);
312 /// // Original consumer still usable
313 /// consumer.accept(&3);
314 /// assert_eq!(*log.lock().unwrap(), vec![5, 3]);
315 /// ```
316 fn to_rc(&self) -> RcConsumer<T>
317 where
318 Self: Sized + Clone + 'static,
319 T: 'static,
320 {
321 self.clone().into_rc()
322 }
323
324 /// Convert to ArcConsumer
325 ///
326 /// **⚠️ Requires Clone + Send**: The original consumer must implement
327 /// Clone + Send.
328 ///
329 /// Converts the current consumer to `ArcConsumer<T>` by cloning it first.
330 ///
331 /// # Ownership
332 ///
333 /// This method does **not consume** the consumer. It clones the consumer and
334 /// then converts the clone to `ArcConsumer<T>`. The original consumer remains
335 /// available after calling this method.
336 ///
337 /// # Return Value
338 ///
339 /// Returns the wrapped `ArcConsumer<T>` from the clone
340 ///
341 /// # Examples
342 ///
343 /// ```rust
344 /// use prism3_function::{Consumer, RcConsumer};
345 /// use std::rc::Rc;
346 /// use std::cell::RefCell;
347 ///
348 /// let log = Rc::new(RefCell::new(Vec::new()));
349 /// let l = log.clone();
350 /// let consumer = RcConsumer::new(move |x: &i32| {
351 /// l.borrow_mut().push(*x);
352 /// });
353 /// let mut arc_consumer = consumer.to_arc();
354 /// arc_consumer.accept(&5);
355 /// assert_eq!(*log.borrow(), vec![5]);
356 /// // Original consumer still usable
357 /// consumer.accept(&3);
358 /// assert_eq!(*log.borrow(), vec![5, 3]);
359 /// ```
360 fn to_arc(&self) -> ArcConsumer<T>
361 where
362 Self: Sized + Clone + Send + 'static,
363 T: Send + 'static,
364 {
365 self.clone().into_arc()
366 }
367
368 /// Convert to closure
369 ///
370 /// **⚠️ Requires Clone**: The original consumer must implement Clone.
371 ///
372 /// Converts the consumer to a closure that can be used directly in standard
373 /// library functions requiring `FnMut`.
374 ///
375 /// # Ownership
376 ///
377 /// This method does **not consume** the consumer. It clones the consumer and
378 /// then converts the clone to a closure. The original consumer remains
379 /// available after calling this method.
380 ///
381 /// # Return Value
382 ///
383 /// Returns a closure implementing `FnMut(&T)` from the clone
384 ///
385 /// # Examples
386 ///
387 /// ```rust
388 /// use prism3_function::{Consumer, BoxConsumer};
389 /// use std::sync::{Arc, Mutex};
390 ///
391 /// let log = Arc::new(Mutex::new(Vec::new()));
392 /// let l = log.clone();
393 /// let consumer = BoxConsumer::new(move |x: &i32| {
394 /// l.lock().unwrap().push(*x);
395 /// });
396 /// let mut func = consumer.to_fn();
397 /// func(&5);
398 /// assert_eq!(*log.lock().unwrap(), vec![5]);
399 /// // Original consumer still usable
400 /// consumer.accept(&3);
401 /// assert_eq!(*log.lock().unwrap(), vec![5, 3]);
402 /// ```
403 fn to_fn(&self) -> impl FnMut(&T)
404 where
405 Self: Sized + Clone + 'static,
406 T: 'static,
407 {
408 self.clone().into_fn()
409 }
410}
411
412// ============================================================================
413// 2. BoxConsumer - Single Ownership Implementation
414// ============================================================================
415
416/// BoxConsumer struct
417///
418/// Consumer implementation based on `Box<dyn FnMut(&T)>` for single ownership
419/// scenarios. When sharing is not needed, this is the simplest and most
420/// efficient consumer type.
421///
422/// # Features
423///
424/// - **Single Ownership**: Not cloneable, transfers ownership when used
425/// - **Zero Overhead**: No reference counting or lock overhead
426/// - **Mutable State**: Can modify captured environment through `FnMut`
427/// - **Builder Pattern**: Method chaining naturally consumes `self`
428///
429/// # Use Cases
430///
431/// Choose `BoxConsumer` when:
432/// - Consumer is used only once or in a linear flow
433/// - Building pipelines where ownership flows naturally
434/// - No need to share consumers across contexts
435/// - Performance critical and cannot accept sharing overhead
436///
437/// # Performance
438///
439/// `BoxConsumer` has the best performance among the three consumer types:
440/// - No reference counting overhead
441/// - No lock acquisition or runtime borrowing checks
442/// - Direct function calls through vtable
443/// - Minimal memory footprint (single pointer)
444///
445/// # Examples
446///
447/// ```rust
448/// use prism3_function::{Consumer, BoxConsumer};
449/// use std::sync::{Arc, Mutex};
450///
451/// let log = Arc::new(Mutex::new(Vec::new()));
452/// let l = log.clone();
453/// let mut consumer = BoxConsumer::new(move |x: &i32| {
454/// l.lock().unwrap().push(*x);
455/// });
456/// consumer.accept(&5);
457/// assert_eq!(*log.lock().unwrap(), vec![5]);
458/// ```
459///
460/// # Author
461///
462/// Hu Haixing
463pub struct BoxConsumer<T> {
464 function: Box<dyn FnMut(&T)>,
465 name: Option<String>,
466}
467
468impl<T> BoxConsumer<T>
469where
470 T: 'static,
471{
472 /// Create a new BoxConsumer
473 ///
474 /// # Type Parameters
475 ///
476 /// * `F` - Closure type
477 ///
478 /// # Parameters
479 ///
480 /// * `f` - Closure to wrap
481 ///
482 /// # Return Value
483 ///
484 /// Returns a new `BoxConsumer<T>` instance
485 ///
486 /// # Examples
487 ///
488 /// ```rust
489 /// use prism3_function::{Consumer, BoxConsumer};
490 /// use std::sync::{Arc, Mutex};
491 ///
492 /// let log = Arc::new(Mutex::new(Vec::new()));
493 /// let l = log.clone();
494 /// let mut consumer = BoxConsumer::new(move |x: &i32| {
495 /// l.lock().unwrap().push(*x + 1);
496 /// });
497 /// consumer.accept(&5);
498 /// assert_eq!(*log.lock().unwrap(), vec![6]);
499 /// ```
500 pub fn new<F>(f: F) -> Self
501 where
502 F: FnMut(&T) + 'static,
503 {
504 BoxConsumer {
505 function: Box::new(f),
506 name: None,
507 }
508 }
509
510 /// Create a new named BoxConsumer
511 ///
512 /// # Type Parameters
513 ///
514 /// * `F` - Closure type
515 ///
516 /// # Parameters
517 ///
518 /// * `name` - Name of the consumer
519 /// * `f` - Closure to wrap
520 ///
521 /// # Return Value
522 ///
523 /// Returns a new `BoxConsumer<T>` instance
524 ///
525 /// # Examples
526 ///
527 /// ```rust
528 /// use prism3_function::{Consumer, BoxConsumer};
529 /// use std::sync::{Arc, Mutex};
530 ///
531 /// let log = Arc::new(Mutex::new(Vec::new()));
532 /// let l = log.clone();
533 /// let mut consumer = BoxConsumer::new_with_name("my_consumer", move |x: &i32| {
534 /// l.lock().unwrap().push(*x + 1);
535 /// });
536 /// assert_eq!(consumer.name(), Some("my_consumer"));
537 /// consumer.accept(&5);
538 /// assert_eq!(*log.lock().unwrap(), vec![6]);
539 /// ```
540 pub fn new_with_name<F>(name: impl Into<String>, f: F) -> Self
541 where
542 F: FnMut(&T) + 'static,
543 {
544 BoxConsumer {
545 function: Box::new(f),
546 name: Some(name.into()),
547 }
548 }
549
550 /// Create a no-op consumer
551 ///
552 /// Returns a consumer that performs no operation.
553 ///
554 /// # Return Value
555 ///
556 /// Returns a no-op consumer
557 ///
558 /// # Examples
559 ///
560 /// ```rust
561 /// use prism3_function::{Consumer, BoxConsumer};
562 ///
563 /// let mut noop = BoxConsumer::<i32>::noop();
564 /// noop.accept(&42);
565 /// // Value unchanged
566 /// ```
567 pub fn noop() -> Self {
568 BoxConsumer::new(|_| {})
569 }
570
571 /// Get the consumer's name
572 ///
573 /// # Return Value
574 ///
575 /// Returns the consumer's name, or `None` if not set
576 pub fn name(&self) -> Option<&str> {
577 self.name.as_deref()
578 }
579
580 /// Set the consumer's name
581 ///
582 /// # Parameters
583 ///
584 /// * `name` - Name to set
585 pub fn set_name(&mut self, name: impl Into<String>) {
586 self.name = Some(name.into());
587 }
588
589 /// Sequentially chain another consumer
590 ///
591 /// Returns a new consumer that executes the current operation first, then
592 /// the next operation. Consumes self.
593 ///
594 /// # Type Parameters
595 ///
596 /// * `C` - Type of the next consumer
597 ///
598 /// # Parameters
599 ///
600 /// * `next` - Consumer to execute after the current operation. **Note: This
601 /// parameter is passed by value and will transfer ownership.** If you need
602 /// to preserve the original consumer, clone it first (if it implements
603 /// `Clone`). Can be:
604 /// - A closure: `|x: &T|`
605 /// - A `BoxConsumer<T>`
606 /// - An `RcConsumer<T>`
607 /// - An `ArcConsumer<T>`
608 /// - Any type implementing `Consumer<T>`
609 ///
610 /// # Return Value
611 ///
612 /// Returns a new combined `BoxConsumer<T>`
613 ///
614 /// # Examples
615 ///
616 /// ## Direct value passing (ownership transfer)
617 ///
618 /// ```rust
619 /// use prism3_function::{Consumer, BoxConsumer};
620 /// use std::sync::{Arc, Mutex};
621 ///
622 /// let log = Arc::new(Mutex::new(Vec::new()));
623 /// let l1 = log.clone();
624 /// let l2 = log.clone();
625 /// let first = BoxConsumer::new(move |x: &i32| {
626 /// l1.lock().unwrap().push(*x * 2);
627 /// });
628 /// let second = BoxConsumer::new(move |x: &i32| {
629 /// l2.lock().unwrap().push(*x + 10);
630 /// });
631 ///
632 /// // second is moved here
633 /// let mut chained = first.and_then(second);
634 /// chained.accept(&5);
635 /// assert_eq!(*log.lock().unwrap(), vec![10, 15]);
636 /// // second.accept(&3); // Would not compile - moved
637 /// ```
638 ///
639 /// ## Preserving original with clone
640 ///
641 /// ```rust
642 /// use prism3_function::{Consumer, BoxConsumer};
643 /// use std::sync::{Arc, Mutex};
644 ///
645 /// let log = Arc::new(Mutex::new(Vec::new()));
646 /// let l1 = log.clone();
647 /// let l2 = log.clone();
648 /// let first = BoxConsumer::new(move |x: &i32| {
649 /// l1.lock().unwrap().push(*x * 2);
650 /// });
651 /// let second = BoxConsumer::new(move |x: &i32| {
652 /// l2.lock().unwrap().push(*x + 10);
653 /// });
654 ///
655 /// // Clone to preserve original
656 /// let mut chained = first.and_then(second.clone());
657 /// chained.accept(&5);
658 /// assert_eq!(*log.lock().unwrap(), vec![10, 15]);
659 ///
660 /// // Original still usable
661 /// second.accept(&3);
662 /// assert_eq!(*log.lock().unwrap(), vec![10, 15, 13]);
663 /// ```
664 pub fn and_then<C>(self, next: C) -> Self
665 where
666 C: Consumer<T> + 'static,
667 {
668 let mut first = self.function;
669 let mut second = next;
670 BoxConsumer::new(move |t| {
671 first(t);
672 second.accept(t);
673 })
674 }
675
676 /// Creates a conditional consumer
677 ///
678 /// Returns a consumer that only executes when a predicate is satisfied.
679 ///
680 /// # Parameters
681 ///
682 /// * `predicate` - The condition to check. **Note: This parameter is passed
683 /// by value and will transfer ownership.** If you need to preserve the
684 /// original predicate, clone it first (if it implements `Clone`). Can be:
685 /// - A closure: `|x: &T| -> bool`
686 /// - A function pointer: `fn(&T) -> bool`
687 /// - A `BoxPredicate<T>`
688 /// - An `RcPredicate<T>`
689 /// - An `ArcPredicate<T>`
690 /// - Any type implementing `Predicate<T>`
691 ///
692 /// # Return Value
693 ///
694 /// Returns `BoxConditionalConsumer<T>`
695 ///
696 /// # Examples
697 ///
698 /// ## Using a closure
699 ///
700 /// ```rust
701 /// use prism3_function::{Consumer, BoxConsumer};
702 /// use std::sync::{Arc, Mutex};
703 ///
704 /// let log = Arc::new(Mutex::new(Vec::new()));
705 /// let l = log.clone();
706 /// let consumer = BoxConsumer::new(move |x: &i32| {
707 /// l.lock().unwrap().push(*x);
708 /// });
709 /// let mut conditional = consumer.when(|x: &i32| *x > 0);
710 ///
711 /// conditional.accept(&5);
712 /// assert_eq!(*log.lock().unwrap(), vec![5]);
713 ///
714 /// conditional.accept(&-5);
715 /// assert_eq!(*log.lock().unwrap(), vec![5]); // Unchanged
716 /// ```
717 ///
718 /// ## Preserving predicate with clone
719 ///
720 /// ```rust
721 /// use prism3_function::{Consumer, BoxConsumer};
722 /// use prism3_function::predicate::{Predicate, RcPredicate};
723 /// use std::sync::{Arc, Mutex};
724 ///
725 /// let log = Arc::new(Mutex::new(Vec::new()));
726 /// let l = log.clone();
727 /// let is_positive = RcPredicate::new(|x: &i32| *x > 0);
728 /// let consumer = BoxConsumer::new(move |x: &i32| {
729 /// l.lock().unwrap().push(*x);
730 /// });
731 ///
732 /// // Clone to preserve original predicate
733 /// let mut conditional = consumer.when(is_positive.clone());
734 ///
735 /// conditional.accept(&5);
736 /// assert_eq!(*log.lock().unwrap(), vec![5]);
737 ///
738 /// // Original predicate still usable
739 /// assert!(is_positive.test(&3));
740 /// ```
741 pub fn when<P>(self, predicate: P) -> BoxConditionalConsumer<T>
742 where
743 P: Predicate<T> + 'static,
744 {
745 BoxConditionalConsumer {
746 consumer: self,
747 predicate: predicate.into_box(),
748 }
749 }
750}
751
752impl<T> Consumer<T> for BoxConsumer<T> {
753 fn accept(&mut self, value: &T) {
754 (self.function)(value)
755 }
756
757 fn into_box(self) -> BoxConsumer<T>
758 where
759 T: 'static,
760 {
761 self
762 }
763
764 fn into_rc(self) -> RcConsumer<T>
765 where
766 T: 'static,
767 {
768 let mut self_fn = self.function;
769 RcConsumer::new(move |t| self_fn(t))
770 }
771
772 // do NOT override Consumer::into_arc() because BoxConsumer is not Send + Sync
773 // and calling BoxConsumer::into_arc() will cause a compile error
774
775 fn into_fn(self) -> impl FnMut(&T)
776 where
777 T: 'static,
778 {
779 self.function
780 }
781
782 // do NOT override Consumer::to_xxx() because BoxConsumer is not Clone
783 // and calling BoxConsumer::to_xxx() will cause a compile error
784}
785
786impl<T> fmt::Debug for BoxConsumer<T> {
787 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
788 f.debug_struct("BoxConsumer")
789 .field("name", &self.name)
790 .field("function", &"<function>")
791 .finish()
792 }
793}
794
795impl<T> fmt::Display for BoxConsumer<T> {
796 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
797 match &self.name {
798 Some(name) => write!(f, "BoxConsumer({})", name),
799 None => write!(f, "BoxConsumer"),
800 }
801 }
802}
803
804// ============================================================================
805// 9. BoxConsumer ConsumerOnce Implementation
806// ============================================================================
807
808impl<T> crate::consumer_once::ConsumerOnce<T> for BoxConsumer<T> {
809 /// Execute one-time consumption operation
810 ///
811 /// Executes the consumer operation once and consumes self. This method
812 /// provides a bridge between the reusable Consumer interface and the
813 /// one-time ConsumerOnce interface.
814 ///
815 /// # Parameters
816 ///
817 /// * `value` - Reference to the value to be consumed
818 ///
819 /// # Examples
820 ///
821 /// ```rust
822 /// use prism3_function::{Consumer, ConsumerOnce, BoxConsumer};
823 /// use std::sync::{Arc, Mutex};
824 ///
825 /// let log = Arc::new(Mutex::new(Vec::new()));
826 /// let l = log.clone();
827 /// let consumer = BoxConsumer::new(move |x: &i32| {
828 /// l.lock().unwrap().push(*x);
829 /// });
830 /// consumer.accept_once(&5);
831 /// assert_eq!(*log.lock().unwrap(), vec![5]);
832 /// ```
833 fn accept_once(mut self, value: &T) {
834 self.accept(value);
835 }
836
837 /// Convert to BoxConsumerOnce
838 ///
839 /// **⚠️ Consumes `self`**: The original consumer will be unavailable after
840 /// calling this method.
841 ///
842 /// Converts the current consumer to `BoxConsumerOnce<T>` by wrapping the
843 /// consumer's accept method in a FnOnce closure.
844 ///
845 /// # Return Value
846 ///
847 /// Returns the wrapped `BoxConsumerOnce<T>`
848 ///
849 /// # Examples
850 ///
851 /// ```rust
852 /// use prism3_function::{Consumer, ConsumerOnce, BoxConsumer};
853 /// use std::sync::{Arc, Mutex};
854 ///
855 /// let log = Arc::new(Mutex::new(Vec::new()));
856 /// let l = log.clone();
857 /// let consumer = BoxConsumer::new(move |x: &i32| {
858 /// l.lock().unwrap().push(*x);
859 /// });
860 /// let box_consumer_once = consumer.into_box_once();
861 /// box_consumer_once.accept_once(&5);
862 /// assert_eq!(*log.lock().unwrap(), vec![5]);
863 /// ```
864 fn into_box_once(self) -> crate::consumer_once::BoxConsumerOnce<T>
865 where
866 T: 'static,
867 {
868 let mut consumer = self;
869 crate::consumer_once::BoxConsumerOnce::new(move |t| {
870 consumer.accept(t);
871 })
872 }
873
874 /// Convert to closure
875 ///
876 /// **⚠️ Consumes `self`**: The original consumer will be unavailable after
877 /// calling this method.
878 ///
879 /// Converts the consumer to a closure that can be used directly in places
880 /// where the standard library requires `FnOnce`.
881 ///
882 /// # Return Value
883 ///
884 /// Returns a closure implementing `FnOnce(&T)`
885 ///
886 /// # Examples
887 ///
888 /// ```rust
889 /// use prism3_function::{Consumer, ConsumerOnce, BoxConsumer};
890 /// use std::sync::{Arc, Mutex};
891 ///
892 /// let log = Arc::new(Mutex::new(Vec::new()));
893 /// let l = log.clone();
894 /// let consumer = BoxConsumer::new(move |x: &i32| {
895 /// l.lock().unwrap().push(*x);
896 /// });
897 /// let func = consumer.into_fn_once();
898 /// func(&5);
899 /// assert_eq!(*log.lock().unwrap(), vec![5]);
900 /// ```
901 fn into_fn_once(self) -> impl FnOnce(&T)
902 where
903 T: 'static,
904 {
905 let mut consumer = self;
906 move |t| consumer.accept(t)
907 }
908
909 // do NOT override ConsumerOnce::to_box_once() because BoxConsumer is not Clone
910 // and calling BoxConsumer::to_box_once() will cause a compile error
911
912 // do NOT override ConsumerOnce::to_fn_once() because BoxConsumer is not Clone
913 // and calling BoxConsumer::to_fn_once() will cause a compile error
914}
915
916// ============================================================================
917// 3. BoxConditionalConsumer - Box-based Conditional Consumer
918// ============================================================================
919
920/// BoxConditionalConsumer struct
921///
922/// A conditional consumer that only executes when a predicate is satisfied.
923/// Uses `BoxConsumer` and `BoxPredicate` for single ownership semantics.
924///
925/// This type is typically created by calling `BoxConsumer::when()` and is
926/// designed to work with the `or_else()` method to create if-then-else logic.
927///
928/// # Features
929///
930/// - **Single Ownership**: Not cloneable, consumes `self` on use
931/// - **Conditional Execution**: Only consumes when predicate returns `true`
932/// - **Chainable**: Can add `or_else` branch to create if-then-else logic
933/// - **Implements Consumer**: Can be used anywhere a `Consumer` is expected
934///
935/// # Examples
936///
937/// ## Basic Conditional Execution
938///
939/// ```rust
940/// use prism3_function::{Consumer, BoxConsumer};
941/// use std::sync::{Arc, Mutex};
942///
943/// let log = Arc::new(Mutex::new(Vec::new()));
944/// let l = log.clone();
945/// let consumer = BoxConsumer::new(move |x: &i32| {
946/// l.lock().unwrap().push(*x);
947/// });
948/// let mut conditional = consumer.when(|x: &i32| *x > 0);
949///
950/// conditional.accept(&5);
951/// assert_eq!(*log.lock().unwrap(), vec![5]); // Executed
952///
953/// conditional.accept(&-5);
954/// assert_eq!(*log.lock().unwrap(), vec![5]); // Not executed
955/// ```
956///
957/// ## With or_else Branch
958///
959/// ```rust
960/// use prism3_function::{Consumer, BoxConsumer};
961/// use std::sync::{Arc, Mutex};
962///
963/// let log = Arc::new(Mutex::new(Vec::new()));
964/// let l1 = log.clone();
965/// let l2 = log.clone();
966/// let mut consumer = BoxConsumer::new(move |x: &i32| {
967/// l1.lock().unwrap().push(*x);
968/// })
969/// .when(|x: &i32| *x > 0)
970/// .or_else(move |x: &i32| {
971/// l2.lock().unwrap().push(-*x);
972/// });
973///
974/// consumer.accept(&5);
975/// assert_eq!(*log.lock().unwrap(), vec![5]); // when branch executed
976///
977/// consumer.accept(&-5);
978/// assert_eq!(*log.lock().unwrap(), vec![5, 5]); // or_else branch executed
979/// ```
980///
981/// # Author
982///
983/// Hu Haixing
984pub struct BoxConditionalConsumer<T> {
985 consumer: BoxConsumer<T>,
986 predicate: BoxPredicate<T>,
987}
988
989impl<T> Consumer<T> for BoxConditionalConsumer<T>
990where
991 T: 'static,
992{
993 fn accept(&mut self, value: &T) {
994 if self.predicate.test(value) {
995 self.consumer.accept(value);
996 }
997 }
998
999 fn into_box(self) -> BoxConsumer<T> {
1000 let pred = self.predicate;
1001 let mut consumer = self.consumer;
1002 BoxConsumer::new(move |t| {
1003 if pred.test(t) {
1004 consumer.accept(t);
1005 }
1006 })
1007 }
1008
1009 fn into_rc(self) -> RcConsumer<T> {
1010 let pred = self.predicate.into_rc();
1011 let consumer = self.consumer.into_rc();
1012 let mut consumer_fn = consumer;
1013 RcConsumer::new(move |t| {
1014 if pred.test(t) {
1015 consumer_fn.accept(t);
1016 }
1017 })
1018 }
1019
1020 // do NOT override Consumer::into_arc() because BoxConditionalConsumer is not Send + Sync
1021 // and calling BoxConditionalConsumer::into_arc() will cause a compile error
1022
1023 fn into_fn(self) -> impl FnMut(&T) {
1024 let pred = self.predicate;
1025 let mut consumer = self.consumer;
1026 move |t: &T| {
1027 if pred.test(t) {
1028 consumer.accept(t);
1029 }
1030 }
1031 }
1032
1033 // do NOT override Consumer::to_xxx() because BoxConditionalConsumer is not Clone
1034 // and calling BoxConditionalConsumer::to_xxx() will cause a compile error
1035}
1036
1037impl<T> BoxConditionalConsumer<T>
1038where
1039 T: 'static,
1040{
1041 /// Chains another consumer in sequence
1042 ///
1043 /// Combines the current conditional consumer with another consumer into a new
1044 /// consumer. The current conditional consumer executes first, followed by the
1045 /// next consumer.
1046 ///
1047 /// # Parameters
1048 ///
1049 /// * `next` - The next consumer to execute
1050 ///
1051 /// # Returns
1052 ///
1053 /// Returns a new `BoxConsumer<T>`
1054 ///
1055 /// # Examples
1056 ///
1057 /// ```rust
1058 /// use prism3_function::{Consumer, BoxConsumer};
1059 /// use std::sync::{Arc, Mutex};
1060 ///
1061 /// let log = Arc::new(Mutex::new(Vec::new()));
1062 /// let l1 = log.clone();
1063 /// let l2 = log.clone();
1064 /// let cond1 = BoxConsumer::new(move |x: &i32| {
1065 /// l1.lock().unwrap().push(*x * 2);
1066 /// }).when(|x: &i32| *x > 0);
1067 /// let cond2 = BoxConsumer::new(move |x: &i32| {
1068 /// l2.lock().unwrap().push(*x + 100);
1069 /// }).when(|x: &i32| *x > 10);
1070 /// let mut chained = cond1.and_then(cond2);
1071 ///
1072 /// chained.accept(&6);
1073 /// assert_eq!(*log.lock().unwrap(), vec![12, 106]);
1074 /// // First *2 = 12, then +100 = 106
1075 /// ```
1076 pub fn and_then<C>(self, next: C) -> BoxConsumer<T>
1077 where
1078 C: Consumer<T> + 'static,
1079 {
1080 let mut first = self;
1081 let mut second = next;
1082 BoxConsumer::new(move |t| {
1083 first.accept(t);
1084 second.accept(t);
1085 })
1086 }
1087
1088 /// Adds an else branch
1089 ///
1090 /// Executes the original consumer when the condition is satisfied, otherwise
1091 /// executes else_consumer.
1092 ///
1093 /// # Parameters
1094 ///
1095 /// * `else_consumer` - The consumer for the else branch, can be:
1096 /// - Closure: `|x: &T|`
1097 /// - `BoxConsumer<T>`, `RcConsumer<T>`, `ArcConsumer<T>`
1098 /// - Any type implementing `Consumer<T>`
1099 ///
1100 /// # Returns
1101 ///
1102 /// Returns the composed `BoxConsumer<T>`
1103 ///
1104 /// # Examples
1105 ///
1106 /// ## Using a closure (recommended)
1107 ///
1108 /// ```rust
1109 /// use prism3_function::{Consumer, BoxConsumer};
1110 /// use std::sync::{Arc, Mutex};
1111 ///
1112 /// let log = Arc::new(Mutex::new(Vec::new()));
1113 /// let l1 = log.clone();
1114 /// let l2 = log.clone();
1115 /// let mut consumer = BoxConsumer::new(move |x: &i32| {
1116 /// l1.lock().unwrap().push(*x);
1117 /// })
1118 /// .when(|x: &i32| *x > 0)
1119 /// .or_else(move |x: &i32| {
1120 /// l2.lock().unwrap().push(-*x);
1121 /// });
1122 ///
1123 /// consumer.accept(&5);
1124 /// assert_eq!(*log.lock().unwrap(), vec![5]);
1125 /// // Condition satisfied, execute first
1126 ///
1127 /// consumer.accept(&-5);
1128 /// assert_eq!(*log.lock().unwrap(), vec![5, 5]);
1129 /// // Condition not satisfied, execute else
1130 /// ```
1131 pub fn or_else<C>(self, else_consumer: C) -> BoxConsumer<T>
1132 where
1133 C: Consumer<T> + 'static,
1134 {
1135 let pred = self.predicate;
1136 let mut then_cons = self.consumer;
1137 let mut else_cons = else_consumer;
1138 BoxConsumer::new(move |t| {
1139 if pred.test(t) {
1140 then_cons.accept(t);
1141 } else {
1142 else_cons.accept(t);
1143 }
1144 })
1145 }
1146}
1147
1148// ============================================================================
1149// 4. ArcConsumer - Thread-Safe Shared Ownership Implementation
1150// ============================================================================
1151
1152/// ArcConsumer struct
1153///
1154/// Consumer implementation based on `Arc<Mutex<dyn FnMut(&T) + Send>>` for
1155/// thread-safe shared ownership scenarios. This consumer can be safely cloned
1156/// and shared across multiple threads.
1157///
1158/// # Features
1159///
1160/// - **Shared Ownership**: Cloneable through `Arc`, allowing multiple owners
1161/// - **Thread Safety**: Implements `Send + Sync`, safe for concurrent use
1162/// - **Interior Mutability**: Uses `Mutex` for safe mutable access
1163/// - **Non-Consuming API**: `and_then` borrows `&self`, original object remains
1164/// usable
1165/// - **Cross-Thread Sharing**: Can be sent to other threads and used
1166///
1167/// # Use Cases
1168///
1169/// Choose `ArcConsumer` when:
1170/// - Need to share consumers across multiple threads
1171/// - Concurrent task processing (e.g., thread pools)
1172/// - Using the same consumer in multiple places simultaneously
1173/// - Need thread safety (Send + Sync)
1174///
1175/// # Performance Considerations
1176///
1177/// `ArcConsumer` has some performance overhead compared to `BoxConsumer`:
1178/// - **Reference Counting**: Atomic operations on clone/drop
1179/// - **Mutex Locking**: Each `accept` call requires lock acquisition
1180/// - **Lock Contention**: High concurrency may cause contention
1181///
1182/// These overheads are necessary for safe concurrent access. If thread safety
1183/// is not needed, consider using `RcConsumer` for less single-threaded sharing
1184/// overhead.
1185///
1186/// # Examples
1187///
1188/// ```rust
1189/// use prism3_function::{Consumer, ArcConsumer};
1190/// use std::sync::{Arc, Mutex};
1191///
1192/// let log = Arc::new(Mutex::new(Vec::new()));
1193/// let l = log.clone();
1194/// let mut consumer = ArcConsumer::new(move |x: &i32| {
1195/// l.lock().unwrap().push(*x * 2);
1196/// });
1197/// let mut clone = consumer.clone();
1198///
1199/// consumer.accept(&5);
1200/// assert_eq!(*log.lock().unwrap(), vec![10]);
1201/// ```
1202///
1203/// # Author
1204///
1205/// Hu Haixing
1206pub struct ArcConsumer<T> {
1207 function: Arc<Mutex<SendConsumerFn<T>>>,
1208 name: Option<String>,
1209}
1210
1211impl<T> ArcConsumer<T>
1212where
1213 T: Send + 'static,
1214{
1215 /// Create a new ArcConsumer
1216 ///
1217 /// # Type Parameters
1218 ///
1219 /// * `F` - Closure type
1220 ///
1221 /// # Parameters
1222 ///
1223 /// * `f` - Closure to wrap
1224 ///
1225 /// # Return Value
1226 ///
1227 /// Returns a new `ArcConsumer<T>` instance
1228 ///
1229 /// # Examples
1230 ///
1231 /// ```rust
1232 /// use prism3_function::{Consumer, ArcConsumer};
1233 /// use std::sync::{Arc, Mutex};
1234 ///
1235 /// let log = Arc::new(Mutex::new(Vec::new()));
1236 /// let l = log.clone();
1237 /// let mut consumer = ArcConsumer::new(move |x: &i32| {
1238 /// l.lock().unwrap().push(*x + 1);
1239 /// });
1240 /// consumer.accept(&5);
1241 /// assert_eq!(*log.lock().unwrap(), vec![6]);
1242 /// ```
1243 pub fn new<F>(f: F) -> Self
1244 where
1245 F: FnMut(&T) + Send + 'static,
1246 {
1247 ArcConsumer {
1248 function: Arc::new(Mutex::new(f)),
1249 name: None,
1250 }
1251 }
1252
1253 /// Create a new named ArcConsumer
1254 ///
1255 /// # Type Parameters
1256 ///
1257 /// * `F` - Closure type
1258 ///
1259 /// # Parameters
1260 ///
1261 /// * `name` - Name of the consumer
1262 /// * `f` - Closure to wrap
1263 ///
1264 /// # Return Value
1265 ///
1266 /// Returns a new `ArcConsumer<T>` instance
1267 ///
1268 /// # Examples
1269 ///
1270 /// ```rust
1271 /// use prism3_function::{Consumer, ArcConsumer};
1272 /// use std::sync::{Arc, Mutex};
1273 ///
1274 /// let log = Arc::new(Mutex::new(Vec::new()));
1275 /// let l = log.clone();
1276 /// let mut consumer = ArcConsumer::new_with_name("my_consumer", move |x: &i32| {
1277 /// l.lock().unwrap().push(*x + 1);
1278 /// });
1279 /// assert_eq!(consumer.name(), Some("my_consumer"));
1280 /// consumer.accept(&5);
1281 /// assert_eq!(*log.lock().unwrap(), vec![6]);
1282 /// ```
1283 pub fn new_with_name<F>(name: impl Into<String>, f: F) -> Self
1284 where
1285 F: FnMut(&T) + Send + 'static,
1286 {
1287 ArcConsumer {
1288 function: Arc::new(Mutex::new(f)),
1289 name: Some(name.into()),
1290 }
1291 }
1292
1293 /// Create a no-op consumer
1294 ///
1295 /// Returns a consumer that performs no operation.
1296 ///
1297 /// # Return Value
1298 ///
1299 /// Returns a no-op consumer
1300 ///
1301 /// # Examples
1302 ///
1303 /// ```rust
1304 /// use prism3_function::{Consumer, ArcConsumer};
1305 ///
1306 /// let mut noop = ArcConsumer::<i32>::noop();
1307 /// noop.accept(&42);
1308 /// // Value unchanged
1309 /// ```
1310 pub fn noop() -> Self {
1311 ArcConsumer::new(|_| {})
1312 }
1313
1314 /// Get the consumer's name
1315 ///
1316 /// # Return Value
1317 ///
1318 /// Returns the consumer's name, or `None` if not set
1319 pub fn name(&self) -> Option<&str> {
1320 self.name.as_deref()
1321 }
1322
1323 /// Set the consumer's name
1324 ///
1325 /// # Parameters
1326 ///
1327 /// * `name` - Name to set
1328 pub fn set_name(&mut self, name: impl Into<String>) {
1329 self.name = Some(name.into());
1330 }
1331
1332 /// Sequentially chain another ArcConsumer
1333 ///
1334 /// Returns a new consumer that executes the current operation first, then the
1335 /// next operation. Borrows &self, does not consume the original consumer.
1336 ///
1337 /// # Parameters
1338 ///
1339 /// * `next` - Consumer to execute after the current operation
1340 ///
1341 /// # Return Value
1342 ///
1343 /// Returns a new combined `ArcConsumer<T>`
1344 ///
1345 /// # Examples
1346 ///
1347 /// ```rust
1348 /// use prism3_function::{Consumer, ArcConsumer};
1349 /// use std::sync::{Arc, Mutex};
1350 ///
1351 /// let log = Arc::new(Mutex::new(Vec::new()));
1352 /// let l1 = log.clone();
1353 /// let l2 = log.clone();
1354 /// let first = ArcConsumer::new(move |x: &i32| {
1355 /// l1.lock().unwrap().push(*x * 2);
1356 /// });
1357 /// let second = ArcConsumer::new(move |x: &i32| {
1358 /// l2.lock().unwrap().push(*x + 10);
1359 /// });
1360 ///
1361 /// let mut chained = first.and_then(&second);
1362 ///
1363 /// // first and second remain usable after chaining
1364 /// chained.accept(&5);
1365 /// assert_eq!(*log.lock().unwrap(), vec![10, 15]);
1366 /// // (5 * 2), (5 + 10)
1367 /// ```
1368 pub fn and_then(&self, next: &ArcConsumer<T>) -> ArcConsumer<T> {
1369 let first = Arc::clone(&self.function);
1370 let second = Arc::clone(&next.function);
1371 ArcConsumer {
1372 function: Arc::new(Mutex::new(move |t: &T| {
1373 first.lock().unwrap()(t);
1374 second.lock().unwrap()(t);
1375 })),
1376 name: None,
1377 }
1378 }
1379
1380 /// Creates a conditional consumer (thread-safe version)
1381 ///
1382 /// Returns a consumer that only executes when a predicate is satisfied.
1383 ///
1384 /// # Parameters
1385 ///
1386 /// * `predicate` - The condition to check, must be `Send + Sync`, can be:
1387 /// - Closure: `|x: &T| -> bool` (requires `Send + Sync`)
1388 /// - Function pointer: `fn(&T) -> bool`
1389 /// - `ArcPredicate<T>`
1390 /// - Any type implementing `Predicate<T> + Send + Sync`
1391 ///
1392 /// # Returns
1393 ///
1394 /// Returns `ArcConditionalConsumer<T>`
1395 ///
1396 /// # Examples
1397 ///
1398 /// ```rust
1399 /// use prism3_function::{Consumer, ArcConsumer};
1400 /// use std::sync::{Arc, Mutex};
1401 ///
1402 /// let log = Arc::new(Mutex::new(Vec::new()));
1403 /// let l = log.clone();
1404 /// let consumer = ArcConsumer::new(move |x: &i32| {
1405 /// l.lock().unwrap().push(*x);
1406 /// });
1407 /// let conditional = consumer.when(|x: &i32| *x > 0);
1408 ///
1409 /// let conditional_clone = conditional.clone();
1410 ///
1411 /// let mut positive = 5;
1412 /// let mut m = conditional;
1413 /// m.accept(&positive);
1414 /// assert_eq!(*log.lock().unwrap(), vec![5]);
1415 /// ```
1416 pub fn when<P>(&self, predicate: P) -> ArcConditionalConsumer<T>
1417 where
1418 P: Predicate<T> + Send + Sync + 'static,
1419 T: Send + Sync,
1420 {
1421 ArcConditionalConsumer {
1422 consumer: self.clone(),
1423 predicate: predicate.into_arc(),
1424 }
1425 }
1426}
1427
1428impl<T> Consumer<T> for ArcConsumer<T> {
1429 fn accept(&mut self, value: &T) {
1430 (self.function.lock().unwrap())(value)
1431 }
1432
1433 fn into_box(self) -> BoxConsumer<T>
1434 where
1435 T: 'static,
1436 {
1437 let self_fn = self.function;
1438 BoxConsumer::new(move |t| self_fn.lock().unwrap()(t))
1439 }
1440
1441 fn into_rc(self) -> RcConsumer<T>
1442 where
1443 T: 'static,
1444 {
1445 let self_fn = self.function;
1446 RcConsumer::new(move |t| self_fn.lock().unwrap()(t))
1447 }
1448
1449 fn into_arc(self) -> ArcConsumer<T>
1450 where
1451 T: Send + 'static,
1452 {
1453 self
1454 }
1455
1456 fn into_fn(self) -> impl FnMut(&T)
1457 where
1458 T: 'static,
1459 {
1460 let self_fn = self.function;
1461 move |t: &T| {
1462 self_fn.lock().unwrap()(t);
1463 }
1464 }
1465
1466 fn to_box(&self) -> BoxConsumer<T>
1467 where
1468 T: 'static,
1469 {
1470 let self_fn = self.function.clone();
1471 BoxConsumer::new(move |t| self_fn.lock().unwrap()(t))
1472 }
1473
1474 fn to_rc(&self) -> RcConsumer<T>
1475 where
1476 T: 'static,
1477 {
1478 let self_fn = self.function.clone();
1479 RcConsumer::new(move |t| self_fn.lock().unwrap()(t))
1480 }
1481
1482 fn to_arc(&self) -> ArcConsumer<T>
1483 where
1484 T: Send + 'static,
1485 {
1486 self.clone()
1487 }
1488
1489 fn to_fn(&self) -> impl FnMut(&T) {
1490 let self_fn = self.function.clone();
1491 move |t| self_fn.lock().unwrap()(t)
1492 }
1493}
1494
1495impl<T> Clone for ArcConsumer<T> {
1496 /// Clone ArcConsumer
1497 ///
1498 /// Creates a new ArcConsumer that shares the underlying function with the
1499 /// original instance.
1500 fn clone(&self) -> Self {
1501 ArcConsumer {
1502 function: Arc::clone(&self.function),
1503 name: self.name.clone(),
1504 }
1505 }
1506}
1507
1508impl<T> fmt::Debug for ArcConsumer<T> {
1509 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1510 f.debug_struct("ArcConsumer")
1511 .field("name", &self.name)
1512 .field("function", &"<function>")
1513 .finish()
1514 }
1515}
1516
1517impl<T> fmt::Display for ArcConsumer<T> {
1518 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1519 match &self.name {
1520 Some(name) => write!(f, "ArcConsumer({})", name),
1521 None => write!(f, "ArcConsumer"),
1522 }
1523 }
1524}
1525
1526// ============================================================================
1527// 11. ArcConsumer ConsumerOnce Implementation
1528// ============================================================================
1529
1530impl<T> crate::consumer_once::ConsumerOnce<T> for ArcConsumer<T> {
1531 /// Execute one-time consumption operation
1532 ///
1533 /// Executes the consumer operation once and consumes self. This method
1534 /// provides a bridge between the reusable Consumer interface and the
1535 /// one-time ConsumerOnce interface.
1536 ///
1537 /// # Parameters
1538 ///
1539 /// * `value` - Reference to the value to be consumed
1540 ///
1541 /// # Examples
1542 ///
1543 /// ```rust
1544 /// use prism3_function::{Consumer, ConsumerOnce, ArcConsumer};
1545 /// use std::sync::{Arc, Mutex};
1546 ///
1547 /// let log = Arc::new(Mutex::new(Vec::new()));
1548 /// let l = log.clone();
1549 /// let consumer = ArcConsumer::new(move |x: &i32| {
1550 /// l.lock().unwrap().push(*x);
1551 /// });
1552 /// consumer.accept_once(&5);
1553 /// assert_eq!(*log.lock().unwrap(), vec![5]);
1554 /// ```
1555 fn accept_once(mut self, value: &T) {
1556 self.accept(value);
1557 }
1558
1559 /// Convert to BoxConsumerOnce
1560 ///
1561 /// **⚠️ Consumes `self`**: The original consumer will be unavailable after
1562 /// calling this method.
1563 ///
1564 /// Converts the current consumer to `BoxConsumerOnce<T>` by wrapping the
1565 /// consumer's accept method in a FnOnce closure.
1566 ///
1567 /// # Return Value
1568 ///
1569 /// Returns the wrapped `BoxConsumerOnce<T>`
1570 ///
1571 /// # Examples
1572 ///
1573 /// ```rust
1574 /// use prism3_function::{Consumer, ConsumerOnce, ArcConsumer};
1575 /// use std::sync::{Arc, Mutex};
1576 ///
1577 /// let log = Arc::new(Mutex::new(Vec::new()));
1578 /// let l = log.clone();
1579 /// let consumer = ArcConsumer::new(move |x: &i32| {
1580 /// l.lock().unwrap().push(*x);
1581 /// });
1582 /// let box_consumer_once = consumer.into_box_once();
1583 /// box_consumer_once.accept_once(&5);
1584 /// assert_eq!(*log.lock().unwrap(), vec![5]);
1585 /// ```
1586 fn into_box_once(self) -> crate::consumer_once::BoxConsumerOnce<T>
1587 where
1588 T: 'static,
1589 {
1590 let mut consumer = self;
1591 crate::consumer_once::BoxConsumerOnce::new(move |t| {
1592 consumer.accept(t);
1593 })
1594 }
1595
1596 /// Convert to closure
1597 ///
1598 /// **⚠️ Consumes `self`**: The original consumer will be unavailable after
1599 /// calling this method.
1600 ///
1601 /// Converts the consumer to a closure that can be used directly in places
1602 /// where the standard library requires `FnOnce`.
1603 ///
1604 /// # Return Value
1605 ///
1606 /// Returns a closure implementing `FnOnce(&T)`
1607 ///
1608 /// # Examples
1609 ///
1610 /// ```rust
1611 /// use prism3_function::{Consumer, ConsumerOnce, ArcConsumer};
1612 /// use std::sync::{Arc, Mutex};
1613 ///
1614 /// let log = Arc::new(Mutex::new(Vec::new()));
1615 /// let l = log.clone();
1616 /// let consumer = ArcConsumer::new(move |x: &i32| {
1617 /// l.lock().unwrap().push(*x);
1618 /// });
1619 /// let func = consumer.into_fn_once();
1620 /// func(&5);
1621 /// assert_eq!(*log.lock().unwrap(), vec![5]);
1622 /// ```
1623 fn into_fn_once(self) -> impl FnOnce(&T)
1624 where
1625 T: 'static,
1626 {
1627 let mut consumer = self;
1628 move |t| consumer.accept(t)
1629 }
1630
1631 /// Convert to BoxConsumerOnce without consuming self
1632 ///
1633 /// **⚠️ Requires Clone**: This method requires `Self` to implement
1634 /// `Clone`. Clones the current consumer and wraps it in a
1635 /// `BoxConsumerOnce`.
1636 ///
1637 /// # Return Value
1638 ///
1639 /// Returns the wrapped `BoxConsumerOnce<T>`
1640 ///
1641 /// # Examples
1642 ///
1643 /// ```rust
1644 /// use prism3_function::{Consumer, ConsumerOnce, ArcConsumer};
1645 /// use std::sync::{Arc, Mutex};
1646 ///
1647 /// let log = Arc::new(Mutex::new(Vec::new()));
1648 /// let l = log.clone();
1649 /// let consumer = ArcConsumer::new(move |x: &i32| {
1650 /// l.lock().unwrap().push(*x);
1651 /// });
1652 /// let box_consumer_once = consumer.to_box_once();
1653 /// box_consumer_once.accept_once(&5);
1654 /// assert_eq!(*log.lock().unwrap(), vec![5]);
1655 /// // Original consumer still usable
1656 /// consumer.accept(&3);
1657 /// assert_eq!(*log.lock().unwrap(), vec![5, 3]);
1658 /// ```
1659 fn to_box_once(&self) -> crate::consumer_once::BoxConsumerOnce<T>
1660 where
1661 T: 'static,
1662 {
1663 let self_fn = self.function.clone();
1664 crate::consumer_once::BoxConsumerOnce::new(move |t| {
1665 self_fn.lock().unwrap()(t);
1666 })
1667 }
1668
1669 /// Convert to closure without consuming self
1670 ///
1671 /// **⚠️ Requires Clone**: This method requires `Self` to implement
1672 /// `Clone`. Clones the current consumer and then converts the clone
1673 /// to a closure.
1674 ///
1675 /// # Return Value
1676 ///
1677 /// Returns a closure implementing `FnOnce(&T)`
1678 ///
1679 /// # Examples
1680 ///
1681 /// ```rust
1682 /// use prism3_function::{Consumer, ConsumerOnce, ArcConsumer};
1683 /// use std::sync::{Arc, Mutex};
1684 ///
1685 /// let log = Arc::new(Mutex::new(Vec::new()));
1686 /// let l = log.clone();
1687 /// let consumer = ArcConsumer::new(move |x: &i32| {
1688 /// l.lock().unwrap().push(*x);
1689 /// });
1690 /// let func = consumer.to_fn_once();
1691 /// func(&5);
1692 /// assert_eq!(*log.lock().unwrap(), vec![5]);
1693 /// // Original consumer still usable
1694 /// consumer.accept(&3);
1695 /// assert_eq!(*log.lock().unwrap(), vec![5, 3]);
1696 /// ```
1697 fn to_fn_once(&self) -> impl FnOnce(&T)
1698 where
1699 T: 'static,
1700 {
1701 let self_fn = self.function.clone();
1702 move |t| self_fn.lock().unwrap()(t)
1703 }
1704}
1705
1706// ============================================================================
1707// 5. ArcConditionalConsumer - Arc-based Conditional Consumer
1708// ============================================================================
1709
1710/// ArcConditionalConsumer struct
1711///
1712/// A thread-safe conditional consumer that only executes when a predicate is
1713/// satisfied. Uses `ArcConsumer` and `ArcPredicate` for shared ownership across
1714/// threads.
1715///
1716/// This type is typically created by calling `ArcConsumer::when()` and is
1717/// designed to work with the `or_else()` method to create if-then-else logic.
1718///
1719/// # Features
1720///
1721/// - **Shared Ownership**: Cloneable via `Arc`, multiple owners allowed
1722/// - **Thread-Safe**: Implements `Send + Sync`, safe for concurrent use
1723/// - **Conditional Execution**: Only consumes when predicate returns `true`
1724/// - **Chainable**: Can add `or_else` branch to create if-then-else logic
1725///
1726/// # Examples
1727///
1728/// ```rust
1729/// use prism3_function::{Consumer, ArcConsumer};
1730/// use std::sync::{Arc, Mutex};
1731///
1732/// let log = Arc::new(Mutex::new(Vec::new()));
1733/// let l = log.clone();
1734/// let conditional = ArcConsumer::new(move |x: &i32| {
1735/// l.lock().unwrap().push(*x);
1736/// })
1737/// .when(|x: &i32| *x > 0);
1738///
1739/// let conditional_clone = conditional.clone();
1740///
1741/// let mut value = 5;
1742/// let mut m = conditional;
1743/// m.accept(&value);
1744/// assert_eq!(*log.lock().unwrap(), vec![5]);
1745/// ```
1746///
1747/// # Author
1748///
1749/// Hu Haixing
1750pub struct ArcConditionalConsumer<T> {
1751 consumer: ArcConsumer<T>,
1752 predicate: ArcPredicate<T>,
1753}
1754
1755impl<T> Consumer<T> for ArcConditionalConsumer<T>
1756where
1757 T: Send + 'static,
1758{
1759 fn accept(&mut self, value: &T) {
1760 if self.predicate.test(value) {
1761 self.consumer.accept(value);
1762 }
1763 }
1764
1765 fn into_box(self) -> BoxConsumer<T>
1766 where
1767 T: 'static,
1768 {
1769 let pred = self.predicate;
1770 let mut consumer = self.consumer;
1771 BoxConsumer::new(move |t| {
1772 if pred.test(t) {
1773 consumer.accept(t);
1774 }
1775 })
1776 }
1777
1778 fn into_rc(self) -> RcConsumer<T>
1779 where
1780 T: 'static,
1781 {
1782 let pred = self.predicate.to_rc();
1783 let consumer = self.consumer.into_rc();
1784 let mut consumer_fn = consumer;
1785 RcConsumer::new(move |t| {
1786 if pred.test(t) {
1787 consumer_fn.accept(t);
1788 }
1789 })
1790 }
1791
1792 fn into_arc(self) -> ArcConsumer<T>
1793 where
1794 T: Send + 'static,
1795 {
1796 let pred = self.predicate;
1797 let mut consumer = self.consumer;
1798 ArcConsumer::new(move |t| {
1799 if pred.test(t) {
1800 consumer.accept(t);
1801 }
1802 })
1803 }
1804
1805 fn into_fn(self) -> impl FnMut(&T)
1806 where
1807 T: 'static,
1808 {
1809 let pred = self.predicate;
1810 let mut consumer = self.consumer;
1811 move |t: &T| {
1812 if pred.test(t) {
1813 consumer.accept(t);
1814 }
1815 }
1816 }
1817
1818 // inherit the default implementation of to_xxx() from Consumer
1819}
1820
1821impl<T> ArcConditionalConsumer<T>
1822where
1823 T: Send + 'static,
1824{
1825 /// Adds an else branch (thread-safe version)
1826 ///
1827 /// Executes the original consumer when the condition is satisfied, otherwise
1828 /// executes else_consumer.
1829 ///
1830 /// # Parameters
1831 ///
1832 /// * `else_consumer` - The consumer for the else branch, can be:
1833 /// - Closure: `|x: &T|` (must be `Send`)
1834 /// - `ArcConsumer<T>`, `BoxConsumer<T>`
1835 /// - Any type implementing `Consumer<T> + Send`
1836 ///
1837 /// # Returns
1838 ///
1839 /// Returns the composed `ArcConsumer<T>`
1840 ///
1841 /// # Examples
1842 ///
1843 /// ## Using a closure (recommended)
1844 ///
1845 /// ```rust
1846 /// use prism3_function::{Consumer, ArcConsumer};
1847 /// use std::sync::{Arc, Mutex};
1848 ///
1849 /// let log = Arc::new(Mutex::new(Vec::new()));
1850 /// let l1 = log.clone();
1851 /// let l2 = log.clone();
1852 /// let mut consumer = ArcConsumer::new(move |x: &i32| {
1853 /// l1.lock().unwrap().push(*x);
1854 /// })
1855 /// .when(|x: &i32| *x > 0)
1856 /// .or_else(move |x: &i32| {
1857 /// l2.lock().unwrap().push(-*x);
1858 /// });
1859 ///
1860 /// consumer.accept(&5);
1861 /// assert_eq!(*log.lock().unwrap(), vec![5]);
1862 ///
1863 /// consumer.accept(&-5);
1864 /// assert_eq!(*log.lock().unwrap(), vec![5, 5]);
1865 /// ```
1866 pub fn or_else<C>(&self, else_consumer: C) -> ArcConsumer<T>
1867 where
1868 C: Consumer<T> + Send + 'static,
1869 T: Send + Sync,
1870 {
1871 let pred = self.predicate.clone();
1872 let mut then_cons = self.consumer.clone();
1873 let mut else_cons = else_consumer;
1874
1875 ArcConsumer::new(move |t: &T| {
1876 if pred.test(t) {
1877 then_cons.accept(t);
1878 } else {
1879 else_cons.accept(t);
1880 }
1881 })
1882 }
1883}
1884
1885impl<T> Clone for ArcConditionalConsumer<T> {
1886 /// Clones the conditional consumer
1887 ///
1888 /// Creates a new instance that shares the underlying consumer and predicate
1889 /// with the original instance.
1890 fn clone(&self) -> Self {
1891 ArcConditionalConsumer {
1892 consumer: self.consumer.clone(),
1893 predicate: self.predicate.clone(),
1894 }
1895 }
1896}
1897
1898// ============================================================================
1899// 6. RcConsumer - Single-Threaded Shared Ownership Implementation
1900// ============================================================================
1901
1902/// RcConsumer struct
1903///
1904/// Consumer implementation based on `Rc<RefCell<dyn FnMut(&T)>>` for
1905/// single-threaded shared ownership scenarios. This consumer provides the
1906/// benefits of shared ownership without the overhead of thread safety.
1907///
1908/// # Features
1909///
1910/// - **Shared Ownership**: Cloneable through `Rc`, allowing multiple owners
1911/// - **Single-Threaded**: Not thread-safe, cannot be sent across threads
1912/// - **Interior Mutability**: Uses `RefCell` for runtime borrowing checks
1913/// - **No Lock Overhead**: More efficient than `ArcConsumer` for single-threaded
1914/// use
1915/// - **Non-Consuming API**: `and_then` borrows `&self`, original object remains
1916/// usable
1917///
1918/// # Use Cases
1919///
1920/// Choose `RcConsumer` when:
1921/// - Need to share consumers within a single thread
1922/// - Thread safety is not needed
1923/// - Performance is important (avoid lock overhead)
1924/// - UI event handling in single-threaded frameworks
1925/// - Building complex single-threaded state machines
1926///
1927/// # Performance Considerations
1928///
1929/// `RcConsumer` performs better than `ArcConsumer` in single-threaded scenarios:
1930/// - **Non-Atomic Counting**: clone/drop is cheaper than `Arc`
1931/// - **No Lock Overhead**: `RefCell` uses runtime checks, no locks
1932/// - **Better Cache Locality**: No atomic operations means better CPU cache
1933/// behavior
1934///
1935/// But still has slight overhead compared to `BoxConsumer`:
1936/// - **Reference Counting**: Non-atomic but still exists
1937/// - **Runtime Borrowing Checks**: `RefCell` checks at runtime
1938///
1939/// # Safety
1940///
1941/// `RcConsumer` is not thread-safe and does not implement `Send` or `Sync`.
1942/// Attempting to send it to another thread will result in a compilation error.
1943/// For thread-safe sharing, use `ArcConsumer` instead.
1944///
1945/// # Examples
1946///
1947/// ```rust
1948/// use prism3_function::{Consumer, RcConsumer};
1949/// use std::rc::Rc;
1950/// use std::cell::RefCell;
1951///
1952/// let log = Rc::new(RefCell::new(Vec::new()));
1953/// let l = log.clone();
1954/// let mut consumer = RcConsumer::new(move |x: &i32| {
1955/// l.borrow_mut().push(*x * 2);
1956/// });
1957/// let mut clone = consumer.clone();
1958///
1959/// consumer.accept(&5);
1960/// assert_eq!(*log.borrow(), vec![10]);
1961/// ```
1962///
1963/// # Author
1964///
1965/// Hu Haixing
1966pub struct RcConsumer<T> {
1967 function: Rc<RefCell<ConsumerFn<T>>>,
1968 name: Option<String>,
1969}
1970
1971impl<T> RcConsumer<T>
1972where
1973 T: 'static,
1974{
1975 /// Create a new RcConsumer
1976 ///
1977 /// # Type Parameters
1978 ///
1979 /// * `F` - Closure type
1980 ///
1981 /// # Parameters
1982 ///
1983 /// * `f` - Closure to wrap
1984 ///
1985 /// # Return Value
1986 ///
1987 /// Returns a new `RcConsumer<T>` instance
1988 ///
1989 /// # Examples
1990 ///
1991 /// ```rust
1992 /// use prism3_function::{Consumer, RcConsumer};
1993 /// use std::rc::Rc;
1994 /// use std::cell::RefCell;
1995 ///
1996 /// let log = Rc::new(RefCell::new(Vec::new()));
1997 /// let l = log.clone();
1998 /// let mut consumer = RcConsumer::new(move |x: &i32| {
1999 /// l.borrow_mut().push(*x + 1);
2000 /// });
2001 /// consumer.accept(&5);
2002 /// assert_eq!(*log.borrow(), vec![6]);
2003 /// ```
2004 pub fn new<F>(f: F) -> Self
2005 where
2006 F: FnMut(&T) + 'static,
2007 {
2008 RcConsumer {
2009 function: Rc::new(RefCell::new(f)),
2010 name: None,
2011 }
2012 }
2013
2014 /// Create a new named RcConsumer
2015 ///
2016 /// # Type Parameters
2017 ///
2018 /// * `F` - Closure type
2019 ///
2020 /// # Parameters
2021 ///
2022 /// * `name` - Name of the consumer
2023 /// * `f` - Closure to wrap
2024 ///
2025 /// # Return Value
2026 ///
2027 /// Returns a new `RcConsumer<T>` instance
2028 ///
2029 /// # Examples
2030 ///
2031 /// ```rust
2032 /// use prism3_function::{Consumer, RcConsumer};
2033 /// use std::rc::Rc;
2034 /// use std::cell::RefCell;
2035 ///
2036 /// let log = Rc::new(RefCell::new(Vec::new()));
2037 /// let l = log.clone();
2038 /// let mut consumer = RcConsumer::new_with_name("my_consumer", move |x: &i32| {
2039 /// l.borrow_mut().push(*x + 1);
2040 /// });
2041 /// assert_eq!(consumer.name(), Some("my_consumer"));
2042 /// consumer.accept(&5);
2043 /// assert_eq!(*log.borrow(), vec![6]);
2044 /// ```
2045 pub fn new_with_name<F>(name: impl Into<String>, f: F) -> Self
2046 where
2047 F: FnMut(&T) + 'static,
2048 {
2049 RcConsumer {
2050 function: Rc::new(RefCell::new(f)),
2051 name: Some(name.into()),
2052 }
2053 }
2054
2055 /// Create a no-op consumer
2056 ///
2057 /// Returns a consumer that performs no operation.
2058 ///
2059 /// # Return Value
2060 ///
2061 /// Returns a no-op consumer
2062 ///
2063 /// # Examples
2064 ///
2065 /// ```rust
2066 /// use prism3_function::{Consumer, RcConsumer};
2067 ///
2068 /// let mut noop = RcConsumer::<i32>::noop();
2069 /// noop.accept(&42);
2070 /// // Value unchanged
2071 /// ```
2072 pub fn noop() -> Self {
2073 RcConsumer::new(|_| {})
2074 }
2075
2076 /// Get the consumer's name
2077 ///
2078 /// # Return Value
2079 ///
2080 /// Returns the consumer's name, or `None` if not set
2081 pub fn name(&self) -> Option<&str> {
2082 self.name.as_deref()
2083 }
2084
2085 /// Set the consumer's name
2086 ///
2087 /// # Parameters
2088 ///
2089 /// * `name` - Name to set
2090 pub fn set_name(&mut self, name: impl Into<String>) {
2091 self.name = Some(name.into());
2092 }
2093
2094 /// Sequentially chain another RcConsumer
2095 ///
2096 /// Returns a new consumer that executes the current operation first, then the
2097 /// next operation. Borrows &self, does not consume the original consumer.
2098 ///
2099 /// # Parameters
2100 ///
2101 /// * `next` - Consumer to execute after the current operation
2102 ///
2103 /// # Return Value
2104 ///
2105 /// Returns a new combined `RcConsumer<T>`
2106 ///
2107 /// # Examples
2108 ///
2109 /// ```rust
2110 /// use prism3_function::{Consumer, RcConsumer};
2111 /// use std::rc::Rc;
2112 /// use std::cell::RefCell;
2113 ///
2114 /// let log = Rc::new(RefCell::new(Vec::new()));
2115 /// let l1 = log.clone();
2116 /// let l2 = log.clone();
2117 /// let first = RcConsumer::new(move |x: &i32| {
2118 /// l1.borrow_mut().push(*x * 2);
2119 /// });
2120 /// let second = RcConsumer::new(move |x: &i32| {
2121 /// l2.borrow_mut().push(*x + 10);
2122 /// });
2123 ///
2124 /// let mut chained = first.and_then(&second);
2125 ///
2126 /// // first and second remain usable after chaining
2127 /// chained.accept(&5);
2128 /// assert_eq!(*log.borrow(), vec![10, 15]);
2129 /// // (5 * 2), (5 + 10)
2130 /// ```
2131 pub fn and_then(&self, next: &RcConsumer<T>) -> RcConsumer<T> {
2132 let first = Rc::clone(&self.function);
2133 let second = Rc::clone(&next.function);
2134 RcConsumer {
2135 function: Rc::new(RefCell::new(move |t: &T| {
2136 first.borrow_mut()(t);
2137 second.borrow_mut()(t);
2138 })),
2139 name: None,
2140 }
2141 }
2142
2143 /// Creates a conditional consumer (single-threaded shared version)
2144 ///
2145 /// Returns a consumer that only executes when a predicate is satisfied.
2146 ///
2147 /// # Parameters
2148 ///
2149 /// * `predicate` - The condition to check, can be:
2150 /// - Closure: `|x: &T| -> bool`
2151 /// - Function pointer: `fn(&T) -> bool`
2152 /// - `RcPredicate<T>`, `BoxPredicate<T>`
2153 /// - Any type implementing `Predicate<T>`
2154 ///
2155 /// # Returns
2156 ///
2157 /// Returns `RcConditionalConsumer<T>`
2158 ///
2159 /// # Examples
2160 ///
2161 /// ```rust
2162 /// use prism3_function::{Consumer, RcConsumer};
2163 /// use std::rc::Rc;
2164 /// use std::cell::RefCell;
2165 ///
2166 /// let log = Rc::new(RefCell::new(Vec::new()));
2167 /// let l = log.clone();
2168 /// let consumer = RcConsumer::new(move |x: &i32| {
2169 /// l.borrow_mut().push(*x);
2170 /// });
2171 /// let conditional = consumer.when(|x: &i32| *x > 0);
2172 ///
2173 /// let conditional_clone = conditional.clone();
2174 ///
2175 /// let mut positive = 5;
2176 /// let mut m = conditional;
2177 /// m.accept(&positive);
2178 /// assert_eq!(*log.borrow(), vec![5]);
2179 /// ```
2180 pub fn when<P>(&self, predicate: P) -> RcConditionalConsumer<T>
2181 where
2182 P: Predicate<T> + 'static,
2183 {
2184 RcConditionalConsumer {
2185 consumer: self.clone(),
2186 predicate: predicate.into_rc(),
2187 }
2188 }
2189}
2190
2191impl<T> Consumer<T> for RcConsumer<T> {
2192 fn accept(&mut self, value: &T) {
2193 (self.function.borrow_mut())(value)
2194 }
2195
2196 fn into_box(self) -> BoxConsumer<T>
2197 where
2198 T: 'static,
2199 {
2200 let self_fn = self.function;
2201 BoxConsumer::new(move |t| self_fn.borrow_mut()(t))
2202 }
2203
2204 fn into_rc(self) -> RcConsumer<T>
2205 where
2206 T: 'static,
2207 {
2208 self
2209 }
2210
2211 // do NOT override Consumer::into_arc() because RcConsumer is not Send + Sync
2212 // and calling RcConsumer::into_arc() will cause a compile error
2213
2214 fn into_fn(self) -> impl FnMut(&T)
2215 where
2216 T: 'static,
2217 {
2218 let self_fn = self.function;
2219 move |t| self_fn.borrow_mut()(t)
2220 }
2221
2222 fn to_box(&self) -> BoxConsumer<T>
2223 where
2224 T: 'static,
2225 {
2226 let self_fn = self.function.clone();
2227 BoxConsumer::new(move |t| self_fn.borrow_mut()(t))
2228 }
2229
2230 fn to_rc(&self) -> RcConsumer<T>
2231 where
2232 T: 'static,
2233 {
2234 self.clone()
2235 }
2236
2237 // do NOT override Consumer::to_arc() because RcConsumer is not Send + Sync
2238 // and calling RcConsumer::to_arc() will cause a compile error
2239
2240 fn to_fn(&self) -> impl FnMut(&T) {
2241 let self_fn = self.function.clone();
2242 move |t| self_fn.borrow_mut()(t)
2243 }
2244}
2245
2246impl<T> Clone for RcConsumer<T> {
2247 /// Clone RcConsumer
2248 ///
2249 /// Creates a new RcConsumer that shares the underlying function with the
2250 /// original instance.
2251 fn clone(&self) -> Self {
2252 RcConsumer {
2253 function: self.function.clone(),
2254 name: self.name.clone(),
2255 }
2256 }
2257}
2258
2259impl<T> fmt::Debug for RcConsumer<T> {
2260 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2261 f.debug_struct("RcConsumer")
2262 .field("name", &self.name)
2263 .field("function", &"<function>")
2264 .finish()
2265 }
2266}
2267
2268impl<T> fmt::Display for RcConsumer<T> {
2269 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2270 match &self.name {
2271 Some(name) => write!(f, "RcConsumer({})", name),
2272 None => write!(f, "RcConsumer"),
2273 }
2274 }
2275}
2276
2277// ============================================================================
2278// 10. RcConsumer ConsumerOnce Implementation
2279// ============================================================================
2280
2281impl<T> crate::consumer_once::ConsumerOnce<T> for RcConsumer<T> {
2282 /// Execute one-time consumption operation
2283 ///
2284 /// Executes the consumer operation once and consumes self. This method
2285 /// provides a bridge between the reusable Consumer interface and the
2286 /// one-time ConsumerOnce interface.
2287 ///
2288 /// # Parameters
2289 ///
2290 /// * `value` - Reference to the value to be consumed
2291 ///
2292 /// # Examples
2293 ///
2294 /// ```rust
2295 /// use prism3_function::{Consumer, ConsumerOnce, RcConsumer};
2296 /// use std::rc::Rc;
2297 /// use std::cell::RefCell;
2298 ///
2299 /// let log = Rc::new(RefCell::new(Vec::new()));
2300 /// let l = log.clone();
2301 /// let consumer = RcConsumer::new(move |x: &i32| {
2302 /// l.borrow_mut().push(*x);
2303 /// });
2304 /// consumer.accept_once(&5);
2305 /// assert_eq!(*log.borrow(), vec![5]);
2306 /// ```
2307 fn accept_once(mut self, value: &T) {
2308 self.accept(value);
2309 }
2310
2311 /// Convert to BoxConsumerOnce
2312 ///
2313 /// **⚠️ Consumes `self`**: The original consumer will be unavailable after
2314 /// calling this method.
2315 ///
2316 /// Converts the current consumer to `BoxConsumerOnce<T>` by wrapping the
2317 /// consumer's accept method in a FnOnce closure.
2318 ///
2319 /// # Return Value
2320 ///
2321 /// Returns the wrapped `BoxConsumerOnce<T>`
2322 ///
2323 /// # Examples
2324 ///
2325 /// ```rust
2326 /// use prism3_function::{Consumer, ConsumerOnce, RcConsumer};
2327 /// use std::rc::Rc;
2328 /// use std::cell::RefCell;
2329 ///
2330 /// let log = Rc::new(RefCell::new(Vec::new()));
2331 /// let l = log.clone();
2332 /// let consumer = RcConsumer::new(move |x: &i32| {
2333 /// l.borrow_mut().push(*x);
2334 /// });
2335 /// let box_consumer_once = consumer.into_box_once();
2336 /// box_consumer_once.accept_once(&5);
2337 /// assert_eq!(*log.borrow(), vec![5]);
2338 /// ```
2339 fn into_box_once(self) -> crate::consumer_once::BoxConsumerOnce<T>
2340 where
2341 T: 'static,
2342 {
2343 let mut consumer = self;
2344 crate::consumer_once::BoxConsumerOnce::new(move |t| {
2345 consumer.accept(t);
2346 })
2347 }
2348
2349 /// Convert to closure
2350 ///
2351 /// **⚠️ Consumes `self`**: The original consumer will be unavailable after
2352 /// calling this method.
2353 ///
2354 /// Converts the consumer to a closure that can be used directly in places
2355 /// where the standard library requires `FnOnce`.
2356 ///
2357 /// # Return Value
2358 ///
2359 /// Returns a closure implementing `FnOnce(&T)`
2360 ///
2361 /// # Examples
2362 ///
2363 /// ```rust
2364 /// use prism3_function::{Consumer, ConsumerOnce, RcConsumer};
2365 /// use std::rc::Rc;
2366 /// use std::cell::RefCell;
2367 ///
2368 /// let log = Rc::new(RefCell::new(Vec::new()));
2369 /// let l = log.clone();
2370 /// let consumer = RcConsumer::new(move |x: &i32| {
2371 /// l.borrow_mut().push(*x);
2372 /// });
2373 /// let func = consumer.into_fn_once();
2374 /// func(&5);
2375 /// assert_eq!(*log.borrow(), vec![5]);
2376 /// ```
2377 fn into_fn_once(self) -> impl FnOnce(&T)
2378 where
2379 T: 'static,
2380 {
2381 let mut consumer = self;
2382 move |t| consumer.accept(t)
2383 }
2384
2385 /// Convert to BoxConsumerOnce without consuming self
2386 ///
2387 /// **⚠️ Requires Clone**: This method requires `Self` to implement
2388 /// `Clone`. Clones the current consumer and wraps it in a
2389 /// `BoxConsumerOnce`.
2390 ///
2391 /// # Return Value
2392 ///
2393 /// Returns the wrapped `BoxConsumerOnce<T>`
2394 ///
2395 /// # Examples
2396 ///
2397 /// ```rust
2398 /// use prism3_function::{Consumer, ConsumerOnce, RcConsumer};
2399 /// use std::rc::Rc;
2400 /// use std::cell::RefCell;
2401 ///
2402 /// let log = Rc::new(RefCell::new(Vec::new()));
2403 /// let l = log.clone();
2404 /// let consumer = RcConsumer::new(move |x: &i32| {
2405 /// l.borrow_mut().push(*x);
2406 /// });
2407 /// let box_consumer_once = consumer.to_box_once();
2408 /// box_consumer_once.accept_once(&5);
2409 /// assert_eq!(*log.borrow(), vec![5]);
2410 /// // Original consumer still usable
2411 /// consumer.accept(&3);
2412 /// assert_eq!(*log.borrow(), vec![5, 3]);
2413 /// ```
2414 fn to_box_once(&self) -> crate::consumer_once::BoxConsumerOnce<T>
2415 where
2416 T: 'static,
2417 {
2418 let self_fn = self.function.clone();
2419 crate::consumer_once::BoxConsumerOnce::new(move |t| {
2420 self_fn.borrow_mut()(t);
2421 })
2422 }
2423
2424 /// Convert to closure without consuming self
2425 ///
2426 /// **⚠️ Requires Clone**: This method requires `Self` to implement
2427 /// `Clone`. Clones the current consumer and then converts the clone
2428 /// to a closure.
2429 ///
2430 /// # Return Value
2431 ///
2432 /// Returns a closure implementing `FnOnce(&T)`
2433 ///
2434 /// # Examples
2435 ///
2436 /// ```rust
2437 /// use prism3_function::{Consumer, ConsumerOnce, RcConsumer};
2438 /// use std::rc::Rc;
2439 /// use std::cell::RefCell;
2440 ///
2441 /// let log = Rc::new(RefCell::new(Vec::new()));
2442 /// let l = log.clone();
2443 /// let consumer = RcConsumer::new(move |x: &i32| {
2444 /// l.borrow_mut().push(*x);
2445 /// });
2446 /// let func = consumer.to_fn_once();
2447 /// func(&5);
2448 /// assert_eq!(*log.borrow(), vec![5]);
2449 /// // Original consumer still usable
2450 /// consumer.accept(&3);
2451 /// assert_eq!(*log.borrow(), vec![5, 3]);
2452 /// ```
2453 fn to_fn_once(&self) -> impl FnOnce(&T)
2454 where
2455 T: 'static,
2456 {
2457 let self_fn = self.function.clone();
2458 move |t| self_fn.borrow_mut()(t)
2459 }
2460}
2461
2462// ============================================================================
2463// 7. RcConditionalConsumer - Rc-based Conditional Consumer
2464// ============================================================================
2465
2466/// RcConditionalConsumer struct
2467///
2468/// A single-threaded conditional consumer that only executes when a predicate is
2469/// satisfied. Uses `RcConsumer` and `RcPredicate` for shared ownership within a
2470/// single thread.
2471///
2472/// This type is typically created by calling `RcConsumer::when()` and is
2473/// designed to work with the `or_else()` method to create if-then-else logic.
2474///
2475/// # Features
2476///
2477/// - **Shared Ownership**: Cloneable via `Rc`, multiple owners allowed
2478/// - **Single-Threaded**: Not thread-safe, cannot be sent across threads
2479/// - **Conditional Execution**: Only consumes when predicate returns `true`
2480/// - **No Lock Overhead**: More efficient than `ArcConditionalConsumer`
2481///
2482/// # Examples
2483///
2484/// ```rust
2485/// use prism3_function::{Consumer, RcConsumer};
2486/// use std::rc::Rc;
2487/// use std::cell::RefCell;
2488///
2489/// let log = Rc::new(RefCell::new(Vec::new()));
2490/// let l = log.clone();
2491/// let conditional = RcConsumer::new(move |x: &i32| {
2492/// l.borrow_mut().push(*x);
2493/// })
2494/// .when(|x: &i32| *x > 0);
2495///
2496/// let conditional_clone = conditional.clone();
2497///
2498/// let mut value = 5;
2499/// let mut m = conditional;
2500/// m.accept(&value);
2501/// assert_eq!(*log.borrow(), vec![5]);
2502/// ```
2503///
2504/// # Author
2505///
2506/// Hu Haixing
2507pub struct RcConditionalConsumer<T> {
2508 consumer: RcConsumer<T>,
2509 predicate: RcPredicate<T>,
2510}
2511
2512impl<T> Consumer<T> for RcConditionalConsumer<T>
2513where
2514 T: 'static,
2515{
2516 fn accept(&mut self, value: &T) {
2517 if self.predicate.test(value) {
2518 self.consumer.accept(value);
2519 }
2520 }
2521
2522 fn into_box(self) -> BoxConsumer<T> {
2523 let pred = self.predicate;
2524 let mut consumer = self.consumer;
2525 BoxConsumer::new(move |t| {
2526 if pred.test(t) {
2527 consumer.accept(t);
2528 }
2529 })
2530 }
2531
2532 fn into_rc(self) -> RcConsumer<T> {
2533 let pred = self.predicate;
2534 let mut consumer = self.consumer;
2535 RcConsumer::new(move |t| {
2536 if pred.test(t) {
2537 consumer.accept(t);
2538 }
2539 })
2540 }
2541
2542 // do NOT override Consumer::into_arc() because RcConditionalConsumer is not Send + Sync
2543 // and calling RcConditionalConsumer::into_arc() will cause a compile error
2544
2545 fn into_fn(self) -> impl FnMut(&T) {
2546 let pred = self.predicate;
2547 let mut consumer = self.consumer;
2548 move |t: &T| {
2549 if pred.test(t) {
2550 consumer.accept(t);
2551 }
2552 }
2553 }
2554
2555 // inherit the default implementation of to_xxx() from Consumer
2556}
2557
2558impl<T> RcConditionalConsumer<T>
2559where
2560 T: 'static,
2561{
2562 /// Adds an else branch (single-threaded shared version)
2563 ///
2564 /// Executes the original consumer when the condition is satisfied, otherwise
2565 /// executes else_consumer.
2566 ///
2567 /// # Parameters
2568 ///
2569 /// * `else_consumer` - The consumer for the else branch, can be:
2570 /// - Closure: `|x: &T|`
2571 /// - `RcConsumer<T>`, `BoxConsumer<T>`
2572 /// - Any type implementing `Consumer<T>`
2573 ///
2574 /// # Returns
2575 ///
2576 /// Returns the composed `RcConsumer<T>`
2577 ///
2578 /// # Examples
2579 ///
2580 /// ## Using a closure (recommended)
2581 ///
2582 /// ```rust
2583 /// use prism3_function::{Consumer, RcConsumer};
2584 /// use std::rc::Rc;
2585 /// use std::cell::RefCell;
2586 ///
2587 /// let log = Rc::new(RefCell::new(Vec::new()));
2588 /// let l1 = log.clone();
2589 /// let l2 = log.clone();
2590 /// let mut consumer = RcConsumer::new(move |x: &i32| {
2591 /// l1.borrow_mut().push(*x);
2592 /// })
2593 /// .when(|x: &i32| *x > 0)
2594 /// .or_else(move |x: &i32| {
2595 /// l2.borrow_mut().push(-*x);
2596 /// });
2597 ///
2598 /// consumer.accept(&5);
2599 /// assert_eq!(*log.borrow(), vec![5]);
2600 ///
2601 /// consumer.accept(&-5);
2602 /// assert_eq!(*log.borrow(), vec![5, 5]);
2603 /// ```
2604 pub fn or_else<C>(&self, else_consumer: C) -> RcConsumer<T>
2605 where
2606 C: Consumer<T> + 'static,
2607 {
2608 let pred = self.predicate.clone();
2609 let mut then_cons = self.consumer.clone();
2610 let mut else_cons = else_consumer;
2611
2612 RcConsumer::new(move |t: &T| {
2613 if pred.test(t) {
2614 then_cons.accept(t);
2615 } else {
2616 else_cons.accept(t);
2617 }
2618 })
2619 }
2620}
2621
2622impl<T> Clone for RcConditionalConsumer<T> {
2623 /// Clones the conditional consumer
2624 ///
2625 /// Creates a new instance that shares the underlying consumer and predicate
2626 /// with the original instance.
2627 fn clone(&self) -> Self {
2628 RcConditionalConsumer {
2629 consumer: self.consumer.clone(),
2630 predicate: self.predicate.clone(),
2631 }
2632 }
2633}
2634
2635// ============================================================================
2636// 8. Implement Consumer trait for closures
2637// ============================================================================
2638
2639/// Implement Consumer for all FnMut(&T)
2640impl<T, F> Consumer<T> for F
2641where
2642 F: FnMut(&T),
2643{
2644 fn accept(&mut self, value: &T) {
2645 self(value)
2646 }
2647
2648 fn into_box(self) -> BoxConsumer<T>
2649 where
2650 Self: Sized + 'static,
2651 T: 'static,
2652 {
2653 BoxConsumer::new(self)
2654 }
2655
2656 fn into_rc(self) -> RcConsumer<T>
2657 where
2658 Self: Sized + 'static,
2659 T: 'static,
2660 {
2661 RcConsumer::new(self)
2662 }
2663
2664 fn into_arc(self) -> ArcConsumer<T>
2665 where
2666 Self: Sized + Send + 'static,
2667 T: Send + 'static,
2668 {
2669 ArcConsumer::new(self)
2670 }
2671
2672 fn into_fn(self) -> impl FnMut(&T)
2673 where
2674 Self: Sized + 'static,
2675 T: 'static,
2676 {
2677 self
2678 }
2679
2680 fn to_box(&self) -> BoxConsumer<T>
2681 where
2682 Self: Sized + Clone + 'static,
2683 T: 'static,
2684 {
2685 let cloned = self.clone();
2686 BoxConsumer::new(cloned)
2687 }
2688
2689 fn to_rc(&self) -> RcConsumer<T>
2690 where
2691 Self: Sized + Clone + 'static,
2692 T: 'static,
2693 {
2694 let cloned = self.clone();
2695 RcConsumer::new(cloned)
2696 }
2697
2698 fn to_arc(&self) -> ArcConsumer<T>
2699 where
2700 Self: Sized + Clone + Send + 'static,
2701 T: Send + 'static,
2702 {
2703 let cloned = self.clone();
2704 ArcConsumer::new(cloned)
2705 }
2706
2707 fn to_fn(&self) -> impl FnMut(&T)
2708 where
2709 Self: Sized + Clone + 'static,
2710 T: 'static,
2711 {
2712 self.clone()
2713 }
2714}
2715
2716// ============================================================================
2717// 9. Extension methods for closures
2718// ============================================================================
2719
2720/// Extension trait providing consumer composition methods for closures
2721///
2722/// Provides `and_then` and other composition methods for all closures
2723/// implementing `FnMut(&T)`, allowing direct method chaining on closures
2724/// without explicit wrapper types.
2725///
2726/// # Design Philosophy
2727///
2728/// This trait allows closures to be naturally composed using method syntax,
2729/// similar to iterator combinators. Composition methods consume the closure and
2730/// return `BoxConsumer<T>`, which can continue chaining.
2731///
2732/// # Features
2733///
2734/// - **Natural Syntax**: Direct method chaining on closures
2735/// - **Returns BoxConsumer**: Composition results in `BoxConsumer<T>`, can
2736/// continue chaining
2737/// - **Zero Cost**: No overhead when composing closures
2738/// - **Automatic Implementation**: All `FnMut(&T)` closures automatically get
2739/// these methods
2740///
2741/// # Examples
2742///
2743/// ```rust
2744/// use prism3_function::{Consumer, FnConsumerOps};
2745/// use std::sync::{Arc, Mutex};
2746///
2747/// let log = Arc::new(Mutex::new(Vec::new()));
2748/// let l1 = log.clone();
2749/// let l2 = log.clone();
2750/// let mut chained = (move |x: &i32| {
2751/// l1.lock().unwrap().push(*x * 2);
2752/// }).and_then(move |x: &i32| {
2753/// l2.lock().unwrap().push(*x + 10);
2754/// });
2755/// chained.accept(&5);
2756/// assert_eq!(*log.lock().unwrap(), vec![10, 15]);
2757/// // (5 * 2), (5 + 10)
2758/// ```
2759///
2760/// # Author
2761///
2762/// Hu Haixing
2763pub trait FnConsumerOps<T>: FnMut(&T) + Sized {
2764 /// Sequentially chain another consumer
2765 ///
2766 /// Returns a new consumer that executes the current operation first, then the
2767 /// next operation. Consumes the current closure and returns `BoxConsumer<T>`.
2768 ///
2769 /// # Type Parameters
2770 ///
2771 /// * `C` - Type of the next consumer
2772 ///
2773 /// # Parameters
2774 ///
2775 /// * `next` - Consumer to execute after the current operation. **Note: This
2776 /// parameter is passed by value and will transfer ownership.** If you need
2777 /// to preserve the original consumer, clone it first (if it implements
2778 /// `Clone`). Can be:
2779 /// - A closure: `|x: &T|`
2780 /// - A `BoxConsumer<T>`
2781 /// - An `RcConsumer<T>`
2782 /// - An `ArcConsumer<T>`
2783 /// - Any type implementing `Consumer<T>`
2784 ///
2785 /// # Return Value
2786 ///
2787 /// Returns a combined `BoxConsumer<T>`
2788 ///
2789 /// # Examples
2790 ///
2791 /// ## Direct value passing (ownership transfer)
2792 ///
2793 /// ```rust
2794 /// use prism3_function::{Consumer, FnConsumerOps, BoxConsumer};
2795 /// use std::sync::{Arc, Mutex};
2796 ///
2797 /// let log = Arc::new(Mutex::new(Vec::new()));
2798 /// let l1 = log.clone();
2799 /// let l2 = log.clone();
2800 /// let second = BoxConsumer::new(move |x: &i32| {
2801 /// l2.lock().unwrap().push(*x + 10);
2802 /// });
2803 ///
2804 /// // second is moved here
2805 /// let mut chained = (move |x: &i32| {
2806 /// l1.lock().unwrap().push(*x * 2);
2807 /// }).and_then(second);
2808 ///
2809 /// chained.accept(&5);
2810 /// assert_eq!(*log.lock().unwrap(), vec![10, 15]);
2811 /// // second.accept(&3); // Would not compile - moved
2812 /// ```
2813 ///
2814 /// ## Preserving original with clone
2815 ///
2816 /// ```rust
2817 /// use prism3_function::{Consumer, FnConsumerOps, BoxConsumer};
2818 /// use std::sync::{Arc, Mutex};
2819 ///
2820 /// let log = Arc::new(Mutex::new(Vec::new()));
2821 /// let l1 = log.clone();
2822 /// let l2 = log.clone();
2823 /// let second = BoxConsumer::new(move |x: &i32| {
2824 /// l2.lock().unwrap().push(*x + 10);
2825 /// });
2826 ///
2827 /// // Clone to preserve original
2828 /// let mut chained = (move |x: &i32| {
2829 /// l1.lock().unwrap().push(*x * 2);
2830 /// }).and_then(second.clone());
2831 ///
2832 /// chained.accept(&5);
2833 /// assert_eq!(*log.lock().unwrap(), vec![10, 15]);
2834 ///
2835 /// // Original still usable
2836 /// second.accept(&3);
2837 /// assert_eq!(*log.lock().unwrap(), vec![10, 15, 13]);
2838 /// ```
2839 fn and_then<C>(self, next: C) -> BoxConsumer<T>
2840 where
2841 Self: 'static,
2842 C: Consumer<T> + 'static,
2843 T: 'static,
2844 {
2845 let mut first = self;
2846 let mut second = next;
2847 BoxConsumer::new(move |t| {
2848 first(t);
2849 second.accept(t);
2850 })
2851 }
2852}
2853
2854/// Implement FnConsumerOps for all closure types
2855impl<T, F> FnConsumerOps<T> for F where F: FnMut(&T) {}