// ftui_runtime/reactive/observable.rs
#![forbid(unsafe_code)]
2
3use std::cell::RefCell;
30use std::rc::{Rc, Weak};
31
/// Strong, shared handle to a subscriber callback that receives `&T`.
type CallbackRc<T> = Rc<dyn Fn(&T)>;

/// Non-owning counterpart of [`CallbackRc`]; it does not keep the callback alive.
type CallbackWeak<T> = Weak<dyn Fn(&T)>;
36
37struct ObservableInner<T> {
39 value: T,
40 version: u64,
41 subscribers: Vec<CallbackWeak<T>>,
43}
44
45pub struct Observable<T> {
57 inner: Rc<RefCell<ObservableInner<T>>>,
58}
59
60impl<T> Clone for Observable<T> {
62 fn clone(&self) -> Self {
63 Self {
64 inner: Rc::clone(&self.inner),
65 }
66 }
67}
68
69impl<T: std::fmt::Debug> std::fmt::Debug for Observable<T> {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 let inner = self.inner.borrow();
72 f.debug_struct("Observable")
73 .field("value", &inner.value)
74 .field("version", &inner.version)
75 .field("subscriber_count", &inner.subscribers.len())
76 .finish()
77 }
78}
79
80impl<T: Clone + PartialEq + 'static> Observable<T> {
81 #[must_use]
85 pub fn new(value: T) -> Self {
86 Self {
87 inner: Rc::new(RefCell::new(ObservableInner {
88 value,
89 version: 0,
90 subscribers: Vec::new(),
91 })),
92 }
93 }
94
95 #[must_use]
97 pub fn get(&self) -> T {
98 self.inner.borrow().value.clone()
99 }
100
101 pub fn with<R>(&self, f: impl FnOnce(&T) -> R) -> R {
105 f(&self.inner.borrow().value)
106 }
107
108 pub fn set(&self, value: T) {
114 let changed = {
115 let mut inner = self.inner.borrow_mut();
116 if inner.value == value {
117 return;
118 }
119 inner.value = value;
120 inner.version += 1;
121 true
122 };
123 if changed {
124 self.notify();
125 }
126 }
127
128 pub fn update(&self, f: impl FnOnce(&mut T)) {
134 let changed = {
135 let mut inner = self.inner.borrow_mut();
136 let old = inner.value.clone();
137 f(&mut inner.value);
138 if inner.value != old {
139 inner.version += 1;
140 true
141 } else {
142 false
143 }
144 };
145 if changed {
146 self.notify();
147 }
148 }
149
150 pub fn subscribe(&self, callback: impl Fn(&T) + 'static) -> Subscription {
157 let strong: CallbackRc<T> = Rc::new(callback);
158 let weak = Rc::downgrade(&strong);
159 self.inner.borrow_mut().subscribers.push(weak);
160 Subscription {
163 _guard: Box::new(strong),
164 }
165 }
166
167 #[must_use]
170 pub fn version(&self) -> u64 {
171 self.inner.borrow().version
172 }
173
174 #[must_use]
177 pub fn subscriber_count(&self) -> usize {
178 self.inner.borrow().subscribers.len()
179 }
180
181 fn notify(&self) {
186 let callbacks: Vec<CallbackRc<T>> = {
188 let mut inner = self.inner.borrow_mut();
189 inner.subscribers.retain(|w| w.strong_count() > 0);
191 inner
192 .subscribers
193 .iter()
194 .filter_map(|w| w.upgrade())
195 .collect()
196 };
197
198 if callbacks.is_empty() {
199 return;
200 }
201
202 let value = self.inner.borrow().value.clone();
204
205 if super::batch::is_batching() {
206 for cb in callbacks {
208 let v = value.clone();
209 super::batch::defer_or_run(move || cb(&v));
210 }
211 } else {
212 for cb in &callbacks {
214 cb(&value);
215 }
216 }
217 }
218}
219
/// Guard returned by `Observable::subscribe`.
///
/// It owns the strong handle to the subscribed callback; dropping the guard
/// releases that handle.
pub struct Subscription {
    /// Type-erased owner of the callback's strong `Rc`. It is never read; it
    /// exists only so that dropping the subscription drops the handle.
    _guard: Box<dyn std::any::Any>,
}
233
234impl std::fmt::Debug for Subscription {
235 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
236 f.debug_struct("Subscription").finish_non_exhaustive()
237 }
238}
239
#[cfg(test)]
mod tests {
    use super::*;
    use std::cell::Cell;

    /// Subscribes a callback that counts its own invocations.
    ///
    /// Returns the shared counter together with the subscription guard; the
    /// caller decides how long the guard lives.
    fn attach_counter<T>(obs: &Observable<T>) -> (Rc<Cell<u32>>, Subscription)
    where
        T: Clone + PartialEq + 'static,
    {
        let hits = Rc::new(Cell::new(0u32));
        let sink = Rc::clone(&hits);
        let guard = obs.subscribe(move |_| sink.set(sink.get() + 1));
        (hits, guard)
    }

    #[test]
    fn get_set_basic() {
        let cell = Observable::new(42);
        assert_eq!(cell.get(), 42);
        assert_eq!(cell.version(), 0);

        cell.set(99);
        assert_eq!(cell.get(), 99);
        assert_eq!(cell.version(), 1);
    }

    #[test]
    fn no_change_no_version_bump() {
        let cell = Observable::new(42);
        // Writing an equal value must be a no-op.
        cell.set(42);
        assert_eq!(cell.version(), 0);
    }

    #[test]
    fn with_access() {
        let cell = Observable::new(vec![1, 2, 3]);
        let total = cell.with(|items| items.iter().sum::<i32>());
        assert_eq!(total, 6);
    }

    #[test]
    fn update_mutates_in_place() {
        let cell = Observable::new(vec![1, 2, 3]);
        cell.update(|items| items.push(4));
        assert_eq!(cell.get(), vec![1, 2, 3, 4]);
        assert_eq!(cell.version(), 1);
    }

    #[test]
    fn update_no_change_no_bump() {
        let cell = Observable::new(10);
        // The closure writes back the value that is already stored.
        cell.update(|n| *n = 10);
        assert_eq!(cell.version(), 0);
    }

    #[test]
    fn change_notification() {
        let cell = Observable::new(0);
        let (hits, _guard) = attach_counter(&cell);

        cell.set(1);
        assert_eq!(hits.get(), 1);

        cell.set(2);
        assert_eq!(hits.get(), 2);

        // Same value again: no notification.
        cell.set(2);
        assert_eq!(hits.get(), 2);
    }

    #[test]
    fn subscriber_receives_new_value() {
        let cell = Observable::new(0);
        let seen = Rc::new(Cell::new(0));
        let _guard = {
            let seen = Rc::clone(&seen);
            cell.subscribe(move |v| seen.set(*v))
        };

        cell.set(42);
        assert_eq!(seen.get(), 42);

        cell.set(99);
        assert_eq!(seen.get(), 99);
    }

    #[test]
    fn subscription_drop_unsubscribes() {
        let cell = Observable::new(0);
        let (hits, guard) = attach_counter(&cell);

        cell.set(1);
        assert_eq!(hits.get(), 1);

        drop(guard);

        // The callback must not fire once its guard is gone.
        cell.set(2);
        assert_eq!(hits.get(), 1);
    }

    #[test]
    fn multiple_subscribers() {
        let cell = Observable::new(0);
        let (first, _first_guard) = attach_counter(&cell);
        let (second, _second_guard) = attach_counter(&cell);

        cell.set(1);
        assert_eq!(first.get(), 1);
        assert_eq!(second.get(), 1);

        cell.set(2);
        assert_eq!(first.get(), 2);
        assert_eq!(second.get(), 2);
    }

    #[test]
    fn version_increment() {
        let cell = Observable::new(String::from("hello"));
        assert_eq!(cell.version(), 0);

        cell.set(String::from("world"));
        assert_eq!(cell.version(), 1);

        cell.set(String::from("!"));
        assert_eq!(cell.version(), 2);

        // Repeating the last value leaves the version untouched.
        cell.set(String::from("!"));
        assert_eq!(cell.version(), 2);
    }

    #[test]
    fn clone_shares_state() {
        let original = Observable::new(0);
        let alias = original.clone();

        original.set(42);
        assert_eq!(alias.get(), 42);
        assert_eq!(alias.version(), 1);

        alias.set(99);
        assert_eq!(original.get(), 99);
        assert_eq!(original.version(), 2);
    }

    #[test]
    fn clone_shares_subscribers() {
        let original = Observable::new(0);
        let (hits, _guard) = attach_counter(&original);

        // A write through the clone reaches subscribers of the original.
        let alias = original.clone();
        alias.set(1);
        assert_eq!(hits.get(), 1);
    }

    #[test]
    fn subscriber_count() {
        let cell = Observable::new(0);
        assert_eq!(cell.subscriber_count(), 0);

        let _kept = cell.subscribe(|_| {});
        assert_eq!(cell.subscriber_count(), 1);

        let dropped = cell.subscribe(|_| {});
        assert_eq!(cell.subscriber_count(), 2);

        // The dead entry is still counted until the next notification.
        drop(dropped);
        assert_eq!(cell.subscriber_count(), 2);

        // A notification prunes it.
        cell.set(1);
        assert_eq!(cell.subscriber_count(), 1);
    }

    #[test]
    fn debug_format() {
        let cell = Observable::new(42);
        let rendered = format!("{:?}", cell);
        assert!(rendered.contains("Observable"));
        assert!(rendered.contains("42"));
        assert!(rendered.contains("version"));
    }

    #[test]
    fn notification_order_is_registration_order() {
        let cell = Observable::new(0);
        let trace = Rc::new(RefCell::new(Vec::new()));

        // Register three callbacks, each appending its own tag to the trace.
        let _guards: Vec<Subscription> = ['A', 'B', 'C']
            .iter()
            .map(|&tag| {
                let trace = Rc::clone(&trace);
                cell.subscribe(move |_| trace.borrow_mut().push(tag))
            })
            .collect();

        cell.set(1);
        assert_eq!(*trace.borrow(), vec!['A', 'B', 'C']);
    }

    #[test]
    fn update_with_subscriber() {
        let cell = Observable::new(vec![1, 2, 3]);
        let observed_len = Rc::new(Cell::new(0usize));
        let _guard = {
            let observed_len = Rc::clone(&observed_len);
            cell.subscribe(move |items: &Vec<i32>| observed_len.set(items.len()))
        };

        cell.update(|items| items.push(4));
        assert_eq!(observed_len.get(), 4);
    }

    #[test]
    fn many_set_calls_version_monotonic() {
        let cell = Observable::new(0);
        (1..=100).for_each(|i| cell.set(i));
        assert_eq!(cell.version(), 100);
        assert_eq!(cell.get(), 100);
    }

    #[test]
    fn partial_subscriber_drop() {
        let cell = Observable::new(0);
        let (first, first_guard) = attach_counter(&cell);
        let (second, _second_guard) = attach_counter(&cell);

        cell.set(1);
        assert_eq!(first.get(), 1);
        assert_eq!(second.get(), 1);

        drop(first_guard);

        cell.set(2);
        // The dropped subscriber stays at its old count; the other one advances.
        assert_eq!(first.get(), 1);
        assert_eq!(second.get(), 2);
    }

    #[test]
    fn string_observable() {
        let cell = Observable::new(String::new());
        let (changes, _guard) = attach_counter(&cell);

        cell.set(String::from("hello"));
        // Duplicate write: ignored.
        cell.set(String::from("hello"));
        cell.set(String::from("world"));

        assert_eq!(changes.get(), 2);
        assert_eq!(cell.version(), 2);
    }
}