Skip to main content

aft/executor/
mod.rs

1mod single_flight;
2
3#[cfg(test)]
4mod tests;
5
6use std::{
7    collections::{HashMap, VecDeque},
8    sync::{
9        atomic::{AtomicUsize, Ordering},
10        Arc,
11    },
12    thread::{self, JoinHandle},
13    time::Duration,
14};
15
16use crossbeam_channel::{Receiver, RecvError, RecvTimeoutError, Sender};
17use parking_lot::{Mutex, RwLock};
18use tokio::sync::oneshot;
19
20use crate::{context::AppContext, path_identity::ProjectRootId, protocol::Response};
21
22pub use single_flight::SingleFlight;
23
24const JOB_COST: isize = 1;
25
26/// Scheduler lane for command-handler execution.
27#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
28pub enum Lane {
29    /// Pure read-only work. Runs under the actor epoch read gate and is capped
30    /// per actor.
31    PureRead,
32    /// LSP/status work. Serialized per actor by scheduler admission while still
33    /// using the shared epoch read gate.
34    SerialLspStatus,
35    /// Heavy lazy initialization. The scheduler acquires a process-wide heavy
36    /// permit before dispatch; the worker runs the build outside the epoch and
37    /// then takes a short write gate for the install point.
38    HeavyInit,
39    /// Mutating work. Becomes a writer barrier at the actor queue head, drains
40    /// in-flight reads, and runs under the actor epoch write gate.
41    Mutating,
42}
43
44pub type ExecutorJob = Box<dyn FnOnce(&AppContext) -> Response + Send + 'static>;
45
46#[derive(Debug, Clone)]
47pub struct ExecutorConfig {
48    pub pool_size: usize,
49    pub read_cap: usize,
50    pub actor_cap: usize,
51    pub heavy_permits: usize,
52    pub drr_quantum: isize,
53}
54
55impl Default for ExecutorConfig {
56    fn default() -> Self {
57        let available = thread::available_parallelism()
58            .map(usize::from)
59            .unwrap_or(2);
60        let pool_size = available.saturating_sub(1).clamp(2, 8);
61        let actor_cap = pool_size.saturating_sub(1).clamp(1, 4);
62        let read_cap = actor_cap.clamp(1, 4);
63        let heavy_permits = pool_size.saturating_sub(1).clamp(2, 3);
64
65        Self {
66            pool_size,
67            read_cap,
68            actor_cap,
69            heavy_permits,
70            drr_quantum: 1,
71        }
72    }
73}
74
75#[derive(Debug, Clone)]
76struct EffectiveConfig {
77    pool_size: usize,
78    read_cap: usize,
79    actor_cap: usize,
80    heavy_permits: usize,
81    drr_quantum: isize,
82    deficit_cap: isize,
83}
84
85impl ExecutorConfig {
86    fn effective(&self) -> EffectiveConfig {
87        let pool_size = self.pool_size.clamp(2, 8);
88        let max_actor_cap = pool_size.saturating_sub(1).max(1);
89        let actor_cap = self.actor_cap.max(1).min(max_actor_cap);
90        let read_cap = self.read_cap.max(1).min(actor_cap).min(4);
91        let heavy_permits = self.heavy_permits.clamp(2, 3);
92        let drr_quantum = self.drr_quantum.max(1);
93        let deficit_cap = (actor_cap.max(1) as isize) * 4;
94
95        EffectiveConfig {
96            pool_size,
97            read_cap,
98            actor_cap,
99            heavy_permits,
100            drr_quantum,
101            deficit_cap,
102        }
103    }
104}
105
106/// Synchronous completion handle used by the executor tests and the
107/// future standalone bridge.
108pub struct CompletionHandle {
109    rx: Receiver<Response>,
110}
111
112impl CompletionHandle {
113    pub fn recv(self) -> Result<Response, RecvError> {
114        self.rx.recv()
115    }
116
117    pub fn recv_timeout(&self, timeout: Duration) -> Result<Response, RecvTimeoutError> {
118        self.rx.recv_timeout(timeout)
119    }
120
121    pub fn into_receiver(self) -> Receiver<Response> {
122        self.rx
123    }
124}
125
126/// Concurrent scheduler-dispatch executor.
127pub struct Executor {
128    inner: Arc<ExecutorInner>,
129}
130
131impl Executor {
132    pub fn new() -> Self {
133        Self::with_config(ExecutorConfig::default())
134    }
135
136    pub fn with_config(config: ExecutorConfig) -> Self {
137        let effective = config.effective();
138        let state = Arc::new(Mutex::new(SchedulerState::new(effective.clone())));
139        let heavy = Arc::new(HeavySemaphore::new(effective.heavy_permits));
140        let nonrunnable_dispatches = Arc::new(AtomicUsize::new(0));
141        let (run_tx, run_rx) = crossbeam_channel::unbounded();
142        let (event_tx, event_rx) = crossbeam_channel::unbounded();
143
144        let scheduler_state = Arc::clone(&state);
145        let scheduler_heavy = Arc::clone(&heavy);
146        let scheduler_violations = Arc::clone(&nonrunnable_dispatches);
147        let scheduler_handle = thread::Builder::new()
148            .name("aft-executor-scheduler".to_string())
149            .spawn(move || {
150                scheduler_loop(
151                    scheduler_state,
152                    scheduler_heavy,
153                    run_tx,
154                    event_rx,
155                    scheduler_violations,
156                );
157            })
158            .expect("spawn AFT executor scheduler");
159
160        let mut worker_handles = Vec::with_capacity(effective.pool_size);
161        for worker_id in 0..effective.pool_size {
162            let worker_rx = run_rx.clone();
163            let worker_events = event_tx.clone();
164            let handle = thread::Builder::new()
165                .name(format!("aft-executor-worker-{worker_id}"))
166                .spawn(move || worker_loop(worker_rx, worker_events))
167                .expect("spawn AFT executor worker");
168            worker_handles.push(handle);
169        }
170
171        Self {
172            inner: Arc::new(ExecutorInner {
173                state,
174                event_tx,
175                scheduler_handle: Mutex::new(Some(scheduler_handle)),
176                worker_handles: Mutex::new(worker_handles),
177                config: effective,
178                nonrunnable_dispatches,
179            }),
180        }
181    }
182
183    /// Register an actor if one is not already present.
184    ///
185    /// Existing actors keep their current context and scheduler state; subc
186    /// routing reuses them and reconfigures through the Mutating lane
187    /// rather than replacing the per-root [`AppContext`]. Returns `true` when a
188    /// new actor was inserted.
189    pub fn register_actor(&self, root_id: ProjectRootId, ctx: Arc<AppContext>) -> bool {
190        let inserted = {
191            let mut state = self.inner.state.lock();
192            if state.actors.contains_key(&root_id) {
193                false
194            } else {
195                state.actor_order.push(root_id.clone());
196                state.actors.insert(root_id, ActorState::new(ctx));
197                true
198            }
199        };
200        self.wake_scheduler();
201        inserted
202    }
203
204    /// Remove an actor from scheduler state.
205    ///
206    /// This is intentionally minimal: subc uses it only for a just-created
207    /// RouteBind actor whose configure failed before any route was installed, so
208    /// there is no in-flight work to quiesce. The removed [`AppContext`] is
209    /// dropped after releasing the scheduler lock so watcher/LSP teardown never
210    /// runs under that mutex.
211    pub fn remove_actor(&self, root_id: &ProjectRootId) {
212        let removed = {
213            let mut state = self.inner.state.lock();
214            state.actor_order.retain(|actor_root| actor_root != root_id);
215            state.actors.remove(root_id)
216        };
217        drop(removed);
218        self.wake_scheduler();
219    }
220
221    /// Snapshot the registered actor contexts.
222    ///
223    /// The returned [`Arc`]s keep contexts alive after the scheduler lock is
224    /// released, so callers can run teardown without holding executor state.
225    pub fn actor_contexts(&self) -> Vec<Arc<AppContext>> {
226        let state = self.inner.state.lock();
227        state
228            .actors
229            .values()
230            .map(|actor_state| Arc::clone(&actor_state.ctx))
231            .collect()
232    }
233
234    pub fn submit(
235        &self,
236        root_id: ProjectRootId,
237        lane: Lane,
238        request_id: String,
239        job: ExecutorJob,
240    ) -> CompletionHandle {
241        let (completion_tx, completion_rx) = crossbeam_channel::bounded(1);
242        self.submit_with_completion(
243            root_id,
244            lane,
245            request_id,
246            job,
247            CompletionSender::Sync(completion_tx),
248        );
249        CompletionHandle { rx: completion_rx }
250    }
251
252    pub fn submit_async(
253        &self,
254        root_id: ProjectRootId,
255        lane: Lane,
256        request_id: String,
257        job: ExecutorJob,
258    ) -> oneshot::Receiver<Response> {
259        let (completion_tx, completion_rx) = oneshot::channel();
260        self.submit_with_completion(
261            root_id,
262            lane,
263            request_id,
264            job,
265            CompletionSender::Async(completion_tx),
266        );
267        completion_rx
268    }
269
270    fn submit_with_completion(
271        &self,
272        root_id: ProjectRootId,
273        lane: Lane,
274        request_id: String,
275        job: ExecutorJob,
276        completion: CompletionSender,
277    ) {
278        let command = lane_command(lane);
279        let mut job = Some(job);
280        let mut completion = Some(completion);
281
282        let response = {
283            let mut state = self.inner.state.lock();
284            match state.actors.get_mut(&root_id) {
285                Some(actor) if actor.fatal => Some(actor_fatal_response(request_id.clone())),
286                Some(actor) => {
287                    actor.push_job(
288                        lane,
289                        QueuedJob {
290                            job: job.take().expect("executor job already queued"),
291                            completion: completion
292                                .take()
293                                .expect("executor completion already queued"),
294                            request_id: request_id.clone(),
295                            command,
296                        },
297                    );
298                    None
299                }
300                None => Some(Response::error(
301                    request_id.clone(),
302                    "actor_not_registered",
303                    "executor actor is not registered",
304                )),
305            }
306        };
307
308        if let Some(response) = response {
309            if let Some(completion) = completion {
310                completion.send(response);
311            }
312            return;
313        }
314
315        self.wake_scheduler();
316    }
317
318    pub fn pool_size(&self) -> usize {
319        self.inner.config.pool_size
320    }
321
322    pub fn actor_cap(&self) -> usize {
323        self.inner.config.actor_cap
324    }
325
326    pub fn read_cap(&self) -> usize {
327        self.inner.config.read_cap
328    }
329
330    pub fn heavy_permits(&self) -> usize {
331        self.inner.config.heavy_permits
332    }
333
334    pub fn nonrunnable_dispatch_count(&self) -> usize {
335        self.inner.nonrunnable_dispatches.load(Ordering::Acquire)
336    }
337
338    pub fn actor_is_fatal(&self, root_id: &ProjectRootId) -> bool {
339        self.inner
340            .state
341            .lock()
342            .actors
343            .get(root_id)
344            .map(|actor| actor.fatal)
345            .unwrap_or(false)
346    }
347
348    fn wake_scheduler(&self) {
349        let _ = self.inner.event_tx.send(SchedulerEvent::Wake);
350    }
351}
352
353impl Default for Executor {
354    fn default() -> Self {
355        Self::new()
356    }
357}
358
359struct ExecutorInner {
360    state: Arc<Mutex<SchedulerState>>,
361    event_tx: Sender<SchedulerEvent>,
362    scheduler_handle: Mutex<Option<JoinHandle<()>>>,
363    worker_handles: Mutex<Vec<JoinHandle<()>>>,
364    config: EffectiveConfig,
365    nonrunnable_dispatches: Arc<AtomicUsize>,
366}
367
368impl Drop for ExecutorInner {
369    fn drop(&mut self) {
370        let _ = self.event_tx.send(SchedulerEvent::Shutdown);
371
372        if let Some(handle) = self.scheduler_handle.lock().take() {
373            let _ = handle.join();
374        }
375
376        let mut workers = self.worker_handles.lock();
377        for handle in workers.drain(..) {
378            let _ = handle.join();
379        }
380    }
381}
382
383struct SchedulerState {
384    actors: HashMap<ProjectRootId, ActorState>,
385    actor_order: Vec<ProjectRootId>,
386    cursor: usize,
387    idle_workers: usize,
388    config: EffectiveConfig,
389}
390
391impl SchedulerState {
392    fn new(config: EffectiveConfig) -> Self {
393        Self {
394            actors: HashMap::new(),
395            actor_order: Vec::new(),
396            cursor: 0,
397            idle_workers: config.pool_size,
398            config,
399        }
400    }
401}
402
403struct ActorState {
404    ctx: Arc<AppContext>,
405    epoch: Arc<RwLock<()>>,
406    read_inflight: usize,
407    lsp_inflight: bool,
408    actor_total_inflight: usize,
409    writer_pending: bool,
410    deficit: isize,
411    order: VecDeque<Lane>,
412    pure_reads: VecDeque<QueuedJob>,
413    lsp_status: VecDeque<QueuedJob>,
414    heavy_init: VecDeque<QueuedJob>,
415    mutating: VecDeque<QueuedJob>,
416    fatal: bool,
417}
418
419impl ActorState {
420    fn new(ctx: Arc<AppContext>) -> Self {
421        Self {
422            ctx,
423            epoch: Arc::new(RwLock::new(())),
424            read_inflight: 0,
425            lsp_inflight: false,
426            actor_total_inflight: 0,
427            writer_pending: false,
428            deficit: 0,
429            order: VecDeque::new(),
430            pure_reads: VecDeque::new(),
431            lsp_status: VecDeque::new(),
432            heavy_init: VecDeque::new(),
433            mutating: VecDeque::new(),
434            fatal: false,
435        }
436    }
437
438    fn push_job(&mut self, lane: Lane, job: QueuedJob) {
439        self.order.push_back(lane);
440        self.queue_mut(lane).push_back(job);
441    }
442
443    fn has_queued_jobs(&self) -> bool {
444        !self.order.is_empty()
445    }
446
447    fn pop_front_job(&mut self, lane: Lane) -> Option<QueuedJob> {
448        let order_lane = self.order.pop_front()?;
449        debug_assert_eq!(order_lane, lane);
450        self.queue_mut(lane).pop_front()
451    }
452
453    fn queue_mut(&mut self, lane: Lane) -> &mut VecDeque<QueuedJob> {
454        match lane {
455            Lane::PureRead => &mut self.pure_reads,
456            Lane::SerialLspStatus => &mut self.lsp_status,
457            Lane::HeavyInit => &mut self.heavy_init,
458            Lane::Mutating => &mut self.mutating,
459        }
460    }
461
462    fn fail_queued_jobs(&mut self) {
463        self.order.clear();
464        fail_queued_job_queue(&mut self.pure_reads);
465        fail_queued_job_queue(&mut self.lsp_status);
466        fail_queued_job_queue(&mut self.heavy_init);
467        fail_queued_job_queue(&mut self.mutating);
468    }
469}
470
471struct QueuedJob {
472    job: ExecutorJob,
473    completion: CompletionSender,
474    request_id: String,
475    command: String,
476}
477
478fn fail_queued_job_queue(queue: &mut VecDeque<QueuedJob>) {
479    for queued in queue.drain(..) {
480        queued
481            .completion
482            .send(actor_fatal_response(queued.request_id));
483    }
484}
485
486fn lane_command(lane: Lane) -> String {
487    format!("executor::{lane:?}")
488}
489
490fn actor_fatal_response(request_id: impl Into<String>) -> Response {
491    Response::error(
492        request_id,
493        "actor_fatal",
494        "executor actor is fatal after a mutating job panic",
495    )
496}
497
498fn panic_payload_message(payload: &(dyn std::any::Any + Send)) -> String {
499    if let Some(message) = payload.downcast_ref::<&'static str>() {
500        (*message).to_string()
501    } else if let Some(message) = payload.downcast_ref::<String>() {
502        message.clone()
503    } else {
504        "unknown panic payload".to_string()
505    }
506}
507
508fn panic_response(
509    request_id: impl Into<String>,
510    command: &str,
511    payload: &(dyn std::any::Any + Send),
512) -> Response {
513    let panic_message = panic_payload_message(payload);
514    Response::error(
515        request_id,
516        "internal_error",
517        format!("command '{command}' panicked: {panic_message}"),
518    )
519}
520
521enum CompletionSender {
522    Sync(Sender<Response>),
523    Async(oneshot::Sender<Response>),
524}
525
526impl CompletionSender {
527    fn send(self, response: Response) {
528        match self {
529            Self::Sync(tx) => {
530                let _ = tx.send(response);
531            }
532            Self::Async(tx) => {
533                let _ = tx.send(response);
534            }
535        }
536    }
537}
538
539struct RunJob {
540    root_id: ProjectRootId,
541    lane: Lane,
542    ctx: Arc<AppContext>,
543    epoch: Arc<RwLock<()>>,
544    job: ExecutorJob,
545    completion: Option<CompletionSender>,
546    request_id: String,
547    command: String,
548    heavy_permit: Option<HeavyPermit>,
549}
550
551struct CompletionEvent {
552    root_id: ProjectRootId,
553    lane: Lane,
554    heavy_permit: Option<HeavyPermit>,
555    panicked: bool,
556}
557
558enum SchedulerEvent {
559    Wake,
560    Completed(CompletionEvent),
561    Shutdown,
562}
563
564fn scheduler_loop(
565    state: Arc<Mutex<SchedulerState>>,
566    heavy: Arc<HeavySemaphore>,
567    run_tx: Sender<RunJob>,
568    event_rx: Receiver<SchedulerEvent>,
569    nonrunnable_dispatches: Arc<AtomicUsize>,
570) {
571    while let Ok(event) = event_rx.recv() {
572        let mut shutdown = false;
573        {
574            let mut state = state.lock();
575            shutdown |= process_scheduler_event(event, &mut state);
576            while !shutdown {
577                match event_rx.try_recv() {
578                    Ok(event) => shutdown |= process_scheduler_event(event, &mut state),
579                    Err(_) => break,
580                }
581            }
582
583            if !shutdown {
584                dispatch_runnable(&mut state, &heavy, &run_tx, &nonrunnable_dispatches);
585            }
586        }
587
588        if shutdown {
589            break;
590        }
591    }
592}
593
594fn process_scheduler_event(event: SchedulerEvent, state: &mut SchedulerState) -> bool {
595    match event {
596        SchedulerEvent::Wake => false,
597        SchedulerEvent::Completed(event) => {
598            complete_job(state, event);
599            false
600        }
601        SchedulerEvent::Shutdown => true,
602    }
603}
604
605fn complete_job(state: &mut SchedulerState, event: CompletionEvent) {
606    let CompletionEvent {
607        root_id,
608        lane,
609        heavy_permit,
610        panicked,
611    } = event;
612
613    if let Some(actor) = state.actors.get_mut(&root_id) {
614        actor.actor_total_inflight = actor.actor_total_inflight.saturating_sub(1);
615        match lane {
616            Lane::PureRead => {
617                actor.read_inflight = actor.read_inflight.saturating_sub(1);
618            }
619            Lane::SerialLspStatus => {
620                actor.lsp_inflight = false;
621            }
622            Lane::HeavyInit => {}
623            Lane::Mutating => {
624                actor.writer_pending = false;
625            }
626        }
627
628        if panicked && lane == Lane::Mutating {
629            actor.fatal = true;
630            actor.fail_queued_jobs();
631        }
632    }
633
634    drop(heavy_permit);
635    state.idle_workers += 1;
636}
637
638fn dispatch_runnable(
639    state: &mut SchedulerState,
640    heavy: &Arc<HeavySemaphore>,
641    run_tx: &Sender<RunJob>,
642    nonrunnable_dispatches: &AtomicUsize,
643) {
644    while state.idle_workers > 0 && !state.actor_order.is_empty() {
645        let actor_count = state.actor_order.len();
646        let mut made_progress = false;
647
648        for _ in 0..actor_count {
649            if state.idle_workers == 0 || state.actor_order.is_empty() {
650                break;
651            }
652
653            if state.cursor >= state.actor_order.len() {
654                state.cursor = 0;
655            }
656            let root_id = state.actor_order[state.cursor].clone();
657            state.cursor = (state.cursor + 1) % state.actor_order.len();
658
659            let run_job = {
660                let Some(actor) = state.actors.get_mut(&root_id) else {
661                    continue;
662                };
663
664                if actor.fatal {
665                    actor.fail_queued_jobs();
666                    actor.deficit = 0;
667                    continue;
668                }
669
670                if !actor.has_queued_jobs() {
671                    actor.deficit = 0;
672                    continue;
673                }
674
675                actor.deficit =
676                    (actor.deficit + state.config.drr_quantum).min(state.config.deficit_cap);
677                if actor.deficit < JOB_COST {
678                    continue;
679                }
680
681                try_admit_actor(&root_id, actor, &state.config, heavy)
682            };
683
684            if let Some(run_job) = run_job {
685                state.idle_workers -= 1;
686                made_progress = true;
687                if run_tx.send(run_job).is_err() {
688                    nonrunnable_dispatches.fetch_add(1, Ordering::AcqRel);
689                    return;
690                }
691            }
692        }
693
694        if !made_progress {
695            break;
696        }
697    }
698}
699
700fn try_admit_actor(
701    root_id: &ProjectRootId,
702    actor: &mut ActorState,
703    config: &EffectiveConfig,
704    heavy: &Arc<HeavySemaphore>,
705) -> Option<RunJob> {
706    let lane = *actor.order.front()?;
707    let mut heavy_permit = None;
708
709    let runnable = match lane {
710        Lane::PureRead => {
711            !actor.writer_pending
712                && actor.read_inflight < config.read_cap
713                && actor.actor_total_inflight < config.actor_cap
714        }
715        Lane::SerialLspStatus => {
716            !actor.writer_pending
717                && !actor.lsp_inflight
718                && actor.actor_total_inflight < config.actor_cap
719        }
720        Lane::HeavyInit => {
721            if actor.actor_total_inflight >= config.actor_cap {
722                false
723            } else if let Some(permit) = heavy.try_acquire() {
724                heavy_permit = Some(permit);
725                true
726            } else {
727                false
728            }
729        }
730        Lane::Mutating => {
731            actor.writer_pending = true;
732            actor.read_inflight == 0 && actor.actor_total_inflight < config.actor_cap
733        }
734    };
735
736    if !runnable {
737        return None;
738    }
739
740    let queued = actor.pop_front_job(lane)?;
741    actor.deficit -= JOB_COST;
742    match lane {
743        Lane::PureRead => {
744            actor.read_inflight += 1;
745            actor.actor_total_inflight += 1;
746        }
747        Lane::SerialLspStatus => {
748            actor.lsp_inflight = true;
749            actor.actor_total_inflight += 1;
750        }
751        Lane::HeavyInit => {
752            actor.actor_total_inflight += 1;
753        }
754        Lane::Mutating => {
755            actor.actor_total_inflight += 1;
756        }
757    }
758
759    Some(RunJob {
760        root_id: root_id.clone(),
761        lane,
762        ctx: Arc::clone(&actor.ctx),
763        epoch: Arc::clone(&actor.epoch),
764        job: queued.job,
765        completion: Some(queued.completion),
766        request_id: queued.request_id,
767        command: queued.command,
768        heavy_permit,
769    })
770}
771
772fn worker_loop(run_rx: Receiver<RunJob>, event_tx: Sender<SchedulerEvent>) {
773    while let Ok(mut run_job) = run_rx.recv() {
774        let response =
775            std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| run_lane_job(&mut run_job)));
776        let panicked = response.is_err();
777        let response = match response {
778            Ok(response) => response,
779            Err(payload) => panic_response(
780                run_job.request_id.clone(),
781                &run_job.command,
782                payload.as_ref(),
783            ),
784        };
785
786        if let Some(completion) = run_job.completion.take() {
787            completion.send(response);
788        }
789        let completion = CompletionEvent {
790            root_id: run_job.root_id,
791            lane: run_job.lane,
792            heavy_permit: run_job.heavy_permit.take(),
793            panicked,
794        };
795        let _ = event_tx.send(SchedulerEvent::Completed(completion));
796    }
797}
798
799fn run_lane_job(run_job: &mut RunJob) -> Response {
800    let missing_request_id = run_job.request_id.clone();
801    let job = std::mem::replace(
802        &mut run_job.job,
803        Box::new(move |_| {
804            Response::error(
805                missing_request_id,
806                "job_missing",
807                "executor job already taken",
808            )
809        }),
810    );
811
812    match run_job.lane {
813        Lane::PureRead | Lane::SerialLspStatus => {
814            let _epoch = run_job.epoch.read();
815            job(&run_job.ctx)
816        }
817        Lane::HeavyInit => {
818            let response = job(&run_job.ctx);
819            {
820                let _install = run_job.epoch.write();
821            }
822            response
823        }
824        Lane::Mutating => {
825            let _epoch = run_job.epoch.write();
826            job(&run_job.ctx)
827        }
828    }
829}
830
831#[derive(Debug)]
832struct HeavySemaphore {
833    available: AtomicUsize,
834    max: usize,
835}
836
837impl HeavySemaphore {
838    fn new(permits: usize) -> Self {
839        Self {
840            available: AtomicUsize::new(permits),
841            max: permits,
842        }
843    }
844
845    fn try_acquire(self: &Arc<Self>) -> Option<HeavyPermit> {
846        loop {
847            let available = self.available.load(Ordering::Acquire);
848            if available == 0 {
849                return None;
850            }
851            if self
852                .available
853                .compare_exchange(
854                    available,
855                    available - 1,
856                    Ordering::AcqRel,
857                    Ordering::Acquire,
858                )
859                .is_ok()
860            {
861                return Some(HeavyPermit {
862                    semaphore: Arc::clone(self),
863                });
864            }
865        }
866    }
867}
868
869struct HeavyPermit {
870    semaphore: Arc<HeavySemaphore>,
871}
872
873impl Drop for HeavyPermit {
874    fn drop(&mut self) {
875        let previous = self.semaphore.available.fetch_add(1, Ordering::Release);
876        debug_assert!(previous < self.semaphore.max);
877    }
878}