// harn_vm/triggers/worker_queue.rs

use std::collections::{BTreeMap, BTreeSet};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration as StdDuration;

use serde::{Deserialize, Serialize};
use uuid::Uuid;

use crate::event_log::{
    sanitize_topic_component, AnyEventLog, EventLog, LogError, LogEvent, Topic,
};

use super::scheduler::{self, SchedulableJob, SchedulerPolicy, SchedulerSnapshot, SchedulerState};
use super::{DispatchOutcome, TriggerEvent};

pub const WORKER_QUEUE_CATALOG_TOPIC: &str = "worker.queues";
const WORKER_QUEUE_CLAIMS_SUFFIX: &str = ".claims";
const WORKER_QUEUE_RESPONSES_SUFFIX: &str = ".responses";
const NORMAL_PROMOTION_AGE_MS: i64 = 15 * 60 * 1000;

#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum WorkerQueuePriority {
    High,
    #[default]
    Normal,
    Low,
}

impl WorkerQueuePriority {
    pub fn as_str(self) -> &'static str {
        match self {
            Self::High => "high",
            Self::Normal => "normal",
            Self::Low => "low",
        }
    }

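    /// Rank used when ordering ready jobs: lower ranks are claimed first.
    /// `Normal` jobs older than `NORMAL_PROMOTION_AGE_MS` are promoted to
    /// rank 0 so a steady stream of `High` work cannot starve them.
    ///
    /// A minimal illustration with hypothetical timestamps (the crate path
    /// is assumed, so the doctest is marked `ignore`):
    ///
    /// ```ignore
    /// use harn_vm::triggers::worker_queue::WorkerQueuePriority;
    ///
    /// // A fresh Normal job ranks behind High...
    /// assert_eq!(WorkerQueuePriority::Normal.effective_rank(1_000, 1_000), 1);
    /// // ...but past the 15-minute promotion window it ranks alongside High.
    /// assert_eq!(WorkerQueuePriority::Normal.effective_rank(0, 15 * 60 * 1_000), 0);
    /// assert_eq!(WorkerQueuePriority::High.effective_rank(0, 0), 0);
    /// ```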
    pub fn effective_rank(self, enqueued_at_ms: i64, now_ms: i64) -> u8 {
        match self {
            Self::High => 0,
            Self::Normal if now_ms.saturating_sub(enqueued_at_ms) >= NORMAL_PROMOTION_AGE_MS => 0,
            Self::Normal => 1,
            Self::Low => 2,
        }
    }
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct WorkerQueueJob {
    pub queue: String,
    pub trigger_id: String,
    pub binding_key: String,
    pub binding_version: u32,
    pub event: TriggerEvent,
    #[serde(default)]
    pub replay_of_event_id: Option<String>,
    #[serde(default)]
    pub priority: WorkerQueuePriority,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerQueueEnqueueReceipt {
    pub queue: String,
    pub job_event_id: u64,
    pub response_topic: String,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerQueueClaimHandle {
    pub queue: String,
    pub job_event_id: u64,
    pub claim_id: String,
    pub consumer_id: String,
    pub expires_at_ms: i64,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ClaimedWorkerJob {
    pub handle: WorkerQueueClaimHandle,
    pub job: WorkerQueueJob,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct WorkerQueueResponseRecord {
    pub queue: String,
    pub job_event_id: u64,
    pub consumer_id: String,
    pub handled_at_ms: i64,
    pub outcome: Option<DispatchOutcome>,
    pub error: Option<String>,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerQueueSummary {
    pub queue: String,
    pub ready: usize,
    pub in_flight: usize,
    pub acked: usize,
    pub purged: usize,
    pub responses: usize,
    pub oldest_unclaimed_age_ms: Option<u64>,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct WorkerQueueJobState {
    pub job_event_id: u64,
    pub enqueued_at_ms: i64,
    pub job: WorkerQueueJob,
    pub active_claim: Option<WorkerQueueClaimHandle>,
    pub acked: bool,
    pub purged: bool,
}

impl WorkerQueueJobState {
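    /// A job is ready for claiming when it has not been acked or purged and
    /// carries no live (unexpired) claim.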
    pub fn is_ready(&self) -> bool {
        !self.acked && !self.purged && self.active_claim.is_none()
    }
}

#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct WorkerQueueState {
    pub queue: String,
    pub responses: Vec<WorkerQueueResponseRecord>,
    pub jobs: Vec<WorkerQueueJobState>,
}

impl WorkerQueueState {
    pub fn summary(&self, now_ms: i64) -> WorkerQueueSummary {
        let ready = self.jobs.iter().filter(|job| job.is_ready()).count();
        let in_flight = self
            .jobs
            .iter()
            .filter(|job| !job.acked && !job.purged && job.active_claim.is_some())
            .count();
        let acked = self.jobs.iter().filter(|job| job.acked).count();
        let purged = self.jobs.iter().filter(|job| job.purged).count();
        let oldest_unclaimed_age_ms = self
            .jobs
            .iter()
            .filter(|job| job.is_ready())
            .map(|job| now_ms.saturating_sub(job.enqueued_at_ms).max(0) as u64)
            .max();
        WorkerQueueSummary {
            queue: self.queue.clone(),
            ready,
            in_flight,
            acked,
            purged,
            responses: self.responses.len(),
            oldest_unclaimed_age_ms,
        }
    }

    /// Select the next ready job by consulting `scheduler` under `policy`.
    ///
    /// Under `Fifo` this is equivalent to picking the job with the lowest
    /// `(priority_rank, enqueued_at_ms, job_event_id)` — the historical
    /// behaviour. Under `DeficitRoundRobin`, candidates are grouped by the
    /// configured fairness key and the scheduler rotates so a hot
    /// tenant/binding cannot monopolise the queue.
    fn next_ready_job_with_scheduler(
        &self,
        scheduler_state: &mut SchedulerState,
        policy: &SchedulerPolicy,
        now_ms: i64,
    ) -> Option<&WorkerQueueJobState> {
        let candidates: Vec<&WorkerQueueJobState> =
            self.jobs.iter().filter(|job| job.is_ready()).collect();
        if candidates.is_empty() {
            return None;
        }
        let views: Vec<SchedulableJob<'_>> = candidates
            .iter()
            .map(|state| SchedulableJob::from_state(state))
            .collect();

        // Refresh authoritative in-flight count from the rebuilt queue state.
        let in_flight = scheduler::in_flight_by_key(&self.jobs, policy);
        scheduler_state.replace_in_flight(in_flight);

        let pick = scheduler_state.select(&views, policy, now_ms)?;
        candidates
            .into_iter()
            .find(|job| job.job_event_id == pick.job_event_id)
    }

    fn active_claim_for(&self, job_event_id: u64) -> Option<&WorkerQueueClaimHandle> {
        self.jobs
            .iter()
            .find(|job| job.job_event_id == job_event_id)
            .and_then(|job| job.active_claim.as_ref())
    }
}

#[derive(Clone)]
pub struct WorkerQueue {
    event_log: Arc<AnyEventLog>,
    /// Active scheduler policy. Read on every claim so it can be
    /// hot-swapped at runtime without rebuilding the queue.
    policy: Arc<RwLock<SchedulerPolicy>>,
    /// Per-queue ephemeral scheduler state. Keyed by queue name; entries are
    /// created lazily on first claim. Self-correcting — safe to lose on
    /// process restart.
    scheduler_states: Arc<Mutex<BTreeMap<String, SchedulerState>>>,
}

#[derive(Clone, Debug, Serialize)]
pub struct WorkerQueueInspectSnapshot {
    pub summary: WorkerQueueSummary,
    pub scheduler: SchedulerSnapshot,
}

impl WorkerQueue {
    /// Construct a `WorkerQueue` using the policy derived from the
    /// `HARN_SCHEDULER_*` environment variables (see
    /// [`SchedulerPolicy::from_env`]). Defaults to FIFO so single-tenant
    /// deployments behave exactly as before unless they opt in.
    pub fn new(event_log: Arc<AnyEventLog>) -> Self {
        Self::with_policy(event_log, SchedulerPolicy::from_env())
    }

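    /// Construct a `WorkerQueue` with an explicit scheduler policy instead
    /// of reading the environment. A sketch using the builder calls from
    /// the tests below (`log` is any `Arc<AnyEventLog>`):
    ///
    /// ```ignore
    /// let queue = WorkerQueue::with_policy(
    ///     log,
    ///     SchedulerPolicy::deficit_round_robin(scheduler::FairnessKey::Tenant)
    ///         .with_weight("tenant-a", 2),
    /// );
    /// ```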
    pub fn with_policy(event_log: Arc<AnyEventLog>, policy: SchedulerPolicy) -> Self {
        Self {
            event_log,
            policy: Arc::new(RwLock::new(policy)),
            scheduler_states: Arc::new(Mutex::new(BTreeMap::new())),
        }
    }

    /// Replace the active scheduler policy. Existing per-queue state is
    /// preserved (deficits self-correct against the new weights).
    pub fn set_policy(&self, policy: SchedulerPolicy) {
        *self.policy.write().expect("scheduler policy poisoned") = policy;
    }

    pub fn policy(&self) -> SchedulerPolicy {
        self.policy
            .read()
            .expect("scheduler policy poisoned")
            .clone()
    }

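    /// Append `job` to its queue's job topic, recording the queue name in
    /// the shared catalog topic first, and return a receipt naming the
    /// response topic a worker will write results to.
    ///
    /// Producer-side sketch (job construction elided; see the tests below
    /// for the full field list):
    ///
    /// ```ignore
    /// let receipt = queue.enqueue(&job).await?;
    /// println!("job {} replies on {}", receipt.job_event_id, receipt.response_topic);
    /// ```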
    pub async fn enqueue(
        &self,
        job: &WorkerQueueJob,
    ) -> Result<WorkerQueueEnqueueReceipt, LogError> {
        let queue = job.queue.trim();
        if queue.is_empty() {
            return Err(LogError::Config(
                "worker queue name cannot be empty".to_string(),
            ));
        }
        let queue_name = queue.to_string();
        let catalog_topic = Topic::new(WORKER_QUEUE_CATALOG_TOPIC)
            .expect("static worker queue catalog topic should always be valid");
        self.event_log
            .append(
                &catalog_topic,
                LogEvent::new(
                    "queue_seen",
                    serde_json::to_value(WorkerQueueCatalogRecord {
                        queue: queue_name.clone(),
                    })
                    .map_err(|error| LogError::Serde(error.to_string()))?,
                ),
            )
            .await?;

        let job_topic = job_topic(&queue_name)?;
        let mut headers = BTreeMap::new();
        headers.insert("queue".to_string(), queue_name.clone());
        headers.insert("trigger_id".to_string(), job.trigger_id.clone());
        headers.insert("binding_key".to_string(), job.binding_key.clone());
        headers.insert("event_id".to_string(), job.event.id.0.clone());
        headers.insert("priority".to_string(), job.priority.as_str().to_string());
        let job_event_id = self
            .event_log
            .append(
                &job_topic,
                LogEvent::new(
                    "trigger_dispatch",
                    serde_json::to_value(job)
                        .map_err(|error| LogError::Serde(error.to_string()))?,
                )
                .with_headers(headers),
            )
            .await?;
        if let Some(metrics) = crate::active_metrics_registry() {
            if let Ok(state) = self.queue_state(&queue_name).await {
                let summary = state.summary(now_ms());
                metrics.set_worker_queue_depth(
                    &queue_name,
                    (summary.ready + summary.in_flight) as u64,
                );
            }
        }
        Ok(WorkerQueueEnqueueReceipt {
            queue: queue_name.clone(),
            job_event_id,
            response_topic: response_topic_name(&queue_name),
        })
    }

    pub async fn known_queues(&self) -> Result<Vec<String>, LogError> {
        let topic = Topic::new(WORKER_QUEUE_CATALOG_TOPIC)
            .expect("static worker queue catalog topic should always be valid");
        let events = self.event_log.read_range(&topic, None, usize::MAX).await?;
        let mut queues = BTreeSet::new();
        for (_, event) in events {
            if event.kind != "queue_seen" {
                continue;
            }
            let record: WorkerQueueCatalogRecord = serde_json::from_value(event.payload)
                .map_err(|error| LogError::Serde(error.to_string()))?;
            if !record.queue.trim().is_empty() {
                queues.insert(record.queue);
            }
        }
        Ok(queues.into_iter().collect())
    }

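    /// Rebuild the materialized state of `queue` by folding three topics:
    /// the job topic (`trigger_dispatch` events), the claims topic
    /// (`job_claimed`, `claim_renewed`, `job_released`, `job_acked`,
    /// `job_purged`), and the responses topic (`job_response`). Active
    /// claims whose `expires_at_ms` has already passed are cleared during
    /// the fold, which is what makes abandoned work reclaimable.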
    pub async fn queue_state(&self, queue: &str) -> Result<WorkerQueueState, LogError> {
        let queue_name = queue.trim();
        if queue_name.is_empty() {
            return Err(LogError::Config(
                "worker queue name cannot be empty".to_string(),
            ));
        }
        let now_ms = now_ms();
        let job_events = self
            .event_log
            .read_range(&job_topic(queue_name)?, None, usize::MAX)
            .await?;
        let claim_events = self
            .event_log
            .read_range(&claims_topic(queue_name)?, None, usize::MAX)
            .await?;
        let response_events = self
            .event_log
            .read_range(&responses_topic(queue_name)?, None, usize::MAX)
            .await?;

        let mut jobs = BTreeMap::<u64, WorkerQueueJobStateInternal>::new();
        for (job_event_id, event) in job_events {
            if event.kind != "trigger_dispatch" {
                continue;
            }
            let job: WorkerQueueJob = serde_json::from_value(event.payload)
                .map_err(|error| LogError::Serde(error.to_string()))?;
            jobs.insert(
                job_event_id,
                WorkerQueueJobStateInternal {
                    job_event_id,
                    enqueued_at_ms: event.occurred_at_ms,
                    job,
                    active_claim: None,
                    acked: false,
                    purged: false,
                    seen_claim_ids: BTreeSet::new(),
                },
            );
        }

        for (_, event) in claim_events {
            match event.kind.as_str() {
                "job_claimed" => {
                    let claim: WorkerQueueClaimRecord = serde_json::from_value(event.payload)
                        .map_err(|error| LogError::Serde(error.to_string()))?;
                    let Some(job) = jobs.get_mut(&claim.job_event_id) else {
                        continue;
                    };
                    if job.acked || job.purged {
                        continue;
                    }
                    job.seen_claim_ids.insert(claim.claim_id.clone());
                    let can_take = job
                        .active_claim
                        .as_ref()
                        .is_none_or(|active| active.expires_at_ms <= claim.claimed_at_ms);
                    if can_take {
                        job.active_claim = Some(WorkerQueueClaimHandle {
                            queue: queue_name.to_string(),
                            job_event_id: claim.job_event_id,
                            claim_id: claim.claim_id,
                            consumer_id: claim.consumer_id,
                            expires_at_ms: claim.expires_at_ms,
                        });
                    }
                }
                "claim_renewed" => {
                    let renewal: WorkerQueueClaimRenewalRecord =
                        serde_json::from_value(event.payload)
                            .map_err(|error| LogError::Serde(error.to_string()))?;
                    let Some(job) = jobs.get_mut(&renewal.job_event_id) else {
                        continue;
                    };
                    if let Some(active) = job.active_claim.as_mut() {
                        if active.claim_id == renewal.claim_id {
                            active.expires_at_ms = renewal.expires_at_ms;
                        }
                    }
                }
                "job_released" => {
                    let release: WorkerQueueReleaseRecord =
                        serde_json::from_value(event.payload)
                            .map_err(|error| LogError::Serde(error.to_string()))?;
                    let Some(job) = jobs.get_mut(&release.job_event_id) else {
                        continue;
                    };
                    if job
                        .active_claim
                        .as_ref()
                        .is_some_and(|active| active.claim_id == release.claim_id)
                    {
                        job.active_claim = None;
                    }
                }
                "job_acked" => {
                    let ack: WorkerQueueAckRecord = serde_json::from_value(event.payload)
                        .map_err(|error| LogError::Serde(error.to_string()))?;
                    let Some(job) = jobs.get_mut(&ack.job_event_id) else {
                        continue;
                    };
                    if ack.claim_id.is_empty() || job.seen_claim_ids.contains(&ack.claim_id) {
                        job.acked = true;
                        job.active_claim = None;
                    }
                }
                "job_purged" => {
                    let purge: WorkerQueuePurgeRecord = serde_json::from_value(event.payload)
                        .map_err(|error| LogError::Serde(error.to_string()))?;
                    let Some(job) = jobs.get_mut(&purge.job_event_id) else {
                        continue;
                    };
                    if !job.acked {
                        job.purged = true;
                        job.active_claim = None;
                    }
                }
                _ => {}
            }
        }

        let responses = response_events
            .into_iter()
            .filter(|(_, event)| event.kind == "job_response")
            .map(|(_, event)| {
                serde_json::from_value::<WorkerQueueResponseRecord>(event.payload)
                    .map_err(|error| LogError::Serde(error.to_string()))
            })
            .collect::<Result<Vec<_>, _>>()?;

        let mut queue_state = WorkerQueueState {
            queue: queue_name.to_string(),
            responses,
            jobs: jobs
                .into_values()
                .map(|mut job| {
                    if job
                        .active_claim
                        .as_ref()
                        .is_some_and(|active| active.expires_at_ms <= now_ms)
                    {
                        job.active_claim = None;
                    }
                    WorkerQueueJobState {
                        job_event_id: job.job_event_id,
                        enqueued_at_ms: job.enqueued_at_ms,
                        job: job.job,
                        active_claim: job.active_claim,
                        acked: job.acked,
                        purged: job.purged,
                    }
                })
                .collect(),
        };
        queue_state
            .jobs
            .sort_by_key(|job| (job.enqueued_at_ms, job.job_event_id));
        Ok(queue_state)
    }

    pub async fn queue_summaries(&self) -> Result<Vec<WorkerQueueSummary>, LogError> {
        let now_ms = now_ms();
        let mut summaries = Vec::new();
        for queue in self.known_queues().await? {
            let state = self.queue_state(&queue).await?;
            summaries.push(state.summary(now_ms));
        }
        summaries.sort_by(|left, right| left.queue.cmp(&right.queue));
        Ok(summaries)
    }

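    /// Claim the next ready job for `consumer_id`, holding it for `ttl`.
    ///
    /// The claim is optimistic: a `job_claimed` record is appended, the
    /// queue state is re-read, and the claim is returned only if it is
    /// still the active one. On a lost race the loop retries, giving up
    /// with `Ok(None)` after eight attempts or when no job is ready.
    ///
    /// Sketch of a worker loop under these semantics (`handle_job` is a
    /// hypothetical handler):
    ///
    /// ```ignore
    /// while let Some(claimed) = queue
    ///     .claim_next("triage", "worker-1", StdDuration::from_secs(60))
    ///     .await?
    /// {
    ///     handle_job(&claimed.job).await?;
    ///     queue.ack_claim(&claimed.handle).await?;
    /// }
    /// ```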
    pub async fn claim_next(
        &self,
        queue: &str,
        consumer_id: &str,
        ttl: StdDuration,
    ) -> Result<Option<ClaimedWorkerJob>, LogError> {
        let queue_name = queue.trim();
        if queue_name.is_empty() {
            return Err(LogError::Config(
                "worker queue name cannot be empty".to_string(),
            ));
        }
        if consumer_id.trim().is_empty() {
            return Err(LogError::InvalidConsumer(
                "worker queue consumer id cannot be empty".to_string(),
            ));
        }
        let policy = self.policy();
        for _ in 0..8 {
            let now_ms = now_ms();
            let state = self.queue_state(queue_name).await?;
            let (job, fairness_key) = {
                let mut states = self
                    .scheduler_states
                    .lock()
                    .expect("scheduler state poisoned");
                let scheduler_state = states.entry(queue_name.to_string()).or_default();
                let Some(job) =
                    state.next_ready_job_with_scheduler(scheduler_state, &policy, now_ms)
                else {
                    return Ok(None);
                };
                let job = job.clone();
                let fairness_key = policy.fairness_key_of(&SchedulableJob::from_state(&job));
                (job, fairness_key)
            };
            let claim = WorkerQueueClaimRecord {
                job_event_id: job.job_event_id,
                claim_id: Uuid::new_v4().to_string(),
                consumer_id: consumer_id.to_string(),
                claimed_at_ms: now_ms,
                expires_at_ms: expiry_ms(now_ms, ttl),
            };
            self.event_log
                .append(
                    &claims_topic(queue_name)?,
                    LogEvent::new(
                        "job_claimed",
                        serde_json::to_value(&claim)
                            .map_err(|error| LogError::Serde(error.to_string()))?,
                    ),
                )
                .await?;
            let refreshed = self.queue_state(queue_name).await?;
            if refreshed
                .active_claim_for(job.job_event_id)
                .is_some_and(|active| active.claim_id == claim.claim_id)
            {
                {
                    let mut states = self
                        .scheduler_states
                        .lock()
                        .expect("scheduler state poisoned");
                    let scheduler_state = states.entry(queue_name.to_string()).or_default();
                    scheduler_state.note_claim_committed(&fairness_key);
                }
                if let Some(metrics) = crate::active_metrics_registry() {
                    let summary = refreshed.summary(now_ms);
                    metrics.record_worker_queue_claim_age(
                        queue_name,
                        now_ms.saturating_sub(job.enqueued_at_ms) as f64 / 1000.0,
                    );
                    metrics.set_worker_queue_depth(
                        queue_name,
                        (summary.ready + summary.in_flight) as u64,
                    );
                    metrics.record_scheduler_selection(
                        queue_name,
                        policy.fairness_key.as_str(),
                        &fairness_key,
                    );
                    if let Ok(snap) = self.inspect_queue(queue_name).await {
                        for stat in &snap.scheduler.keys {
                            metrics.set_scheduler_deficit(
                                queue_name,
                                policy.fairness_key.as_str(),
                                &stat.fairness_key,
                                stat.deficit,
                            );
                            metrics.set_scheduler_oldest_eligible_age(
                                queue_name,
                                policy.fairness_key.as_str(),
                                &stat.fairness_key,
                                stat.oldest_ready_age_ms,
                            );
                        }
                    }
                }
                return Ok(Some(ClaimedWorkerJob {
                    handle: WorkerQueueClaimHandle {
                        queue: queue_name.to_string(),
                        job_event_id: claim.job_event_id,
                        claim_id: claim.claim_id,
                        consumer_id: claim.consumer_id,
                        expires_at_ms: claim.expires_at_ms,
                    },
                    job: job.job,
                }));
            }
        }
        Ok(None)
    }

    /// Build a fairness-aware inspect snapshot for `queue` that includes
    /// scheduler state alongside the standard summary.
    pub async fn inspect_queue(&self, queue: &str) -> Result<WorkerQueueInspectSnapshot, LogError> {
        let queue_name = queue.trim();
        if queue_name.is_empty() {
            return Err(LogError::Config(
                "worker queue name cannot be empty".to_string(),
            ));
        }
        let now_ms = now_ms();
        let state = self.queue_state(queue_name).await?;
        let summary = state.summary(now_ms);
        let policy = self.policy();
        let ready = scheduler::ready_stats_by_key(&state.jobs, &policy, now_ms);
        // Make sure in-flight stays authoritative against the rebuilt log.
        let in_flight = scheduler::in_flight_by_key(&state.jobs, &policy);
        let scheduler_snapshot = {
            let mut states = self
                .scheduler_states
                .lock()
                .expect("scheduler state poisoned");
            let scheduler_state = states.entry(queue_name.to_string()).or_default();
            scheduler_state.replace_in_flight(in_flight);
            scheduler_state.snapshot(&policy, &ready)
        };
        Ok(WorkerQueueInspectSnapshot {
            summary,
            scheduler: scheduler_snapshot,
        })
    }

    /// Inspect snapshots for every known queue.
    pub async fn inspect_all_queues(&self) -> Result<Vec<WorkerQueueInspectSnapshot>, LogError> {
        let mut snapshots = Vec::new();
        for queue in self.known_queues().await? {
            snapshots.push(self.inspect_queue(&queue).await?);
        }
        snapshots.sort_by(|left, right| left.summary.queue.cmp(&right.summary.queue));
        Ok(snapshots)
    }

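    /// Extend the lease on a held claim. Returns `Ok(true)` only if the
    /// renewed claim is still the active one after the log is re-read, so
    /// a worker that lost its claim finds out here.
    ///
    /// A long-running handler would heartbeat roughly like this (the
    /// interval is illustrative):
    ///
    /// ```ignore
    /// if !queue.renew_claim(&claimed.handle, StdDuration::from_secs(60)).await? {
    ///     // The claim expired or was superseded; stop working on the job.
    ///     return Ok(());
    /// }
    /// ```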
    pub async fn renew_claim(
        &self,
        handle: &WorkerQueueClaimHandle,
        ttl: StdDuration,
    ) -> Result<bool, LogError> {
        let now_ms = now_ms();
        let renewal = WorkerQueueClaimRenewalRecord {
            job_event_id: handle.job_event_id,
            claim_id: handle.claim_id.clone(),
            consumer_id: handle.consumer_id.clone(),
            renewed_at_ms: now_ms,
            expires_at_ms: expiry_ms(now_ms, ttl),
        };
        self.event_log
            .append(
                &claims_topic(&handle.queue)?,
                LogEvent::new(
                    "claim_renewed",
                    serde_json::to_value(&renewal)
                        .map_err(|error| LogError::Serde(error.to_string()))?,
                ),
            )
            .await?;
        let refreshed = self.queue_state(&handle.queue).await?;
        Ok(refreshed
            .active_claim_for(handle.job_event_id)
            .is_some_and(|active| active.claim_id == handle.claim_id))
    }

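    /// Voluntarily return a job to the ready pool, e.g. on graceful
    /// shutdown. The release only takes effect if `handle` still holds the
    /// active claim; an empty `reason` is recorded as `None`.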
    pub async fn release_claim(
        &self,
        handle: &WorkerQueueClaimHandle,
        reason: &str,
    ) -> Result<(), LogError> {
        let release = WorkerQueueReleaseRecord {
            job_event_id: handle.job_event_id,
            claim_id: handle.claim_id.clone(),
            consumer_id: handle.consumer_id.clone(),
            released_at_ms: now_ms(),
            reason: if reason.trim().is_empty() {
                None
            } else {
                Some(reason.to_string())
            },
        };
        self.event_log
            .append(
                &claims_topic(&handle.queue)?,
                LogEvent::new(
                    "job_released",
                    serde_json::to_value(&release)
                        .map_err(|error| LogError::Serde(error.to_string()))?,
                ),
            )
            .await?;
        Ok(())
    }

    pub async fn append_response(
        &self,
        queue: &str,
        response: &WorkerQueueResponseRecord,
    ) -> Result<u64, LogError> {
        self.event_log
            .append(
                &responses_topic(queue)?,
                LogEvent::new(
                    "job_response",
                    serde_json::to_value(response)
                        .map_err(|error| LogError::Serde(error.to_string()))?,
                ),
            )
            .await
    }

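    /// Mark the job held by `handle` as completed. During state rebuild an
    /// ack is honoured if its `claim_id` was ever seen for the job (or is
    /// empty), so a late ack from an expired claim still settles the job.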
    pub async fn ack_claim(&self, handle: &WorkerQueueClaimHandle) -> Result<u64, LogError> {
        self.event_log
            .append(
                &claims_topic(&handle.queue)?,
                LogEvent::new(
                    "job_acked",
                    serde_json::to_value(WorkerQueueAckRecord {
                        job_event_id: handle.job_event_id,
                        claim_id: handle.claim_id.clone(),
                        consumer_id: handle.consumer_id.clone(),
                        acked_at_ms: now_ms(),
                    })
                    .map_err(|error| LogError::Serde(error.to_string()))?,
                ),
            )
            .await
    }

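    /// Append a `job_purged` record for every currently ready job and
    /// return how many were purged. In-flight, acked, and already-purged
    /// jobs are left untouched.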
    pub async fn purge_unclaimed(
        &self,
        queue: &str,
        purged_by: &str,
        reason: Option<&str>,
    ) -> Result<usize, LogError> {
        let state = self.queue_state(queue).await?;
        let ready_jobs: Vec<_> = state
            .jobs
            .into_iter()
            .filter(|job| job.is_ready())
            .map(|job| job.job_event_id)
            .collect();
        for job_event_id in &ready_jobs {
            self.event_log
                .append(
                    &claims_topic(queue)?,
                    LogEvent::new(
                        "job_purged",
                        serde_json::to_value(WorkerQueuePurgeRecord {
                            job_event_id: *job_event_id,
                            purged_by: purged_by.to_string(),
                            purged_at_ms: now_ms(),
                            reason: reason
                                .filter(|value| !value.trim().is_empty())
                                .map(|value| value.to_string()),
                        })
                        .map_err(|error| LogError::Serde(error.to_string()))?,
                    ),
                )
                .await?;
        }
        Ok(ready_jobs.len())
    }
}

#[derive(Clone, Debug)]
struct WorkerQueueJobStateInternal {
    job_event_id: u64,
    enqueued_at_ms: i64,
    job: WorkerQueueJob,
    active_claim: Option<WorkerQueueClaimHandle>,
    acked: bool,
    purged: bool,
    seen_claim_ids: BTreeSet<String>,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueCatalogRecord {
    queue: String,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueClaimRecord {
    job_event_id: u64,
    claim_id: String,
    consumer_id: String,
    claimed_at_ms: i64,
    expires_at_ms: i64,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueClaimRenewalRecord {
    job_event_id: u64,
    claim_id: String,
    consumer_id: String,
    renewed_at_ms: i64,
    expires_at_ms: i64,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueReleaseRecord {
    job_event_id: u64,
    claim_id: String,
    consumer_id: String,
    released_at_ms: i64,
    #[serde(default)]
    reason: Option<String>,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueAckRecord {
    job_event_id: u64,
    claim_id: String,
    consumer_id: String,
    acked_at_ms: i64,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueuePurgeRecord {
    job_event_id: u64,
    purged_by: String,
    purged_at_ms: i64,
    #[serde(default)]
    reason: Option<String>,
}

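/// Topic layout per queue: jobs live on `worker.<queue>`, claims on
/// `worker.<queue>.claims`, and responses on `worker.<queue>.responses`,
/// with `<queue>` passed through `sanitize_topic_component`.
///
/// A hedged example, assuming the sanitizer leaves a plain lowercase name
/// unchanged:
///
/// ```ignore
/// assert_eq!(job_topic_name("triage"), "worker.triage");
/// assert_eq!(claims_topic_name("triage"), "worker.triage.claims");
/// assert_eq!(response_topic_name("triage"), "worker.triage.responses");
/// ```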
pub fn job_topic_name(queue: &str) -> String {
    format!("worker.{}", sanitize_topic_component(queue))
}

pub fn claims_topic_name(queue: &str) -> String {
    format!("{}{}", job_topic_name(queue), WORKER_QUEUE_CLAIMS_SUFFIX)
}

pub fn response_topic_name(queue: &str) -> String {
    format!("{}{}", job_topic_name(queue), WORKER_QUEUE_RESPONSES_SUFFIX)
}

fn job_topic(queue: &str) -> Result<Topic, LogError> {
    Topic::new(job_topic_name(queue))
}

fn claims_topic(queue: &str) -> Result<Topic, LogError> {
    Topic::new(claims_topic_name(queue))
}

fn responses_topic(queue: &str) -> Result<Topic, LogError> {
    Topic::new(response_topic_name(queue))
}

fn now_ms() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|duration| duration.as_millis() as i64)
        .unwrap_or(0)
}

fn expiry_ms(now_ms: i64, ttl: StdDuration) -> i64 {
    now_ms.saturating_add(ttl.as_millis().min(i64::MAX as u128) as i64)
}

#[cfg(test)]
mod tests {
    use super::*;

    use crate::event_log::{AnyEventLog, MemoryEventLog};
    use crate::triggers::{
        event::{GenericWebhookPayload, KnownProviderPayload},
        scheduler::{self, SchedulerStrategy},
        ProviderId, ProviderPayload, SignatureStatus, TraceId, TriggerEvent,
    };

    fn test_event(id: &str) -> TriggerEvent {
        TriggerEvent {
            id: crate::triggers::TriggerEventId(id.to_string()),
            provider: ProviderId::from("github"),
            kind: "issues.opened".to_string(),
            trace_id: TraceId("trace-test".to_string()),
            dedupe_key: id.to_string(),
            tenant_id: None,
            headers: BTreeMap::new(),
            batch: None,
            raw_body: None,
            provider_payload: ProviderPayload::Known(KnownProviderPayload::Webhook(
                GenericWebhookPayload {
                    source: Some("worker-queue-test".to_string()),
                    content_type: Some("application/json".to_string()),
                    raw: serde_json::json!({"id": id}),
                },
            )),
            signature_status: SignatureStatus::Verified,
            received_at: time::OffsetDateTime::now_utc(),
            occurred_at: None,
            dedupe_claimed: false,
        }
    }

    fn test_job(
        queue: &str,
        trigger_id: &str,
        event_id: &str,
        priority: WorkerQueuePriority,
    ) -> WorkerQueueJob {
        WorkerQueueJob {
            queue: queue.to_string(),
            trigger_id: trigger_id.to_string(),
            binding_key: format!("{trigger_id}@v1"),
            binding_version: 1,
            event: test_event(event_id),
            replay_of_event_id: None,
            priority,
        }
    }

    #[tokio::test(flavor = "current_thread")]
    async fn enqueue_and_summarize_queue() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log);
        queue
            .enqueue(&test_job(
                "triage",
                "incoming-review-task",
                "evt-1",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();
        let summaries = queue.queue_summaries().await.unwrap();
        assert_eq!(summaries.len(), 1);
        assert_eq!(summaries[0].queue, "triage");
        assert_eq!(summaries[0].ready, 1);
        assert_eq!(summaries[0].in_flight, 0);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn claim_and_ack_remove_job_from_ready_pool() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log);
        queue
            .enqueue(&test_job(
                "triage",
                "incoming-review-task",
                "evt-1",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();
        let claimed = queue
            .claim_next("triage", "consumer-a", StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        let before_ack = queue.queue_state("triage").await.unwrap();
        assert_eq!(before_ack.summary(now_ms()).ready, 0);
        assert_eq!(before_ack.summary(now_ms()).in_flight, 1);
        queue
            .append_response(
                "triage",
                &WorkerQueueResponseRecord {
                    queue: "triage".to_string(),
                    job_event_id: claimed.handle.job_event_id,
                    consumer_id: "consumer-a".to_string(),
                    handled_at_ms: now_ms(),
                    outcome: Some(DispatchOutcome {
                        trigger_id: "incoming-review-task".to_string(),
                        binding_key: "incoming-review-task@v1".to_string(),
                        event_id: "evt-1".to_string(),
                        attempt_count: 1,
                        status: super::super::DispatchStatus::Succeeded,
                        handler_kind: "local".to_string(),
                        target_uri: "handlers::on_review".to_string(),
                        replay_of_event_id: None,
                        result: Some(serde_json::json!({"ok": true})),
                        error: None,
                    }),
                    error: None,
                },
            )
            .await
            .unwrap();
        queue.ack_claim(&claimed.handle).await.unwrap();
        let after_ack = queue.queue_state("triage").await.unwrap();
        let summary = after_ack.summary(now_ms());
        assert_eq!(summary.ready, 0);
        assert_eq!(summary.in_flight, 0);
        assert_eq!(summary.acked, 1);
        assert_eq!(summary.responses, 1);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn expired_claim_allows_reclaim() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log.clone());
        let receipt = queue
            .enqueue(&test_job(
                "triage",
                "incoming-review-task",
                "evt-1",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();
        let expired_claim = WorkerQueueClaimRecord {
            job_event_id: receipt.job_event_id,
            claim_id: "expired-claim".to_string(),
            consumer_id: "consumer-a".to_string(),
            claimed_at_ms: now_ms().saturating_sub(2),
            expires_at_ms: now_ms().saturating_sub(1),
        };
        log.append(
            &claims_topic("triage").unwrap(),
            LogEvent::new("job_claimed", serde_json::to_value(&expired_claim).unwrap()),
        )
        .await
        .unwrap();
        let second = queue
            .claim_next("triage", "consumer-b", StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(second.job.event.id.0, "evt-1");
        assert_ne!(second.handle.claim_id, expired_claim.claim_id);
        assert_eq!(second.handle.consumer_id, "consumer-b");
    }

    #[tokio::test(flavor = "current_thread")]
    async fn high_priority_and_aged_normal_are_selected_first() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log.clone());

        let catalog_topic = Topic::new(WORKER_QUEUE_CATALOG_TOPIC).unwrap();
        log.append(
            &catalog_topic,
            LogEvent::new("queue_seen", serde_json::json!({"queue":"triage"})),
        )
        .await
        .unwrap();

        let topic = job_topic("triage").unwrap();
        let mut old_normal = LogEvent::new(
            "trigger_dispatch",
            serde_json::to_value(test_job(
                "triage",
                "incoming-review-task",
                "evt-old-normal",
                WorkerQueuePriority::Normal,
            ))
            .unwrap(),
        );
        old_normal.occurred_at_ms = now_ms() - NORMAL_PROMOTION_AGE_MS - 1_000;
        log.append(&topic, old_normal).await.unwrap();

        let high = LogEvent::new(
            "trigger_dispatch",
            serde_json::to_value(test_job(
                "triage",
                "incoming-review-task",
                "evt-high",
                WorkerQueuePriority::High,
            ))
            .unwrap(),
        );
        log.append(&topic, high).await.unwrap();

        let claimed = queue
            .claim_next("triage", "consumer-a", StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(claimed.job.event.id.0, "evt-old-normal");
    }

    fn tenant_event(id: &str, tenant: &str) -> TriggerEvent {
        let mut event = test_event(id);
        event.tenant_id = Some(crate::triggers::TenantId::new(tenant));
        event
    }

    fn tenant_job(
        queue: &str,
        trigger_id: &str,
        event_id: &str,
        tenant: &str,
        priority: WorkerQueuePriority,
    ) -> WorkerQueueJob {
        WorkerQueueJob {
            queue: queue.to_string(),
            trigger_id: trigger_id.to_string(),
            binding_key: format!("{trigger_id}@v1"),
            binding_version: 1,
            event: tenant_event(event_id, tenant),
            replay_of_event_id: None,
            priority,
        }
    }

    async fn ack_and_respond(queue: &WorkerQueue, queue_name: &str, claim: &ClaimedWorkerJob) {
        queue
            .append_response(
                queue_name,
                &WorkerQueueResponseRecord {
                    queue: queue_name.to_string(),
                    job_event_id: claim.handle.job_event_id,
                    consumer_id: claim.handle.consumer_id.clone(),
                    handled_at_ms: now_ms(),
                    outcome: Some(DispatchOutcome {
                        trigger_id: claim.job.trigger_id.clone(),
                        binding_key: claim.job.binding_key.clone(),
                        event_id: claim.job.event.id.0.clone(),
                        attempt_count: 1,
                        status: super::super::DispatchStatus::Succeeded,
                        handler_kind: "local".to_string(),
                        target_uri: "test::handler".to_string(),
                        replay_of_event_id: None,
                        result: None,
                        error: None,
                    }),
                    error: None,
                },
            )
            .await
            .unwrap();
        queue.ack_claim(&claim.handle).await.unwrap();
    }

    #[tokio::test(flavor = "current_thread")]
    async fn drr_policy_rotates_across_tenants_through_claim_next() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(256)));
        let queue = WorkerQueue::with_policy(
            log,
            SchedulerPolicy::deficit_round_robin(scheduler::FairnessKey::Tenant),
        );

        // Tenant A enqueues 8 jobs before tenant B enqueues a single job.
        for idx in 0..8 {
            queue
                .enqueue(&tenant_job(
                    "triage",
                    "trigger",
                    &format!("a-{idx}"),
                    "tenant-a",
                    WorkerQueuePriority::Normal,
                ))
                .await
                .unwrap();
        }
        queue
            .enqueue(&tenant_job(
                "triage",
                "trigger",
                "b-1",
                "tenant-b",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();

        // Claim+ack 4 jobs back-to-back. Under FIFO, tenant B would not be
        // served until tenant A drained; under fair-share, B must show up
        // within these four claims (DRR typically serves it by the second).
        let mut tenants_seen = Vec::new();
        for n in 0..4 {
            let consumer = format!("c-{n}");
            let claim = queue
                .claim_next("triage", &consumer, StdDuration::from_secs(60))
                .await
                .unwrap()
                .expect("queue should still have ready jobs");
            tenants_seen.push(
                claim
                    .job
                    .event
                    .tenant_id
                    .as_ref()
                    .map(|t| t.0.clone())
                    .unwrap_or_default(),
            );
            ack_and_respond(&queue, "triage", &claim).await;
        }

        let saw_b = tenants_seen.iter().any(|t| t == "tenant-b");
        assert!(
            saw_b,
            "tenant-b should have been served within the first 4 claims, got {tenants_seen:?}",
        );
    }

    #[tokio::test(flavor = "current_thread")]
    async fn fifo_policy_preserves_legacy_behavior() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(64)));
        let queue = WorkerQueue::with_policy(log, SchedulerPolicy::fifo());

        // Fill queue with tenant-a jobs first, then a single tenant-b job.
        for idx in 0..4 {
            queue
                .enqueue(&tenant_job(
                    "triage",
                    "trigger",
                    &format!("a-{idx}"),
                    "tenant-a",
                    WorkerQueuePriority::Normal,
                ))
                .await
                .unwrap();
        }
        queue
            .enqueue(&tenant_job(
                "triage",
                "trigger",
                "b-1",
                "tenant-b",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();

        // FIFO must drain all of tenant-a before touching tenant-b.
        let first = queue
            .claim_next("triage", "c-0", StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(first.job.event.tenant_id.unwrap().0, "tenant-a");
    }

    #[tokio::test(flavor = "current_thread")]
    async fn inspect_queue_reports_per_tenant_fairness_state() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(64)));
        let queue = WorkerQueue::with_policy(
            log,
            SchedulerPolicy::deficit_round_robin(scheduler::FairnessKey::Tenant)
                .with_weight("tenant-a", 2)
                .with_weight("tenant-b", 1),
        );

        for idx in 0..3 {
            queue
                .enqueue(&tenant_job(
                    "triage",
                    "trigger",
                    &format!("a-{idx}"),
                    "tenant-a",
                    WorkerQueuePriority::Normal,
                ))
                .await
                .unwrap();
        }
        queue
            .enqueue(&tenant_job(
                "triage",
                "trigger",
                "b-1",
                "tenant-b",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();

        for n in 0..2 {
            let consumer = format!("c-{n}");
            let claim = queue
                .claim_next("triage", &consumer, StdDuration::from_secs(60))
                .await
                .unwrap()
                .unwrap();
            ack_and_respond(&queue, "triage", &claim).await;
        }

        let snap = queue.inspect_queue("triage").await.unwrap();
        assert_eq!(snap.scheduler.strategy, "drr");
        assert_eq!(snap.scheduler.fairness_key, "tenant");
        assert!(snap
            .scheduler
            .keys
            .iter()
            .any(|k| k.fairness_key == "tenant-a"));
        let weights: BTreeMap<String, u32> = snap
            .scheduler
            .keys
            .iter()
            .map(|k| (k.fairness_key.clone(), k.weight))
            .collect();
        assert_eq!(weights.get("tenant-a").copied(), Some(2));
        assert_eq!(weights.get("tenant-b").copied(), Some(1));
    }

    #[tokio::test(flavor = "current_thread")]
    async fn drr_with_max_concurrent_per_key_throttles_hot_tenant() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(128)));
        let queue = WorkerQueue::with_policy(
            log,
            SchedulerPolicy::deficit_round_robin(scheduler::FairnessKey::Tenant)
                .with_max_concurrent_per_key(1),
        );

        for idx in 0..4 {
            queue
                .enqueue(&tenant_job(
                    "triage",
                    "trigger",
                    &format!("a-{idx}"),
                    "tenant-a",
                    WorkerQueuePriority::Normal,
                ))
                .await
                .unwrap();
        }
        queue
            .enqueue(&tenant_job(
                "triage",
                "trigger",
                "b-1",
                "tenant-b",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();

        let first = queue
            .claim_next("triage", "consumer-a", StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        // Without releasing the first claim, the second pick must skip the
        // capped tenant-a and serve tenant-b instead.
        let second = queue
            .claim_next("triage", "consumer-b", StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        let pair = [
            first.job.event.tenant_id.clone().unwrap().0,
            second.job.event.tenant_id.clone().unwrap().0,
        ];
        assert!(
            pair.contains(&"tenant-a".to_string()) && pair.contains(&"tenant-b".to_string()),
            "max_concurrent_per_key=1 must let tenant-b through within two claims, got {pair:?}",
        );
    }

    #[test]
    fn from_env_parses_drr_policy_from_lookup() {
        let lookup = |name: &str| -> Option<String> {
            match name {
                "HARN_SCHEDULER_STRATEGY" => Some("drr".to_string()),
                "HARN_SCHEDULER_FAIRNESS_KEY" => Some("tenant-and-binding".to_string()),
                "HARN_SCHEDULER_QUANTUM" => Some("3".to_string()),
                "HARN_SCHEDULER_STARVATION_AGE_MS" => Some("750".to_string()),
                "HARN_SCHEDULER_MAX_CONCURRENT_PER_KEY" => Some("4".to_string()),
                "HARN_SCHEDULER_DEFAULT_WEIGHT" => Some("2".to_string()),
                "HARN_SCHEDULER_WEIGHTS" => Some("tenant-a:5,tenant-b:1, : ,bad:abc".to_string()),
                _ => None,
            }
        };
        let policy = SchedulerPolicy::from_env_lookup(lookup);
        match policy.strategy {
            SchedulerStrategy::DeficitRoundRobin {
                quantum,
                starvation_age_ms,
            } => {
                assert_eq!(quantum, 3);
                assert_eq!(starvation_age_ms, Some(750));
            }
            other => panic!("expected DRR strategy, got {other:?}"),
        }
        assert_eq!(
            policy.fairness_key,
            scheduler::FairnessKey::TenantAndBinding
        );
        assert_eq!(policy.max_concurrent_per_key, 4);
        assert_eq!(policy.default_weight, 2);
        assert_eq!(policy.weight_for("tenant-a"), 5);
        assert_eq!(policy.weight_for("tenant-b"), 1);
        // Unknown key falls back to default_weight.
        assert_eq!(policy.weight_for("tenant-c"), 2);
    }

    #[test]
    fn from_env_defaults_to_fifo_when_missing() {
        let policy = SchedulerPolicy::from_env_lookup(|_| None);
        assert!(matches!(policy.strategy, SchedulerStrategy::Fifo));
    }
}
1394}