lady_deirdre/analysis/
manager.rs

1////////////////////////////////////////////////////////////////////////////////
2// This file is part of "Lady Deirdre", a compiler front-end foundation       //
3// technology.                                                                //
4//                                                                            //
5// This work is proprietary software with source-available code.              //
6//                                                                            //
7// To copy, use, distribute, or contribute to this work, you must agree to    //
8// the terms of the General License Agreement:                                //
9//                                                                            //
10// https://github.com/Eliah-Lakhin/lady-deirdre/blob/master/EULA.md           //
11//                                                                            //
12// The agreement grants a Basic Commercial License, allowing you to use       //
13// this work in non-commercial and limited commercial products with a total   //
14// gross revenue cap. To remove this commercial limit for one of your         //
15// products, you must acquire a Full Commercial License.                      //
16//                                                                            //
17// If you contribute to the source code, documentation, or related materials, //
18// you must grant me an exclusive license to these contributions.             //
19// Contributions are governed by the "Contributions" section of the General   //
20// License Agreement.                                                         //
21//                                                                            //
22// Copying the work in parts is strictly forbidden, except as permitted       //
23// under the General License Agreement.                                       //
24//                                                                            //
25// If you do not or cannot agree to the terms of this Agreement,              //
26// do not use this work.                                                      //
27//                                                                            //
28// This work is provided "as is", without any warranties, express or implied, //
29// except where such disclaimers are legally invalid.                         //
30//                                                                            //
31// Copyright (c) 2024 Ilya Lakhin (Илья Александрович Лахин).                 //
32// All rights reserved.                                                       //
33////////////////////////////////////////////////////////////////////////////////
34
35use std::{
36    cmp::Ordering,
37    collections::{BinaryHeap, HashMap},
38    fmt::{Debug, Formatter},
39    sync::{Condvar, Mutex, MutexGuard},
40};
41
42use crate::{
43    analysis::{AnalysisError, AnalysisResult},
44    report::{ld_assert, ld_unreachable},
45    sync::{Shared, SyncBuildHasher, Trigger},
46};
47
48const TASKS_CAPACITY: usize = 10;
49
50/// An object that signals a task worker to finish its job.
51///
52/// In Lady Deirdre, task jobs are subject for graceful shutdown.
53///
54/// Each task object provides access to the TaskHandle. The analyzer's functions
55/// associated with this task and the user's code assume to examine the
56/// [TaskHandle::is_triggered] value periodically to determine if the job needs
57/// to be finished earlier. If the function returns true, the worker should
58/// finish its job as soon as possible and drop the task.
59///
60/// Another party of the compiler execution, such as the thread that spawns the
61/// worker's sub-thread, and the [Analyzer](crate::analysis::Analyzer) object
62/// itself, that have a clone of the TaskHandle object, may trigger spawned
63/// worker's job interruption by calling a [TaskHandle::trigger] function that
64/// sets the inner flag of the TaskHandle to true.
65///
66/// The [TriggerHandle] object provides default implementation of TaskHandle
67/// backed by a single [Trigger] object. However, in the end compiler
68/// architecture, you can implement your own type of TaskHandle with more
69/// complex triggering logic.
70pub trait TaskHandle: Default + Clone + Send + Sync + 'static {
71    /// Returns true if the task's worker should finish its job as soon as
72    /// possible and drop the corresponding task object.
73    fn is_triggered(&self) -> bool;
74
75    /// Signals the task's worker to finish its job as soon as possible and to
76    /// drop the corresponding task.
77    ///
78    /// The function sets this TaskHandle's state and all of its clones states
79    /// to "triggered" such that their [is_triggered](Self::is_triggered)
80    /// function would return true.
81    ///
82    /// Once the trigger function is called, the TaskHandle triggering state
83    /// cannot be unset.
84    fn trigger(&self);
85}
86
87/// A default implementation of the [TaskHandle] backed by the [Trigger] object.
88#[derive(Default, PartialEq, Eq, Hash, Clone)]
89pub struct TriggerHandle(
90    /// An inner state of the handle.
91    pub Trigger,
92);
93
94impl Debug for TriggerHandle {
95    #[inline(always)]
96    fn fmt(&self, formatter: &mut Formatter<'_>) -> std::fmt::Result {
97        match self.0.is_active() {
98            true => formatter.write_str("TriggerHandle(active)"),
99            false => formatter.write_str("TriggerHandle(inactive)"),
100        }
101    }
102}
103
104impl TaskHandle for TriggerHandle {
105    #[inline(always)]
106    fn is_triggered(&self) -> bool {
107        self.0.is_active()
108    }
109
110    #[inline(always)]
111    fn trigger(&self) {
112        self.0.activate();
113    }
114}
115
116impl TriggerHandle {
117    /// Creates a new task handle in "untriggered" state.
118    #[inline(always)]
119    pub fn new() -> Self {
120        Self::default()
121    }
122}
123
124/// A priority of the task.
125///
126/// The [analyzer](crate::analysis::Analyzer)'s task manager attempts to
127/// grant access to the tasks with higher priority value earlier than the tasks
128/// with lower priority.
129pub type TaskPriority = u16;
130
131pub(super) type TaskId = u64;
132
133#[derive(Clone, Copy, PartialEq, Eq)]
134pub(super) enum TaskKind {
135    Analysis,
136    Mutation,
137    Exclusive,
138}
139
140pub(super) struct TaskManager<H, S> {
141    state: Mutex<ManagerState<H, S>>,
142}
143
144impl<H: TaskHandle, S: SyncBuildHasher> TaskManager<H, S> {
145    #[inline(always)]
146    pub(super) fn new() -> Self {
147        Self {
148            state: Mutex::new(ManagerState {
149                next_task_id: 0,
150                cancel_threshold: 0,
151                active_mode: None,
152                active_tasks: HashMap::with_hasher(S::default()),
153                awoke_tasks: HashMap::with_hasher(S::default()),
154                sleep_tasks: BinaryHeap::new(),
155            }),
156        }
157    }
158
159    pub(super) fn acquire_task(
160        &self,
161        kind: TaskKind,
162        handle: &H,
163        priority: TaskPriority,
164        lock: bool,
165    ) -> AnalysisResult<TaskId> {
166        let mut state = self.lock_state();
167
168        if priority < state.cancel_threshold || handle.is_triggered() {
169            return Err(AnalysisError::Interrupted);
170        }
171
172        let Some(active_mode) = state.active_mode else {
173            state.active_mode = Some(kind);
174
175            let task_id = state.gen_task_id();
176
177            state.insert_active_task(task_id, handle.clone(), priority);
178
179            return Ok(task_id);
180        };
181
182        ld_assert!(!state.active_tasks.is_empty(), "Empty active tasks map.");
183
184        let mode_fits = active_mode_fits(active_mode, kind);
185
186        if mode_fits && state.pending_priority() <= priority {
187            let task_id = state.gen_task_id();
188
189            state.insert_active_task(task_id, handle.clone(), priority);
190
191            return Ok(task_id);
192        }
193
194        if !lock {
195            return Err(AnalysisError::Interrupted);
196        }
197
198        if !mode_fits {
199            state.interrupt_active_tasks(priority);
200        }
201
202        let task_id = state.gen_task_id();
203
204        let waker = state.enqueue_task(task_id, kind, priority, handle.clone());
205
206        loop {
207            state = waker
208                .as_ref()
209                .wait(state)
210                .unwrap_or_else(|poison| poison.into_inner());
211
212            let Some(wakeup_kind) = state.awoke_tasks.remove(&task_id) else {
213                continue;
214            };
215
216            if state.awoke_tasks.capacity() > TASKS_CAPACITY {
217                state.awoke_tasks.shrink_to(TASKS_CAPACITY);
218            }
219
220            return match wakeup_kind {
221                WakeupKind::Activate => Ok(task_id),
222                WakeupKind::Cancel => Err(AnalysisError::Interrupted),
223            };
224        }
225    }
226
227    pub(super) fn release_task(&self, id: TaskId) {
228        let mut state = self.lock_state();
229
230        ld_assert!(state.active_mode.is_some(), "Release in inactive mode.");
231
232        if state.active_tasks.remove(&id).is_none() {
233            unsafe { ld_unreachable!("Missing active task.") }
234        }
235
236        if !state.active_tasks.is_empty() {
237            return;
238        }
239
240        if state.active_tasks.capacity() > TASKS_CAPACITY {
241            state.active_tasks.shrink_to(TASKS_CAPACITY);
242        }
243
244        state.active_mode = None;
245
246        loop {
247            let Some(sleep_task) = state.sleep_tasks.pop() else {
248                break;
249            };
250
251            if sleep_task.is_cancelled(state.cancel_threshold) {
252                state.wake_up_task(sleep_task.id, &sleep_task.waker, WakeupKind::Cancel);
253
254                continue;
255            }
256
257            let kind = sleep_task.kind;
258
259            state.active_mode = Some(kind);
260
261            state.insert_active_task(sleep_task.id, sleep_task.handle, sleep_task.priority);
262            state.wake_up_task(sleep_task.id, &sleep_task.waker, WakeupKind::Activate);
263
264            if kind == TaskKind::Exclusive {
265                break;
266            }
267
268            loop {
269                let Some(top) = state.sleep_tasks.peek() else {
270                    break;
271                };
272
273                if top.kind != kind {
274                    break;
275                }
276
277                let Some(sleep_task) = state.sleep_tasks.pop() else {
278                    unsafe { ld_unreachable!("Missing sleep task.") }
279                };
280
281                if sleep_task.is_cancelled(state.cancel_threshold) {
282                    state.wake_up_task(sleep_task.id, &sleep_task.waker, WakeupKind::Cancel);
283
284                    continue;
285                }
286
287                state.insert_active_task(sleep_task.id, sleep_task.handle, sleep_task.priority);
288                state.wake_up_task(sleep_task.id, &sleep_task.waker, WakeupKind::Activate);
289            }
290
291            break;
292        }
293
294        if state.sleep_tasks.capacity() > TASKS_CAPACITY {
295            state.sleep_tasks.shrink_to(TASKS_CAPACITY);
296        }
297    }
298
299    pub(super) fn set_access_level(&self, threshold: TaskPriority) {
300        let mut state = self.lock_state();
301
302        if state.cancel_threshold > threshold {
303            state.cancel_threshold = threshold;
304            return;
305        }
306
307        state.cancel_threshold = threshold;
308
309        state.interrupt_active_tasks(threshold);
310        state.cancel_pending_tasks(threshold);
311    }
312
313    pub(super) fn get_access_level(&self) -> TaskPriority {
314        let state = self.lock_state();
315
316        state.cancel_threshold
317    }
318
319    #[inline(always)]
320    fn lock_state(&self) -> MutexGuard<ManagerState<H, S>> {
321        self.state
322            .lock()
323            .unwrap_or_else(|poison| poison.into_inner())
324    }
325}
326
327struct ManagerState<H, S> {
328    next_task_id: TaskId,
329    cancel_threshold: TaskPriority,
330    active_mode: Option<TaskKind>,
331    active_tasks: HashMap<TaskId, ActiveTaskInfo<H>, S>,
332    awoke_tasks: HashMap<TaskId, WakeupKind, S>,
333    sleep_tasks: BinaryHeap<SleepTaskInfo<H>>,
334}
335
336impl<H: TaskHandle, S: SyncBuildHasher> ManagerState<H, S> {
337    #[inline(always)]
338    fn wake_up_task(&mut self, id: TaskId, task_waker: &TaskWaker, wakeup_kind: WakeupKind) {
339        if self.awoke_tasks.insert(id, wakeup_kind).is_some() {
340            unsafe { ld_unreachable!("Duplicate task id.") }
341        }
342
343        task_waker.as_ref().notify_one();
344    }
345
346    #[inline(always)]
347    fn insert_active_task(&mut self, id: TaskId, handle: H, priority: TaskPriority) {
348        let info = ActiveTaskInfo {
349            priority,
350            shutdown: handle,
351        };
352
353        if self.active_tasks.insert(id, info).is_some() {
354            unsafe { ld_unreachable!("Duplicate task id.") }
355        }
356    }
357
358    #[inline(always)]
359    fn interrupt_active_tasks(&mut self, threshold: TaskPriority) {
360        if threshold == 0 {
361            return;
362        }
363
364        for task_info in self.active_tasks.values() {
365            if task_info.priority < threshold {
366                task_info.shutdown.trigger();
367            }
368        }
369    }
370
371    #[inline(always)]
372    fn cancel_pending_tasks(&mut self, threshold: TaskPriority) {
373        if threshold == 0 {
374            return;
375        }
376
377        self.sleep_tasks.retain(|sleep_task| {
378            if sleep_task.priority > threshold {
379                return true;
380            }
381
382            if self
383                .awoke_tasks
384                .insert(sleep_task.id, WakeupKind::Cancel)
385                .is_some()
386            {
387                unsafe { ld_unreachable!("Duplicate task id.") }
388            }
389
390            sleep_task.waker.as_ref().notify_one();
391
392            false
393        })
394    }
395
396    #[inline(always)]
397    fn enqueue_task(
398        &mut self,
399        id: TaskId,
400        kind: TaskKind,
401        priority: TaskPriority,
402        handle: H,
403    ) -> TaskWaker {
404        let waker = Shared::new(Condvar::new());
405
406        self.sleep_tasks.push(SleepTaskInfo {
407            id,
408            kind,
409            priority,
410            handle,
411            waker: waker.clone(),
412        });
413
414        waker
415    }
416
417    #[inline(always)]
418    fn pending_priority(&self) -> TaskPriority {
419        let Some(peek) = self.sleep_tasks.peek() else {
420            return 0;
421        };
422
423        peek.priority
424    }
425
426    #[inline(always)]
427    fn gen_task_id(&mut self) -> TaskId {
428        self.next_task_id = match self.next_task_id.checked_add(1) {
429            Some(id) => id,
430            None => panic!("Too many tasks."),
431        };
432
433        self.next_task_id
434    }
435}
436
437type TaskWaker = Shared<Condvar>;
438
439enum WakeupKind {
440    Activate,
441    Cancel,
442}
443
444struct ActiveTaskInfo<H> {
445    priority: TaskPriority,
446    shutdown: H,
447}
448
449struct SleepTaskInfo<H> {
450    id: TaskId,
451    kind: TaskKind,
452    priority: TaskPriority,
453    handle: H,
454    waker: TaskWaker,
455}
456
457impl<H: TaskHandle> PartialEq for SleepTaskInfo<H> {
458    #[inline(always)]
459    fn eq(&self, other: &Self) -> bool {
460        self.priority.eq(&other.priority)
461    }
462}
463
464impl<H: TaskHandle> Eq for SleepTaskInfo<H> {}
465
466impl<H: TaskHandle> PartialOrd for SleepTaskInfo<H> {
467    #[inline(always)]
468    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
469        Some(self.cmp(other))
470    }
471}
472
473impl<H: TaskHandle> Ord for SleepTaskInfo<H> {
474    #[inline(always)]
475    fn cmp(&self, other: &Self) -> Ordering {
476        self.priority.cmp(&other.priority)
477    }
478}
479
480impl<H: TaskHandle> SleepTaskInfo<H> {
481    #[inline(always)]
482    fn is_cancelled(&self, cancel_threshold: TaskPriority) -> bool {
483        self.priority < cancel_threshold || self.handle.is_triggered()
484    }
485}
486
487#[inline(always)]
488fn active_mode_fits(active_mode: TaskKind, task_kind: TaskKind) -> bool {
489    active_mode == task_kind && active_mode != TaskKind::Exclusive
490}