// ftui_runtime/flat_combine.rs

use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread::ThreadId;

/// Aggregate metrics describing how effectively operation batching works.
#[derive(Debug, Clone, Default)]
pub struct CombinerStats {
    /// Number of combine passes that drained and applied the queue.
    pub combine_passes: u64,
    /// Total operations applied across all passes.
    pub total_ops: u64,
    /// Largest single batch applied in one pass.
    pub max_batch_size: usize,
    /// Lock-contention counter.
    /// NOTE(review): never written in this file — presumably reserved for
    /// future instrumentation; confirm before relying on it.
    pub contention_events: u64,
}

impl CombinerStats {
    /// Mean number of operations applied per combine pass.
    ///
    /// Yields `0.0` when no pass has happened yet, avoiding a 0/0 NaN.
    pub fn avg_batch_size(&self) -> f64 {
        match self.combine_passes {
            0 => 0.0,
            passes => self.total_ops as f64 / passes as f64,
        }
    }
}
63
64pub struct FlatCombiner<S> {
74 state: Mutex<S>,
76 queue: Mutex<Vec<BoxedOp<S>>>,
78 generation: AtomicU64,
80 stats: Mutex<CombinerStats>,
82 combine_with_owner: Mutex<Option<ThreadId>>,
84}
85
86type BoxedOp<S> = Box<dyn FnOnce(&mut S) + Send>;
87
/// RAII guard that marks the current thread as the active `combine_with`
/// owner for its lifetime and clears the marker on drop — including during
/// unwinding, so the marker never sticks after a panic.
struct CombineWithOwnerGuard<'a> {
    owner: &'a Mutex<Option<ThreadId>>,
}

impl<'a> CombineWithOwnerGuard<'a> {
    /// Record the calling thread as the owner and return the guard.
    /// Recovers from a poisoned mutex rather than propagating the panic.
    fn new(owner: &'a Mutex<Option<ThreadId>>) -> Self {
        let id = std::thread::current().id();
        *owner.lock().unwrap_or_else(|e| e.into_inner()) = Some(id);
        Self { owner }
    }
}

impl Drop for CombineWithOwnerGuard<'_> {
    /// Clear the owner marker, recovering from poisoning if necessary.
    fn drop(&mut self) {
        *self.owner.lock().unwrap_or_else(|e| e.into_inner()) = None;
    }
}
108
109impl<S> FlatCombiner<S> {
110 pub fn new(state: S) -> Self {
112 Self {
113 state: Mutex::new(state),
114 queue: Mutex::new(Vec::new()),
115 generation: AtomicU64::new(0),
116 stats: Mutex::new(CombinerStats::default()),
117 combine_with_owner: Mutex::new(None),
118 }
119 }
120
121 fn assert_not_reentrant_from_combine_with(&self, operation: &str) {
122 let current = std::thread::current().id();
123 let owner = self
124 .combine_with_owner
125 .lock()
126 .unwrap_or_else(|e| e.into_inner());
127 if owner
128 .as_ref()
129 .is_some_and(|thread_id| *thread_id == current)
130 {
131 panic!("FlatCombiner::{operation} cannot be called reentrantly from combine_with");
132 }
133 }
134
135 pub fn execute<R>(&self, op: impl FnOnce(&mut S) -> R) -> R {
140 self.assert_not_reentrant_from_combine_with("execute");
141 let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
142 op(&mut state)
143 }
144
145 pub fn with_state<R>(&self, f: impl FnOnce(&S) -> R) -> R {
147 self.assert_not_reentrant_from_combine_with("with_state");
148 let state = self.state.lock().unwrap_or_else(|e| e.into_inner());
149 f(&state)
150 }
151
152 pub fn submit(&self, op: impl FnOnce(&mut S) + Send + 'static) {
157 let mut queue = self.queue.lock().unwrap_or_else(|e| e.into_inner());
158 queue.push(Box::new(op));
159 }
160
161 pub fn submit_batch(&self, ops: impl IntoIterator<Item = BoxedOp<S>>) {
163 let mut queue = self.queue.lock().unwrap_or_else(|e| e.into_inner());
164 queue.extend(ops);
165 }
166
167 pub fn combine(&self) -> usize {
174 self.assert_not_reentrant_from_combine_with("combine");
175 let ops: Vec<BoxedOp<S>> = {
177 let mut queue = self.queue.lock().unwrap_or_else(|e| e.into_inner());
178 std::mem::take(&mut *queue)
179 };
180
181 if ops.is_empty() {
182 return 0;
183 }
184
185 let count = ops.len();
186
187 {
189 let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
190 for op in ops {
191 op(&mut state);
192 }
193 }
194
195 self.generation.fetch_add(1, Ordering::Release);
197 if let Ok(mut stats) = self.stats.lock() {
198 stats.combine_passes += 1;
199 stats.total_ops += count as u64;
200 stats.max_batch_size = stats.max_batch_size.max(count);
201 }
202
203 count
204 }
205
206 pub fn combine_with<R>(&self, around: impl FnOnce(&mut S, &dyn Fn(&mut S)) -> R) -> (usize, R) {
218 self.assert_not_reentrant_from_combine_with("combine_with");
219 let ops: Vec<BoxedOp<S>> = {
220 let mut queue = self.queue.lock().unwrap_or_else(|e| e.into_inner());
221 std::mem::take(&mut *queue)
222 };
223
224 let count = ops.len();
225 let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
226 let _owner_guard = CombineWithOwnerGuard::new(&self.combine_with_owner);
227
228 let ops_cell = std::cell::RefCell::new(Some(ops));
231 let apply = |s: &mut S| {
232 if let Some(ops) = ops_cell.borrow_mut().take() {
233 for op in ops {
234 op(s);
235 }
236 }
237 };
238
239 let result = around(&mut state, &apply);
240
241 if count > 0 {
242 self.generation.fetch_add(1, Ordering::Release);
243 if let Ok(mut stats) = self.stats.lock() {
244 stats.combine_passes += 1;
245 stats.total_ops += count as u64;
246 stats.max_batch_size = stats.max_batch_size.max(count);
247 }
248 }
249
250 (count, result)
251 }
252
253 pub fn pending_count(&self) -> usize {
255 self.queue.lock().unwrap_or_else(|e| e.into_inner()).len()
256 }
257
258 pub fn generation(&self) -> u64 {
260 self.generation.load(Ordering::Acquire)
261 }
262
263 pub fn stats(&self) -> CombinerStats {
265 self.stats.lock().unwrap_or_else(|e| e.into_inner()).clone()
266 }
267
268 pub fn reset_stats(&self) {
270 if let Ok(mut stats) = self.stats.lock() {
271 *stats = CombinerStats::default();
272 }
273 }
274}
275
276impl<S: std::fmt::Debug> std::fmt::Debug for FlatCombiner<S> {
280 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
281 let pending = self.pending_count();
282 let current_gen = self.generation();
283 f.debug_struct("FlatCombiner")
284 .field("pending", &pending)
285 .field("generation", ¤t_gen)
286 .finish_non_exhaustive()
287 }
288}
289
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    #[test]
    fn new_creates_empty_combiner() {
        let fc = FlatCombiner::new(0u64);
        assert_eq!(fc.pending_count(), 0);
        assert_eq!(fc.generation(), 0);
        assert_eq!(fc.stats().combine_passes, 0);
    }

    #[test]
    fn execute_applies_directly() {
        let fc = FlatCombiner::new(10u64);
        let result = fc.execute(|s| {
            *s += 5;
            *s
        });
        assert_eq!(result, 15);
    }

    #[test]
    fn with_state_reads_without_mutation() {
        let fc = FlatCombiner::new(vec![1, 2, 3]);
        let len = fc.with_state(|s| s.len());
        assert_eq!(len, 3);
    }

    #[test]
    fn submit_queues_operations() {
        let fc = FlatCombiner::new(0u64);
        fc.submit(|s| *s += 1);
        fc.submit(|s| *s += 2);
        assert_eq!(fc.pending_count(), 2);

        // Nothing is applied until a combine pass runs.
        let val = fc.with_state(|s| *s);
        assert_eq!(val, 0);
    }

    #[test]
    fn combine_drains_and_applies() {
        let fc = FlatCombiner::new(0u64);
        fc.submit(|s| *s += 10);
        fc.submit(|s| *s += 20);
        fc.submit(|s| *s += 30);

        let count = fc.combine();
        assert_eq!(count, 3);
        assert_eq!(fc.pending_count(), 0);

        let val = fc.with_state(|s| *s);
        assert_eq!(val, 60);
    }

    #[test]
    fn combine_empty_returns_zero() {
        let fc = FlatCombiner::new(0u64);
        assert_eq!(fc.combine(), 0);
        assert_eq!(fc.generation(), 0);
    }

    #[test]
    fn combine_increments_generation() {
        let fc = FlatCombiner::new(0u64);
        assert_eq!(fc.generation(), 0);

        fc.submit(|s| *s += 1);
        fc.combine();
        assert_eq!(fc.generation(), 1);

        fc.submit(|s| *s += 1);
        fc.combine();
        assert_eq!(fc.generation(), 2);
    }

    #[test]
    fn stats_track_batches() {
        let fc = FlatCombiner::new(0u64);

        fc.submit(|s| *s += 1);
        fc.submit(|s| *s += 1);
        fc.submit(|s| *s += 1);
        fc.combine();

        fc.submit(|s| *s += 1);
        fc.combine();

        let stats = fc.stats();
        assert_eq!(stats.combine_passes, 2);
        assert_eq!(stats.total_ops, 4);
        assert_eq!(stats.max_batch_size, 3);
        assert!((stats.avg_batch_size() - 2.0).abs() < f64::EPSILON);
    }

    #[test]
    fn reset_stats_clears_counters() {
        let fc = FlatCombiner::new(0u64);
        fc.submit(|s| *s += 1);
        fc.combine();
        assert_eq!(fc.stats().combine_passes, 1);

        fc.reset_stats();
        let stats = fc.stats();
        assert_eq!(stats.combine_passes, 0);
        assert_eq!(stats.total_ops, 0);
    }

    #[test]
    fn operations_execute_in_order() {
        let fc = FlatCombiner::new(Vec::<u32>::new());
        fc.submit(|s| s.push(1));
        fc.submit(|s| s.push(2));
        fc.submit(|s| s.push(3));
        fc.combine();

        let values = fc.with_state(|s| s.clone());
        assert_eq!(values, vec![1, 2, 3]);
    }

    #[test]
    fn submit_batch_adds_multiple() {
        let fc = FlatCombiner::new(0u64);
        let ops: Vec<BoxedOp<u64>> = vec![
            Box::new(|s: &mut u64| *s += 10),
            Box::new(|s: &mut u64| *s += 20),
        ];
        fc.submit_batch(ops);
        assert_eq!(fc.pending_count(), 2);
        fc.combine();
        assert_eq!(fc.with_state(|s| *s), 30);
    }

    #[test]
    fn combine_with_wraps_batch() {
        let fc = FlatCombiner::new(Vec::<String>::new());
        fc.submit(|s| s.push("a".into()));
        fc.submit(|s| s.push("b".into()));

        let (count, len_before) = fc.combine_with(|state, apply| {
            let before = state.len();
            apply(state);
            before
        });

        assert_eq!(count, 2);
        assert_eq!(len_before, 0);
        assert_eq!(fc.with_state(|s| s.len()), 2);
    }

    #[test]
    fn multiple_combine_passes() {
        let fc = FlatCombiner::new(0u64);

        for i in 0..10 {
            fc.submit(move |s| *s += i);
        }
        fc.combine();
        assert_eq!(fc.with_state(|s| *s), 45); // 0 + 1 + ... + 9

        for i in 0..5 {
            fc.submit(move |s| *s += i);
        }
        fc.combine();
        assert_eq!(fc.with_state(|s| *s), 55); // 45 + (0 + 1 + ... + 4)
    }

    #[test]
    fn debug_impl() {
        let fc = FlatCombiner::new(42u64);
        let debug = format!("{fc:?}");
        assert!(debug.contains("FlatCombiner"));
        assert!(debug.contains("pending"));
        assert!(debug.contains("generation"));
    }

    #[test]
    fn concurrent_submit_and_combine() {
        let fc = Arc::new(FlatCombiner::new(0u64));

        let handles: Vec<_> = (0..8)
            .map(|_| {
                let fc = Arc::clone(&fc);
                std::thread::spawn(move || {
                    for _ in 0..100 {
                        fc.submit(|s| *s += 1);
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        let mut total = 0;
        loop {
            let count = fc.combine();
            if count == 0 {
                break;
            }
            total += count;
        }

        assert_eq!(total, 800);
        assert_eq!(fc.with_state(|s| *s), 800);
    }

    #[test]
    fn concurrent_submit_and_combine_interleaved() {
        let fc = Arc::new(FlatCombiner::new(0u64));

        let submit_handles: Vec<_> = (0..4)
            .map(|_| {
                let fc = Arc::clone(&fc);
                std::thread::spawn(move || {
                    for _ in 0..100 {
                        fc.submit(|s| *s += 1);
                        std::thread::yield_now();
                    }
                })
            })
            .collect();

        let fc_c = Arc::clone(&fc);
        let combiner = std::thread::spawn(move || {
            let mut total = 0;
            for _ in 0..500 {
                total += fc_c.combine();
                std::thread::yield_now();
            }
            total
        });

        for h in submit_handles {
            h.join().unwrap();
        }

        let combined_during = combiner.join().unwrap();
        let remaining = fc.combine();
        let final_val = fc.with_state(|s| *s);

        assert_eq!(
            final_val,
            (combined_during + remaining) as u64,
            "total combined ({} + {}) should match state ({})",
            combined_during,
            remaining,
            final_val
        );
        assert_eq!(final_val, 400);
    }

    #[test]
    fn poison_recovery() {
        let fc = FlatCombiner::new(0u64);
        // Actually poison the state mutex by panicking while holding it;
        // the previous version of this test never poisoned anything.
        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            fc.execute(|_| panic!("poison the state mutex"));
        }));
        assert!(result.is_err());

        // Subsequent operations recover the poisoned lock and keep working.
        fc.submit(|s| *s += 1);
        fc.combine();
        assert_eq!(fc.with_state(|s| *s), 1);
    }

    #[test]
    fn avg_batch_size_zero_when_no_combines() {
        let stats = CombinerStats::default();
        assert_eq!(stats.avg_batch_size(), 0.0);
    }

    #[test]
    fn combine_with_panics_on_reentrant_execute_instead_of_deadlocking() {
        let fc = FlatCombiner::new(0u64);

        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let _ = fc.combine_with(|_, _| fc.execute(|state| *state));
        }));

        assert!(result.is_err());
    }

    #[test]
    fn combine_with_panics_on_reentrant_with_state_instead_of_deadlocking() {
        let fc = FlatCombiner::new(7u64);

        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let _ = fc.combine_with(|_, _| fc.with_state(|state| *state));
        }));

        assert!(result.is_err());
    }
}