// ftui_runtime/reactive/batch.rs
1#![forbid(unsafe_code)]
2
3//! Batch update coalescing for [`Observable`] notifications.
4//!
5//! When multiple `Observable` values are updated in rapid succession,
6//! subscribers receive a notification for each change. In render-heavy
7//! scenarios this causes redundant intermediate renders. Batch coalescing
8//! defers all notifications until the batch scope exits, then fires each
9//! unique callback at most once.
10//!
11//! # Usage
12//!
13//! ```ignore
14//! use ftui_runtime::reactive::batch::BatchScope;
15//!
16//! let x = Observable::new(0);
17//! let y = Observable::new(0);
18//!
19//! {
20//!     let _batch = BatchScope::new();
21//!     x.set(1);  // notification deferred
22//!     y.set(2);  // notification deferred
23//!     x.set(3);  // notification deferred (coalesced with first x.set)
24//! }  // all notifications fire here, x subscribers called once with value 3
25//! ```
26//!
27//! # Invariants
28//!
29//! 1. Nested batches are supported: only the outermost scope triggers flush.
30//! 2. Within a batch, `Observable::get()` always returns the latest value
31//!    (values are updated immediately, only notifications are deferred).
32//! 3. After a batch exits, all subscribers see the final state, never an
33//!    intermediate state.
34//! 4. Flush calls deferred callbacks in the order they were first enqueued.
35//!
36//! # Failure Modes
37//!
38//! - **Callback panics during flush**: Remaining callbacks are still called.
39//!   The first panic is re-raised after all callbacks have been attempted.
40
41use std::cell::RefCell;
42
/// A deferred notification: a boxed one-shot closure that, when invoked,
/// fires a subscriber callback. The closure captures whatever it needs to
/// deliver the latest value; this module only stores and runs it.
type DeferredNotify = Box<dyn FnOnce()>;
46
/// Thread-local batch context: nesting depth plus the queue of notifications
/// deferred until the outermost scope exits.
struct BatchContext {
    /// Nesting depth. Incremented by each `BatchScope::new`, decremented on
    /// drop; only the drop that brings this back to 0 triggers a flush.
    depth: u32,
    /// Queued notifications to fire on flush, in first-enqueued order.
    deferred: Vec<DeferredNotify>,
}
54
thread_local! {
    /// Per-thread batch state. `None` means no batch is active on this
    /// thread; `Some` holds the depth and deferred queue of the active batch.
    static BATCH_CTX: RefCell<Option<BatchContext>> = const { RefCell::new(None) };
}
58
59/// Returns true if a batch is currently active on this thread.
60pub fn is_batching() -> bool {
61    BATCH_CTX.with(|ctx| ctx.borrow().is_some())
62}
63
64/// Enqueue a deferred notification to be fired when the current batch exits.
65///
66/// If no batch is active, the notification fires immediately.
67///
68/// Returns `true` if the notification was deferred, `false` if it fired
69/// immediately.
70pub fn defer_or_run(f: impl FnOnce() + 'static) -> bool {
71    BATCH_CTX.with(|ctx| {
72        let mut guard = ctx.borrow_mut();
73        if let Some(ref mut batch) = *guard {
74            batch.deferred.push(Box::new(f));
75            true
76        } else {
77            drop(guard); // Release borrow before calling f.
78            f();
79            false
80        }
81    })
82}
83
84/// Flush all deferred notifications. Called internally by `BatchScope::drop`.
85fn flush() {
86    let deferred: Vec<DeferredNotify> = BATCH_CTX.with(|ctx| {
87        let mut guard = ctx.borrow_mut();
88        if let Some(ref mut batch) = *guard {
89            std::mem::take(&mut batch.deferred)
90        } else {
91            Vec::new()
92        }
93    });
94
95    // Run all deferred notifications outside the borrow.
96    // If a callback panics, we still try to run the rest.
97    let mut first_panic: Option<Box<dyn std::any::Any + Send>> = None;
98    for notify in deferred {
99        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(notify));
100        if let Err(payload) = result
101            && first_panic.is_none()
102        {
103            first_panic = Some(payload);
104        }
105    }
106
107    if let Some(payload) = first_panic {
108        std::panic::resume_unwind(payload);
109    }
110}
111
/// RAII guard that begins a batch scope.
///
/// While a `BatchScope` is alive, all [`Observable`](super::Observable)
/// notifications are deferred. When the outermost `BatchScope` drops,
/// all deferred notifications fire.
///
/// Nested `BatchScope`s are supported — only the outermost one flushes.
///
/// NOTE(review): this struct contains only a `bool`, so it is `Send`, but the
/// batch context lives in a thread-local — dropping a `BatchScope` on a
/// different thread than created it would decrement (or miss) the wrong
/// thread's context and never flush the original one. If cross-thread moves
/// are not intended, consider a `PhantomData<*mut ()>` field to make it
/// `!Send` — TODO confirm intended semantics.
pub struct BatchScope {
    /// Whether this scope is the outermost (responsible for flush).
    is_root: bool,
}
123
124impl BatchScope {
125    /// Begin a new batch scope.
126    ///
127    /// If already inside a batch, this increments the nesting depth.
128    #[must_use]
129    pub fn new() -> Self {
130        let is_root = BATCH_CTX.with(|ctx| {
131            let mut guard = ctx.borrow_mut();
132            match *guard {
133                Some(ref mut batch) => {
134                    batch.depth += 1;
135                    false
136                }
137                None => {
138                    *guard = Some(BatchContext {
139                        depth: 1,
140                        deferred: Vec::new(),
141                    });
142                    true
143                }
144            }
145        });
146        Self { is_root }
147    }
148
149    /// Number of deferred notifications queued in the current batch.
150    #[must_use]
151    pub fn pending_count(&self) -> usize {
152        BATCH_CTX.with(|ctx| ctx.borrow().as_ref().map_or(0, |b| b.deferred.len()))
153    }
154}
155
156impl Default for BatchScope {
157    fn default() -> Self {
158        Self::new()
159    }
160}
161
162impl Drop for BatchScope {
163    fn drop(&mut self) {
164        let should_flush = BATCH_CTX.with(|ctx| {
165            let mut guard = ctx.borrow_mut();
166            if let Some(ref mut batch) = *guard {
167                batch.depth -= 1;
168                batch.depth == 0
169            } else {
170                false
171            }
172        });
173
174        if should_flush {
175            flush();
176            // Clear the context after flush.
177            BATCH_CTX.with(|ctx| {
178                *ctx.borrow_mut() = None;
179            });
180        }
181    }
182}
183
184impl std::fmt::Debug for BatchScope {
185    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186        f.debug_struct("BatchScope")
187            .field("is_root", &self.is_root)
188            .field("pending", &self.pending_count())
189            .finish()
190    }
191}
192
193// ---------------------------------------------------------------------------
194// Tests
195// ---------------------------------------------------------------------------
196
#[cfg(test)]
mod tests {
    use super::*;
    use crate::reactive::Observable;
    use std::cell::Cell;
    use std::rc::Rc;

    /// Inside a batch, no subscriber runs; on scope exit, subscribers fire.
    /// Asserts `> 0` (not `== 1`) so it does not depend on how aggressively
    /// `Observable` coalesces repeated sets of the same value.
    #[test]
    fn batch_defers_notifications() {
        let obs = Observable::new(0);
        let count = Rc::new(Cell::new(0u32));
        let count_clone = Rc::clone(&count);

        let _sub = obs.subscribe(move |_| {
            count_clone.set(count_clone.get() + 1);
        });

        {
            let _batch = BatchScope::new();
            obs.set(1);
            obs.set(2);
            obs.set(3);
            // No notifications yet.
            assert_eq!(count.get(), 0);
        }
        // All notifications fire on batch exit.
        assert!(count.get() > 0);
    }

    /// Invariant 2: only notifications are deferred — `get()` sees the new
    /// value immediately, even inside a batch.
    #[test]
    fn batch_values_updated_immediately() {
        let obs = Observable::new(0);
        {
            let _batch = BatchScope::new();
            obs.set(42);
            // Value is updated even within batch.
            assert_eq!(obs.get(), 42);
        }
    }

    /// Invariant 1: nested scopes only bump the depth; dropping the inner
    /// scope must not flush.
    #[test]
    fn nested_batch_only_outermost_flushes() {
        let obs = Observable::new(0);
        let count = Rc::new(Cell::new(0u32));
        let count_clone = Rc::clone(&count);

        let _sub = obs.subscribe(move |_| {
            count_clone.set(count_clone.get() + 1);
        });

        {
            let _outer = BatchScope::new();
            obs.set(1);

            {
                let _inner = BatchScope::new();
                obs.set(2);
                // Inner batch exit doesn't flush.
            }
            assert_eq!(count.get(), 0);
            obs.set(3);
        }
        // Only outer batch exit flushes.
        assert!(count.get() > 0);
    }

    /// Without any batch active, each `set` notifies synchronously.
    #[test]
    fn no_batch_fires_immediately() {
        let obs = Observable::new(0);
        let count = Rc::new(Cell::new(0u32));
        let count_clone = Rc::clone(&count);

        let _sub = obs.subscribe(move |_| {
            count_clone.set(count_clone.get() + 1);
        });

        obs.set(1);
        assert_eq!(count.get(), 1);

        obs.set(2);
        assert_eq!(count.get(), 2);
    }

    /// `is_batching` mirrors the guard's lifetime exactly.
    #[test]
    fn is_batching_flag() {
        assert!(!is_batching());
        {
            let _batch = BatchScope::new();
            assert!(is_batching());
        }
        assert!(!is_batching());
    }

    /// `pending_count` starts at zero and grows once a `set` enqueues
    /// a deferred notification.
    #[test]
    fn pending_count() {
        let obs = Observable::new(0);
        let _sub = obs.subscribe(|_| {});

        let batch = BatchScope::new();
        assert_eq!(batch.pending_count(), 0);

        obs.set(1);
        // Each set enqueues a deferred notification.
        assert!(batch.pending_count() > 0);
    }

    /// `defer_or_run` with no batch active runs the closure immediately
    /// and reports `false` (not deferred).
    #[test]
    fn defer_or_run_without_batch() {
        let ran = Rc::new(Cell::new(false));
        let ran_clone = Rc::clone(&ran);

        let deferred = defer_or_run(move || ran_clone.set(true));
        assert!(!deferred);
        assert!(ran.get());
    }

    /// `defer_or_run` inside a batch defers (`true`) and the closure only
    /// runs when the batch exits.
    #[test]
    fn defer_or_run_with_batch() {
        let ran = Rc::new(Cell::new(false));
        let ran_clone = Rc::clone(&ran);

        {
            let _batch = BatchScope::new();
            let deferred = defer_or_run(move || ran_clone.set(true));
            assert!(deferred);
            assert!(!ran.get());
        }
        assert!(ran.get());
    }

    /// Debug formatting includes the struct name and the `is_root` field.
    #[test]
    fn debug_format() {
        let batch = BatchScope::new();
        let dbg = format!("{:?}", batch);
        assert!(dbg.contains("BatchScope"));
        assert!(dbg.contains("is_root"));
        drop(batch);
    }

    /// Multiple observables share one batch; all of their notifications are
    /// deferred together and all fire on exit.
    #[test]
    fn multiple_observables_in_batch() {
        let a = Observable::new(0);
        let b = Observable::new(0);
        let a_count = Rc::new(Cell::new(0u32));
        let b_count = Rc::new(Cell::new(0u32));
        let a_clone = Rc::clone(&a_count);
        let b_clone = Rc::clone(&b_count);

        let _sub_a = a.subscribe(move |_| a_clone.set(a_clone.get() + 1));
        let _sub_b = b.subscribe(move |_| b_clone.set(b_clone.get() + 1));

        {
            let _batch = BatchScope::new();
            a.set(1);
            b.set(2);
            a.set(3);
            b.set(4);
            assert_eq!(a_count.get(), 0);
            assert_eq!(b_count.get(), 0);
        }
        assert!(a_count.get() > 0);
        assert!(b_count.get() > 0);
    }
}