Skip to main content

ftui_runtime/reactive/
observable.rs

1#![forbid(unsafe_code)]
2
3//! Observable value wrapper with change notification and version tracking.
4//!
5//! # Design
6//!
7//! [`Observable<T>`] wraps a value of type `T` in shared, reference-counted
8//! storage (`Rc<RefCell<..>>`). When the value changes (determined by
9//! `PartialEq`), all live subscribers are notified in registration order.
10//!
11//! # Performance
12//!
13//! | Operation    | Complexity               |
14//! |-------------|--------------------------|
15//! | `get()`     | O(1)                     |
16//! | `set()`     | O(S) where S = subscribers |
17//! | `subscribe()` | O(1) amortized          |
18//! | Memory      | ~48 bytes + sizeof(T)    |
19//!
20//! # Failure Modes
21//!
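//! - **Re-entrant set**: Calling `set()` from within a subscriber callback
//!   does not panic, because no `RefCell` borrow is held while callbacks run.
//!   The nested change is delivered by a nested notification pass, after
//!   which the outer pass resumes with its older snapshot, so re-entrant
//!   mutations should still be treated as a design smell in the subscriber
//!   graph. Mutating the observable from inside a `with()` closure, or
//!   touching it at all from inside an `update()` closure, does panic because
//!   the borrow is held while that closure runs.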
25//! - **Subscriber leak**: If `Subscription` guards are stored indefinitely
26//!   without being dropped, callbacks accumulate. Dead weak references are
27//!   cleaned lazily during `notify()`.
28
29use std::cell::RefCell;
30use std::rc::{Rc, Weak};
31use tracing::{info, info_span};
32use web_time::Instant;
33
/// Strong handle to a subscriber callback.
///
/// The strong reference is owned by the [`Subscription`] guard returned from
/// `subscribe()`; the observable itself only keeps the weak form.
type CallbackRc<T> = Rc<dyn Fn(&T)>;
/// Weak handle to a subscriber callback, as stored in the subscriber list.
type CallbackWeak<T> = Weak<dyn Fn(&T)>;

/// Shared interior for [`Observable<T>`].
struct ObservableInner<T> {
    /// The current value.
    value: T,
    /// Bumped by one on every value-changing mutation; starts at 0.
    version: u64,
    /// Subscribers stored as weak references. Dead entries are pruned on notify.
    subscribers: Vec<CallbackWeak<T>>,
}

/// A shared, version-tracked value with change notification.
///
/// Cloning an `Observable` creates a new handle to the **same** inner state —
/// both handles see the same value and share subscribers.
///
/// # Invariants
///
/// 1. `version` increments by exactly 1 on each value-changing mutation.
/// 2. `set(v)` where `v == current` is a no-op.
/// 3. Subscribers are notified in registration order.
/// 4. Dead subscribers (dropped [`Subscription`] guards) are pruned lazily.
pub struct Observable<T> {
    inner: Rc<RefCell<ObservableInner<T>>>,
}

// Manual Clone: shares the same Rc, and (unlike `#[derive(Clone)]`) does not
// require `T: Clone`.
impl<T> Clone for Observable<T> {
    fn clone(&self) -> Self {
        Self {
            inner: Rc::clone(&self.inner),
        }
    }
}

impl<T: std::fmt::Debug> std::fmt::Debug for Observable<T> {
    /// Formats the value, version and subscriber count.
    ///
    /// Formatting must never panic, so the cell is borrowed with
    /// `try_borrow()`: if the observable is mutably borrowed at that moment
    /// (for example when it is logged from inside an `update()` closure), a
    /// `<borrowed>` placeholder is printed instead of the fields.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Observable");
        match self.inner.try_borrow() {
            Ok(inner) => {
                out.field("value", &inner.value)
                    .field("version", &inner.version)
                    .field("subscriber_count", &inner.subscribers.len());
            }
            Err(_) => {
                out.field("value", &format_args!("<borrowed>"));
            }
        }
        out.finish()
    }
}
81
82impl<T: Clone + PartialEq + 'static> Observable<T> {
83    /// Create a new observable with the given initial value.
84    ///
85    /// The initial version is 0 and no subscribers are registered.
86    #[must_use]
87    pub fn new(value: T) -> Self {
88        Self {
89            inner: Rc::new(RefCell::new(ObservableInner {
90                value,
91                version: 0,
92                subscribers: Vec::new(),
93            })),
94        }
95    }
96
97    /// Get a clone of the current value.
98    #[must_use]
99    pub fn get(&self) -> T {
100        self.inner.borrow().value.clone()
101    }
102
103    /// Access the current value by reference without cloning.
104    ///
105    /// The closure `f` receives an immutable reference to the value.
106    pub fn with<R>(&self, f: impl FnOnce(&T) -> R) -> R {
107        f(&self.inner.borrow().value)
108    }
109
110    /// Set a new value. If the new value differs from the current value
111    /// (by `PartialEq`), the version is incremented and all live subscribers
112    /// are notified.
113    ///
114    /// This method is safe to call re-entrantly from within subscriber callbacks.
115    pub fn set(&self, value: T) {
116        let changed = {
117            let mut inner = self.inner.borrow_mut();
118            if inner.value == value {
119                return;
120            }
121            inner.value = value;
122            inner.version += 1;
123            true
124        };
125        if changed {
126            self.notify();
127        }
128    }
129
130    /// Modify the value in place via a closure. If the value changes
131    /// (compared by `PartialEq` against a snapshot), the version is
132    /// incremented and subscribers are notified.
133    ///
134    /// This method is safe to call re-entrantly from within subscriber callbacks.
135    pub fn update(&self, f: impl FnOnce(&mut T)) {
136        let changed = {
137            let mut inner = self.inner.borrow_mut();
138            let old = inner.value.clone();
139            f(&mut inner.value);
140            if inner.value != old {
141                inner.version += 1;
142                true
143            } else {
144                false
145            }
146        };
147        if changed {
148            self.notify();
149        }
150    }
151
152    /// Subscribe to value changes. The callback is invoked with a reference
153    /// to the new value each time it changes.
154    ///
155    /// Returns a [`Subscription`] guard. Dropping the guard unsubscribes
156    /// the callback (it will not be called after drop, though it may still
157    /// be in the subscriber list until the next `notify()` prunes it).
158    pub fn subscribe(&self, callback: impl Fn(&T) + 'static) -> Subscription {
159        let strong: CallbackRc<T> = Rc::new(callback);
160        let weak = Rc::downgrade(&strong);
161        self.inner.borrow_mut().subscribers.push(weak);
162        // Wrap in a holder struct that can be type-erased as `dyn Any`,
163        // since `Rc<dyn Fn(&T)>` itself cannot directly coerce to `Rc<dyn Any>`.
164        Subscription {
165            _guard: Box::new(strong),
166        }
167    }
168
169    /// Current version number. Increments by 1 on each value-changing
170    /// mutation. Useful for dirty-checking in render loops.
171    #[must_use]
172    pub fn version(&self) -> u64 {
173        self.inner.borrow().version
174    }
175
176    /// Number of currently registered subscribers (including dead ones
177    /// not yet pruned).
178    #[must_use]
179    pub fn subscriber_count(&self) -> usize {
180        self.inner.borrow().subscribers.len()
181    }
182
183    /// Notify live subscribers and prune dead ones.
184    ///
185    /// If a batch scope is active (see [`super::batch::BatchScope`]),
186    /// notifications are deferred until the batch exits.
187    fn notify(&self) {
188        // Collect live callbacks first (to avoid holding the borrow during calls).
189        let callbacks: Vec<CallbackRc<T>> = {
190            let mut inner = self.inner.borrow_mut();
191            // Prune dead weak refs and collect live ones.
192            inner.subscribers.retain(|w| w.strong_count() > 0);
193            inner
194                .subscribers
195                .iter()
196                .filter_map(|w| w.upgrade())
197                .collect()
198        };
199
200        if callbacks.is_empty() {
201            return;
202        }
203
204        let widgets_invalidated = callbacks.len() as u64;
205
206        if super::batch::is_batching() {
207            super::batch::record_rows_changed(1);
208            // Defer each callback to the batch queue.
209            for cb in callbacks {
210                let callback_key = Rc::as_ptr(&cb) as *const () as usize;
211                let source = self.clone();
212                super::batch::defer_or_run_keyed(callback_key, move || {
213                    let latest = source.get();
214                    cb(&latest);
215                });
216            }
217            return;
218        }
219
220        // Clone the value once for all callbacks.
221        let value = self.inner.borrow().value.clone();
222        let propagation_start = Instant::now();
223        let _span = info_span!(
224            "bloodstream.delta",
225            rows_changed = 1_u64,
226            widgets_invalidated,
227            duration_us = tracing::field::Empty
228        )
229        .entered();
230
231        // Fire immediately.
232        for cb in &callbacks {
233            cb(&value);
234        }
235
236        let duration_us = propagation_start.elapsed().as_micros() as u64;
237        tracing::Span::current().record("duration_us", duration_us);
238        info!(
239            bloodstream_propagation_duration_us = duration_us,
240            rows_changed = 1_u64,
241            widgets_invalidated,
242            "bloodstream propagation duration histogram"
243        );
244    }
245}
246
/// RAII guard for a subscriber callback.
///
/// Dropping the `Subscription` causes the associated callback to become
/// unreachable (the strong `Rc` is dropped, so the `Weak` in the
/// observable's subscriber list will fail to upgrade on the next
/// notification cycle).
///
/// Because the guard is the only thing keeping the callback alive, it is
/// marked `#[must_use]`: discarding it unsubscribes immediately.
#[must_use = "dropping a `Subscription` immediately unsubscribes its callback"]
pub struct Subscription {
    /// Type-erased strong reference keeping the callback `Rc` alive.
    /// When this `Box<dyn Any>` is dropped, the inner `Rc<dyn Fn(&T)>`
    /// is dropped, and the corresponding `Weak` in the subscriber list
    /// loses its referent.
    _guard: Box<dyn std::any::Any>,
}

impl std::fmt::Debug for Subscription {
    /// Prints an opaque `Subscription { .. }`; the callback is not inspectable.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Subscription").finish_non_exhaustive()
    }
}
266
267// ---------------------------------------------------------------------------
268// Tests
269// ---------------------------------------------------------------------------
270
#[cfg(test)]
mod tests {
    use super::*;
    use std::cell::Cell;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::{Arc, Mutex};
    use tracing::field::{Field, Visit};

    /// Minimal table model used to exercise render-mode classification.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct TableSnapshot {
        schema_version: u64,
        rows: Vec<String>,
    }

    /// How a consumer should redraw after a table change.
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    enum RenderMode {
        PartialDelta,
        FullRerender,
    }

    /// A schema-version change forces a full rerender; anything else is a
    /// partial delta.
    fn classify_render_mode(previous: &TableSnapshot, next: &TableSnapshot) -> RenderMode {
        if previous.schema_version != next.schema_version {
            RenderMode::FullRerender
        } else {
            RenderMode::PartialDelta
        }
    }

    /// Field visitor that extracts the two numeric fields of interest from a
    /// `bloodstream.delta` span.
    #[derive(Default)]
    struct DeltaSpanVisitor {
        rows_changed: Option<u64>,
        widgets_invalidated: Option<u64>,
    }

    impl Visit for DeltaSpanVisitor {
        fn record_u64(&mut self, field: &Field, value: u64) {
            match field.name() {
                "rows_changed" => self.rows_changed = Some(value),
                "widgets_invalidated" => self.widgets_invalidated = Some(value),
                _ => {}
            }
        }

        // Integer fields may arrive as i64; negative values are ignored.
        fn record_i64(&mut self, field: &Field, value: i64) {
            if value < 0 {
                return;
            }
            self.record_u64(field, value as u64);
        }

        fn record_debug(&mut self, _field: &Field, _value: &dyn std::fmt::Debug) {}
    }

    /// Tracing subscriber that records `(rows_changed, widgets_invalidated)`
    /// for every `bloodstream.delta` span created while it is installed.
    struct DeltaSpanSubscriber {
        next_id: AtomicU64,
        spans: Arc<Mutex<Vec<(u64, u64)>>>,
    }

    impl tracing::Subscriber for DeltaSpanSubscriber {
        fn enabled(&self, _metadata: &tracing::Metadata<'_>) -> bool {
            true
        }

        fn new_span(&self, attrs: &tracing::span::Attributes<'_>) -> tracing::span::Id {
            if attrs.metadata().name() == "bloodstream.delta" {
                let mut visitor = DeltaSpanVisitor::default();
                attrs.record(&mut visitor);
                self.spans.lock().expect("span capture lock").push((
                    visitor.rows_changed.unwrap_or(0),
                    visitor.widgets_invalidated.unwrap_or(0),
                ));
            }
            tracing::span::Id::from_u64(self.next_id.fetch_add(1, Ordering::Relaxed))
        }

        fn record(&self, _span: &tracing::span::Id, _values: &tracing::span::Record<'_>) {}

        fn record_follows_from(&self, _span: &tracing::span::Id, _follows: &tracing::span::Id) {}

        fn event(&self, _event: &tracing::Event<'_>) {}

        fn enter(&self, _span: &tracing::span::Id) {}

        fn exit(&self, _span: &tracing::span::Id) {}
    }

    /// Run `run` with a [`DeltaSpanSubscriber`] installed as the default
    /// subscriber and return the spans it captured.
    fn capture_delta_spans(run: impl FnOnce()) -> Vec<(u64, u64)> {
        let spans = Arc::new(Mutex::new(Vec::new()));
        let subscriber = DeltaSpanSubscriber {
            next_id: AtomicU64::new(1),
            spans: Arc::clone(&spans),
        };
        let _guard = tracing::subscriber::set_default(subscriber);
        run();
        spans.lock().expect("span capture lock").clone()
    }

    #[test]
    fn get_set_basic() {
        let obs = Observable::new(42);
        assert_eq!(obs.get(), 42);
        assert_eq!(obs.version(), 0);

        obs.set(99);
        assert_eq!(obs.get(), 99);
        assert_eq!(obs.version(), 1);
    }

    #[test]
    fn no_change_no_version_bump() {
        let obs = Observable::new(42);
        obs.set(42); // Same value.
        assert_eq!(obs.version(), 0);
    }

    #[test]
    fn with_access() {
        let obs = Observable::new(vec![1, 2, 3]);
        let sum = obs.with(|v| v.iter().sum::<i32>());
        assert_eq!(sum, 6);
    }

    #[test]
    fn update_mutates_in_place() {
        let obs = Observable::new(vec![1, 2, 3]);
        obs.update(|v| v.push(4));
        assert_eq!(obs.get(), vec![1, 2, 3, 4]);
        assert_eq!(obs.version(), 1);
    }

    #[test]
    fn update_no_change_no_bump() {
        let obs = Observable::new(10);
        obs.update(|v| {
            *v = 10; // Same value.
        });
        assert_eq!(obs.version(), 0);
    }

    #[test]
    fn change_notification() {
        let obs = Observable::new(0);
        let count = Rc::new(Cell::new(0u32));
        let count_clone = Rc::clone(&count);

        let _sub = obs.subscribe(move |_val| {
            count_clone.set(count_clone.get() + 1);
        });

        obs.set(1);
        assert_eq!(count.get(), 1);

        obs.set(2);
        assert_eq!(count.get(), 2);

        // Same value — no notification.
        obs.set(2);
        assert_eq!(count.get(), 2);
    }

    #[test]
    fn subscriber_receives_new_value() {
        let obs = Observable::new(0);
        let last_seen = Rc::new(Cell::new(0));
        let last_clone = Rc::clone(&last_seen);

        let _sub = obs.subscribe(move |val| {
            last_clone.set(*val);
        });

        obs.set(42);
        assert_eq!(last_seen.get(), 42);

        obs.set(99);
        assert_eq!(last_seen.get(), 99);
    }

    /// Regression test for the documented guarantee that `set()` may be
    /// called from inside a subscriber callback without panicking.
    #[test]
    fn reentrant_set_from_subscriber_does_not_panic() {
        let obs = Observable::new(0);
        let seen = Rc::new(RefCell::new(Vec::new()));
        let seen_clone = Rc::clone(&seen);
        let handle = obs.clone();

        let _sub = obs.subscribe(move |val: &i32| {
            seen_clone.borrow_mut().push(*val);
            if *val == 1 {
                handle.set(2);
            }
        });

        obs.set(1);
        assert_eq!(obs.get(), 2);
        assert_eq!(obs.version(), 2);
        assert_eq!(*seen.borrow(), vec![1, 2]);
    }

    #[test]
    fn subscription_drop_unsubscribes() {
        let obs = Observable::new(0);
        let count = Rc::new(Cell::new(0u32));
        let count_clone = Rc::clone(&count);

        let sub = obs.subscribe(move |_val| {
            count_clone.set(count_clone.get() + 1);
        });

        obs.set(1);
        assert_eq!(count.get(), 1);

        drop(sub);

        obs.set(2);
        // Callback should NOT have been called.
        assert_eq!(count.get(), 1);
    }

    #[test]
    fn multiple_subscribers() {
        let obs = Observable::new(0);
        let a = Rc::new(Cell::new(0u32));
        let b = Rc::new(Cell::new(0u32));
        let a_clone = Rc::clone(&a);
        let b_clone = Rc::clone(&b);

        let _sub_a = obs.subscribe(move |_| a_clone.set(a_clone.get() + 1));
        let _sub_b = obs.subscribe(move |_| b_clone.set(b_clone.get() + 1));

        obs.set(1);
        assert_eq!(a.get(), 1);
        assert_eq!(b.get(), 1);

        obs.set(2);
        assert_eq!(a.get(), 2);
        assert_eq!(b.get(), 2);
    }

    #[test]
    fn version_increment() {
        let obs = Observable::new("hello".to_string());
        assert_eq!(obs.version(), 0);

        obs.set("world".to_string());
        assert_eq!(obs.version(), 1);

        obs.set("!".to_string());
        assert_eq!(obs.version(), 2);

        // Same value, no increment.
        obs.set("!".to_string());
        assert_eq!(obs.version(), 2);
    }

    #[test]
    fn clone_shares_state() {
        let obs1 = Observable::new(0);
        let obs2 = obs1.clone();

        obs1.set(42);
        assert_eq!(obs2.get(), 42);
        assert_eq!(obs2.version(), 1);

        obs2.set(99);
        assert_eq!(obs1.get(), 99);
        assert_eq!(obs1.version(), 2);
    }

    #[test]
    fn clone_shares_subscribers() {
        let obs1 = Observable::new(0);
        let count = Rc::new(Cell::new(0u32));
        let count_clone = Rc::clone(&count);

        let _sub = obs1.subscribe(move |_| count_clone.set(count_clone.get() + 1));

        let obs2 = obs1.clone();
        obs2.set(1);
        assert_eq!(count.get(), 1); // Subscriber sees change via clone.
    }

    #[test]
    fn subscriber_count() {
        let obs = Observable::new(0);
        assert_eq!(obs.subscriber_count(), 0);

        let _s1 = obs.subscribe(|_| {});
        assert_eq!(obs.subscriber_count(), 1);

        let s2 = obs.subscribe(|_| {});
        assert_eq!(obs.subscriber_count(), 2);

        drop(s2);
        // Dead subscriber not yet pruned.
        assert_eq!(obs.subscriber_count(), 2);

        // Trigger notify to prune dead.
        obs.set(1);
        assert_eq!(obs.subscriber_count(), 1);
    }

    #[test]
    fn debug_format() {
        let obs = Observable::new(42);
        let dbg = format!("{:?}", obs);
        assert!(dbg.contains("Observable"));
        assert!(dbg.contains("42"));
        assert!(dbg.contains("version"));
    }

    #[test]
    fn notification_order_is_registration_order() {
        let obs = Observable::new(0);
        let log = Rc::new(RefCell::new(Vec::new()));

        let log1 = Rc::clone(&log);
        let _s1 = obs.subscribe(move |_| log1.borrow_mut().push('A'));

        let log2 = Rc::clone(&log);
        let _s2 = obs.subscribe(move |_| log2.borrow_mut().push('B'));

        let log3 = Rc::clone(&log);
        let _s3 = obs.subscribe(move |_| log3.borrow_mut().push('C'));

        obs.set(1);
        assert_eq!(*log.borrow(), vec!['A', 'B', 'C']);
    }

    #[test]
    fn update_with_subscriber() {
        let obs = Observable::new(vec![1, 2, 3]);
        let last_len = Rc::new(Cell::new(0usize));
        let last_clone = Rc::clone(&last_len);

        let _sub = obs.subscribe(move |v: &Vec<i32>| {
            last_clone.set(v.len());
        });

        obs.update(|v| v.push(4));
        assert_eq!(last_len.get(), 4);
    }

    #[test]
    fn many_set_calls_version_monotonic() {
        let obs = Observable::new(0);
        for i in 1..=100 {
            obs.set(i);
        }
        assert_eq!(obs.version(), 100);
        assert_eq!(obs.get(), 100);
    }

    #[test]
    fn partial_subscriber_drop() {
        let obs = Observable::new(0);
        let a = Rc::new(Cell::new(0u32));
        let b = Rc::new(Cell::new(0u32));
        let a_clone = Rc::clone(&a);
        let b_clone = Rc::clone(&b);

        let sub_a = obs.subscribe(move |_| a_clone.set(a_clone.get() + 1));
        let _sub_b = obs.subscribe(move |_| b_clone.set(b_clone.get() + 1));

        obs.set(1);
        assert_eq!(a.get(), 1);
        assert_eq!(b.get(), 1);

        drop(sub_a);

        obs.set(2);
        assert_eq!(a.get(), 1); // A was unsubscribed.
        assert_eq!(b.get(), 2); // B still active.
    }

    #[test]
    fn single_row_change_propagates_only_to_bound_widgets() {
        let row_a = Observable::new(vec!["a".to_string()]);
        let row_b = Observable::new(vec!["b".to_string()]);
        let a_hits = Rc::new(Cell::new(0u32));
        let b_hits = Rc::new(Cell::new(0u32));
        let a_hits_clone = Rc::clone(&a_hits);
        let b_hits_clone = Rc::clone(&b_hits);

        let _sub_a = row_a.subscribe(move |_| a_hits_clone.set(a_hits_clone.get() + 1));
        let _sub_b = row_b.subscribe(move |_| b_hits_clone.set(b_hits_clone.get() + 1));

        row_a.set(vec!["a2".to_string()]);
        assert_eq!(a_hits.get(), 1, "bound row-A widget should be invalidated");
        assert_eq!(
            b_hits.get(),
            0,
            "unbound row-B widget should remain untouched"
        );
    }

    #[test]
    fn batch_delta_propagates_atomically_without_stale_intermediate_values() {
        let rows = Observable::new(vec!["r0".to_string()]);
        let seen = Rc::new(RefCell::new(Vec::<Vec<String>>::new()));
        let seen_clone = Rc::clone(&seen);
        let _sub = rows.subscribe(move |current| seen_clone.borrow_mut().push(current.clone()));

        {
            let _batch = crate::reactive::batch::BatchScope::new();
            rows.set(vec!["r1".to_string()]);
            rows.set(vec!["r1".to_string(), "r2".to_string()]);
            rows.update(|current| current.push("r3".to_string()));
            assert!(
                seen.borrow().is_empty(),
                "callbacks must be deferred until batch exit"
            );
        }

        let snapshots = seen.borrow();
        assert_eq!(
            snapshots.len(),
            1,
            "batched updates should coalesce to one invalidation"
        );
        assert_eq!(
            snapshots[0],
            vec!["r1".to_string(), "r2".to_string(), "r3".to_string()],
            "subscriber must observe only final state"
        );
    }

    #[test]
    fn unbound_table_updates_produce_no_bloodstream_delta() {
        let table_rows = Observable::new(vec!["old".to_string()]);
        let spans = capture_delta_spans(|| {
            table_rows.set(vec!["new".to_string()]);
        });
        assert!(
            spans.is_empty(),
            "unbound table updates should not emit bloodstream deltas"
        );
    }

    #[test]
    fn bloodstream_delta_span_reports_rows_changed_and_widgets_invalidated() {
        let table_rows = Observable::new(vec!["old".to_string()]);
        let _sub_a = table_rows.subscribe(|_| {});
        let _sub_b = table_rows.subscribe(|_| {});

        let spans = capture_delta_spans(|| {
            table_rows.set(vec!["new".to_string()]);
        });
        assert_eq!(
            spans,
            vec![(1, 2)],
            "single-row change should report one row and two invalidated widgets"
        );
    }

    #[test]
    fn schema_change_requires_full_rerender_not_partial_delta() {
        let table = Observable::new(TableSnapshot {
            schema_version: 1,
            rows: vec!["alpha".to_string()],
        });
        let previous = Rc::new(RefCell::new(Some(table.get())));
        let decisions = Rc::new(RefCell::new(Vec::<RenderMode>::new()));
        let previous_clone = Rc::clone(&previous);
        let decisions_clone = Rc::clone(&decisions);

        let _sub = table.subscribe(move |next| {
            let mut prev = previous_clone.borrow_mut();
            let current_mode =
                classify_render_mode(prev.as_ref().expect("previous snapshot available"), next);
            decisions_clone.borrow_mut().push(current_mode);
            *prev = Some(next.clone());
        });

        table.set(TableSnapshot {
            schema_version: 1,
            rows: vec!["alpha".to_string(), "beta".to_string()],
        });
        table.set(TableSnapshot {
            schema_version: 2,
            rows: vec!["alpha".to_string(), "beta".to_string()],
        });

        assert_eq!(
            *decisions.borrow(),
            vec![RenderMode::PartialDelta, RenderMode::FullRerender],
            "schema-version changes must force full rerender semantics"
        );
    }

    #[test]
    fn string_observable() {
        let obs = Observable::new(String::new());
        let changes = Rc::new(Cell::new(0u32));
        let changes_clone = Rc::clone(&changes);

        let _sub = obs.subscribe(move |_| changes_clone.set(changes_clone.get() + 1));

        obs.set("hello".to_string());
        obs.set("hello".to_string()); // Same, no notify.
        obs.set("world".to_string());

        assert_eq!(changes.get(), 2);
        assert_eq!(obs.version(), 2);
    }
}