1mod single_flight;
2
3#[cfg(test)]
4mod tests;
5
6use std::{
7 collections::{HashMap, VecDeque},
8 sync::{
9 atomic::{AtomicUsize, Ordering},
10 Arc,
11 },
12 thread::{self, JoinHandle},
13 time::Duration,
14};
15
16use crossbeam_channel::{Receiver, RecvError, RecvTimeoutError, Sender};
17use parking_lot::{Mutex, RwLock};
18use tokio::sync::oneshot;
19
20use crate::{context::AppContext, path_identity::ProjectRootId, protocol::Response};
21
22pub use single_flight::SingleFlight;
23
/// Deficit charged to an actor each time one of its jobs is admitted.
///
/// Every job costs the same regardless of lane. The scheduler only tries to admit a
/// job for an actor whose accumulated deficit is at least this value.
const JOB_COST: isize = 1;
25
/// Scheduling class of a submitted job.
///
/// The lane selects the admission rule the scheduler applies before dispatching the
/// job, and how the worker takes the actor's epoch lock while running it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Lane {
    /// Runs while holding the actor's epoch lock for reading. Concurrent jobs in this
    /// lane are limited per actor by the read cap, and none is admitted while a
    /// writer is pending.
    PureRead,
    /// Runs while holding the epoch lock for reading. At most one job of this lane is
    /// in flight per actor, and none is admitted while a writer is pending.
    SerialLspStatus,
    /// Requires a permit from the executor-wide heavy semaphore. The job itself runs
    /// without the epoch lock; the worker takes and immediately releases the write
    /// lock after the job returns.
    HeavyInit,
    /// Runs while holding the epoch lock for writing. Admitted only when no
    /// `PureRead` job of the actor is in flight. A panic in this lane marks the
    /// actor fatal.
    Mutating,
}
43
/// Boxed one-shot job: called once with the owning actor's [`AppContext`] and
/// returning the [`Response`] for the request.
///
/// The `Send + 'static` bounds are required because the closure is handed to a
/// worker thread for execution.
pub type ExecutorJob = Box<dyn FnOnce(&AppContext) -> Response + Send + 'static>;
45
/// Caller-facing tuning knobs for the executor.
///
/// The values are never used verbatim: `effective` normalises them into safe ranges
/// first, so out-of-range settings are silently clamped rather than rejected.
#[derive(Debug, Clone)]
pub struct ExecutorConfig {
    /// Requested number of worker threads (normalised to `2..=8`).
    pub pool_size: usize,
    /// Requested per-actor limit on concurrently running `PureRead` jobs
    /// (normalised to at least 1 and at most `min(actor_cap, 4)`).
    pub read_cap: usize,
    /// Requested per-actor limit on concurrently running jobs across all lanes
    /// (normalised to `1..=pool_size - 1`).
    pub actor_cap: usize,
    /// Requested number of permits in the heavy semaphore (normalised to `2..=3`).
    pub heavy_permits: usize,
    /// Deficit credited to an actor on each scheduler visit while it has queued jobs
    /// (normalised to at least 1).
    pub drr_quantum: isize,
}

impl Default for ExecutorConfig {
    /// Derives a configuration from the machine's available parallelism, assuming
    /// two cores when that cannot be determined.
    fn default() -> Self {
        let cores = match thread::available_parallelism() {
            Ok(count) => count.get(),
            Err(_) => 2,
        };
        // Leave one core for the rest of the process, but never go below two workers.
        let pool_size = cores.saturating_sub(1).clamp(2, 8);
        let actor_cap = pool_size.saturating_sub(1).clamp(1, 4);

        Self {
            pool_size,
            read_cap: actor_cap.clamp(1, 4),
            actor_cap,
            heavy_permits: pool_size.saturating_sub(1).clamp(2, 3),
            drr_quantum: 1,
        }
    }
}

/// Normalised configuration actually used by the scheduler and workers.
#[derive(Debug, Clone)]
struct EffectiveConfig {
    /// Number of worker threads, in `2..=8`.
    pool_size: usize,
    /// Per-actor cap on in-flight `PureRead` jobs, in `1..=min(actor_cap, 4)`.
    read_cap: usize,
    /// Per-actor cap on in-flight jobs of any lane, in `1..=pool_size - 1`.
    actor_cap: usize,
    /// Number of heavy permits, in `2..=3`.
    heavy_permits: usize,
    /// Deficit added per scheduler visit, at least 1.
    drr_quantum: isize,
    /// Upper bound on an actor's accumulated deficit: four times `actor_cap`.
    deficit_cap: isize,
}

impl ExecutorConfig {
    /// Clamps every field into its supported range and derives `deficit_cap`.
    fn effective(&self) -> EffectiveConfig {
        let pool_size = self.pool_size.clamp(2, 8);
        // One actor may never occupy the whole pool; the ceiling is always >= 1
        // because `pool_size` is at least 2.
        let actor_ceiling = pool_size.saturating_sub(1).max(1);
        let actor_cap = self.actor_cap.clamp(1, actor_ceiling);
        let read_cap = self.read_cap.clamp(1, actor_cap.min(4));
        // `actor_cap` is at most 7 here, so the cast cannot truncate.
        let deficit_cap = (actor_cap as isize) * 4;

        EffectiveConfig {
            pool_size,
            read_cap,
            actor_cap,
            heavy_permits: self.heavy_permits.clamp(2, 3),
            drr_quantum: self.drr_quantum.max(1),
            deficit_cap,
        }
    }
}
105
/// Blocking handle for the response of a job submitted with `Executor::submit`.
pub struct CompletionHandle {
    /// Receiving end of the single-slot channel the response is delivered on.
    rx: Receiver<Response>,
}
111
impl CompletionHandle {
    /// Blocks until the job's response arrives and returns it, consuming the handle.
    ///
    /// # Errors
    ///
    /// Returns [`RecvError`] if the sending side is dropped without ever sending a
    /// response, for example when the actor is removed while the job is still queued.
    pub fn recv(self) -> Result<Response, RecvError> {
        self.rx.recv()
    }

    /// Waits for the response for at most `timeout`.
    ///
    /// The handle is only borrowed, so the call can be repeated after a timeout.
    ///
    /// # Errors
    ///
    /// Returns [`RecvTimeoutError`] if no response arrived in time or the sending
    /// side was dropped without sending one.
    pub fn recv_timeout(&self, timeout: Duration) -> Result<Response, RecvTimeoutError> {
        self.rx.recv_timeout(timeout)
    }

    /// Unwraps the handle into the underlying channel receiver.
    pub fn into_receiver(self) -> Receiver<Response> {
        self.rx
    }
}
125
/// Handle to the job executor: one scheduler thread plus a pool of worker threads.
///
/// All state lives in a shared `ExecutorInner`; the threads are shut down and joined
/// when that inner value is dropped.
pub struct Executor {
    /// Shared scheduler state, channel endpoint, and thread handles.
    inner: Arc<ExecutorInner>,
}
130
131impl Executor {
132 pub fn new() -> Self {
133 Self::with_config(ExecutorConfig::default())
134 }
135
136 pub fn with_config(config: ExecutorConfig) -> Self {
137 let effective = config.effective();
138 let state = Arc::new(Mutex::new(SchedulerState::new(effective.clone())));
139 let heavy = Arc::new(HeavySemaphore::new(effective.heavy_permits));
140 let nonrunnable_dispatches = Arc::new(AtomicUsize::new(0));
141 let (run_tx, run_rx) = crossbeam_channel::unbounded();
142 let (event_tx, event_rx) = crossbeam_channel::unbounded();
143
144 let scheduler_state = Arc::clone(&state);
145 let scheduler_heavy = Arc::clone(&heavy);
146 let scheduler_violations = Arc::clone(&nonrunnable_dispatches);
147 let scheduler_handle = thread::Builder::new()
148 .name("aft-executor-scheduler".to_string())
149 .spawn(move || {
150 scheduler_loop(
151 scheduler_state,
152 scheduler_heavy,
153 run_tx,
154 event_rx,
155 scheduler_violations,
156 );
157 })
158 .expect("spawn AFT executor scheduler");
159
160 let mut worker_handles = Vec::with_capacity(effective.pool_size);
161 for worker_id in 0..effective.pool_size {
162 let worker_rx = run_rx.clone();
163 let worker_events = event_tx.clone();
164 let handle = thread::Builder::new()
165 .name(format!("aft-executor-worker-{worker_id}"))
166 .spawn(move || worker_loop(worker_rx, worker_events))
167 .expect("spawn AFT executor worker");
168 worker_handles.push(handle);
169 }
170
171 Self {
172 inner: Arc::new(ExecutorInner {
173 state,
174 event_tx,
175 scheduler_handle: Mutex::new(Some(scheduler_handle)),
176 worker_handles: Mutex::new(worker_handles),
177 config: effective,
178 nonrunnable_dispatches,
179 }),
180 }
181 }
182
183 pub fn register_actor(&self, root_id: ProjectRootId, ctx: Arc<AppContext>) -> bool {
190 let inserted = {
191 let mut state = self.inner.state.lock();
192 if state.actors.contains_key(&root_id) {
193 false
194 } else {
195 state.actor_order.push(root_id.clone());
196 state.actors.insert(root_id, ActorState::new(ctx));
197 true
198 }
199 };
200 self.wake_scheduler();
201 inserted
202 }
203
204 pub fn remove_actor(&self, root_id: &ProjectRootId) {
212 let removed = {
213 let mut state = self.inner.state.lock();
214 state.actor_order.retain(|actor_root| actor_root != root_id);
215 state.actors.remove(root_id)
216 };
217 drop(removed);
218 self.wake_scheduler();
219 }
220
221 pub fn actor_contexts(&self) -> Vec<Arc<AppContext>> {
226 let state = self.inner.state.lock();
227 state
228 .actors
229 .values()
230 .map(|actor_state| Arc::clone(&actor_state.ctx))
231 .collect()
232 }
233
234 pub fn submit(
235 &self,
236 root_id: ProjectRootId,
237 lane: Lane,
238 request_id: String,
239 job: ExecutorJob,
240 ) -> CompletionHandle {
241 let (completion_tx, completion_rx) = crossbeam_channel::bounded(1);
242 self.submit_with_completion(
243 root_id,
244 lane,
245 request_id,
246 job,
247 CompletionSender::Sync(completion_tx),
248 );
249 CompletionHandle { rx: completion_rx }
250 }
251
252 pub fn submit_async(
253 &self,
254 root_id: ProjectRootId,
255 lane: Lane,
256 request_id: String,
257 job: ExecutorJob,
258 ) -> oneshot::Receiver<Response> {
259 let (completion_tx, completion_rx) = oneshot::channel();
260 self.submit_with_completion(
261 root_id,
262 lane,
263 request_id,
264 job,
265 CompletionSender::Async(completion_tx),
266 );
267 completion_rx
268 }
269
270 fn submit_with_completion(
271 &self,
272 root_id: ProjectRootId,
273 lane: Lane,
274 request_id: String,
275 job: ExecutorJob,
276 completion: CompletionSender,
277 ) {
278 let command = lane_command(lane);
279 let mut job = Some(job);
280 let mut completion = Some(completion);
281
282 let response = {
283 let mut state = self.inner.state.lock();
284 match state.actors.get_mut(&root_id) {
285 Some(actor) if actor.fatal => Some(actor_fatal_response(request_id.clone())),
286 Some(actor) => {
287 actor.push_job(
288 lane,
289 QueuedJob {
290 job: job.take().expect("executor job already queued"),
291 completion: completion
292 .take()
293 .expect("executor completion already queued"),
294 request_id: request_id.clone(),
295 command,
296 },
297 );
298 None
299 }
300 None => Some(Response::error(
301 request_id.clone(),
302 "actor_not_registered",
303 "executor actor is not registered",
304 )),
305 }
306 };
307
308 if let Some(response) = response {
309 if let Some(completion) = completion {
310 completion.send(response);
311 }
312 return;
313 }
314
315 self.wake_scheduler();
316 }
317
318 pub fn pool_size(&self) -> usize {
319 self.inner.config.pool_size
320 }
321
322 pub fn actor_cap(&self) -> usize {
323 self.inner.config.actor_cap
324 }
325
326 pub fn read_cap(&self) -> usize {
327 self.inner.config.read_cap
328 }
329
330 pub fn heavy_permits(&self) -> usize {
331 self.inner.config.heavy_permits
332 }
333
334 pub fn nonrunnable_dispatch_count(&self) -> usize {
335 self.inner.nonrunnable_dispatches.load(Ordering::Acquire)
336 }
337
338 pub fn actor_is_fatal(&self, root_id: &ProjectRootId) -> bool {
339 self.inner
340 .state
341 .lock()
342 .actors
343 .get(root_id)
344 .map(|actor| actor.fatal)
345 .unwrap_or(false)
346 }
347
348 fn wake_scheduler(&self) {
349 let _ = self.inner.event_tx.send(SchedulerEvent::Wake);
350 }
351}
352
impl Default for Executor {
    /// Equivalent to [`Executor::new`]; starts the scheduler and worker threads.
    fn default() -> Self {
        Self::new()
    }
}
358
/// Shared core of an `Executor`; dropping it shuts down and joins all threads.
struct ExecutorInner {
    /// Scheduler state, shared with the scheduler thread.
    state: Arc<Mutex<SchedulerState>>,
    /// Sending end of the scheduler's event channel (wake-ups and shutdown).
    event_tx: Sender<SchedulerEvent>,
    /// Join handle of the scheduler thread; taken when the value is dropped.
    scheduler_handle: Mutex<Option<JoinHandle<()>>>,
    /// Join handles of the worker threads; drained when the value is dropped.
    worker_handles: Mutex<Vec<JoinHandle<()>>>,
    /// Normalised configuration reported by the accessor methods.
    config: EffectiveConfig,
    /// Incremented by the scheduler when it fails to hand an admitted job to the
    /// worker channel.
    nonrunnable_dispatches: Arc<AtomicUsize>,
}
367
368impl Drop for ExecutorInner {
369 fn drop(&mut self) {
370 let _ = self.event_tx.send(SchedulerEvent::Shutdown);
371
372 if let Some(handle) = self.scheduler_handle.lock().take() {
373 let _ = handle.join();
374 }
375
376 let mut workers = self.worker_handles.lock();
377 for handle in workers.drain(..) {
378 let _ = handle.join();
379 }
380 }
381}
382
/// Mutable scheduling state, guarded by the executor's state mutex.
struct SchedulerState {
    /// Registered actors keyed by project root.
    actors: HashMap<ProjectRootId, ActorState>,
    /// Actor ids in registration order; the order in which dispatch visits actors.
    actor_order: Vec<ProjectRootId>,
    /// Index into `actor_order` of the next actor to visit.
    cursor: usize,
    /// Number of workers that currently have no job assigned, as tracked by the
    /// scheduler.
    idle_workers: usize,
    /// Normalised configuration used for admission decisions.
    config: EffectiveConfig,
}
390
391impl SchedulerState {
392 fn new(config: EffectiveConfig) -> Self {
393 Self {
394 actors: HashMap::new(),
395 actor_order: Vec::new(),
396 cursor: 0,
397 idle_workers: config.pool_size,
398 config,
399 }
400 }
401}
402
/// Per-actor queues and in-flight bookkeeping.
struct ActorState {
    /// Context passed to every job of this actor.
    ctx: Arc<AppContext>,
    /// Lock that workers hold for reading or writing while a job runs, depending on
    /// the job's lane.
    epoch: Arc<RwLock<()>>,
    /// Number of `PureRead` jobs currently in flight.
    read_inflight: usize,
    /// Whether a `SerialLspStatus` job is currently in flight.
    lsp_inflight: bool,
    /// Number of jobs of any lane currently in flight.
    actor_total_inflight: usize,
    /// Set when a `Mutating` job is considered for admission and cleared when a
    /// `Mutating` job completes; while set, `PureRead` and `SerialLspStatus` jobs
    /// are not admitted.
    writer_pending: bool,
    /// Accumulated scheduling credit; topped up on each visit and charged per
    /// admitted job.
    deficit: isize,
    /// Lanes of the queued jobs in submission order; the front entry names the lane
    /// queue holding the next job.
    order: VecDeque<Lane>,
    /// Queued `PureRead` jobs.
    pure_reads: VecDeque<QueuedJob>,
    /// Queued `SerialLspStatus` jobs.
    lsp_status: VecDeque<QueuedJob>,
    /// Queued `HeavyInit` jobs.
    heavy_init: VecDeque<QueuedJob>,
    /// Queued `Mutating` jobs.
    mutating: VecDeque<QueuedJob>,
    /// Set after a `Mutating` job panicked; a fatal actor rejects new jobs and has
    /// its queued jobs failed.
    fatal: bool,
}
418
419impl ActorState {
420 fn new(ctx: Arc<AppContext>) -> Self {
421 Self {
422 ctx,
423 epoch: Arc::new(RwLock::new(())),
424 read_inflight: 0,
425 lsp_inflight: false,
426 actor_total_inflight: 0,
427 writer_pending: false,
428 deficit: 0,
429 order: VecDeque::new(),
430 pure_reads: VecDeque::new(),
431 lsp_status: VecDeque::new(),
432 heavy_init: VecDeque::new(),
433 mutating: VecDeque::new(),
434 fatal: false,
435 }
436 }
437
438 fn push_job(&mut self, lane: Lane, job: QueuedJob) {
439 self.order.push_back(lane);
440 self.queue_mut(lane).push_back(job);
441 }
442
443 fn has_queued_jobs(&self) -> bool {
444 !self.order.is_empty()
445 }
446
447 fn pop_front_job(&mut self, lane: Lane) -> Option<QueuedJob> {
448 let order_lane = self.order.pop_front()?;
449 debug_assert_eq!(order_lane, lane);
450 self.queue_mut(lane).pop_front()
451 }
452
453 fn queue_mut(&mut self, lane: Lane) -> &mut VecDeque<QueuedJob> {
454 match lane {
455 Lane::PureRead => &mut self.pure_reads,
456 Lane::SerialLspStatus => &mut self.lsp_status,
457 Lane::HeavyInit => &mut self.heavy_init,
458 Lane::Mutating => &mut self.mutating,
459 }
460 }
461
462 fn fail_queued_jobs(&mut self) {
463 self.order.clear();
464 fail_queued_job_queue(&mut self.pure_reads);
465 fail_queued_job_queue(&mut self.lsp_status);
466 fail_queued_job_queue(&mut self.heavy_init);
467 fail_queued_job_queue(&mut self.mutating);
468 }
469}
470
/// A submitted job waiting in an actor's lane queue.
struct QueuedJob {
    /// The work to run.
    job: ExecutorJob,
    /// Where the job's response is delivered.
    completion: CompletionSender,
    /// Request id echoed in error responses produced on the job's behalf.
    request_id: String,
    /// Label derived from the lane, used in the panic error message.
    command: String,
}
477
478fn fail_queued_job_queue(queue: &mut VecDeque<QueuedJob>) {
479 for queued in queue.drain(..) {
480 queued
481 .completion
482 .send(actor_fatal_response(queued.request_id));
483 }
484}
485
/// Builds the command label for a lane: `executor::` followed by the lane's `Debug`
/// name, e.g. `executor::PureRead`.
fn lane_command(lane: Lane) -> String {
    format!("executor::{lane:?}")
}
489
/// Builds the `actor_fatal` error response sent for jobs that cannot run because
/// their actor was marked fatal.
fn actor_fatal_response(request_id: impl Into<String>) -> Response {
    Response::error(
        request_id,
        "actor_fatal",
        "executor actor is fatal after a mutating job panic",
    )
}
497
/// Extracts a human-readable message from a panic payload.
///
/// Payloads that are a `&'static str` or a `String` are returned as text; any other
/// payload type yields the fixed text `unknown panic payload`.
fn panic_payload_message(payload: &(dyn std::any::Any + Send)) -> String {
    payload
        .downcast_ref::<&'static str>()
        .map(|text| (*text).to_string())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "unknown panic payload".to_string())
}
507
/// Builds the `internal_error` response reported when a job panicked.
///
/// The message names `command` and includes the text extracted from the panic
/// `payload`.
fn panic_response(
    request_id: impl Into<String>,
    command: &str,
    payload: &(dyn std::any::Any + Send),
) -> Response {
    let panic_message = panic_payload_message(payload);
    Response::error(
        request_id,
        "internal_error",
        format!("command '{command}' panicked: {panic_message}"),
    )
}
520
/// Sending half of a job's response channel, for either submission flavour.
enum CompletionSender {
    /// Crossbeam channel sender; created by the blocking submit path.
    Sync(Sender<Response>),
    /// Tokio oneshot sender; created by the async submit path.
    Async(oneshot::Sender<Response>),
}
525
impl CompletionSender {
    /// Delivers `response` to the submitter, consuming the sender.
    ///
    /// A send failure (the receiving side no longer exists) is deliberately ignored;
    /// the response is simply discarded in that case.
    fn send(self, response: Response) {
        match self {
            Self::Sync(tx) => {
                let _ = tx.send(response);
            }
            Self::Async(tx) => {
                let _ = tx.send(response);
            }
        }
    }
}
538
/// An admitted job on its way from the scheduler to a worker.
struct RunJob {
    /// Actor the job belongs to; echoed back in the completion event.
    root_id: ProjectRootId,
    /// Lane the job was admitted in; decides how the epoch lock is taken.
    lane: Lane,
    /// Context passed to the job.
    ctx: Arc<AppContext>,
    /// The actor's epoch lock.
    epoch: Arc<RwLock<()>>,
    /// The work to run.
    job: ExecutorJob,
    /// Response channel; an `Option` so the worker can take it out to send.
    completion: Option<CompletionSender>,
    /// Request id used when an error response has to be built for the job.
    request_id: String,
    /// Command label used in the panic error message.
    command: String,
    /// Heavy permit held for the duration of a `HeavyInit` job; `None` for the
    /// other lanes.
    heavy_permit: Option<HeavyPermit>,
}
550
/// Report sent by a worker to the scheduler after a job has finished.
struct CompletionEvent {
    /// Actor the finished job belonged to.
    root_id: ProjectRootId,
    /// Lane the finished job ran in.
    lane: Lane,
    /// Heavy permit the job held, if any; handed back so it is released when the
    /// scheduler processes the event.
    heavy_permit: Option<HeavyPermit>,
    /// Whether the job panicked instead of returning a response.
    panicked: bool,
}
557
/// Messages consumed by the scheduler thread.
enum SchedulerEvent {
    /// Something changed (job submitted, actor added or removed); re-run dispatch.
    Wake,
    /// A worker finished a job.
    Completed(CompletionEvent),
    /// Stop the scheduler loop.
    Shutdown,
}
563
564fn scheduler_loop(
565 state: Arc<Mutex<SchedulerState>>,
566 heavy: Arc<HeavySemaphore>,
567 run_tx: Sender<RunJob>,
568 event_rx: Receiver<SchedulerEvent>,
569 nonrunnable_dispatches: Arc<AtomicUsize>,
570) {
571 while let Ok(event) = event_rx.recv() {
572 let mut shutdown = false;
573 {
574 let mut state = state.lock();
575 shutdown |= process_scheduler_event(event, &mut state);
576 while !shutdown {
577 match event_rx.try_recv() {
578 Ok(event) => shutdown |= process_scheduler_event(event, &mut state),
579 Err(_) => break,
580 }
581 }
582
583 if !shutdown {
584 dispatch_runnable(&mut state, &heavy, &run_tx, &nonrunnable_dispatches);
585 }
586 }
587
588 if shutdown {
589 break;
590 }
591 }
592}
593
/// Applies one scheduler event to `state`.
///
/// Returns `true` only for `Shutdown`, telling the caller to stop the scheduler.
fn process_scheduler_event(event: SchedulerEvent, state: &mut SchedulerState) -> bool {
    match event {
        // A wake-up carries no data; it only triggers the dispatch pass that follows.
        SchedulerEvent::Wake => false,
        SchedulerEvent::Completed(event) => {
            complete_job(state, event);
            false
        }
        SchedulerEvent::Shutdown => true,
    }
}
604
605fn complete_job(state: &mut SchedulerState, event: CompletionEvent) {
606 let CompletionEvent {
607 root_id,
608 lane,
609 heavy_permit,
610 panicked,
611 } = event;
612
613 if let Some(actor) = state.actors.get_mut(&root_id) {
614 actor.actor_total_inflight = actor.actor_total_inflight.saturating_sub(1);
615 match lane {
616 Lane::PureRead => {
617 actor.read_inflight = actor.read_inflight.saturating_sub(1);
618 }
619 Lane::SerialLspStatus => {
620 actor.lsp_inflight = false;
621 }
622 Lane::HeavyInit => {}
623 Lane::Mutating => {
624 actor.writer_pending = false;
625 }
626 }
627
628 if panicked && lane == Lane::Mutating {
629 actor.fatal = true;
630 actor.fail_queued_jobs();
631 }
632 }
633
634 drop(heavy_permit);
635 state.idle_workers += 1;
636}
637
/// Hands queued jobs to idle workers, visiting actors round-robin.
///
/// Each outer iteration is one round over all registered actors, starting at
/// `state.cursor`. On a visit an actor with queued jobs is credited
/// `drr_quantum` deficit (capped at `deficit_cap`) and at most one of its jobs is
/// admitted. Rounds repeat until no worker is idle, no actor is registered, or a
/// full round admitted nothing.
///
/// If an admitted job cannot be sent to the worker channel,
/// `nonrunnable_dispatches` is incremented and dispatching stops.
fn dispatch_runnable(
    state: &mut SchedulerState,
    heavy: &Arc<HeavySemaphore>,
    run_tx: &Sender<RunJob>,
    nonrunnable_dispatches: &AtomicUsize,
) {
    while state.idle_workers > 0 && !state.actor_order.is_empty() {
        let actor_count = state.actor_order.len();
        let mut made_progress = false;

        for _ in 0..actor_count {
            if state.idle_workers == 0 || state.actor_order.is_empty() {
                break;
            }

            // The cursor can point past the end after actors were removed.
            if state.cursor >= state.actor_order.len() {
                state.cursor = 0;
            }
            let root_id = state.actor_order[state.cursor].clone();
            state.cursor = (state.cursor + 1) % state.actor_order.len();

            let run_job = {
                let Some(actor) = state.actors.get_mut(&root_id) else {
                    continue;
                };

                // A fatal actor never runs anything again: answer whatever is still
                // queued with the fatal error and move on.
                if actor.fatal {
                    actor.fail_queued_jobs();
                    actor.deficit = 0;
                    continue;
                }

                // An actor with nothing queued does not bank credit.
                if !actor.has_queued_jobs() {
                    actor.deficit = 0;
                    continue;
                }

                actor.deficit =
                    (actor.deficit + state.config.drr_quantum).min(state.config.deficit_cap);
                if actor.deficit < JOB_COST {
                    continue;
                }

                try_admit_actor(&root_id, actor, &state.config, heavy)
            };

            if let Some(run_job) = run_job {
                state.idle_workers -= 1;
                made_progress = true;
                // On failure the job is dropped together with the send error, so its
                // completion channel closes without a response.
                if run_tx.send(run_job).is_err() {
                    nonrunnable_dispatches.fetch_add(1, Ordering::AcqRel);
                    return;
                }
            }
        }

        // Nothing was admitted in a whole round: every remaining job is blocked on
        // a completion, so wait for the next event.
        if !made_progress {
            break;
        }
    }
}
699
/// Tries to admit the job at the front of `actor`'s submission order.
///
/// Only the front job is considered, so a job that cannot run yet blocks everything
/// queued behind it for that actor. On success the job is removed from its queue,
/// the actor is charged `JOB_COST`, its in-flight counters are updated and a
/// [`RunJob`] ready to be sent to a worker is returned. Returns `None` when nothing
/// is queued or the front job is not runnable right now.
///
/// Admission rules per lane (all require `actor_total_inflight < actor_cap`):
/// - `PureRead`: no writer pending and fewer than `read_cap` reads in flight.
/// - `SerialLspStatus`: no writer pending and no other LSP job in flight.
/// - `HeavyInit`: a heavy permit could be acquired.
/// - `Mutating`: no `PureRead` job in flight.
fn try_admit_actor(
    root_id: &ProjectRootId,
    actor: &mut ActorState,
    config: &EffectiveConfig,
    heavy: &Arc<HeavySemaphore>,
) -> Option<RunJob> {
    let lane = *actor.order.front()?;
    let mut heavy_permit = None;

    let runnable = match lane {
        Lane::PureRead => {
            !actor.writer_pending
                && actor.read_inflight < config.read_cap
                && actor.actor_total_inflight < config.actor_cap
        }
        Lane::SerialLspStatus => {
            !actor.writer_pending
                && !actor.lsp_inflight
                && actor.actor_total_inflight < config.actor_cap
        }
        Lane::HeavyInit => {
            // Check the cheap per-actor cap first so a permit is only taken when the
            // job can actually be admitted.
            if actor.actor_total_inflight >= config.actor_cap {
                false
            } else if let Some(permit) = heavy.try_acquire() {
                heavy_permit = Some(permit);
                true
            } else {
                false
            }
        }
        Lane::Mutating => {
            // The flag is raised even when the writer cannot start yet; it is only
            // cleared again when a `Mutating` job completes.
            actor.writer_pending = true;
            // NOTE(review): an in-flight `SerialLspStatus` job is not checked here
            // although it holds the epoch lock for reading, so a writer admitted now
            // would wait on the lock inside a worker — confirm this is intended.
            actor.read_inflight == 0 && actor.actor_total_inflight < config.actor_cap
        }
    };

    if !runnable {
        return None;
    }

    // If this returns early, `heavy_permit` is dropped and the permit released.
    let queued = actor.pop_front_job(lane)?;
    actor.deficit -= JOB_COST;
    match lane {
        Lane::PureRead => {
            actor.read_inflight += 1;
            actor.actor_total_inflight += 1;
        }
        Lane::SerialLspStatus => {
            actor.lsp_inflight = true;
            actor.actor_total_inflight += 1;
        }
        Lane::HeavyInit => {
            actor.actor_total_inflight += 1;
        }
        Lane::Mutating => {
            actor.actor_total_inflight += 1;
        }
    }

    Some(RunJob {
        root_id: root_id.clone(),
        lane,
        ctx: Arc::clone(&actor.ctx),
        epoch: Arc::clone(&actor.epoch),
        job: queued.job,
        completion: Some(queued.completion),
        request_id: queued.request_id,
        command: queued.command,
        heavy_permit,
    })
}
771
772fn worker_loop(run_rx: Receiver<RunJob>, event_tx: Sender<SchedulerEvent>) {
773 while let Ok(mut run_job) = run_rx.recv() {
774 let response =
775 std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| run_lane_job(&mut run_job)));
776 let panicked = response.is_err();
777 let response = match response {
778 Ok(response) => response,
779 Err(payload) => panic_response(
780 run_job.request_id.clone(),
781 &run_job.command,
782 payload.as_ref(),
783 ),
784 };
785
786 if let Some(completion) = run_job.completion.take() {
787 completion.send(response);
788 }
789 let completion = CompletionEvent {
790 root_id: run_job.root_id,
791 lane: run_job.lane,
792 heavy_permit: run_job.heavy_permit.take(),
793 panicked,
794 };
795 let _ = event_tx.send(SchedulerEvent::Completed(completion));
796 }
797}
798
/// Runs the job stored in `run_job`, taking the actor's epoch lock as its lane
/// requires, and returns the job's response.
///
/// The job is moved out of `run_job` and replaced by a placeholder that would
/// answer with a `job_missing` error if it were ever invoked.
fn run_lane_job(run_job: &mut RunJob) -> Response {
    let missing_request_id = run_job.request_id.clone();
    // A boxed `FnOnce` has to be owned to be called, but `run_job` is only borrowed,
    // so swap the real job out for a stand-in.
    let job = std::mem::replace(
        &mut run_job.job,
        Box::new(move |_| {
            Response::error(
                missing_request_id,
                "job_missing",
                "executor job already taken",
            )
        }),
    );

    match run_job.lane {
        Lane::PureRead | Lane::SerialLspStatus => {
            // Read lock held for the whole job.
            let _epoch = run_job.epoch.read();
            job(&run_job.ctx)
        }
        Lane::HeavyInit => {
            // The job itself runs without the epoch lock.
            let response = job(&run_job.ctx);
            {
                // Acquire and immediately release the write lock, which waits until
                // every current holder of the lock has let go.
                // NOTE(review): presumably a barrier before the result becomes
                // visible — the intent is not visible in this file; confirm.
                let _install = run_job.epoch.write();
            }
            response
        }
        Lane::Mutating => {
            // Write lock held for the whole job.
            let _epoch = run_job.epoch.write();
            job(&run_job.ctx)
        }
    }
}
830
/// Non-blocking counting semaphore that limits concurrently running heavy jobs.
#[derive(Debug)]
struct HeavySemaphore {
    /// Number of permits currently free.
    available: AtomicUsize,
    /// Total number of permits; used only to sanity-check releases in debug builds.
    max: usize,
}

impl HeavySemaphore {
    /// Creates a semaphore with `permits` permits, all of them free.
    fn new(permits: usize) -> Self {
        Self {
            max: permits,
            available: AtomicUsize::new(permits),
        }
    }

    /// Takes one permit if any is free, without ever blocking.
    ///
    /// Returns `None` when all permits are in use. The returned permit keeps the
    /// semaphore alive and gives the permit back when dropped.
    fn try_acquire(self: &Arc<Self>) -> Option<HeavyPermit> {
        // `checked_sub` yields `None` at zero, which makes `fetch_update` give up
        // without modifying the counter.
        self.available
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |free| {
                free.checked_sub(1)
            })
            .ok()
            .map(|_| HeavyPermit {
                semaphore: Arc::clone(self),
            })
    }
}

/// One acquired permit of a [`HeavySemaphore`]; released on drop.
struct HeavyPermit {
    /// Semaphore the permit is returned to.
    semaphore: Arc<HeavySemaphore>,
}

impl Drop for HeavyPermit {
    /// Returns the permit to the semaphore.
    fn drop(&mut self) {
        let previous = self.semaphore.available.fetch_add(1, Ordering::Release);
        // More free permits than the semaphore was created with would mean a
        // double release.
        debug_assert!(previous < self.semaphore.max);
    }
}