1use std::{
36 cmp::Ordering,
37 collections::{BinaryHeap, HashMap},
38 fmt::{Debug, Formatter},
39 sync::{Condvar, Mutex, MutexGuard},
40};
41
42use crate::{
43 analysis::{AnalysisError, AnalysisResult},
44 report::{ld_assert, ld_unreachable},
45 sync::{Shared, SyncBuildHasher, Trigger},
46};
47
/// Capacity bound for the manager's internal task collections.
///
/// Whenever one of these collections is found to have an allocated capacity
/// above this value, it is shrunk back towards it with `shrink_to`.
const TASKS_CAPACITY: usize = 10;
49
/// A cloneable, thread-safe handle through which a task can be signalled.
///
/// The task manager in this module stores a clone of the handle for every
/// task it tracks, calls [`trigger`](TaskHandle::trigger) on the handles of
/// active tasks it wants to interrupt, and consults
/// [`is_triggered`](TaskHandle::is_triggered) before admitting or waking a
/// task.
pub trait TaskHandle: Default + Clone + Send + Sync + 'static {
    /// Reports whether this handle is currently in the triggered state.
    fn is_triggered(&self) -> bool;

    /// Puts this handle into the triggered state.
    fn trigger(&self);
}
86
/// A [`TaskHandle`] implementation that wraps a [`Trigger`].
#[derive(Default, PartialEq, Eq, Hash, Clone)]
pub struct TriggerHandle(
    /// The wrapped trigger; the handle counts as triggered while it is active.
    pub Trigger,
);
93
94impl Debug for TriggerHandle {
95 #[inline(always)]
96 fn fmt(&self, formatter: &mut Formatter<'_>) -> std::fmt::Result {
97 match self.0.is_active() {
98 true => formatter.write_str("TriggerHandle(active)"),
99 false => formatter.write_str("TriggerHandle(inactive)"),
100 }
101 }
102}
103
104impl TaskHandle for TriggerHandle {
105 #[inline(always)]
106 fn is_triggered(&self) -> bool {
107 self.0.is_active()
108 }
109
110 #[inline(always)]
111 fn trigger(&self) {
112 self.0.activate();
113 }
114}
115
116impl TriggerHandle {
117 #[inline(always)]
119 pub fn new() -> Self {
120 Self::default()
121 }
122}
123
/// Priority of a task.
///
/// The task manager compares this value against its access-level threshold
/// (tasks strictly below the threshold are rejected or cancelled) and uses it
/// to order sleeping tasks, with greater values served first.
pub type TaskPriority = u16;
130
/// Identifier the task manager assigns to each task it admits or enqueues.
///
/// Identifiers are produced by an incrementing counter.
pub(super) type TaskId = u64;
132
/// Kind of a task; it also serves as the manager's "active mode".
///
/// Several tasks of the same kind may be active at once, except for
/// [`TaskKind::Exclusive`], which never shares the active set with another
/// task (see `active_mode_fits`).
#[derive(Clone, Copy, PartialEq, Eq)]
pub(super) enum TaskKind {
    /// May be active together with other `Analysis` tasks.
    Analysis,
    /// May be active together with other `Mutation` tasks.
    Mutation,
    /// Is never active together with any other task.
    Exclusive,
}
139
/// Admits, queues, wakes and cancels tasks according to their kind and
/// priority.
///
/// `H` is the task handle type and `S` the hasher builder used by the
/// internal maps. All bookkeeping lives in a single mutex-protected
/// `ManagerState`.
pub(super) struct TaskManager<H, S> {
    /// The whole mutable state of the manager, guarded by one mutex.
    state: Mutex<ManagerState<H, S>>,
}
143
144impl<H: TaskHandle, S: SyncBuildHasher> TaskManager<H, S> {
145 #[inline(always)]
146 pub(super) fn new() -> Self {
147 Self {
148 state: Mutex::new(ManagerState {
149 next_task_id: 0,
150 cancel_threshold: 0,
151 active_mode: None,
152 active_tasks: HashMap::with_hasher(S::default()),
153 awoke_tasks: HashMap::with_hasher(S::default()),
154 sleep_tasks: BinaryHeap::new(),
155 }),
156 }
157 }
158
159 pub(super) fn acquire_task(
160 &self,
161 kind: TaskKind,
162 handle: &H,
163 priority: TaskPriority,
164 lock: bool,
165 ) -> AnalysisResult<TaskId> {
166 let mut state = self.lock_state();
167
168 if priority < state.cancel_threshold || handle.is_triggered() {
169 return Err(AnalysisError::Interrupted);
170 }
171
172 let Some(active_mode) = state.active_mode else {
173 state.active_mode = Some(kind);
174
175 let task_id = state.gen_task_id();
176
177 state.insert_active_task(task_id, handle.clone(), priority);
178
179 return Ok(task_id);
180 };
181
182 ld_assert!(!state.active_tasks.is_empty(), "Empty active tasks map.");
183
184 let mode_fits = active_mode_fits(active_mode, kind);
185
186 if mode_fits && state.pending_priority() <= priority {
187 let task_id = state.gen_task_id();
188
189 state.insert_active_task(task_id, handle.clone(), priority);
190
191 return Ok(task_id);
192 }
193
194 if !lock {
195 return Err(AnalysisError::Interrupted);
196 }
197
198 if !mode_fits {
199 state.interrupt_active_tasks(priority);
200 }
201
202 let task_id = state.gen_task_id();
203
204 let waker = state.enqueue_task(task_id, kind, priority, handle.clone());
205
206 loop {
207 state = waker
208 .as_ref()
209 .wait(state)
210 .unwrap_or_else(|poison| poison.into_inner());
211
212 let Some(wakeup_kind) = state.awoke_tasks.remove(&task_id) else {
213 continue;
214 };
215
216 if state.awoke_tasks.capacity() > TASKS_CAPACITY {
217 state.awoke_tasks.shrink_to(TASKS_CAPACITY);
218 }
219
220 return match wakeup_kind {
221 WakeupKind::Activate => Ok(task_id),
222 WakeupKind::Cancel => Err(AnalysisError::Interrupted),
223 };
224 }
225 }
226
227 pub(super) fn release_task(&self, id: TaskId) {
228 let mut state = self.lock_state();
229
230 ld_assert!(state.active_mode.is_some(), "Release in inactive mode.");
231
232 if state.active_tasks.remove(&id).is_none() {
233 unsafe { ld_unreachable!("Missing active task.") }
234 }
235
236 if !state.active_tasks.is_empty() {
237 return;
238 }
239
240 if state.active_tasks.capacity() > TASKS_CAPACITY {
241 state.active_tasks.shrink_to(TASKS_CAPACITY);
242 }
243
244 state.active_mode = None;
245
246 loop {
247 let Some(sleep_task) = state.sleep_tasks.pop() else {
248 break;
249 };
250
251 if sleep_task.is_cancelled(state.cancel_threshold) {
252 state.wake_up_task(sleep_task.id, &sleep_task.waker, WakeupKind::Cancel);
253
254 continue;
255 }
256
257 let kind = sleep_task.kind;
258
259 state.active_mode = Some(kind);
260
261 state.insert_active_task(sleep_task.id, sleep_task.handle, sleep_task.priority);
262 state.wake_up_task(sleep_task.id, &sleep_task.waker, WakeupKind::Activate);
263
264 if kind == TaskKind::Exclusive {
265 break;
266 }
267
268 loop {
269 let Some(top) = state.sleep_tasks.peek() else {
270 break;
271 };
272
273 if top.kind != kind {
274 break;
275 }
276
277 let Some(sleep_task) = state.sleep_tasks.pop() else {
278 unsafe { ld_unreachable!("Missing sleep task.") }
279 };
280
281 if sleep_task.is_cancelled(state.cancel_threshold) {
282 state.wake_up_task(sleep_task.id, &sleep_task.waker, WakeupKind::Cancel);
283
284 continue;
285 }
286
287 state.insert_active_task(sleep_task.id, sleep_task.handle, sleep_task.priority);
288 state.wake_up_task(sleep_task.id, &sleep_task.waker, WakeupKind::Activate);
289 }
290
291 break;
292 }
293
294 if state.sleep_tasks.capacity() > TASKS_CAPACITY {
295 state.sleep_tasks.shrink_to(TASKS_CAPACITY);
296 }
297 }
298
299 pub(super) fn set_access_level(&self, threshold: TaskPriority) {
300 let mut state = self.lock_state();
301
302 if state.cancel_threshold > threshold {
303 state.cancel_threshold = threshold;
304 return;
305 }
306
307 state.cancel_threshold = threshold;
308
309 state.interrupt_active_tasks(threshold);
310 state.cancel_pending_tasks(threshold);
311 }
312
313 pub(super) fn get_access_level(&self) -> TaskPriority {
314 let state = self.lock_state();
315
316 state.cancel_threshold
317 }
318
319 #[inline(always)]
320 fn lock_state(&self) -> MutexGuard<ManagerState<H, S>> {
321 self.state
322 .lock()
323 .unwrap_or_else(|poison| poison.into_inner())
324 }
325}
326
/// Mutable bookkeeping of the task manager; always accessed under the
/// manager's mutex.
struct ManagerState<H, S> {
    /// The most recently issued task id (`0` before any id has been issued).
    next_task_id: TaskId,
    /// Access-level threshold: tasks with a priority strictly below it are
    /// rejected on acquisition and treated as cancelled when woken.
    cancel_threshold: TaskPriority,
    /// Kind of the tasks that are currently active, or `None` when no task is
    /// active.
    active_mode: Option<TaskKind>,
    /// Currently active tasks, keyed by task id.
    active_tasks: HashMap<TaskId, ActiveTaskInfo<H>, S>,
    /// Wake-up outcomes posted for sleeping tasks; each entry is removed by
    /// the woken thread once it observes it.
    awoke_tasks: HashMap<TaskId, WakeupKind, S>,
    /// Tasks waiting for admission, as a max-heap ordered by priority.
    sleep_tasks: BinaryHeap<SleepTaskInfo<H>>,
}
335
336impl<H: TaskHandle, S: SyncBuildHasher> ManagerState<H, S> {
337 #[inline(always)]
338 fn wake_up_task(&mut self, id: TaskId, task_waker: &TaskWaker, wakeup_kind: WakeupKind) {
339 if self.awoke_tasks.insert(id, wakeup_kind).is_some() {
340 unsafe { ld_unreachable!("Duplicate task id.") }
341 }
342
343 task_waker.as_ref().notify_one();
344 }
345
346 #[inline(always)]
347 fn insert_active_task(&mut self, id: TaskId, handle: H, priority: TaskPriority) {
348 let info = ActiveTaskInfo {
349 priority,
350 shutdown: handle,
351 };
352
353 if self.active_tasks.insert(id, info).is_some() {
354 unsafe { ld_unreachable!("Duplicate task id.") }
355 }
356 }
357
358 #[inline(always)]
359 fn interrupt_active_tasks(&mut self, threshold: TaskPriority) {
360 if threshold == 0 {
361 return;
362 }
363
364 for task_info in self.active_tasks.values() {
365 if task_info.priority < threshold {
366 task_info.shutdown.trigger();
367 }
368 }
369 }
370
371 #[inline(always)]
372 fn cancel_pending_tasks(&mut self, threshold: TaskPriority) {
373 if threshold == 0 {
374 return;
375 }
376
377 self.sleep_tasks.retain(|sleep_task| {
378 if sleep_task.priority > threshold {
379 return true;
380 }
381
382 if self
383 .awoke_tasks
384 .insert(sleep_task.id, WakeupKind::Cancel)
385 .is_some()
386 {
387 unsafe { ld_unreachable!("Duplicate task id.") }
388 }
389
390 sleep_task.waker.as_ref().notify_one();
391
392 false
393 })
394 }
395
396 #[inline(always)]
397 fn enqueue_task(
398 &mut self,
399 id: TaskId,
400 kind: TaskKind,
401 priority: TaskPriority,
402 handle: H,
403 ) -> TaskWaker {
404 let waker = Shared::new(Condvar::new());
405
406 self.sleep_tasks.push(SleepTaskInfo {
407 id,
408 kind,
409 priority,
410 handle,
411 waker: waker.clone(),
412 });
413
414 waker
415 }
416
417 #[inline(always)]
418 fn pending_priority(&self) -> TaskPriority {
419 let Some(peek) = self.sleep_tasks.peek() else {
420 return 0;
421 };
422
423 peek.priority
424 }
425
426 #[inline(always)]
427 fn gen_task_id(&mut self) -> TaskId {
428 self.next_task_id = match self.next_task_id.checked_add(1) {
429 Some(id) => id,
430 None => panic!("Too many tasks."),
431 };
432
433 self.next_task_id
434 }
435}
436
/// Per-task condition variable a sleeping task waits on; shared between the
/// sleeping thread and the queue entry that will notify it.
type TaskWaker = Shared<Condvar>;
438
/// Outcome posted for a sleeping task when it is woken.
enum WakeupKind {
    /// The task has been made active; acquisition succeeds with its id.
    Activate,
    /// The task has been cancelled; acquisition fails as interrupted.
    Cancel,
}
443
/// Bookkeeping record of an active task.
struct ActiveTaskInfo<H> {
    /// Priority the task was acquired with.
    priority: TaskPriority,
    /// Handle that is triggered when the manager interrupts the task.
    shutdown: H,
}
448
/// Queue entry of a task that is waiting to become active.
///
/// Entries compare (and are ordered in the heap) by `priority` only.
struct SleepTaskInfo<H> {
    /// Id already assigned to the waiting task.
    id: TaskId,
    /// Kind the task will run as once activated.
    kind: TaskKind,
    /// Priority of the task; the heap's ordering key.
    priority: TaskPriority,
    /// Handle of the task; a triggered handle marks the entry as cancelled.
    handle: H,
    /// Condition variable the waiting thread sleeps on.
    waker: TaskWaker,
}
456
457impl<H: TaskHandle> PartialEq for SleepTaskInfo<H> {
458 #[inline(always)]
459 fn eq(&self, other: &Self) -> bool {
460 self.priority.eq(&other.priority)
461 }
462}
463
464impl<H: TaskHandle> Eq for SleepTaskInfo<H> {}
465
466impl<H: TaskHandle> PartialOrd for SleepTaskInfo<H> {
467 #[inline(always)]
468 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
469 Some(self.cmp(other))
470 }
471}
472
473impl<H: TaskHandle> Ord for SleepTaskInfo<H> {
474 #[inline(always)]
475 fn cmp(&self, other: &Self) -> Ordering {
476 self.priority.cmp(&other.priority)
477 }
478}
479
480impl<H: TaskHandle> SleepTaskInfo<H> {
481 #[inline(always)]
482 fn is_cancelled(&self, cancel_threshold: TaskPriority) -> bool {
483 self.priority < cancel_threshold || self.handle.is_triggered()
484 }
485}
486
487#[inline(always)]
488fn active_mode_fits(active_mode: TaskKind, task_kind: TaskKind) -> bool {
489 active_mode == task_kind && active_mode != TaskKind::Exclusive
490}