1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, Mutex, RwLock};
3use std::time::Duration as StdDuration;
4
5use serde::{Deserialize, Serialize};
6use uuid::Uuid;
7
8use crate::event_log::{
9 sanitize_topic_component, AnyEventLog, EventLog, LogError, LogEvent, Topic,
10};
11
12use super::scheduler::{self, SchedulableJob, SchedulerPolicy, SchedulerSnapshot, SchedulerState};
13use super::{DispatchOutcome, TriggerEvent};
14
/// Topic that receives a `queue_seen` event on every enqueue; it is replayed
/// to enumerate the queues that have ever been used.
pub const WORKER_QUEUE_CATALOG_TOPIC: &str = "worker.queues";
/// Suffix appended to a queue's job topic name to form its claims topic name.
const WORKER_QUEUE_CLAIMS_SUFFIX: &str = ".claims";
/// Suffix appended to a queue's job topic name to form its responses topic name.
const WORKER_QUEUE_RESPONSES_SUFFIX: &str = ".responses";
/// Age (15 minutes, in milliseconds) at which a `Normal` job is ranked like a
/// `High` one by `WorkerQueuePriority::effective_rank`.
const NORMAL_PROMOTION_AGE_MS: i64 = 15 * 60 * 1000;
19
/// Priority attached to a queued job.
///
/// Serialized as a lowercase string (`"high"`, `"normal"`, `"low"`); `Normal`
/// is the default when the field is absent.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum WorkerQueuePriority {
    High,
    #[default]
    Normal,
    Low,
}
28
29impl WorkerQueuePriority {
30 pub fn as_str(self) -> &'static str {
31 match self {
32 Self::High => "high",
33 Self::Normal => "normal",
34 Self::Low => "low",
35 }
36 }
37
38 pub fn effective_rank(self, enqueued_at_ms: i64, now_ms: i64) -> u8 {
39 match self {
40 Self::High => 0,
41 Self::Normal if now_ms.saturating_sub(enqueued_at_ms) >= NORMAL_PROMOTION_AGE_MS => 0,
42 Self::Normal => 1,
43 Self::Low => 2,
44 }
45 }
46}
47
/// A unit of work placed on a worker queue; serialized as the payload of a
/// `trigger_dispatch` event.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct WorkerQueueJob {
    /// Queue name; trimmed on enqueue to pick the target topics.
    pub queue: String,
    /// Copied into the `trigger_id` event header on enqueue.
    pub trigger_id: String,
    /// Copied into the `binding_key` event header on enqueue.
    pub binding_key: String,
    pub binding_version: u32,
    /// The trigger event to dispatch; its id is copied into the `event_id` header.
    pub event: TriggerEvent,
    /// Defaults to `None` when missing from the serialized form.
    #[serde(default)]
    pub replay_of_event_id: Option<String>,
    /// Defaults to `WorkerQueuePriority::Normal` when missing from the serialized form.
    #[serde(default)]
    pub priority: WorkerQueuePriority,
}
60
/// Result of a successful `WorkerQueue::enqueue`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerQueueEnqueueReceipt {
    /// Trimmed queue name the job was appended to.
    pub queue: String,
    /// Event id returned by the event log for the appended job event.
    pub job_event_id: u64,
    /// Name of the topic on which responses for this queue are recorded.
    pub response_topic: String,
}
67
/// Identifies one claim on one job; passed back to renew, release or ack it.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerQueueClaimHandle {
    pub queue: String,
    /// Event id of the claimed job.
    pub job_event_id: u64,
    /// Unique id of this claim (a UUID string when created by `claim_next`).
    pub claim_id: String,
    pub consumer_id: String,
    /// Epoch-millisecond timestamp at or after which the claim no longer counts as active.
    pub expires_at_ms: i64,
}
76
/// A job together with the handle of the claim that was just won on it.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ClaimedWorkerJob {
    pub handle: WorkerQueueClaimHandle,
    pub job: WorkerQueueJob,
}
82
/// Payload of a `job_response` event on a queue's responses topic.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct WorkerQueueResponseRecord {
    pub queue: String,
    /// Event id of the job this response refers to.
    pub job_event_id: u64,
    pub consumer_id: String,
    pub handled_at_ms: i64,
    pub outcome: Option<DispatchOutcome>,
    pub error: Option<String>,
}
92
/// Aggregate counters for one queue, produced by `WorkerQueueState::summary`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerQueueSummary {
    pub queue: String,
    /// Jobs that are neither acked nor purged and have no active claim.
    pub ready: usize,
    /// Jobs that are neither acked nor purged and have an active claim.
    pub in_flight: usize,
    pub acked: usize,
    pub purged: usize,
    /// Number of response records seen for the queue.
    pub responses: usize,
    /// Age in milliseconds of the oldest ready job, or `None` when nothing is ready.
    pub oldest_unclaimed_age_ms: Option<u64>,
}
103
/// Replayed state of a single job within a queue.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct WorkerQueueJobState {
    /// Event id of the job's `trigger_dispatch` event.
    pub job_event_id: u64,
    /// `occurred_at_ms` of the job's `trigger_dispatch` event.
    pub enqueued_at_ms: i64,
    pub job: WorkerQueueJob,
    /// The claim currently holding the job, if any.
    pub active_claim: Option<WorkerQueueClaimHandle>,
    pub acked: bool,
    pub purged: bool,
}
113
114impl WorkerQueueJobState {
115 pub fn is_ready(&self) -> bool {
116 !self.acked && !self.purged && self.active_claim.is_none()
117 }
118}
119
/// Replayed snapshot of one queue: its response records and per-job state.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct WorkerQueueState {
    pub queue: String,
    pub responses: Vec<WorkerQueueResponseRecord>,
    /// As built by `WorkerQueue::queue_state`, ordered by
    /// `(enqueued_at_ms, job_event_id)`.
    pub jobs: Vec<WorkerQueueJobState>,
}
126
127impl WorkerQueueState {
128 pub fn summary(&self, now_ms: i64) -> WorkerQueueSummary {
129 let ready = self.jobs.iter().filter(|job| job.is_ready()).count();
130 let in_flight = self
131 .jobs
132 .iter()
133 .filter(|job| !job.acked && !job.purged && job.active_claim.is_some())
134 .count();
135 let acked = self.jobs.iter().filter(|job| job.acked).count();
136 let purged = self.jobs.iter().filter(|job| job.purged).count();
137 let oldest_unclaimed_age_ms = self
138 .jobs
139 .iter()
140 .filter(|job| job.is_ready())
141 .map(|job| now_ms.saturating_sub(job.enqueued_at_ms).max(0) as u64)
142 .max();
143 WorkerQueueSummary {
144 queue: self.queue.clone(),
145 ready,
146 in_flight,
147 acked,
148 purged,
149 responses: self.responses.len(),
150 oldest_unclaimed_age_ms,
151 }
152 }
153
154 fn next_ready_job_with_scheduler(
162 &self,
163 scheduler_state: &mut SchedulerState,
164 policy: &SchedulerPolicy,
165 now_ms: i64,
166 ) -> Option<&WorkerQueueJobState> {
167 let candidates: Vec<&WorkerQueueJobState> =
168 self.jobs.iter().filter(|job| job.is_ready()).collect();
169 if candidates.is_empty() {
170 return None;
171 }
172 let views: Vec<SchedulableJob<'_>> = candidates
173 .iter()
174 .map(|state| SchedulableJob::from_state(state))
175 .collect();
176
177 let in_flight = scheduler::in_flight_by_key(&self.jobs, policy);
179 scheduler_state.replace_in_flight(in_flight);
180
181 let pick = scheduler_state.select(&views, policy, now_ms)?;
182 candidates
183 .into_iter()
184 .find(|job| job.job_event_id == pick.job_event_id)
185 }
186
187 fn active_claim_for(&self, job_event_id: u64) -> Option<&WorkerQueueClaimHandle> {
188 self.jobs
189 .iter()
190 .find(|job| job.job_event_id == job_event_id)
191 .and_then(|job| job.active_claim.as_ref())
192 }
193}
194
/// Worker queue facade over an append-only event log.
///
/// Cloning is cheap: every field is behind an `Arc`, so clones share the same
/// log, policy and scheduler state.
#[derive(Clone)]
pub struct WorkerQueue {
    /// Log that stores job, claim, response and catalog events.
    event_log: Arc<AnyEventLog>,
    /// Scheduling policy; replaceable at runtime through `set_policy`.
    policy: Arc<RwLock<SchedulerPolicy>>,
    /// Per-queue scheduler bookkeeping, keyed by trimmed queue name.
    scheduler_states: Arc<Mutex<BTreeMap<String, SchedulerState>>>,
}
206
/// Combined view of a queue's counters and its scheduler state, as returned by
/// `WorkerQueue::inspect_queue`.
#[derive(Clone, Debug, Serialize)]
pub struct WorkerQueueInspectSnapshot {
    pub summary: WorkerQueueSummary,
    pub scheduler: SchedulerSnapshot,
}
212
213impl WorkerQueue {
214 pub fn new(event_log: Arc<AnyEventLog>) -> Self {
219 Self::with_policy(event_log, SchedulerPolicy::from_env())
220 }
221
222 pub fn with_policy(event_log: Arc<AnyEventLog>, policy: SchedulerPolicy) -> Self {
223 Self {
224 event_log,
225 policy: Arc::new(RwLock::new(policy)),
226 scheduler_states: Arc::new(Mutex::new(BTreeMap::new())),
227 }
228 }
229
230 pub fn set_policy(&self, policy: SchedulerPolicy) {
233 *self.policy.write().expect("scheduler policy poisoned") = policy;
234 }
235
236 pub fn policy(&self) -> SchedulerPolicy {
237 self.policy
238 .read()
239 .expect("scheduler policy poisoned")
240 .clone()
241 }
242
243 pub async fn enqueue(
244 &self,
245 job: &WorkerQueueJob,
246 ) -> Result<WorkerQueueEnqueueReceipt, LogError> {
247 let queue = job.queue.trim();
248 if queue.is_empty() {
249 return Err(LogError::Config(
250 "worker queue name cannot be empty".to_string(),
251 ));
252 }
253 let queue_name = queue.to_string();
254 let catalog_topic = Topic::new(WORKER_QUEUE_CATALOG_TOPIC)
255 .expect("static worker queue catalog topic should always be valid");
256 self.event_log
257 .append(
258 &catalog_topic,
259 LogEvent::new(
260 "queue_seen",
261 serde_json::to_value(WorkerQueueCatalogRecord {
262 queue: queue_name.clone(),
263 })
264 .map_err(|error| LogError::Serde(error.to_string()))?,
265 ),
266 )
267 .await?;
268
269 let job_topic = job_topic(&queue_name)?;
270 let mut headers = BTreeMap::new();
271 headers.insert("queue".to_string(), queue_name.clone());
272 headers.insert("trigger_id".to_string(), job.trigger_id.clone());
273 headers.insert("binding_key".to_string(), job.binding_key.clone());
274 headers.insert("event_id".to_string(), job.event.id.0.clone());
275 headers.insert("priority".to_string(), job.priority.as_str().to_string());
276 let job_event_id = self
277 .event_log
278 .append(
279 &job_topic,
280 LogEvent::new(
281 "trigger_dispatch",
282 serde_json::to_value(job)
283 .map_err(|error| LogError::Serde(error.to_string()))?,
284 )
285 .with_headers(headers),
286 )
287 .await?;
288 if let Some(metrics) = crate::active_metrics_registry() {
289 if let Ok(state) = self.queue_state(&queue_name).await {
290 let summary = state.summary(now_ms());
291 metrics.set_worker_queue_depth(
292 &queue_name,
293 (summary.ready + summary.in_flight) as u64,
294 );
295 }
296 }
297 Ok(WorkerQueueEnqueueReceipt {
298 queue: queue_name.clone(),
299 job_event_id,
300 response_topic: response_topic_name(&queue_name),
301 })
302 }
303
304 pub async fn known_queues(&self) -> Result<Vec<String>, LogError> {
305 let topic = Topic::new(WORKER_QUEUE_CATALOG_TOPIC)
306 .expect("static worker queue catalog topic should always be valid");
307 let events = self.event_log.read_range(&topic, None, usize::MAX).await?;
308 let mut queues = BTreeSet::new();
309 for (_, event) in events {
310 if event.kind != "queue_seen" {
311 continue;
312 }
313 let record: WorkerQueueCatalogRecord = serde_json::from_value(event.payload)
314 .map_err(|error| LogError::Serde(error.to_string()))?;
315 if !record.queue.trim().is_empty() {
316 queues.insert(record.queue);
317 }
318 }
319 Ok(queues.into_iter().collect())
320 }
321
    /// Rebuilds the current state of `queue` by replaying its job, claim and
    /// response topics from the beginning.
    ///
    /// Claim-topic events are applied in log order on top of the jobs found in
    /// the job topic; events that reference an unknown job are skipped. After
    /// the replay, any claim whose expiry is at or before "now" is dropped, and
    /// jobs are ordered by `(enqueued_at_ms, job_event_id)`.
    ///
    /// # Errors
    /// `LogError::Config` if the trimmed queue name is empty,
    /// `LogError::Serde` if any relevant payload fails to decode, and any
    /// error returned while building topics or reading the event log.
    pub async fn queue_state(&self, queue: &str) -> Result<WorkerQueueState, LogError> {
        let queue_name = queue.trim();
        if queue_name.is_empty() {
            return Err(LogError::Config(
                "worker queue name cannot be empty".to_string(),
            ));
        }
        // Captured before reading so expiry is judged against one fixed instant.
        let now_ms = now_ms();
        let job_events = self
            .event_log
            .read_range(&job_topic(queue_name)?, None, usize::MAX)
            .await?;
        let claim_events = self
            .event_log
            .read_range(&claims_topic(queue_name)?, None, usize::MAX)
            .await?;
        let response_events = self
            .event_log
            .read_range(&responses_topic(queue_name)?, None, usize::MAX)
            .await?;

        // Seed one entry per `trigger_dispatch` event, keyed by its event id.
        let mut jobs = BTreeMap::<u64, WorkerQueueJobStateInternal>::new();
        for (job_event_id, event) in job_events {
            if event.kind != "trigger_dispatch" {
                continue;
            }
            let job: WorkerQueueJob = serde_json::from_value(event.payload)
                .map_err(|error| LogError::Serde(error.to_string()))?;
            jobs.insert(
                job_event_id,
                WorkerQueueJobStateInternal {
                    job_event_id,
                    enqueued_at_ms: event.occurred_at_ms,
                    job,
                    active_claim: None,
                    acked: false,
                    purged: false,
                    seen_claim_ids: BTreeSet::new(),
                },
            );
        }

        for (_, event) in claim_events {
            match event.kind.as_str() {
                "job_claimed" => {
                    let claim: WorkerQueueClaimRecord = serde_json::from_value(event.payload)
                        .map_err(|error| LogError::Serde(error.to_string()))?;
                    let Some(job) = jobs.get_mut(&claim.job_event_id) else {
                        continue;
                    };
                    if job.acked || job.purged {
                        continue;
                    }
                    // Remember the claim id even if this claim does not win, so
                    // a later ack carrying it is still recognized.
                    job.seen_claim_ids.insert(claim.claim_id.clone());
                    // A claim takes the job only if it is unclaimed or the
                    // current claim had already expired when this one was made.
                    let can_take = job
                        .active_claim
                        .as_ref()
                        .is_none_or(|active| active.expires_at_ms <= claim.claimed_at_ms);
                    if can_take {
                        job.active_claim = Some(WorkerQueueClaimHandle {
                            queue: queue_name.to_string(),
                            job_event_id: claim.job_event_id,
                            claim_id: claim.claim_id,
                            consumer_id: claim.consumer_id,
                            expires_at_ms: claim.expires_at_ms,
                        });
                    }
                }
                "claim_renewed" => {
                    let renewal: WorkerQueueClaimRenewalRecord =
                        serde_json::from_value(event.payload)
                            .map_err(|error| LogError::Serde(error.to_string()))?;
                    let Some(job) = jobs.get_mut(&renewal.job_event_id) else {
                        continue;
                    };
                    // Only the claim that currently holds the job can be extended.
                    if let Some(active) = job.active_claim.as_mut() {
                        if active.claim_id == renewal.claim_id {
                            active.expires_at_ms = renewal.expires_at_ms;
                        }
                    }
                }
                "job_released" => {
                    let release: WorkerQueueReleaseRecord =
                        serde_json::from_value(event.payload)
                            .map_err(|error| LogError::Serde(error.to_string()))?;
                    let Some(job) = jobs.get_mut(&release.job_event_id) else {
                        continue;
                    };
                    // A release from a claim that no longer holds the job is ignored.
                    if job
                        .active_claim
                        .as_ref()
                        .is_some_and(|active| active.claim_id == release.claim_id)
                    {
                        job.active_claim = None;
                    }
                }
                "job_acked" => {
                    let ack: WorkerQueueAckRecord = serde_json::from_value(event.payload)
                        .map_err(|error| LogError::Serde(error.to_string()))?;
                    let Some(job) = jobs.get_mut(&ack.job_event_id) else {
                        continue;
                    };
                    // An empty claim id is what `ack_job` writes (ack without a
                    // claim); otherwise the ack must name a claim seen for this job.
                    if ack.claim_id.is_empty() || job.seen_claim_ids.contains(&ack.claim_id) {
                        job.acked = true;
                        job.active_claim = None;
                    }
                }
                "job_purged" => {
                    let purge: WorkerQueuePurgeRecord = serde_json::from_value(event.payload)
                        .map_err(|error| LogError::Serde(error.to_string()))?;
                    let Some(job) = jobs.get_mut(&purge.job_event_id) else {
                        continue;
                    };
                    // A purge recorded after an ack has no effect.
                    if !job.acked {
                        job.purged = true;
                        job.active_claim = None;
                    }
                }
                // Unknown event kinds are ignored.
                _ => {}
            }
        }

        let responses = response_events
            .into_iter()
            .filter(|(_, event)| event.kind == "job_response")
            .map(|(_, event)| {
                serde_json::from_value::<WorkerQueueResponseRecord>(event.payload)
                    .map_err(|error| LogError::Serde(error.to_string()))
            })
            .collect::<Result<Vec<_>, _>>()?;

        let mut queue_state = WorkerQueueState {
            queue: queue_name.to_string(),
            responses,
            jobs: jobs
                .into_values()
                .map(|mut job| {
                    // Claims that have expired as of `now_ms` no longer hold the job.
                    if job
                        .active_claim
                        .as_ref()
                        .is_some_and(|active| active.expires_at_ms <= now_ms)
                    {
                        job.active_claim = None;
                    }
                    WorkerQueueJobState {
                        job_event_id: job.job_event_id,
                        enqueued_at_ms: job.enqueued_at_ms,
                        job: job.job,
                        active_claim: job.active_claim,
                        acked: job.acked,
                        purged: job.purged,
                    }
                })
                .collect(),
        };
        // Oldest first; the event id breaks ties between equal timestamps.
        queue_state
            .jobs
            .sort_by_key(|job| (job.enqueued_at_ms, job.job_event_id));
        Ok(queue_state)
    }
482
483 pub async fn queue_summaries(&self) -> Result<Vec<WorkerQueueSummary>, LogError> {
484 let now_ms = now_ms();
485 let mut summaries = Vec::new();
486 for queue in self.known_queues().await? {
487 let state = self.queue_state(&queue).await?;
488 summaries.push(state.summary(now_ms));
489 }
490 summaries.sort_by(|left, right| left.queue.cmp(&right.queue));
491 Ok(summaries)
492 }
493
    /// Tries to claim the next job on `queue` for `consumer_id`, with a claim
    /// lifetime of `ttl`.
    ///
    /// Each attempt replays the queue, lets the scheduler pick a ready job,
    /// appends a `job_claimed` event, and then replays the queue again to
    /// check that this claim is the one that ended up active. If another claim
    /// holds the job instead, the attempt is repeated, up to 8 times in total.
    ///
    /// Returns `Ok(None)` when the scheduler yields no job or when all
    /// attempts were lost.
    ///
    /// # Errors
    /// `LogError::Config` for an empty queue name,
    /// `LogError::InvalidConsumer` for an empty consumer id,
    /// `LogError::Serde` if the claim cannot be serialized, and any error from
    /// the event log or from replaying the queue.
    ///
    /// # Panics
    /// Panics if the policy lock or the scheduler-state lock is poisoned.
    pub async fn claim_next(
        &self,
        queue: &str,
        consumer_id: &str,
        ttl: StdDuration,
    ) -> Result<Option<ClaimedWorkerJob>, LogError> {
        let queue_name = queue.trim();
        if queue_name.is_empty() {
            return Err(LogError::Config(
                "worker queue name cannot be empty".to_string(),
            ));
        }
        if consumer_id.trim().is_empty() {
            return Err(LogError::InvalidConsumer(
                "worker queue consumer id cannot be empty".to_string(),
            ));
        }
        // The policy is read once and used for every attempt of this call.
        let policy = self.policy();
        for _ in 0..8 {
            let now_ms = now_ms();
            let state = self.queue_state(queue_name).await?;
            // The scheduler lock is confined to this block so the guard is
            // released before the next `.await`.
            let (job, fairness_key) = {
                let mut states = self
                    .scheduler_states
                    .lock()
                    .expect("scheduler state poisoned");
                let scheduler_state = states.entry(queue_name.to_string()).or_default();
                let Some(job) =
                    state.next_ready_job_with_scheduler(scheduler_state, &policy, now_ms)
                else {
                    return Ok(None);
                };
                let job = job.clone();
                let fairness_key = policy.fairness_key_of(&SchedulableJob::from_state(&job));
                (job, fairness_key)
            };
            let claim = WorkerQueueClaimRecord {
                job_event_id: job.job_event_id,
                claim_id: Uuid::new_v4().to_string(),
                consumer_id: consumer_id.to_string(),
                claimed_at_ms: now_ms,
                expires_at_ms: expiry_ms(now_ms, ttl),
            };
            self.event_log
                .append(
                    &claims_topic(queue_name)?,
                    LogEvent::new(
                        "job_claimed",
                        serde_json::to_value(&claim)
                            .map_err(|error| LogError::Serde(error.to_string()))?,
                    ),
                )
                .await?;
            // Replay again: the claim only counts if it is the active one in
            // the log's order.
            let refreshed = self.queue_state(queue_name).await?;
            if refreshed
                .active_claim_for(job.job_event_id)
                .is_some_and(|active| active.claim_id == claim.claim_id)
            {
                {
                    let mut states = self
                        .scheduler_states
                        .lock()
                        .expect("scheduler state poisoned");
                    let scheduler_state = states.entry(queue_name.to_string()).or_default();
                    scheduler_state.note_claim_committed(&fairness_key);
                }
                if let Some(metrics) = crate::active_metrics_registry() {
                    let summary = refreshed.summary(now_ms);
                    // Claim age is reported in seconds.
                    metrics.record_worker_queue_claim_age(
                        queue_name,
                        now_ms.saturating_sub(job.enqueued_at_ms) as f64 / 1000.0,
                    );
                    metrics.set_worker_queue_depth(
                        queue_name,
                        (summary.ready + summary.in_flight) as u64,
                    );
                    metrics.record_scheduler_selection(
                        queue_name,
                        policy.fairness_key.as_str(),
                        &fairness_key,
                    );
                    // Per-key scheduler gauges are best effort; a failed
                    // inspection is ignored.
                    if let Ok(snap) = self.inspect_queue(queue_name).await {
                        for stat in &snap.scheduler.keys {
                            metrics.set_scheduler_deficit(
                                queue_name,
                                policy.fairness_key.as_str(),
                                &stat.fairness_key,
                                stat.deficit,
                            );
                            metrics.set_scheduler_oldest_eligible_age(
                                queue_name,
                                policy.fairness_key.as_str(),
                                &stat.fairness_key,
                                stat.oldest_ready_age_ms,
                            );
                        }
                    }
                }
                return Ok(Some(ClaimedWorkerJob {
                    handle: WorkerQueueClaimHandle {
                        queue: queue_name.to_string(),
                        job_event_id: claim.job_event_id,
                        claim_id: claim.claim_id,
                        consumer_id: claim.consumer_id,
                        expires_at_ms: claim.expires_at_ms,
                    },
                    job: job.job,
                }));
            }
        }
        // Every attempt was lost.
        Ok(None)
    }
606
607 pub async fn inspect_queue(&self, queue: &str) -> Result<WorkerQueueInspectSnapshot, LogError> {
610 let queue_name = queue.trim();
611 if queue_name.is_empty() {
612 return Err(LogError::Config(
613 "worker queue name cannot be empty".to_string(),
614 ));
615 }
616 let now_ms = now_ms();
617 let state = self.queue_state(queue_name).await?;
618 let summary = state.summary(now_ms);
619 let policy = self.policy();
620 let ready = scheduler::ready_stats_by_key(&state.jobs, &policy, now_ms);
621 let in_flight = scheduler::in_flight_by_key(&state.jobs, &policy);
623 let scheduler_snapshot = {
624 let mut states = self
625 .scheduler_states
626 .lock()
627 .expect("scheduler state poisoned");
628 let scheduler_state = states.entry(queue_name.to_string()).or_default();
629 scheduler_state.replace_in_flight(in_flight);
630 scheduler_state.snapshot(&policy, &ready)
631 };
632 Ok(WorkerQueueInspectSnapshot {
633 summary,
634 scheduler: scheduler_snapshot,
635 })
636 }
637
638 pub async fn inspect_all_queues(&self) -> Result<Vec<WorkerQueueInspectSnapshot>, LogError> {
640 let mut snapshots = Vec::new();
641 for queue in self.known_queues().await? {
642 snapshots.push(self.inspect_queue(&queue).await?);
643 }
644 snapshots.sort_by(|left, right| left.summary.queue.cmp(&right.summary.queue));
645 Ok(snapshots)
646 }
647
648 pub async fn renew_claim(
649 &self,
650 handle: &WorkerQueueClaimHandle,
651 ttl: StdDuration,
652 ) -> Result<bool, LogError> {
653 let now_ms = now_ms();
654 let renewal = WorkerQueueClaimRenewalRecord {
655 job_event_id: handle.job_event_id,
656 claim_id: handle.claim_id.clone(),
657 consumer_id: handle.consumer_id.clone(),
658 renewed_at_ms: now_ms,
659 expires_at_ms: expiry_ms(now_ms, ttl),
660 };
661 self.event_log
662 .append(
663 &claims_topic(&handle.queue)?,
664 LogEvent::new(
665 "claim_renewed",
666 serde_json::to_value(&renewal)
667 .map_err(|error| LogError::Serde(error.to_string()))?,
668 ),
669 )
670 .await?;
671 let refreshed = self.queue_state(&handle.queue).await?;
672 Ok(refreshed
673 .active_claim_for(handle.job_event_id)
674 .is_some_and(|active| active.claim_id == handle.claim_id))
675 }
676
677 pub async fn release_claim(
678 &self,
679 handle: &WorkerQueueClaimHandle,
680 reason: &str,
681 ) -> Result<(), LogError> {
682 let release = WorkerQueueReleaseRecord {
683 job_event_id: handle.job_event_id,
684 claim_id: handle.claim_id.clone(),
685 consumer_id: handle.consumer_id.clone(),
686 released_at_ms: now_ms(),
687 reason: if reason.trim().is_empty() {
688 None
689 } else {
690 Some(reason.to_string())
691 },
692 };
693 self.event_log
694 .append(
695 &claims_topic(&handle.queue)?,
696 LogEvent::new(
697 "job_released",
698 serde_json::to_value(&release)
699 .map_err(|error| LogError::Serde(error.to_string()))?,
700 ),
701 )
702 .await?;
703 Ok(())
704 }
705
706 pub async fn append_response(
707 &self,
708 queue: &str,
709 response: &WorkerQueueResponseRecord,
710 ) -> Result<u64, LogError> {
711 self.event_log
712 .append(
713 &responses_topic(queue)?,
714 LogEvent::new(
715 "job_response",
716 serde_json::to_value(response)
717 .map_err(|error| LogError::Serde(error.to_string()))?,
718 ),
719 )
720 .await
721 }
722
723 pub async fn ack_claim(&self, handle: &WorkerQueueClaimHandle) -> Result<u64, LogError> {
724 self.event_log
725 .append(
726 &claims_topic(&handle.queue)?,
727 LogEvent::new(
728 "job_acked",
729 serde_json::to_value(WorkerQueueAckRecord {
730 job_event_id: handle.job_event_id,
731 claim_id: handle.claim_id.clone(),
732 consumer_id: handle.consumer_id.clone(),
733 acked_at_ms: now_ms(),
734 })
735 .map_err(|error| LogError::Serde(error.to_string()))?,
736 ),
737 )
738 .await
739 }
740
741 pub async fn ack_job(
742 &self,
743 queue: &str,
744 job_event_id: u64,
745 consumer_id: &str,
746 ) -> Result<bool, LogError> {
747 let queue_name = queue.trim();
748 if queue_name.is_empty() {
749 return Err(LogError::Config(
750 "worker queue name cannot be empty".to_string(),
751 ));
752 }
753 let state = self.queue_state(queue_name).await?;
754 let Some(job) = state
755 .jobs
756 .iter()
757 .find(|job| job.job_event_id == job_event_id)
758 else {
759 return Ok(false);
760 };
761 if job.acked || job.purged {
762 return Ok(false);
763 }
764 self.event_log
765 .append(
766 &claims_topic(queue_name)?,
767 LogEvent::new(
768 "job_acked",
769 serde_json::to_value(WorkerQueueAckRecord {
770 job_event_id,
771 claim_id: String::new(),
772 consumer_id: consumer_id.to_string(),
773 acked_at_ms: now_ms(),
774 })
775 .map_err(|error| LogError::Serde(error.to_string()))?,
776 ),
777 )
778 .await?;
779 Ok(true)
780 }
781
782 pub async fn purge_unclaimed(
783 &self,
784 queue: &str,
785 purged_by: &str,
786 reason: Option<&str>,
787 ) -> Result<usize, LogError> {
788 let state = self.queue_state(queue).await?;
789 let ready_jobs: Vec<_> = state
790 .jobs
791 .into_iter()
792 .filter(|job| job.is_ready())
793 .map(|job| job.job_event_id)
794 .collect();
795 for job_event_id in &ready_jobs {
796 self.event_log
797 .append(
798 &claims_topic(queue)?,
799 LogEvent::new(
800 "job_purged",
801 serde_json::to_value(WorkerQueuePurgeRecord {
802 job_event_id: *job_event_id,
803 purged_by: purged_by.to_string(),
804 purged_at_ms: now_ms(),
805 reason: reason
806 .filter(|value| !value.trim().is_empty())
807 .map(|value| value.to_string()),
808 })
809 .map_err(|error| LogError::Serde(error.to_string()))?,
810 ),
811 )
812 .await?;
813 }
814 Ok(ready_jobs.len())
815 }
816}
817
/// Working copy of a job's state used while replaying the claims topic in
/// `WorkerQueue::queue_state`.
#[derive(Clone, Debug)]
struct WorkerQueueJobStateInternal {
    job_event_id: u64,
    enqueued_at_ms: i64,
    job: WorkerQueueJob,
    active_claim: Option<WorkerQueueClaimHandle>,
    acked: bool,
    purged: bool,
    /// Every claim id recorded for this job while it was neither acked nor
    /// purged, whether or not the claim became active; used to validate acks.
    seen_claim_ids: BTreeSet<String>,
}
828
/// Payload of a `queue_seen` event on the catalog topic.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueCatalogRecord {
    queue: String,
}
833
/// Payload of a `job_claimed` event on a queue's claims topic.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueClaimRecord {
    job_event_id: u64,
    claim_id: String,
    consumer_id: String,
    /// Compared against the current claim's expiry to decide whether this
    /// claim may take over the job.
    claimed_at_ms: i64,
    expires_at_ms: i64,
}
842
/// Payload of a `claim_renewed` event on a queue's claims topic.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueClaimRenewalRecord {
    job_event_id: u64,
    claim_id: String,
    consumer_id: String,
    renewed_at_ms: i64,
    /// New expiry applied to the active claim when its id matches `claim_id`.
    expires_at_ms: i64,
}
851
/// Payload of a `job_released` event on a queue's claims topic.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueReleaseRecord {
    job_event_id: u64,
    claim_id: String,
    consumer_id: String,
    released_at_ms: i64,
    /// Defaults to `None` when missing from the serialized form.
    #[serde(default)]
    reason: Option<String>,
}
861
/// Payload of a `job_acked` event on a queue's claims topic.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueAckRecord {
    job_event_id: u64,
    /// Empty when the ack was written by `ack_job` (not tied to a claim).
    claim_id: String,
    consumer_id: String,
    acked_at_ms: i64,
}
869
/// Payload of a `job_purged` event on a queue's claims topic.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueuePurgeRecord {
    job_event_id: u64,
    purged_by: String,
    purged_at_ms: i64,
    /// Defaults to `None` when missing from the serialized form.
    #[serde(default)]
    reason: Option<String>,
}
878
879pub fn job_topic_name(queue: &str) -> String {
880 format!("worker.{}", sanitize_topic_component(queue))
881}
882
883pub fn claims_topic_name(queue: &str) -> String {
884 format!("{}{}", job_topic_name(queue), WORKER_QUEUE_CLAIMS_SUFFIX)
885}
886
887pub fn response_topic_name(queue: &str) -> String {
888 format!("{}{}", job_topic_name(queue), WORKER_QUEUE_RESPONSES_SUFFIX)
889}
890
891fn job_topic(queue: &str) -> Result<Topic, LogError> {
892 Topic::new(job_topic_name(queue))
893}
894
895fn claims_topic(queue: &str) -> Result<Topic, LogError> {
896 Topic::new(claims_topic_name(queue))
897}
898
899fn responses_topic(queue: &str) -> Result<Topic, LogError> {
900 Topic::new(response_topic_name(queue))
901}
902
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields 0 if the system clock reports a time before the epoch. The
/// millisecond count is clamped to `i64::MAX` rather than truncated by the
/// cast, using the same clamping as `expiry_ms`.
fn now_ms() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|elapsed| elapsed.as_millis().min(i64::MAX as u128) as i64)
        .unwrap_or(0)
}
909
/// Returns `now_ms` plus `ttl` in milliseconds.
///
/// A `ttl` larger than `i64::MAX` milliseconds is clamped, and the addition
/// saturates instead of overflowing.
fn expiry_ms(now_ms: i64, ttl: StdDuration) -> i64 {
    let ttl_ms = ttl.as_millis();
    let clamped_ttl = if ttl_ms > i64::MAX as u128 {
        i64::MAX
    } else {
        ttl_ms as i64
    };
    now_ms.saturating_add(clamped_ttl)
}
913
914#[cfg(test)]
915mod tests {
916 use super::*;
917
918 use crate::event_log::{AnyEventLog, MemoryEventLog};
919 use crate::triggers::{
920 event::{GenericWebhookPayload, KnownProviderPayload},
921 scheduler::{self, SchedulerStrategy},
922 ProviderId, ProviderPayload, SignatureStatus, TraceId, TriggerEvent,
923 };
924
    /// Builds a verified GitHub `issues.opened` trigger event with the given
    /// id, no tenant, and a small generic webhook payload.
    fn test_event(id: &str) -> TriggerEvent {
        TriggerEvent {
            id: crate::triggers::TriggerEventId(id.to_string()),
            provider: ProviderId::from("github"),
            kind: "issues.opened".to_string(),
            trace_id: TraceId("trace-test".to_string()),
            // The event id doubles as the dedupe key.
            dedupe_key: id.to_string(),
            tenant_id: None,
            headers: BTreeMap::new(),
            batch: None,
            raw_body: None,
            provider_payload: ProviderPayload::Known(KnownProviderPayload::Webhook(
                GenericWebhookPayload {
                    source: Some("worker-queue-test".to_string()),
                    content_type: Some("application/json".to_string()),
                    raw: serde_json::json!({"id": id}),
                },
            )),
            signature_status: SignatureStatus::Verified,
            received_at: time::OffsetDateTime::now_utc(),
            occurred_at: None,
            dedupe_claimed: false,
        }
    }
949
950 fn test_job(
951 queue: &str,
952 trigger_id: &str,
953 event_id: &str,
954 priority: WorkerQueuePriority,
955 ) -> WorkerQueueJob {
956 WorkerQueueJob {
957 queue: queue.to_string(),
958 trigger_id: trigger_id.to_string(),
959 binding_key: format!("{trigger_id}@v1"),
960 binding_version: 1,
961 event: test_event(event_id),
962 replay_of_event_id: None,
963 priority,
964 }
965 }
966
    /// One enqueue makes the queue discoverable through `queue_summaries`,
    /// with a single ready job and nothing in flight.
    #[tokio::test(flavor = "current_thread")]
    async fn enqueue_and_summarize_queue() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log);
        queue
            .enqueue(&test_job(
                "triage",
                "incoming-review-task",
                "evt-1",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();
        let summaries = queue.queue_summaries().await.unwrap();
        assert_eq!(summaries.len(), 1);
        assert_eq!(summaries[0].queue, "triage");
        assert_eq!(summaries[0].ready, 1);
        assert_eq!(summaries[0].in_flight, 0);
    }
986
    /// Claiming moves the job from ready to in flight; recording a response
    /// and acking the claim leaves it acked with one response and nothing
    /// ready or in flight.
    #[tokio::test(flavor = "current_thread")]
    async fn claim_and_ack_remove_job_from_ready_pool() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log);
        queue
            .enqueue(&test_job(
                "triage",
                "incoming-review-task",
                "evt-1",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();
        let claimed = queue
            .claim_next("triage", "consumer-a", StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        // While the claim is active the job counts as in flight, not ready.
        let before_ack = queue.queue_state("triage").await.unwrap();
        assert_eq!(before_ack.summary(now_ms()).ready, 0);
        assert_eq!(before_ack.summary(now_ms()).in_flight, 1);
        queue
            .append_response(
                "triage",
                &WorkerQueueResponseRecord {
                    queue: "triage".to_string(),
                    job_event_id: claimed.handle.job_event_id,
                    consumer_id: "consumer-a".to_string(),
                    handled_at_ms: now_ms(),
                    outcome: Some(DispatchOutcome {
                        trigger_id: "incoming-review-task".to_string(),
                        binding_key: "incoming-review-task@v1".to_string(),
                        event_id: "evt-1".to_string(),
                        attempt_count: 1,
                        status: super::super::DispatchStatus::Succeeded,
                        handler_kind: "local".to_string(),
                        target_uri: "handlers::on_review".to_string(),
                        replay_of_event_id: None,
                        result: Some(serde_json::json!({"ok": true})),
                        error: None,
                    }),
                    error: None,
                },
            )
            .await
            .unwrap();
        queue.ack_claim(&claimed.handle).await.unwrap();
        let after_ack = queue.queue_state("triage").await.unwrap();
        let summary = after_ack.summary(now_ms());
        assert_eq!(summary.ready, 0);
        assert_eq!(summary.in_flight, 0);
        assert_eq!(summary.acked, 1);
        assert_eq!(summary.responses, 1);
    }
1041
    /// `ack_job` settles a job that was never claimed, and a second call for
    /// the same job reports `false`.
    #[tokio::test(flavor = "current_thread")]
    async fn ack_job_acknowledges_without_active_claim() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log);
        let receipt = queue
            .enqueue(&test_job(
                "triage",
                "incoming-review-task",
                "evt-1",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();

        assert!(queue
            .ack_job("triage", receipt.job_event_id, "pipeline_lifecycle")
            .await
            .unwrap());
        let state = queue.queue_state("triage").await.unwrap();
        let summary = state.summary(now_ms());
        assert_eq!(summary.ready, 0);
        assert_eq!(summary.acked, 1);
        assert!(
            !queue
                .ack_job("triage", receipt.job_event_id, "pipeline_lifecycle")
                .await
                .unwrap(),
            "already acknowledged jobs should not produce a second settlement"
        );
    }
1072
    /// A claim whose expiry is already in the past does not block another
    /// consumer from claiming the same job.
    #[tokio::test(flavor = "current_thread")]
    async fn expired_claim_allows_reclaim() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log.clone());
        let receipt = queue
            .enqueue(&test_job(
                "triage",
                "incoming-review-task",
                "evt-1",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();
        // Written straight to the log so the expiry can be placed in the past.
        let expired_claim = WorkerQueueClaimRecord {
            job_event_id: receipt.job_event_id,
            claim_id: "expired-claim".to_string(),
            consumer_id: "consumer-a".to_string(),
            claimed_at_ms: now_ms().saturating_sub(2),
            expires_at_ms: now_ms().saturating_sub(1),
        };
        log.append(
            &claims_topic("triage").unwrap(),
            LogEvent::new("job_claimed", serde_json::to_value(&expired_claim).unwrap()),
        )
        .await
        .unwrap();
        let second = queue
            .claim_next("triage", "consumer-b", StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(second.job.event.id.0, "evt-1");
        assert_ne!(second.handle.claim_id, expired_claim.claim_id);
        assert_eq!(second.handle.consumer_id, "consumer-b");
    }
1108
    /// With a `Normal` job older than the promotion age and a fresh `High`
    /// job both ready, the first claim returns the aged `Normal` job.
    #[tokio::test(flavor = "current_thread")]
    async fn high_priority_and_aged_normal_are_selected_first() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log.clone());

        // Events are appended directly (bypassing `enqueue`) so the first
        // job's timestamp can be backdated.
        let catalog_topic = Topic::new(WORKER_QUEUE_CATALOG_TOPIC).unwrap();
        log.append(
            &catalog_topic,
            LogEvent::new("queue_seen", serde_json::json!({"queue":"triage"})),
        )
        .await
        .unwrap();

        let topic = job_topic("triage").unwrap();
        let mut old_normal = LogEvent::new(
            "trigger_dispatch",
            serde_json::to_value(test_job(
                "triage",
                "incoming-review-task",
                "evt-old-normal",
                WorkerQueuePriority::Normal,
            ))
            .unwrap(),
        );
        // One second past the promotion threshold.
        old_normal.occurred_at_ms = now_ms() - NORMAL_PROMOTION_AGE_MS - 1_000;
        log.append(&topic, old_normal).await.unwrap();

        let high = LogEvent::new(
            "trigger_dispatch",
            serde_json::to_value(test_job(
                "triage",
                "incoming-review-task",
                "evt-high",
                WorkerQueuePriority::High,
            ))
            .unwrap(),
        );
        log.append(&topic, high).await.unwrap();

        let claimed = queue
            .claim_next("triage", "consumer-a", StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(claimed.job.event.id.0, "evt-old-normal");
    }
1155
1156 fn tenant_event(id: &str, tenant: &str) -> TriggerEvent {
1157 let mut event = test_event(id);
1158 event.tenant_id = Some(crate::triggers::TenantId::new(tenant));
1159 event
1160 }
1161
1162 fn tenant_job(
1163 queue: &str,
1164 trigger_id: &str,
1165 event_id: &str,
1166 tenant: &str,
1167 priority: WorkerQueuePriority,
1168 ) -> WorkerQueueJob {
1169 WorkerQueueJob {
1170 queue: queue.to_string(),
1171 trigger_id: trigger_id.to_string(),
1172 binding_key: format!("{trigger_id}@v1"),
1173 binding_version: 1,
1174 event: tenant_event(event_id, tenant),
1175 replay_of_event_id: None,
1176 priority,
1177 }
1178 }
1179
1180 async fn ack_and_respond(queue: &WorkerQueue, queue_name: &str, claim: &ClaimedWorkerJob) {
1181 queue
1182 .append_response(
1183 queue_name,
1184 &WorkerQueueResponseRecord {
1185 queue: queue_name.to_string(),
1186 job_event_id: claim.handle.job_event_id,
1187 consumer_id: claim.handle.consumer_id.clone(),
1188 handled_at_ms: now_ms(),
1189 outcome: Some(DispatchOutcome {
1190 trigger_id: claim.job.trigger_id.clone(),
1191 binding_key: claim.job.binding_key.clone(),
1192 event_id: claim.job.event.id.0.clone(),
1193 attempt_count: 1,
1194 status: super::super::DispatchStatus::Succeeded,
1195 handler_kind: "local".to_string(),
1196 target_uri: "test::handler".to_string(),
1197 replay_of_event_id: None,
1198 result: None,
1199 error: None,
1200 }),
1201 error: None,
1202 },
1203 )
1204 .await
1205 .unwrap();
1206 queue.ack_claim(&claim.handle).await.unwrap();
1207 }
1208
1209 #[tokio::test(flavor = "current_thread")]
1210 async fn drr_policy_rotates_across_tenants_through_claim_next() {
1211 let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(256)));
1212 let queue = WorkerQueue::with_policy(
1213 log,
1214 SchedulerPolicy::deficit_round_robin(scheduler::FairnessKey::Tenant),
1215 );
1216
1217 for idx in 0..8 {
1219 queue
1220 .enqueue(&tenant_job(
1221 "triage",
1222 "trigger",
1223 &format!("a-{idx}"),
1224 "tenant-a",
1225 WorkerQueuePriority::Normal,
1226 ))
1227 .await
1228 .unwrap();
1229 }
1230 queue
1231 .enqueue(&tenant_job(
1232 "triage",
1233 "trigger",
1234 "b-1",
1235 "tenant-b",
1236 WorkerQueuePriority::Normal,
1237 ))
1238 .await
1239 .unwrap();
1240
1241 let mut tenants_seen = Vec::new();
1245 for n in 0..4 {
1246 let consumer = format!("c-{n}");
1247 let claim = queue
1248 .claim_next("triage", &consumer, StdDuration::from_secs(60))
1249 .await
1250 .unwrap()
1251 .expect("queue should still have ready jobs");
1252 tenants_seen.push(
1253 claim
1254 .job
1255 .event
1256 .tenant_id
1257 .as_ref()
1258 .map(|t| t.0.clone())
1259 .unwrap_or_default(),
1260 );
1261 ack_and_respond(&queue, "triage", &claim).await;
1262 }
1263
1264 let saw_b = tenants_seen.iter().any(|t| t == "tenant-b");
1265 assert!(
1266 saw_b,
1267 "tenant-b should have been served within the first 4 claims, got {tenants_seen:?}",
1268 );
1269 }
1270
1271 #[tokio::test(flavor = "current_thread")]
1272 async fn fifo_policy_preserves_legacy_behavior() {
1273 let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(64)));
1274 let queue = WorkerQueue::with_policy(log, SchedulerPolicy::fifo());
1275
1276 for idx in 0..4 {
1278 queue
1279 .enqueue(&tenant_job(
1280 "triage",
1281 "trigger",
1282 &format!("a-{idx}"),
1283 "tenant-a",
1284 WorkerQueuePriority::Normal,
1285 ))
1286 .await
1287 .unwrap();
1288 }
1289 queue
1290 .enqueue(&tenant_job(
1291 "triage",
1292 "trigger",
1293 "b-1",
1294 "tenant-b",
1295 WorkerQueuePriority::Normal,
1296 ))
1297 .await
1298 .unwrap();
1299
1300 let first = queue
1302 .claim_next("triage", "c-0", StdDuration::from_secs(60))
1303 .await
1304 .unwrap()
1305 .unwrap();
1306 assert_eq!(first.job.event.tenant_id.unwrap().0, "tenant-a");
1307 }
1308
1309 #[tokio::test(flavor = "current_thread")]
1310 async fn inspect_queue_reports_per_tenant_fairness_state() {
1311 let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(64)));
1312 let queue = WorkerQueue::with_policy(
1313 log,
1314 SchedulerPolicy::deficit_round_robin(scheduler::FairnessKey::Tenant)
1315 .with_weight("tenant-a", 2)
1316 .with_weight("tenant-b", 1),
1317 );
1318
1319 for idx in 0..3 {
1320 queue
1321 .enqueue(&tenant_job(
1322 "triage",
1323 "trigger",
1324 &format!("a-{idx}"),
1325 "tenant-a",
1326 WorkerQueuePriority::Normal,
1327 ))
1328 .await
1329 .unwrap();
1330 }
1331 queue
1332 .enqueue(&tenant_job(
1333 "triage",
1334 "trigger",
1335 "b-1",
1336 "tenant-b",
1337 WorkerQueuePriority::Normal,
1338 ))
1339 .await
1340 .unwrap();
1341
1342 for n in 0..2 {
1343 let consumer = format!("c-{n}");
1344 let claim = queue
1345 .claim_next("triage", &consumer, StdDuration::from_secs(60))
1346 .await
1347 .unwrap()
1348 .unwrap();
1349 ack_and_respond(&queue, "triage", &claim).await;
1350 }
1351
1352 let snap = queue.inspect_queue("triage").await.unwrap();
1353 assert_eq!(snap.scheduler.strategy, "drr");
1354 assert_eq!(snap.scheduler.fairness_key, "tenant");
1355 assert!(snap
1356 .scheduler
1357 .keys
1358 .iter()
1359 .any(|k| k.fairness_key == "tenant-a"));
1360 let weights: BTreeMap<String, u32> = snap
1361 .scheduler
1362 .keys
1363 .iter()
1364 .map(|k| (k.fairness_key.clone(), k.weight))
1365 .collect();
1366 assert_eq!(weights.get("tenant-a").copied(), Some(2));
1367 assert_eq!(weights.get("tenant-b").copied(), Some(1));
1368 }
1369
1370 #[tokio::test(flavor = "current_thread")]
1371 async fn drr_with_max_concurrent_per_key_throttles_hot_tenant() {
1372 let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(128)));
1373 let queue = WorkerQueue::with_policy(
1374 log,
1375 SchedulerPolicy::deficit_round_robin(scheduler::FairnessKey::Tenant)
1376 .with_max_concurrent_per_key(1),
1377 );
1378
1379 for idx in 0..4 {
1380 queue
1381 .enqueue(&tenant_job(
1382 "triage",
1383 "trigger",
1384 &format!("a-{idx}"),
1385 "tenant-a",
1386 WorkerQueuePriority::Normal,
1387 ))
1388 .await
1389 .unwrap();
1390 }
1391 queue
1392 .enqueue(&tenant_job(
1393 "triage",
1394 "trigger",
1395 "b-1",
1396 "tenant-b",
1397 WorkerQueuePriority::Normal,
1398 ))
1399 .await
1400 .unwrap();
1401
1402 let first = queue
1403 .claim_next("triage", "consumer-a", StdDuration::from_secs(60))
1404 .await
1405 .unwrap()
1406 .unwrap();
1407 let second = queue
1410 .claim_next("triage", "consumer-b", StdDuration::from_secs(60))
1411 .await
1412 .unwrap()
1413 .unwrap();
1414 let pair = [
1415 first.job.event.tenant_id.clone().unwrap().0,
1416 second.job.event.tenant_id.clone().unwrap().0,
1417 ];
1418 assert!(
1419 pair.contains(&"tenant-a".to_string()) && pair.contains(&"tenant-b".to_string()),
1420 "max_concurrent_per_key=1 must release tenant-b within two claims, got {pair:?}",
1421 );
1422 }
1423
/// `SchedulerPolicy::from_env_lookup` must pick up every `HARN_SCHEDULER_*`
/// setting supplied by the lookup closure.
#[test]
fn from_env_parses_drr_policy_from_lookup() {
    // Simulated environment. The weights string deliberately ends with malformed
    // entries; the test only requires that the valid ones are still applied.
    const ENV: &[(&str, &str)] = &[
        ("HARN_SCHEDULER_STRATEGY", "drr"),
        ("HARN_SCHEDULER_FAIRNESS_KEY", "tenant-and-binding"),
        ("HARN_SCHEDULER_QUANTUM", "3"),
        ("HARN_SCHEDULER_STARVATION_AGE_MS", "750"),
        ("HARN_SCHEDULER_MAX_CONCURRENT_PER_KEY", "4"),
        ("HARN_SCHEDULER_DEFAULT_WEIGHT", "2"),
        ("HARN_SCHEDULER_WEIGHTS", "tenant-a:5,tenant-b:1, : ,bad:abc"),
    ];
    let lookup = |name: &str| -> Option<String> {
        ENV.iter()
            .find(|(key, _)| *key == name)
            .map(|&(_, value)| value.to_string())
    };

    let policy = SchedulerPolicy::from_env_lookup(lookup);

    if let SchedulerStrategy::DeficitRoundRobin {
        quantum,
        starvation_age_ms,
    } = policy.strategy
    {
        assert_eq!(quantum, 3);
        assert_eq!(starvation_age_ms, Some(750));
    } else {
        let other = &policy.strategy;
        panic!("expected DRR strategy, got {other:?}");
    }

    assert_eq!(
        policy.fairness_key,
        scheduler::FairnessKey::TenantAndBinding
    );
    assert_eq!(policy.max_concurrent_per_key, 4);
    assert_eq!(policy.default_weight, 2);
    assert_eq!(policy.weight_for("tenant-a"), 5);
    assert_eq!(policy.weight_for("tenant-b"), 1);
    // tenant-c has no explicit entry, so the default weight of 2 is expected.
    assert_eq!(policy.weight_for("tenant-c"), 2);
}
1460
/// When the lookup reports every variable as unset, the resulting policy must
/// use the FIFO strategy.
#[test]
fn from_env_defaults_to_fifo_when_missing() {
    // A lookup that never finds anything.
    let empty_env = |_: &str| -> Option<String> { None };
    let policy = SchedulerPolicy::from_env_lookup(empty_env);
    assert!(matches!(policy.strategy, SchedulerStrategy::Fifo));
}
1466}