1use std::collections::{BTreeMap, BTreeSet};
14
15use serde::{Deserialize, Serialize};
16
17use super::worker_queue::{WorkerQueueJobState, WorkerQueuePriority};
18use super::TenantId;
19
/// Default age (milliseconds) after which a ready job is considered starved
/// and eligible for promotion ahead of the DRR rotation: 5 minutes.
pub const DEFAULT_STARVATION_AGE_MS: u64 = 5 * 60 * 1000;
22
/// Dimension along which jobs are grouped into fairness buckets.
///
/// Serialized in kebab-case (e.g. `tenant-and-binding`).
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum FairnessKey {
    /// Group by tenant id (the default). Jobs without a tenant share one
    /// sentinel bucket (`"_no_tenant"` — see `SchedulerPolicy::fairness_key_of`).
    #[default]
    Tenant,
    /// Group by binding key.
    Binding,
    /// Group by trigger id.
    TriggerId,
    /// Group by the composite `tenant|binding` pair.
    TenantAndBinding,
}
39
40impl FairnessKey {
41 pub fn as_str(self) -> &'static str {
42 match self {
43 Self::Tenant => "tenant",
44 Self::Binding => "binding",
45 Self::TriggerId => "trigger-id",
46 Self::TenantAndBinding => "tenant-and-binding",
47 }
48 }
49}
50
/// How the worker queue picks the next job when several are ready.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "kebab-case")]
pub enum SchedulerStrategy {
    /// Strict (priority, arrival order) selection; no fairness across keys.
    /// This is the default.
    #[default]
    Fifo,
    /// Deficit round robin: each fairness key earns `weight * quantum`
    /// credits per refill round and spends one credit per selection.
    DeficitRoundRobin {
        /// Credits granted per weight unit on each refill round (>= 1).
        #[serde(default = "default_quantum")]
        quantum: u32,
        /// Age in ms after which a ready job bypasses the credit rotation;
        /// `None` disables starvation promotion.
        #[serde(default)]
        starvation_age_ms: Option<u64>,
    },
}
73
/// Serde default for `SchedulerStrategy::DeficitRoundRobin::quantum`:
/// one credit per weight unit per refill round.
fn default_quantum() -> u32 {
    1
}
77
/// Complete scheduling configuration: strategy, fairness dimension,
/// per-key weights, and an optional per-key concurrency cap.
#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)]
pub struct SchedulerPolicy {
    /// Selection strategy (FIFO by default).
    #[serde(default)]
    pub strategy: SchedulerStrategy,
    /// Dimension used to bucket jobs for fairness accounting.
    #[serde(default)]
    pub fairness_key: FairnessKey,
    /// Per-key weight overrides; keys absent here use `default_weight`.
    #[serde(default)]
    pub weights: BTreeMap<String, u32>,
    /// Weight for keys not listed in `weights` (defaults to 1).
    #[serde(default = "default_weight")]
    pub default_weight: u32,
    /// Maximum in-flight jobs per fairness key; 0 means unlimited.
    #[serde(default)]
    pub max_concurrent_per_key: u32,
}
94
/// Serde default for `SchedulerPolicy::default_weight`: uniform weight 1.
fn default_weight() -> u32 {
    1
}
98
99impl SchedulerPolicy {
100 pub fn fifo() -> Self {
101 Self::default()
102 }
103
104 pub fn deficit_round_robin(fairness_key: FairnessKey) -> Self {
105 Self {
106 strategy: SchedulerStrategy::DeficitRoundRobin {
107 quantum: 1,
108 starvation_age_ms: Some(DEFAULT_STARVATION_AGE_MS),
109 },
110 fairness_key,
111 weights: BTreeMap::new(),
112 default_weight: 1,
113 max_concurrent_per_key: 0,
114 }
115 }
116
117 pub fn with_weight(mut self, key: impl Into<String>, weight: u32) -> Self {
118 self.weights.insert(key.into(), weight.max(1));
119 self
120 }
121
122 pub fn with_max_concurrent_per_key(mut self, max: u32) -> Self {
123 self.max_concurrent_per_key = max;
124 self
125 }
126
127 pub fn with_starvation_age_ms(mut self, age_ms: u64) -> Self {
128 if let SchedulerStrategy::DeficitRoundRobin {
129 starvation_age_ms, ..
130 } = &mut self.strategy
131 {
132 *starvation_age_ms = Some(age_ms);
133 }
134 self
135 }
136
137 pub fn weight_for(&self, key: &str) -> u32 {
138 self.weights
139 .get(key)
140 .copied()
141 .unwrap_or(self.default_weight)
142 .max(1)
143 }
144
145 pub fn from_env() -> Self {
162 Self::from_env_lookup(|name| std::env::var(name).ok())
163 }
164
165 pub fn from_env_lookup<F>(lookup: F) -> Self
168 where
169 F: Fn(&str) -> Option<String>,
170 {
171 let strategy_raw = lookup("HARN_SCHEDULER_STRATEGY")
172 .map(|value| value.trim().to_ascii_lowercase())
173 .unwrap_or_else(|| "fifo".to_string());
174 let fairness_key = parse_fairness_key(
175 lookup("HARN_SCHEDULER_FAIRNESS_KEY")
176 .as_deref()
177 .map(str::trim)
178 .unwrap_or(""),
179 );
180 let default_weight = lookup("HARN_SCHEDULER_DEFAULT_WEIGHT")
181 .as_deref()
182 .and_then(|raw| raw.trim().parse::<u32>().ok())
183 .filter(|n| *n >= 1)
184 .unwrap_or(1);
185 let weights = lookup("HARN_SCHEDULER_WEIGHTS")
186 .map(|raw| parse_weights(&raw))
187 .unwrap_or_default();
188 let max_concurrent_per_key = lookup("HARN_SCHEDULER_MAX_CONCURRENT_PER_KEY")
189 .as_deref()
190 .and_then(|raw| raw.trim().parse::<u32>().ok())
191 .unwrap_or(0);
192
193 let strategy = match strategy_raw.as_str() {
194 "drr" | "deficit-round-robin" | "fair-share" => {
195 let quantum = lookup("HARN_SCHEDULER_QUANTUM")
196 .as_deref()
197 .and_then(|raw| raw.trim().parse::<u32>().ok())
198 .filter(|n| *n >= 1)
199 .unwrap_or(1);
200 let starvation_age_ms = match lookup("HARN_SCHEDULER_STARVATION_AGE_MS").as_deref()
201 {
202 Some(raw) => {
203 let parsed = raw.trim().parse::<u64>().ok();
204 match parsed {
205 Some(0) => None,
206 Some(n) => Some(n),
207 None => Some(DEFAULT_STARVATION_AGE_MS),
208 }
209 }
210 None => Some(DEFAULT_STARVATION_AGE_MS),
211 };
212 SchedulerStrategy::DeficitRoundRobin {
213 quantum,
214 starvation_age_ms,
215 }
216 }
217 _ => SchedulerStrategy::Fifo,
218 };
219
220 Self {
221 strategy,
222 fairness_key,
223 weights,
224 default_weight,
225 max_concurrent_per_key,
226 }
227 }
228
229 pub fn fairness_key_of(&self, job: &SchedulableJob<'_>) -> String {
230 match self.fairness_key {
231 FairnessKey::Tenant => job
232 .tenant_id
233 .map(|t| t.0.clone())
234 .unwrap_or_else(|| "_no_tenant".to_string()),
235 FairnessKey::Binding => job.binding_key.to_string(),
236 FairnessKey::TriggerId => job.trigger_id.to_string(),
237 FairnessKey::TenantAndBinding => {
238 let tenant = job.tenant_id.map(|t| t.0.as_str()).unwrap_or("_no_tenant");
239 format!("{tenant}|{}", job.binding_key)
240 }
241 }
242 }
243
244 pub fn strategy_name(&self) -> &'static str {
245 match self.strategy {
246 SchedulerStrategy::Fifo => "fifo",
247 SchedulerStrategy::DeficitRoundRobin { .. } => "drr",
248 }
249 }
250}
251
/// Borrowed, copyable view of one queued job — only the fields the
/// scheduler needs to rank and bucket it.
#[derive(Clone, Copy, Debug)]
pub struct SchedulableJob<'a> {
    /// Monotonic id; used as the final FIFO tie-breaker.
    pub job_event_id: u64,
    /// Enqueue timestamp in ms; drives age ranking and starvation checks.
    pub enqueued_at_ms: i64,
    pub priority: WorkerQueuePriority,
    /// `None` for jobs without a tenant (bucketed under `"_no_tenant"`).
    pub tenant_id: Option<&'a TenantId>,
    pub binding_key: &'a str,
    pub trigger_id: &'a str,
    pub queue: &'a str,
}
263
impl<'a> SchedulableJob<'a> {
    /// Projects the scheduler-relevant fields out of a full queue job state.
    pub fn from_state(state: &'a WorkerQueueJobState) -> Self {
        Self {
            job_event_id: state.job_event_id,
            enqueued_at_ms: state.enqueued_at_ms,
            priority: state.job.priority,
            tenant_id: state.job.event.tenant_id.as_ref(),
            binding_key: state.job.binding_key.as_str(),
            trigger_id: state.job.trigger_id.as_str(),
            queue: state.job.queue.as_str(),
        }
    }
}
277
/// Result of a scheduling decision: the job to claim next and the
/// fairness bucket the selection was charged to.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SchedulerSelection {
    pub job_event_id: u64,
    pub fairness_key: String,
}
286
/// Mutable DRR bookkeeping carried across `select` calls.
#[derive(Clone, Debug, Default)]
pub struct SchedulerState {
    // Remaining credits per fairness key (refilled one round at a time).
    credits: BTreeMap<String, u32>,
    // Currently claimed jobs per key; enforces max_concurrent_per_key.
    in_flight: BTreeMap<String, u32>,
    // Key charged by the most recent selection; anchors the rotation.
    last_selected: Option<String>,
    // Lifetime selection count per key (exposed via snapshots).
    selected_total: BTreeMap<String, u64>,
    // Times a key had ready work while nothing could be selected.
    deferred_total: BTreeMap<String, u64>,
    // Number of starvation promotions ever performed.
    starvation_promotions_total: u64,
    // Number of credit-refill rounds completed.
    rounds_completed: u64,
}
309
impl SchedulerState {
    /// Fresh state: no credits, no in-flight counts, zeroed counters.
    pub fn new() -> Self {
        Self::default()
    }

    /// Picks the next job from `candidates` under `policy`, or `None` when
    /// nothing is selectable (empty input, or all keys capped under DRR).
    pub fn select(
        &mut self,
        candidates: &[SchedulableJob<'_>],
        policy: &SchedulerPolicy,
        now_ms: i64,
    ) -> Option<SchedulerSelection> {
        if candidates.is_empty() {
            return None;
        }
        match &policy.strategy {
            SchedulerStrategy::Fifo => fifo_select(candidates, policy, now_ms),
            SchedulerStrategy::DeficitRoundRobin {
                quantum,
                starvation_age_ms,
            } => self.drr_select(candidates, policy, *quantum, *starvation_age_ms, now_ms),
        }
    }

    /// Records that a claim for `fairness_key` was committed (bumps in-flight).
    pub fn note_claim_committed(&mut self, fairness_key: &str) {
        *self.in_flight.entry(fairness_key.to_string()).or_default() += 1;
    }

    /// Records claim release; drops the entry once its count reaches zero
    /// so the map only tracks keys that actually have work in flight.
    pub fn note_claim_released(&mut self, fairness_key: &str) {
        if let Some(count) = self.in_flight.get_mut(fairness_key) {
            *count = count.saturating_sub(1);
            if *count == 0 {
                self.in_flight.remove(fairness_key);
            }
        }
    }

    /// Replaces the in-flight map wholesale (e.g. after rebuilding from queue
    /// state); zero-count entries are filtered out.
    pub fn replace_in_flight(&mut self, snapshot: BTreeMap<String, u32>) {
        self.in_flight = snapshot.into_iter().filter(|(_, n)| *n > 0).collect();
    }

    /// Credit-refill rounds completed so far.
    pub fn rounds_completed(&self) -> u64 {
        self.rounds_completed
    }

    /// Total starvation promotions performed so far.
    pub fn starvation_promotions_total(&self) -> u64 {
        self.starvation_promotions_total
    }

    /// Remaining credits for `key`, widened to i64 for snapshot reporting.
    /// Credits are stored unsigned, so this never goes negative.
    pub fn deficit_for(&self, key: &str) -> i64 {
        self.credits.get(key).copied().unwrap_or(0) as i64
    }

    /// Current in-flight claim count for `key` (0 when untracked).
    pub fn in_flight_for(&self, key: &str) -> u32 {
        self.in_flight.get(key).copied().unwrap_or(0)
    }

    /// Lifetime selection count for `key`.
    pub fn selected_total_for(&self, key: &str) -> u64 {
        self.selected_total.get(key).copied().unwrap_or(0)
    }

    /// Lifetime deferral count for `key`.
    pub fn deferred_total_for(&self, key: &str) -> u64 {
        self.deferred_total.get(key).copied().unwrap_or(0)
    }

    /// Builds a serializable snapshot covering the union of every key known
    /// to the internal counters plus the caller-provided ready stats.
    pub fn snapshot(
        &self,
        policy: &SchedulerPolicy,
        ready_by_key: &BTreeMap<String, ReadyKeyStats>,
    ) -> SchedulerSnapshot {
        let mut all_keys: BTreeSet<String> = self.credits.keys().cloned().collect();
        all_keys.extend(self.selected_total.keys().cloned());
        all_keys.extend(self.deferred_total.keys().cloned());
        all_keys.extend(self.in_flight.keys().cloned());
        all_keys.extend(ready_by_key.keys().cloned());

        let keys = all_keys
            .into_iter()
            .map(|key| {
                let ready = ready_by_key
                    .get(&key)
                    .copied()
                    .unwrap_or(ReadyKeyStats::default());
                SchedulerKeyStat {
                    fairness_key: key.clone(),
                    weight: policy.weight_for(&key),
                    deficit: self.deficit_for(&key),
                    in_flight: self.in_flight_for(&key),
                    selected_total: self.selected_total_for(&key),
                    deferred_total: self.deferred_total_for(&key),
                    ready_jobs: ready.ready_jobs,
                    oldest_ready_age_ms: ready.oldest_ready_age_ms,
                }
            })
            .collect();

        SchedulerSnapshot {
            strategy: policy.strategy_name().to_string(),
            fairness_key: policy.fairness_key.as_str().to_string(),
            rounds_completed: self.rounds_completed,
            starvation_promotions_total: self.starvation_promotions_total,
            keys,
        }
    }

    /// Deficit-round-robin core. Phases, in order:
    /// 1. bucket and sort candidates, 2. starvation promotion,
    /// 3. compute eligible keys, 4. spend an existing credit,
    /// 5. refill credits and retry once.
    fn drr_select(
        &mut self,
        candidates: &[SchedulableJob<'_>],
        policy: &SchedulerPolicy,
        quantum: u32,
        starvation_age_ms: Option<u64>,
        now_ms: i64,
    ) -> Option<SchedulerSelection> {
        // Phase 1: bucket candidates by fairness key and order each bucket by
        // (effective priority rank, age, id) so each bucket head is its best pick.
        let mut groups: BTreeMap<String, Vec<&SchedulableJob<'_>>> = BTreeMap::new();
        for job in candidates {
            let key = policy.fairness_key_of(job);
            groups.entry(key).or_default().push(job);
        }
        for jobs in groups.values_mut() {
            jobs.sort_by_key(|j| {
                (
                    j.priority.effective_rank(j.enqueued_at_ms, now_ms),
                    j.enqueued_at_ms,
                    j.job_event_id,
                )
            });
        }

        // Phase 2: starvation promotion. If any uncapped bucket's head has
        // waited past the threshold, pick the oldest such head immediately —
        // note no credit is spent for a promotion.
        if let Some(threshold) = starvation_age_ms {
            let mut oldest: Option<(i64, String, u64)> = None;
            for (key, jobs) in &groups {
                if policy.max_concurrent_per_key > 0
                    && self.in_flight_for(key) >= policy.max_concurrent_per_key
                {
                    continue;
                }
                let head = match jobs.first() {
                    Some(job) => *job,
                    None => continue,
                };
                let age_ms = now_ms.saturating_sub(head.enqueued_at_ms).max(0);
                if (age_ms as u64) >= threshold
                    && oldest
                        .as_ref()
                        .map(|(prev, _, _)| head.enqueued_at_ms < *prev)
                        .unwrap_or(true)
                {
                    oldest = Some((head.enqueued_at_ms, key.clone(), head.job_event_id));
                }
            }
            if let Some((_, key, job_event_id)) = oldest {
                self.starvation_promotions_total += 1;
                self.commit_selection(&key);
                return Some(SchedulerSelection {
                    job_event_id,
                    fairness_key: key,
                });
            }
        }

        // Phase 3: keys that have ready work and are under the concurrency cap.
        let mut eligible_keys: Vec<String> = groups
            .iter()
            .filter(|(key, jobs)| {
                !jobs.is_empty()
                    && (policy.max_concurrent_per_key == 0
                        || self.in_flight_for(key) < policy.max_concurrent_per_key)
            })
            .map(|(key, _)| key.clone())
            .collect();
        eligible_keys.sort();
        if eligible_keys.is_empty() {
            // Everything is capped: record a deferral for every waiting key.
            for key in groups.keys() {
                *self.deferred_total.entry(key.clone()).or_default() += 1;
            }
            return None;
        }

        let n = eligible_keys.len();
        let start = self.start_index(&eligible_keys);

        // Phase 4: spend an existing credit, rotating from just past the
        // previously selected key for round-robin fairness.
        for offset in 0..n {
            let idx = (start + offset) % n;
            let key = eligible_keys[idx].clone();
            if self.credits.get(&key).copied().unwrap_or(0) >= 1 {
                // `?` is defensive: the key came from `groups`, so the lookup
                // is expected to succeed.
                let job_event_id = groups
                    .get(&key)
                    .and_then(|jobs| jobs.first())
                    .map(|job| job.job_event_id)?;
                self.spend_credit(&key);
                self.commit_selection(&key);
                return Some(SchedulerSelection {
                    job_event_id,
                    fairness_key: key,
                });
            }
        }

        // Phase 5: no eligible key had credits (they all hold 0 at this
        // point) — grant each eligible key weight * quantum credits,
        // saturating at u32::MAX, and count the round.
        for key in &eligible_keys {
            let credits = policy.weight_for(key) as u64 * quantum as u64;
            let credits = credits.min(u32::MAX as u64) as u32;
            *self.credits.entry(key.clone()).or_insert(0) += credits;
        }
        self.rounds_completed += 1;

        // Retry the rotation once after the refill.
        for offset in 0..n {
            let idx = (start + offset) % n;
            let key = eligible_keys[idx].clone();
            if self.credits.get(&key).copied().unwrap_or(0) >= 1 {
                let job_event_id = groups
                    .get(&key)
                    .and_then(|jobs| jobs.first())
                    .map(|job| job.job_event_id)?;
                self.spend_credit(&key);
                self.commit_selection(&key);
                return Some(SchedulerSelection {
                    job_event_id,
                    fairness_key: key,
                });
            }
        }
        None
    }

    /// Index in `eligible_keys` (sorted) of the first key strictly after the
    /// last selected one; wraps to 0, yielding a stable round-robin rotation.
    fn start_index(&self, eligible_keys: &[String]) -> usize {
        match &self.last_selected {
            Some(last) => eligible_keys
                .iter()
                .position(|key| key.as_str() > last.as_str())
                .unwrap_or(0),
            None => 0,
        }
    }

    /// Spends one credit from `key` (saturating; missing key is a no-op).
    fn spend_credit(&mut self, key: &str) {
        if let Some(credits) = self.credits.get_mut(key) {
            *credits = credits.saturating_sub(1);
        }
    }

    /// Marks `key` as the most recent winner and bumps its selection counter.
    fn commit_selection(&mut self, key: &str) {
        self.last_selected = Some(key.to_string());
        *self.selected_total.entry(key.to_string()).or_default() += 1;
    }
}
569
570fn fifo_select(
571 candidates: &[SchedulableJob<'_>],
572 policy: &SchedulerPolicy,
573 now_ms: i64,
574) -> Option<SchedulerSelection> {
575 let pick = candidates.iter().min_by_key(|job| {
576 (
577 job.priority.effective_rank(job.enqueued_at_ms, now_ms),
578 job.enqueued_at_ms,
579 job.job_event_id,
580 )
581 })?;
582 Some(SchedulerSelection {
583 job_event_id: pick.job_event_id,
584 fairness_key: policy.fairness_key_of(pick),
585 })
586}
587
/// Per-key aggregate of jobs that are currently ready to run.
#[derive(Clone, Copy, Debug, Default, Serialize)]
pub struct ReadyKeyStats {
    /// Number of ready jobs bucketed under this key.
    pub ready_jobs: u32,
    /// Age in ms of the oldest ready job (0 when there are none).
    pub oldest_ready_age_ms: u64,
}
593
/// One per-key row in a `SchedulerSnapshot`.
#[derive(Clone, Debug, Serialize)]
pub struct SchedulerKeyStat {
    pub fairness_key: String,
    /// Effective weight under the policy (after the >= 1 clamp).
    pub weight: u32,
    /// Remaining credits for the key; reported as i64 but never negative.
    pub deficit: i64,
    pub in_flight: u32,
    pub selected_total: u64,
    pub deferred_total: u64,
    pub ready_jobs: u32,
    pub oldest_ready_age_ms: u64,
}
605
/// Point-in-time, serializable view of scheduler state for observability.
#[derive(Clone, Debug, Serialize)]
pub struct SchedulerSnapshot {
    /// `"fifo"` or `"drr"`.
    pub strategy: String,
    /// Kebab-case name of the configured fairness dimension.
    pub fairness_key: String,
    pub rounds_completed: u64,
    pub starvation_promotions_total: u64,
    pub keys: Vec<SchedulerKeyStat>,
}
614
615pub fn ready_stats_by_key(
618 jobs: &[WorkerQueueJobState],
619 policy: &SchedulerPolicy,
620 now_ms: i64,
621) -> BTreeMap<String, ReadyKeyStats> {
622 let mut out: BTreeMap<String, ReadyKeyStats> = BTreeMap::new();
623 for state in jobs.iter().filter(|j| j.is_ready()) {
624 let view = SchedulableJob::from_state(state);
625 let key = policy.fairness_key_of(&view);
626 let entry = out.entry(key).or_default();
627 entry.ready_jobs += 1;
628 let age = now_ms.saturating_sub(state.enqueued_at_ms).max(0) as u64;
629 if age > entry.oldest_ready_age_ms {
630 entry.oldest_ready_age_ms = age;
631 }
632 }
633 out
634}
635
636fn parse_fairness_key(raw: &str) -> FairnessKey {
637 match raw.to_ascii_lowercase().as_str() {
638 "binding" => FairnessKey::Binding,
639 "trigger-id" | "trigger_id" => FairnessKey::TriggerId,
640 "tenant-and-binding" | "tenant_and_binding" | "composite" => FairnessKey::TenantAndBinding,
641 _ => FairnessKey::Tenant,
642 }
643}
644
/// Parses a `key:weight` comma-separated list (e.g. `"tenant-a:2,tenant-b:3"`).
/// Entries with no `:`, an empty key, a non-numeric weight, or weight 0 are
/// silently dropped; for duplicate keys the last entry wins.
fn parse_weights(raw: &str) -> BTreeMap<String, u32> {
    raw.split(',')
        .filter_map(|entry| {
            // rsplit_once so keys may themselves contain ':'.
            let (key, value) = entry.trim().rsplit_once(':')?;
            let key = key.trim();
            if key.is_empty() {
                return None;
            }
            let weight = value.trim().parse::<u32>().ok().filter(|w| *w >= 1)?;
            Some((key.to_string(), weight))
        })
        .collect()
}
666
667pub fn in_flight_by_key(
669 jobs: &[WorkerQueueJobState],
670 policy: &SchedulerPolicy,
671) -> BTreeMap<String, u32> {
672 let mut out: BTreeMap<String, u32> = BTreeMap::new();
673 for state in jobs {
674 if state.acked || state.purged || state.active_claim.is_none() {
675 continue;
676 }
677 let view = SchedulableJob::from_state(state);
678 let key = policy.fairness_key_of(&view);
679 *out.entry(key).or_default() += 1;
680 }
681 out
682}
683
#[cfg(test)]
mod tests {
    use super::*;
    use crate::triggers::event::{
        GenericWebhookPayload, KnownProviderPayload, ProviderId, ProviderPayload, SignatureStatus,
        TraceId, TriggerEvent, TriggerEventId,
    };
    use crate::triggers::worker_queue::{WorkerQueueJob, WorkerQueueJobState};
    use std::collections::BTreeMap as Map;

    // Minimal verified-webhook event fixture; only `tenant_id` varies per test.
    fn event(id: &str, tenant: Option<&str>) -> TriggerEvent {
        TriggerEvent {
            id: TriggerEventId(id.to_string()),
            provider: ProviderId::from("test"),
            kind: "test.event".to_string(),
            trace_id: TraceId("trace-x".to_string()),
            dedupe_key: id.to_string(),
            tenant_id: tenant.map(TenantId::new),
            headers: Map::new(),
            batch: None,
            raw_body: None,
            provider_payload: ProviderPayload::Known(KnownProviderPayload::Webhook(
                GenericWebhookPayload {
                    source: None,
                    content_type: None,
                    raw: serde_json::json!({}),
                },
            )),
            signature_status: SignatureStatus::Verified,
            received_at: time::OffsetDateTime::now_utc(),
            occurred_at: None,
            dedupe_claimed: false,
        }
    }

    // Unclaimed, un-acked job state with the given id/age/tenant/priority.
    fn state(
        job_event_id: u64,
        enqueued_at_ms: i64,
        tenant: Option<&str>,
        trigger_id: &str,
        priority: WorkerQueuePriority,
    ) -> WorkerQueueJobState {
        WorkerQueueJobState {
            job_event_id,
            enqueued_at_ms,
            job: WorkerQueueJob {
                queue: "q".to_string(),
                trigger_id: trigger_id.to_string(),
                binding_key: format!("{trigger_id}@v1"),
                binding_version: 1,
                event: event(&format!("evt-{job_event_id}"), tenant),
                replay_of_event_id: None,
                priority,
            },
            active_claim: None,
            acked: false,
            purged: false,
        }
    }

    // Projects full job states into the borrowed views `select` consumes.
    fn snapshot_views<'a>(states: &'a [WorkerQueueJobState]) -> Vec<SchedulableJob<'a>> {
        states.iter().map(SchedulableJob::from_state).collect()
    }

    #[test]
    fn fifo_strategy_matches_priority_and_age() {
        let jobs = vec![
            state(1, 100, Some("a"), "t-low", WorkerQueuePriority::Low),
            state(2, 50, Some("a"), "t-high", WorkerQueuePriority::High),
            state(3, 200, Some("a"), "t-normal", WorkerQueuePriority::Normal),
        ];
        let mut sched = SchedulerState::new();
        let policy = SchedulerPolicy::fifo();
        let pick = sched
            .select(&snapshot_views(&jobs), &policy, 1_000)
            .unwrap();
        assert_eq!(pick.job_event_id, 2, "high priority always wins under FIFO");
    }

    #[test]
    fn drr_alternates_across_tenants_when_one_tenant_is_hot() {
        // tenant-a floods the queue with 100 jobs; tenant-b has just one.
        let mut jobs = Vec::new();
        for idx in 0..100 {
            jobs.push(state(
                100 + idx,
                100 + idx as i64,
                Some("tenant-a"),
                "trigger",
                WorkerQueuePriority::Normal,
            ));
        }
        jobs.push(state(
            5,
            500,
            Some("tenant-b"),
            "trigger",
            WorkerQueuePriority::Normal,
        ));
        let mut sched = SchedulerState::new();
        let policy = SchedulerPolicy::deficit_round_robin(FairnessKey::Tenant);

        let first = sched
            .select(&snapshot_views(&jobs), &policy, 10_000)
            .unwrap();
        // Remove the first pick and select again; DRR should rotate tenants.
        let second_jobs: Vec<_> = jobs
            .iter()
            .filter(|j| j.job_event_id != first.job_event_id)
            .cloned()
            .collect();
        let second = sched
            .select(&snapshot_views(&second_jobs), &policy, 10_001)
            .unwrap();
        let tenants = [first.fairness_key.clone(), second.fairness_key.clone()];
        assert!(
            tenants.contains(&"tenant-a".to_string()) && tenants.contains(&"tenant-b".to_string()),
            "expected both tenants represented in first two picks, got {tenants:?}",
        );
    }

    #[test]
    fn drr_respects_weighted_share_two_to_one() {
        let now_ms = 1_000_000;
        let mut sched = SchedulerState::new();
        let policy = SchedulerPolicy::deficit_round_robin(FairnessKey::Tenant)
            .with_weight("tenant-a", 2)
            .with_weight("tenant-b", 1);

        let mut acount = 0;
        let mut bcount = 0;
        // Both tenants always have a job ready; count who gets picked.
        for _ in 0..120 {
            let jobs = vec![
                state(
                    1,
                    now_ms,
                    Some("tenant-a"),
                    "trigger",
                    WorkerQueuePriority::Normal,
                ),
                state(
                    2,
                    now_ms,
                    Some("tenant-b"),
                    "trigger",
                    WorkerQueuePriority::Normal,
                ),
            ];
            let pick = sched
                .select(&snapshot_views(&jobs), &policy, now_ms)
                .unwrap();
            match pick.fairness_key.as_str() {
                "tenant-a" => acount += 1,
                "tenant-b" => bcount += 1,
                _ => unreachable!(),
            }
        }
        // With weights 2:1 the selection ratio should converge near 2.0.
        let ratio = acount as f64 / bcount as f64;
        assert!(
            (1.8..=2.2).contains(&ratio),
            "expected ~2:1 selection ratio, got a={acount} b={bcount} ratio={ratio:.3}",
        );
    }

    #[test]
    fn drr_starvation_promotion_picks_old_job_when_threshold_exceeded() {
        let mut sched = SchedulerState::new();
        let policy =
            SchedulerPolicy::deficit_round_robin(FairnessKey::Tenant).with_starvation_age_ms(1_000);

        // Warm up some state for tenant-a.
        for _ in 0..3 {
            let jobs = vec![state(
                1,
                100,
                Some("tenant-a"),
                "trigger",
                WorkerQueuePriority::Normal,
            )];
            sched.select(&snapshot_views(&jobs), &policy, 200).unwrap();
        }

        // tenant-b's job (enqueued at 10, now 20_000) is far past the 1s
        // threshold and should be promoted ahead of the fresher tenant-a job.
        let jobs = vec![
            state(
                2,
                10,
                Some("tenant-b"),
                "trigger",
                WorkerQueuePriority::Normal,
            ),
            state(
                3,
                10_000,
                Some("tenant-a"),
                "trigger",
                WorkerQueuePriority::Normal,
            ),
        ];
        let pick = sched
            .select(&snapshot_views(&jobs), &policy, 20_000)
            .unwrap();
        assert_eq!(pick.fairness_key, "tenant-b");
        assert_eq!(sched.starvation_promotions_total(), 1);
    }

    #[test]
    fn drr_max_concurrent_per_key_blocks_hot_tenant() {
        let mut sched = SchedulerState::new();
        let policy = SchedulerPolicy::deficit_round_robin(FairnessKey::Tenant)
            .with_max_concurrent_per_key(1);

        // tenant-a already has one claim in flight, hitting the cap of 1.
        sched.note_claim_committed("tenant-a");

        let jobs = vec![
            state(
                1,
                10,
                Some("tenant-a"),
                "trigger",
                WorkerQueuePriority::Normal,
            ),
            state(
                2,
                500,
                Some("tenant-b"),
                "trigger",
                WorkerQueuePriority::Normal,
            ),
        ];
        let pick = sched
            .select(&snapshot_views(&jobs), &policy, 1_000)
            .unwrap();
        assert_eq!(pick.fairness_key, "tenant-b");
    }

    #[test]
    fn drr_returns_none_when_all_keys_capped() {
        let mut sched = SchedulerState::new();
        let policy = SchedulerPolicy::deficit_round_robin(FairnessKey::Tenant)
            .with_max_concurrent_per_key(1);
        sched.note_claim_committed("tenant-a");
        sched.note_claim_committed("tenant-b");

        let jobs = vec![
            state(
                1,
                10,
                Some("tenant-a"),
                "trigger",
                WorkerQueuePriority::Normal,
            ),
            state(
                2,
                500,
                Some("tenant-b"),
                "trigger",
                WorkerQueuePriority::Normal,
            ),
        ];
        assert!(sched
            .select(&snapshot_views(&jobs), &policy, 1_000)
            .is_none());
        // Both keys had ready work but were capped, so both record a deferral.
        assert_eq!(sched.deferred_total_for("tenant-a"), 1);
        assert_eq!(sched.deferred_total_for("tenant-b"), 1);
    }

    #[test]
    fn drr_priority_within_a_tenant_still_holds() {
        let mut sched = SchedulerState::new();
        let policy = SchedulerPolicy::deficit_round_robin(FairnessKey::Tenant);
        let jobs = vec![
            state(
                1,
                100,
                Some("tenant-a"),
                "trigger-low",
                WorkerQueuePriority::Low,
            ),
            state(
                2,
                100,
                Some("tenant-a"),
                "trigger-high",
                WorkerQueuePriority::High,
            ),
        ];
        let pick = sched
            .select(&snapshot_views(&jobs), &policy, 1_000)
            .unwrap();
        assert_eq!(
            pick.job_event_id, 2,
            "high priority should win within a tenant"
        );
    }

    #[test]
    fn snapshot_includes_fairness_state_per_key() {
        let mut sched = SchedulerState::new();
        let policy =
            SchedulerPolicy::deficit_round_robin(FairnessKey::Tenant).with_weight("tenant-a", 3);

        // Run a few selections so the counters have data.
        for _ in 0..5 {
            let jobs = vec![
                state(
                    1,
                    100,
                    Some("tenant-a"),
                    "trigger",
                    WorkerQueuePriority::Normal,
                ),
                state(
                    2,
                    200,
                    Some("tenant-b"),
                    "trigger",
                    WorkerQueuePriority::Normal,
                ),
            ];
            sched.select(&snapshot_views(&jobs), &policy, 300).unwrap();
        }

        let states = vec![
            state(
                3,
                100,
                Some("tenant-a"),
                "trigger",
                WorkerQueuePriority::Normal,
            ),
            state(
                4,
                100,
                Some("tenant-b"),
                "trigger",
                WorkerQueuePriority::Normal,
            ),
        ];
        let ready = ready_stats_by_key(&states, &policy, 5_000);
        let snap = sched.snapshot(&policy, &ready);
        assert_eq!(snap.strategy, "drr");
        assert_eq!(snap.fairness_key, "tenant");
        let a = snap
            .keys
            .iter()
            .find(|k| k.fairness_key == "tenant-a")
            .unwrap();
        assert_eq!(a.weight, 3);
        let b = snap
            .keys
            .iter()
            .find(|k| k.fairness_key == "tenant-b")
            .unwrap();
        // Weight 3 vs 1 means tenant-a should have been selected more often.
        assert!(a.selected_total > b.selected_total);
        assert!(a.ready_jobs >= 1 && b.ready_jobs >= 1);
    }
}
1048}