rx_rs/core/rx_observable.rs
1use std::any::Any;
2use std::cell::RefCell;
3use std::rc::Rc;
4
5use super::rx_ref::RxRef;
6use super::rx_subject::RxSubject;
7use super::rx_val::RxVal;
8use super::tracker::{DisposableTracker, Tracker};
9
10type Subscriber<T> = Rc<RefCell<Box<dyn FnMut(&T)>>>;
11
12/// Internal storage for an observable stream.
13pub(super) struct RxObservableInner<T> {
14 subscribers: Vec<Subscriber<T>>,
15 // Optional tracker to keep subscriptions alive
16 // Used by .stream() to maintain the source subscription
17 pub(super) _lifetime_tracker: Option<Rc<dyn Any>>,
18}
19
20/// A read-only stream of events.
21///
22/// Unlike RxVal, RxObservable does NOT have a current value. Subscribers are
23/// only called when new events are emitted, not immediately upon subscription.
24///
25/// This is useful for representing discrete events like button clicks, network
26/// messages, or user actions that don't have a "current state".
27///
28/// # Example
29/// ```
30/// use rx_rs::core::{RxSubject, DisposableTracker};
31///
32/// let mut tracker = DisposableTracker::new();
33/// let rx_subject = RxSubject::new();
34/// let rx_observable = rx_subject.observable();
35///
36/// rx_observable.subscribe(tracker.tracker(), |value| {
37/// println!("Event: {}", value);
38/// }); // Nothing printed yet
39///
40/// rx_subject.next(42); // Prints "Event: 42"
41/// rx_subject.next(100); // Prints "Event: 100"
42/// ```
43pub struct RxObservable<T> {
44 pub(super) inner: Rc<RefCell<RxObservableInner<T>>>,
45}
46
47impl<T> Clone for RxObservable<T> {
48 fn clone(&self) -> Self {
49 Self {
50 inner: self.inner.clone(),
51 }
52 }
53}
54
55impl<T: 'static> RxObservable<T> {
56 /// Subscribes to events.
57 ///
58 /// The subscriber function is called each time a new event is emitted.
59 /// Unlike RxVal, it is NOT called immediately upon subscription.
60 ///
61 /// The subscription is automatically cleaned up when the tracker is dropped.
62 ///
63 /// # Arguments
64 /// * `tracker` - Tracker that will manage this subscription's lifetime
65 /// * `f` - Function called with a reference to the event on each emission
66 pub fn subscribe<F>(&self, tracker: &Tracker, f: F)
67 where
68 F: FnMut(&T) + 'static,
69 {
70 // Wrap the subscriber in Rc<RefCell<>> for shared ownership
71 let subscriber = Rc::new(RefCell::new(Box::new(f) as Box<dyn FnMut(&T)>));
72
73 // Store for future events
74 let subscriber_clone = subscriber.clone();
75 let inner_weak = Rc::downgrade(&self.inner);
76
77 self.inner.borrow_mut().subscribers.push(subscriber_clone);
78
79 // Add cleanup to tracker
80 tracker.add(move || {
81 // Remove subscriber when tracker drops
82 // Use weak reference to avoid cycle
83 if let Some(inner_rc) = inner_weak.upgrade() {
84 inner_rc
85 .borrow_mut()
86 .subscribers
87 .retain(|s| !Rc::ptr_eq(s, &subscriber));
88 }
89 });
90 }
91
92 /// Returns the number of active subscribers.
93 pub fn subscriber_count(&self) -> usize {
94 self.inner.borrow().subscribers.len()
95 }
96}
97
98impl<T: 'static> RxObservable<T> {
99 /// Creates a new RxObservable.
100 ///
101 /// This is primarily used internally by RxSubject. Users should typically
102 /// create an RxSubject and get the RxObservable via `.observable()`.
103 pub(crate) fn new() -> Self {
104 Self {
105 inner: Rc::new(RefCell::new(RxObservableInner {
106 subscribers: Vec::new(),
107 _lifetime_tracker: None,
108 })),
109 }
110 }
111
112 /// Emits an event to all subscribers.
113 ///
114 /// This is an internal method used by RxSubject.
115 pub(crate) fn emit(&self, value: &T) {
116 // Clone subscribers list to avoid holding borrow during notification
117 let subscribers = self.inner.borrow().subscribers.clone();
118
119 // Notify all subscribers without holding the borrow
120 for subscriber in &subscribers {
121 let mut sub = subscriber.borrow_mut();
122 sub(value);
123 }
124 }
125
126 /// Converts this RxObservable to an RxVal with an initial value.
127 ///
128 /// The RxVal is updated whenever the observable emits a new value.
129 /// A tracker must be provided to manage the subscription lifetime.
130 ///
131 /// # Arguments
132 /// * `initial` - The initial value for the RxVal
133 /// * `tracker` - Tracker to manage the subscription lifetime
134 ///
135 /// # Example
136 /// ```
137 /// use rx_rs::core::{RxSubject, DisposableTracker};
138 ///
139 /// let mut tracker = DisposableTracker::new();
140 /// let subject = RxSubject::new();
141 /// let val = subject.observable().to_val(0, tracker.tracker());
142 ///
143 /// assert_eq!(val.get(), 0);
144 ///
145 /// subject.next(42);
146 /// assert_eq!(val.get(), 42);
147 /// ```
148 pub fn to_val(&self, initial: T, tracker: &Tracker) -> RxVal<T>
149 where
150 T: Clone + PartialEq,
151 {
152 // Create a new RxRef with the initial value
153 let rx_ref = RxRef::new(initial);
154
155 // Subscribe to this observable and update the ref
156 let rx_ref_clone = rx_ref.clone();
157 self.subscribe(tracker, move |value| {
158 rx_ref_clone.set(value.clone());
159 });
160
161 // Return the val
162 rx_ref.val()
163 }
164
165 /// Maps the values of this RxObservable using a transformation function.
166 ///
167 /// Returns a new RxObservable that emits transformed values.
168 /// When the source observable emits, the transformation is applied and
169 /// the resulting observable emits the transformed value.
170 ///
171 /// # Arguments
172 /// * `f` - Function to transform values from A to B
173 ///
174 /// # Example
175 /// ```
176 /// use rx_rs::core::{RxSubject, DisposableTracker};
177 /// use std::cell::RefCell;
178 /// use std::rc::Rc;
179 ///
180 /// let tracker = DisposableTracker::new();
181 /// let subject = RxSubject::new();
182 /// let doubled = subject.observable().map(|x| x * 2);
183 ///
184 /// let result = Rc::new(RefCell::new(None));
185 /// let result_clone = result.clone();
186 ///
187 /// doubled.subscribe(tracker.tracker(), move |value| {
188 /// *result_clone.borrow_mut() = Some(*value);
189 /// });
190 ///
191 /// subject.next(5);
192 /// assert_eq!(*result.borrow(), Some(10));
193 /// ```
194 pub fn map<B, F>(&self, f: F) -> RxObservable<B>
195 where
196 B: Clone + 'static,
197 F: Fn(&T) -> B + 'static,
198 {
199 use super::rx_subject::RxSubject;
200
201 // Create a subject to forward transformed values to
202 let subject = RxSubject::new();
203
204 // Create a tracker that will live as long as the returned observable
205 let tracker = Rc::new(DisposableTracker::new());
206
207 // Subscribe to this observable and forward transformed values to the subject
208 let subject_clone = subject.clone();
209 self.subscribe(tracker.tracker(), move |value| {
210 subject_clone.next(f(value));
211 });
212
213 // Get the observable from the subject
214 let observable = subject.observable();
215
216 // Attach the tracker to keep subscription alive
217 observable.inner.borrow_mut()._lifetime_tracker = Some(tracker as Rc<dyn Any>);
218
219 observable
220 }
221
222 /// Flat-maps the values of this RxObservable using a function that returns RxVal<B>.
223 ///
224 /// When the observable emits, the function is called to get an RxVal<B>,
225 /// and the resulting observable emits the current value of that RxVal.
226 ///
227 /// # Arguments
228 /// * `f` - Function to transform values from A to RxVal<B>
229 ///
230 /// # Example
231 /// ```
232 /// use rx_rs::core::{RxSubject, RxRef, DisposableTracker};
233 /// use std::cell::RefCell;
234 /// use std::rc::Rc;
235 ///
236 /// let tracker = DisposableTracker::new();
237 /// let subject = RxSubject::new();
238 /// let inner = RxRef::new(100);
239 ///
240 /// let inner_clone = inner.clone();
241 /// let flattened = subject.observable().flat_map_val(move |_| inner_clone.val());
242 ///
243 /// let result = Rc::new(RefCell::new(None));
244 /// let result_clone = result.clone();
245 ///
246 /// flattened.subscribe(tracker.tracker(), move |value| {
247 /// *result_clone.borrow_mut() = Some(*value);
248 /// });
249 ///
250 /// subject.next(1);
251 /// // Emits twice: once for current value, once for subscription
252 /// assert_eq!(*result.borrow(), Some(100));
253 /// ```
254 pub fn flat_map_val<B, F>(&self, f: F) -> RxObservable<B>
255 where
256 B: Clone + PartialEq + 'static,
257 F: Fn(&T) -> RxVal<B> + 'static,
258 {
259 use super::rx_subject::RxSubject;
260
261 // Create a subject to forward values to
262 let subject = RxSubject::new();
263
264 // Create trackers
265 let outer_tracker = Rc::new(DisposableTracker::new());
266 let inner_tracker = Rc::new(RefCell::new(DisposableTracker::new()));
267
268 // Subscribe to this observable
269 let subject_clone = subject.clone();
270 let inner_tracker_clone = inner_tracker.clone();
271 let f_rc = Rc::new(f);
272
273 self.subscribe(outer_tracker.tracker(), move |outer_value| {
274 // Get new inner RxVal
275 let new_inner = f_rc(outer_value);
276
277 // Cancel previous inner subscription
278 inner_tracker_clone.borrow_mut().dispose();
279 *inner_tracker_clone.borrow_mut() = DisposableTracker::new();
280
281 // Emit the current value of the new inner
282 subject_clone.next(new_inner.get());
283
284 // Subscribe to the new inner
285 let subject_clone2 = subject_clone.clone();
286 new_inner.subscribe(inner_tracker_clone.borrow().tracker(), move |inner_value| {
287 subject_clone2.next(inner_value.clone());
288 });
289 });
290
291 // Get observable with trackers attached
292 let observable = subject.observable();
293 let combined_tracker = Rc::new((outer_tracker, inner_tracker));
294 observable.inner.borrow_mut()._lifetime_tracker = Some(combined_tracker as Rc<dyn Any>);
295
296 observable
297 }
298
299 /// Flat-maps using a function that returns RxRef<B>.
300 /// Delegates to flat_map_val by converting the RxRef to RxVal.
301 pub fn flat_map_ref<B, F>(&self, f: F) -> RxObservable<B>
302 where
303 B: Clone + PartialEq + 'static,
304 F: Fn(&T) -> RxRef<B> + 'static,
305 {
306 self.flat_map_val(move |x| f(x).val())
307 }
308
309 /// Flat-maps using a function that returns RxObservable<B>.
310 /// Switches to the new observable each time the source emits.
311 pub fn flat_map_observable<B, F>(&self, f: F) -> RxObservable<B>
312 where
313 B: Clone + 'static,
314 F: Fn(&T) -> RxObservable<B> + 'static,
315 {
316 use super::rx_subject::RxSubject;
317
318 let subject = RxSubject::new();
319 let outer_tracker = Rc::new(DisposableTracker::new());
320 let inner_tracker = Rc::new(RefCell::new(DisposableTracker::new()));
321
322 let subject_clone = subject.clone();
323 let inner_tracker_clone = inner_tracker.clone();
324 let f_rc = Rc::new(f);
325
326 self.subscribe(outer_tracker.tracker(), move |outer_value| {
327 let new_inner = f_rc(outer_value);
328 inner_tracker_clone.borrow_mut().dispose();
329 *inner_tracker_clone.borrow_mut() = DisposableTracker::new();
330
331 let subject_clone2 = subject_clone.clone();
332 new_inner.subscribe(inner_tracker_clone.borrow().tracker(), move |inner_value| {
333 subject_clone2.next(inner_value.clone());
334 });
335 });
336
337 let observable = subject.observable();
338 let combined_tracker = Rc::new((outer_tracker, inner_tracker));
339 observable.inner.borrow_mut()._lifetime_tracker = Some(combined_tracker as Rc<dyn Any>);
340
341 observable
342 }
343
344 /// Flat-maps using a function that returns RxSubject<B>.
345 /// Delegates to flat_map_observable by converting the RxSubject to RxObservable.
346 pub fn flat_map_subject<B, F>(&self, f: F) -> RxObservable<B>
347 where
348 B: Clone + 'static,
349 F: Fn(&T) -> RxSubject<B> + 'static,
350 {
351 self.flat_map_observable(move |x| f(x).observable())
352 }
353
354 /// Joins this RxObservable with another RxObservable.
355 ///
356 /// The resulting observable emits whenever either source emits.
357 /// Both observables must have the same type.
358 ///
359 /// # Arguments
360 /// * `other` - Another RxObservable to join with
361 ///
362 /// # Example
363 /// ```
364 /// use rx_rs::core::{RxSubject, DisposableTracker};
365 /// use std::cell::RefCell;
366 /// use std::rc::Rc;
367 ///
368 /// let tracker = DisposableTracker::new();
369 /// let subject1 = RxSubject::new();
370 /// let subject2 = RxSubject::new();
371 ///
372 /// let joined = subject1.observable().join_observable(subject2.observable());
373 ///
374 /// let results = Rc::new(RefCell::new(Vec::new()));
375 /// let results_clone = results.clone();
376 ///
377 /// joined.subscribe(tracker.tracker(), move |value| {
378 /// results_clone.borrow_mut().push(*value);
379 /// });
380 ///
381 /// subject1.next(1);
382 /// subject2.next(2);
383 /// subject1.next(3);
384 ///
385 /// assert_eq!(*results.borrow(), vec![1, 2, 3]);
386 /// ```
387 pub fn join_observable(&self, other: RxObservable<T>) -> RxObservable<T>
388 where
389 T: Clone,
390 {
391 use super::rx_subject::RxSubject;
392
393 // Create a subject to forward values to
394 let subject = RxSubject::new();
395
396 // Create trackers for both subscriptions
397 let tracker1 = Rc::new(DisposableTracker::new());
398 let tracker2 = Rc::new(DisposableTracker::new());
399
400 // Subscribe to self
401 let subject_clone1 = subject.clone();
402 self.subscribe(tracker1.tracker(), move |value| {
403 subject_clone1.next(value.clone());
404 });
405
406 // Subscribe to other
407 let subject_clone2 = subject.clone();
408 other.subscribe(tracker2.tracker(), move |value| {
409 subject_clone2.next(value.clone());
410 });
411
412 // Get observable with both trackers attached
413 let observable = subject.observable();
414 let combined_tracker = Rc::new((tracker1, tracker2));
415 observable.inner.borrow_mut()._lifetime_tracker = Some(combined_tracker as Rc<dyn Any>);
416
417 observable
418 }
419
420 /// Joins this RxObservable with an RxSubject.
421 /// Delegates to join_observable by converting the RxSubject to RxObservable.
422 pub fn join_subject(&self, other: RxSubject<T>) -> RxObservable<T>
423 where
424 T: Clone,
425 {
426 self.join_observable(other.observable())
427 }
428}