// rx_rs/core/rx_val.rs
1use std::any::Any;
2use std::cell::RefCell;
3use std::rc::Rc;
4
5use super::rx_observable::RxObservable;
6use super::rx_ref::RxRef;
7use super::rx_subject::RxSubject;
8use super::tracker::{DisposableTracker, Tracker};
9
/// Shared handle to a boxed subscriber callback.
///
/// The `Rc` lets the same callback be referenced from the subscriber list and
/// from the cleanup closure that later removes it; the `RefCell` provides the
/// mutable access needed to invoke an `FnMut`.
type Subscriber<T> = Rc<RefCell<Box<dyn FnMut(&T)>>>;

/// Backing state shared by all handles to one reactive value.
#[derive(Clone)]
struct RxValInner<T> {
    /// The value currently held.
    value: T,
    /// Callbacks to invoke when `value` changes.
    subscribers: Vec<Subscriber<T>>,
    /// Type-erased keep-alive slot.
    ///
    /// Operators such as `.map()` park their upstream trackers here so the
    /// source subscription lives exactly as long as this state does.
    _lifetime_tracker: Option<Rc<dyn Any>>,
}
22
23/// A read-only reactive value that holds a current state.
24///
25/// When subscribed, the subscriber is called immediately with the current value,
26/// and then called again whenever the value changes.
27///
28/// This is useful for representing state that always has a current value, like
29/// connection status, client ID, or configuration values.
30///
31/// # Example
32/// ```
33/// use rx_rs::core::{RxRef, DisposableTracker};
34///
35/// let mut tracker = DisposableTracker::new();
36/// let rx_ref = RxRef::new(42);
37/// let rx_val = rx_ref.val();
38///
39/// rx_val.subscribe(tracker.tracker(), |value| {
40/// println!("Value: {}", value);
41/// }); // Prints "Value: 42" immediately
42///
43/// rx_ref.set(100); // Prints "Value: 100"
44/// ```
45#[derive(Clone)]
46pub struct RxVal<T> {
47 inner: Rc<RefCell<RxValInner<T>>>,
48}
49
50impl<T: 'static> RxVal<T> {
51 /// Gets the current value.
52 pub fn get(&self) -> T
53 where
54 T: Clone,
55 {
56 self.inner.borrow().value.clone()
57 }
58
59 /// Subscribes to value changes.
60 ///
61 /// The subscriber function is called immediately with the current value,
62 /// and then called again whenever the value changes.
63 ///
64 /// The subscription is automatically cleaned up when the tracker is dropped.
65 ///
66 /// # Arguments
67 /// * `tracker` - Tracker that will manage this subscription's lifetime
68 /// * `f` - Function called with a reference to the value on each update
69 pub fn subscribe<F>(&self, tracker: &Tracker, mut f: F)
70 where
71 F: FnMut(&T) + 'static,
72 T: Clone,
73 {
74 // Clone current value and release borrow before calling callback
75 let current_value = self.inner.borrow().value.clone();
76
77 // Call immediately with cloned value (no borrow held)
78 f(¤t_value);
79
80 // Wrap the subscriber in Rc<RefCell<>> for shared ownership
81 let subscriber = Rc::new(RefCell::new(Box::new(f) as Box<dyn FnMut(&T)>));
82
83 // Store for future updates
84 let subscriber_clone = subscriber.clone();
85 let inner_weak = Rc::downgrade(&self.inner);
86
87 // Add subscriber - now this will succeed even if called from within a notification
88 self.inner.borrow_mut().subscribers.push(subscriber_clone);
89
90 #[cfg(feature = "debug")]
91 {
92 let debug_ptr = std::rc::Rc::as_ptr(&self.inner) as usize;
93 let sub_count = self.inner.borrow().subscribers.len();
94 tracing::debug!(
95 ptr = format!("0x{:x}", debug_ptr),
96 subscriber_count = sub_count,
97 "subscription added to RxVal"
98 );
99 }
100
101 // Add cleanup to tracker
102 #[cfg(feature = "debug")]
103 let debug_ptr = std::rc::Rc::as_ptr(&self.inner) as usize;
104
105 tracker.add(move || {
106 // Remove subscriber when tracker drops
107 // Use weak reference to avoid cycle
108 #[cfg(feature = "debug")]
109 tracing::debug!(
110 ptr = format!("0x{:x}", debug_ptr),
111 "subscription cleanup called"
112 );
113
114 if let Some(inner_rc) = inner_weak.upgrade() {
115 if let Ok(mut inner) = inner_rc.try_borrow_mut() {
116 #[cfg(feature = "debug")]
117 let before_count = inner.subscribers.len();
118
119 inner.subscribers.retain(|s| !Rc::ptr_eq(s, &subscriber));
120
121 #[cfg(feature = "debug")]
122 {
123 let after_count = inner.subscribers.len();
124 tracing::debug!(
125 ptr = format!("0x{:x}", debug_ptr),
126 before = before_count,
127 after = after_count,
128 "removed subscriber from RxVal"
129 );
130 }
131 }
132 } else {
133 #[cfg(feature = "debug")]
134 tracing::debug!(ptr = format!("0x{:x}", debug_ptr), "RxVal already dropped");
135 }
136 });
137 }
138
139 /// Returns the number of active subscribers.
140 pub fn subscriber_count(&self) -> usize {
141 self.inner.borrow().subscribers.len()
142 }
143
144 /// Returns a pointer address for debugging identity.
145 /// Use this to check if two RxVal instances share the same underlying data.
146 pub fn debug_ptr(&self) -> usize {
147 std::rc::Rc::as_ptr(&self.inner) as usize
148 }
149
150 /// Converts this RxVal into a stream (RxObservable).
151 ///
152 /// The returned observable does NOT emit the current value immediately on subscription.
153 /// It only emits when the RxVal changes to a new value.
154 ///
155 /// Note: If you subscribe directly to the RxVal, it WILL emit immediately.
156 /// But when converted to a stream, it behaves like a pure observable.
157 ///
158 /// # Example
159 /// ```
160 /// use rx_rs::core::{RxRef, DisposableTracker};
161 ///
162 /// let mut tracker = DisposableTracker::new();
163 /// let rx_ref = RxRef::new(42);
164 /// let stream = rx_ref.val().stream();
165 ///
166 /// stream.subscribe(tracker.tracker(), |value| {
167 /// println!("Value: {}", value);
168 /// }); // Does NOT print immediately
169 ///
170 /// rx_ref.set(100); // Prints "Value: 100"
171 /// ```
172 pub fn stream(&self) -> RxObservable<T>
173 where
174 T: Clone,
175 {
176 // Create a subject to forward values to
177 let subject = RxSubject::new();
178
179 // Create a tracker that will keep the subscription alive
180 let tracker = Rc::new(DisposableTracker::new());
181
182 // Create a flag to skip the first emission (current value)
183 let first = Rc::new(RefCell::new(true));
184
185 // Subscribe to this RxVal and forward all values to the subject
186 // BUT skip the immediate emission of the current value
187 let subject_clone = subject.clone();
188 self.subscribe(tracker.tracker(), move |value| {
189 if *first.borrow() {
190 *first.borrow_mut() = false;
191 return; // Skip the first emission (current value)
192 }
193 subject_clone.next(value.clone());
194 });
195
196 // Get the inner from the subject's observable
197 let subject_observable = subject.observable();
198
199 // Attach the tracker to keep subscription alive
200 subject_observable.inner.borrow_mut()._lifetime_tracker = Some(tracker as Rc<dyn Any>);
201
202 subject_observable
203 }
204
205 /// Maps the values of this RxVal using a transformation function.
206 ///
207 /// Returns a new RxVal that always contains the transformed value.
208 /// When the source RxVal changes, the transformation is applied and
209 /// the resulting RxVal is updated.
210 ///
211 /// # Arguments
212 /// * `f` - Function to transform values from A to B
213 ///
214 /// # Example
215 /// ```
216 /// use rx_rs::core::RxRef;
217 ///
218 /// let number = RxRef::new(5);
219 /// let doubled = number.val().map(|x| x * 2);
220 ///
221 /// assert_eq!(doubled.get(), 10);
222 ///
223 /// number.set(10);
224 /// assert_eq!(doubled.get(), 20);
225 /// ```
226 pub fn map<B, F>(&self, f: F) -> RxVal<B>
227 where
228 T: Clone,
229 B: Clone + PartialEq + 'static,
230 F: Fn(&T) -> B + 'static,
231 {
232 // Create the initial mapped value
233 let initial = f(&self.get());
234
235 // Create a tracker that will live as long as the returned RxVal
236 let tracker = Rc::new(DisposableTracker::new());
237
238 // Create a new RxRef to hold the mapped values
239 let rx_ref = RxRef::new(initial);
240
241 // Get the result val first
242 let mapped_val = rx_ref.val();
243
244 // Subscribe to this RxVal and update the mapped ref
245 // Use weak reference to result's inner to avoid cycle
246 let result_weak = Rc::downgrade(&mapped_val.inner);
247 self.subscribe(tracker.tracker(), move |value| {
248 if let Some(result_inner) = result_weak.upgrade() {
249 let new_value = f(value);
250 let mut inner = result_inner.borrow_mut();
251 if inner.value != new_value {
252 inner.value = new_value.clone();
253 // Notify subscribers
254 for subscriber in &inner.subscribers {
255 let mut sub = subscriber.borrow_mut();
256 sub(&new_value);
257 }
258 }
259 }
260 });
261
262 // Attach the tracker to keep subscription alive
263 mapped_val.inner.borrow_mut()._lifetime_tracker = Some(tracker as Rc<dyn Any>);
264
265 mapped_val
266 }
267
268 /// Flat-maps the values of this RxVal using a function that returns RxVal<B>.
269 ///
270 /// When the source RxVal changes, the function is called to produce a new RxVal<B>,
271 /// and the result RxVal is updated to reflect the current value of that inner RxVal.
272 /// The result also updates when the inner RxVal changes.
273 ///
274 /// # Arguments
275 /// * `f` - Function to transform values from A to RxVal<B>
276 ///
277 /// # Example
278 /// ```
279 /// use rx_rs::core::RxRef;
280 ///
281 /// let outer = RxRef::new(1);
282 /// let inner1 = RxRef::new(10);
283 /// let inner2 = RxRef::new(20);
284 ///
285 /// let inner1_clone = inner1.clone();
286 /// let inner2_clone = inner2.clone();
287 ///
288 /// let flattened = outer.val().flat_map(move |&x| {
289 /// if x == 1 { inner1_clone.val() } else { inner2_clone.val() }
290 /// });
291 ///
292 /// assert_eq!(flattened.get(), 10);
293 ///
294 /// outer.set(2);
295 /// assert_eq!(flattened.get(), 20);
296 /// ```
297 pub fn flat_map<B, F>(&self, f: F) -> RxVal<B>
298 where
299 T: Clone + PartialEq,
300 B: Clone + PartialEq + 'static,
301 F: Fn(&T) -> RxVal<B> + 'static,
302 {
303 use super::tracker::DisposableTracker;
304
305 // Get initial inner RxVal
306 let initial_inner = f(&self.get());
307 let initial_value = initial_inner.get();
308
309 // Create RxRef to hold the result
310 let result_ref = RxRef::new(initial_value);
311
312 // Create tracker for the outer subscription
313 let outer_tracker = Rc::new(DisposableTracker::new());
314
315 // Track the current inner subscription
316 let inner_tracker = Rc::new(RefCell::new(DisposableTracker::new()));
317
318 // Store the current inner RxVal to keep it alive
319 let current_inner = Rc::new(RefCell::new(Some(initial_inner.clone())));
320
321 // Get result val first
322 let result_val = result_ref.val();
323
324 // Subscribe to the initial inner RxVal
325 let result_weak_init = Rc::downgrade(&result_val.inner);
326 initial_inner.subscribe(inner_tracker.borrow().tracker(), move |inner_value| {
327 if let Some(result_inner) = result_weak_init.upgrade() {
328 let mut inner = result_inner.borrow_mut();
329 if inner.value != *inner_value {
330 inner.value = inner_value.clone();
331 // Notify subscribers
332 for subscriber in &inner.subscribers {
333 let mut sub = subscriber.borrow_mut();
334 sub(inner_value);
335 }
336 }
337 }
338 });
339
340 // Track the last outer value to avoid re-subscribing on duplicate updates
341 let last_outer_value = Rc::new(RefCell::new(self.get()));
342
343 // Subscribe to the outer RxVal using weak reference to result
344 let result_weak = Rc::downgrade(&result_val.inner);
345 let inner_tracker_clone = inner_tracker.clone();
346 let current_inner_clone = current_inner.clone();
347 let f_clone = Rc::new(f);
348
349 self.subscribe(outer_tracker.tracker(), move |outer_value| {
350 if let Some(result_inner) = result_weak.upgrade() {
351 // Only recreate inner subscription if outer value changed
352 let should_update = {
353 let mut last_val = last_outer_value.borrow_mut();
354 if *last_val != *outer_value {
355 *last_val = outer_value.clone();
356 true
357 } else {
358 false
359 }
360 };
361
362 if !should_update {
363 return; // Same value, don't recreate subscription
364 }
365
366 // Get new inner RxVal
367 let new_inner = f_clone(outer_value);
368
369 // Cancel previous inner subscription
370 inner_tracker_clone.borrow_mut().dispose();
371 *inner_tracker_clone.borrow_mut() = DisposableTracker::new();
372
373 // Set to the new inner's current value
374 let new_value = new_inner.get();
375 {
376 let mut inner = result_inner.borrow_mut();
377 if inner.value != new_value {
378 inner.value = new_value.clone();
379 // Notify subscribers
380 for subscriber in &inner.subscribers {
381 let mut sub = subscriber.borrow_mut();
382 sub(&new_value);
383 }
384 }
385 }
386
387 // Subscribe to the new inner using weak reference
388 let result_weak2 = Rc::downgrade(&result_inner);
389 new_inner.subscribe(inner_tracker_clone.borrow().tracker(), move |inner_value| {
390 if let Some(result_inner2) = result_weak2.upgrade() {
391 let mut inner = result_inner2.borrow_mut();
392 if inner.value != *inner_value {
393 inner.value = inner_value.clone();
394 // Notify subscribers
395 for subscriber in &inner.subscribers {
396 let mut sub = subscriber.borrow_mut();
397 sub(inner_value);
398 }
399 }
400 }
401 });
402
403 // Update current_inner to keep the new one alive
404 *current_inner_clone.borrow_mut() = Some(new_inner);
405 }
406 });
407
408 // We need to keep both trackers and current_inner alive
409 // Store them in a combined structure
410 let combined_tracker = Rc::new((outer_tracker, inner_tracker, current_inner));
411 result_val.inner.borrow_mut()._lifetime_tracker =
412 Some(combined_tracker as Rc<dyn std::any::Any>);
413
414 result_val
415 }
416
417 /// Flat-maps using a function that returns RxRef<B>.
418 /// Delegates to flat_map by converting the RxRef to RxVal.
419 pub fn flat_map_ref<B, F>(&self, f: F) -> RxVal<B>
420 where
421 T: Clone + PartialEq,
422 B: Clone + PartialEq + 'static,
423 F: Fn(&T) -> RxRef<B> + 'static,
424 {
425 self.flat_map(move |x| f(x).val())
426 }
427
428 /// Flat-maps using a function that returns RxObservable<B>.
429 /// Returns an RxObservable that switches to the new observable on each change.
430 pub fn flat_map_observable<B, F>(&self, f: F) -> RxObservable<B>
431 where
432 T: Clone,
433 B: Clone + 'static,
434 F: Fn(&T) -> RxObservable<B> + 'static,
435 {
436 use super::rx_subject::RxSubject;
437 use super::tracker::DisposableTracker;
438
439 let subject = RxSubject::new();
440 let outer_tracker = Rc::new(DisposableTracker::new());
441 let inner_tracker = Rc::new(RefCell::new(DisposableTracker::new()));
442
443 let subject_clone = subject.clone();
444 let inner_tracker_clone = inner_tracker.clone();
445 let f_rc = Rc::new(f);
446
447 self.subscribe(outer_tracker.tracker(), move |outer_value| {
448 let new_inner = f_rc(outer_value);
449 inner_tracker_clone.borrow_mut().dispose();
450 *inner_tracker_clone.borrow_mut() = DisposableTracker::new();
451
452 let subject_clone2 = subject_clone.clone();
453 new_inner.subscribe(inner_tracker_clone.borrow().tracker(), move |inner_value| {
454 subject_clone2.next(inner_value.clone());
455 });
456 });
457
458 let observable = subject.observable();
459 let combined_tracker = Rc::new((outer_tracker, inner_tracker));
460 observable.inner.borrow_mut()._lifetime_tracker = Some(combined_tracker as Rc<dyn Any>);
461
462 observable
463 }
464
465 /// Flat-maps using a function that returns RxSubject<B>.
466 /// Delegates to flat_map_observable by converting the RxSubject to RxObservable.
467 pub fn flat_map_subject<B, F>(&self, f: F) -> RxObservable<B>
468 where
469 T: Clone,
470 B: Clone + 'static,
471 F: Fn(&T) -> RxSubject<B> + 'static,
472 {
473 self.flat_map_observable(move |x| f(x).observable())
474 }
475
476 /// Combines this RxVal with another RxVal, producing a new RxVal containing a tuple.
477 ///
478 /// The resulting RxVal updates whenever either source changes.
479 ///
480 /// # Arguments
481 /// * `other` - Another RxVal to combine with
482 ///
483 /// # Example
484 /// ```
485 /// use rx_rs::core::RxRef;
486 ///
487 /// let name = RxRef::new("Alice");
488 /// let age = RxRef::new(30);
489 ///
490 /// let combined = name.val().zip_val(age.val());
491 ///
492 /// assert_eq!(combined.get(), ("Alice", 30));
493 ///
494 /// name.set("Bob");
495 /// assert_eq!(combined.get(), ("Bob", 30));
496 /// ```
497 pub fn zip_val<U>(&self, other: RxVal<U>) -> RxVal<(T, U)>
498 where
499 T: Clone + PartialEq,
500 U: Clone + PartialEq + 'static,
501 {
502 // Create initial combined value
503 let initial = (self.get(), other.get());
504
505 // Create RxRef to hold the zipped values
506 let result_ref = RxRef::new(initial);
507
508 // Create trackers
509 let tracker1 = Rc::new(DisposableTracker::new());
510 let tracker2 = Rc::new(DisposableTracker::new());
511
512 // Get result val first
513 let result_val = result_ref.val();
514
515 // Subscribe to self using weak references
516 let result_weak1 = Rc::downgrade(&result_val.inner);
517 let other_clone1 = other.clone();
518 self.subscribe(tracker1.tracker(), move |self_val| {
519 if let Some(result_inner) = result_weak1.upgrade() {
520 let new_value = (self_val.clone(), other_clone1.get());
521 let mut inner = result_inner.borrow_mut();
522 if inner.value != new_value {
523 inner.value = new_value.clone();
524 // Notify subscribers
525 for subscriber in &inner.subscribers {
526 let mut sub = subscriber.borrow_mut();
527 sub(&new_value);
528 }
529 }
530 }
531 });
532
533 // Subscribe to other using weak references
534 let result_weak2 = Rc::downgrade(&result_val.inner);
535 let self_clone = self.clone();
536 other.subscribe(tracker2.tracker(), move |other_val| {
537 if let Some(result_inner) = result_weak2.upgrade() {
538 let new_value = (self_clone.get(), other_val.clone());
539 let mut inner = result_inner.borrow_mut();
540 if inner.value != new_value {
541 inner.value = new_value.clone();
542 // Notify subscribers
543 for subscriber in &inner.subscribers {
544 let mut sub = subscriber.borrow_mut();
545 sub(&new_value);
546 }
547 }
548 }
549 });
550 let combined_tracker = Rc::new((tracker1, tracker2));
551 result_val.inner.borrow_mut()._lifetime_tracker = Some(combined_tracker as Rc<dyn Any>);
552
553 result_val
554 }
555
556 /// Combines this RxVal with an RxRef.
557 /// Delegates to zip_val by converting the RxRef to RxVal.
558 pub fn zip_ref<U>(&self, other: RxRef<U>) -> RxVal<(T, U)>
559 where
560 T: Clone + PartialEq,
561 U: Clone + PartialEq + 'static,
562 {
563 self.zip_val(other.val())
564 }
565}
566
567impl<T: 'static> RxVal<T>
568where
569 T: Clone,
570{
571 /// Creates a new RxVal with the given initial value.
572 ///
573 /// This is primarily used internally by RxRef. Users should typically
574 /// create an RxRef and get the RxVal via `.val()`.
575 pub(crate) fn new(value: T) -> Self {
576 Self {
577 inner: Rc::new(RefCell::new(RxValInner {
578 value,
579 subscribers: Vec::new(),
580 _lifetime_tracker: None,
581 })),
582 }
583 }
584
585 /// Updates the value and notifies all subscribers.
586 ///
587 /// This is an internal method used by RxRef.
588 pub(crate) fn update(&self, value: T)
589 where
590 T: PartialEq,
591 {
592 // Clone subscribers list and value to avoid holding borrow during notification
593 let (subscribers, new_value) = {
594 let mut inner = self.inner.borrow_mut();
595
596 // Only update and notify if the value actually changed
597 if inner.value != value {
598 inner.value = value.clone();
599 (inner.subscribers.clone(), value)
600 } else {
601 return; // No change, no notification
602 }
603 };
604
605 // Notify all subscribers without holding the borrow
606 for subscriber in &subscribers {
607 let mut sub = subscriber.borrow_mut();
608 sub(&new_value);
609 }
610 }
611}