// ftui_runtime/reactive/batch.rs

#![forbid(unsafe_code)]
3use ftui_core::with_panic_cleanup_suppressed;
42use std::cell::RefCell;
43use tracing::{info, info_span};
44use web_time::Instant;
45
/// Boxed one-shot callback queued while a batch is active.
type DeferredNotify = Box<dyn FnOnce()>;

/// A queued notification, optionally tagged with a coalescing key.
struct DeferredEntry {
    /// When `Some`, a later `defer_or_run_keyed` call with the same key
    /// replaces this entry's callback in place (keeping its queue position).
    key: Option<usize>,
    /// The callback to invoke at flush time.
    notify: DeferredNotify,
}
55
56impl DeferredEntry {
57 fn unkeyed(notify: DeferredNotify) -> Self {
58 Self { key: None, notify }
59 }
60
61 fn keyed(key: usize, notify: DeferredNotify) -> Self {
62 Self {
63 key: Some(key),
64 notify,
65 }
66 }
67}
68
/// Per-thread state of an active batch; exists only between the first
/// `BatchScope::new()` and the matching outermost drop.
struct BatchContext {
    /// Nesting depth of live `BatchScope`s; the queue flushes when it
    /// reaches zero.
    depth: u32,
    /// Callbacks deferred until the outermost scope drops.
    deferred: Vec<DeferredEntry>,
    /// Row-change total accumulated via `record_rows_changed`, reported
    /// in the flush tracing span.
    rows_changed: u64,
}
78
thread_local! {
    /// The current thread's batch context, or `None` when no batch is active.
    static BATCH_CTX: RefCell<Option<BatchContext>> = const { RefCell::new(None) };
}
82
83pub fn is_batching() -> bool {
85 BATCH_CTX.with(|ctx| ctx.borrow().is_some())
86}
87
88pub fn defer_or_run(f: impl FnOnce() + 'static) -> bool {
95 BATCH_CTX.with(|ctx| {
96 let mut guard = ctx.borrow_mut();
97 if let Some(ref mut batch) = *guard {
98 batch.deferred.push(DeferredEntry::unkeyed(Box::new(f)));
99 true
100 } else {
101 drop(guard); f();
103 false
104 }
105 })
106}
107
108pub fn defer_or_run_keyed(key: usize, f: impl FnOnce() + 'static) -> bool {
114 BATCH_CTX.with(|ctx| {
115 let mut guard = ctx.borrow_mut();
116 if let Some(ref mut batch) = *guard {
117 if let Some(entry) = batch
118 .deferred
119 .iter_mut()
120 .find(|entry| entry.key == Some(key))
121 {
122 entry.notify = Box::new(f);
123 } else {
124 batch.deferred.push(DeferredEntry::keyed(key, Box::new(f)));
125 }
126 true
127 } else {
128 drop(guard); f();
130 false
131 }
132 })
133}
134
135pub fn record_rows_changed(rows: u64) {
137 if rows == 0 {
138 return;
139 }
140 BATCH_CTX.with(|ctx| {
141 if let Some(ref mut batch) = *ctx.borrow_mut() {
142 batch.rows_changed = batch.rows_changed.saturating_add(rows);
143 }
144 });
145}
146
147fn flush() {
149 let (rows_changed, deferred): (u64, Vec<DeferredNotify>) = BATCH_CTX.with(|ctx| {
150 let mut guard = ctx.borrow_mut();
151 if let Some(ref mut batch) = *guard {
152 let rows = batch.rows_changed;
153 batch.rows_changed = 0;
154 let deferred = std::mem::take(&mut batch.deferred)
155 .into_iter()
156 .map(|entry| entry.notify)
157 .collect();
158 (rows, deferred)
159 } else {
160 (0, Vec::new())
161 }
162 });
163
164 if deferred.is_empty() {
165 return;
166 }
167
168 let widgets_invalidated = deferred.len() as u64;
169 let propagation_start = Instant::now();
170 let _span = info_span!(
171 "bloodstream.delta",
172 rows_changed,
173 widgets_invalidated,
174 duration_us = tracing::field::Empty
175 )
176 .entered();
177
178 let mut first_panic: Option<Box<dyn std::any::Any + Send>> = None;
181 for notify in deferred {
182 let result = with_panic_cleanup_suppressed(|| {
183 std::panic::catch_unwind(std::panic::AssertUnwindSafe(notify))
184 });
185 if let Err(payload) = result
186 && first_panic.is_none()
187 {
188 first_panic = Some(payload);
189 }
190 }
191
192 let duration_us = propagation_start.elapsed().as_micros() as u64;
193 tracing::Span::current().record("duration_us", duration_us);
194 info!(
195 bloodstream_propagation_duration_us = duration_us,
196 rows_changed, widgets_invalidated, "bloodstream propagation duration histogram"
197 );
198
199 if let Some(payload) = first_panic {
200 std::panic::resume_unwind(payload);
201 }
202}
203
/// RAII guard that batches reactive notifications on the current thread.
///
/// While at least one scope is alive, notifications are deferred; dropping
/// the outermost scope flushes every deferred callback.
pub struct BatchScope {
    /// `true` when this scope installed the thread's batch context
    /// (i.e. it is the outermost scope). Used only for `Debug` output.
    is_root: bool,
}
215
216impl BatchScope {
217 #[must_use]
221 pub fn new() -> Self {
222 let is_root = BATCH_CTX.with(|ctx| {
223 let mut guard = ctx.borrow_mut();
224 match *guard {
225 Some(ref mut batch) => {
226 batch.depth += 1;
227 false
228 }
229 None => {
230 *guard = Some(BatchContext {
231 depth: 1,
232 deferred: Vec::new(),
233 rows_changed: 0,
234 });
235 true
236 }
237 }
238 });
239 Self { is_root }
240 }
241
242 #[must_use]
244 pub fn pending_count(&self) -> usize {
245 BATCH_CTX.with(|ctx| ctx.borrow().as_ref().map_or(0, |b| b.deferred.len()))
246 }
247}
248
impl Default for BatchScope {
    /// Equivalent to [`BatchScope::new`]: enters (or nests into) a batch.
    fn default() -> Self {
        Self::new()
    }
}
254
255impl Drop for BatchScope {
256 fn drop(&mut self) {
257 let should_flush = BATCH_CTX.with(|ctx| {
258 let mut guard = ctx.borrow_mut();
259 if let Some(ref mut batch) = *guard {
260 batch.depth -= 1;
261 batch.depth == 0
262 } else {
263 false
264 }
265 });
266
267 if should_flush {
268 flush();
269 BATCH_CTX.with(|ctx| {
271 *ctx.borrow_mut() = None;
272 });
273 }
274 }
275}
276
277impl std::fmt::Debug for BatchScope {
278 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279 f.debug_struct("BatchScope")
280 .field("is_root", &self.is_root)
281 .field("pending", &self.pending_count())
282 .finish()
283 }
284}
285
#[cfg(test)]
mod tests {
    use super::*;
    use crate::reactive::Observable;
    use std::cell::Cell;
    use std::cell::RefCell;
    use std::rc::Rc;

    /// Subscriber callbacks are held back while a scope is alive and fire
    /// once it drops.
    #[test]
    fn batch_defers_notifications() {
        let obs = Observable::new(0);
        let count = Rc::new(Cell::new(0u32));
        let count_clone = Rc::clone(&count);

        let _sub = obs.subscribe(move |_| {
            count_clone.set(count_clone.get() + 1);
        });

        {
            let _batch = BatchScope::new();
            obs.set(1);
            obs.set(2);
            obs.set(3);
            // Still inside the batch: nothing delivered yet.
            assert_eq!(count.get(), 0);
        }
        // Scope dropped: deferred notification(s) ran.
        assert!(count.get() > 0);
    }

    /// Reads see writes immediately even mid-batch; only *notifications*
    /// are deferred, not the values themselves.
    #[test]
    fn batch_values_updated_immediately() {
        let obs = Observable::new(0);
        {
            let _batch = BatchScope::new();
            obs.set(42);
            assert_eq!(obs.get(), 42);
        }
    }

    /// Inner scopes only adjust the depth counter; the outermost drop is
    /// the one that flushes.
    #[test]
    fn nested_batch_only_outermost_flushes() {
        let obs = Observable::new(0);
        let count = Rc::new(Cell::new(0u32));
        let count_clone = Rc::clone(&count);

        let _sub = obs.subscribe(move |_| {
            count_clone.set(count_clone.get() + 1);
        });

        {
            let _outer = BatchScope::new();
            obs.set(1);

            {
                let _inner = BatchScope::new();
                obs.set(2);
            }
            // Inner scope dropped, outer still alive: no flush yet.
            assert_eq!(count.get(), 0);
            obs.set(3);
        }
        assert!(count.get() > 0);
    }

    /// With no scope active, notifications are delivered synchronously.
    #[test]
    fn no_batch_fires_immediately() {
        let obs = Observable::new(0);
        let count = Rc::new(Cell::new(0u32));
        let count_clone = Rc::clone(&count);

        let _sub = obs.subscribe(move |_| {
            count_clone.set(count_clone.get() + 1);
        });

        obs.set(1);
        assert_eq!(count.get(), 1);

        obs.set(2);
        assert_eq!(count.get(), 2);
    }

    /// `is_batching` tracks scope lifetime exactly.
    #[test]
    fn is_batching_flag() {
        assert!(!is_batching());
        {
            let _batch = BatchScope::new();
            assert!(is_batching());
        }
        assert!(!is_batching());
    }

    /// `pending_count` reflects callbacks queued by observable writes.
    #[test]
    fn pending_count() {
        let obs = Observable::new(0);
        let _sub = obs.subscribe(|_| {});

        let batch = BatchScope::new();
        assert_eq!(batch.pending_count(), 0);

        obs.set(1);
        assert!(batch.pending_count() > 0);
    }

    /// Outside a batch, `defer_or_run` runs the callback inline and
    /// reports that it was not deferred.
    #[test]
    fn defer_or_run_without_batch() {
        let ran = Rc::new(Cell::new(false));
        let ran_clone = Rc::clone(&ran);

        let deferred = defer_or_run(move || ran_clone.set(true));
        assert!(!deferred);
        assert!(ran.get());
    }

    /// Inside a batch, `defer_or_run` queues the callback until drop.
    #[test]
    fn defer_or_run_with_batch() {
        let ran = Rc::new(Cell::new(false));
        let ran_clone = Rc::clone(&ran);

        {
            let _batch = BatchScope::new();
            let deferred = defer_or_run(move || ran_clone.set(true));
            assert!(deferred);
            assert!(!ran.get());
        }
        assert!(ran.get());
    }

    /// Re-enqueueing under the same key replaces the callback instead of
    /// growing the queue; only the latest callback runs at flush.
    #[test]
    fn defer_or_run_keyed_coalesces_to_latest_callback() {
        let value = Rc::new(Cell::new(0u32));
        let v1 = Rc::clone(&value);
        let v2 = Rc::clone(&value);

        let batch = BatchScope::new();
        assert_eq!(batch.pending_count(), 0);

        assert!(defer_or_run_keyed(7, move || v1.set(1)));
        assert_eq!(batch.pending_count(), 1);
        assert!(defer_or_run_keyed(7, move || v2.set(2)));
        assert_eq!(batch.pending_count(), 1, "same key should be coalesced");
        assert_eq!(value.get(), 0, "callback should remain deferred");
        drop(batch);

        assert_eq!(value.get(), 2, "latest keyed callback should run");
    }

    /// A replaced keyed callback runs in the slot of its first enqueue,
    /// not at the back of the queue.
    #[test]
    fn defer_or_run_keyed_preserves_first_enqueue_order() {
        let order = Rc::new(RefCell::new(Vec::new()));
        let o1 = Rc::clone(&order);
        let o2 = Rc::clone(&order);
        let o3 = Rc::clone(&order);

        {
            let batch = BatchScope::new();
            assert!(defer_or_run_keyed(1, move || o1
                .borrow_mut()
                .push("first-old")));
            assert!(defer_or_run_keyed(2, move || o2
                .borrow_mut()
                .push("second")));
            assert!(defer_or_run_keyed(1, move || o3
                .borrow_mut()
                .push("first-new")));
            assert_eq!(batch.pending_count(), 2);
        }

        assert_eq!(
            *order.borrow(),
            vec!["first-new", "second"],
            "replaced keyed callback should keep its original queue position"
        );
    }

    /// Debug output names the type and exposes the `is_root` field.
    #[test]
    fn debug_format() {
        let batch = BatchScope::new();
        let dbg = format!("{:?}", batch);
        assert!(dbg.contains("BatchScope"));
        assert!(dbg.contains("is_root"));
        drop(batch);
    }

    /// One batch defers notifications from several observables at once.
    #[test]
    fn multiple_observables_in_batch() {
        let a = Observable::new(0);
        let b = Observable::new(0);
        let a_count = Rc::new(Cell::new(0u32));
        let b_count = Rc::new(Cell::new(0u32));
        let a_clone = Rc::clone(&a_count);
        let b_clone = Rc::clone(&b_count);

        let _sub_a = a.subscribe(move |_| a_clone.set(a_clone.get() + 1));
        let _sub_b = b.subscribe(move |_| b_clone.set(b_clone.get() + 1));

        {
            let _batch = BatchScope::new();
            a.set(1);
            b.set(2);
            a.set(3);
            b.set(4);
            assert_eq!(a_count.get(), 0);
            assert_eq!(b_count.get(), 0);
        }
        assert!(a_count.get() > 0);
        assert!(b_count.get() > 0);
    }

    /// `Default` behaves exactly like `new`.
    #[test]
    fn batch_scope_default_trait() {
        let batch = BatchScope::default();
        assert!(is_batching());
        drop(batch);
        assert!(!is_batching());
    }

    /// Three nesting levels: only the outermost drop flushes.
    #[test]
    fn triple_nested_batch() {
        let obs = Observable::new(0);
        let count = Rc::new(Cell::new(0u32));
        let count_clone = Rc::clone(&count);

        let _sub = obs.subscribe(move |_| {
            count_clone.set(count_clone.get() + 1);
        });

        {
            let _outer = BatchScope::new();
            obs.set(1);
            {
                let _mid = BatchScope::new();
                obs.set(2);
                {
                    let _inner = BatchScope::new();
                    obs.set(3);
                }
                assert_eq!(count.get(), 0, "inner drop should not flush");
            }
            assert_eq!(count.get(), 0, "mid drop should not flush");
        }
        assert!(count.get() > 0, "outer drop should flush");
    }

    /// A scope with nothing queued drops cleanly and clears the context.
    #[test]
    fn empty_batch_no_panic() {
        {
            let _batch = BatchScope::new();
        }
        assert!(!is_batching());
    }

    /// Writes with no subscribers queue nothing.
    #[test]
    fn pending_count_zero_without_subscribers() {
        let obs = Observable::new(0);
        let batch = BatchScope::new();
        obs.set(42);
        assert_eq!(batch.pending_count(), 0);
        drop(batch);
    }
}