rx_rs/core/rx_subject.rs
1use super::rx_observable::RxObservable;
2use super::rx_ref::RxRef;
3use super::rx_val::RxVal;
4use super::tracker::Tracker;
5
6/// A read-write stream of events.
7///
8/// RxSubject provides both read and write access to an event stream. It exposes
9/// a read-only RxObservable via the `.observable()` method, and allows emitting
10/// events via the `.next()` method.
11///
12/// Unlike RxRef, RxSubject does NOT hold a current value. It only emits discrete
13/// events to subscribers.
14///
15/// # Example
16/// ```
17/// use rx_rs::core::{RxSubject, DisposableTracker};
18///
19/// let mut tracker = DisposableTracker::new();
20/// let button_clicks = RxSubject::new();
21///
22/// // Subscribe to button click events
23/// button_clicks.observable().subscribe(tracker.tracker(), |click_count| {
24/// println!("Button clicked {} times", click_count);
25/// }); // Nothing printed yet (no current value)
26///
27/// // Emit events
28/// button_clicks.next(1); // Prints "Button clicked 1 times"
29/// button_clicks.next(2); // Prints "Button clicked 2 times"
30/// ```
31#[derive(Clone)]
32pub struct RxSubject<T> {
33 inner: RxObservable<T>,
34}
35
36impl<T: 'static> RxSubject<T> {
37 /// Creates a new RxSubject.
38 ///
39 /// # Example
40 /// ```
41 /// use rx_rs::core::RxSubject;
42 ///
43 /// let messages = RxSubject::<String>::new();
44 /// ```
45 pub fn new() -> Self {
46 Self {
47 inner: RxObservable::new(),
48 }
49 }
50
51 /// Emits a new event to all subscribers.
52 ///
53 /// All subscribers to the RxObservable obtained via `.observable()` will be
54 /// called with the event.
55 ///
56 /// # Arguments
57 /// * `value` - The event to emit
58 ///
59 /// # Example
60 /// ```
61 /// use rx_rs::core::RxSubject;
62 ///
63 /// let events = RxSubject::new();
64 /// events.next("click");
65 /// events.next("hover");
66 /// ```
67 pub fn next(&self, value: T) {
68 self.inner.emit(&value);
69 }
70
71 /// Returns a read-only view of this event stream.
72 ///
73 /// The returned RxObservable can be cloned and passed around, allowing multiple
74 /// parts of the code to subscribe to events without having write access.
75 ///
76 /// # Example
77 /// ```
78 /// use rx_rs::core::{RxSubject, DisposableTracker};
79 ///
80 /// let mut tracker = DisposableTracker::new();
81 /// let events = RxSubject::new();
82 /// let read_only = events.observable();
83 ///
84 /// read_only.subscribe(tracker.tracker(), |event| {
85 /// println!("Event: {}", event);
86 /// });
87 ///
88 /// events.next(42);
89 /// ```
90 pub fn observable(&self) -> RxObservable<T> {
91 self.inner.clone()
92 }
93
94 /// Returns the number of active subscribers.
95 pub fn subscriber_count(&self) -> usize {
96 self.inner.subscriber_count()
97 }
98
99 /// Converts this RxSubject to an RxVal with an initial value.
100 ///
101 /// The RxVal is updated whenever the subject emits a new value.
102 /// A tracker must be provided to manage the subscription lifetime.
103 ///
104 /// # Arguments
105 /// * `initial` - The initial value for the RxVal
106 /// * `tracker` - Tracker to manage the subscription lifetime
107 ///
108 /// # Example
109 /// ```
110 /// use rx_rs::core::{RxSubject, DisposableTracker};
111 ///
112 /// let mut tracker = DisposableTracker::new();
113 /// let subject = RxSubject::new();
114 /// let val = subject.to_val(0, tracker.tracker());
115 ///
116 /// assert_eq!(val.get(), 0);
117 ///
118 /// subject.next(42);
119 /// assert_eq!(val.get(), 42);
120 /// ```
121 pub fn to_val(&self, initial: T, tracker: &Tracker) -> RxVal<T>
122 where
123 T: Clone + PartialEq,
124 {
125 self.inner.to_val(initial, tracker)
126 }
127
128 /// Maps the values of this RxSubject using a transformation function.
129 ///
130 /// Returns a new RxObservable that emits transformed values.
131 /// When the source subject emits, the transformation is applied and
132 /// the resulting observable emits the transformed value.
133 ///
134 /// # Arguments
135 /// * `f` - Function to transform values from A to B
136 ///
137 /// # Example
138 /// ```
139 /// use rx_rs::core::{RxSubject, DisposableTracker};
140 /// use std::cell::RefCell;
141 /// use std::rc::Rc;
142 ///
143 /// let tracker = DisposableTracker::new();
144 /// let subject = RxSubject::new();
145 /// let doubled = subject.map(|x| x * 2);
146 ///
147 /// let result = Rc::new(RefCell::new(None));
148 /// let result_clone = result.clone();
149 ///
150 /// doubled.subscribe(tracker.tracker(), move |value| {
151 /// *result_clone.borrow_mut() = Some(*value);
152 /// });
153 ///
154 /// subject.next(5);
155 /// assert_eq!(*result.borrow(), Some(10));
156 /// ```
157 pub fn map<B, F>(&self, f: F) -> RxObservable<B>
158 where
159 B: Clone + 'static,
160 F: Fn(&T) -> B + 'static,
161 {
162 self.inner.map(f)
163 }
164
165 /// Flat-maps the values of this RxSubject using a function that returns RxVal<B>.
166 ///
167 /// When the subject emits, the function is called to get an RxVal<B>,
168 /// and the resulting observable emits the current value of that RxVal.
169 ///
170 /// # Arguments
171 /// * `f` - Function to transform values from A to RxVal<B>
172 ///
173 /// # Example
174 /// ```
175 /// use rx_rs::core::{RxSubject, RxRef, DisposableTracker};
176 /// use std::cell::RefCell;
177 /// use std::rc::Rc;
178 ///
179 /// let tracker = DisposableTracker::new();
180 /// let subject = RxSubject::new();
181 /// let inner = RxRef::new(100);
182 ///
183 /// let inner_clone = inner.clone();
184 /// let flattened = subject.flat_map_val(move |_| inner_clone.val());
185 ///
186 /// let result = Rc::new(RefCell::new(None));
187 /// let result_clone = result.clone();
188 ///
189 /// flattened.subscribe(tracker.tracker(), move |value| {
190 /// *result_clone.borrow_mut() = Some(*value);
191 /// });
192 ///
193 /// subject.next(1);
194 /// assert_eq!(*result.borrow(), Some(100));
195 /// ```
196 pub fn flat_map_val<B, F>(&self, f: F) -> RxObservable<B>
197 where
198 B: Clone + PartialEq + 'static,
199 F: Fn(&T) -> RxVal<B> + 'static,
200 {
201 self.inner.flat_map_val(f)
202 }
203
204 /// Flat-maps using a function that returns RxRef<B>.
205 pub fn flat_map_ref<B, F>(&self, f: F) -> RxObservable<B>
206 where
207 B: Clone + PartialEq + 'static,
208 F: Fn(&T) -> RxRef<B> + 'static,
209 {
210 self.inner.flat_map_ref(f)
211 }
212
213 /// Flat-maps using a function that returns RxObservable<B>.
214 pub fn flat_map_observable<B, F>(&self, f: F) -> RxObservable<B>
215 where
216 B: Clone + 'static,
217 F: Fn(&T) -> RxObservable<B> + 'static,
218 {
219 self.inner.flat_map_observable(f)
220 }
221
222 /// Flat-maps using a function that returns RxSubject<B>.
223 pub fn flat_map_subject<B, F>(&self, f: F) -> RxObservable<B>
224 where
225 B: Clone + 'static,
226 F: Fn(&T) -> RxSubject<B> + 'static,
227 {
228 self.inner.flat_map_subject(f)
229 }
230
231 /// Joins this RxSubject with an RxObservable.
232 pub fn join_observable(&self, other: RxObservable<T>) -> RxObservable<T>
233 where
234 T: Clone,
235 {
236 self.inner.join_observable(other)
237 }
238
239 /// Joins this RxSubject with another RxSubject.
240 pub fn join_subject(&self, other: RxSubject<T>) -> RxObservable<T>
241 where
242 T: Clone,
243 {
244 self.inner.join_subject(other)
245 }
246}
247
248impl<T: 'static> Default for RxSubject<T> {
249 fn default() -> Self {
250 Self::new()
251 }
252}