harn_vm/triggers/scheduler.rs

//! Fair-share scheduler for trigger dispatch and worker-queue claims.
//!
//! Implements a deterministic weighted round-robin (WRR) selection policy with
//! deficit accounting and starvation-age promotion. The scheduler groups
//! ready candidates by a fairness key (tenant, binding, trigger, or a
//! composite) and rotates through groups so a hot key cannot monopolise
//! shared capacity.
//!
//! The default policy (`SchedulerPolicy::fifo`) reproduces the previous
//! priority-then-FIFO behaviour, so single-tenant deployments and existing
//! callers see no change unless they opt in to deficit round-robin.
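//!
//! A minimal usage sketch (`candidates` and `now_ms` are assumed to be
//! supplied by the caller, as in [`SchedulerState::select`]):
//!
//! ```ignore
//! let policy = SchedulerPolicy::deficit_round_robin(FairnessKey::Tenant)
//!     .with_weight("tenant-a", 3)
//!     .with_max_concurrent_per_key(8);
//! let mut scheduler = SchedulerState::new();
//! if let Some(selection) = scheduler.select(&candidates, &policy, now_ms) {
//!     // After the claim commits, record it so per-key caps are enforced.
//!     scheduler.note_claim_committed(&selection.fairness_key);
//! }
//! ```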

use std::collections::{BTreeMap, BTreeSet};

use serde::{Deserialize, Serialize};

use super::worker_queue::{WorkerQueueJobState, WorkerQueuePriority};
use super::TenantId;

/// Default starvation-age promotion threshold (5 minutes).
pub const DEFAULT_STARVATION_AGE_MS: u64 = 5 * 60 * 1000;

/// Which dimension to use when grouping ready candidates for fair-share.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum FairnessKey {
    /// Group by `TriggerEvent.tenant_id` (events without a tenant share a
    /// single bucket).
    #[default]
    Tenant,
    /// Group by `WorkerQueueJob.binding_key` (binding-version aware).
    Binding,
    /// Group by `WorkerQueueJob.trigger_id`.
    TriggerId,
    /// Composite of tenant + binding so multi-binding tenants get fairness
    /// per-binding within their share.
    TenantAndBinding,
}

impl FairnessKey {
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Tenant => "tenant",
            Self::Binding => "binding",
            Self::TriggerId => "trigger-id",
            Self::TenantAndBinding => "tenant-and-binding",
        }
    }
}

/// Selection strategy used by the scheduler.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "kebab-case")]
pub enum SchedulerStrategy {
    /// Pure FIFO with priority + age promotion. Equivalent to historical
    /// behaviour.
    #[default]
    Fifo,
    /// Deficit / weighted round-robin across fairness keys.
    DeficitRoundRobin {
        /// Base credits granted to a fairness key per refill round (default 1).
        /// Higher quantum amortises rotation overhead at the cost of larger
        /// burst windows.
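        ///
        /// A key's refill is `weight * quantum` credits (see the refill pass
        /// in `drr_select`), so with weight 2 and quantum 3 a key receives
        /// six credits per round and refills happen a third as often.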
        #[serde(default = "default_quantum")]
        quantum: u32,
        /// Optional starvation-age promotion threshold in milliseconds. When
        /// set, any ready job older than the threshold is selected regardless
        /// of credits.
        #[serde(default)]
        starvation_age_ms: Option<u64>,
    },
}

fn default_quantum() -> u32 {
    1
}

/// Policy controlling scheduler selection.
#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)]
pub struct SchedulerPolicy {
    #[serde(default)]
    pub strategy: SchedulerStrategy,
    #[serde(default)]
    pub fairness_key: FairnessKey,
    /// Per-fairness-key weights. Missing keys default to `default_weight`.
    #[serde(default)]
    pub weights: BTreeMap<String, u32>,
    #[serde(default = "default_weight")]
    pub default_weight: u32,
    /// Per-fairness-key max in-flight claims (0 = unlimited).
    #[serde(default)]
    pub max_concurrent_per_key: u32,
}

fn default_weight() -> u32 {
    1
}

impl SchedulerPolicy {
    pub fn fifo() -> Self {
        Self::default()
    }

    pub fn deficit_round_robin(fairness_key: FairnessKey) -> Self {
        Self {
            strategy: SchedulerStrategy::DeficitRoundRobin {
                quantum: 1,
                starvation_age_ms: Some(DEFAULT_STARVATION_AGE_MS),
            },
            fairness_key,
            weights: BTreeMap::new(),
            default_weight: 1,
            max_concurrent_per_key: 0,
        }
    }

    pub fn with_weight(mut self, key: impl Into<String>, weight: u32) -> Self {
        self.weights.insert(key.into(), weight.max(1));
        self
    }

    pub fn with_max_concurrent_per_key(mut self, max: u32) -> Self {
        self.max_concurrent_per_key = max;
        self
    }

    pub fn with_starvation_age_ms(mut self, age_ms: u64) -> Self {
        if let SchedulerStrategy::DeficitRoundRobin {
            starvation_age_ms, ..
        } = &mut self.strategy
        {
            *starvation_age_ms = Some(age_ms);
        }
        self
    }

    pub fn weight_for(&self, key: &str) -> u32 {
        self.weights
            .get(key)
            .copied()
            .unwrap_or(self.default_weight)
            .max(1)
    }

    /// Build a policy from `HARN_SCHEDULER_*` environment variables.
    ///
    /// Recognised variables:
    /// - `HARN_SCHEDULER_STRATEGY` — `fifo` (default) or `drr`.
    /// - `HARN_SCHEDULER_FAIRNESS_KEY` — `tenant` (default) | `binding` |
    ///   `trigger-id` | `tenant-and-binding`.
    /// - `HARN_SCHEDULER_QUANTUM` — positive integer (default 1).
    /// - `HARN_SCHEDULER_STARVATION_AGE_MS` — milliseconds (default 300000).
    ///   Set to `0` to disable starvation-age promotion.
    /// - `HARN_SCHEDULER_MAX_CONCURRENT_PER_KEY` — `0` for unlimited (default).
    /// - `HARN_SCHEDULER_WEIGHTS` — comma-separated `key:weight` pairs (e.g.
    ///   `tenant-a:3,tenant-b:1`). Unknown keys fall back to `default_weight`.
    /// - `HARN_SCHEDULER_DEFAULT_WEIGHT` — positive integer (default 1).
    ///
    /// Invalid values fall back to defaults rather than failing — the
    /// scheduler is best-effort and must not refuse to start.
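    ///
    /// For example, a deployment might set (values illustrative):
    ///
    /// ```text
    /// HARN_SCHEDULER_STRATEGY=drr
    /// HARN_SCHEDULER_FAIRNESS_KEY=tenant
    /// HARN_SCHEDULER_WEIGHTS=tenant-a:3,tenant-b:1
    /// HARN_SCHEDULER_STARVATION_AGE_MS=60000
    /// ```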
    pub fn from_env() -> Self {
        Self::from_env_lookup(|name| std::env::var(name).ok())
    }

    /// Same as [`Self::from_env`] but driven by an explicit lookup function
    /// (useful for tests).
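    ///
    /// A sketch of a test-style invocation:
    ///
    /// ```ignore
    /// let policy = SchedulerPolicy::from_env_lookup(|name| match name {
    ///     "HARN_SCHEDULER_STRATEGY" => Some("drr".to_string()),
    ///     "HARN_SCHEDULER_WEIGHTS" => Some("tenant-a:3,tenant-b:1".to_string()),
    ///     _ => None,
    /// });
    /// assert_eq!(policy.strategy_name(), "drr");
    /// assert_eq!(policy.weight_for("tenant-a"), 3);
    /// ```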
    pub fn from_env_lookup<F>(lookup: F) -> Self
    where
        F: Fn(&str) -> Option<String>,
    {
        let strategy_raw = lookup("HARN_SCHEDULER_STRATEGY")
            .map(|value| value.trim().to_ascii_lowercase())
            .unwrap_or_else(|| "fifo".to_string());
        let fairness_key = parse_fairness_key(
            lookup("HARN_SCHEDULER_FAIRNESS_KEY")
                .as_deref()
                .map(str::trim)
                .unwrap_or(""),
        );
        let default_weight = lookup("HARN_SCHEDULER_DEFAULT_WEIGHT")
            .as_deref()
            .and_then(|raw| raw.trim().parse::<u32>().ok())
            .filter(|n| *n >= 1)
            .unwrap_or(1);
        let weights = lookup("HARN_SCHEDULER_WEIGHTS")
            .map(|raw| parse_weights(&raw))
            .unwrap_or_default();
        let max_concurrent_per_key = lookup("HARN_SCHEDULER_MAX_CONCURRENT_PER_KEY")
            .as_deref()
            .and_then(|raw| raw.trim().parse::<u32>().ok())
            .unwrap_or(0);

        let strategy = match strategy_raw.as_str() {
            "drr" | "deficit-round-robin" | "fair-share" => {
                let quantum = lookup("HARN_SCHEDULER_QUANTUM")
                    .as_deref()
                    .and_then(|raw| raw.trim().parse::<u32>().ok())
                    .filter(|n| *n >= 1)
                    .unwrap_or(1);
                let starvation_age_ms = match lookup("HARN_SCHEDULER_STARVATION_AGE_MS").as_deref()
                {
                    Some(raw) => {
                        let parsed = raw.trim().parse::<u64>().ok();
                        match parsed {
                            Some(0) => None,
                            Some(n) => Some(n),
                            None => Some(DEFAULT_STARVATION_AGE_MS),
                        }
                    }
                    None => Some(DEFAULT_STARVATION_AGE_MS),
                };
                SchedulerStrategy::DeficitRoundRobin {
                    quantum,
                    starvation_age_ms,
                }
            }
            _ => SchedulerStrategy::Fifo,
        };

        Self {
            strategy,
            fairness_key,
            weights,
            default_weight,
            max_concurrent_per_key,
        }
    }

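    /// Compute the grouping key for `job` under this policy's `fairness_key`.
    /// For example (values illustrative), `TenantAndBinding` yields
    /// `"tenant-a|checkout@v1"` for a tenant `tenant-a` job bound to
    /// `checkout@v1`; jobs without a tenant fall into the shared
    /// `_no_tenant` bucket.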
    pub fn fairness_key_of(&self, job: &SchedulableJob<'_>) -> String {
        match self.fairness_key {
            FairnessKey::Tenant => job
                .tenant_id
                .map(|t| t.0.clone())
                .unwrap_or_else(|| "_no_tenant".to_string()),
            FairnessKey::Binding => job.binding_key.to_string(),
            FairnessKey::TriggerId => job.trigger_id.to_string(),
            FairnessKey::TenantAndBinding => {
                let tenant = job.tenant_id.map(|t| t.0.as_str()).unwrap_or("_no_tenant");
                format!("{tenant}|{}", job.binding_key)
            }
        }
    }

    pub fn strategy_name(&self) -> &'static str {
        match self.strategy {
            SchedulerStrategy::Fifo => "fifo",
            SchedulerStrategy::DeficitRoundRobin { .. } => "drr",
        }
    }
}

/// Decoupled view over the bits of a job the scheduler needs.
#[derive(Clone, Copy, Debug)]
pub struct SchedulableJob<'a> {
    pub job_event_id: u64,
    pub enqueued_at_ms: i64,
    pub priority: WorkerQueuePriority,
    pub tenant_id: Option<&'a TenantId>,
    pub binding_key: &'a str,
    pub trigger_id: &'a str,
    pub queue: &'a str,
}

impl<'a> SchedulableJob<'a> {
    pub fn from_state(state: &'a WorkerQueueJobState) -> Self {
        Self {
            job_event_id: state.job_event_id,
            enqueued_at_ms: state.enqueued_at_ms,
            priority: state.job.priority,
            tenant_id: state.job.event.tenant_id.as_ref(),
            binding_key: state.job.binding_key.as_str(),
            trigger_id: state.job.trigger_id.as_str(),
            queue: state.job.queue.as_str(),
        }
    }
}

/// Owned identity of the selected job. Returned by [`SchedulerState::select`]
/// so callers can look up the original job in their own data structures
/// without grappling with the candidate lifetime.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SchedulerSelection {
    pub job_event_id: u64,
    pub fairness_key: String,
}

/// Mutable in-memory state. Self-correcting: deficits even out over time, so
/// it is safe to lose this on process restart.
#[derive(Clone, Debug, Default)]
pub struct SchedulerState {
    /// Current credit balance per fairness key. Refills happen lazily when a
    /// round completes with no key holding credits.
    credits: BTreeMap<String, u32>,
    /// Currently-claimed job count per fairness key (used for
    /// `max_concurrent_per_key`).
    in_flight: BTreeMap<String, u32>,
    /// Last fairness key selected — drives round-robin progression.
    last_selected: Option<String>,
    /// Cumulative selection count per fairness key (metrics).
    selected_total: BTreeMap<String, u64>,
    /// Cumulative deferral count per fairness key (metrics).
    deferred_total: BTreeMap<String, u64>,
    /// Number of times the scheduler force-selected a job because its age
    /// exceeded the configured starvation threshold.
    starvation_promotions_total: u64,
    /// Number of complete round-robin rounds (refill events).
    rounds_completed: u64,
}

impl SchedulerState {
    pub fn new() -> Self {
        Self::default()
    }

    /// Choose the next job to dispatch from `candidates`. Returns `None` if no
    /// candidate is eligible (all keys at their `max_concurrent_per_key` cap
    /// or `candidates` is empty).
    pub fn select(
        &mut self,
        candidates: &[SchedulableJob<'_>],
        policy: &SchedulerPolicy,
        now_ms: i64,
    ) -> Option<SchedulerSelection> {
        if candidates.is_empty() {
            return None;
        }
        match &policy.strategy {
            SchedulerStrategy::Fifo => fifo_select(candidates, policy, now_ms),
            SchedulerStrategy::DeficitRoundRobin {
                quantum,
                starvation_age_ms,
            } => self.drr_select(candidates, policy, *quantum, *starvation_age_ms, now_ms),
        }
    }

    /// Increment the in-flight counter for a fairness key. Call when a claim
    /// commits successfully.
    pub fn note_claim_committed(&mut self, fairness_key: &str) {
        *self.in_flight.entry(fairness_key.to_string()).or_default() += 1;
    }

    /// Decrement the in-flight counter. Call when a claim is released, ack'd,
    /// or expires.
    pub fn note_claim_released(&mut self, fairness_key: &str) {
        if let Some(count) = self.in_flight.get_mut(fairness_key) {
            *count = count.saturating_sub(1);
            if *count == 0 {
                self.in_flight.remove(fairness_key);
            }
        }
    }

    /// Adopt an externally-observed claim-count snapshot (used when the
    /// in-flight state is reconstructed from the event log on a fresh process).
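    ///
    /// Typically fed from [`in_flight_by_key`], e.g. (a sketch):
    ///
    /// ```ignore
    /// scheduler.replace_in_flight(in_flight_by_key(&job_states, &policy));
    /// ```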
    pub fn replace_in_flight(&mut self, snapshot: BTreeMap<String, u32>) {
        self.in_flight = snapshot.into_iter().filter(|(_, n)| *n > 0).collect();
    }

    pub fn rounds_completed(&self) -> u64 {
        self.rounds_completed
    }

    pub fn starvation_promotions_total(&self) -> u64 {
        self.starvation_promotions_total
    }

    pub fn deficit_for(&self, key: &str) -> i64 {
        self.credits.get(key).copied().unwrap_or(0) as i64
    }

    pub fn in_flight_for(&self, key: &str) -> u32 {
        self.in_flight.get(key).copied().unwrap_or(0)
    }

    pub fn selected_total_for(&self, key: &str) -> u64 {
        self.selected_total.get(key).copied().unwrap_or(0)
    }

    pub fn deferred_total_for(&self, key: &str) -> u64 {
        self.deferred_total.get(key).copied().unwrap_or(0)
    }

    pub fn snapshot(
        &self,
        policy: &SchedulerPolicy,
        ready_by_key: &BTreeMap<String, ReadyKeyStats>,
    ) -> SchedulerSnapshot {
        let mut all_keys: BTreeSet<String> = self.credits.keys().cloned().collect();
        all_keys.extend(self.selected_total.keys().cloned());
        all_keys.extend(self.deferred_total.keys().cloned());
        all_keys.extend(self.in_flight.keys().cloned());
        all_keys.extend(ready_by_key.keys().cloned());

        let keys = all_keys
            .into_iter()
            .map(|key| {
                let ready = ready_by_key.get(&key).copied().unwrap_or_default();
                SchedulerKeyStat {
                    fairness_key: key.clone(),
                    weight: policy.weight_for(&key),
                    deficit: self.deficit_for(&key),
                    in_flight: self.in_flight_for(&key),
                    selected_total: self.selected_total_for(&key),
                    deferred_total: self.deferred_total_for(&key),
                    ready_jobs: ready.ready_jobs,
                    oldest_ready_age_ms: ready.oldest_ready_age_ms,
                }
            })
            .collect();

        SchedulerSnapshot {
            strategy: policy.strategy_name().to_string(),
            fairness_key: policy.fairness_key.as_str().to_string(),
            rounds_completed: self.rounds_completed,
            starvation_promotions_total: self.starvation_promotions_total,
            keys,
        }
    }

    fn drr_select(
        &mut self,
        candidates: &[SchedulableJob<'_>],
        policy: &SchedulerPolicy,
        quantum: u32,
        starvation_age_ms: Option<u64>,
        now_ms: i64,
    ) -> Option<SchedulerSelection> {
        // Group by fairness key, sorted within each group by
        // (priority, enqueue, id) so the selected head is deterministic.
        let mut groups: BTreeMap<String, Vec<&SchedulableJob<'_>>> = BTreeMap::new();
        for job in candidates {
            let key = policy.fairness_key_of(job);
            groups.entry(key).or_default().push(job);
        }
        for jobs in groups.values_mut() {
            jobs.sort_by_key(|j| {
                (
                    j.priority.effective_rank(j.enqueued_at_ms, now_ms),
                    j.enqueued_at_ms,
                    j.job_event_id,
                )
            });
        }

        // Starvation override: any eligible head whose age exceeds the
        // threshold wins, with the oldest head breaking ties.
        if let Some(threshold) = starvation_age_ms {
            let mut oldest: Option<(i64, String, u64)> = None;
            for (key, jobs) in &groups {
                if policy.max_concurrent_per_key > 0
                    && self.in_flight_for(key) >= policy.max_concurrent_per_key
                {
                    continue;
                }
                let head = match jobs.first() {
                    Some(job) => *job,
                    None => continue,
                };
                let age_ms = now_ms.saturating_sub(head.enqueued_at_ms).max(0);
                if (age_ms as u64) >= threshold
                    && oldest
                        .as_ref()
                        .map(|(prev, _, _)| head.enqueued_at_ms < *prev)
                        .unwrap_or(true)
                {
                    oldest = Some((head.enqueued_at_ms, key.clone(), head.job_event_id));
                }
            }
            if let Some((_, key, job_event_id)) = oldest {
                self.starvation_promotions_total += 1;
                self.commit_selection(&key);
                return Some(SchedulerSelection {
                    job_event_id,
                    fairness_key: key,
                });
            }
        }

        // Eligible keys (under the per-key concurrency cap and with ready work).
        let mut eligible_keys: Vec<String> = groups
            .iter()
            .filter(|(key, jobs)| {
                !jobs.is_empty()
                    && (policy.max_concurrent_per_key == 0
                        || self.in_flight_for(key) < policy.max_concurrent_per_key)
            })
            .map(|(key, _)| key.clone())
            .collect();
        eligible_keys.sort();
        if eligible_keys.is_empty() {
            for key in groups.keys() {
                *self.deferred_total.entry(key.clone()).or_default() += 1;
            }
            return None;
        }

        let n = eligible_keys.len();
        let start = self.start_index(&eligible_keys);

        // Pass 1: find a key that already has credits.
        for offset in 0..n {
            let idx = (start + offset) % n;
            let key = eligible_keys[idx].clone();
            if self.credits.get(&key).copied().unwrap_or(0) >= 1 {
                let job_event_id = groups
                    .get(&key)
                    .and_then(|jobs| jobs.first())
                    .map(|job| job.job_event_id)?;
                self.spend_credit(&key);
                self.commit_selection(&key);
                return Some(SchedulerSelection {
                    job_event_id,
                    fairness_key: key,
                });
            }
        }

        // Pass 2: refill all eligible keys (one full round) and try again.
        for key in &eligible_keys {
            let credits = policy.weight_for(key) as u64 * quantum as u64;
            let credits = credits.min(u32::MAX as u64) as u32;
            *self.credits.entry(key.clone()).or_insert(0) += credits;
        }
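        // e.g. with weights {a: 2, b: 1} and quantum 1, this refill grants `a`
        // two credits and `b` one, so a full round selects `a` twice and `b`
        // once: a steady 2:1 share (cf. `drr_respects_weighted_share_two_to_one`).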
        self.rounds_completed += 1;

        for offset in 0..n {
            let idx = (start + offset) % n;
            let key = eligible_keys[idx].clone();
            if self.credits.get(&key).copied().unwrap_or(0) >= 1 {
                let job_event_id = groups
                    .get(&key)
                    .and_then(|jobs| jobs.first())
                    .map(|job| job.job_event_id)?;
                self.spend_credit(&key);
                self.commit_selection(&key);
                return Some(SchedulerSelection {
                    job_event_id,
                    fairness_key: key,
                });
            }
        }
        None
    }

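    /// Index into `eligible_keys` of the first key lexicographically after
    /// the last selected one, wrapping to 0. This advances the round-robin
    /// cursor deterministically even as keys appear and disappear between
    /// calls.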
    fn start_index(&self, eligible_keys: &[String]) -> usize {
        match &self.last_selected {
            Some(last) => eligible_keys
                .iter()
                .position(|key| key.as_str() > last.as_str())
                .unwrap_or(0),
            None => 0,
        }
    }

    fn spend_credit(&mut self, key: &str) {
        if let Some(credits) = self.credits.get_mut(key) {
            *credits = credits.saturating_sub(1);
        }
    }

    fn commit_selection(&mut self, key: &str) {
        self.last_selected = Some(key.to_string());
        *self.selected_total.entry(key.to_string()).or_default() += 1;
    }
}

fn fifo_select(
    candidates: &[SchedulableJob<'_>],
    policy: &SchedulerPolicy,
    now_ms: i64,
) -> Option<SchedulerSelection> {
    let pick = candidates.iter().min_by_key(|job| {
        (
            job.priority.effective_rank(job.enqueued_at_ms, now_ms),
            job.enqueued_at_ms,
            job.job_event_id,
        )
    })?;
    Some(SchedulerSelection {
        job_event_id: pick.job_event_id,
        fairness_key: policy.fairness_key_of(pick),
    })
}

#[derive(Clone, Copy, Debug, Default, Serialize)]
pub struct ReadyKeyStats {
    pub ready_jobs: u32,
    pub oldest_ready_age_ms: u64,
}

#[derive(Clone, Debug, Serialize)]
pub struct SchedulerKeyStat {
    pub fairness_key: String,
    pub weight: u32,
    pub deficit: i64,
    pub in_flight: u32,
    pub selected_total: u64,
    pub deferred_total: u64,
    pub ready_jobs: u32,
    pub oldest_ready_age_ms: u64,
}

#[derive(Clone, Debug, Serialize)]
pub struct SchedulerSnapshot {
    pub strategy: String,
    pub fairness_key: String,
    pub rounds_completed: u64,
    pub starvation_promotions_total: u64,
    pub keys: Vec<SchedulerKeyStat>,
}

/// Aggregate per-fairness-key stats from the candidates currently in a queue
/// state. Used to render the inspect snapshot.
pub fn ready_stats_by_key(
    jobs: &[WorkerQueueJobState],
    policy: &SchedulerPolicy,
    now_ms: i64,
) -> BTreeMap<String, ReadyKeyStats> {
    let mut out: BTreeMap<String, ReadyKeyStats> = BTreeMap::new();
    for state in jobs.iter().filter(|j| j.is_ready()) {
        let view = SchedulableJob::from_state(state);
        let key = policy.fairness_key_of(&view);
        let entry = out.entry(key).or_default();
        entry.ready_jobs += 1;
        let age = now_ms.saturating_sub(state.enqueued_at_ms).max(0) as u64;
        if age > entry.oldest_ready_age_ms {
            entry.oldest_ready_age_ms = age;
        }
    }
    out
}

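/// Parse comma-separated `key:weight` pairs, e.g. `"tenant-a:3,tenant-b:1"`.
/// Entries with an empty key, a non-numeric weight, or a weight of `0` are
/// skipped; the last `:` splits key from weight, so keys may contain colons.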
fn parse_weights(raw: &str) -> BTreeMap<String, u32> {
    let mut out = BTreeMap::new();
    for entry in raw.split(',') {
        let trimmed = entry.trim();
        if trimmed.is_empty() {
            continue;
        }
        if let Some((key, value)) = trimmed.rsplit_once(':') {
            let key = key.trim().to_string();
            if key.is_empty() {
                continue;
            }
            if let Ok(weight) = value.trim().parse::<u32>() {
                if weight >= 1 {
                    out.insert(key, weight);
                }
            }
        }
    }
    out
}

fn parse_fairness_key(raw: &str) -> FairnessKey {
    match raw.to_ascii_lowercase().as_str() {
        "binding" => FairnessKey::Binding,
        "trigger-id" | "trigger_id" => FairnessKey::TriggerId,
        "tenant-and-binding" | "tenant_and_binding" | "composite" => FairnessKey::TenantAndBinding,
        _ => FairnessKey::Tenant,
    }
}

/// Aggregate active claim counts by fairness key from the queue state.
pub fn in_flight_by_key(
    jobs: &[WorkerQueueJobState],
    policy: &SchedulerPolicy,
) -> BTreeMap<String, u32> {
    let mut out: BTreeMap<String, u32> = BTreeMap::new();
    for state in jobs {
        if state.acked || state.purged || state.active_claim.is_none() {
            continue;
        }
        let view = SchedulableJob::from_state(state);
        let key = policy.fairness_key_of(&view);
        *out.entry(key).or_default() += 1;
    }
    out
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::triggers::event::{
        GenericWebhookPayload, KnownProviderPayload, ProviderId, ProviderPayload, SignatureStatus,
        TraceId, TriggerEvent, TriggerEventId,
    };
    use crate::triggers::worker_queue::{WorkerQueueJob, WorkerQueueJobState};
    use std::collections::BTreeMap as Map;

    fn event(id: &str, tenant: Option<&str>) -> TriggerEvent {
        TriggerEvent {
            id: TriggerEventId(id.to_string()),
            provider: ProviderId::from("test"),
            kind: "test.event".to_string(),
            trace_id: TraceId("trace-x".to_string()),
            dedupe_key: id.to_string(),
            tenant_id: tenant.map(TenantId::new),
            headers: Map::new(),
            batch: None,
            raw_body: None,
            provider_payload: ProviderPayload::Known(KnownProviderPayload::Webhook(
                GenericWebhookPayload {
                    source: None,
                    content_type: None,
                    raw: serde_json::json!({}),
                },
            )),
            signature_status: SignatureStatus::Verified,
            received_at: time::OffsetDateTime::now_utc(),
            occurred_at: None,
            dedupe_claimed: false,
        }
    }

    fn state(
        job_event_id: u64,
        enqueued_at_ms: i64,
        tenant: Option<&str>,
        trigger_id: &str,
        priority: WorkerQueuePriority,
    ) -> WorkerQueueJobState {
        WorkerQueueJobState {
            job_event_id,
            enqueued_at_ms,
            job: WorkerQueueJob {
                queue: "q".to_string(),
                trigger_id: trigger_id.to_string(),
                binding_key: format!("{trigger_id}@v1"),
                binding_version: 1,
                event: event(&format!("evt-{job_event_id}"), tenant),
                replay_of_event_id: None,
                priority,
            },
            active_claim: None,
            acked: false,
            purged: false,
        }
    }

    fn snapshot_views<'a>(states: &'a [WorkerQueueJobState]) -> Vec<SchedulableJob<'a>> {
        states.iter().map(SchedulableJob::from_state).collect()
    }

    #[test]
    fn fifo_strategy_matches_priority_and_age() {
        let jobs = vec![
            state(1, 100, Some("a"), "t-low", WorkerQueuePriority::Low),
            state(2, 50, Some("a"), "t-high", WorkerQueuePriority::High),
            state(3, 200, Some("a"), "t-normal", WorkerQueuePriority::Normal),
        ];
        let mut sched = SchedulerState::new();
        let policy = SchedulerPolicy::fifo();
        let pick = sched
            .select(&snapshot_views(&jobs), &policy, 1_000)
            .unwrap();
        assert_eq!(pick.job_event_id, 2, "high priority always wins under FIFO");
    }

    #[test]
    fn drr_alternates_across_tenants_when_one_tenant_is_hot() {
        // Tenant A has 100 jobs, tenant B has 1. With pure FIFO, A starves B.
        // Under DRR with equal weights, we expect strict alternation.
        let mut jobs = Vec::new();
        for idx in 0..100 {
            jobs.push(state(
                100 + idx,
                100 + idx as i64,
                Some("tenant-a"),
                "trigger",
                WorkerQueuePriority::Normal,
            ));
        }
        jobs.push(state(
            5,
            500,
            Some("tenant-b"),
            "trigger",
            WorkerQueuePriority::Normal,
        ));
        let mut sched = SchedulerState::new();
        let policy = SchedulerPolicy::deficit_round_robin(FairnessKey::Tenant);

        let first = sched
            .select(&snapshot_views(&jobs), &policy, 10_000)
            .unwrap();
        let second_jobs: Vec<_> = jobs
            .iter()
            .filter(|j| j.job_event_id != first.job_event_id)
            .cloned()
            .collect();
        let second = sched
            .select(&snapshot_views(&second_jobs), &policy, 10_001)
            .unwrap();
        let tenants = [first.fairness_key.clone(), second.fairness_key.clone()];
        assert!(
            tenants.contains(&"tenant-a".to_string()) && tenants.contains(&"tenant-b".to_string()),
            "expected both tenants represented in first two picks, got {tenants:?}",
        );
    }

    #[test]
    fn drr_respects_weighted_share_two_to_one() {
        // weight a=2, b=1 → over many rounds, a should be selected ~2x b.
        let now_ms = 1_000_000;
        let mut sched = SchedulerState::new();
        let policy = SchedulerPolicy::deficit_round_robin(FairnessKey::Tenant)
            .with_weight("tenant-a", 2)
            .with_weight("tenant-b", 1);

        let mut acount = 0;
        let mut bcount = 0;
        for _ in 0..120 {
            // Always provide one fresh ready job per tenant.
            let jobs = vec![
                state(
                    1,
                    now_ms,
                    Some("tenant-a"),
                    "trigger",
                    WorkerQueuePriority::Normal,
                ),
                state(
                    2,
                    now_ms,
                    Some("tenant-b"),
                    "trigger",
                    WorkerQueuePriority::Normal,
                ),
            ];
            let pick = sched
                .select(&snapshot_views(&jobs), &policy, now_ms)
                .unwrap();
            match pick.fairness_key.as_str() {
                "tenant-a" => acount += 1,
                "tenant-b" => bcount += 1,
                _ => unreachable!(),
            }
        }
        // Allow ±10% drift due to round boundary effects.
        let ratio = acount as f64 / bcount as f64;
        assert!(
            (1.8..=2.2).contains(&ratio),
            "expected ~2:1 selection ratio, got a={acount} b={bcount} ratio={ratio:.3}",
        );
    }

    #[test]
    fn drr_starvation_promotion_picks_old_job_when_threshold_exceeded() {
        let mut sched = SchedulerState::new();
        let policy =
            SchedulerPolicy::deficit_round_robin(FairnessKey::Tenant).with_starvation_age_ms(1_000);

        // First, drain credits for tenant-a.
        for _ in 0..3 {
            let jobs = vec![state(
                1,
                100,
                Some("tenant-a"),
                "trigger",
                WorkerQueuePriority::Normal,
            )];
            sched.select(&snapshot_views(&jobs), &policy, 200).unwrap();
        }

        // Now cold tenant-b has an ancient job; tenant-a has a fresh job. Even
        // though the scheduler would normally rotate, the starvation rule
        // should pick the ancient tenant-b job.
        let jobs = vec![
            state(
                2,
                10,
                Some("tenant-b"),
                "trigger",
                WorkerQueuePriority::Normal,
            ),
            state(
                3,
                10_000,
                Some("tenant-a"),
                "trigger",
                WorkerQueuePriority::Normal,
            ),
        ];
        let pick = sched
            .select(&snapshot_views(&jobs), &policy, 20_000)
            .unwrap();
        assert_eq!(pick.fairness_key, "tenant-b");
        assert_eq!(sched.starvation_promotions_total(), 1);
    }

    #[test]
    fn drr_max_concurrent_per_key_blocks_hot_tenant() {
        let mut sched = SchedulerState::new();
        let policy = SchedulerPolicy::deficit_round_robin(FairnessKey::Tenant)
            .with_max_concurrent_per_key(1);

        // Pretend tenant-a is already at its cap.
        sched.note_claim_committed("tenant-a");

        // Both tenants have jobs ready; tenant-a is older (would normally win
        // under priority/FIFO). Scheduler must skip tenant-a and pick b.
        let jobs = vec![
            state(
                1,
                10,
                Some("tenant-a"),
                "trigger",
                WorkerQueuePriority::Normal,
            ),
            state(
                2,
                500,
                Some("tenant-b"),
                "trigger",
                WorkerQueuePriority::Normal,
            ),
        ];
        let pick = sched
            .select(&snapshot_views(&jobs), &policy, 1_000)
            .unwrap();
        assert_eq!(pick.fairness_key, "tenant-b");
    }

    #[test]
    fn drr_returns_none_when_all_keys_capped() {
        let mut sched = SchedulerState::new();
        let policy = SchedulerPolicy::deficit_round_robin(FairnessKey::Tenant)
            .with_max_concurrent_per_key(1);
        sched.note_claim_committed("tenant-a");
        sched.note_claim_committed("tenant-b");

        let jobs = vec![
            state(
                1,
                10,
                Some("tenant-a"),
                "trigger",
                WorkerQueuePriority::Normal,
            ),
            state(
                2,
                500,
                Some("tenant-b"),
                "trigger",
                WorkerQueuePriority::Normal,
            ),
        ];
        assert!(sched
            .select(&snapshot_views(&jobs), &policy, 1_000)
            .is_none());
        assert_eq!(sched.deferred_total_for("tenant-a"), 1);
        assert_eq!(sched.deferred_total_for("tenant-b"), 1);
    }

    #[test]
    fn drr_priority_within_a_tenant_still_holds() {
        let mut sched = SchedulerState::new();
        let policy = SchedulerPolicy::deficit_round_robin(FairnessKey::Tenant);
        let jobs = vec![
            state(
                1,
                100,
                Some("tenant-a"),
                "trigger-low",
                WorkerQueuePriority::Low,
            ),
            state(
                2,
                100,
                Some("tenant-a"),
                "trigger-high",
                WorkerQueuePriority::High,
            ),
        ];
        let pick = sched
            .select(&snapshot_views(&jobs), &policy, 1_000)
            .unwrap();
        assert_eq!(
            pick.job_event_id, 2,
            "high priority should win within a tenant"
        );
    }

    #[test]
    fn snapshot_includes_fairness_state_per_key() {
        let mut sched = SchedulerState::new();
        let policy =
            SchedulerPolicy::deficit_round_robin(FairnessKey::Tenant).with_weight("tenant-a", 3);

        for _ in 0..5 {
            let jobs = vec![
                state(
                    1,
                    100,
                    Some("tenant-a"),
                    "trigger",
                    WorkerQueuePriority::Normal,
                ),
                state(
                    2,
                    200,
                    Some("tenant-b"),
                    "trigger",
                    WorkerQueuePriority::Normal,
                ),
            ];
            sched.select(&snapshot_views(&jobs), &policy, 300).unwrap();
        }

        let states = vec![
            state(
                3,
                100,
                Some("tenant-a"),
                "trigger",
                WorkerQueuePriority::Normal,
            ),
            state(
                4,
                100,
                Some("tenant-b"),
                "trigger",
                WorkerQueuePriority::Normal,
            ),
        ];
        let ready = ready_stats_by_key(&states, &policy, 5_000);
        let snap = sched.snapshot(&policy, &ready);
        assert_eq!(snap.strategy, "drr");
        assert_eq!(snap.fairness_key, "tenant");
        let a = snap
            .keys
            .iter()
            .find(|k| k.fairness_key == "tenant-a")
            .unwrap();
        assert_eq!(a.weight, 3);
        let b = snap
            .keys
            .iter()
            .find(|k| k.fairness_key == "tenant-b")
            .unwrap();
        assert!(a.selected_total > b.selected_total);
        assert!(a.ready_jobs >= 1 && b.ready_jobs >= 1);
    }
}