// ftui_runtime/reactive/batch.rs
1#![forbid(unsafe_code)]
2
3//! Batch update coalescing for [`Observable`] notifications.
4//!
5//! When multiple `Observable` values are updated in rapid succession,
6//! subscribers receive a notification for each change. In render-heavy
7//! scenarios this causes redundant intermediate renders. Batch coalescing
8//! defers all notifications until the batch scope exits, then fires each
9//! unique callback at most once.
10//!
11//! # Usage
12//!
13//! ```ignore
14//! use ftui_runtime::reactive::batch::BatchScope;
15//!
16//! let x = Observable::new(0);
17//! let y = Observable::new(0);
18//!
19//! {
20//!     let _batch = BatchScope::new();
21//!     x.set(1);  // notification deferred
22//!     y.set(2);  // notification deferred
23//!     x.set(3);  // notification deferred (coalesced with first x.set)
24//! }  // all notifications fire here, x subscribers called once with value 3
25//! ```
26//!
27//! # Invariants
28//!
29//! 1. Nested batches are supported: only the outermost scope triggers flush.
30//! 2. Within a batch, `Observable::get()` always returns the latest value
31//!    (values are updated immediately, only notifications are deferred).
32//! 3. After a batch exits, all subscribers see the final state, never an
33//!    intermediate state.
34//! 4. Flush calls deferred callbacks in the order they were first enqueued.
35//!
36//! # Failure Modes
37//!
38//! - **Callback panics during flush**: Remaining callbacks are still called.
39//!   The first panic is re-raised after all callbacks have been attempted.
40
41use ftui_core::with_panic_cleanup_suppressed;
42use std::cell::RefCell;
43use tracing::{info, info_span};
44use web_time::Instant;
45
/// A deferred notification: a closure that fires a subscriber callback
/// with the latest value.
type DeferredNotify = Box<dyn FnOnce()>;

/// Deferred callback entry optionally keyed for in-batch coalescing.
struct DeferredEntry {
    /// Coalescing key: entries sharing the same `Some(key)` replace each
    /// other in place (see `defer_or_run_keyed`); `None` entries never
    /// coalesce.
    key: Option<usize>,
    /// The subscriber notification to run when the batch flushes.
    notify: DeferredNotify,
}
55
56impl DeferredEntry {
57    fn unkeyed(notify: DeferredNotify) -> Self {
58        Self { key: None, notify }
59    }
60
61    fn keyed(key: usize, notify: DeferredNotify) -> Self {
62        Self {
63            key: Some(key),
64            notify,
65        }
66    }
67}
68
/// Thread-local batch context.
struct BatchContext {
    /// Nesting depth. Only flush when this reaches 0.
    depth: u32,
    /// Queued notifications to fire on flush.
    deferred: Vec<DeferredEntry>,
    /// Number of source row updates coalesced into this batch.
    rows_changed: u64,
}

thread_local! {
    // `None` means "no batch active". `BatchScope::new` installs a context;
    // the outermost `BatchScope::drop` flushes and resets it to `None`.
    static BATCH_CTX: RefCell<Option<BatchContext>> = const { RefCell::new(None) };
}
82
83/// Returns true if a batch is currently active on this thread.
84pub fn is_batching() -> bool {
85    BATCH_CTX.with(|ctx| ctx.borrow().is_some())
86}
87
88/// Enqueue a deferred notification to be fired when the current batch exits.
89///
90/// If no batch is active, the notification fires immediately.
91///
92/// Returns `true` if the notification was deferred, `false` if it fired
93/// immediately.
94pub fn defer_or_run(f: impl FnOnce() + 'static) -> bool {
95    BATCH_CTX.with(|ctx| {
96        let mut guard = ctx.borrow_mut();
97        if let Some(ref mut batch) = *guard {
98            batch.deferred.push(DeferredEntry::unkeyed(Box::new(f)));
99            true
100        } else {
101            drop(guard); // Release borrow before calling f.
102            f();
103            false
104        }
105    })
106}
107
108/// Enqueue a deferred notification keyed by `key`.
109///
110/// If the key already exists in the current batch, the previously queued
111/// callback is replaced so the latest callback wins while preserving the
112/// original enqueue order.
113pub fn defer_or_run_keyed(key: usize, f: impl FnOnce() + 'static) -> bool {
114    BATCH_CTX.with(|ctx| {
115        let mut guard = ctx.borrow_mut();
116        if let Some(ref mut batch) = *guard {
117            if let Some(entry) = batch
118                .deferred
119                .iter_mut()
120                .find(|entry| entry.key == Some(key))
121            {
122                entry.notify = Box::new(f);
123            } else {
124                batch.deferred.push(DeferredEntry::keyed(key, Box::new(f)));
125            }
126            true
127        } else {
128            drop(guard); // Release borrow before calling f.
129            f();
130            false
131        }
132    })
133}
134
135/// Record row-level changes while a batch is active.
136pub fn record_rows_changed(rows: u64) {
137    if rows == 0 {
138        return;
139    }
140    BATCH_CTX.with(|ctx| {
141        if let Some(ref mut batch) = *ctx.borrow_mut() {
142            batch.rows_changed = batch.rows_changed.saturating_add(rows);
143        }
144    });
145}
146
147/// Flush all deferred notifications. Called internally by `BatchScope::drop`.
148fn flush() {
149    let (rows_changed, deferred): (u64, Vec<DeferredNotify>) = BATCH_CTX.with(|ctx| {
150        let mut guard = ctx.borrow_mut();
151        if let Some(ref mut batch) = *guard {
152            let rows = batch.rows_changed;
153            batch.rows_changed = 0;
154            let deferred = std::mem::take(&mut batch.deferred)
155                .into_iter()
156                .map(|entry| entry.notify)
157                .collect();
158            (rows, deferred)
159        } else {
160            (0, Vec::new())
161        }
162    });
163
164    if deferred.is_empty() {
165        return;
166    }
167
168    let widgets_invalidated = deferred.len() as u64;
169    let propagation_start = Instant::now();
170    let _span = info_span!(
171        "bloodstream.delta",
172        rows_changed,
173        widgets_invalidated,
174        duration_us = tracing::field::Empty
175    )
176    .entered();
177
178    // Run all deferred notifications outside the borrow.
179    // If a callback panics, we still try to run the rest.
180    let mut first_panic: Option<Box<dyn std::any::Any + Send>> = None;
181    for notify in deferred {
182        let result = with_panic_cleanup_suppressed(|| {
183            std::panic::catch_unwind(std::panic::AssertUnwindSafe(notify))
184        });
185        if let Err(payload) = result
186            && first_panic.is_none()
187        {
188            first_panic = Some(payload);
189        }
190    }
191
192    let duration_us = propagation_start.elapsed().as_micros() as u64;
193    tracing::Span::current().record("duration_us", duration_us);
194    info!(
195        bloodstream_propagation_duration_us = duration_us,
196        rows_changed, widgets_invalidated, "bloodstream propagation duration histogram"
197    );
198
199    if let Some(payload) = first_panic {
200        std::panic::resume_unwind(payload);
201    }
202}
203
/// RAII guard that begins a batch scope.
///
/// While a `BatchScope` is alive, all [`Observable`](super::Observable)
/// notifications are deferred. When the outermost `BatchScope` drops,
/// all deferred notifications fire.
///
/// Nested `BatchScope`s are supported — only the outermost one flushes.
///
/// The batch context is thread-local, so a scope created on one thread
/// never defers notifications on another.
pub struct BatchScope {
    /// Whether this scope is the outermost (responsible for flush).
    is_root: bool,
}
215
216impl BatchScope {
217    /// Begin a new batch scope.
218    ///
219    /// If already inside a batch, this increments the nesting depth.
220    #[must_use]
221    pub fn new() -> Self {
222        let is_root = BATCH_CTX.with(|ctx| {
223            let mut guard = ctx.borrow_mut();
224            match *guard {
225                Some(ref mut batch) => {
226                    batch.depth += 1;
227                    false
228                }
229                None => {
230                    *guard = Some(BatchContext {
231                        depth: 1,
232                        deferred: Vec::new(),
233                        rows_changed: 0,
234                    });
235                    true
236                }
237            }
238        });
239        Self { is_root }
240    }
241
242    /// Number of deferred notifications queued in the current batch.
243    #[must_use]
244    pub fn pending_count(&self) -> usize {
245        BATCH_CTX.with(|ctx| ctx.borrow().as_ref().map_or(0, |b| b.deferred.len()))
246    }
247}
248
249impl Default for BatchScope {
250    fn default() -> Self {
251        Self::new()
252    }
253}
254
255impl Drop for BatchScope {
256    fn drop(&mut self) {
257        let should_flush = BATCH_CTX.with(|ctx| {
258            let mut guard = ctx.borrow_mut();
259            if let Some(ref mut batch) = *guard {
260                batch.depth -= 1;
261                batch.depth == 0
262            } else {
263                false
264            }
265        });
266
267        if should_flush {
268            flush();
269            // Clear the context after flush.
270            BATCH_CTX.with(|ctx| {
271                *ctx.borrow_mut() = None;
272            });
273        }
274    }
275}
276
277impl std::fmt::Debug for BatchScope {
278    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279        f.debug_struct("BatchScope")
280            .field("is_root", &self.is_root)
281            .field("pending", &self.pending_count())
282            .finish()
283    }
284}
285
286// ---------------------------------------------------------------------------
287// Tests
288// ---------------------------------------------------------------------------
289
// ---------------------------------------------------------------------------
// Tests
//
// NOTE(review): several tests assert `count.get() > 0` rather than an exact
// count — presumably because Observable may coalesce per-subscriber
// notifications within a batch; confirm against Observable's contract.
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;
    use crate::reactive::Observable;
    use std::cell::Cell;
    use std::cell::RefCell;
    use std::rc::Rc;

    // Subscribers inside a batch must stay silent until the scope exits.
    #[test]
    fn batch_defers_notifications() {
        let obs = Observable::new(0);
        let count = Rc::new(Cell::new(0u32));
        let count_clone = Rc::clone(&count);

        let _sub = obs.subscribe(move |_| {
            count_clone.set(count_clone.get() + 1);
        });

        {
            let _batch = BatchScope::new();
            obs.set(1);
            obs.set(2);
            obs.set(3);
            // No notifications yet.
            assert_eq!(count.get(), 0);
        }
        // All notifications fire on batch exit.
        assert!(count.get() > 0);
    }

    // Invariant 2: only notifications are deferred, never the value itself.
    #[test]
    fn batch_values_updated_immediately() {
        let obs = Observable::new(0);
        {
            let _batch = BatchScope::new();
            obs.set(42);
            // Value is updated even within batch.
            assert_eq!(obs.get(), 42);
        }
    }

    // Invariant 1: inner scopes only adjust depth; the root scope flushes.
    #[test]
    fn nested_batch_only_outermost_flushes() {
        let obs = Observable::new(0);
        let count = Rc::new(Cell::new(0u32));
        let count_clone = Rc::clone(&count);

        let _sub = obs.subscribe(move |_| {
            count_clone.set(count_clone.get() + 1);
        });

        {
            let _outer = BatchScope::new();
            obs.set(1);

            {
                let _inner = BatchScope::new();
                obs.set(2);
                // Inner batch exit doesn't flush.
            }
            assert_eq!(count.get(), 0);
            obs.set(3);
        }
        // Only outer batch exit flushes.
        assert!(count.get() > 0);
    }

    // Without a batch, each set notifies synchronously, once per set.
    #[test]
    fn no_batch_fires_immediately() {
        let obs = Observable::new(0);
        let count = Rc::new(Cell::new(0u32));
        let count_clone = Rc::clone(&count);

        let _sub = obs.subscribe(move |_| {
            count_clone.set(count_clone.get() + 1);
        });

        obs.set(1);
        assert_eq!(count.get(), 1);

        obs.set(2);
        assert_eq!(count.get(), 2);
    }

    // is_batching() tracks the RAII scope exactly.
    #[test]
    fn is_batching_flag() {
        assert!(!is_batching());
        {
            let _batch = BatchScope::new();
            assert!(is_batching());
        }
        assert!(!is_batching());
    }

    #[test]
    fn pending_count() {
        let obs = Observable::new(0);
        let _sub = obs.subscribe(|_| {});

        let batch = BatchScope::new();
        assert_eq!(batch.pending_count(), 0);

        obs.set(1);
        // Each set enqueues a deferred notification.
        assert!(batch.pending_count() > 0);
    }

    // defer_or_run falls through to immediate execution with no batch.
    #[test]
    fn defer_or_run_without_batch() {
        let ran = Rc::new(Cell::new(false));
        let ran_clone = Rc::clone(&ran);

        let deferred = defer_or_run(move || ran_clone.set(true));
        assert!(!deferred);
        assert!(ran.get());
    }

    // defer_or_run queues inside a batch and fires on scope exit.
    #[test]
    fn defer_or_run_with_batch() {
        let ran = Rc::new(Cell::new(false));
        let ran_clone = Rc::clone(&ran);

        {
            let _batch = BatchScope::new();
            let deferred = defer_or_run(move || ran_clone.set(true));
            assert!(deferred);
            assert!(!ran.get());
        }
        assert!(ran.get());
    }

    // Keyed defer: second enqueue with the same key replaces the callback.
    #[test]
    fn defer_or_run_keyed_coalesces_to_latest_callback() {
        let value = Rc::new(Cell::new(0u32));
        let v1 = Rc::clone(&value);
        let v2 = Rc::clone(&value);

        let batch = BatchScope::new();
        assert_eq!(batch.pending_count(), 0);

        assert!(defer_or_run_keyed(7, move || v1.set(1)));
        assert_eq!(batch.pending_count(), 1);
        assert!(defer_or_run_keyed(7, move || v2.set(2)));
        assert_eq!(batch.pending_count(), 1, "same key should be coalesced");
        assert_eq!(value.get(), 0, "callback should remain deferred");
        drop(batch);

        assert_eq!(value.get(), 2, "latest keyed callback should run");
    }

    // Invariant 4: replacement keeps the key's original queue position.
    #[test]
    fn defer_or_run_keyed_preserves_first_enqueue_order() {
        let order = Rc::new(RefCell::new(Vec::new()));
        let o1 = Rc::clone(&order);
        let o2 = Rc::clone(&order);
        let o3 = Rc::clone(&order);

        {
            let batch = BatchScope::new();
            assert!(defer_or_run_keyed(1, move || o1
                .borrow_mut()
                .push("first-old")));
            assert!(defer_or_run_keyed(2, move || o2
                .borrow_mut()
                .push("second")));
            assert!(defer_or_run_keyed(1, move || o3
                .borrow_mut()
                .push("first-new")));
            assert_eq!(batch.pending_count(), 2);
        }

        assert_eq!(
            *order.borrow(),
            vec!["first-new", "second"],
            "replaced keyed callback should keep its original queue position"
        );
    }

    #[test]
    fn debug_format() {
        let batch = BatchScope::new();
        let dbg = format!("{:?}", batch);
        assert!(dbg.contains("BatchScope"));
        assert!(dbg.contains("is_root"));
        drop(batch);
    }

    // One batch coalesces updates across distinct observables.
    #[test]
    fn multiple_observables_in_batch() {
        let a = Observable::new(0);
        let b = Observable::new(0);
        let a_count = Rc::new(Cell::new(0u32));
        let b_count = Rc::new(Cell::new(0u32));
        let a_clone = Rc::clone(&a_count);
        let b_clone = Rc::clone(&b_count);

        let _sub_a = a.subscribe(move |_| a_clone.set(a_clone.get() + 1));
        let _sub_b = b.subscribe(move |_| b_clone.set(b_clone.get() + 1));

        {
            let _batch = BatchScope::new();
            a.set(1);
            b.set(2);
            a.set(3);
            b.set(4);
            assert_eq!(a_count.get(), 0);
            assert_eq!(b_count.get(), 0);
        }
        assert!(a_count.get() > 0);
        assert!(b_count.get() > 0);
    }

    #[test]
    fn batch_scope_default_trait() {
        let batch = BatchScope::default();
        assert!(is_batching());
        drop(batch);
        assert!(!is_batching());
    }

    #[test]
    fn triple_nested_batch() {
        let obs = Observable::new(0);
        let count = Rc::new(Cell::new(0u32));
        let count_clone = Rc::clone(&count);

        let _sub = obs.subscribe(move |_| {
            count_clone.set(count_clone.get() + 1);
        });

        {
            let _outer = BatchScope::new();
            obs.set(1);
            {
                let _mid = BatchScope::new();
                obs.set(2);
                {
                    let _inner = BatchScope::new();
                    obs.set(3);
                }
                assert_eq!(count.get(), 0, "inner drop should not flush");
            }
            assert_eq!(count.get(), 0, "mid drop should not flush");
        }
        assert!(count.get() > 0, "outer drop should flush");
    }

    // An empty batch must exit cleanly and tear down the context.
    #[test]
    fn empty_batch_no_panic() {
        {
            let _batch = BatchScope::new();
            // No observable mutations
        }
        assert!(!is_batching());
    }

    #[test]
    fn pending_count_zero_without_subscribers() {
        let obs = Observable::new(0);
        let batch = BatchScope::new();
        obs.set(42);
        // Without subscribers, set doesn't enqueue notifications
        assert_eq!(batch.pending_count(), 0);
        drop(batch);
    }
}
555}