Skip to main content

rx_rs/core/
rx_subject.rs

1use super::rx_observable::RxObservable;
2use super::rx_ref::RxRef;
3use super::rx_val::RxVal;
4use super::tracker::Tracker;
5
6/// A read-write stream of events.
7///
8/// RxSubject provides both read and write access to an event stream. It exposes
9/// a read-only RxObservable via the `.observable()` method, and allows emitting
10/// events via the `.next()` method.
11///
12/// Unlike RxRef, RxSubject does NOT hold a current value. It only emits discrete
13/// events to subscribers.
14///
15/// # Example
16/// ```
17/// use rx_rs::core::{RxSubject, DisposableTracker};
18///
19/// let mut tracker = DisposableTracker::new();
20/// let button_clicks = RxSubject::new();
21///
22/// // Subscribe to button click events
23/// button_clicks.observable().subscribe(tracker.tracker(), |click_count| {
24///     println!("Button clicked {} times", click_count);
25/// }); // Nothing printed yet (no current value)
26///
27/// // Emit events
28/// button_clicks.next(1); // Prints "Button clicked 1 times"
29/// button_clicks.next(2); // Prints "Button clicked 2 times"
30/// ```
31#[derive(Clone)]
32pub struct RxSubject<T> {
33    inner: RxObservable<T>,
34}
35
36impl<T: 'static> RxSubject<T> {
37    /// Creates a new RxSubject.
38    ///
39    /// # Example
40    /// ```
41    /// use rx_rs::core::RxSubject;
42    ///
43    /// let messages = RxSubject::<String>::new();
44    /// ```
45    pub fn new() -> Self {
46        Self {
47            inner: RxObservable::new(),
48        }
49    }
50
51    /// Emits a new event to all subscribers.
52    ///
53    /// All subscribers to the RxObservable obtained via `.observable()` will be
54    /// called with the event.
55    ///
56    /// # Arguments
57    /// * `value` - The event to emit
58    ///
59    /// # Example
60    /// ```
61    /// use rx_rs::core::RxSubject;
62    ///
63    /// let events = RxSubject::new();
64    /// events.next("click");
65    /// events.next("hover");
66    /// ```
67    pub fn next(&self, value: T) {
68        self.inner.emit(&value);
69    }
70
71    /// Returns a read-only view of this event stream.
72    ///
73    /// The returned RxObservable can be cloned and passed around, allowing multiple
74    /// parts of the code to subscribe to events without having write access.
75    ///
76    /// # Example
77    /// ```
78    /// use rx_rs::core::{RxSubject, DisposableTracker};
79    ///
80    /// let mut tracker = DisposableTracker::new();
81    /// let events = RxSubject::new();
82    /// let read_only = events.observable();
83    ///
84    /// read_only.subscribe(tracker.tracker(), |event| {
85    ///     println!("Event: {}", event);
86    /// });
87    ///
88    /// events.next(42);
89    /// ```
90    pub fn observable(&self) -> RxObservable<T> {
91        self.inner.clone()
92    }
93
94    /// Returns the number of active subscribers.
95    pub fn subscriber_count(&self) -> usize {
96        self.inner.subscriber_count()
97    }
98
99    /// Converts this RxSubject to an RxVal with an initial value.
100    ///
101    /// The RxVal is updated whenever the subject emits a new value.
102    /// A tracker must be provided to manage the subscription lifetime.
103    ///
104    /// # Arguments
105    /// * `initial` - The initial value for the RxVal
106    /// * `tracker` - Tracker to manage the subscription lifetime
107    ///
108    /// # Example
109    /// ```
110    /// use rx_rs::core::{RxSubject, DisposableTracker};
111    ///
112    /// let mut tracker = DisposableTracker::new();
113    /// let subject = RxSubject::new();
114    /// let val = subject.to_val(0, tracker.tracker());
115    ///
116    /// assert_eq!(val.get(), 0);
117    ///
118    /// subject.next(42);
119    /// assert_eq!(val.get(), 42);
120    /// ```
121    pub fn to_val(&self, initial: T, tracker: &Tracker) -> RxVal<T>
122    where
123        T: Clone + PartialEq,
124    {
125        self.inner.to_val(initial, tracker)
126    }
127
128    /// Maps the values of this RxSubject using a transformation function.
129    ///
130    /// Returns a new RxObservable that emits transformed values.
131    /// When the source subject emits, the transformation is applied and
132    /// the resulting observable emits the transformed value.
133    ///
134    /// # Arguments
135    /// * `f` - Function to transform values from A to B
136    ///
137    /// # Example
138    /// ```
139    /// use rx_rs::core::{RxSubject, DisposableTracker};
140    /// use std::cell::RefCell;
141    /// use std::rc::Rc;
142    ///
143    /// let tracker = DisposableTracker::new();
144    /// let subject = RxSubject::new();
145    /// let doubled = subject.map(|x| x * 2);
146    ///
147    /// let result = Rc::new(RefCell::new(None));
148    /// let result_clone = result.clone();
149    ///
150    /// doubled.subscribe(tracker.tracker(), move |value| {
151    ///     *result_clone.borrow_mut() = Some(*value);
152    /// });
153    ///
154    /// subject.next(5);
155    /// assert_eq!(*result.borrow(), Some(10));
156    /// ```
157    pub fn map<B, F>(&self, f: F) -> RxObservable<B>
158    where
159        B: Clone + 'static,
160        F: Fn(&T) -> B + 'static,
161    {
162        self.inner.map(f)
163    }
164
165    /// Flat-maps the values of this RxSubject using a function that returns RxVal<B>.
166    ///
167    /// When the subject emits, the function is called to get an RxVal<B>,
168    /// and the resulting observable emits the current value of that RxVal.
169    ///
170    /// # Arguments
171    /// * `f` - Function to transform values from A to RxVal<B>
172    ///
173    /// # Example
174    /// ```
175    /// use rx_rs::core::{RxSubject, RxRef, DisposableTracker};
176    /// use std::cell::RefCell;
177    /// use std::rc::Rc;
178    ///
179    /// let tracker = DisposableTracker::new();
180    /// let subject = RxSubject::new();
181    /// let inner = RxRef::new(100);
182    ///
183    /// let inner_clone = inner.clone();
184    /// let flattened = subject.flat_map_val(move |_| inner_clone.val());
185    ///
186    /// let result = Rc::new(RefCell::new(None));
187    /// let result_clone = result.clone();
188    ///
189    /// flattened.subscribe(tracker.tracker(), move |value| {
190    ///     *result_clone.borrow_mut() = Some(*value);
191    /// });
192    ///
193    /// subject.next(1);
194    /// assert_eq!(*result.borrow(), Some(100));
195    /// ```
196    pub fn flat_map_val<B, F>(&self, f: F) -> RxObservable<B>
197    where
198        B: Clone + PartialEq + 'static,
199        F: Fn(&T) -> RxVal<B> + 'static,
200    {
201        self.inner.flat_map_val(f)
202    }
203
204    /// Flat-maps using a function that returns RxRef<B>.
205    pub fn flat_map_ref<B, F>(&self, f: F) -> RxObservable<B>
206    where
207        B: Clone + PartialEq + 'static,
208        F: Fn(&T) -> RxRef<B> + 'static,
209    {
210        self.inner.flat_map_ref(f)
211    }
212
213    /// Flat-maps using a function that returns RxObservable<B>.
214    pub fn flat_map_observable<B, F>(&self, f: F) -> RxObservable<B>
215    where
216        B: Clone + 'static,
217        F: Fn(&T) -> RxObservable<B> + 'static,
218    {
219        self.inner.flat_map_observable(f)
220    }
221
222    /// Flat-maps using a function that returns RxSubject<B>.
223    pub fn flat_map_subject<B, F>(&self, f: F) -> RxObservable<B>
224    where
225        B: Clone + 'static,
226        F: Fn(&T) -> RxSubject<B> + 'static,
227    {
228        self.inner.flat_map_subject(f)
229    }
230
231    /// Joins this RxSubject with an RxObservable.
232    pub fn join_observable(&self, other: RxObservable<T>) -> RxObservable<T>
233    where
234        T: Clone,
235    {
236        self.inner.join_observable(other)
237    }
238
239    /// Joins this RxSubject with another RxSubject.
240    pub fn join_subject(&self, other: RxSubject<T>) -> RxObservable<T>
241    where
242        T: Clone,
243    {
244        self.inner.join_subject(other)
245    }
246}
247
248impl<T: 'static> Default for RxSubject<T> {
249    fn default() -> Self {
250        Self::new()
251    }
252}