Skip to main content

rx_rs/core/
rx_observable.rs

1use std::any::Any;
2use std::cell::RefCell;
3use std::rc::Rc;
4
5use super::rx_ref::RxRef;
6use super::rx_subject::RxSubject;
7use super::rx_val::RxVal;
8use super::tracker::{DisposableTracker, Tracker};
9
/// Shared, interiorly mutable handle to a boxed subscriber callback.
///
/// The `Rc` allows the same callback to be referenced from more than one
/// place (for example the snapshot `emit` takes of the subscriber list), and
/// the `RefCell` provides the mutable access an `FnMut` needs when invoked.
type Subscriber<T> = Rc<RefCell<Box<dyn FnMut(&T)>>>;
11
/// Internal storage for an observable stream.
///
/// Held behind `Rc<RefCell<..>>` by `RxObservable`, so every clone of an
/// observable reads and mutates the same instance of this struct.
pub(super) struct RxObservableInner<T> {
    /// Callbacks invoked on each emitted event, in subscription order.
    subscribers: Vec<Subscriber<T>>,
    /// Optional type-erased value that is kept alive for as long as this
    /// storage exists. The combinators in this file (`map`, `flat_map_*`,
    /// `join_observable`) park the trackers owning their upstream
    /// subscriptions here; it is also used by `.stream()` to maintain the
    /// source subscription.
    pub(super) _lifetime_tracker: Option<Rc<dyn Any>>,
}
19
/// A read-only stream of events.
///
/// Unlike RxVal, RxObservable does NOT have a current value. Subscribers are
/// only called when new events are emitted, not immediately upon subscription.
///
/// This is useful for representing discrete events like button clicks, network
/// messages, or user actions that don't have a "current state".
///
/// An `RxObservable` is a cheap handle: cloning it yields another handle to
/// the same shared subscriber list rather than an independent stream.
///
/// # Example
/// ```
/// use rx_rs::core::{RxSubject, DisposableTracker};
///
/// let mut tracker = DisposableTracker::new();
/// let rx_subject = RxSubject::new();
/// let rx_observable = rx_subject.observable();
///
/// rx_observable.subscribe(tracker.tracker(), |value| {
///     println!("Event: {}", value);
/// }); // Nothing printed yet
///
/// rx_subject.next(42); // Prints "Event: 42"
/// rx_subject.next(100); // Prints "Event: 100"
/// ```
pub struct RxObservable<T> {
    /// Shared storage: the subscriber list plus any keep-alive tracker.
    pub(super) inner: Rc<RefCell<RxObservableInner<T>>>,
}
46
47impl<T> Clone for RxObservable<T> {
48    fn clone(&self) -> Self {
49        Self {
50            inner: self.inner.clone(),
51        }
52    }
53}
54
55impl<T: 'static> RxObservable<T> {
56    /// Subscribes to events.
57    ///
58    /// The subscriber function is called each time a new event is emitted.
59    /// Unlike RxVal, it is NOT called immediately upon subscription.
60    ///
61    /// The subscription is automatically cleaned up when the tracker is dropped.
62    ///
63    /// # Arguments
64    /// * `tracker` - Tracker that will manage this subscription's lifetime
65    /// * `f` - Function called with a reference to the event on each emission
66    pub fn subscribe<F>(&self, tracker: &Tracker, f: F)
67    where
68        F: FnMut(&T) + 'static,
69    {
70        // Wrap the subscriber in Rc<RefCell<>> for shared ownership
71        let subscriber = Rc::new(RefCell::new(Box::new(f) as Box<dyn FnMut(&T)>));
72
73        // Store for future events
74        let subscriber_clone = subscriber.clone();
75        let inner_weak = Rc::downgrade(&self.inner);
76
77        self.inner.borrow_mut().subscribers.push(subscriber_clone);
78
79        // Add cleanup to tracker
80        tracker.add(move || {
81            // Remove subscriber when tracker drops
82            // Use weak reference to avoid cycle
83            if let Some(inner_rc) = inner_weak.upgrade() {
84                inner_rc
85                    .borrow_mut()
86                    .subscribers
87                    .retain(|s| !Rc::ptr_eq(s, &subscriber));
88            }
89        });
90    }
91
92    /// Returns the number of active subscribers.
93    pub fn subscriber_count(&self) -> usize {
94        self.inner.borrow().subscribers.len()
95    }
96}
97
98impl<T: 'static> RxObservable<T> {
99    /// Creates a new RxObservable.
100    ///
101    /// This is primarily used internally by RxSubject. Users should typically
102    /// create an RxSubject and get the RxObservable via `.observable()`.
103    pub(crate) fn new() -> Self {
104        Self {
105            inner: Rc::new(RefCell::new(RxObservableInner {
106                subscribers: Vec::new(),
107                _lifetime_tracker: None,
108            })),
109        }
110    }
111
112    /// Emits an event to all subscribers.
113    ///
114    /// This is an internal method used by RxSubject.
115    pub(crate) fn emit(&self, value: &T) {
116        // Clone subscribers list to avoid holding borrow during notification
117        let subscribers = self.inner.borrow().subscribers.clone();
118
119        // Notify all subscribers without holding the borrow
120        for subscriber in &subscribers {
121            let mut sub = subscriber.borrow_mut();
122            sub(value);
123        }
124    }
125
126    /// Converts this RxObservable to an RxVal with an initial value.
127    ///
128    /// The RxVal is updated whenever the observable emits a new value.
129    /// A tracker must be provided to manage the subscription lifetime.
130    ///
131    /// # Arguments
132    /// * `initial` - The initial value for the RxVal
133    /// * `tracker` - Tracker to manage the subscription lifetime
134    ///
135    /// # Example
136    /// ```
137    /// use rx_rs::core::{RxSubject, DisposableTracker};
138    ///
139    /// let mut tracker = DisposableTracker::new();
140    /// let subject = RxSubject::new();
141    /// let val = subject.observable().to_val(0, tracker.tracker());
142    ///
143    /// assert_eq!(val.get(), 0);
144    ///
145    /// subject.next(42);
146    /// assert_eq!(val.get(), 42);
147    /// ```
148    pub fn to_val(&self, initial: T, tracker: &Tracker) -> RxVal<T>
149    where
150        T: Clone + PartialEq,
151    {
152        // Create a new RxRef with the initial value
153        let rx_ref = RxRef::new(initial);
154
155        // Subscribe to this observable and update the ref
156        let rx_ref_clone = rx_ref.clone();
157        self.subscribe(tracker, move |value| {
158            rx_ref_clone.set(value.clone());
159        });
160
161        // Return the val
162        rx_ref.val()
163    }
164
165    /// Maps the values of this RxObservable using a transformation function.
166    ///
167    /// Returns a new RxObservable that emits transformed values.
168    /// When the source observable emits, the transformation is applied and
169    /// the resulting observable emits the transformed value.
170    ///
171    /// # Arguments
172    /// * `f` - Function to transform values from A to B
173    ///
174    /// # Example
175    /// ```
176    /// use rx_rs::core::{RxSubject, DisposableTracker};
177    /// use std::cell::RefCell;
178    /// use std::rc::Rc;
179    ///
180    /// let tracker = DisposableTracker::new();
181    /// let subject = RxSubject::new();
182    /// let doubled = subject.observable().map(|x| x * 2);
183    ///
184    /// let result = Rc::new(RefCell::new(None));
185    /// let result_clone = result.clone();
186    ///
187    /// doubled.subscribe(tracker.tracker(), move |value| {
188    ///     *result_clone.borrow_mut() = Some(*value);
189    /// });
190    ///
191    /// subject.next(5);
192    /// assert_eq!(*result.borrow(), Some(10));
193    /// ```
194    pub fn map<B, F>(&self, f: F) -> RxObservable<B>
195    where
196        B: Clone + 'static,
197        F: Fn(&T) -> B + 'static,
198    {
199        use super::rx_subject::RxSubject;
200
201        // Create a subject to forward transformed values to
202        let subject = RxSubject::new();
203
204        // Create a tracker that will live as long as the returned observable
205        let tracker = Rc::new(DisposableTracker::new());
206
207        // Subscribe to this observable and forward transformed values to the subject
208        let subject_clone = subject.clone();
209        self.subscribe(tracker.tracker(), move |value| {
210            subject_clone.next(f(value));
211        });
212
213        // Get the observable from the subject
214        let observable = subject.observable();
215
216        // Attach the tracker to keep subscription alive
217        observable.inner.borrow_mut()._lifetime_tracker = Some(tracker as Rc<dyn Any>);
218
219        observable
220    }
221
222    /// Flat-maps the values of this RxObservable using a function that returns RxVal<B>.
223    ///
224    /// When the observable emits, the function is called to get an RxVal<B>,
225    /// and the resulting observable emits the current value of that RxVal.
226    ///
227    /// # Arguments
228    /// * `f` - Function to transform values from A to RxVal<B>
229    ///
230    /// # Example
231    /// ```
232    /// use rx_rs::core::{RxSubject, RxRef, DisposableTracker};
233    /// use std::cell::RefCell;
234    /// use std::rc::Rc;
235    ///
236    /// let tracker = DisposableTracker::new();
237    /// let subject = RxSubject::new();
238    /// let inner = RxRef::new(100);
239    ///
240    /// let inner_clone = inner.clone();
241    /// let flattened = subject.observable().flat_map_val(move |_| inner_clone.val());
242    ///
243    /// let result = Rc::new(RefCell::new(None));
244    /// let result_clone = result.clone();
245    ///
246    /// flattened.subscribe(tracker.tracker(), move |value| {
247    ///     *result_clone.borrow_mut() = Some(*value);
248    /// });
249    ///
250    /// subject.next(1);
251    /// // Emits twice: once for current value, once for subscription
252    /// assert_eq!(*result.borrow(), Some(100));
253    /// ```
254    pub fn flat_map_val<B, F>(&self, f: F) -> RxObservable<B>
255    where
256        B: Clone + PartialEq + 'static,
257        F: Fn(&T) -> RxVal<B> + 'static,
258    {
259        use super::rx_subject::RxSubject;
260
261        // Create a subject to forward values to
262        let subject = RxSubject::new();
263
264        // Create trackers
265        let outer_tracker = Rc::new(DisposableTracker::new());
266        let inner_tracker = Rc::new(RefCell::new(DisposableTracker::new()));
267
268        // Subscribe to this observable
269        let subject_clone = subject.clone();
270        let inner_tracker_clone = inner_tracker.clone();
271        let f_rc = Rc::new(f);
272
273        self.subscribe(outer_tracker.tracker(), move |outer_value| {
274            // Get new inner RxVal
275            let new_inner = f_rc(outer_value);
276
277            // Cancel previous inner subscription
278            inner_tracker_clone.borrow_mut().dispose();
279            *inner_tracker_clone.borrow_mut() = DisposableTracker::new();
280
281            // Emit the current value of the new inner
282            subject_clone.next(new_inner.get());
283
284            // Subscribe to the new inner
285            let subject_clone2 = subject_clone.clone();
286            new_inner.subscribe(inner_tracker_clone.borrow().tracker(), move |inner_value| {
287                subject_clone2.next(inner_value.clone());
288            });
289        });
290
291        // Get observable with trackers attached
292        let observable = subject.observable();
293        let combined_tracker = Rc::new((outer_tracker, inner_tracker));
294        observable.inner.borrow_mut()._lifetime_tracker = Some(combined_tracker as Rc<dyn Any>);
295
296        observable
297    }
298
299    /// Flat-maps using a function that returns RxRef<B>.
300    /// Delegates to flat_map_val by converting the RxRef to RxVal.
301    pub fn flat_map_ref<B, F>(&self, f: F) -> RxObservable<B>
302    where
303        B: Clone + PartialEq + 'static,
304        F: Fn(&T) -> RxRef<B> + 'static,
305    {
306        self.flat_map_val(move |x| f(x).val())
307    }
308
309    /// Flat-maps using a function that returns RxObservable<B>.
310    /// Switches to the new observable each time the source emits.
311    pub fn flat_map_observable<B, F>(&self, f: F) -> RxObservable<B>
312    where
313        B: Clone + 'static,
314        F: Fn(&T) -> RxObservable<B> + 'static,
315    {
316        use super::rx_subject::RxSubject;
317
318        let subject = RxSubject::new();
319        let outer_tracker = Rc::new(DisposableTracker::new());
320        let inner_tracker = Rc::new(RefCell::new(DisposableTracker::new()));
321
322        let subject_clone = subject.clone();
323        let inner_tracker_clone = inner_tracker.clone();
324        let f_rc = Rc::new(f);
325
326        self.subscribe(outer_tracker.tracker(), move |outer_value| {
327            let new_inner = f_rc(outer_value);
328            inner_tracker_clone.borrow_mut().dispose();
329            *inner_tracker_clone.borrow_mut() = DisposableTracker::new();
330
331            let subject_clone2 = subject_clone.clone();
332            new_inner.subscribe(inner_tracker_clone.borrow().tracker(), move |inner_value| {
333                subject_clone2.next(inner_value.clone());
334            });
335        });
336
337        let observable = subject.observable();
338        let combined_tracker = Rc::new((outer_tracker, inner_tracker));
339        observable.inner.borrow_mut()._lifetime_tracker = Some(combined_tracker as Rc<dyn Any>);
340
341        observable
342    }
343
344    /// Flat-maps using a function that returns RxSubject<B>.
345    /// Delegates to flat_map_observable by converting the RxSubject to RxObservable.
346    pub fn flat_map_subject<B, F>(&self, f: F) -> RxObservable<B>
347    where
348        B: Clone + 'static,
349        F: Fn(&T) -> RxSubject<B> + 'static,
350    {
351        self.flat_map_observable(move |x| f(x).observable())
352    }
353
354    /// Joins this RxObservable with another RxObservable.
355    ///
356    /// The resulting observable emits whenever either source emits.
357    /// Both observables must have the same type.
358    ///
359    /// # Arguments
360    /// * `other` - Another RxObservable to join with
361    ///
362    /// # Example
363    /// ```
364    /// use rx_rs::core::{RxSubject, DisposableTracker};
365    /// use std::cell::RefCell;
366    /// use std::rc::Rc;
367    ///
368    /// let tracker = DisposableTracker::new();
369    /// let subject1 = RxSubject::new();
370    /// let subject2 = RxSubject::new();
371    ///
372    /// let joined = subject1.observable().join_observable(subject2.observable());
373    ///
374    /// let results = Rc::new(RefCell::new(Vec::new()));
375    /// let results_clone = results.clone();
376    ///
377    /// joined.subscribe(tracker.tracker(), move |value| {
378    ///     results_clone.borrow_mut().push(*value);
379    /// });
380    ///
381    /// subject1.next(1);
382    /// subject2.next(2);
383    /// subject1.next(3);
384    ///
385    /// assert_eq!(*results.borrow(), vec![1, 2, 3]);
386    /// ```
387    pub fn join_observable(&self, other: RxObservable<T>) -> RxObservable<T>
388    where
389        T: Clone,
390    {
391        use super::rx_subject::RxSubject;
392
393        // Create a subject to forward values to
394        let subject = RxSubject::new();
395
396        // Create trackers for both subscriptions
397        let tracker1 = Rc::new(DisposableTracker::new());
398        let tracker2 = Rc::new(DisposableTracker::new());
399
400        // Subscribe to self
401        let subject_clone1 = subject.clone();
402        self.subscribe(tracker1.tracker(), move |value| {
403            subject_clone1.next(value.clone());
404        });
405
406        // Subscribe to other
407        let subject_clone2 = subject.clone();
408        other.subscribe(tracker2.tracker(), move |value| {
409            subject_clone2.next(value.clone());
410        });
411
412        // Get observable with both trackers attached
413        let observable = subject.observable();
414        let combined_tracker = Rc::new((tracker1, tracker2));
415        observable.inner.borrow_mut()._lifetime_tracker = Some(combined_tracker as Rc<dyn Any>);
416
417        observable
418    }
419
420    /// Joins this RxObservable with an RxSubject.
421    /// Delegates to join_observable by converting the RxSubject to RxObservable.
422    pub fn join_subject(&self, other: RxSubject<T>) -> RxObservable<T>
423    where
424        T: Clone,
425    {
426        self.join_observable(other.observable())
427    }
428}