1#![forbid(unsafe_code)]
2
3use std::cell::RefCell;
30use std::rc::{Rc, Weak};
31use tracing::{info, info_span};
32use web_time::Instant;
33
/// Strong, shared handle to a subscriber callback.
type CallbackRc<T> = Rc<dyn Fn(&T)>;
/// Non-owning handle to a subscriber callback; upgrading fails once every
/// strong handle has been dropped.
type CallbackWeak<T> = Weak<dyn Fn(&T)>;

/// State shared by every clone of an [`Observable`].
struct ObservableInner<T> {
    /// The current value.
    value: T,
    /// Change counter, incremented each time the stored value actually changes.
    version: u64,
    /// Weak handles to the registered callbacks, in registration order.
    subscribers: Vec<CallbackWeak<T>>,
}

/// A shared, single-threaded value cell that tracks a version number and a
/// list of change subscribers.
///
/// Cloning an `Observable` produces another handle to the *same* underlying
/// state (value, version and subscribers); it does not copy the value.
pub struct Observable<T> {
    inner: Rc<RefCell<ObservableInner<T>>>,
}

impl<T> Clone for Observable<T> {
    /// Returns a new handle pointing at the same shared state.
    fn clone(&self) -> Self {
        let inner = Rc::clone(&self.inner);
        Self { inner }
    }
}

impl<T: std::fmt::Debug> std::fmt::Debug for Observable<T> {
    /// Formats the value, version and subscriber count.
    ///
    /// If the shared state is mutably borrowed at the time of the call, a
    /// `<borrowed>` placeholder is printed for the value instead of panicking,
    /// so the observable can be debug-printed from any context.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Observable");
        match self.inner.try_borrow() {
            Ok(inner) => {
                out.field("value", &inner.value)
                    .field("version", &inner.version)
                    .field("subscriber_count", &inner.subscribers.len());
            }
            Err(_) => {
                // Same placeholder convention as `RefCell`'s own `Debug` impl.
                out.field("value", &format_args!("<borrowed>"));
            }
        }
        out.finish()
    }
}
81
82impl<T: Clone + PartialEq + 'static> Observable<T> {
83 #[must_use]
87 pub fn new(value: T) -> Self {
88 Self {
89 inner: Rc::new(RefCell::new(ObservableInner {
90 value,
91 version: 0,
92 subscribers: Vec::new(),
93 })),
94 }
95 }
96
97 #[must_use]
99 pub fn get(&self) -> T {
100 self.inner.borrow().value.clone()
101 }
102
103 pub fn with<R>(&self, f: impl FnOnce(&T) -> R) -> R {
107 f(&self.inner.borrow().value)
108 }
109
110 pub fn set(&self, value: T) {
116 let changed = {
117 let mut inner = self.inner.borrow_mut();
118 if inner.value == value {
119 return;
120 }
121 inner.value = value;
122 inner.version += 1;
123 true
124 };
125 if changed {
126 self.notify();
127 }
128 }
129
130 pub fn update(&self, f: impl FnOnce(&mut T)) {
136 let changed = {
137 let mut inner = self.inner.borrow_mut();
138 let old = inner.value.clone();
139 f(&mut inner.value);
140 if inner.value != old {
141 inner.version += 1;
142 true
143 } else {
144 false
145 }
146 };
147 if changed {
148 self.notify();
149 }
150 }
151
152 pub fn subscribe(&self, callback: impl Fn(&T) + 'static) -> Subscription {
159 let strong: CallbackRc<T> = Rc::new(callback);
160 let weak = Rc::downgrade(&strong);
161 self.inner.borrow_mut().subscribers.push(weak);
162 Subscription {
165 _guard: Box::new(strong),
166 }
167 }
168
169 #[must_use]
172 pub fn version(&self) -> u64 {
173 self.inner.borrow().version
174 }
175
176 #[must_use]
179 pub fn subscriber_count(&self) -> usize {
180 self.inner.borrow().subscribers.len()
181 }
182
183 fn notify(&self) {
188 let callbacks: Vec<CallbackRc<T>> = {
190 let mut inner = self.inner.borrow_mut();
191 inner.subscribers.retain(|w| w.strong_count() > 0);
193 inner
194 .subscribers
195 .iter()
196 .filter_map(|w| w.upgrade())
197 .collect()
198 };
199
200 if callbacks.is_empty() {
201 return;
202 }
203
204 let widgets_invalidated = callbacks.len() as u64;
205
206 if super::batch::is_batching() {
207 super::batch::record_rows_changed(1);
208 for cb in callbacks {
210 let callback_key = Rc::as_ptr(&cb) as *const () as usize;
211 let source = self.clone();
212 super::batch::defer_or_run_keyed(callback_key, move || {
213 let latest = source.get();
214 cb(&latest);
215 });
216 }
217 return;
218 }
219
220 let value = self.inner.borrow().value.clone();
222 let propagation_start = Instant::now();
223 let _span = info_span!(
224 "bloodstream.delta",
225 rows_changed = 1_u64,
226 widgets_invalidated,
227 duration_us = tracing::field::Empty
228 )
229 .entered();
230
231 for cb in &callbacks {
233 cb(&value);
234 }
235
236 let duration_us = propagation_start.elapsed().as_micros() as u64;
237 tracing::Span::current().record("duration_us", duration_us);
238 info!(
239 bloodstream_propagation_duration_us = duration_us,
240 rows_changed = 1_u64,
241 widgets_invalidated,
242 "bloodstream propagation duration histogram"
243 );
244 }
245}
246
/// RAII guard that keeps a subscriber callback alive.
///
/// The guard owns an opaque, type-erased payload; everything it holds is
/// released when the guard is dropped. Bind it to a variable for as long as
/// the callback should stay registered.
#[must_use = "dropping a Subscription immediately unsubscribes its callback"]
pub struct Subscription {
    /// Type-erased owner of the callback's strong handle; never read, only dropped.
    _guard: Box<dyn std::any::Any>,
}

impl std::fmt::Debug for Subscription {
    /// Prints `Subscription { .. }`; the type-erased payload is not shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Subscription").finish_non_exhaustive()
    }
}
266
#[cfg(test)]
mod tests {
    use super::*;
    use std::cell::Cell;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::{Arc, Mutex};
    use tracing::field::{Field, Visit};

    /// Table-like payload carrying a schema version alongside its rows.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct TableSnapshot {
        schema_version: u64,
        rows: Vec<String>,
    }

    /// Rendering strategy a consumer picks when moving between two snapshots.
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    enum RenderMode {
        PartialDelta,
        FullRerender,
    }

    /// A matching schema version allows a partial delta; a mismatch forces a full rerender.
    fn classify_render_mode(previous: &TableSnapshot, next: &TableSnapshot) -> RenderMode {
        let same_schema = previous.schema_version == next.schema_version;
        if same_schema {
            RenderMode::PartialDelta
        } else {
            RenderMode::FullRerender
        }
    }

    /// Field visitor that extracts the two numeric fields of a delta span.
    #[derive(Default)]
    struct DeltaSpanVisitor {
        rows_changed: Option<u64>,
        widgets_invalidated: Option<u64>,
    }

    impl Visit for DeltaSpanVisitor {
        fn record_u64(&mut self, field: &Field, value: u64) {
            let slot = match field.name() {
                "rows_changed" => &mut self.rows_changed,
                "widgets_invalidated" => &mut self.widgets_invalidated,
                _ => return,
            };
            *slot = Some(value);
        }

        fn record_i64(&mut self, field: &Field, value: i64) {
            // Negative values cannot be a count; ignore them.
            if value >= 0 {
                self.record_u64(field, value as u64);
            }
        }

        fn record_debug(&mut self, _field: &Field, _value: &dyn std::fmt::Debug) {}
    }

    /// Minimal subscriber that records `(rows_changed, widgets_invalidated)`
    /// for every `bloodstream.delta` span that is created.
    struct DeltaSpanSubscriber {
        next_id: AtomicU64,
        spans: Arc<Mutex<Vec<(u64, u64)>>>,
    }

    impl tracing::Subscriber for DeltaSpanSubscriber {
        fn enabled(&self, _metadata: &tracing::Metadata<'_>) -> bool {
            true
        }

        fn new_span(&self, attrs: &tracing::span::Attributes<'_>) -> tracing::span::Id {
            let is_delta = attrs.metadata().name() == "bloodstream.delta";
            if is_delta {
                let mut fields = DeltaSpanVisitor::default();
                attrs.record(&mut fields);
                let entry = (
                    fields.rows_changed.unwrap_or(0),
                    fields.widgets_invalidated.unwrap_or(0),
                );
                self.spans.lock().expect("span capture lock").push(entry);
            }
            let raw_id = self.next_id.fetch_add(1, Ordering::Relaxed);
            tracing::span::Id::from_u64(raw_id)
        }

        fn record(&self, _span: &tracing::span::Id, _values: &tracing::span::Record<'_>) {}

        fn record_follows_from(&self, _span: &tracing::span::Id, _follows: &tracing::span::Id) {}

        fn event(&self, _event: &tracing::Event<'_>) {}

        fn enter(&self, _span: &tracing::span::Id) {}

        fn exit(&self, _span: &tracing::span::Id) {}
    }

    /// Runs `run` with a capturing subscriber installed as the default and
    /// returns the delta spans observed while it executed.
    fn capture_delta_spans(run: impl FnOnce()) -> Vec<(u64, u64)> {
        let captured = Arc::new(Mutex::new(Vec::new()));
        let _guard = tracing::subscriber::set_default(DeltaSpanSubscriber {
            next_id: AtomicU64::new(1),
            spans: Arc::clone(&captured),
        });
        run();
        captured.lock().expect("span capture lock").clone()
    }

    #[test]
    fn get_set_basic() {
        let cell = Observable::new(42);
        assert_eq!(cell.get(), 42);
        assert_eq!(cell.version(), 0);

        cell.set(99);
        assert_eq!(cell.get(), 99);
        assert_eq!(cell.version(), 1);
    }

    #[test]
    fn no_change_no_version_bump() {
        let cell = Observable::new(42);
        // Writing an equal value must leave the version untouched.
        cell.set(42);
        assert_eq!(cell.version(), 0);
    }

    #[test]
    fn with_access() {
        let numbers = Observable::new(vec![1, 2, 3]);
        let total = numbers.with(|items| items.iter().sum::<i32>());
        assert_eq!(total, 6);
    }

    #[test]
    fn update_mutates_in_place() {
        let numbers = Observable::new(vec![1, 2, 3]);
        numbers.update(|items| items.push(4));
        assert_eq!(numbers.get(), vec![1, 2, 3, 4]);
        assert_eq!(numbers.version(), 1);
    }

    #[test]
    fn update_no_change_no_bump() {
        let cell = Observable::new(10);
        // The closure rewrites the same value, so nothing changed.
        cell.update(|slot| *slot = 10);
        assert_eq!(cell.version(), 0);
    }

    #[test]
    fn change_notification() {
        let source = Observable::new(0);
        let hits = Rc::new(Cell::new(0u32));
        let hits_in_cb = Rc::clone(&hits);
        let _sub = source.subscribe(move |_val| hits_in_cb.set(hits_in_cb.get() + 1));

        // (value written, notifications expected afterwards); the repeated 2 stays silent.
        for (next, expected_hits) in [(1, 1u32), (2, 2), (2, 2)] {
            source.set(next);
            assert_eq!(hits.get(), expected_hits);
        }
    }

    #[test]
    fn subscriber_receives_new_value() {
        let source = Observable::new(0);
        let observed = Rc::new(Cell::new(0));
        let sink = Rc::clone(&observed);
        let _sub = source.subscribe(move |val| sink.set(*val));

        for expected in [42, 99] {
            source.set(expected);
            assert_eq!(observed.get(), expected);
        }
    }

    #[test]
    fn subscription_drop_unsubscribes() {
        let source = Observable::new(0);
        let hits = Rc::new(Cell::new(0u32));
        let hits_in_cb = Rc::clone(&hits);
        let guard = source.subscribe(move |_val| hits_in_cb.set(hits_in_cb.get() + 1));

        source.set(1);
        assert_eq!(hits.get(), 1);

        drop(guard);

        // The callback is gone, so the counter must not move.
        source.set(2);
        assert_eq!(hits.get(), 1);
    }

    #[test]
    fn multiple_subscribers() {
        let source = Observable::new(0);
        let first = Rc::new(Cell::new(0u32));
        let second = Rc::new(Cell::new(0u32));
        let first_in_cb = Rc::clone(&first);
        let second_in_cb = Rc::clone(&second);

        let _sub_a = source.subscribe(move |_| first_in_cb.set(first_in_cb.get() + 1));
        let _sub_b = source.subscribe(move |_| second_in_cb.set(second_in_cb.get() + 1));

        for (next, expected_hits) in [(1, 1u32), (2, 2)] {
            source.set(next);
            assert_eq!(first.get(), expected_hits);
            assert_eq!(second.get(), expected_hits);
        }
    }

    #[test]
    fn version_increment() {
        let text = Observable::new("hello".to_string());
        assert_eq!(text.version(), 0);

        // The second "!" equals the stored value and must not bump the version.
        for (next, expected_version) in [("world", 1u64), ("!", 2), ("!", 2)] {
            text.set(next.to_string());
            assert_eq!(text.version(), expected_version);
        }
    }

    #[test]
    fn clone_shares_state() {
        let original = Observable::new(0);
        let alias = original.clone();

        original.set(42);
        assert_eq!(alias.get(), 42);
        assert_eq!(alias.version(), 1);

        alias.set(99);
        assert_eq!(original.get(), 99);
        assert_eq!(original.version(), 2);
    }

    #[test]
    fn clone_shares_subscribers() {
        let original = Observable::new(0);
        let hits = Rc::new(Cell::new(0u32));
        let hits_in_cb = Rc::clone(&hits);
        let _sub = original.subscribe(move |_| hits_in_cb.set(hits_in_cb.get() + 1));

        // A change made through a clone reaches subscribers of the original.
        let alias = original.clone();
        alias.set(1);
        assert_eq!(hits.get(), 1);
    }

    #[test]
    fn subscriber_count() {
        let source = Observable::new(0);
        assert_eq!(source.subscriber_count(), 0);

        let _kept = source.subscribe(|_| {});
        assert_eq!(source.subscriber_count(), 1);

        let dropped = source.subscribe(|_| {});
        assert_eq!(source.subscriber_count(), 2);

        // Dropping does not prune eagerly; the dead entry is still counted.
        drop(dropped);
        assert_eq!(source.subscriber_count(), 2);

        // The next notification removes the dead entry.
        source.set(1);
        assert_eq!(source.subscriber_count(), 1);
    }

    #[test]
    fn debug_format() {
        let cell = Observable::new(42);
        let rendered = format!("{:?}", cell);
        for needle in ["Observable", "42", "version"] {
            assert!(rendered.contains(needle));
        }
    }

    #[test]
    fn notification_order_is_registration_order() {
        let source = Observable::new(0);
        let order = Rc::new(RefCell::new(Vec::new()));

        // Register three callbacks, tagged A, B, C, and keep their guards alive.
        let _subs: Vec<Subscription> = "ABC"
            .chars()
            .map(|tag| {
                let sink = Rc::clone(&order);
                source.subscribe(move |_| sink.borrow_mut().push(tag))
            })
            .collect();

        source.set(1);
        assert_eq!(*order.borrow(), vec!['A', 'B', 'C']);
    }

    #[test]
    fn update_with_subscriber() {
        let numbers = Observable::new(vec![1, 2, 3]);
        let seen_len = Rc::new(Cell::new(0usize));
        let seen_len_in_cb = Rc::clone(&seen_len);
        let _sub = numbers.subscribe(move |items: &Vec<i32>| seen_len_in_cb.set(items.len()));

        numbers.update(|items| items.push(4));
        assert_eq!(seen_len.get(), 4);
    }

    #[test]
    fn many_set_calls_version_monotonic() {
        let cell = Observable::new(0);
        (1..=100).for_each(|step| cell.set(step));
        assert_eq!(cell.version(), 100);
        assert_eq!(cell.get(), 100);
    }

    #[test]
    fn partial_subscriber_drop() {
        let source = Observable::new(0);
        let first = Rc::new(Cell::new(0u32));
        let second = Rc::new(Cell::new(0u32));
        let first_in_cb = Rc::clone(&first);
        let second_in_cb = Rc::clone(&second);

        let first_guard = source.subscribe(move |_| first_in_cb.set(first_in_cb.get() + 1));
        let _second_guard = source.subscribe(move |_| second_in_cb.set(second_in_cb.get() + 1));

        source.set(1);
        assert_eq!(first.get(), 1);
        assert_eq!(second.get(), 1);

        drop(first_guard);

        // Only the surviving subscriber advances.
        source.set(2);
        assert_eq!(first.get(), 1);
        assert_eq!(second.get(), 2);
    }

    #[test]
    fn single_row_change_propagates_only_to_bound_widgets() {
        let row_a = Observable::new(vec!["a".to_string()]);
        let row_b = Observable::new(vec!["b".to_string()]);
        let hits_a = Rc::new(Cell::new(0u32));
        let hits_b = Rc::new(Cell::new(0u32));
        let hits_a_in_cb = Rc::clone(&hits_a);
        let hits_b_in_cb = Rc::clone(&hits_b);

        let _sub_a = row_a.subscribe(move |_| hits_a_in_cb.set(hits_a_in_cb.get() + 1));
        let _sub_b = row_b.subscribe(move |_| hits_b_in_cb.set(hits_b_in_cb.get() + 1));

        row_a.set(vec!["a2".to_string()]);

        assert_eq!(hits_a.get(), 1, "bound row-A widget should be invalidated");
        assert_eq!(
            hits_b.get(),
            0,
            "unbound row-B widget should remain untouched"
        );
    }

    #[test]
    fn batch_delta_propagates_atomically_without_stale_intermediate_values() {
        let rows = Observable::new(vec!["r0".to_string()]);
        let deliveries = Rc::new(RefCell::new(Vec::<Vec<String>>::new()));
        let deliveries_in_cb = Rc::clone(&deliveries);
        let _sub =
            rows.subscribe(move |current| deliveries_in_cb.borrow_mut().push(current.clone()));

        {
            let _batch = crate::reactive::batch::BatchScope::new();
            rows.set(vec!["r1".to_string()]);
            rows.set(vec!["r1".to_string(), "r2".to_string()]);
            rows.update(|current| current.push("r3".to_string()));
            assert!(
                deliveries.borrow().is_empty(),
                "callbacks must be deferred until batch exit"
            );
        }

        let delivered = deliveries.borrow();
        assert_eq!(
            delivered.len(),
            1,
            "batched updates should coalesce to one invalidation"
        );
        assert_eq!(
            delivered[0],
            vec!["r1".to_string(), "r2".to_string(), "r3".to_string()],
            "subscriber must observe only final state"
        );
    }

    #[test]
    fn unbound_table_updates_produce_no_bloodstream_delta() {
        let table_rows = Observable::new(vec!["old".to_string()]);
        let recorded = capture_delta_spans(|| table_rows.set(vec!["new".to_string()]));
        assert!(
            recorded.is_empty(),
            "unbound table updates should not emit bloodstream deltas"
        );
    }

    #[test]
    fn bloodstream_delta_span_reports_rows_changed_and_widgets_invalidated() {
        let table_rows = Observable::new(vec!["old".to_string()]);
        let _sub_a = table_rows.subscribe(|_| {});
        let _sub_b = table_rows.subscribe(|_| {});

        let recorded = capture_delta_spans(|| table_rows.set(vec!["new".to_string()]));
        assert_eq!(
            recorded,
            vec![(1, 2)],
            "single-row change should report one row and two invalidated widgets"
        );
    }

    #[test]
    fn schema_change_requires_full_rerender_not_partial_delta() {
        let table = Observable::new(TableSnapshot {
            schema_version: 1,
            rows: vec!["alpha".to_string()],
        });
        let last_seen = Rc::new(RefCell::new(Some(table.get())));
        let modes = Rc::new(RefCell::new(Vec::<RenderMode>::new()));
        let last_seen_in_cb = Rc::clone(&last_seen);
        let modes_in_cb = Rc::clone(&modes);

        let _sub = table.subscribe(move |next| {
            let mut last = last_seen_in_cb.borrow_mut();
            let mode = {
                let before = last.as_ref().expect("previous snapshot available");
                classify_render_mode(before, next)
            };
            modes_in_cb.borrow_mut().push(mode);
            *last = Some(next.clone());
        });

        // Same schema, one more row; then same rows, bumped schema.
        let two_rows = vec!["alpha".to_string(), "beta".to_string()];
        table.set(TableSnapshot {
            schema_version: 1,
            rows: two_rows.clone(),
        });
        table.set(TableSnapshot {
            schema_version: 2,
            rows: two_rows,
        });

        assert_eq!(
            *modes.borrow(),
            vec![RenderMode::PartialDelta, RenderMode::FullRerender],
            "schema-version changes must force full rerender semantics"
        );
    }

    #[test]
    fn string_observable() {
        let text = Observable::new(String::new());
        let changes = Rc::new(Cell::new(0u32));
        let changes_in_cb = Rc::clone(&changes);
        let _sub = text.subscribe(move |_| changes_in_cb.set(changes_in_cb.get() + 1));

        // The repeated "hello" is a no-op, leaving two effective changes.
        for next in ["hello", "hello", "world"] {
            text.set(next.to_string());
        }

        assert_eq!(changes.get(), 2);
        assert_eq!(text.version(), 2);
    }
}