// telltale_machine/scheduler/types.rs

// Policy-based coroutine scheduler.
//
// All policies produce observationally equivalent results per the
// `schedule_confluence` theorem in `lean/Runtime/Proofs/SchedulerApi.lean`.
5
/// Scheduler lane identifier.
///
/// Lanes are plain indices. Lane `0` is the default: `Scheduler::new`
/// pre-registers it and coroutines without an explicit lane assignment are
/// treated as belonging to it.
pub type LaneId = usize;
8
/// Default value for `Scheduler::timeslice`.
///
/// Used by `Scheduler::new` and as the serde fallback
/// (`#[serde(default = "default_timeslice")]`) when the field is missing from
/// serialized input.
fn default_timeslice() -> usize {
    1
}
12
13/// Priority policy family for serialization-safe prioritized scheduling.
14#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
15pub enum PriorityPolicy {
16    /// Fixed static priorities keyed by coroutine id.
17    FixedMap(BTreeMap<usize, usize>),
18    /// Favor older entries in the ready queue.
19    Aging,
20    /// Favor coroutines with progress tokens.
21    TokenWeighted,
22}
23
24/// Scheduling policy.
25///
26/// All policies are observationally equivalent per the `schedule_confluence`
27/// theorem. `Cooperative` is the WASM-compatible single-threaded policy,
28/// justified by `cooperative_refines_concurrent`.
29#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
30pub enum SchedPolicy {
31    /// Single-threaded round-robin with explicit yield. WASM-compatible.
32    #[default]
33    Cooperative,
34    /// Basic multi-coroutine round-robin.
35    RoundRobin,
36    /// Priority scheduling without function pointers.
37    Priority(PriorityPolicy),
38    /// Prefer coroutines holding progress tokens (starvation freedom).
39    ProgressAware,
40}
41
42/// Step outcome used by scheduler bookkeeping.
43#[derive(Debug, Clone, PartialEq, Eq)]
44pub enum StepUpdate {
45    /// Coroutine should remain runnable.
46    Ready,
47    /// Coroutine yielded and remains runnable.
48    Yielded,
49    /// Coroutine blocked for a reason.
50    Blocked(BlockReason),
51    /// Coroutine is done/faulted and removed from queues.
52    Done,
53}
54
55/// Cached readiness eligibility for scheduler-side filtered ready queues.
56#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
57pub enum ReadyEligibility {
58    /// Coroutine is both ready and eligible for scheduling.
59    Eligible,
60    /// Coroutine should be removed from the eligible-ready cache.
61    Ineligible,
62}
63
64/// Cross-lane capability-transfer record.
65#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
66pub struct CrossLaneHandoff {
67    /// Source coroutine id.
68    pub from_coro: usize,
69    /// Destination coroutine id.
70    pub to_coro: usize,
71    /// Source lane.
72    pub from_lane: LaneId,
73    /// Destination lane.
74    pub to_lane: LaneId,
75    /// Scheduler step where the handoff was emitted.
76    pub step: usize,
77    /// Free-form reason tag.
78    pub reason: String,
79}
80
81/// Scheduler state.
82#[derive(Debug, Serialize, Deserialize)]
83pub struct Scheduler {
84    policy: SchedPolicy,
85    ready_queue: VecDeque<usize>,
86    #[serde(default)]
87    ready_set: BTreeSet<usize>,
88    blocked_set: BTreeMap<usize, BlockReason>,
89    #[serde(default)]
90    lane_of: BTreeMap<usize, LaneId>,
91    #[serde(default)]
92    lane_queues: BTreeMap<LaneId, VecDeque<usize>>,
93    #[serde(default)]
94    lane_order: Vec<LaneId>,
95    #[serde(default)]
96    lane_cursor: usize,
97    #[serde(default)]
98    lane_ready_set: BTreeMap<LaneId, BTreeSet<usize>>,
99    #[serde(default)]
100    lane_eligible_queues: BTreeMap<LaneId, VecDeque<usize>>,
101    #[serde(default)]
102    lane_eligible_set: BTreeMap<LaneId, BTreeSet<usize>>,
103    #[serde(default)]
104    lane_blocked: BTreeMap<LaneId, BTreeMap<usize, BlockReason>>,
105    #[serde(default)]
106    cross_lane_handoffs: Vec<CrossLaneHandoff>,
107    #[serde(default = "default_timeslice")]
108    timeslice: usize,
109    step_count: usize,
110}
111
112/// Lean-aligned scheduler state alias.
113pub type SchedState = Scheduler;
114
115impl Scheduler {
116    /// Create a scheduler with the given policy.
117    #[must_use]
118    pub fn new(policy: SchedPolicy) -> Self {
119        let mut lane_queues = BTreeMap::new();
120        lane_queues.insert(0, VecDeque::new());
121        let mut lane_blocked = BTreeMap::new();
122        lane_blocked.insert(0, BTreeMap::new());
123        Self {
124            policy,
125            ready_queue: VecDeque::new(),
126            ready_set: BTreeSet::new(),
127            blocked_set: BTreeMap::new(),
128            lane_of: BTreeMap::new(),
129            lane_queues,
130            lane_order: vec![0],
131            lane_cursor: 0,
132            lane_ready_set: BTreeMap::new(),
133            lane_eligible_queues: BTreeMap::new(),
134            lane_eligible_set: BTreeMap::new(),
135            lane_blocked,
136            cross_lane_handoffs: Vec::new(),
137            timeslice: default_timeslice(),
138            step_count: 0,
139        }
140    }
141
142    fn register_lane(&mut self, lane: LaneId) {
143        self.lane_queues.entry(lane).or_default();
144        self.lane_ready_set.entry(lane).or_default();
145        self.lane_eligible_queues.entry(lane).or_default();
146        self.lane_eligible_set.entry(lane).or_default();
147        self.lane_blocked.entry(lane).or_default();
148        if let Err(pos) = self.lane_order.binary_search(&lane) {
149            self.lane_order.insert(pos, lane);
150        }
151    }
152
153    fn lane_for_or_default(&self, coro_id: usize) -> LaneId {
154        self.lane_of.get(&coro_id).copied().unwrap_or(0)
155    }
156
157    fn lane_queue_push(&mut self, lane: LaneId, coro_id: usize) {
158        self.register_lane(lane);
159        let ready = self.lane_ready_set.entry(lane).or_default();
160        if ready.insert(coro_id) {
161            self.lane_queues.entry(lane).or_default().push_back(coro_id);
162        }
163    }
164
165    fn lane_queue_remove(&mut self, lane: LaneId, coro_id: usize) {
166        if let Some(ready) = self.lane_ready_set.get_mut(&lane) {
167            ready.remove(&coro_id);
168        }
169    }
170
171    fn lane_eligible_queue_push(&mut self, lane: LaneId, coro_id: usize) {
172        self.register_lane(lane);
173        let eligible = self.lane_eligible_set.entry(lane).or_default();
174        if eligible.insert(coro_id) {
175            self.lane_eligible_queues
176                .entry(lane)
177                .or_default()
178                .push_back(coro_id);
179        }
180    }
181
182    fn lane_eligible_queue_remove(&mut self, lane: LaneId, coro_id: usize) {
183        if let Some(eligible) = self.lane_eligible_set.get_mut(&lane) {
184            eligible.remove(&coro_id);
185        }
186    }
187
188    fn lane_queue_pop_front(&mut self, lane: LaneId) -> Option<usize> {
189        loop {
190            // bounded: queue drains until empty or ready element found
191            let coro_id = self
192                .lane_queues
193                .get_mut(&lane)
194                .and_then(VecDeque::pop_front)?;
195            if self
196                .lane_ready_set
197                .get_mut(&lane)
198                .is_some_and(|ready| ready.remove(&coro_id))
199            {
200                return Some(coro_id);
201            }
202        }
203    }
204
205    fn lane_eligible_queue_pop_front(&mut self, lane: LaneId) -> Option<usize> {
206        loop {
207            // bounded: queue drains until empty or eligible element found
208            let coro_id = self
209                .lane_eligible_queues
210                .get_mut(&lane)
211                .and_then(VecDeque::pop_front)?;
212            if self
213                .lane_eligible_set
214                .get_mut(&lane)
215                .is_some_and(|eligible| eligible.remove(&coro_id))
216            {
217                return Some(coro_id);
218            }
219        }
220    }
221
222    fn remove_from_global_ready(&mut self, coro_id: usize) {
223        self.ready_set.remove(&coro_id);
224        if let Some(position) = self.ready_queue.iter().position(|queued| *queued == coro_id) {
225            let _ = self.ready_queue.remove(position);
226        }
227    }
228
229    fn next_lane_with_ready(&mut self) -> Option<LaneId> {
230        if self.lane_order.is_empty() {
231            self.lane_order = self.lane_queues.keys().copied().collect();
232        }
233        if self.lane_order.is_empty() {
234            return None;
235        }
236        let lane_count = self.lane_order.len();
237        let start = self.lane_cursor % lane_count;
238        for offset in 0..lane_count {
239            let idx = (start + offset) % lane_count;
240            let lane = self.lane_order[idx];
241            if self
242                .lane_ready_set
243                .get(&lane)
244                .is_some_and(|ready| !ready.is_empty())
245            {
246                self.lane_cursor = (idx + 1) % lane_count;
247                return Some(lane);
248            }
249        }
250        None
251    }
252
253    fn next_lane_with_eligible_ready(&mut self) -> Option<LaneId> {
254        if self.lane_order.is_empty() {
255            self.lane_order = self.lane_queues.keys().copied().collect();
256        }
257        if self.lane_order.is_empty() {
258            return None;
259        }
260        let lane_count = self.lane_order.len();
261        let start = self.lane_cursor % lane_count;
262        for offset in 0..lane_count {
263            let idx = (start + offset) % lane_count;
264            let lane = self.lane_order[idx];
265            if self
266                .lane_eligible_set
267                .get(&lane)
268                .is_some_and(|ready| !ready.is_empty())
269            {
270                self.lane_cursor = (idx + 1) % lane_count;
271                return Some(lane);
272            }
273        }
274        None
275    }
276
277    /// Register a coroutine as ready.
278    pub fn add_ready(&mut self, coro_id: usize) {
279        let lane = self.lane_for_or_default(coro_id);
280        self.lane_of.entry(coro_id).or_insert(lane);
281        if self.ready_set.insert(coro_id) {
282            self.ready_queue.push_back(coro_id);
283        }
284        self.lane_queue_push(lane, coro_id);
285        if let Some(blocked) = self.lane_blocked.get_mut(&lane) {
286            blocked.remove(&coro_id);
287        }
288    }
289
290    /// Mark a coroutine as blocked.
291    pub fn mark_blocked(&mut self, coro_id: usize, reason: BlockReason) {
292        let reason_for_lane = reason.clone();
293        self.remove_from_global_ready(coro_id);
294        self.blocked_set.insert(coro_id, reason);
295        let lane = self.lane_for_or_default(coro_id);
296        self.lane_queue_remove(lane, coro_id);
297        self.lane_blocked
298            .entry(lane)
299            .or_default()
300            .insert(coro_id, reason_for_lane);
301        self.lane_eligible_queue_remove(lane, coro_id);
302    }
303
304    /// Mark a coroutine as done (remove from all queues).
305    pub fn mark_done(&mut self, coro_id: usize) {
306        self.remove_from_global_ready(coro_id);
307        self.blocked_set.remove(&coro_id);
308        let lane = self.lane_for_or_default(coro_id);
309        self.lane_queue_remove(lane, coro_id);
310        self.lane_eligible_queue_remove(lane, coro_id);
311        if let Some(blocked) = self.lane_blocked.get_mut(&lane) {
312            blocked.remove(&coro_id);
313        }
314    }
315
316    /// Unblock a coroutine (move from blocked to ready).
317    pub fn unblock(&mut self, coro_id: usize) {
318        if self.blocked_set.remove(&coro_id).is_some() {
319            if self.ready_set.insert(coro_id) {
320                self.ready_queue.push_back(coro_id);
321            }
322            let lane = self.lane_for_or_default(coro_id);
323            self.lane_queue_push(lane, coro_id);
324            if let Some(blocked) = self.lane_blocked.get_mut(&lane) {
325                blocked.remove(&coro_id);
326            }
327        }
328    }
329
330}