1use std::collections::{HashMap, HashSet, VecDeque};
2use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering};
3use std::time::{Duration, Instant};
4
5use parking_lot::{Mutex, RwLock};
6
7use crate::api::{
8 CancelResult, CloseMode, DequeueResult, EnqueueRejectReason, EnqueueResult,
9 EnqueueWithHandleResult, Priority, QuantumProvider, SchedulerConfig, SchedulerStats, Task,
10 TaskHandle, TenantKey,
11};
12use crate::state::{
13 ActiveRing, QUEUE_TIME_BUCKETS_NS, QueueEntry, Shard, StatsCounters, TenantState, TopTenants,
14 WorkSignal,
15};
16
// Values of `Scheduler::shutdown_state` (monotonic: OPEN -> DRAIN -> IMMEDIATE).
const SHUTDOWN_OPEN: u8 = 0;
const SHUTDOWN_DRAIN: u8 = 1;
const SHUTDOWN_IMMEDIATE: u8 = 2;
// Run an opportunistic enqueue-path purge every N enqueue attempts per tenant.
const ENQUEUE_PURGE_INTERVAL: u32 = 32;
// A tenant counts as "near limit" once its length reaches this percentage of max_per_tenant.
const ENQUEUE_PURGE_NEAR_LIMIT_PERCENT: usize = 90;
// How many entries past the queue front a purge may scan (near-limit vs full pressure).
const ENQUEUE_PURGE_SCAN_BUDGET_NEAR: usize = 8;
const ENQUEUE_PURGE_SCAN_BUDGET_FULL: usize = 32;
// Blocking dequeue: spin this many times before falling back to a short timed wait.
const DEQUEUE_BLOCKING_BACKOFF_SPINS: u32 = 64;
const DEQUEUE_BLOCKING_BACKOFF_SLEEP: Duration = Duration::from_micros(200);
26
/// Why an inline (enqueue-path) purge pass was triggered; decides how far past
/// the queue front the purge is allowed to scan.
#[derive(Copy, Clone)]
enum EnqueuePurgeReason {
    // Fired every ENQUEUE_PURGE_INTERVAL attempts; front-of-queue sweep only.
    Periodic,
    // Tenant at >= ENQUEUE_PURGE_NEAR_LIMIT_PERCENT of its cap; small side scan.
    NearLimit,
    // Tenant or global queue at capacity; largest side-scan budget.
    Full,
}
33
/// Accounting produced by one purge pass.
#[derive(Default)]
struct PurgeStats {
    // Expired entries whose pending id was still unclaimed (feeds stats.expired).
    removed_expired_live: u64,
    // Global queue slots freed by this pass (expired-live plus cancelled entries
    // whose pending id had not been claimed yet).
    released_slots: u64,
}
39
/// Sharded multi-tenant task scheduler: per-priority deficit round-robin
/// fairness, global and per-tenant capacity limits, lazy cancellation, and
/// deadline-based expiry.
pub struct Scheduler<T> {
    config: SchedulerConfig,
    // One lock-protected shard per hash bucket of tenants.
    shards: Vec<Mutex<Shard<T>>>,
    // Round-robin ring of shard indices believed to contain work.
    active_shards: Mutex<ActiveRing<usize>>,
    stats: StatsCounters,
    // Epoch-style signal used by blocking enqueue/dequeue paths.
    work_signal: WorkSignal,
    // One of SHUTDOWN_OPEN / SHUTDOWN_DRAIN / SHUTDOWN_IMMEDIATE.
    shutdown_state: AtomicU8,
    // Per-shard "already in active_shards" flag; prevents duplicate ring pushes.
    shard_active: Vec<AtomicBool>,
    // Rolling leaderboard of tenants by dequeue volume.
    top_tenants: Mutex<TopTenants>,
    // Explicit per-tenant quantum overrides; take precedence over the provider.
    tenant_quantum: RwLock<HashMap<TenantKey, u64>>,
    // Optional dynamic quantum source consulted when no override exists.
    quantum_provider: RwLock<Option<QuantumProvider>>,
    // Monotonic task-id source; handed-out ids start at 1.
    next_task_id: AtomicU64,
    // Ids enqueued but not yet dequeued / cancelled / expired / dropped.
    pending_ids: Mutex<HashSet<u64>>,
    // Lazy-cancellation markers whose queue entries are removed later.
    cancelled_ids: Mutex<HashSet<u64>>,
}
56
57impl<T> Scheduler<T> {
58 fn shard_index(&self, tenant: TenantKey) -> usize {
59 let shard_count = self.shards.len();
60 let hash = tenant.as_u64();
61 (hash as usize) % shard_count
62 }
63
64 fn saturating_add(counter: &AtomicU64, delta: u64) {
65 if delta == 0 {
66 return;
67 }
68 let mut current = counter.load(Ordering::Relaxed);
69 loop {
70 let next = current.saturating_add(delta);
71 match counter.compare_exchange(current, next, Ordering::Relaxed, Ordering::Relaxed) {
72 Ok(_) => return,
73 Err(actual) => current = actual,
74 }
75 }
76 }
77
78 fn record_queue_time(&self, queue_time_ns: u64) {
79 for (idx, bound) in QUEUE_TIME_BUCKETS_NS.iter().enumerate() {
80 if queue_time_ns <= *bound {
81 Self::saturating_add(&self.stats.queue_time_buckets[idx], 1);
82 break;
83 }
84 }
85 }
86
87 fn try_reserve_slot(&self) -> bool {
88 let max_global = self.config.max_global as u64;
89 let mut current = self.stats.queue_len_estimate.load(Ordering::Relaxed);
90 loop {
91 if current >= max_global {
92 return false;
93 }
94 let next = current + 1;
95 match self.stats.queue_len_estimate.compare_exchange(
96 current,
97 next,
98 Ordering::AcqRel,
99 Ordering::Relaxed,
100 ) {
101 Ok(_) => return true,
102 Err(actual) => current = actual,
103 }
104 }
105 }
106
107 fn release_reserved_slots(&self, delta: u64) {
108 if delta == 0 {
109 return;
110 }
111
112 let mut current = self.stats.queue_len_estimate.load(Ordering::Relaxed);
113 loop {
114 let next = current.saturating_sub(delta);
115 match self.stats.queue_len_estimate.compare_exchange(
116 current,
117 next,
118 Ordering::AcqRel,
119 Ordering::Relaxed,
120 ) {
121 Ok(_) => return,
122 Err(actual) => current = actual,
123 }
124 }
125 }
126
    /// Marks a shard as having runnable work, inserting it into the
    /// round-robin ring at most once: `swap(true)` guarantees only the thread
    /// that flipped the flag from false performs the push.
    fn activate_shard(&self, shard_index: usize) {
        if !self.shard_active[shard_index].swap(true, Ordering::AcqRel) {
            self.active_shards.lock().push_back(shard_index);
        }
    }
132
    /// Current shutdown state (`SHUTDOWN_OPEN` / `_DRAIN` / `_IMMEDIATE`).
    fn shutdown_state(&self) -> u8 {
        self.shutdown_state.load(Ordering::Acquire)
    }

    /// New enqueues are admitted only while fully open.
    fn is_accepting_enqueues(&self) -> bool {
        self.shutdown_state() == SHUTDOWN_OPEN
    }

    /// Whether dequeue should report `Closed`: immediately once shut down
    /// hard, or — in drain mode — only after the queue estimate hits zero.
    fn should_return_closed(&self) -> bool {
        match self.shutdown_state() {
            SHUTDOWN_IMMEDIATE => true,
            SHUTDOWN_DRAIN => self.stats.queue_len_estimate.load(Ordering::Acquire) == 0,
            _ => false,
        }
    }
148
    /// Advances the shutdown state machine toward `mode`.
    ///
    /// Transitions are monotonic (OPEN -> DRAIN -> IMMEDIATE); IMMEDIATE is
    /// terminal and a later Drain request can never downgrade it. Waiters are
    /// notified only when the state actually changed.
    fn set_shutdown_mode(&self, mode: CloseMode) {
        let target = match mode {
            CloseMode::Immediate => SHUTDOWN_IMMEDIATE,
            CloseMode::Drain => SHUTDOWN_DRAIN,
        };

        let mut changed = false;
        loop {
            let current = self.shutdown_state();
            // Resolve the permitted next state for (current, requested).
            let next = match (current, target) {
                (SHUTDOWN_IMMEDIATE, _) => SHUTDOWN_IMMEDIATE,
                (_, SHUTDOWN_IMMEDIATE) => SHUTDOWN_IMMEDIATE,
                (SHUTDOWN_OPEN, SHUTDOWN_DRAIN) => SHUTDOWN_DRAIN,
                _ => current,
            };

            if next == current {
                break;
            }

            // CAS so a concurrent closer that installed IMMEDIATE between the
            // load above and this store cannot be overwritten.
            match self.shutdown_state.compare_exchange(
                current,
                next,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    changed = true;
                    break;
                }
                Err(_) => continue,
            }
        }

        if changed {
            // Wake blocked enqueuers/dequeuers so they observe the new state.
            self.work_signal.notify_all();
        }
    }
187
188 fn record_reject(&self, reason: EnqueueRejectReason) {
189 Self::saturating_add(&self.stats.dropped, 1);
190 match reason {
191 EnqueueRejectReason::GlobalFull => {
192 Self::saturating_add(&self.stats.rejected_global, 1);
193 }
194 EnqueueRejectReason::TenantFull => {
195 Self::saturating_add(&self.stats.rejected_tenant, 1);
196 }
197 EnqueueRejectReason::Timeout => {
198 Self::saturating_add(&self.stats.timeout_rejected, 1);
199 }
200 }
201 }
202
    /// Counts a task evicted by a Drop* backpressure policy (also counted in
    /// the aggregate `dropped`).
    fn record_policy_drop(&self) {
        Self::saturating_add(&self.stats.dropped, 1);
        Self::saturating_add(&self.stats.dropped_policy, 1);
    }

    /// Allocates the next task handle; ids start at 1, so 0 is never valid.
    fn next_handle(&self) -> TaskHandle {
        let id = self.next_task_id.fetch_add(1, Ordering::Relaxed) + 1;
        TaskHandle::from(id)
    }

    /// Records a task id as live (enqueued, not yet claimed by anyone).
    fn register_pending(&self, handle: TaskHandle) {
        self.pending_ids.lock().insert(handle.as_u64());
    }

    /// Atomically claims a live task id; false means someone else already
    /// claimed it (dequeue, cancel, expiry, or policy drop).
    fn take_pending(&self, id: u64) -> bool {
        self.pending_ids.lock().remove(&id)
    }

    /// Publishes a lazy-cancellation marker; the queue entry itself is removed
    /// later when a dequeue or purge encounters it.
    fn mark_cancelled(&self, id: u64) {
        self.cancelled_ids.lock().insert(id);
    }

    /// Consumes a cancellation marker if present, reporting whether it existed.
    fn take_cancelled_marker(&self, id: u64) -> bool {
        self.cancelled_ids.lock().remove(&id)
    }

    /// Removes a cancellation marker without caring whether it existed.
    fn clear_cancelled_marker(&self, id: u64) {
        self.cancelled_ids.lock().remove(&id);
    }
232
233 fn is_expired(deadline: Option<Instant>, now: Instant) -> bool {
234 matches!(deadline, Some(deadline) if now > deadline)
235 }
236
237 fn is_tenant_near_limit(&self, tenant_len: usize) -> bool {
238 let max_per_tenant = self.config.max_per_tenant;
239 if max_per_tenant == 0 {
240 return true;
241 }
242 let lhs = (tenant_len as u128).saturating_mul(100);
243 let rhs = (max_per_tenant as u128).saturating_mul(ENQUEUE_PURGE_NEAR_LIMIT_PERCENT as u128);
244 lhs >= rhs
245 }
246
247 fn maybe_purge_tenant_before_enqueue(
248 &self,
249 tenant_state: &mut TenantState<T>,
250 ) -> Option<EnqueuePurgeReason> {
251 tenant_state.enqueue_attempts_since_purge =
252 tenant_state.enqueue_attempts_since_purge.saturating_add(1);
253
254 let tenant_len = tenant_state.total_len();
255 let reason = if tenant_len >= self.config.max_per_tenant {
256 Some(EnqueuePurgeReason::Full)
257 } else if self.is_tenant_near_limit(tenant_len) {
258 Some(EnqueuePurgeReason::NearLimit)
259 } else if tenant_state.enqueue_attempts_since_purge >= ENQUEUE_PURGE_INTERVAL {
260 Some(EnqueuePurgeReason::Periodic)
261 } else {
262 None
263 };
264
265 if let Some(reason) = reason {
266 let _ = self.purge_tenant_for_enqueue(tenant_state, reason);
267 }
268 reason
269 }
270
271 fn purge_tenant_for_enqueue(
272 &self,
273 tenant_state: &mut TenantState<T>,
274 reason: EnqueuePurgeReason,
275 ) -> PurgeStats {
276 let side_scan_budget = match reason {
277 EnqueuePurgeReason::Periodic => 0,
278 EnqueuePurgeReason::NearLimit => ENQUEUE_PURGE_SCAN_BUDGET_NEAR,
279 EnqueuePurgeReason::Full => ENQUEUE_PURGE_SCAN_BUDGET_FULL,
280 };
281 let now = Instant::now();
282 let stats = self.purge_tenant_with_budget(tenant_state, now, side_scan_budget);
283 tenant_state.enqueue_attempts_since_purge = 0;
284 self.record_enqueue_purge_stats(&stats);
285 stats
286 }
287
288 fn purge_tenant_with_budget(
289 &self,
290 tenant_state: &mut TenantState<T>,
291 now: Instant,
292 side_scan_budget: usize,
293 ) -> PurgeStats {
294 let mut stats = PurgeStats::default();
295 for idx in 0..tenant_state.queues.len() {
296 let queue = &mut tenant_state.queues[idx];
297 self.purge_queue_with_budget(queue, now, side_scan_budget, &mut stats);
298 if queue.is_empty() {
299 tenant_state.active[idx] = false;
300 }
301 }
302 stats
303 }
304
    /// Removes cancelled and expired entries from one priority queue.
    ///
    /// Phase 1 pops from the front while the front entry is removable. Phase 2
    /// optionally scans up to `side_scan_budget` entries behind the front
    /// (each hit is an O(n) `VecDeque::remove`, so the budget bounds the cost).
    ///
    /// Slot accounting: a slot is released (via `stats.released_slots`) only
    /// when this purge wins the `take_pending` race — a cancelled task's slot
    /// is normally released by `cancel()` already, in which case
    /// `take_pending` returns false here and nothing is double-released.
    /// The cancellation-marker check deliberately precedes the expiry check so
    /// each marker is consumed even for entries that also expired.
    fn purge_queue_with_budget(
        &self,
        queue: &mut VecDeque<QueueEntry<T>>,
        now: Instant,
        side_scan_budget: usize,
        stats: &mut PurgeStats,
    ) {
        while let Some(front) = queue.front() {
            let front_id = front.id;
            if self.take_cancelled_marker(front_id) {
                let _ = queue.pop_front();
                if self.take_pending(front_id) {
                    stats.released_slots = stats.released_slots.saturating_add(1);
                }
                continue;
            }

            if Self::is_expired(front.task.deadline, now) {
                let removed = queue.pop_front().expect("front disappeared");
                if self.take_pending(removed.id) {
                    stats.removed_expired_live = stats.removed_expired_live.saturating_add(1);
                    stats.released_slots = stats.released_slots.saturating_add(1);
                }
                continue;
            }
            // Front is live and unexpired: the front sweep is done.
            break;
        }

        if side_scan_budget == 0 || queue.is_empty() {
            return;
        }

        // Phase 2: bounded scan behind the front. `idx` advances only when an
        // entry is kept, because removal shifts later entries down.
        let mut idx = 0usize;
        let mut scanned = 0usize;
        while idx < queue.len() && scanned < side_scan_budget {
            scanned = scanned.saturating_add(1);
            let (entry_id, expired) = {
                let entry = queue.get(idx).expect("entry should exist");
                (entry.id, Self::is_expired(entry.task.deadline, now))
            };

            if self.take_cancelled_marker(entry_id) {
                let _ = queue.remove(idx).expect("entry should still exist");
                if self.take_pending(entry_id) {
                    stats.released_slots = stats.released_slots.saturating_add(1);
                }
                continue;
            }

            if expired {
                let removed = queue.remove(idx).expect("entry should still exist");
                if self.take_pending(removed.id) {
                    stats.removed_expired_live = stats.removed_expired_live.saturating_add(1);
                    stats.released_slots = stats.released_slots.saturating_add(1);
                }
                continue;
            }

            idx = idx.saturating_add(1);
        }
    }
366
367 fn record_enqueue_purge_stats(&self, stats: &PurgeStats) {
368 Self::saturating_add(&self.stats.enqueue_purge_runs, 1);
369 Self::saturating_add(&self.stats.expired, stats.removed_expired_live);
370
371 if stats.released_slots > 0 {
372 self.release_reserved_slots(stats.released_slots);
373 self.work_signal.notify_all();
374 }
375 }
376
377 fn quantum_for(&self, tenant: TenantKey) -> u64 {
378 if let Some(value) = self.tenant_quantum.read().get(&tenant).copied() {
379 return value.max(1);
380 }
381 if let Some(provider) = self.quantum_provider.read().as_ref() {
382 return provider(tenant).max(1);
383 }
384 self.config.quantum.max(1)
385 }
386
387 fn refresh_tenant_quantum(&self, tenant: TenantKey) {
388 let quantum = self.quantum_for(tenant) as i64;
389 let shard_index = self.shard_index(tenant);
390 let mut shard = self.shards[shard_index].lock();
391 if let Some(state) = shard.tenants.get_mut(&tenant) {
392 state.quantum = quantum;
393 }
394 }
395
    /// Sets an explicit per-tenant quantum override (clamped to >= 1) and
    /// pushes it into the tenant's live shard state immediately.
    pub fn set_tenant_quantum(&self, tenant: TenantKey, quantum: u64) {
        let value = quantum.max(1);
        self.tenant_quantum.write().insert(tenant, value);
        self.refresh_tenant_quantum(tenant);
    }

    /// Removes the explicit override so the provider/default applies again.
    pub fn clear_tenant_quantum(&self, tenant: TenantKey) {
        self.tenant_quantum.write().remove(&tenant);
        self.refresh_tenant_quantum(tenant);
    }
408
409 pub fn set_quantum_provider(&self, provider: Option<QuantumProvider>) {
411 {
412 let mut guard = self.quantum_provider.write();
413 *guard = provider;
414 }
415 for shard_mutex in &self.shards {
416 let mut shard = shard_mutex.lock();
417 for (tenant, state) in shard.tenants.iter_mut() {
418 let quantum = self.quantum_for(*tenant) as i64;
419 state.quantum = quantum;
420 }
421 }
422 }
423
424 pub fn new(config: SchedulerConfig) -> Self {
426 let shard_count = config.shards.max(1);
427 let top_tenants_capacity = config.top_tenants_capacity;
428 let mut shards = Vec::with_capacity(shard_count);
429 let mut shard_active = Vec::with_capacity(shard_count);
430 for _ in 0..shard_count {
431 shards.push(Mutex::new(Shard::new()));
432 shard_active.push(AtomicBool::new(false));
433 }
434 Self {
435 tenant_quantum: RwLock::new(config.quantum_by_tenant.clone()),
436 quantum_provider: RwLock::new(config.quantum_provider.clone()),
437 config,
438 shards,
439 active_shards: Mutex::new(ActiveRing::new()),
440 stats: StatsCounters::new(),
441 work_signal: WorkSignal::new(),
442 shutdown_state: AtomicU8::new(SHUTDOWN_OPEN),
443 shard_active,
444 top_tenants: Mutex::new(TopTenants::new(top_tenants_capacity)),
445 next_task_id: AtomicU64::new(0),
446 pending_ids: Mutex::new(HashSet::new()),
447 cancelled_ids: Mutex::new(HashSet::new()),
448 }
449 }
450
451 pub fn enqueue(&self, tenant: TenantKey, task: Task<T>) -> EnqueueResult {
453 match self.enqueue_with_handle(tenant, task) {
454 EnqueueWithHandleResult::Enqueued(_) => EnqueueResult::Enqueued,
455 EnqueueWithHandleResult::Rejected(reason) => EnqueueResult::Rejected(reason),
456 EnqueueWithHandleResult::Closed => EnqueueResult::Closed,
457 }
458 }
459
460 pub fn enqueue_with_handle(&self, tenant: TenantKey, task: Task<T>) -> EnqueueWithHandleResult {
462 match self.backpressure_for(tenant).clone() {
463 crate::api::BackpressurePolicy::Reject => self.enqueue_reject_with_handle(tenant, task),
464 crate::api::BackpressurePolicy::DropOldestPerTenant => {
465 self.enqueue_drop_with_handle(tenant, task, DropStrategy::Oldest)
466 }
467 crate::api::BackpressurePolicy::DropNewestPerTenant => {
468 self.enqueue_drop_with_handle(tenant, task, DropStrategy::Newest)
469 }
470 crate::api::BackpressurePolicy::Timeout { wait } => {
471 self.enqueue_timeout_with_handle(tenant, task, wait)
472 }
473 }
474 }
475
476 pub fn cancel(&self, handle: TaskHandle) -> CancelResult {
481 let id = handle.as_u64();
482 if self.take_pending(id) {
483 self.mark_cancelled(id);
484 self.release_reserved_slots(1);
485 self.work_signal.notify_all();
486 CancelResult::Cancelled
487 } else {
488 CancelResult::NotFound
489 }
490 }
491
    /// Non-blocking dequeue: services the currently active shards round-robin.
    ///
    /// Shards are popped FIFO from the ring; a shard that still has work after
    /// being serviced is pushed back. `remaining` bounds the loop to the ring
    /// size observed at entry so one call never chases shards re-activated
    /// behind it.
    pub fn try_dequeue(&self) -> DequeueResult<T> {
        if self.shutdown_state() == SHUTDOWN_IMMEDIATE {
            return DequeueResult::Closed;
        }

        let mut remaining = { self.active_shards.lock().len() };

        while remaining > 0 {
            let shard_index = {
                let mut active_shards = self.active_shards.lock();
                match active_shards.pop_front() {
                    Some(index) => index,
                    // Another consumer drained the ring under us.
                    None => break,
                }
            };
            remaining -= 1;

            let (result, shard_has_work) = {
                let mut shard = self.shards[shard_index].lock();
                let result = self.try_dequeue_from_shard(&mut shard);
                let has_work = shard.active_rings.iter().any(|ring| !ring.is_empty());
                if !has_work {
                    // Cleared while still holding the shard lock, so a racing
                    // enqueue that adds work re-activates via activate_shard.
                    self.shard_active[shard_index].store(false, Ordering::Release);
                }
                (result, has_work)
            };

            if shard_has_work {
                self.active_shards.lock().push_back(shard_index);
            }

            if let Some((tenant, task)) = result {
                return DequeueResult::Task { tenant, task };
            }
        }

        if self.should_return_closed() {
            DequeueResult::Closed
        } else {
            DequeueResult::Empty
        }
    }
535
    /// Deficit-round-robin dequeue within one (already locked) shard.
    ///
    /// Priorities are visited in `Priority::ordered()` order; within each
    /// priority the tenants rotate through that priority's active ring. A
    /// tenant earns `quantum` deficit per rotation and may dequeue once its
    /// deficit covers the front task's cost.
    fn try_dequeue_from_shard(&self, shard: &mut Shard<T>) -> Option<(TenantKey, Task<T>)> {
        for priority in Priority::ordered() {
            let idx = priority.index();
            if shard.active_rings[idx].is_empty() {
                continue;
            }

            // Visit each tenant currently in the ring at most once per call.
            let ring_len = shard.active_rings[idx].len();
            for _ in 0..ring_len {
                let tenant = match shard.active_rings[idx].pop_front() {
                    Some(tenant) => tenant,
                    None => break,
                };

                let Some(tenant_state) = shard.tenants.get_mut(&tenant) else {
                    // Stale ring entry for a tenant removed elsewhere.
                    continue;
                };

                let queue = &mut tenant_state.queues[idx];
                let deficit = &mut tenant_state.deficits[idx];

                // Sweep cancelled/expired entries off the front first.
                // Cancelled entries release no slot here (cancel() already
                // did); expired ones release only while still pending.
                let mut expired_count = 0u64;
                let now = Instant::now();

                while let Some(front) = queue.front() {
                    if self.take_cancelled_marker(front.id) {
                        queue.pop_front();
                        continue;
                    }
                    if Self::is_expired(front.task.deadline, now) {
                        let expired = queue.pop_front().expect("front disappeared");
                        if self.take_pending(expired.id) {
                            expired_count = expired_count.saturating_add(1);
                            self.release_reserved_slots(1);
                        }
                    } else {
                        break;
                    }
                }

                if expired_count > 0 {
                    Self::saturating_add(&self.stats.expired, expired_count);
                    // Freed slots may unblock waiting enqueuers.
                    self.work_signal.notify_all();
                }

                if queue.is_empty() {
                    tenant_state.active[idx] = false;
                    continue;
                }

                // Clamp the u64 cost into the signed deficit domain.
                let front_cost = queue.front().map(|entry| entry.task.cost).unwrap_or(0);
                let cost = if front_cost > i64::MAX as u64 {
                    i64::MAX
                } else {
                    front_cost as i64
                };

                if *deficit < cost {
                    // Not enough credit yet: top up and rotate to the back.
                    *deficit += tenant_state.quantum;
                    shard.active_rings[idx].push_back(tenant);
                    continue;
                }

                let entry = match queue.pop_front() {
                    Some(entry) => entry,
                    None => {
                        tenant_state.active[idx] = false;
                        continue;
                    }
                };

                if queue.is_empty() {
                    tenant_state.active[idx] = false;
                } else {
                    shard.active_rings[idx].push_back(tenant);
                }

                // Entries cancelled or already claimed elsewhere are discarded
                // without charging the tenant's deficit.
                if self.take_cancelled_marker(entry.id) {
                    continue;
                }

                if !self.take_pending(entry.id) {
                    continue;
                }

                *deficit -= cost;

                Self::saturating_add(&self.stats.dequeued, 1);
                self.release_reserved_slots(1);
                self.work_signal.notify_all();

                // Queue-time telemetry, clamped into u64 nanoseconds.
                let queue_time_ns = Instant::now()
                    .duration_since(entry.task.enqueue_ts)
                    .as_nanos()
                    .min(u128::from(u64::MAX)) as u64;
                Self::saturating_add(&self.stats.queue_time_sum_ns, queue_time_ns);
                Self::saturating_add(&self.stats.queue_time_samples, 1);
                self.record_queue_time(queue_time_ns);
                self.top_tenants.lock().record(tenant, 1);

                return Some((tenant, entry.task));
            }
        }

        None
    }
642
    /// Blocking dequeue.
    ///
    /// Hybrid wait strategy: when the global estimate says work exists but
    /// this thread keeps losing the race for it, spin briefly and then take
    /// short timed waits; when the queue looks truly empty, park until the
    /// work signal changes. The signal epoch (`observed`) is sampled *before*
    /// `try_dequeue` so a notification landing in between cannot be missed.
    pub fn dequeue_blocking(&self) -> DequeueResult<T> {
        let mut spins = 0u32;
        loop {
            let observed = self.work_signal.current();
            match self.try_dequeue() {
                DequeueResult::Task { tenant, task } => {
                    return DequeueResult::Task { tenant, task };
                }
                DequeueResult::Closed => return DequeueResult::Closed,
                DequeueResult::Empty => {
                    if self.should_return_closed() {
                        return DequeueResult::Closed;
                    }

                    if self.stats.queue_len_estimate.load(Ordering::Acquire) > 0 {
                        // Work exists but another consumer keeps winning it.
                        spins = spins.saturating_add(1);
                        if spins >= DEQUEUE_BLOCKING_BACKOFF_SPINS {
                            let _ = self
                                .work_signal
                                .wait_for_change_timeout(observed, DEQUEUE_BLOCKING_BACKOFF_SLEEP);
                            spins = 0;
                        } else {
                            std::hint::spin_loop();
                        }
                        continue;
                    }

                    spins = 0;
                    self.work_signal.wait_for_change(observed);
                }
            }
        }
    }
677
    /// Blocking dequeue bounded by an overall `timeout`.
    ///
    /// Same hybrid spin/park strategy as `dequeue_blocking`, with every wait
    /// clamped to the time remaining before `deadline`. Returns `Empty` when
    /// the timeout elapses without obtaining a task.
    pub fn dequeue_blocking_timeout(&self, timeout: Duration) -> DequeueResult<T> {
        let deadline = Instant::now() + timeout;
        let mut spins = 0u32;

        loop {
            let observed = self.work_signal.current();
            match self.try_dequeue() {
                DequeueResult::Task { tenant, task } => {
                    return DequeueResult::Task { tenant, task };
                }
                DequeueResult::Closed => return DequeueResult::Closed,
                DequeueResult::Empty => {
                    if self.should_return_closed() {
                        return DequeueResult::Closed;
                    }

                    if Instant::now() >= deadline {
                        return DequeueResult::Empty;
                    }

                    let remaining = deadline.saturating_duration_since(Instant::now());
                    if self.stats.queue_len_estimate.load(Ordering::Acquire) > 0 {
                        // Work exists but we keep losing the race: back off.
                        spins = spins.saturating_add(1);
                        if spins >= DEQUEUE_BLOCKING_BACKOFF_SPINS {
                            let wait = remaining.min(DEQUEUE_BLOCKING_BACKOFF_SLEEP);
                            let _ = self.work_signal.wait_for_change_timeout(observed, wait);
                            spins = 0;
                        } else {
                            std::hint::spin_loop();
                        }
                        continue;
                    }

                    spins = 0;
                    let _ = self
                        .work_signal
                        .wait_for_change_timeout(observed, remaining);
                }
            }
        }
    }
722
723 pub fn stats(&self) -> SchedulerStats {
725 let queue_time_histogram = QUEUE_TIME_BUCKETS_NS
726 .iter()
727 .enumerate()
728 .map(|(idx, bound)| crate::api::QueueTimeBucket {
729 le_ns: *bound,
730 count: self.stats.queue_time_buckets[idx].load(Ordering::Relaxed),
731 })
732 .collect::<Vec<_>>();
733
734 let total_samples: u64 = queue_time_histogram.iter().map(|b| b.count).sum();
735 let percentile = |pct: f64| -> u64 {
736 if total_samples == 0 {
737 return 0;
738 }
739 let target = (total_samples as f64 * pct).ceil() as u64;
740 let mut cumulative = 0u64;
741 for bucket in &queue_time_histogram {
742 cumulative = cumulative.saturating_add(bucket.count);
743 if cumulative >= target {
744 return bucket.le_ns;
745 }
746 }
747 queue_time_histogram
748 .last()
749 .map(|bucket| bucket.le_ns)
750 .unwrap_or(0)
751 };
752
753 let queue_len_estimate = self.stats.queue_len_estimate.load(Ordering::Relaxed);
754 let max_global = self.config.max_global as u64;
755 let queue_saturation_ratio = if max_global == 0 {
756 0.0
757 } else {
758 queue_len_estimate as f64 / max_global as f64
759 };
760
761 let top_tenants = self.top_tenants.lock().snapshot();
762
763 SchedulerStats {
764 enqueued: self.stats.enqueued.load(Ordering::Relaxed),
765 dequeued: self.stats.dequeued.load(Ordering::Relaxed),
766 expired: self.stats.expired.load(Ordering::Relaxed),
767 dropped: self.stats.dropped.load(Ordering::Relaxed),
768 rejected_global: self.stats.rejected_global.load(Ordering::Relaxed),
769 rejected_tenant: self.stats.rejected_tenant.load(Ordering::Relaxed),
770 timeout_rejected: self.stats.timeout_rejected.load(Ordering::Relaxed),
771 dropped_policy: self.stats.dropped_policy.load(Ordering::Relaxed),
772 queue_len_estimate,
773 max_global,
774 queue_saturation_ratio,
775 queue_time_sum_ns: self.stats.queue_time_sum_ns.load(Ordering::Relaxed),
776 queue_time_samples: self.stats.queue_time_samples.load(Ordering::Relaxed),
777 queue_time_p95_ns: percentile(0.95),
778 queue_time_p99_ns: percentile(0.99),
779 queue_time_histogram,
780 top_tenants,
781 }
782 }
783
    /// Test hook: number of inline enqueue-path purge passes executed so far.
    #[cfg(test)]
    pub(crate) fn debug_enqueue_purge_runs(&self) -> u64 {
        self.stats.enqueue_purge_runs.load(Ordering::Relaxed)
    }

    /// Test hook: exposes the periodic purge interval constant.
    #[cfg(test)]
    pub(crate) fn debug_enqueue_purge_interval(&self) -> u32 {
        ENQUEUE_PURGE_INTERVAL
    }
793
    /// Closes the scheduler; equivalent to `close_immediate`.
    pub fn close(&self) {
        self.close_immediate();
    }

    /// Stops enqueues; dequeues return `Closed` right away.
    pub fn close_immediate(&self) {
        self.set_shutdown_mode(CloseMode::Immediate);
    }

    /// Stops enqueues; dequeues keep draining until the queue is empty.
    pub fn close_drain(&self) {
        self.set_shutdown_mode(CloseMode::Drain);
    }

    /// Closes with an explicitly chosen mode.
    pub fn close_with_mode(&self, mode: CloseMode) {
        self.set_shutdown_mode(mode);
    }
813}
814
/// Which end of the victim queue `drop_from_tenant` evicts from.
enum DropStrategy {
    Oldest,
    Newest,
}
819
/// Outcome of a single admission attempt. `Full` hands the task back to the
/// caller so retrying policies (Timeout) can resubmit it without cloning.
enum EnqueueAttempt<T> {
    Enqueued(TaskHandle),
    Closed,
    Full(Task<T>, EnqueueRejectReason),
}
825
826impl<T> Scheduler<T> {
827 fn enqueue_reject_with_handle(
828 &self,
829 tenant: TenantKey,
830 task: Task<T>,
831 ) -> EnqueueWithHandleResult {
832 match self.enqueue_try_reject(tenant, task) {
833 EnqueueAttempt::Enqueued(handle) => EnqueueWithHandleResult::Enqueued(handle),
834 EnqueueAttempt::Closed => EnqueueWithHandleResult::Closed,
835 EnqueueAttempt::Full(_, reason) => {
836 self.record_reject(reason.clone());
837 EnqueueWithHandleResult::Rejected(reason)
838 }
839 }
840 }
841
    /// Timeout-policy enqueue: retries capacity rejections until `wait`
    /// elapses, sleeping on the work signal between attempts.
    ///
    /// The signal epoch is sampled *before* each attempt, so a slot freed
    /// between the failed attempt and the wait cannot be missed.
    fn enqueue_timeout_with_handle(
        &self,
        tenant: TenantKey,
        mut task: Task<T>,
        wait: Duration,
    ) -> EnqueueWithHandleResult {
        let deadline = Instant::now() + wait;
        loop {
            if !self.is_accepting_enqueues() {
                return EnqueueWithHandleResult::Closed;
            }

            let observed = self.work_signal.current();
            match self.enqueue_try_reject(tenant, task) {
                EnqueueAttempt::Enqueued(handle) => {
                    return EnqueueWithHandleResult::Enqueued(handle);
                }
                EnqueueAttempt::Closed => return EnqueueWithHandleResult::Closed,
                EnqueueAttempt::Full(returned, _) => {
                    // The task comes back unconsumed so it can be resubmitted.
                    task = returned;
                    if Instant::now() >= deadline {
                        self.record_reject(EnqueueRejectReason::Timeout);
                        return EnqueueWithHandleResult::Rejected(EnqueueRejectReason::Timeout);
                    }
                    let remaining = deadline.saturating_duration_since(Instant::now());
                    let _ = self
                        .work_signal
                        .wait_for_change_timeout(observed, remaining);
                }
            }
        }
    }
874
    /// Single admission attempt shared by the Reject and Timeout policies.
    ///
    /// Under the tenant's shard lock, in order:
    /// 1. re-check shutdown (close may have raced in while taking the lock);
    /// 2. opportunistically purge the tenant (cancelled/expired entries);
    /// 3. enforce the per-tenant cap, retrying once after a full-budget purge;
    /// 4. reserve a global slot, again retrying once after a full purge;
    /// 5. insert the entry and (re)activate tenant ring and shard.
    /// On `Full`, ownership of the task is handed back to the caller.
    fn enqueue_try_reject(&self, tenant: TenantKey, task: Task<T>) -> EnqueueAttempt<T> {
        if !self.is_accepting_enqueues() {
            return EnqueueAttempt::Closed;
        }

        let shard_index = self.shard_index(tenant);
        let mut shard = self.shards[shard_index].lock();

        // Double-check under the lock: shutdown may have started meanwhile.
        if !self.is_accepting_enqueues() {
            return EnqueueAttempt::Closed;
        }

        // Sampled before insertion to decide whether the shard must be
        // (re)inserted into the active ring after the lock is dropped.
        let shard_was_empty = shard.active_rings.iter().all(|ring| ring.is_empty());

        let tenant_state = shard
            .tenants
            .entry(tenant)
            .or_insert_with(|| TenantState::new(self.quantum_for(tenant) as i64));

        let purge_reason = self.maybe_purge_tenant_before_enqueue(tenant_state);
        let mut ran_full_purge = matches!(purge_reason, Some(EnqueuePurgeReason::Full));

        if tenant_state.total_len() >= self.config.max_per_tenant {
            // One full-budget purge before giving up on the tenant cap.
            if !ran_full_purge {
                let _ = self.purge_tenant_for_enqueue(tenant_state, EnqueuePurgeReason::Full);
                ran_full_purge = true;
            }
            if tenant_state.total_len() >= self.config.max_per_tenant {
                return EnqueueAttempt::Full(task, EnqueueRejectReason::TenantFull);
            }
        }

        let mut reserved = self.try_reserve_slot();
        if !reserved && !ran_full_purge {
            // Global capacity miss: purging this tenant may free slots.
            let purge = self.purge_tenant_for_enqueue(tenant_state, EnqueuePurgeReason::Full);
            if purge.released_slots > 0 {
                reserved = self.try_reserve_slot();
            }
        }

        if !reserved {
            return EnqueueAttempt::Full(task, EnqueueRejectReason::GlobalFull);
        }

        let handle = self.next_handle();
        self.register_pending(handle);

        let priority_index = task.priority.index();
        let queue = &mut tenant_state.queues[priority_index];
        let was_empty = queue.is_empty();
        queue.push_back(QueueEntry {
            id: handle.as_u64(),
            task,
        });

        if was_empty && !tenant_state.active[priority_index] {
            tenant_state.active[priority_index] = true;
            shard.active_rings[priority_index].push_back(tenant);
        }

        Self::saturating_add(&self.stats.enqueued, 1);

        // Shard activation and wakeups happen after releasing the shard lock.
        drop(shard);

        if shard_was_empty {
            self.activate_shard(shard_index);
        }

        if was_empty {
            self.work_signal.notify_all();
        }

        EnqueueAttempt::Enqueued(handle)
    }
949
    /// Drop-policy enqueue (`DropOldestPerTenant` / `DropNewestPerTenant`).
    ///
    /// When the tenant or the global queue is full, one existing entry of the
    /// same tenant (at or below the incoming priority) is evicted to make
    /// room; if the victim was still live (pending), its global slot is
    /// reused for the new task. With no eligible victim, the enqueue is
    /// rejected.
    fn enqueue_drop_with_handle(
        &self,
        tenant: TenantKey,
        task: Task<T>,
        strategy: DropStrategy,
    ) -> EnqueueWithHandleResult {
        if !self.is_accepting_enqueues() {
            return EnqueueWithHandleResult::Closed;
        }

        let shard_index = self.shard_index(tenant);
        let mut shard = self.shards[shard_index].lock();

        // Double-check under the lock: shutdown may have started meanwhile.
        if !self.is_accepting_enqueues() {
            return EnqueueWithHandleResult::Closed;
        }

        // Sampled before insertion; decides post-unlock shard activation.
        let shard_was_empty = shard.active_rings.iter().all(|ring| ring.is_empty());

        let tenant_state = shard
            .tenants
            .entry(tenant)
            .or_insert_with(|| TenantState::new(self.quantum_for(tenant) as i64));

        let purge_reason = self.maybe_purge_tenant_before_enqueue(tenant_state);
        let ran_full_purge = matches!(purge_reason, Some(EnqueuePurgeReason::Full));

        let current_len = self.stats.queue_len_estimate.load(Ordering::Acquire);
        let tenant_full = tenant_state.total_len() >= self.config.max_per_tenant;
        let global_full = current_len >= self.config.max_global as u64;

        let mut reused_slot = false;
        if tenant_full || global_full {
            let dropped = self.drop_from_tenant(tenant_state, strategy, task.priority);
            let Some(dropped) = dropped else {
                // Nothing at or below the incoming priority to evict.
                let reason = if tenant_full {
                    EnqueueRejectReason::TenantFull
                } else {
                    EnqueueRejectReason::GlobalFull
                };
                self.record_reject(reason.clone());
                return EnqueueWithHandleResult::Rejected(reason);
            };

            // A cancelled victim's marker is now obsolete. Only a still-live
            // victim (pending id claimed here) surrenders its slot for reuse —
            // cancelled victims had their slot released by cancel() already.
            self.clear_cancelled_marker(dropped.id);
            if self.take_pending(dropped.id) {
                self.record_policy_drop();
                reused_slot = true;
            }
        }

        if !reused_slot {
            let mut reserved = self.try_reserve_slot();
            if !reserved && !ran_full_purge {
                // Global capacity miss: purging this tenant may free slots.
                let purge = self.purge_tenant_for_enqueue(tenant_state, EnqueuePurgeReason::Full);
                if purge.released_slots > 0 {
                    reserved = self.try_reserve_slot();
                }
            }

            if !reserved {
                self.record_reject(EnqueueRejectReason::GlobalFull);
                return EnqueueWithHandleResult::Rejected(EnqueueRejectReason::GlobalFull);
            }
        }

        let handle = self.next_handle();
        self.register_pending(handle);

        let priority_index = task.priority.index();
        let queue = &mut tenant_state.queues[priority_index];
        let was_empty = queue.is_empty();
        queue.push_back(QueueEntry {
            id: handle.as_u64(),
            task,
        });

        if was_empty && !tenant_state.active[priority_index] {
            tenant_state.active[priority_index] = true;
            shard.active_rings[priority_index].push_back(tenant);
        }

        Self::saturating_add(&self.stats.enqueued, 1);

        // Shard activation and wakeups happen after releasing the shard lock.
        drop(shard);

        if shard_was_empty {
            self.activate_shard(shard_index);
        }

        if was_empty {
            self.work_signal.notify_all();
        }

        EnqueueWithHandleResult::Enqueued(handle)
    }
1046
1047 fn drop_from_tenant(
1048 &self,
1049 tenant_state: &mut TenantState<T>,
1050 strategy: DropStrategy,
1051 incoming: Priority,
1052 ) -> Option<QueueEntry<T>> {
1053 const DROP_HIGH: [Priority; 3] = [Priority::Low, Priority::Normal, Priority::High];
1054 const DROP_NORMAL: [Priority; 2] = [Priority::Low, Priority::Normal];
1055 const DROP_LOW: [Priority; 1] = [Priority::Low];
1056
1057 let drop_order: &[Priority] = match incoming {
1058 Priority::High => &DROP_HIGH,
1059 Priority::Normal => &DROP_NORMAL,
1060 Priority::Low => &DROP_LOW,
1061 };
1062
1063 for &priority in drop_order {
1064 let idx = priority.index();
1065 if !tenant_state.queues[idx].is_empty() {
1066 return match strategy {
1067 DropStrategy::Oldest => tenant_state.queues[idx].pop_front(),
1068 DropStrategy::Newest => tenant_state.queues[idx].pop_back(),
1069 };
1070 }
1071 }
1072 None
1073 }
1074
1075 fn backpressure_for(&self, tenant: TenantKey) -> &crate::api::BackpressurePolicy {
1076 self.config
1077 .backpressure_by_tenant
1078 .get(&tenant)
1079 .unwrap_or(&self.config.backpressure)
1080 }
1081}