Skip to main content

ftui_runtime/reactive/
observable.rs

1#![forbid(unsafe_code)]
2
3//! Observable value wrapper with change notification and version tracking.
4//!
5//! # Design
6//!
7//! [`Observable<T>`] wraps a value of type `T` in shared, reference-counted
8//! storage (`Rc<RefCell<..>>`). When the value changes (determined by
9//! `PartialEq`), all live subscribers are notified in registration order.
10//!
11//! # Performance
12//!
13//! | Operation    | Complexity               |
14//! |-------------|--------------------------|
15//! | `get()`     | O(1)                     |
16//! | `set()`     | O(S) where S = subscribers |
17//! | `subscribe()` | O(1) amortized          |
18//! | Memory      | ~48 bytes + sizeof(T)    |
19//!
20//! # Failure Modes
21//!
22//! - **Re-entrant set**: Calling `set()` from within a subscriber callback
23//!   will panic (RefCell borrow rules). This is intentional: re-entrant
24//!   mutations indicate a design bug in the subscriber graph.
25//! - **Subscriber leak**: If `Subscription` guards are stored indefinitely
26//!   without being dropped, callbacks accumulate. Dead weak references are
27//!   cleaned lazily during `notify()`.
28
29use std::cell::RefCell;
30use std::rc::{Rc, Weak};
31
32/// A subscriber callback stored as a strong `Rc` internally, handed out
33/// as `Weak` to the observable.
34type CallbackRc<T> = Rc<dyn Fn(&T)>;
35type CallbackWeak<T> = Weak<dyn Fn(&T)>;
36
37/// Shared interior for [`Observable<T>`].
38struct ObservableInner<T> {
39    value: T,
40    version: u64,
41    /// Subscribers stored as weak references. Dead entries are pruned on notify.
42    subscribers: Vec<CallbackWeak<T>>,
43}
44
45/// A shared, version-tracked value with change notification.
46///
47/// Cloning an `Observable` creates a new handle to the **same** inner state —
48/// both handles see the same value and share subscribers.
49///
50/// # Invariants
51///
52/// 1. `version` increments by exactly 1 on each value-changing mutation.
53/// 2. `set(v)` where `v == current` is a no-op.
54/// 3. Subscribers are notified in registration order.
55/// 4. Dead subscribers (dropped [`Subscription`] guards) are pruned lazily.
56pub struct Observable<T> {
57    inner: Rc<RefCell<ObservableInner<T>>>,
58}
59
60// Manual Clone: shares the same Rc.
61impl<T> Clone for Observable<T> {
62    fn clone(&self) -> Self {
63        Self {
64            inner: Rc::clone(&self.inner),
65        }
66    }
67}
68
69impl<T: std::fmt::Debug> std::fmt::Debug for Observable<T> {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        let inner = self.inner.borrow();
72        f.debug_struct("Observable")
73            .field("value", &inner.value)
74            .field("version", &inner.version)
75            .field("subscriber_count", &inner.subscribers.len())
76            .finish()
77    }
78}
79
80impl<T: Clone + PartialEq + 'static> Observable<T> {
81    /// Create a new observable with the given initial value.
82    ///
83    /// The initial version is 0 and no subscribers are registered.
84    #[must_use]
85    pub fn new(value: T) -> Self {
86        Self {
87            inner: Rc::new(RefCell::new(ObservableInner {
88                value,
89                version: 0,
90                subscribers: Vec::new(),
91            })),
92        }
93    }
94
95    /// Get a clone of the current value.
96    #[must_use]
97    pub fn get(&self) -> T {
98        self.inner.borrow().value.clone()
99    }
100
101    /// Access the current value by reference without cloning.
102    ///
103    /// The closure `f` receives an immutable reference to the value.
104    pub fn with<R>(&self, f: impl FnOnce(&T) -> R) -> R {
105        f(&self.inner.borrow().value)
106    }
107
108    /// Set a new value. If the new value differs from the current value
109    /// (by `PartialEq`), the version is incremented and all live subscribers
110    /// are notified.
111    ///
112    /// This method is safe to call re-entrantly from within subscriber callbacks.
113    pub fn set(&self, value: T) {
114        let changed = {
115            let mut inner = self.inner.borrow_mut();
116            if inner.value == value {
117                return;
118            }
119            inner.value = value;
120            inner.version += 1;
121            true
122        };
123        if changed {
124            self.notify();
125        }
126    }
127
128    /// Modify the value in place via a closure. If the value changes
129    /// (compared by `PartialEq` against a snapshot), the version is
130    /// incremented and subscribers are notified.
131    ///
132    /// This method is safe to call re-entrantly from within subscriber callbacks.
133    pub fn update(&self, f: impl FnOnce(&mut T)) {
134        let changed = {
135            let mut inner = self.inner.borrow_mut();
136            let old = inner.value.clone();
137            f(&mut inner.value);
138            if inner.value != old {
139                inner.version += 1;
140                true
141            } else {
142                false
143            }
144        };
145        if changed {
146            self.notify();
147        }
148    }
149
150    /// Subscribe to value changes. The callback is invoked with a reference
151    /// to the new value each time it changes.
152    ///
153    /// Returns a [`Subscription`] guard. Dropping the guard unsubscribes
154    /// the callback (it will not be called after drop, though it may still
155    /// be in the subscriber list until the next `notify()` prunes it).
156    pub fn subscribe(&self, callback: impl Fn(&T) + 'static) -> Subscription {
157        let strong: CallbackRc<T> = Rc::new(callback);
158        let weak = Rc::downgrade(&strong);
159        self.inner.borrow_mut().subscribers.push(weak);
160        // Wrap in a holder struct that can be type-erased as `dyn Any`,
161        // since `Rc<dyn Fn(&T)>` itself cannot directly coerce to `Rc<dyn Any>`.
162        Subscription {
163            _guard: Box::new(strong),
164        }
165    }
166
167    /// Current version number. Increments by 1 on each value-changing
168    /// mutation. Useful for dirty-checking in render loops.
169    #[must_use]
170    pub fn version(&self) -> u64 {
171        self.inner.borrow().version
172    }
173
174    /// Number of currently registered subscribers (including dead ones
175    /// not yet pruned).
176    #[must_use]
177    pub fn subscriber_count(&self) -> usize {
178        self.inner.borrow().subscribers.len()
179    }
180
181    /// Notify live subscribers and prune dead ones.
182    ///
183    /// If a batch scope is active (see [`super::batch::BatchScope`]),
184    /// notifications are deferred until the batch exits.
185    fn notify(&self) {
186        // Collect live callbacks first (to avoid holding the borrow during calls).
187        let callbacks: Vec<CallbackRc<T>> = {
188            let mut inner = self.inner.borrow_mut();
189            // Prune dead weak refs and collect live ones.
190            inner.subscribers.retain(|w| w.strong_count() > 0);
191            inner
192                .subscribers
193                .iter()
194                .filter_map(|w| w.upgrade())
195                .collect()
196        };
197
198        if callbacks.is_empty() {
199            return;
200        }
201
202        // Clone the value once for all callbacks.
203        let value = self.inner.borrow().value.clone();
204
205        if super::batch::is_batching() {
206            // Defer each callback to the batch queue.
207            for cb in callbacks {
208                let v = value.clone();
209                super::batch::defer_or_run(move || cb(&v));
210            }
211        } else {
212            // Fire immediately.
213            for cb in &callbacks {
214                cb(&value);
215            }
216        }
217    }
218}
219
220/// RAII guard for a subscriber callback.
221///
222/// Dropping the `Subscription` causes the associated callback to become
223/// unreachable (the strong `Rc` is dropped, so the `Weak` in the
224/// observable's subscriber list will fail to upgrade on the next
225/// notification cycle).
226pub struct Subscription {
227    /// Type-erased strong reference keeping the callback `Rc` alive.
228    /// When this `Box<dyn Any>` is dropped, the inner `Rc<dyn Fn(&T)>`
229    /// is dropped, and the corresponding `Weak` in the subscriber list
230    /// loses its referent.
231    _guard: Box<dyn std::any::Any>,
232}
233
234impl std::fmt::Debug for Subscription {
235    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
236        f.debug_struct("Subscription").finish_non_exhaustive()
237    }
238}
239
240// ---------------------------------------------------------------------------
241// Tests
242// ---------------------------------------------------------------------------
243
244#[cfg(test)]
245mod tests {
246    use super::*;
247    use std::cell::Cell;
248
249    #[test]
250    fn get_set_basic() {
251        let obs = Observable::new(42);
252        assert_eq!(obs.get(), 42);
253        assert_eq!(obs.version(), 0);
254
255        obs.set(99);
256        assert_eq!(obs.get(), 99);
257        assert_eq!(obs.version(), 1);
258    }
259
260    #[test]
261    fn no_change_no_version_bump() {
262        let obs = Observable::new(42);
263        obs.set(42); // Same value.
264        assert_eq!(obs.version(), 0);
265    }
266
267    #[test]
268    fn with_access() {
269        let obs = Observable::new(vec![1, 2, 3]);
270        let sum = obs.with(|v| v.iter().sum::<i32>());
271        assert_eq!(sum, 6);
272    }
273
274    #[test]
275    fn update_mutates_in_place() {
276        let obs = Observable::new(vec![1, 2, 3]);
277        obs.update(|v| v.push(4));
278        assert_eq!(obs.get(), vec![1, 2, 3, 4]);
279        assert_eq!(obs.version(), 1);
280    }
281
282    #[test]
283    fn update_no_change_no_bump() {
284        let obs = Observable::new(10);
285        obs.update(|v| {
286            *v = 10; // Same value.
287        });
288        assert_eq!(obs.version(), 0);
289    }
290
291    #[test]
292    fn change_notification() {
293        let obs = Observable::new(0);
294        let count = Rc::new(Cell::new(0u32));
295        let count_clone = Rc::clone(&count);
296
297        let _sub = obs.subscribe(move |_val| {
298            count_clone.set(count_clone.get() + 1);
299        });
300
301        obs.set(1);
302        assert_eq!(count.get(), 1);
303
304        obs.set(2);
305        assert_eq!(count.get(), 2);
306
307        // Same value — no notification.
308        obs.set(2);
309        assert_eq!(count.get(), 2);
310    }
311
312    #[test]
313    fn subscriber_receives_new_value() {
314        let obs = Observable::new(0);
315        let last_seen = Rc::new(Cell::new(0));
316        let last_clone = Rc::clone(&last_seen);
317
318        let _sub = obs.subscribe(move |val| {
319            last_clone.set(*val);
320        });
321
322        obs.set(42);
323        assert_eq!(last_seen.get(), 42);
324
325        obs.set(99);
326        assert_eq!(last_seen.get(), 99);
327    }
328
329    #[test]
330    fn subscription_drop_unsubscribes() {
331        let obs = Observable::new(0);
332        let count = Rc::new(Cell::new(0u32));
333        let count_clone = Rc::clone(&count);
334
335        let sub = obs.subscribe(move |_val| {
336            count_clone.set(count_clone.get() + 1);
337        });
338
339        obs.set(1);
340        assert_eq!(count.get(), 1);
341
342        drop(sub);
343
344        obs.set(2);
345        // Callback should NOT have been called.
346        assert_eq!(count.get(), 1);
347    }
348
349    #[test]
350    fn multiple_subscribers() {
351        let obs = Observable::new(0);
352        let a = Rc::new(Cell::new(0u32));
353        let b = Rc::new(Cell::new(0u32));
354        let a_clone = Rc::clone(&a);
355        let b_clone = Rc::clone(&b);
356
357        let _sub_a = obs.subscribe(move |_| a_clone.set(a_clone.get() + 1));
358        let _sub_b = obs.subscribe(move |_| b_clone.set(b_clone.get() + 1));
359
360        obs.set(1);
361        assert_eq!(a.get(), 1);
362        assert_eq!(b.get(), 1);
363
364        obs.set(2);
365        assert_eq!(a.get(), 2);
366        assert_eq!(b.get(), 2);
367    }
368
369    #[test]
370    fn version_increment() {
371        let obs = Observable::new("hello".to_string());
372        assert_eq!(obs.version(), 0);
373
374        obs.set("world".to_string());
375        assert_eq!(obs.version(), 1);
376
377        obs.set("!".to_string());
378        assert_eq!(obs.version(), 2);
379
380        // Same value, no increment.
381        obs.set("!".to_string());
382        assert_eq!(obs.version(), 2);
383    }
384
385    #[test]
386    fn clone_shares_state() {
387        let obs1 = Observable::new(0);
388        let obs2 = obs1.clone();
389
390        obs1.set(42);
391        assert_eq!(obs2.get(), 42);
392        assert_eq!(obs2.version(), 1);
393
394        obs2.set(99);
395        assert_eq!(obs1.get(), 99);
396        assert_eq!(obs1.version(), 2);
397    }
398
399    #[test]
400    fn clone_shares_subscribers() {
401        let obs1 = Observable::new(0);
402        let count = Rc::new(Cell::new(0u32));
403        let count_clone = Rc::clone(&count);
404
405        let _sub = obs1.subscribe(move |_| count_clone.set(count_clone.get() + 1));
406
407        let obs2 = obs1.clone();
408        obs2.set(1);
409        assert_eq!(count.get(), 1); // Subscriber sees change via clone.
410    }
411
412    #[test]
413    fn subscriber_count() {
414        let obs = Observable::new(0);
415        assert_eq!(obs.subscriber_count(), 0);
416
417        let _s1 = obs.subscribe(|_| {});
418        assert_eq!(obs.subscriber_count(), 1);
419
420        let s2 = obs.subscribe(|_| {});
421        assert_eq!(obs.subscriber_count(), 2);
422
423        drop(s2);
424        // Dead subscriber not yet pruned.
425        assert_eq!(obs.subscriber_count(), 2);
426
427        // Trigger notify to prune dead.
428        obs.set(1);
429        assert_eq!(obs.subscriber_count(), 1);
430    }
431
432    #[test]
433    fn debug_format() {
434        let obs = Observable::new(42);
435        let dbg = format!("{:?}", obs);
436        assert!(dbg.contains("Observable"));
437        assert!(dbg.contains("42"));
438        assert!(dbg.contains("version"));
439    }
440
441    #[test]
442    fn notification_order_is_registration_order() {
443        let obs = Observable::new(0);
444        let log = Rc::new(RefCell::new(Vec::new()));
445
446        let log1 = Rc::clone(&log);
447        let _s1 = obs.subscribe(move |_| log1.borrow_mut().push('A'));
448
449        let log2 = Rc::clone(&log);
450        let _s2 = obs.subscribe(move |_| log2.borrow_mut().push('B'));
451
452        let log3 = Rc::clone(&log);
453        let _s3 = obs.subscribe(move |_| log3.borrow_mut().push('C'));
454
455        obs.set(1);
456        assert_eq!(*log.borrow(), vec!['A', 'B', 'C']);
457    }
458
459    #[test]
460    fn update_with_subscriber() {
461        let obs = Observable::new(vec![1, 2, 3]);
462        let last_len = Rc::new(Cell::new(0usize));
463        let last_clone = Rc::clone(&last_len);
464
465        let _sub = obs.subscribe(move |v: &Vec<i32>| {
466            last_clone.set(v.len());
467        });
468
469        obs.update(|v| v.push(4));
470        assert_eq!(last_len.get(), 4);
471    }
472
473    #[test]
474    fn many_set_calls_version_monotonic() {
475        let obs = Observable::new(0);
476        for i in 1..=100 {
477            obs.set(i);
478        }
479        assert_eq!(obs.version(), 100);
480        assert_eq!(obs.get(), 100);
481    }
482
483    #[test]
484    fn partial_subscriber_drop() {
485        let obs = Observable::new(0);
486        let a = Rc::new(Cell::new(0u32));
487        let b = Rc::new(Cell::new(0u32));
488        let a_clone = Rc::clone(&a);
489        let b_clone = Rc::clone(&b);
490
491        let sub_a = obs.subscribe(move |_| a_clone.set(a_clone.get() + 1));
492        let _sub_b = obs.subscribe(move |_| b_clone.set(b_clone.get() + 1));
493
494        obs.set(1);
495        assert_eq!(a.get(), 1);
496        assert_eq!(b.get(), 1);
497
498        drop(sub_a);
499
500        obs.set(2);
501        assert_eq!(a.get(), 1); // A was unsubscribed.
502        assert_eq!(b.get(), 2); // B still active.
503    }
504
505    #[test]
506    fn string_observable() {
507        let obs = Observable::new(String::new());
508        let changes = Rc::new(Cell::new(0u32));
509        let changes_clone = Rc::clone(&changes);
510
511        let _sub = obs.subscribe(move |_| changes_clone.set(changes_clone.get() + 1));
512
513        obs.set("hello".to_string());
514        obs.set("hello".to_string()); // Same, no notify.
515        obs.set("world".to_string());
516
517        assert_eq!(changes.get(), 2);
518        assert_eq!(obs.version(), 2);
519    }
520}