appscale_core/scheduler.rs
//! Scheduler — coordinates React commits with Rust layout and platform mount.
//!
//! The fundamental tension: React batches updates asynchronously (concurrent mode),
//! but native UI must be updated on the main thread synchronously within a 16ms frame.
//!
//! The scheduler resolves this by:
//! 1. Accepting IR batches from JS at any time
//! 2. Coalescing multiple batches within a single frame
//! 3. Running layout on a background thread
//! 4. Dispatching mount operations to the main thread at vsync
//!
//! Priority lanes (matching React's scheduler priorities):
//! - Immediate: user input responses (touch feedback, text input)
//! - UserBlocking: discrete interactions (button press, toggle)
//! - Normal: data fetches, state updates
//! - Low: prefetching, analytics
//! - Idle: cleanup, cache warming
18
19use crate::ir::IrBatch;
20use std::collections::VecDeque;
21use std::sync::{Arc, Mutex, Condvar};
22use std::time::{Duration, Instant};
23
/// Frame budget: 16.67ms for 60fps, 8.33ms for 120fps.
/// We aim for 60fps by default; the platform bridge can override this
/// per-display via `Scheduler::set_frame_budget`.
const DEFAULT_FRAME_BUDGET: Duration = Duration::from_micros(16_667);
27
/// Maximum batches to coalesce per frame before forcing a flush.
/// Prevents starvation under high update load: even if batches keep
/// arriving, a drain round never returns more than this many.
const MAX_COALESCE_COUNT: usize = 8;
31
/// Priority lanes for scheduling commits.
///
/// The derived `Ord` follows declaration order, so a *lower* ordinal means a
/// *higher* scheduling priority (`Immediate < UserBlocking < Normal < Low <
/// Idle`). The work queue relies on this: it sorts ascending so the most
/// urgent item sits at the front. Do not reorder variants without auditing
/// every `priority` comparison.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Priority {
    /// User input (touch, keyboard) — process immediately, skip coalescing.
    Immediate = 0,
    /// Discrete user interactions — process within current frame.
    UserBlocking = 1,
    /// Normal state updates — can be coalesced across frames.
    Normal = 2,
    /// Low priority — prefetch, background sync.
    Low = 3,
    /// Idle work — only when nothing else is pending.
    Idle = 4,
}
46
/// A scheduled work item: an IR batch with priority and timing metadata.
#[derive(Debug)]
struct WorkItem {
    /// The IR batch to hand to layout/mount.
    batch: IrBatch,
    /// Lane used for queue ordering (front = most urgent).
    priority: Priority,
    /// When the item entered the queue. NOTE(review): no reader is visible in
    /// this file — presumably used for latency/staleness diagnostics; confirm
    /// against the engine before removing.
    enqueued_at: Instant,
}
54
/// The scheduler manages the work queue and coordinates frame timing.
///
/// Shared state is wrapped in `Arc<Mutex<…>>` because `enqueue` is called
/// from the JS thread while draining happens on a processing thread.
pub struct Scheduler {
    /// Pending work items, sorted by priority (highest first).
    queue: Arc<Mutex<VecDeque<WorkItem>>>,

    /// Signal to wake the processing thread when work arrives.
    /// `enqueue` calls `notify_one`; the waiting side is not in this file —
    /// presumably the processing thread waits on this paired with `queue`'s
    /// mutex (TODO confirm in the engine).
    notify: Arc<Condvar>,

    /// Frame budget (can be adjusted for 120fps displays).
    frame_budget: Duration,

    /// Backpressure: if true, Rust is still processing the previous frame.
    /// JS should coalesce more aggressively.
    processing: Arc<Mutex<bool>>,

    /// Frame statistics for DevTools.
    stats: Arc<Mutex<FrameStats>>,
}
73
/// Per-frame timing statistics (exposed to DevTools).
///
/// `Default` zero-initializes all counters and durations.
#[derive(Debug, Clone, Default)]
pub struct FrameStats {
    /// Total frames recorded since scheduler creation.
    pub frame_count: u64,
    /// Layout + mount time of the most recent frame.
    pub last_frame_duration: Duration,
    /// Layout portion of the most recent frame.
    pub last_layout_duration: Duration,
    /// Mount portion of the most recent frame.
    pub last_mount_duration: Duration,
    /// Batches processed in the most recent frame (overwritten each frame).
    pub batches_coalesced: u32,
    /// Cumulative count of frames that exceeded the frame budget.
    pub frames_dropped: u32,
}
84
85impl Scheduler {
86 pub fn new() -> Self {
87 Self {
88 queue: Arc::new(Mutex::new(VecDeque::new())),
89 notify: Arc::new(Condvar::new()),
90 frame_budget: DEFAULT_FRAME_BUDGET,
91 processing: Arc::new(Mutex::new(false)),
92 stats: Arc::new(Mutex::new(FrameStats::default())),
93 }
94 }
95
96 /// Set frame budget (e.g., 8.33ms for 120fps ProMotion displays).
97 pub fn set_frame_budget(&mut self, budget: Duration) {
98 self.frame_budget = budget;
99 }
100
101 /// Enqueue an IR batch for processing.
102 /// Called from JS thread via JSI.
103 pub fn enqueue(&self, batch: IrBatch, priority: Priority) {
104 let item = WorkItem {
105 batch,
106 priority,
107 enqueued_at: Instant::now(),
108 };
109
110 {
111 let mut queue = self.queue.lock().unwrap();
112
113 // Insert in priority order (highest priority = lowest ordinal = front)
114 let pos = queue.iter()
115 .position(|existing| existing.priority > priority)
116 .unwrap_or(queue.len());
117 queue.insert(pos, item);
118 }
119
120 // Wake the processing thread
121 self.notify.notify_one();
122 }
123
124 /// Check if the engine is currently processing a frame.
125 /// JS scheduler uses this for backpressure — if true, coalesce more.
126 pub fn is_processing(&self) -> bool {
127 *self.processing.lock().unwrap()
128 }
129
130 /// Drain pending work items for the current frame.
131 /// Returns batches to process, coalescing multiple Normal/Low priority
132 /// batches into a single processing round.
133 ///
134 /// Rules:
135 /// - Immediate priority: return immediately (one at a time)
136 /// - UserBlocking: return all UserBlocking items in queue
137 /// - Normal/Low: coalesce up to MAX_COALESCE_COUNT
138 /// - Idle: only return if no other priority is pending
139 pub fn drain_frame(&self) -> Vec<IrBatch> {
140 let mut queue = self.queue.lock().unwrap();
141
142 if queue.is_empty() {
143 return vec![];
144 }
145
146 let mut batches = Vec::new();
147
148 // Check highest priority in queue
149 let top_priority = queue.front().map(|w| w.priority).unwrap_or(Priority::Idle);
150
151 match top_priority {
152 Priority::Immediate => {
153 // Process one immediate item right now
154 if let Some(item) = queue.pop_front() {
155 batches.push(item.batch);
156 }
157 }
158 Priority::UserBlocking => {
159 // Drain all user-blocking items
160 while let Some(item) = queue.front() {
161 if item.priority <= Priority::UserBlocking {
162 batches.push(queue.pop_front().unwrap().batch);
163 } else {
164 break;
165 }
166 }
167 }
168 _ => {
169 // Coalesce normal/low/idle items
170 let count = queue.len().min(MAX_COALESCE_COUNT);
171 for _ in 0..count {
172 if let Some(item) = queue.pop_front() {
173 batches.push(item.batch);
174 }
175 }
176 }
177 }
178
179 batches
180 }
181
182 /// Record frame timing (called by Engine after processing).
183 pub fn record_frame(
184 &self,
185 layout_duration: Duration,
186 mount_duration: Duration,
187 batches_processed: u32,
188 ) {
189 let mut stats = self.stats.lock().unwrap();
190 stats.frame_count += 1;
191 stats.last_layout_duration = layout_duration;
192 stats.last_mount_duration = mount_duration;
193 stats.last_frame_duration = layout_duration + mount_duration;
194 stats.batches_coalesced = batches_processed;
195
196 if stats.last_frame_duration > self.frame_budget {
197 stats.frames_dropped += 1;
198 }
199 }
200
201 /// Get current frame statistics (for DevTools).
202 pub fn stats(&self) -> FrameStats {
203 self.stats.lock().unwrap().clone()
204 }
205
206 /// Check if there's pending work.
207 pub fn has_pending_work(&self) -> bool {
208 !self.queue.lock().unwrap().is_empty()
209 }
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ir::IrBatch;

    #[test]
    fn test_priority_ordering() {
        let scheduler = Scheduler::new();

        scheduler.enqueue(IrBatch::new(1), Priority::Normal);
        scheduler.enqueue(IrBatch::new(2), Priority::Immediate);
        scheduler.enqueue(IrBatch::new(3), Priority::Low);

        // An Immediate item preempts everything else and is drained alone.
        let first_round = scheduler.drain_frame();
        assert_eq!(first_round.len(), 1);
        assert_eq!(first_round[0].commit_id, 2);

        // A subsequent drain picks up the Normal-priority batch.
        let second_round = scheduler.drain_frame();
        assert!(second_round.iter().any(|b| b.commit_id == 1));
    }

    #[test]
    fn test_backpressure() {
        let scheduler = Scheduler::new();
        // Freshly created: nothing is being processed.
        assert!(!scheduler.is_processing());

        // Flip the flag directly (same-module access) and observe it.
        *scheduler.processing.lock().unwrap() = true;
        assert!(scheduler.is_processing());
    }
}
243}