// telltale_machine/scheduler/types.rs

/// Identifier of a scheduler lane.
///
/// Lane `0` is the fallback lane used for coroutines with no recorded lane
/// assignment.
pub type LaneId = usize;
8
/// Timeslice used by [`Scheduler::new`] and by serde when the `timeslice`
/// field is absent from serialized input.
fn default_timeslice() -> usize {
    const DEFAULT_TIMESLICE: usize = 1;
    DEFAULT_TIMESLICE
}
12
13#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
15pub enum PriorityPolicy {
16 FixedMap(BTreeMap<usize, usize>),
18 Aging,
20 TokenWeighted,
22}
23
24#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
30pub enum SchedPolicy {
31 #[default]
33 Cooperative,
34 RoundRobin,
36 Priority(PriorityPolicy),
38 ProgressAware,
40}
41
42#[derive(Debug, Clone, PartialEq, Eq)]
44pub enum StepUpdate {
45 Ready,
47 Yielded,
49 Blocked(BlockReason),
51 Done,
53}
54
55#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
57pub enum ReadyEligibility {
58 Eligible,
60 Ineligible,
62}
63
64#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
66pub struct CrossLaneHandoff {
67 pub from_coro: usize,
69 pub to_coro: usize,
71 pub from_lane: LaneId,
73 pub to_lane: LaneId,
75 pub step: usize,
77 pub reason: String,
79}
80
81#[derive(Debug, Serialize, Deserialize)]
83pub struct Scheduler {
84 policy: SchedPolicy,
85 ready_queue: VecDeque<usize>,
86 #[serde(default)]
87 ready_set: BTreeSet<usize>,
88 blocked_set: BTreeMap<usize, BlockReason>,
89 #[serde(default)]
90 lane_of: BTreeMap<usize, LaneId>,
91 #[serde(default)]
92 lane_queues: BTreeMap<LaneId, VecDeque<usize>>,
93 #[serde(default)]
94 lane_order: Vec<LaneId>,
95 #[serde(default)]
96 lane_cursor: usize,
97 #[serde(default)]
98 lane_ready_set: BTreeMap<LaneId, BTreeSet<usize>>,
99 #[serde(default)]
100 lane_eligible_queues: BTreeMap<LaneId, VecDeque<usize>>,
101 #[serde(default)]
102 lane_eligible_set: BTreeMap<LaneId, BTreeSet<usize>>,
103 #[serde(default)]
104 lane_blocked: BTreeMap<LaneId, BTreeMap<usize, BlockReason>>,
105 #[serde(default)]
106 cross_lane_handoffs: Vec<CrossLaneHandoff>,
107 #[serde(default = "default_timeslice")]
108 timeslice: usize,
109 step_count: usize,
110}
111
112pub type SchedState = Scheduler;
114
115impl Scheduler {
116 #[must_use]
118 pub fn new(policy: SchedPolicy) -> Self {
119 let mut lane_queues = BTreeMap::new();
120 lane_queues.insert(0, VecDeque::new());
121 let mut lane_blocked = BTreeMap::new();
122 lane_blocked.insert(0, BTreeMap::new());
123 Self {
124 policy,
125 ready_queue: VecDeque::new(),
126 ready_set: BTreeSet::new(),
127 blocked_set: BTreeMap::new(),
128 lane_of: BTreeMap::new(),
129 lane_queues,
130 lane_order: vec![0],
131 lane_cursor: 0,
132 lane_ready_set: BTreeMap::new(),
133 lane_eligible_queues: BTreeMap::new(),
134 lane_eligible_set: BTreeMap::new(),
135 lane_blocked,
136 cross_lane_handoffs: Vec::new(),
137 timeslice: default_timeslice(),
138 step_count: 0,
139 }
140 }
141
142 fn register_lane(&mut self, lane: LaneId) {
143 self.lane_queues.entry(lane).or_default();
144 self.lane_ready_set.entry(lane).or_default();
145 self.lane_eligible_queues.entry(lane).or_default();
146 self.lane_eligible_set.entry(lane).or_default();
147 self.lane_blocked.entry(lane).or_default();
148 if let Err(pos) = self.lane_order.binary_search(&lane) {
149 self.lane_order.insert(pos, lane);
150 }
151 }
152
153 fn lane_for_or_default(&self, coro_id: usize) -> LaneId {
154 self.lane_of.get(&coro_id).copied().unwrap_or(0)
155 }
156
157 fn lane_queue_push(&mut self, lane: LaneId, coro_id: usize) {
158 self.register_lane(lane);
159 let ready = self.lane_ready_set.entry(lane).or_default();
160 if ready.insert(coro_id) {
161 self.lane_queues.entry(lane).or_default().push_back(coro_id);
162 }
163 }
164
165 fn lane_queue_remove(&mut self, lane: LaneId, coro_id: usize) {
166 if let Some(ready) = self.lane_ready_set.get_mut(&lane) {
167 ready.remove(&coro_id);
168 }
169 }
170
171 fn lane_eligible_queue_push(&mut self, lane: LaneId, coro_id: usize) {
172 self.register_lane(lane);
173 let eligible = self.lane_eligible_set.entry(lane).or_default();
174 if eligible.insert(coro_id) {
175 self.lane_eligible_queues
176 .entry(lane)
177 .or_default()
178 .push_back(coro_id);
179 }
180 }
181
182 fn lane_eligible_queue_remove(&mut self, lane: LaneId, coro_id: usize) {
183 if let Some(eligible) = self.lane_eligible_set.get_mut(&lane) {
184 eligible.remove(&coro_id);
185 }
186 }
187
188 fn lane_queue_pop_front(&mut self, lane: LaneId) -> Option<usize> {
189 loop {
190 let coro_id = self
192 .lane_queues
193 .get_mut(&lane)
194 .and_then(VecDeque::pop_front)?;
195 if self
196 .lane_ready_set
197 .get_mut(&lane)
198 .is_some_and(|ready| ready.remove(&coro_id))
199 {
200 return Some(coro_id);
201 }
202 }
203 }
204
205 fn lane_eligible_queue_pop_front(&mut self, lane: LaneId) -> Option<usize> {
206 loop {
207 let coro_id = self
209 .lane_eligible_queues
210 .get_mut(&lane)
211 .and_then(VecDeque::pop_front)?;
212 if self
213 .lane_eligible_set
214 .get_mut(&lane)
215 .is_some_and(|eligible| eligible.remove(&coro_id))
216 {
217 return Some(coro_id);
218 }
219 }
220 }
221
222 fn remove_from_global_ready(&mut self, coro_id: usize) {
223 self.ready_set.remove(&coro_id);
224 if let Some(position) = self.ready_queue.iter().position(|queued| *queued == coro_id) {
225 let _ = self.ready_queue.remove(position);
226 }
227 }
228
229 fn next_lane_with_ready(&mut self) -> Option<LaneId> {
230 if self.lane_order.is_empty() {
231 self.lane_order = self.lane_queues.keys().copied().collect();
232 }
233 if self.lane_order.is_empty() {
234 return None;
235 }
236 let lane_count = self.lane_order.len();
237 let start = self.lane_cursor % lane_count;
238 for offset in 0..lane_count {
239 let idx = (start + offset) % lane_count;
240 let lane = self.lane_order[idx];
241 if self
242 .lane_ready_set
243 .get(&lane)
244 .is_some_and(|ready| !ready.is_empty())
245 {
246 self.lane_cursor = (idx + 1) % lane_count;
247 return Some(lane);
248 }
249 }
250 None
251 }
252
253 fn next_lane_with_eligible_ready(&mut self) -> Option<LaneId> {
254 if self.lane_order.is_empty() {
255 self.lane_order = self.lane_queues.keys().copied().collect();
256 }
257 if self.lane_order.is_empty() {
258 return None;
259 }
260 let lane_count = self.lane_order.len();
261 let start = self.lane_cursor % lane_count;
262 for offset in 0..lane_count {
263 let idx = (start + offset) % lane_count;
264 let lane = self.lane_order[idx];
265 if self
266 .lane_eligible_set
267 .get(&lane)
268 .is_some_and(|ready| !ready.is_empty())
269 {
270 self.lane_cursor = (idx + 1) % lane_count;
271 return Some(lane);
272 }
273 }
274 None
275 }
276
277 pub fn add_ready(&mut self, coro_id: usize) {
279 let lane = self.lane_for_or_default(coro_id);
280 self.lane_of.entry(coro_id).or_insert(lane);
281 if self.ready_set.insert(coro_id) {
282 self.ready_queue.push_back(coro_id);
283 }
284 self.lane_queue_push(lane, coro_id);
285 if let Some(blocked) = self.lane_blocked.get_mut(&lane) {
286 blocked.remove(&coro_id);
287 }
288 }
289
290 pub fn mark_blocked(&mut self, coro_id: usize, reason: BlockReason) {
292 let reason_for_lane = reason.clone();
293 self.remove_from_global_ready(coro_id);
294 self.blocked_set.insert(coro_id, reason);
295 let lane = self.lane_for_or_default(coro_id);
296 self.lane_queue_remove(lane, coro_id);
297 self.lane_blocked
298 .entry(lane)
299 .or_default()
300 .insert(coro_id, reason_for_lane);
301 self.lane_eligible_queue_remove(lane, coro_id);
302 }
303
304 pub fn mark_done(&mut self, coro_id: usize) {
306 self.remove_from_global_ready(coro_id);
307 self.blocked_set.remove(&coro_id);
308 let lane = self.lane_for_or_default(coro_id);
309 self.lane_queue_remove(lane, coro_id);
310 self.lane_eligible_queue_remove(lane, coro_id);
311 if let Some(blocked) = self.lane_blocked.get_mut(&lane) {
312 blocked.remove(&coro_id);
313 }
314 }
315
316 pub fn unblock(&mut self, coro_id: usize) {
318 if self.blocked_set.remove(&coro_id).is_some() {
319 if self.ready_set.insert(coro_id) {
320 self.ready_queue.push_back(coro_id);
321 }
322 let lane = self.lane_for_or_default(coro_id);
323 self.lane_queue_push(lane, coro_id);
324 if let Some(blocked) = self.lane_blocked.get_mut(&lane) {
325 blocked.remove(&coro_id);
326 }
327 }
328 }
329
330}