// firq_core/scheduler.rs

use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering};
use std::time::{Duration, Instant};

use parking_lot::{Mutex, RwLock};

use crate::api::{
    CancelResult, CloseMode, DequeueResult, EnqueueRejectReason, EnqueueResult,
    EnqueueWithHandleResult, Priority, QuantumProvider, SchedulerConfig, SchedulerStats, Task,
    TaskHandle, TenantKey,
};
use crate::state::{
    ActiveRing, QUEUE_TIME_BUCKETS_NS, QueueEntry, Shard, StatsCounters, TenantState, TopTenants,
    WorkSignal,
};

const SHUTDOWN_OPEN: u8 = 0;
const SHUTDOWN_DRAIN: u8 = 1;
const SHUTDOWN_IMMEDIATE: u8 = 2;
const ENQUEUE_PURGE_INTERVAL: u32 = 32;
const ENQUEUE_PURGE_NEAR_LIMIT_PERCENT: usize = 90;
const ENQUEUE_PURGE_SCAN_BUDGET_NEAR: usize = 8;
const ENQUEUE_PURGE_SCAN_BUDGET_FULL: usize = 32;
const DEQUEUE_BLOCKING_BACKOFF_SPINS: u32 = 64;
const DEQUEUE_BLOCKING_BACKOFF_SLEEP: Duration = Duration::from_micros(200);

#[derive(Copy, Clone)]
enum EnqueuePurgeReason {
    Periodic,
    NearLimit,
    Full,
}

#[derive(Default)]
struct PurgeStats {
    removed_expired_live: u64,
    released_slots: u64,
}

/// Multi-tenant in-process scheduler with DRR fairness and explicit backpressure.
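///
/// A minimal usage sketch (illustrative only; it assumes `SchedulerConfig`
/// implements `Default` and that `Task` and `TenantKey` have the constructors
/// shown, none of which are defined in this file):
///
/// ```ignore
/// // Hypothetical construction; the real SchedulerConfig/Task builders live in crate::api.
/// let scheduler: Scheduler<String> = Scheduler::new(SchedulerConfig::default());
/// let tenant = TenantKey::from(1u64);
///
/// // Enqueue a payload and pull it back out without blocking.
/// scheduler.enqueue(tenant, Task::new("payload".to_string()));
/// match scheduler.try_dequeue() {
///     DequeueResult::Task { tenant, task } => { /* execute task for tenant */ }
///     DequeueResult::Empty => { /* nothing queued right now */ }
///     DequeueResult::Closed => { /* scheduler is shut down */ }
/// }
///
/// // Stop accepting new work but let already-queued tasks drain.
/// scheduler.close_drain();
/// ```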
pub struct Scheduler<T> {
    config: SchedulerConfig,
    shards: Vec<Mutex<Shard<T>>>,
    active_shards: Mutex<ActiveRing<usize>>,
    stats: StatsCounters,
    work_signal: WorkSignal,
    shutdown_state: AtomicU8,
    shard_active: Vec<AtomicBool>,
    top_tenants: Mutex<TopTenants>,
    tenant_quantum: RwLock<HashMap<TenantKey, u64>>,
    quantum_provider: RwLock<Option<QuantumProvider>>,
    next_task_id: AtomicU64,
    pending_ids: Mutex<HashSet<u64>>,
    cancelled_ids: Mutex<HashSet<u64>>,
}

impl<T> Scheduler<T> {
    fn shard_index(&self, tenant: TenantKey) -> usize {
        let shard_count = self.shards.len();
        let hash = tenant.as_u64();
        (hash as usize) % shard_count
    }

    fn saturating_add(counter: &AtomicU64, delta: u64) {
        if delta == 0 {
            return;
        }
        let mut current = counter.load(Ordering::Relaxed);
        loop {
            let next = current.saturating_add(delta);
            match counter.compare_exchange(current, next, Ordering::Relaxed, Ordering::Relaxed) {
                Ok(_) => return,
                Err(actual) => current = actual,
            }
        }
    }

    fn record_queue_time(&self, queue_time_ns: u64) {
        for (idx, bound) in QUEUE_TIME_BUCKETS_NS.iter().enumerate() {
            if queue_time_ns <= *bound {
                Self::saturating_add(&self.stats.queue_time_buckets[idx], 1);
                break;
            }
        }
    }

    fn try_reserve_slot(&self) -> bool {
        let max_global = self.config.max_global as u64;
        let mut current = self.stats.queue_len_estimate.load(Ordering::Relaxed);
        loop {
            if current >= max_global {
                return false;
            }
            let next = current + 1;
            match self.stats.queue_len_estimate.compare_exchange(
                current,
                next,
                Ordering::AcqRel,
                Ordering::Relaxed,
            ) {
                Ok(_) => return true,
                Err(actual) => current = actual,
            }
        }
    }

    fn release_reserved_slots(&self, delta: u64) {
        if delta == 0 {
            return;
        }

        let mut current = self.stats.queue_len_estimate.load(Ordering::Relaxed);
        loop {
            let next = current.saturating_sub(delta);
            match self.stats.queue_len_estimate.compare_exchange(
                current,
                next,
                Ordering::AcqRel,
                Ordering::Relaxed,
            ) {
                Ok(_) => return,
                Err(actual) => current = actual,
            }
        }
    }
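
    // Note on the two helpers above: `queue_len_estimate` doubles as the global
    // admission counter. Enqueue paths call `try_reserve_slot` before pushing an
    // entry, and every path that retires a live entry (dequeue, cancel, expiry
    // purge) pairs it with `release_reserved_slots`, so the counter tracks live
    // (non-cancelled, non-expired) entries rather than the physical queue length.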

    fn activate_shard(&self, shard_index: usize) {
        if !self.shard_active[shard_index].swap(true, Ordering::AcqRel) {
            self.active_shards.lock().push_back(shard_index);
        }
    }

    fn shutdown_state(&self) -> u8 {
        self.shutdown_state.load(Ordering::Acquire)
    }

    fn is_accepting_enqueues(&self) -> bool {
        self.shutdown_state() == SHUTDOWN_OPEN
    }

    fn should_return_closed(&self) -> bool {
        match self.shutdown_state() {
            SHUTDOWN_IMMEDIATE => true,
            SHUTDOWN_DRAIN => self.stats.queue_len_estimate.load(Ordering::Acquire) == 0,
            _ => false,
        }
    }

    fn set_shutdown_mode(&self, mode: CloseMode) {
        let target = match mode {
            CloseMode::Immediate => SHUTDOWN_IMMEDIATE,
            CloseMode::Drain => SHUTDOWN_DRAIN,
        };

        let mut changed = false;
        loop {
            let current = self.shutdown_state();
            let next = match (current, target) {
                (SHUTDOWN_IMMEDIATE, _) => SHUTDOWN_IMMEDIATE,
                (_, SHUTDOWN_IMMEDIATE) => SHUTDOWN_IMMEDIATE,
                (SHUTDOWN_OPEN, SHUTDOWN_DRAIN) => SHUTDOWN_DRAIN,
                _ => current,
            };

            if next == current {
                break;
            }

            match self.shutdown_state.compare_exchange(
                current,
                next,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    changed = true;
                    break;
                }
                Err(_) => continue,
            }
        }

        if changed {
            self.work_signal.notify_all();
        }
    }

    fn record_reject(&self, reason: EnqueueRejectReason) {
        Self::saturating_add(&self.stats.dropped, 1);
        match reason {
            EnqueueRejectReason::GlobalFull => {
                Self::saturating_add(&self.stats.rejected_global, 1);
            }
            EnqueueRejectReason::TenantFull => {
                Self::saturating_add(&self.stats.rejected_tenant, 1);
            }
            EnqueueRejectReason::Timeout => {
                Self::saturating_add(&self.stats.timeout_rejected, 1);
            }
        }
    }

    fn record_policy_drop(&self) {
        Self::saturating_add(&self.stats.dropped, 1);
        Self::saturating_add(&self.stats.dropped_policy, 1);
    }

    fn next_handle(&self) -> TaskHandle {
        let id = self.next_task_id.fetch_add(1, Ordering::Relaxed) + 1;
        TaskHandle::from(id)
    }

    fn register_pending(&self, handle: TaskHandle) {
        self.pending_ids.lock().insert(handle.as_u64());
    }

    fn take_pending(&self, id: u64) -> bool {
        self.pending_ids.lock().remove(&id)
    }

    fn mark_cancelled(&self, id: u64) {
        self.cancelled_ids.lock().insert(id);
    }

    fn take_cancelled_marker(&self, id: u64) -> bool {
        self.cancelled_ids.lock().remove(&id)
    }

    fn clear_cancelled_marker(&self, id: u64) {
        self.cancelled_ids.lock().remove(&id);
    }

    fn is_expired(deadline: Option<Instant>, now: Instant) -> bool {
        matches!(deadline, Some(deadline) if now > deadline)
    }

    fn is_tenant_near_limit(&self, tenant_len: usize) -> bool {
        let max_per_tenant = self.config.max_per_tenant;
        if max_per_tenant == 0 {
            return true;
        }
        let lhs = (tenant_len as u128).saturating_mul(100);
        let rhs = (max_per_tenant as u128).saturating_mul(ENQUEUE_PURGE_NEAR_LIMIT_PERCENT as u128);
        lhs >= rhs
    }

    fn maybe_purge_tenant_before_enqueue(
        &self,
        tenant_state: &mut TenantState<T>,
    ) -> Option<EnqueuePurgeReason> {
        tenant_state.enqueue_attempts_since_purge =
            tenant_state.enqueue_attempts_since_purge.saturating_add(1);

        let tenant_len = tenant_state.total_len();
        let reason = if tenant_len >= self.config.max_per_tenant {
            Some(EnqueuePurgeReason::Full)
        } else if self.is_tenant_near_limit(tenant_len) {
            Some(EnqueuePurgeReason::NearLimit)
        } else if tenant_state.enqueue_attempts_since_purge >= ENQUEUE_PURGE_INTERVAL {
            Some(EnqueuePurgeReason::Periodic)
        } else {
            None
        };

        if let Some(reason) = reason {
            let _ = self.purge_tenant_for_enqueue(tenant_state, reason);
        }
        reason
    }

    fn purge_tenant_for_enqueue(
        &self,
        tenant_state: &mut TenantState<T>,
        reason: EnqueuePurgeReason,
    ) -> PurgeStats {
        let side_scan_budget = match reason {
            EnqueuePurgeReason::Periodic => 0,
            EnqueuePurgeReason::NearLimit => ENQUEUE_PURGE_SCAN_BUDGET_NEAR,
            EnqueuePurgeReason::Full => ENQUEUE_PURGE_SCAN_BUDGET_FULL,
        };
        let now = Instant::now();
        let stats = self.purge_tenant_with_budget(tenant_state, now, side_scan_budget);
        tenant_state.enqueue_attempts_since_purge = 0;
        self.record_enqueue_purge_stats(&stats);
        stats
    }

    fn purge_tenant_with_budget(
        &self,
        tenant_state: &mut TenantState<T>,
        now: Instant,
        side_scan_budget: usize,
    ) -> PurgeStats {
        let mut stats = PurgeStats::default();
        for idx in 0..tenant_state.queues.len() {
            let queue = &mut tenant_state.queues[idx];
            self.purge_queue_with_budget(queue, now, side_scan_budget, &mut stats);
            if queue.is_empty() {
                tenant_state.active[idx] = false;
            }
        }
        stats
    }

    fn purge_queue_with_budget(
        &self,
        queue: &mut VecDeque<QueueEntry<T>>,
        now: Instant,
        side_scan_budget: usize,
        stats: &mut PurgeStats,
    ) {
        while let Some(front) = queue.front() {
            let front_id = front.id;
            if self.take_cancelled_marker(front_id) {
                let _ = queue.pop_front();
                if self.take_pending(front_id) {
                    stats.released_slots = stats.released_slots.saturating_add(1);
                }
                continue;
            }

            if Self::is_expired(front.task.deadline, now) {
                let removed = queue.pop_front().expect("front disappeared");
                if self.take_pending(removed.id) {
                    stats.removed_expired_live = stats.removed_expired_live.saturating_add(1);
                    stats.released_slots = stats.released_slots.saturating_add(1);
                }
                continue;
            }
            break;
        }

        if side_scan_budget == 0 || queue.is_empty() {
            return;
        }

        let mut idx = 0usize;
        let mut scanned = 0usize;
        while idx < queue.len() && scanned < side_scan_budget {
            scanned = scanned.saturating_add(1);
            let (entry_id, expired) = {
                let entry = queue.get(idx).expect("entry should exist");
                (entry.id, Self::is_expired(entry.task.deadline, now))
            };

            if self.take_cancelled_marker(entry_id) {
                let _ = queue.remove(idx).expect("entry should still exist");
                if self.take_pending(entry_id) {
                    stats.released_slots = stats.released_slots.saturating_add(1);
                }
                continue;
            }

            if expired {
                let removed = queue.remove(idx).expect("entry should still exist");
                if self.take_pending(removed.id) {
                    stats.removed_expired_live = stats.removed_expired_live.saturating_add(1);
                    stats.released_slots = stats.released_slots.saturating_add(1);
                }
                continue;
            }

            idx = idx.saturating_add(1);
        }
    }

    fn record_enqueue_purge_stats(&self, stats: &PurgeStats) {
        Self::saturating_add(&self.stats.enqueue_purge_runs, 1);
        Self::saturating_add(&self.stats.expired, stats.removed_expired_live);

        if stats.released_slots > 0 {
            self.release_reserved_slots(stats.released_slots);
            self.work_signal.notify_all();
        }
    }

    fn quantum_for(&self, tenant: TenantKey) -> u64 {
        if let Some(value) = self.tenant_quantum.read().get(&tenant).copied() {
            return value.max(1);
        }
        if let Some(provider) = self.quantum_provider.read().as_ref() {
            return provider(tenant).max(1);
        }
        self.config.quantum.max(1)
    }

    fn refresh_tenant_quantum(&self, tenant: TenantKey) {
        let quantum = self.quantum_for(tenant) as i64;
        let shard_index = self.shard_index(tenant);
        let mut shard = self.shards[shard_index].lock();
        if let Some(state) = shard.tenants.get_mut(&tenant) {
            state.quantum = quantum;
        }
    }

    /// Sets a static DRR quantum override for a tenant.
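    ///
    /// Illustrative sketch (assumes `TenantKey` can be built from a `u64`,
    /// which is not shown in this file):
    ///
    /// ```ignore
    /// // Give the hypothetical tenant 7 four units of deficit per DRR round
    /// // instead of the configured default quantum.
    /// scheduler.set_tenant_quantum(TenantKey::from(7u64), 4);
    /// ```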
    pub fn set_tenant_quantum(&self, tenant: TenantKey, quantum: u64) {
        let value = quantum.max(1);
        self.tenant_quantum.write().insert(tenant, value);
        self.refresh_tenant_quantum(tenant);
    }

    /// Removes a static DRR quantum override for a tenant.
    pub fn clear_tenant_quantum(&self, tenant: TenantKey) {
        self.tenant_quantum.write().remove(&tenant);
        self.refresh_tenant_quantum(tenant);
    }

    /// Sets or clears the dynamic per-tenant quantum provider.
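    ///
    /// Illustrative sketch only; it assumes `QuantumProvider` is a shareable
    /// callable from `TenantKey` to `u64` (for example an
    /// `Arc<dyn Fn(TenantKey) -> u64 + Send + Sync>`), which this file does
    /// not define:
    ///
    /// ```ignore
    /// // Hypothetical provider that gives every tenant a quantum of 2.
    /// scheduler.set_quantum_provider(Some(Arc::new(|_tenant: TenantKey| 2u64)));
    /// // Clearing the provider falls back to static overrides, then config.quantum.
    /// scheduler.set_quantum_provider(None);
    /// ```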
    pub fn set_quantum_provider(&self, provider: Option<QuantumProvider>) {
        {
            let mut guard = self.quantum_provider.write();
            *guard = provider;
        }
        for shard_mutex in &self.shards {
            let mut shard = shard_mutex.lock();
            for (tenant, state) in shard.tenants.iter_mut() {
                let quantum = self.quantum_for(*tenant) as i64;
                state.quantum = quantum;
            }
        }
    }

    /// Creates a scheduler with the provided configuration.
    pub fn new(config: SchedulerConfig) -> Self {
        let shard_count = config.shards.max(1);
        let top_tenants_capacity = config.top_tenants_capacity;
        let mut shards = Vec::with_capacity(shard_count);
        let mut shard_active = Vec::with_capacity(shard_count);
        for _ in 0..shard_count {
            shards.push(Mutex::new(Shard::new()));
            shard_active.push(AtomicBool::new(false));
        }
        Self {
            tenant_quantum: RwLock::new(config.quantum_by_tenant.clone()),
            quantum_provider: RwLock::new(config.quantum_provider.clone()),
            config,
            shards,
            active_shards: Mutex::new(ActiveRing::new()),
            stats: StatsCounters::new(),
            work_signal: WorkSignal::new(),
            shutdown_state: AtomicU8::new(SHUTDOWN_OPEN),
            shard_active,
            top_tenants: Mutex::new(TopTenants::new(top_tenants_capacity)),
            next_task_id: AtomicU64::new(0),
            pending_ids: Mutex::new(HashSet::new()),
            cancelled_ids: Mutex::new(HashSet::new()),
        }
    }

    /// Enqueues a task without returning a cancel handle.
    pub fn enqueue(&self, tenant: TenantKey, task: Task<T>) -> EnqueueResult {
        match self.enqueue_with_handle(tenant, task) {
            EnqueueWithHandleResult::Enqueued(_) => EnqueueResult::Enqueued,
            EnqueueWithHandleResult::Rejected(reason) => EnqueueResult::Rejected(reason),
            EnqueueWithHandleResult::Closed => EnqueueResult::Closed,
        }
    }

    /// Enqueues a task and returns a [`TaskHandle`] that can be cancelled.
    pub fn enqueue_with_handle(&self, tenant: TenantKey, task: Task<T>) -> EnqueueWithHandleResult {
        match self.backpressure_for(tenant).clone() {
            crate::api::BackpressurePolicy::Reject => self.enqueue_reject_with_handle(tenant, task),
            crate::api::BackpressurePolicy::DropOldestPerTenant => {
                self.enqueue_drop_with_handle(tenant, task, DropStrategy::Oldest)
            }
            crate::api::BackpressurePolicy::DropNewestPerTenant => {
                self.enqueue_drop_with_handle(tenant, task, DropStrategy::Newest)
            }
            crate::api::BackpressurePolicy::Timeout { wait } => {
                self.enqueue_timeout_with_handle(tenant, task, wait)
            }
        }
    }

    /// Cancels a pending task identified by handle.
    ///
    /// Cancellation is best-effort for pending work and does not affect tasks
    /// that have already been dequeued for execution.
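    ///
    /// Illustrative sketch (assumes a `Task` constructor and a `tenant` value
    /// built outside this file):
    ///
    /// ```ignore
    /// if let EnqueueWithHandleResult::Enqueued(handle) =
    ///     scheduler.enqueue_with_handle(tenant, Task::new(payload))
    /// {
    ///     // Best-effort: succeeds only while the task is still pending.
    ///     match scheduler.cancel(handle) {
    ///         CancelResult::Cancelled => { /* task will never be dispatched */ }
    ///         CancelResult::NotFound => { /* already dequeued or unknown handle */ }
    ///     }
    /// }
    /// ```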
    pub fn cancel(&self, handle: TaskHandle) -> CancelResult {
        let id = handle.as_u64();
        if self.take_pending(id) {
            self.mark_cancelled(id);
            self.release_reserved_slots(1);
            self.work_signal.notify_all();
            CancelResult::Cancelled
        } else {
            CancelResult::NotFound
        }
    }

    /// Attempts to dequeue one task without blocking.
    pub fn try_dequeue(&self) -> DequeueResult<T> {
        if self.shutdown_state() == SHUTDOWN_IMMEDIATE {
            return DequeueResult::Closed;
        }

        let mut remaining = { self.active_shards.lock().len() };

        while remaining > 0 {
            let shard_index = {
                let mut active_shards = self.active_shards.lock();
                match active_shards.pop_front() {
                    Some(index) => index,
                    None => break,
                }
            };
            remaining -= 1;

            let (result, shard_has_work) = {
                let mut shard = self.shards[shard_index].lock();
                let result = self.try_dequeue_from_shard(&mut shard);
                let has_work = shard.active_rings.iter().any(|ring| !ring.is_empty());
                if !has_work {
                    self.shard_active[shard_index].store(false, Ordering::Release);
                }
                (result, has_work)
            };

            if shard_has_work {
                self.active_shards.lock().push_back(shard_index);
            }

            if let Some((tenant, task)) = result {
                return DequeueResult::Task { tenant, task };
            }
        }

        if self.should_return_closed() {
            DequeueResult::Closed
        } else {
            DequeueResult::Empty
        }
    }

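    // Deficit round-robin within a shard, per priority ring: a tenant may
    // dispatch its head task only when the accumulated deficit covers the
    // task's cost; otherwise it earns one quantum and goes to the back of the
    // ring. Worked example with illustrative numbers (assuming the deficit
    // starts at zero): with quantum = 2 and a head cost of 5, the tenant is
    // skipped three times (deficit grows 2, 4, 6) and dispatches on its fourth
    // visit, leaving a deficit of 1, so expensive tasks wait proportionally
    // longer without starving cheaper tenants.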
    fn try_dequeue_from_shard(&self, shard: &mut Shard<T>) -> Option<(TenantKey, Task<T>)> {
        for priority in Priority::ordered() {
            let idx = priority.index();
            if shard.active_rings[idx].is_empty() {
                continue;
            }

            let ring_len = shard.active_rings[idx].len();
            for _ in 0..ring_len {
                let tenant = match shard.active_rings[idx].pop_front() {
                    Some(tenant) => tenant,
                    None => break,
                };

                let Some(tenant_state) = shard.tenants.get_mut(&tenant) else {
                    continue;
                };

                let queue = &mut tenant_state.queues[idx];
                let deficit = &mut tenant_state.deficits[idx];

                let mut expired_count = 0u64;
                let now = Instant::now();

                while let Some(front) = queue.front() {
                    if self.take_cancelled_marker(front.id) {
                        queue.pop_front();
                        continue;
                    }
                    if Self::is_expired(front.task.deadline, now) {
                        let expired = queue.pop_front().expect("front disappeared");
                        if self.take_pending(expired.id) {
                            expired_count = expired_count.saturating_add(1);
                            self.release_reserved_slots(1);
                        }
                    } else {
                        break;
                    }
                }

                if expired_count > 0 {
                    Self::saturating_add(&self.stats.expired, expired_count);
                    self.work_signal.notify_all();
                }

                if queue.is_empty() {
                    tenant_state.active[idx] = false;
                    continue;
                }

                let front_cost = queue.front().map(|entry| entry.task.cost).unwrap_or(0);
                let cost = if front_cost > i64::MAX as u64 {
                    i64::MAX
                } else {
                    front_cost as i64
                };

                if *deficit < cost {
                    *deficit += tenant_state.quantum;
                    shard.active_rings[idx].push_back(tenant);
                    continue;
                }

                let entry = match queue.pop_front() {
                    Some(entry) => entry,
                    None => {
                        tenant_state.active[idx] = false;
                        continue;
                    }
                };

                if queue.is_empty() {
                    tenant_state.active[idx] = false;
                } else {
                    shard.active_rings[idx].push_back(tenant);
                }

                if self.take_cancelled_marker(entry.id) {
                    continue;
                }

                if !self.take_pending(entry.id) {
                    continue;
                }

                *deficit -= cost;

                Self::saturating_add(&self.stats.dequeued, 1);
                self.release_reserved_slots(1);
                self.work_signal.notify_all();

                let queue_time_ns = Instant::now()
                    .duration_since(entry.task.enqueue_ts)
                    .as_nanos()
                    .min(u128::from(u64::MAX)) as u64;
                Self::saturating_add(&self.stats.queue_time_sum_ns, queue_time_ns);
                Self::saturating_add(&self.stats.queue_time_samples, 1);
                self.record_queue_time(queue_time_ns);
                self.top_tenants.lock().record(tenant, 1);

                return Some((tenant, entry.task));
            }
        }

        None
    }

    /// Dequeues one task, blocking until work is available or scheduler closes.
    pub fn dequeue_blocking(&self) -> DequeueResult<T> {
        let mut spins = 0u32;
        loop {
            let observed = self.work_signal.current();
            match self.try_dequeue() {
                DequeueResult::Task { tenant, task } => {
                    return DequeueResult::Task { tenant, task };
                }
                DequeueResult::Closed => return DequeueResult::Closed,
                DequeueResult::Empty => {
                    if self.should_return_closed() {
                        return DequeueResult::Closed;
                    }

                    if self.stats.queue_len_estimate.load(Ordering::Acquire) > 0 {
                        spins = spins.saturating_add(1);
                        if spins >= DEQUEUE_BLOCKING_BACKOFF_SPINS {
                            let _ = self
                                .work_signal
                                .wait_for_change_timeout(observed, DEQUEUE_BLOCKING_BACKOFF_SLEEP);
                            spins = 0;
                        } else {
                            std::hint::spin_loop();
                        }
                        continue;
                    }

                    spins = 0;
                    self.work_signal.wait_for_change(observed);
                }
            }
        }
    }

    /// Dequeues one task, waiting at most `timeout`.
    ///
    /// Returns [`DequeueResult::Empty`] on timeout when the scheduler is still open.
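    ///
    /// Sketch of a worker loop built on the timeout variant (uses only the API
    /// defined in this file):
    ///
    /// ```ignore
    /// loop {
    ///     match scheduler.dequeue_blocking_timeout(Duration::from_millis(50)) {
    ///         DequeueResult::Task { tenant, task } => { /* run task for tenant */ }
    ///         DequeueResult::Empty => { /* timed out; loop again or do housekeeping */ }
    ///         DequeueResult::Closed => break, // scheduler closed (or drained)
    ///     }
    /// }
    /// ```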
    pub fn dequeue_blocking_timeout(&self, timeout: Duration) -> DequeueResult<T> {
        let deadline = Instant::now() + timeout;
        let mut spins = 0u32;

        loop {
            let observed = self.work_signal.current();
            match self.try_dequeue() {
                DequeueResult::Task { tenant, task } => {
                    return DequeueResult::Task { tenant, task };
                }
                DequeueResult::Closed => return DequeueResult::Closed,
                DequeueResult::Empty => {
                    if self.should_return_closed() {
                        return DequeueResult::Closed;
                    }

                    if Instant::now() >= deadline {
                        return DequeueResult::Empty;
                    }

                    let remaining = deadline.saturating_duration_since(Instant::now());
                    if self.stats.queue_len_estimate.load(Ordering::Acquire) > 0 {
                        spins = spins.saturating_add(1);
                        if spins >= DEQUEUE_BLOCKING_BACKOFF_SPINS {
                            let wait = remaining.min(DEQUEUE_BLOCKING_BACKOFF_SLEEP);
                            let _ = self.work_signal.wait_for_change_timeout(observed, wait);
                            spins = 0;
                        } else {
                            std::hint::spin_loop();
                        }
                        continue;
                    }

                    spins = 0;
                    let _ = self
                        .work_signal
                        .wait_for_change_timeout(observed, remaining);
                }
            }
        }
    }

    /// Returns a snapshot of current scheduler metrics.
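    ///
    /// Sketch of reading the snapshot for a saturation check (field names as
    /// returned by this method; the threshold is illustrative):
    ///
    /// ```ignore
    /// let stats = scheduler.stats();
    /// if stats.queue_saturation_ratio > 0.8 {
    ///     // Backlog is near max_global; p95/p99 are histogram bucket upper bounds in ns.
    ///     eprintln!(
    ///         "queue {}/{} p95={}ns",
    ///         stats.queue_len_estimate, stats.max_global, stats.queue_time_p95_ns
    ///     );
    /// }
    /// ```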
    pub fn stats(&self) -> SchedulerStats {
        let queue_time_histogram = QUEUE_TIME_BUCKETS_NS
            .iter()
            .enumerate()
            .map(|(idx, bound)| crate::api::QueueTimeBucket {
                le_ns: *bound,
                count: self.stats.queue_time_buckets[idx].load(Ordering::Relaxed),
            })
            .collect::<Vec<_>>();

        let total_samples: u64 = queue_time_histogram.iter().map(|b| b.count).sum();
        let percentile = |pct: f64| -> u64 {
            if total_samples == 0 {
                return 0;
            }
            let target = (total_samples as f64 * pct).ceil() as u64;
            let mut cumulative = 0u64;
            for bucket in &queue_time_histogram {
                cumulative = cumulative.saturating_add(bucket.count);
                if cumulative >= target {
                    return bucket.le_ns;
                }
            }
            queue_time_histogram
                .last()
                .map(|bucket| bucket.le_ns)
                .unwrap_or(0)
        };

        let queue_len_estimate = self.stats.queue_len_estimate.load(Ordering::Relaxed);
        let max_global = self.config.max_global as u64;
        let queue_saturation_ratio = if max_global == 0 {
            0.0
        } else {
            queue_len_estimate as f64 / max_global as f64
        };

        let top_tenants = self.top_tenants.lock().snapshot();

        SchedulerStats {
            enqueued: self.stats.enqueued.load(Ordering::Relaxed),
            dequeued: self.stats.dequeued.load(Ordering::Relaxed),
            expired: self.stats.expired.load(Ordering::Relaxed),
            dropped: self.stats.dropped.load(Ordering::Relaxed),
            rejected_global: self.stats.rejected_global.load(Ordering::Relaxed),
            rejected_tenant: self.stats.rejected_tenant.load(Ordering::Relaxed),
            timeout_rejected: self.stats.timeout_rejected.load(Ordering::Relaxed),
            dropped_policy: self.stats.dropped_policy.load(Ordering::Relaxed),
            queue_len_estimate,
            max_global,
            queue_saturation_ratio,
            queue_time_sum_ns: self.stats.queue_time_sum_ns.load(Ordering::Relaxed),
            queue_time_samples: self.stats.queue_time_samples.load(Ordering::Relaxed),
            queue_time_p95_ns: percentile(0.95),
            queue_time_p99_ns: percentile(0.99),
            queue_time_histogram,
            top_tenants,
        }
    }

    #[cfg(test)]
    pub(crate) fn debug_enqueue_purge_runs(&self) -> u64 {
        self.stats.enqueue_purge_runs.load(Ordering::Relaxed)
    }

    #[cfg(test)]
    pub(crate) fn debug_enqueue_purge_interval(&self) -> u32 {
        ENQUEUE_PURGE_INTERVAL
    }

    /// Alias for [`Scheduler::close_immediate`].
    pub fn close(&self) {
        self.close_immediate();
    }

    /// Closes immediately: stops accepting new work and wakes blocked consumers.
    pub fn close_immediate(&self) {
        self.set_shutdown_mode(CloseMode::Immediate);
    }

    /// Closes in drain mode: stops accepting and drains queued work before closing.
    pub fn close_drain(&self) {
        self.set_shutdown_mode(CloseMode::Drain);
    }

    /// Closes using the provided mode.
    pub fn close_with_mode(&self, mode: CloseMode) {
        self.set_shutdown_mode(mode);
    }
}

enum DropStrategy {
    Oldest,
    Newest,
}

enum EnqueueAttempt<T> {
    Enqueued(TaskHandle),
    Closed,
    Full(Task<T>, EnqueueRejectReason),
}

impl<T> Scheduler<T> {
    fn enqueue_reject_with_handle(
        &self,
        tenant: TenantKey,
        task: Task<T>,
    ) -> EnqueueWithHandleResult {
        match self.enqueue_try_reject(tenant, task) {
            EnqueueAttempt::Enqueued(handle) => EnqueueWithHandleResult::Enqueued(handle),
            EnqueueAttempt::Closed => EnqueueWithHandleResult::Closed,
            EnqueueAttempt::Full(_, reason) => {
                self.record_reject(reason.clone());
                EnqueueWithHandleResult::Rejected(reason)
            }
        }
    }

    fn enqueue_timeout_with_handle(
        &self,
        tenant: TenantKey,
        mut task: Task<T>,
        wait: Duration,
    ) -> EnqueueWithHandleResult {
        let deadline = Instant::now() + wait;
        loop {
            if !self.is_accepting_enqueues() {
                return EnqueueWithHandleResult::Closed;
            }

            let observed = self.work_signal.current();
            match self.enqueue_try_reject(tenant, task) {
                EnqueueAttempt::Enqueued(handle) => {
                    return EnqueueWithHandleResult::Enqueued(handle);
                }
                EnqueueAttempt::Closed => return EnqueueWithHandleResult::Closed,
                EnqueueAttempt::Full(returned, _) => {
                    task = returned;
                    if Instant::now() >= deadline {
                        self.record_reject(EnqueueRejectReason::Timeout);
                        return EnqueueWithHandleResult::Rejected(EnqueueRejectReason::Timeout);
                    }
                    let remaining = deadline.saturating_duration_since(Instant::now());
                    let _ = self
                        .work_signal
                        .wait_for_change_timeout(observed, remaining);
                }
            }
        }
    }

    fn enqueue_try_reject(&self, tenant: TenantKey, task: Task<T>) -> EnqueueAttempt<T> {
        if !self.is_accepting_enqueues() {
            return EnqueueAttempt::Closed;
        }

        let shard_index = self.shard_index(tenant);
        let mut shard = self.shards[shard_index].lock();

        if !self.is_accepting_enqueues() {
            return EnqueueAttempt::Closed;
        }

        let shard_was_empty = shard.active_rings.iter().all(|ring| ring.is_empty());

        let tenant_state = shard
            .tenants
            .entry(tenant)
            .or_insert_with(|| TenantState::new(self.quantum_for(tenant) as i64));

        let purge_reason = self.maybe_purge_tenant_before_enqueue(tenant_state);
        let mut ran_full_purge = matches!(purge_reason, Some(EnqueuePurgeReason::Full));

        if tenant_state.total_len() >= self.config.max_per_tenant {
            if !ran_full_purge {
                let _ = self.purge_tenant_for_enqueue(tenant_state, EnqueuePurgeReason::Full);
                ran_full_purge = true;
            }
            if tenant_state.total_len() >= self.config.max_per_tenant {
                return EnqueueAttempt::Full(task, EnqueueRejectReason::TenantFull);
            }
        }

        let mut reserved = self.try_reserve_slot();
        if !reserved && !ran_full_purge {
            let purge = self.purge_tenant_for_enqueue(tenant_state, EnqueuePurgeReason::Full);
            if purge.released_slots > 0 {
                reserved = self.try_reserve_slot();
            }
        }

        if !reserved {
            return EnqueueAttempt::Full(task, EnqueueRejectReason::GlobalFull);
        }

        let handle = self.next_handle();
        self.register_pending(handle);

        let priority_index = task.priority.index();
        let queue = &mut tenant_state.queues[priority_index];
        let was_empty = queue.is_empty();
        queue.push_back(QueueEntry {
            id: handle.as_u64(),
            task,
        });

        if was_empty && !tenant_state.active[priority_index] {
            tenant_state.active[priority_index] = true;
            shard.active_rings[priority_index].push_back(tenant);
        }

        Self::saturating_add(&self.stats.enqueued, 1);

        drop(shard);

        if shard_was_empty {
            self.activate_shard(shard_index);
        }

        if was_empty {
            self.work_signal.notify_all();
        }

        EnqueueAttempt::Enqueued(handle)
    }

    fn enqueue_drop_with_handle(
        &self,
        tenant: TenantKey,
        task: Task<T>,
        strategy: DropStrategy,
    ) -> EnqueueWithHandleResult {
        if !self.is_accepting_enqueues() {
            return EnqueueWithHandleResult::Closed;
        }

        let shard_index = self.shard_index(tenant);
        let mut shard = self.shards[shard_index].lock();

        if !self.is_accepting_enqueues() {
            return EnqueueWithHandleResult::Closed;
        }

        let shard_was_empty = shard.active_rings.iter().all(|ring| ring.is_empty());

        let tenant_state = shard
            .tenants
            .entry(tenant)
            .or_insert_with(|| TenantState::new(self.quantum_for(tenant) as i64));

        let purge_reason = self.maybe_purge_tenant_before_enqueue(tenant_state);
        let ran_full_purge = matches!(purge_reason, Some(EnqueuePurgeReason::Full));

        let current_len = self.stats.queue_len_estimate.load(Ordering::Acquire);
        let tenant_full = tenant_state.total_len() >= self.config.max_per_tenant;
        let global_full = current_len >= self.config.max_global as u64;

        let mut reused_slot = false;
        if tenant_full || global_full {
            let dropped = self.drop_from_tenant(tenant_state, strategy, task.priority);
            let Some(dropped) = dropped else {
                let reason = if tenant_full {
                    EnqueueRejectReason::TenantFull
                } else {
                    EnqueueRejectReason::GlobalFull
                };
                self.record_reject(reason.clone());
                return EnqueueWithHandleResult::Rejected(reason);
            };

            self.clear_cancelled_marker(dropped.id);
            if self.take_pending(dropped.id) {
                self.record_policy_drop();
                reused_slot = true;
            }
        }

        if !reused_slot {
            let mut reserved = self.try_reserve_slot();
            if !reserved && !ran_full_purge {
                let purge = self.purge_tenant_for_enqueue(tenant_state, EnqueuePurgeReason::Full);
                if purge.released_slots > 0 {
                    reserved = self.try_reserve_slot();
                }
            }

            if !reserved {
                self.record_reject(EnqueueRejectReason::GlobalFull);
                return EnqueueWithHandleResult::Rejected(EnqueueRejectReason::GlobalFull);
            }
        }

        let handle = self.next_handle();
        self.register_pending(handle);

        let priority_index = task.priority.index();
        let queue = &mut tenant_state.queues[priority_index];
        let was_empty = queue.is_empty();
        queue.push_back(QueueEntry {
            id: handle.as_u64(),
            task,
        });

        if was_empty && !tenant_state.active[priority_index] {
            tenant_state.active[priority_index] = true;
            shard.active_rings[priority_index].push_back(tenant);
        }

        Self::saturating_add(&self.stats.enqueued, 1);

        drop(shard);

        if shard_was_empty {
            self.activate_shard(shard_index);
        }

        if was_empty {
            self.work_signal.notify_all();
        }

        EnqueueWithHandleResult::Enqueued(handle)
    }

    fn drop_from_tenant(
        &self,
        tenant_state: &mut TenantState<T>,
        strategy: DropStrategy,
        incoming: Priority,
    ) -> Option<QueueEntry<T>> {
        const DROP_HIGH: [Priority; 3] = [Priority::Low, Priority::Normal, Priority::High];
        const DROP_NORMAL: [Priority; 2] = [Priority::Low, Priority::Normal];
        const DROP_LOW: [Priority; 1] = [Priority::Low];

        let drop_order: &[Priority] = match incoming {
            Priority::High => &DROP_HIGH,
            Priority::Normal => &DROP_NORMAL,
            Priority::Low => &DROP_LOW,
        };

        for &priority in drop_order {
            let idx = priority.index();
            if !tenant_state.queues[idx].is_empty() {
                return match strategy {
                    DropStrategy::Oldest => tenant_state.queues[idx].pop_front(),
                    DropStrategy::Newest => tenant_state.queues[idx].pop_back(),
                };
            }
        }
        None
    }

    fn backpressure_for(&self, tenant: TenantKey) -> &crate::api::BackpressurePolicy {
        self.config
            .backpressure_by_tenant
            .get(&tenant)
            .unwrap_or(&self.config.backpressure)
    }
}