//! Flat combining for batched operation dispatch under contention.
//!
//! When multiple event sources (timers, background tasks, input) post
//! operations concurrently, flat combining batches them into a single
//! pass. One thread becomes the "combiner" and executes ALL pending
//! operations while holding the state lock, keeping data hot in L1
//! cache and reducing lock acquisition overhead.
//!
//! # When to Use
//!
//! Use flat combining instead of a bare `Mutex` when:
//! - Multiple threads/tasks post operations to shared state
//! - Operations are short (the combiner shouldn't hold the lock too long)
//! - Batching is beneficial (e.g., coalescing events, reducing redraws)
//!
//! # Example
//!
//! ```
//! use ftui_runtime::flat_combine::FlatCombiner;
//!
//! let combiner = FlatCombiner::new(Vec::<String>::new());
//!
//! // Submit operations (from any thread)
//! combiner.submit(|state| state.push("event-a".into()));
//! combiner.submit(|state| state.push("event-b".into()));
//!
//! // Combiner drains and applies all pending ops in one pass
//! let count = combiner.combine();
//! assert_eq!(count, 2);
//!
//! // Direct execution when no contention
//! let len = combiner.execute(|state| state.len());
//! assert_eq!(len, 2);
//! ```

use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread::ThreadId;

/// Statistics for monitoring flat combining performance.
#[derive(Debug, Clone, Default)]
pub struct CombinerStats {
    /// Number of combine passes executed.
    pub combine_passes: u64,
    /// Total operations processed across all passes.
    pub total_ops: u64,
    /// Maximum batch size seen in a single pass.
    pub max_batch_size: usize,
    /// Number of times a submitter found the queue locked (contention signal).
    pub contention_events: u64,
}

impl CombinerStats {
    /// Average batch size across all combine passes.
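    ///
    /// # Example
    ///
    /// Two passes covering four operations average out to 2.0:
    ///
    /// ```
    /// use ftui_runtime::flat_combine::CombinerStats;
    ///
    /// let stats = CombinerStats { combine_passes: 2, total_ops: 4, ..Default::default() };
    /// assert!((stats.avg_batch_size() - 2.0).abs() < f64::EPSILON);
    /// ```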
    pub fn avg_batch_size(&self) -> f64 {
        if self.combine_passes == 0 {
            0.0
        } else {
            self.total_ops as f64 / self.combine_passes as f64
        }
    }
}

/// Flat combining dispatcher for batched operation execution.
///
/// Wraps shared mutable state with a two-level locking strategy:
/// 1. A publication queue (`queue`) where threads post operations
/// 2. The shared state (`state`) where operations are executed
///
/// The combiner drains the queue, locks the state once, and executes all
/// drained operations in sequence, keeping the hot data in cache and
/// minimizing lock handoffs.
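///
/// # Example
///
/// A minimal walk through both levels: operations are posted to the queue
/// first, then a single combine pass applies them to the state.
///
/// ```
/// use ftui_runtime::flat_combine::FlatCombiner;
///
/// let combiner = FlatCombiner::new(0u64);
/// combiner.submit(|n| *n += 1);          // posted to the publication queue
/// assert_eq!(combiner.pending_count(), 1);
/// assert_eq!(combiner.combine(), 1);     // drained and applied in one pass
/// assert_eq!(combiner.execute(|n| *n), 1);
/// ```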
pub struct FlatCombiner<S> {
    /// Protected shared state.
    state: Mutex<S>,
    /// Publication queue for pending operations.
    queue: Mutex<Vec<BoxedOp<S>>>,
    /// Monotonic generation counter (incremented after each combine pass).
    generation: AtomicU64,
    /// Performance statistics.
    stats: Mutex<CombinerStats>,
    /// Owner thread when `combine_with` is actively running a user callback.
    combine_with_owner: Mutex<Option<ThreadId>>,
}

/// Boxed operation accepted by [`FlatCombiner::submit_batch`] and stored in
/// the publication queue.
pub type BoxedOp<S> = Box<dyn FnOnce(&mut S) + Send>;

struct CombineWithOwnerGuard<'a> {
    owner: &'a Mutex<Option<ThreadId>>,
}

impl Drop for CombineWithOwnerGuard<'_> {
    fn drop(&mut self) {
        let mut owner = self.owner.lock().unwrap_or_else(|e| e.into_inner());
        *owner = None;
    }
}

impl<'a> CombineWithOwnerGuard<'a> {
    fn new(owner: &'a Mutex<Option<ThreadId>>) -> Self {
        let current = std::thread::current().id();
        let mut owner_guard = owner.lock().unwrap_or_else(|e| e.into_inner());
        *owner_guard = Some(current);
        drop(owner_guard);
        Self { owner }
    }
}

impl<S> FlatCombiner<S> {
    /// Create a new flat combiner wrapping the given shared state.
    pub fn new(state: S) -> Self {
        Self {
            state: Mutex::new(state),
            queue: Mutex::new(Vec::new()),
            generation: AtomicU64::new(0),
            stats: Mutex::new(CombinerStats::default()),
            combine_with_owner: Mutex::new(None),
        }
    }

    fn assert_not_reentrant_from_combine_with(&self, operation: &str) {
        let current = std::thread::current().id();
        let owner = self
            .combine_with_owner
            .lock()
            .unwrap_or_else(|e| e.into_inner());
        if owner
            .as_ref()
            .is_some_and(|thread_id| *thread_id == current)
        {
            panic!("FlatCombiner::{operation} cannot be called reentrantly from combine_with");
        }
    }

    /// Execute a single operation directly on the shared state.
    ///
    /// Bypasses the publication queue. Use this when you need a return
    /// value or when contention is not expected.
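    ///
    /// # Example
    ///
    /// Direct execution with a return value:
    ///
    /// ```
    /// use ftui_runtime::flat_combine::FlatCombiner;
    ///
    /// let combiner = FlatCombiner::new(vec![1, 2, 3]);
    /// // Runs immediately, without going through the publication queue.
    /// let sum: i32 = combiner.execute(|v| {
    ///     v.push(4);
    ///     v.iter().sum()
    /// });
    /// assert_eq!(sum, 10);
    /// ```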
    pub fn execute<R>(&self, op: impl FnOnce(&mut S) -> R) -> R {
        self.assert_not_reentrant_from_combine_with("execute");
        let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
        op(&mut state)
    }

    /// Read from the shared state without mutation.
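    ///
    /// # Example
    ///
    /// ```
    /// use ftui_runtime::flat_combine::FlatCombiner;
    ///
    /// let combiner = FlatCombiner::new(vec!["a", "b"]);
    /// assert_eq!(combiner.with_state(|v| v.len()), 2);
    /// ```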
    pub fn with_state<R>(&self, f: impl FnOnce(&S) -> R) -> R {
        self.assert_not_reentrant_from_combine_with("with_state");
        let state = self.state.lock().unwrap_or_else(|e| e.into_inner());
        f(&state)
    }

    /// Submit an operation to the publication queue for batched execution.
    ///
    /// The operation will be executed during the next [`combine`](Self::combine)
    /// call. Operations are executed in submission order within each batch.
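    ///
    /// # Example
    ///
    /// A submitted operation stays queued until the next combine pass:
    ///
    /// ```
    /// use ftui_runtime::flat_combine::FlatCombiner;
    ///
    /// let combiner = FlatCombiner::new(0u64);
    /// combiner.submit(|n| *n += 1);
    /// // Nothing is applied until the next combine pass.
    /// assert_eq!(combiner.with_state(|n| *n), 0);
    /// assert_eq!(combiner.combine(), 1);
    /// assert_eq!(combiner.with_state(|n| *n), 1);
    /// ```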
    pub fn submit(&self, op: impl FnOnce(&mut S) + Send + 'static) {
        let mut queue = self.lock_queue_for_submit();
        queue.push(Box::new(op));
    }

    /// Submit multiple operations at once (avoids repeated lock acquisitions).
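    ///
    /// # Example
    ///
    /// The items are boxed operations of type `Box<dyn FnOnce(&mut S) + Send>`:
    ///
    /// ```
    /// use ftui_runtime::flat_combine::FlatCombiner;
    ///
    /// let combiner = FlatCombiner::new(0u64);
    /// let ops: Vec<Box<dyn FnOnce(&mut u64) + Send>> = vec![
    ///     Box::new(|n: &mut u64| *n += 10),
    ///     Box::new(|n: &mut u64| *n += 20),
    /// ];
    /// combiner.submit_batch(ops);
    /// combiner.combine();
    /// assert_eq!(combiner.with_state(|n| *n), 30);
    /// ```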
    pub fn submit_batch(&self, ops: impl IntoIterator<Item = BoxedOp<S>>) {
        let mut queue = self.lock_queue_for_submit();
        queue.extend(ops);
    }

    /// Lock the publication queue, recording a contention event in
    /// `CombinerStats::contention_events` when the lock is not immediately
    /// available.
    fn lock_queue_for_submit(&self) -> std::sync::MutexGuard<'_, Vec<BoxedOp<S>>> {
        match self.queue.try_lock() {
            Ok(queue) => queue,
            Err(std::sync::TryLockError::Poisoned(poisoned)) => poisoned.into_inner(),
            Err(std::sync::TryLockError::WouldBlock) => {
                if let Ok(mut stats) = self.stats.lock() {
                    stats.contention_events += 1;
                }
                self.queue.lock().unwrap_or_else(|e| e.into_inner())
            }
        }
    }

    /// Drain all pending operations and execute them as a single batch.
    ///
    /// The combiner holds the state lock for the entire batch, keeping
    /// the data hot in L1 cache. Returns the number of operations executed.
    ///
    /// Returns 0 if no operations are pending.
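    ///
    /// # Example
    ///
    /// One pass applies everything that was queued; a second pass finds nothing:
    ///
    /// ```
    /// use ftui_runtime::flat_combine::FlatCombiner;
    ///
    /// let combiner = FlatCombiner::new(Vec::<u32>::new());
    /// combiner.submit(|v| v.push(1));
    /// combiner.submit(|v| v.push(2));
    /// assert_eq!(combiner.combine(), 2); // both ops applied in one pass
    /// assert_eq!(combiner.combine(), 0); // queue is now empty
    /// ```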
    pub fn combine(&self) -> usize {
        self.assert_not_reentrant_from_combine_with("combine");
        // Drain the queue (short lock)
        let ops: Vec<BoxedOp<S>> = {
            let mut queue = self.queue.lock().unwrap_or_else(|e| e.into_inner());
            std::mem::take(&mut *queue)
        };

        if ops.is_empty() {
            return 0;
        }

        let count = ops.len();

        // Execute all operations (holds state lock for entire batch)
        {
            let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
            for op in ops {
                op(&mut state);
            }
        }

        // Update stats and generation
        self.generation.fetch_add(1, Ordering::Release);
        if let Ok(mut stats) = self.stats.lock() {
            stats.combine_passes += 1;
            stats.total_ops += count as u64;
            stats.max_batch_size = stats.max_batch_size.max(count);
        }

        count
    }

    /// Combine with a pre/post hook for additional work during the batch.
    ///
    /// The `around` function receives a mutable reference to the state
    /// and a closure that executes all pending operations. This allows
    /// wrapping the batch with setup/teardown logic (e.g., marking a
    /// dirty flag, snapshotting state).
    ///
    /// Reentrant calls back into [`execute`](Self::execute),
    /// [`with_state`](Self::with_state), [`combine`](Self::combine), or
    /// [`combine_with`](Self::combine_with) from inside `around` are rejected
    /// with a panic instead of deadlocking on the state mutex.
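    ///
    /// # Example
    ///
    /// Wrapping the batch with a simple before/after check:
    ///
    /// ```
    /// use ftui_runtime::flat_combine::FlatCombiner;
    ///
    /// let combiner = FlatCombiner::new(Vec::<u32>::new());
    /// combiner.submit(|v| v.push(1));
    /// combiner.submit(|v| v.push(2));
    ///
    /// let (count, grew) = combiner.combine_with(|state, apply| {
    ///     let before = state.len();
    ///     apply(state); // runs every queued operation
    ///     state.len() > before
    /// });
    /// assert_eq!(count, 2);
    /// assert!(grew);
    /// ```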
    pub fn combine_with<R>(&self, around: impl FnOnce(&mut S, &dyn Fn(&mut S)) -> R) -> (usize, R) {
        self.assert_not_reentrant_from_combine_with("combine_with");
        let ops: Vec<BoxedOp<S>> = {
            let mut queue = self.queue.lock().unwrap_or_else(|e| e.into_inner());
            std::mem::take(&mut *queue)
        };

        let count = ops.len();
        let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
        let _owner_guard = CombineWithOwnerGuard::new(&self.combine_with_owner);

        // `around` only receives a `&dyn Fn`, so the apply closure cannot take
        // ownership of `ops` directly. Stash them in a RefCell and take them
        // out on the first call.
        let ops_cell = std::cell::RefCell::new(Some(ops));
        let apply = |s: &mut S| {
            if let Some(ops) = ops_cell.borrow_mut().take() {
                for op in ops {
                    op(s);
                }
            }
        };

        let result = around(&mut state, &apply);

        if count > 0 {
            self.generation.fetch_add(1, Ordering::Release);
            if let Ok(mut stats) = self.stats.lock() {
                stats.combine_passes += 1;
                stats.total_ops += count as u64;
                stats.max_batch_size = stats.max_batch_size.max(count);
            }
        }

        (count, result)
    }

    /// Number of operations currently in the publication queue.
    pub fn pending_count(&self) -> usize {
        self.queue.lock().unwrap_or_else(|e| e.into_inner()).len()
    }

    /// Current generation counter. Incremented after each combine pass.
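    ///
    /// # Example
    ///
    /// One way to detect that a combine pass has applied new operations
    /// (e.g., to decide whether a redraw is needed):
    ///
    /// ```
    /// use ftui_runtime::flat_combine::FlatCombiner;
    ///
    /// let combiner = FlatCombiner::new(0u64);
    /// let seen = combiner.generation();
    ///
    /// combiner.submit(|n| *n += 1);
    /// combiner.combine();
    ///
    /// assert_ne!(combiner.generation(), seen);
    /// ```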
    pub fn generation(&self) -> u64 {
        self.generation.load(Ordering::Acquire)
    }

    /// Get a snapshot of current performance statistics.
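    ///
    /// # Example
    ///
    /// A batch of two operations shows up in the snapshot as one pass:
    ///
    /// ```
    /// use ftui_runtime::flat_combine::FlatCombiner;
    ///
    /// let combiner = FlatCombiner::new(0u64);
    /// combiner.submit(|n| *n += 1);
    /// combiner.submit(|n| *n += 1);
    /// combiner.combine();
    ///
    /// let stats = combiner.stats();
    /// assert_eq!(stats.combine_passes, 1);
    /// assert_eq!(stats.total_ops, 2);
    /// assert_eq!(stats.max_batch_size, 2);
    /// ```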
    pub fn stats(&self) -> CombinerStats {
        self.stats.lock().unwrap_or_else(|e| e.into_inner()).clone()
    }

    /// Reset statistics counters.
    pub fn reset_stats(&self) {
        if let Ok(mut stats) = self.stats.lock() {
            *stats = CombinerStats::default();
        }
    }
}

// FlatCombiner<S> is Send + Sync whenever S is Send: every field is a Mutex or
// an atomic, so the compiler implements both auto traits automatically.

// `S: Debug` is not required here because the shared state itself is never
// printed; only the queue depth and generation counter are shown.
impl<S> std::fmt::Debug for FlatCombiner<S> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let pending = self.pending_count();
        let current_gen = self.generation();
        f.debug_struct("FlatCombiner")
            .field("pending", &pending)
            .field("generation", &current_gen)
            .finish_non_exhaustive()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    #[test]
    fn new_creates_empty_combiner() {
        let fc = FlatCombiner::new(0u64);
        assert_eq!(fc.pending_count(), 0);
        assert_eq!(fc.generation(), 0);
        assert_eq!(fc.stats().combine_passes, 0);
    }

    #[test]
    fn execute_applies_directly() {
        let fc = FlatCombiner::new(10u64);
        let result = fc.execute(|s| {
            *s += 5;
            *s
        });
        assert_eq!(result, 15);
    }

    #[test]
    fn with_state_reads_without_mutation() {
        let fc = FlatCombiner::new(vec![1, 2, 3]);
        let len = fc.with_state(|s| s.len());
        assert_eq!(len, 3);
    }

    #[test]
    fn submit_queues_operations() {
        let fc = FlatCombiner::new(0u64);
        fc.submit(|s| *s += 1);
        fc.submit(|s| *s += 2);
        assert_eq!(fc.pending_count(), 2);

        // State not yet modified
        let val = fc.with_state(|s| *s);
        assert_eq!(val, 0);
    }

    #[test]
    fn combine_drains_and_applies() {
        let fc = FlatCombiner::new(0u64);
        fc.submit(|s| *s += 10);
        fc.submit(|s| *s += 20);
        fc.submit(|s| *s += 30);

        let count = fc.combine();
        assert_eq!(count, 3);
        assert_eq!(fc.pending_count(), 0);

        let val = fc.with_state(|s| *s);
        assert_eq!(val, 60);
    }

    #[test]
    fn combine_empty_returns_zero() {
        let fc = FlatCombiner::new(0u64);
        assert_eq!(fc.combine(), 0);
        assert_eq!(fc.generation(), 0);
    }

    #[test]
    fn combine_increments_generation() {
        let fc = FlatCombiner::new(0u64);
        assert_eq!(fc.generation(), 0);

        fc.submit(|s| *s += 1);
        fc.combine();
        assert_eq!(fc.generation(), 1);

        fc.submit(|s| *s += 1);
        fc.combine();
        assert_eq!(fc.generation(), 2);
    }

    #[test]
    fn stats_track_batches() {
        let fc = FlatCombiner::new(0u64);

        // Batch 1: 3 ops
        fc.submit(|s| *s += 1);
        fc.submit(|s| *s += 1);
        fc.submit(|s| *s += 1);
        fc.combine();

        // Batch 2: 1 op
        fc.submit(|s| *s += 1);
        fc.combine();

        let stats = fc.stats();
        assert_eq!(stats.combine_passes, 2);
        assert_eq!(stats.total_ops, 4);
        assert_eq!(stats.max_batch_size, 3);
        assert!((stats.avg_batch_size() - 2.0).abs() < f64::EPSILON);
    }

    #[test]
    fn reset_stats_clears_counters() {
        let fc = FlatCombiner::new(0u64);
        fc.submit(|s| *s += 1);
        fc.combine();
        assert_eq!(fc.stats().combine_passes, 1);

        fc.reset_stats();
        let stats = fc.stats();
        assert_eq!(stats.combine_passes, 0);
        assert_eq!(stats.total_ops, 0);
    }

    #[test]
    fn operations_execute_in_order() {
        let fc = FlatCombiner::new(Vec::<u32>::new());
        fc.submit(|s| s.push(1));
        fc.submit(|s| s.push(2));
        fc.submit(|s| s.push(3));
        fc.combine();

        let values = fc.with_state(|s| s.clone());
        assert_eq!(values, vec![1, 2, 3]);
    }

    #[test]
    fn submit_batch_adds_multiple() {
        let fc = FlatCombiner::new(0u64);
        let ops: Vec<BoxedOp<u64>> = vec![
            Box::new(|s: &mut u64| *s += 10),
            Box::new(|s: &mut u64| *s += 20),
        ];
        fc.submit_batch(ops);
        assert_eq!(fc.pending_count(), 2);
        fc.combine();
        assert_eq!(fc.with_state(|s| *s), 30);
    }

    #[test]
    fn combine_with_wraps_batch() {
        let fc = FlatCombiner::new(Vec::<String>::new());
        fc.submit(|s| s.push("a".into()));
        fc.submit(|s| s.push("b".into()));

        let (count, len_before) = fc.combine_with(|state, apply| {
            let before = state.len();
            apply(state);
            before
        });

        assert_eq!(count, 2);
        assert_eq!(len_before, 0);
        assert_eq!(fc.with_state(|s| s.len()), 2);
    }

    #[test]
    fn multiple_combine_passes() {
        let fc = FlatCombiner::new(0u64);

        for i in 0..10 {
            fc.submit(move |s| *s += i);
        }
        fc.combine();
        assert_eq!(fc.with_state(|s| *s), 45); // sum 0..10

        for i in 0..5 {
            fc.submit(move |s| *s += i);
        }
        fc.combine();
        assert_eq!(fc.with_state(|s| *s), 55); // 45 + sum 0..5
    }

    #[test]
    fn debug_impl() {
        let fc = FlatCombiner::new(42u64);
        let debug = format!("{fc:?}");
        assert!(debug.contains("FlatCombiner"));
        assert!(debug.contains("pending"));
        assert!(debug.contains("generation"));
    }

    #[test]
    fn concurrent_submit_and_combine() {
        let fc = Arc::new(FlatCombiner::new(0u64));

        // Spawn threads that submit operations
        let handles: Vec<_> = (0..8)
            .map(|_| {
                let fc = Arc::clone(&fc);
                std::thread::spawn(move || {
                    for _ in 0..100 {
                        fc.submit(|s| *s += 1);
                    }
                })
            })
            .collect();

        // Wait for all submitters
        for h in handles {
            h.join().unwrap();
        }

        // Combine all pending operations
        let mut total = 0;
        loop {
            let count = fc.combine();
            if count == 0 {
                break;
            }
            total += count;
        }

        assert_eq!(total, 800);
        assert_eq!(fc.with_state(|s| *s), 800);
    }

    #[test]
    fn concurrent_submit_and_combine_interleaved() {
        let fc = Arc::new(FlatCombiner::new(0u64));

        // Submitter threads
        let submit_handles: Vec<_> = (0..4)
            .map(|_| {
                let fc = Arc::clone(&fc);
                std::thread::spawn(move || {
                    for _ in 0..100 {
                        fc.submit(|s| *s += 1);
                        std::thread::yield_now();
                    }
                })
            })
            .collect();

        // Combiner thread
        let fc_c = Arc::clone(&fc);
        let combiner = std::thread::spawn(move || {
            let mut total = 0;
            for _ in 0..500 {
                total += fc_c.combine();
                std::thread::yield_now();
            }
            total
        });

        for h in submit_handles {
            h.join().unwrap();
        }

        // Drain remaining
        let combined_during = combiner.join().unwrap();
        let remaining = fc.combine();
        let final_val = fc.with_state(|s| *s);

        assert_eq!(
            final_val,
            (combined_during + remaining) as u64,
            "total combined ({} + {}) should match state ({})",
            combined_during,
            remaining,
            final_val
        );
        assert_eq!(final_val, 400);
    }

    #[test]
    fn poison_recovery() {
        let fc = FlatCombiner::new(0u64);
        // Poison the state mutex by panicking while an operation holds it.
        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            fc.execute(|s| {
                *s += 10;
                panic!("poison the state lock");
            });
        }));
        assert!(result.is_err());
        // Locks are recovered with `into_inner`, so the combiner keeps working.
        fc.submit(|s| *s += 1);
        fc.combine();
        assert_eq!(fc.with_state(|s| *s), 11);
    }
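
    // Exercises the contention tracking in `submit`: while another holder has
    // the queue locked, a submitter records a contention event before falling
    // back to a blocking acquisition.
    #[test]
    fn submit_records_contention_when_queue_is_locked() {
        let fc = Arc::new(FlatCombiner::new(0u64));

        // Hold the queue lock so the submitter's `try_lock` fails.
        let queue_guard = fc.queue.lock().unwrap();
        let fc_submitter = Arc::clone(&fc);
        let submitter = std::thread::spawn(move || fc_submitter.submit(|s| *s += 1));

        // Wait until the contention event is recorded, then release the queue.
        while fc.stats().contention_events == 0 {
            std::thread::yield_now();
        }
        drop(queue_guard);
        submitter.join().unwrap();

        assert!(fc.stats().contention_events >= 1);
        assert_eq!(fc.combine(), 1);
        assert_eq!(fc.with_state(|s| *s), 1);
    }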

    #[test]
    fn avg_batch_size_zero_when_no_combines() {
        let stats = CombinerStats::default();
        assert_eq!(stats.avg_batch_size(), 0.0);
    }

    #[test]
    fn combine_with_panics_on_reentrant_execute_instead_of_deadlocking() {
        let fc = FlatCombiner::new(0u64);

        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let _ = fc.combine_with(|_, _| fc.execute(|state| *state));
        }));

        assert!(result.is_err());
    }

    #[test]
    fn combine_with_panics_on_reentrant_with_state_instead_of_deadlocking() {
        let fc = FlatCombiner::new(7u64);

        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let _ = fc.combine_with(|_, _| fc.with_state(|state| *state));
        }));

        assert!(result.is_err());
    }
}