Skip to main content

rx_rs/core/
rx_val.rs

1use std::any::Any;
2use std::cell::RefCell;
3use std::rc::Rc;
4
5use super::rx_observable::RxObservable;
6use super::rx_ref::RxRef;
7use super::rx_subject::RxSubject;
8use super::tracker::{DisposableTracker, Tracker};
9
10type Subscriber<T> = Rc<RefCell<Box<dyn FnMut(&T)>>>;
11
12/// Internal storage for a reactive value.
13#[derive(Clone)]
14struct RxValInner<T> {
15    value: T,
16    subscribers: Vec<Subscriber<T>>,
17    // Optional tracker to keep subscriptions alive
18    // Used by .map() and other operators to maintain source subscriptions
19    // Can store any type that needs to be kept alive
20    _lifetime_tracker: Option<Rc<dyn Any>>,
21}
22
23/// A read-only reactive value that holds a current state.
24///
25/// When subscribed, the subscriber is called immediately with the current value,
26/// and then called again whenever the value changes.
27///
28/// This is useful for representing state that always has a current value, like
29/// connection status, client ID, or configuration values.
30///
31/// # Example
32/// ```
33/// use rx_rs::core::{RxRef, DisposableTracker};
34///
35/// let mut tracker = DisposableTracker::new();
36/// let rx_ref = RxRef::new(42);
37/// let rx_val = rx_ref.val();
38///
39/// rx_val.subscribe(tracker.tracker(), |value| {
40///     println!("Value: {}", value);
41/// }); // Prints "Value: 42" immediately
42///
43/// rx_ref.set(100); // Prints "Value: 100"
44/// ```
45#[derive(Clone)]
46pub struct RxVal<T> {
47    inner: Rc<RefCell<RxValInner<T>>>,
48}
49
50impl<T: 'static> RxVal<T> {
51    /// Gets the current value.
52    pub fn get(&self) -> T
53    where
54        T: Clone,
55    {
56        self.inner.borrow().value.clone()
57    }
58
59    /// Subscribes to value changes.
60    ///
61    /// The subscriber function is called immediately with the current value,
62    /// and then called again whenever the value changes.
63    ///
64    /// The subscription is automatically cleaned up when the tracker is dropped.
65    ///
66    /// # Arguments
67    /// * `tracker` - Tracker that will manage this subscription's lifetime
68    /// * `f` - Function called with a reference to the value on each update
69    pub fn subscribe<F>(&self, tracker: &Tracker, mut f: F)
70    where
71        F: FnMut(&T) + 'static,
72        T: Clone,
73    {
74        // Clone current value and release borrow before calling callback
75        let current_value = self.inner.borrow().value.clone();
76
77        // Call immediately with cloned value (no borrow held)
78        f(&current_value);
79
80        // Wrap the subscriber in Rc<RefCell<>> for shared ownership
81        let subscriber = Rc::new(RefCell::new(Box::new(f) as Box<dyn FnMut(&T)>));
82
83        // Store for future updates
84        let subscriber_clone = subscriber.clone();
85        let inner_weak = Rc::downgrade(&self.inner);
86
87        // Add subscriber - now this will succeed even if called from within a notification
88        self.inner.borrow_mut().subscribers.push(subscriber_clone);
89
90        #[cfg(feature = "debug")]
91        {
92            let debug_ptr = std::rc::Rc::as_ptr(&self.inner) as usize;
93            let sub_count = self.inner.borrow().subscribers.len();
94            tracing::debug!(
95                ptr = format!("0x{:x}", debug_ptr),
96                subscriber_count = sub_count,
97                "subscription added to RxVal"
98            );
99        }
100
101        // Add cleanup to tracker
102        #[cfg(feature = "debug")]
103        let debug_ptr = std::rc::Rc::as_ptr(&self.inner) as usize;
104
105        tracker.add(move || {
106            // Remove subscriber when tracker drops
107            // Use weak reference to avoid cycle
108            #[cfg(feature = "debug")]
109            tracing::debug!(
110                ptr = format!("0x{:x}", debug_ptr),
111                "subscription cleanup called"
112            );
113
114            if let Some(inner_rc) = inner_weak.upgrade() {
115                if let Ok(mut inner) = inner_rc.try_borrow_mut() {
116                    #[cfg(feature = "debug")]
117                    let before_count = inner.subscribers.len();
118
119                    inner.subscribers.retain(|s| !Rc::ptr_eq(s, &subscriber));
120
121                    #[cfg(feature = "debug")]
122                    {
123                        let after_count = inner.subscribers.len();
124                        tracing::debug!(
125                            ptr = format!("0x{:x}", debug_ptr),
126                            before = before_count,
127                            after = after_count,
128                            "removed subscriber from RxVal"
129                        );
130                    }
131                }
132            } else {
133                #[cfg(feature = "debug")]
134                tracing::debug!(ptr = format!("0x{:x}", debug_ptr), "RxVal already dropped");
135            }
136        });
137    }
138
139    /// Returns the number of active subscribers.
140    pub fn subscriber_count(&self) -> usize {
141        self.inner.borrow().subscribers.len()
142    }
143
144    /// Returns a pointer address for debugging identity.
145    /// Use this to check if two RxVal instances share the same underlying data.
146    pub fn debug_ptr(&self) -> usize {
147        std::rc::Rc::as_ptr(&self.inner) as usize
148    }
149
150    /// Converts this RxVal into a stream (RxObservable).
151    ///
152    /// The returned observable does NOT emit the current value immediately on subscription.
153    /// It only emits when the RxVal changes to a new value.
154    ///
155    /// Note: If you subscribe directly to the RxVal, it WILL emit immediately.
156    /// But when converted to a stream, it behaves like a pure observable.
157    ///
158    /// # Example
159    /// ```
160    /// use rx_rs::core::{RxRef, DisposableTracker};
161    ///
162    /// let mut tracker = DisposableTracker::new();
163    /// let rx_ref = RxRef::new(42);
164    /// let stream = rx_ref.val().stream();
165    ///
166    /// stream.subscribe(tracker.tracker(), |value| {
167    ///     println!("Value: {}", value);
168    /// }); // Does NOT print immediately
169    ///
170    /// rx_ref.set(100); // Prints "Value: 100"
171    /// ```
172    pub fn stream(&self) -> RxObservable<T>
173    where
174        T: Clone,
175    {
176        // Create a subject to forward values to
177        let subject = RxSubject::new();
178
179        // Create a tracker that will keep the subscription alive
180        let tracker = Rc::new(DisposableTracker::new());
181
182        // Create a flag to skip the first emission (current value)
183        let first = Rc::new(RefCell::new(true));
184
185        // Subscribe to this RxVal and forward all values to the subject
186        // BUT skip the immediate emission of the current value
187        let subject_clone = subject.clone();
188        self.subscribe(tracker.tracker(), move |value| {
189            if *first.borrow() {
190                *first.borrow_mut() = false;
191                return; // Skip the first emission (current value)
192            }
193            subject_clone.next(value.clone());
194        });
195
196        // Get the inner from the subject's observable
197        let subject_observable = subject.observable();
198
199        // Attach the tracker to keep subscription alive
200        subject_observable.inner.borrow_mut()._lifetime_tracker = Some(tracker as Rc<dyn Any>);
201
202        subject_observable
203    }
204
205    /// Maps the values of this RxVal using a transformation function.
206    ///
207    /// Returns a new RxVal that always contains the transformed value.
208    /// When the source RxVal changes, the transformation is applied and
209    /// the resulting RxVal is updated.
210    ///
211    /// # Arguments
212    /// * `f` - Function to transform values from A to B
213    ///
214    /// # Example
215    /// ```
216    /// use rx_rs::core::RxRef;
217    ///
218    /// let number = RxRef::new(5);
219    /// let doubled = number.val().map(|x| x * 2);
220    ///
221    /// assert_eq!(doubled.get(), 10);
222    ///
223    /// number.set(10);
224    /// assert_eq!(doubled.get(), 20);
225    /// ```
226    pub fn map<B, F>(&self, f: F) -> RxVal<B>
227    where
228        T: Clone,
229        B: Clone + PartialEq + 'static,
230        F: Fn(&T) -> B + 'static,
231    {
232        // Create the initial mapped value
233        let initial = f(&self.get());
234
235        // Create a tracker that will live as long as the returned RxVal
236        let tracker = Rc::new(DisposableTracker::new());
237
238        // Create a new RxRef to hold the mapped values
239        let rx_ref = RxRef::new(initial);
240
241        // Get the result val first
242        let mapped_val = rx_ref.val();
243
244        // Subscribe to this RxVal and update the mapped ref
245        // Use weak reference to result's inner to avoid cycle
246        let result_weak = Rc::downgrade(&mapped_val.inner);
247        self.subscribe(tracker.tracker(), move |value| {
248            if let Some(result_inner) = result_weak.upgrade() {
249                let new_value = f(value);
250                let mut inner = result_inner.borrow_mut();
251                if inner.value != new_value {
252                    inner.value = new_value.clone();
253                    // Notify subscribers
254                    for subscriber in &inner.subscribers {
255                        let mut sub = subscriber.borrow_mut();
256                        sub(&new_value);
257                    }
258                }
259            }
260        });
261
262        // Attach the tracker to keep subscription alive
263        mapped_val.inner.borrow_mut()._lifetime_tracker = Some(tracker as Rc<dyn Any>);
264
265        mapped_val
266    }
267
268    /// Flat-maps the values of this RxVal using a function that returns RxVal<B>.
269    ///
270    /// When the source RxVal changes, the function is called to produce a new RxVal<B>,
271    /// and the result RxVal is updated to reflect the current value of that inner RxVal.
272    /// The result also updates when the inner RxVal changes.
273    ///
274    /// # Arguments
275    /// * `f` - Function to transform values from A to RxVal<B>
276    ///
277    /// # Example
278    /// ```
279    /// use rx_rs::core::RxRef;
280    ///
281    /// let outer = RxRef::new(1);
282    /// let inner1 = RxRef::new(10);
283    /// let inner2 = RxRef::new(20);
284    ///
285    /// let inner1_clone = inner1.clone();
286    /// let inner2_clone = inner2.clone();
287    ///
288    /// let flattened = outer.val().flat_map(move |&x| {
289    ///     if x == 1 { inner1_clone.val() } else { inner2_clone.val() }
290    /// });
291    ///
292    /// assert_eq!(flattened.get(), 10);
293    ///
294    /// outer.set(2);
295    /// assert_eq!(flattened.get(), 20);
296    /// ```
297    pub fn flat_map<B, F>(&self, f: F) -> RxVal<B>
298    where
299        T: Clone + PartialEq,
300        B: Clone + PartialEq + 'static,
301        F: Fn(&T) -> RxVal<B> + 'static,
302    {
303        use super::tracker::DisposableTracker;
304
305        // Get initial inner RxVal
306        let initial_inner = f(&self.get());
307        let initial_value = initial_inner.get();
308
309        // Create RxRef to hold the result
310        let result_ref = RxRef::new(initial_value);
311
312        // Create tracker for the outer subscription
313        let outer_tracker = Rc::new(DisposableTracker::new());
314
315        // Track the current inner subscription
316        let inner_tracker = Rc::new(RefCell::new(DisposableTracker::new()));
317
318        // Store the current inner RxVal to keep it alive
319        let current_inner = Rc::new(RefCell::new(Some(initial_inner.clone())));
320
321        // Get result val first
322        let result_val = result_ref.val();
323
324        // Subscribe to the initial inner RxVal
325        let result_weak_init = Rc::downgrade(&result_val.inner);
326        initial_inner.subscribe(inner_tracker.borrow().tracker(), move |inner_value| {
327            if let Some(result_inner) = result_weak_init.upgrade() {
328                let mut inner = result_inner.borrow_mut();
329                if inner.value != *inner_value {
330                    inner.value = inner_value.clone();
331                    // Notify subscribers
332                    for subscriber in &inner.subscribers {
333                        let mut sub = subscriber.borrow_mut();
334                        sub(inner_value);
335                    }
336                }
337            }
338        });
339
340        // Track the last outer value to avoid re-subscribing on duplicate updates
341        let last_outer_value = Rc::new(RefCell::new(self.get()));
342
343        // Subscribe to the outer RxVal using weak reference to result
344        let result_weak = Rc::downgrade(&result_val.inner);
345        let inner_tracker_clone = inner_tracker.clone();
346        let current_inner_clone = current_inner.clone();
347        let f_clone = Rc::new(f);
348
349        self.subscribe(outer_tracker.tracker(), move |outer_value| {
350            if let Some(result_inner) = result_weak.upgrade() {
351                // Only recreate inner subscription if outer value changed
352                let should_update = {
353                    let mut last_val = last_outer_value.borrow_mut();
354                    if *last_val != *outer_value {
355                        *last_val = outer_value.clone();
356                        true
357                    } else {
358                        false
359                    }
360                };
361
362                if !should_update {
363                    return; // Same value, don't recreate subscription
364                }
365
366                // Get new inner RxVal
367                let new_inner = f_clone(outer_value);
368
369                // Cancel previous inner subscription
370                inner_tracker_clone.borrow_mut().dispose();
371                *inner_tracker_clone.borrow_mut() = DisposableTracker::new();
372
373                // Set to the new inner's current value
374                let new_value = new_inner.get();
375                {
376                    let mut inner = result_inner.borrow_mut();
377                    if inner.value != new_value {
378                        inner.value = new_value.clone();
379                        // Notify subscribers
380                        for subscriber in &inner.subscribers {
381                            let mut sub = subscriber.borrow_mut();
382                            sub(&new_value);
383                        }
384                    }
385                }
386
387                // Subscribe to the new inner using weak reference
388                let result_weak2 = Rc::downgrade(&result_inner);
389                new_inner.subscribe(inner_tracker_clone.borrow().tracker(), move |inner_value| {
390                    if let Some(result_inner2) = result_weak2.upgrade() {
391                        let mut inner = result_inner2.borrow_mut();
392                        if inner.value != *inner_value {
393                            inner.value = inner_value.clone();
394                            // Notify subscribers
395                            for subscriber in &inner.subscribers {
396                                let mut sub = subscriber.borrow_mut();
397                                sub(inner_value);
398                            }
399                        }
400                    }
401                });
402
403                // Update current_inner to keep the new one alive
404                *current_inner_clone.borrow_mut() = Some(new_inner);
405            }
406        });
407
408        // We need to keep both trackers and current_inner alive
409        // Store them in a combined structure
410        let combined_tracker = Rc::new((outer_tracker, inner_tracker, current_inner));
411        result_val.inner.borrow_mut()._lifetime_tracker =
412            Some(combined_tracker as Rc<dyn std::any::Any>);
413
414        result_val
415    }
416
417    /// Flat-maps using a function that returns RxRef<B>.
418    /// Delegates to flat_map by converting the RxRef to RxVal.
419    pub fn flat_map_ref<B, F>(&self, f: F) -> RxVal<B>
420    where
421        T: Clone + PartialEq,
422        B: Clone + PartialEq + 'static,
423        F: Fn(&T) -> RxRef<B> + 'static,
424    {
425        self.flat_map(move |x| f(x).val())
426    }
427
428    /// Flat-maps using a function that returns RxObservable<B>.
429    /// Returns an RxObservable that switches to the new observable on each change.
430    pub fn flat_map_observable<B, F>(&self, f: F) -> RxObservable<B>
431    where
432        T: Clone,
433        B: Clone + 'static,
434        F: Fn(&T) -> RxObservable<B> + 'static,
435    {
436        use super::rx_subject::RxSubject;
437        use super::tracker::DisposableTracker;
438
439        let subject = RxSubject::new();
440        let outer_tracker = Rc::new(DisposableTracker::new());
441        let inner_tracker = Rc::new(RefCell::new(DisposableTracker::new()));
442
443        let subject_clone = subject.clone();
444        let inner_tracker_clone = inner_tracker.clone();
445        let f_rc = Rc::new(f);
446
447        self.subscribe(outer_tracker.tracker(), move |outer_value| {
448            let new_inner = f_rc(outer_value);
449            inner_tracker_clone.borrow_mut().dispose();
450            *inner_tracker_clone.borrow_mut() = DisposableTracker::new();
451
452            let subject_clone2 = subject_clone.clone();
453            new_inner.subscribe(inner_tracker_clone.borrow().tracker(), move |inner_value| {
454                subject_clone2.next(inner_value.clone());
455            });
456        });
457
458        let observable = subject.observable();
459        let combined_tracker = Rc::new((outer_tracker, inner_tracker));
460        observable.inner.borrow_mut()._lifetime_tracker = Some(combined_tracker as Rc<dyn Any>);
461
462        observable
463    }
464
465    /// Flat-maps using a function that returns RxSubject<B>.
466    /// Delegates to flat_map_observable by converting the RxSubject to RxObservable.
467    pub fn flat_map_subject<B, F>(&self, f: F) -> RxObservable<B>
468    where
469        T: Clone,
470        B: Clone + 'static,
471        F: Fn(&T) -> RxSubject<B> + 'static,
472    {
473        self.flat_map_observable(move |x| f(x).observable())
474    }
475
476    /// Combines this RxVal with another RxVal, producing a new RxVal containing a tuple.
477    ///
478    /// The resulting RxVal updates whenever either source changes.
479    ///
480    /// # Arguments
481    /// * `other` - Another RxVal to combine with
482    ///
483    /// # Example
484    /// ```
485    /// use rx_rs::core::RxRef;
486    ///
487    /// let name = RxRef::new("Alice");
488    /// let age = RxRef::new(30);
489    ///
490    /// let combined = name.val().zip_val(age.val());
491    ///
492    /// assert_eq!(combined.get(), ("Alice", 30));
493    ///
494    /// name.set("Bob");
495    /// assert_eq!(combined.get(), ("Bob", 30));
496    /// ```
497    pub fn zip_val<U>(&self, other: RxVal<U>) -> RxVal<(T, U)>
498    where
499        T: Clone + PartialEq,
500        U: Clone + PartialEq + 'static,
501    {
502        // Create initial combined value
503        let initial = (self.get(), other.get());
504
505        // Create RxRef to hold the zipped values
506        let result_ref = RxRef::new(initial);
507
508        // Create trackers
509        let tracker1 = Rc::new(DisposableTracker::new());
510        let tracker2 = Rc::new(DisposableTracker::new());
511
512        // Get result val first
513        let result_val = result_ref.val();
514
515        // Subscribe to self using weak references
516        let result_weak1 = Rc::downgrade(&result_val.inner);
517        let other_clone1 = other.clone();
518        self.subscribe(tracker1.tracker(), move |self_val| {
519            if let Some(result_inner) = result_weak1.upgrade() {
520                let new_value = (self_val.clone(), other_clone1.get());
521                let mut inner = result_inner.borrow_mut();
522                if inner.value != new_value {
523                    inner.value = new_value.clone();
524                    // Notify subscribers
525                    for subscriber in &inner.subscribers {
526                        let mut sub = subscriber.borrow_mut();
527                        sub(&new_value);
528                    }
529                }
530            }
531        });
532
533        // Subscribe to other using weak references
534        let result_weak2 = Rc::downgrade(&result_val.inner);
535        let self_clone = self.clone();
536        other.subscribe(tracker2.tracker(), move |other_val| {
537            if let Some(result_inner) = result_weak2.upgrade() {
538                let new_value = (self_clone.get(), other_val.clone());
539                let mut inner = result_inner.borrow_mut();
540                if inner.value != new_value {
541                    inner.value = new_value.clone();
542                    // Notify subscribers
543                    for subscriber in &inner.subscribers {
544                        let mut sub = subscriber.borrow_mut();
545                        sub(&new_value);
546                    }
547                }
548            }
549        });
550        let combined_tracker = Rc::new((tracker1, tracker2));
551        result_val.inner.borrow_mut()._lifetime_tracker = Some(combined_tracker as Rc<dyn Any>);
552
553        result_val
554    }
555
556    /// Combines this RxVal with an RxRef.
557    /// Delegates to zip_val by converting the RxRef to RxVal.
558    pub fn zip_ref<U>(&self, other: RxRef<U>) -> RxVal<(T, U)>
559    where
560        T: Clone + PartialEq,
561        U: Clone + PartialEq + 'static,
562    {
563        self.zip_val(other.val())
564    }
565}
566
567impl<T: 'static> RxVal<T>
568where
569    T: Clone,
570{
571    /// Creates a new RxVal with the given initial value.
572    ///
573    /// This is primarily used internally by RxRef. Users should typically
574    /// create an RxRef and get the RxVal via `.val()`.
575    pub(crate) fn new(value: T) -> Self {
576        Self {
577            inner: Rc::new(RefCell::new(RxValInner {
578                value,
579                subscribers: Vec::new(),
580                _lifetime_tracker: None,
581            })),
582        }
583    }
584
585    /// Updates the value and notifies all subscribers.
586    ///
587    /// This is an internal method used by RxRef.
588    pub(crate) fn update(&self, value: T)
589    where
590        T: PartialEq,
591    {
592        // Clone subscribers list and value to avoid holding borrow during notification
593        let (subscribers, new_value) = {
594            let mut inner = self.inner.borrow_mut();
595
596            // Only update and notify if the value actually changed
597            if inner.value != value {
598                inner.value = value.clone();
599                (inner.subscribers.clone(), value)
600            } else {
601                return; // No change, no notification
602            }
603        };
604
605        // Notify all subscribers without holding the borrow
606        for subscriber in &subscribers {
607            let mut sub = subscriber.borrow_mut();
608            sub(&new_value);
609        }
610    }
611}