Skip to main content

harn_vm/triggers/
worker_queue.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, Mutex, RwLock};
3use std::time::Duration as StdDuration;
4
5use serde::{Deserialize, Serialize};
6use uuid::Uuid;
7
8use crate::event_log::{
9    sanitize_topic_component, AnyEventLog, EventLog, LogError, LogEvent, Topic,
10};
11
12use super::scheduler::{self, SchedulableJob, SchedulerPolicy, SchedulerSnapshot, SchedulerState};
13use super::{DispatchOutcome, TriggerEvent};
14
/// Event-log topic that records (as `queue_seen` events) every queue name a
/// job has been enqueued to; [`WorkerQueue::known_queues`] reads it back.
pub const WORKER_QUEUE_CATALOG_TOPIC: &str = "worker.queues";
/// Topic-name suffix for a queue's claim/renew/release/ack/purge events
/// (presumably applied by the `claims_topic` helper — confirm).
const WORKER_QUEUE_CLAIMS_SUFFIX: &str = ".claims";
/// Topic-name suffix for a queue's `job_response` events (presumably applied
/// by the `responses_topic` helper — confirm).
const WORKER_QUEUE_RESPONSES_SUFFIX: &str = ".responses";
/// Wait time (15 minutes, in milliseconds) after which a `Normal` job is
/// ranked alongside `High` jobs.
const NORMAL_PROMOTION_AGE_MS: i64 = 15 * 60_000;
19
/// Priority class of a [`WorkerQueueJob`].
///
/// Serialized in lowercase (`"high"`, `"normal"`, `"low"`). `Normal` is the
/// default, which is also what job records without a `priority` field
/// deserialize to.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum WorkerQueuePriority {
    High,
    #[default]
    Normal,
    Low,
}
28
29impl WorkerQueuePriority {
30    pub fn as_str(self) -> &'static str {
31        match self {
32            Self::High => "high",
33            Self::Normal => "normal",
34            Self::Low => "low",
35        }
36    }
37
38    pub fn effective_rank(self, enqueued_at_ms: i64, now_ms: i64) -> u8 {
39        match self {
40            Self::High => 0,
41            Self::Normal if now_ms.saturating_sub(enqueued_at_ms) >= NORMAL_PROMOTION_AGE_MS => 0,
42            Self::Normal => 1,
43            Self::Low => 2,
44        }
45    }
46}
47
/// One unit of work on a worker queue: a trigger event plus the trigger and
/// binding identifiers it was dispatched for.
///
/// [`WorkerQueue::enqueue`] appends this (JSON-encoded) as a
/// `trigger_dispatch` event on the queue's job topic. serde serializes fields
/// in declaration order, so keep the field order stable.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct WorkerQueueJob {
    /// Target queue name. `enqueue` trims it when choosing topics but
    /// serializes the job unchanged, so this may carry stray whitespace.
    pub queue: String,
    /// Copied into the `trigger_id` header of the job event.
    pub trigger_id: String,
    /// Copied into the `binding_key` header of the job event.
    pub binding_key: String,
    pub binding_version: u32,
    /// The trigger event to dispatch; its id becomes the `event_id` header.
    pub event: TriggerEvent,
    /// NOTE(review): presumably the id of the original event when this job is
    /// a replay — confirm with callers. `None` when absent from the record.
    #[serde(default)]
    pub replay_of_event_id: Option<String>,
    /// Scheduling priority; older records without it deserialize as `Normal`.
    #[serde(default)]
    pub priority: WorkerQueuePriority,
}
60
/// Returned by [`WorkerQueue::enqueue`] once the job has been appended.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerQueueEnqueueReceipt {
    /// Trimmed queue name the job was written to.
    pub queue: String,
    /// Event-log id of the appended `trigger_dispatch` event; this is the
    /// job's identity in every later claim/ack/purge record.
    pub job_event_id: u64,
    /// Name of the topic where responses for this queue are appended.
    pub response_topic: String,
}

/// A consumer's claim on one job, as handed out by
/// [`WorkerQueue::claim_next`].
///
/// Pass it back to `renew_claim`, `release_claim` or `ack_claim`. The claim
/// stops counting as active once `expires_at_ms` is reached, unless it is
/// renewed first.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerQueueClaimHandle {
    /// Trimmed queue name; used to address the claims topic.
    pub queue: String,
    /// Event-log id of the claimed job.
    pub job_event_id: u64,
    /// Random (UUID v4) id distinguishing this claim from competing ones.
    pub claim_id: String,
    pub consumer_id: String,
    /// Expiry in milliseconds, on the same clock as `now_ms()`.
    pub expires_at_ms: i64,
}

/// A successfully claimed job: the claim handle plus the job payload.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ClaimedWorkerJob {
    pub handle: WorkerQueueClaimHandle,
    pub job: WorkerQueueJob,
}
82
/// Result of handling a job, appended as a `job_response` event by
/// [`WorkerQueue::append_response`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct WorkerQueueResponseRecord {
    pub queue: String,
    /// Event-log id of the job this response is for.
    pub job_event_id: u64,
    pub consumer_id: String,
    pub handled_at_ms: i64,
    /// NOTE(review): presumably exactly one of `outcome` / `error` is set;
    /// nothing in this file enforces that.
    pub outcome: Option<DispatchOutcome>,
    pub error: Option<String>,
}

/// Point-in-time counts for one queue, produced by
/// [`WorkerQueueState::summary`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerQueueSummary {
    pub queue: String,
    /// Jobs that are neither acked, purged nor claimed.
    pub ready: usize,
    /// Jobs that are neither acked nor purged but hold an unexpired claim.
    pub in_flight: usize,
    pub acked: usize,
    pub purged: usize,
    /// Number of `job_response` records on the queue.
    pub responses: usize,
    /// Age of the oldest ready job (clamped at zero); `None` when nothing is
    /// ready.
    pub oldest_unclaimed_age_ms: Option<u64>,
}
103
/// State of a single job, rebuilt from the event log by
/// [`WorkerQueue::queue_state`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct WorkerQueueJobState {
    /// Event-log id of the job's `trigger_dispatch` event.
    pub job_event_id: u64,
    /// `occurred_at_ms` of the `trigger_dispatch` event.
    pub enqueued_at_ms: i64,
    pub job: WorkerQueueJob,
    /// Current claim, if any. Claims already expired at rebuild time are
    /// dropped by `queue_state`, so this is `None` for them.
    pub active_claim: Option<WorkerQueueClaimHandle>,
    pub acked: bool,
    pub purged: bool,
}
113
114impl WorkerQueueJobState {
115    pub fn is_ready(&self) -> bool {
116        !self.acked && !self.purged && self.active_claim.is_none()
117    }
118}
119
/// Full state of one queue, rebuilt from its job, claims and responses
/// topics by [`WorkerQueue::queue_state`].
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct WorkerQueueState {
    pub queue: String,
    /// Every `job_response` record on the queue, in log order.
    pub responses: Vec<WorkerQueueResponseRecord>,
    /// Every job ever enqueued (including acked/purged ones). `queue_state`
    /// sorts them by `(enqueued_at_ms, job_event_id)`.
    pub jobs: Vec<WorkerQueueJobState>,
}
126
127impl WorkerQueueState {
128    pub fn summary(&self, now_ms: i64) -> WorkerQueueSummary {
129        let ready = self.jobs.iter().filter(|job| job.is_ready()).count();
130        let in_flight = self
131            .jobs
132            .iter()
133            .filter(|job| !job.acked && !job.purged && job.active_claim.is_some())
134            .count();
135        let acked = self.jobs.iter().filter(|job| job.acked).count();
136        let purged = self.jobs.iter().filter(|job| job.purged).count();
137        let oldest_unclaimed_age_ms = self
138            .jobs
139            .iter()
140            .filter(|job| job.is_ready())
141            .map(|job| now_ms.saturating_sub(job.enqueued_at_ms).max(0) as u64)
142            .max();
143        WorkerQueueSummary {
144            queue: self.queue.clone(),
145            ready,
146            in_flight,
147            acked,
148            purged,
149            responses: self.responses.len(),
150            oldest_unclaimed_age_ms,
151        }
152    }
153
154    /// Select the next ready job by consulting `scheduler` under `policy`.
155    ///
156    /// Under `Fifo` this is equivalent to picking the job with the lowest
157    /// `(priority_rank, enqueued_at_ms, job_event_id)` — the historical
158    /// behaviour. Under `DeficitRoundRobin`, candidates are grouped by the
159    /// configured fairness key and the scheduler rotates so a hot
160    /// tenant/binding cannot monopolise the queue.
161    fn next_ready_job_with_scheduler(
162        &self,
163        scheduler_state: &mut SchedulerState,
164        policy: &SchedulerPolicy,
165        now_ms: i64,
166    ) -> Option<&WorkerQueueJobState> {
167        let candidates: Vec<&WorkerQueueJobState> =
168            self.jobs.iter().filter(|job| job.is_ready()).collect();
169        if candidates.is_empty() {
170            return None;
171        }
172        let views: Vec<SchedulableJob<'_>> = candidates
173            .iter()
174            .map(|state| SchedulableJob::from_state(state))
175            .collect();
176
177        // Refresh authoritative in-flight count from the rebuilt queue state.
178        let in_flight = scheduler::in_flight_by_key(&self.jobs, policy);
179        scheduler_state.replace_in_flight(in_flight);
180
181        let pick = scheduler_state.select(&views, policy, now_ms)?;
182        candidates
183            .into_iter()
184            .find(|job| job.job_event_id == pick.job_event_id)
185    }
186
187    fn active_claim_for(&self, job_event_id: u64) -> Option<&WorkerQueueClaimHandle> {
188        self.jobs
189            .iter()
190            .find(|job| job.job_event_id == job_event_id)
191            .and_then(|job| job.active_claim.as_ref())
192    }
193}
194
/// Work queue whose jobs, claims, acks, purges and responses all live in an
/// event log and are rebuilt on demand by [`WorkerQueue::queue_state`].
///
/// The only in-memory state is the scheduler policy and per-queue scheduler
/// bookkeeping. Every field is behind an `Arc`, so clones are cheap and share
/// that state.
#[derive(Clone)]
pub struct WorkerQueue {
    event_log: Arc<AnyEventLog>,
    /// Active scheduler policy. Reads on every claim so it can be hot-swapped
    /// at runtime without rebuilding the queue.
    policy: Arc<RwLock<SchedulerPolicy>>,
    /// Per-queue ephemeral scheduler state. Keyed by queue name; entries are
    /// created lazily on first claim. Self-correcting — safe to lose on
    /// process restart.
    scheduler_states: Arc<Mutex<BTreeMap<String, SchedulerState>>>,
}
206
/// Result of [`WorkerQueue::inspect_queue`]: the standard queue summary plus
/// the scheduler's per-fairness-key view of the same queue.
#[derive(Clone, Debug, Serialize)]
pub struct WorkerQueueInspectSnapshot {
    pub summary: WorkerQueueSummary,
    pub scheduler: SchedulerSnapshot,
}
212
213impl WorkerQueue {
214    /// Construct a `WorkerQueue` using the policy derived from the
215    /// `HARN_SCHEDULER_*` environment variables (see
216    /// [`SchedulerPolicy::from_env`]). Defaults to FIFO so single-tenant
217    /// deployments behave exactly as before unless they opt in.
218    pub fn new(event_log: Arc<AnyEventLog>) -> Self {
219        Self::with_policy(event_log, SchedulerPolicy::from_env())
220    }
221
222    pub fn with_policy(event_log: Arc<AnyEventLog>, policy: SchedulerPolicy) -> Self {
223        Self {
224            event_log,
225            policy: Arc::new(RwLock::new(policy)),
226            scheduler_states: Arc::new(Mutex::new(BTreeMap::new())),
227        }
228    }
229
230    /// Replace the active scheduler policy. Existing per-queue state is
231    /// preserved (deficits self-correct against the new weights).
232    pub fn set_policy(&self, policy: SchedulerPolicy) {
233        *self.policy.write().expect("scheduler policy poisoned") = policy;
234    }
235
236    pub fn policy(&self) -> SchedulerPolicy {
237        self.policy
238            .read()
239            .expect("scheduler policy poisoned")
240            .clone()
241    }
242
243    pub async fn enqueue(
244        &self,
245        job: &WorkerQueueJob,
246    ) -> Result<WorkerQueueEnqueueReceipt, LogError> {
247        let queue = job.queue.trim();
248        if queue.is_empty() {
249            return Err(LogError::Config(
250                "worker queue name cannot be empty".to_string(),
251            ));
252        }
253        let queue_name = queue.to_string();
254        let catalog_topic = Topic::new(WORKER_QUEUE_CATALOG_TOPIC)
255            .expect("static worker queue catalog topic should always be valid");
256        self.event_log
257            .append(
258                &catalog_topic,
259                LogEvent::new(
260                    "queue_seen",
261                    serde_json::to_value(WorkerQueueCatalogRecord {
262                        queue: queue_name.clone(),
263                    })
264                    .map_err(|error| LogError::Serde(error.to_string()))?,
265                ),
266            )
267            .await?;
268
269        let job_topic = job_topic(&queue_name)?;
270        let mut headers = BTreeMap::new();
271        headers.insert("queue".to_string(), queue_name.clone());
272        headers.insert("trigger_id".to_string(), job.trigger_id.clone());
273        headers.insert("binding_key".to_string(), job.binding_key.clone());
274        headers.insert("event_id".to_string(), job.event.id.0.clone());
275        headers.insert("priority".to_string(), job.priority.as_str().to_string());
276        let job_event_id = self
277            .event_log
278            .append(
279                &job_topic,
280                LogEvent::new(
281                    "trigger_dispatch",
282                    serde_json::to_value(job)
283                        .map_err(|error| LogError::Serde(error.to_string()))?,
284                )
285                .with_headers(headers),
286            )
287            .await?;
288        if let Some(metrics) = crate::active_metrics_registry() {
289            if let Ok(state) = self.queue_state(&queue_name).await {
290                let summary = state.summary(now_ms());
291                metrics.set_worker_queue_depth(
292                    &queue_name,
293                    (summary.ready + summary.in_flight) as u64,
294                );
295            }
296        }
297        Ok(WorkerQueueEnqueueReceipt {
298            queue: queue_name.clone(),
299            job_event_id,
300            response_topic: response_topic_name(&queue_name),
301        })
302    }
303
304    pub async fn known_queues(&self) -> Result<Vec<String>, LogError> {
305        let topic = Topic::new(WORKER_QUEUE_CATALOG_TOPIC)
306            .expect("static worker queue catalog topic should always be valid");
307        let events = self.event_log.read_range(&topic, None, usize::MAX).await?;
308        let mut queues = BTreeSet::new();
309        for (_, event) in events {
310            if event.kind != "queue_seen" {
311                continue;
312            }
313            let record: WorkerQueueCatalogRecord = serde_json::from_value(event.payload)
314                .map_err(|error| LogError::Serde(error.to_string()))?;
315            if !record.queue.trim().is_empty() {
316                queues.insert(record.queue);
317            }
318        }
319        Ok(queues.into_iter().collect())
320    }
321
322    pub async fn queue_state(&self, queue: &str) -> Result<WorkerQueueState, LogError> {
323        let queue_name = queue.trim();
324        if queue_name.is_empty() {
325            return Err(LogError::Config(
326                "worker queue name cannot be empty".to_string(),
327            ));
328        }
329        let now_ms = now_ms();
330        let job_events = self
331            .event_log
332            .read_range(&job_topic(queue_name)?, None, usize::MAX)
333            .await?;
334        let claim_events = self
335            .event_log
336            .read_range(&claims_topic(queue_name)?, None, usize::MAX)
337            .await?;
338        let response_events = self
339            .event_log
340            .read_range(&responses_topic(queue_name)?, None, usize::MAX)
341            .await?;
342
343        let mut jobs = BTreeMap::<u64, WorkerQueueJobStateInternal>::new();
344        for (job_event_id, event) in job_events {
345            if event.kind != "trigger_dispatch" {
346                continue;
347            }
348            let job: WorkerQueueJob = serde_json::from_value(event.payload)
349                .map_err(|error| LogError::Serde(error.to_string()))?;
350            jobs.insert(
351                job_event_id,
352                WorkerQueueJobStateInternal {
353                    job_event_id,
354                    enqueued_at_ms: event.occurred_at_ms,
355                    job,
356                    active_claim: None,
357                    acked: false,
358                    purged: false,
359                    seen_claim_ids: BTreeSet::new(),
360                },
361            );
362        }
363
364        for (_, event) in claim_events {
365            match event.kind.as_str() {
366                "job_claimed" => {
367                    let claim: WorkerQueueClaimRecord = serde_json::from_value(event.payload)
368                        .map_err(|error| LogError::Serde(error.to_string()))?;
369                    let Some(job) = jobs.get_mut(&claim.job_event_id) else {
370                        continue;
371                    };
372                    if job.acked || job.purged {
373                        continue;
374                    }
375                    job.seen_claim_ids.insert(claim.claim_id.clone());
376                    let can_take = job
377                        .active_claim
378                        .as_ref()
379                        .is_none_or(|active| active.expires_at_ms <= claim.claimed_at_ms);
380                    if can_take {
381                        job.active_claim = Some(WorkerQueueClaimHandle {
382                            queue: queue_name.to_string(),
383                            job_event_id: claim.job_event_id,
384                            claim_id: claim.claim_id,
385                            consumer_id: claim.consumer_id,
386                            expires_at_ms: claim.expires_at_ms,
387                        });
388                    }
389                }
390                "claim_renewed" => {
391                    let renewal: WorkerQueueClaimRenewalRecord =
392                        serde_json::from_value(event.payload)
393                            .map_err(|error| LogError::Serde(error.to_string()))?;
394                    let Some(job) = jobs.get_mut(&renewal.job_event_id) else {
395                        continue;
396                    };
397                    if let Some(active) = job.active_claim.as_mut() {
398                        if active.claim_id == renewal.claim_id {
399                            active.expires_at_ms = renewal.expires_at_ms;
400                        }
401                    }
402                }
403                "job_released" => {
404                    let release: WorkerQueueReleaseRecord =
405                        serde_json::from_value(event.payload)
406                            .map_err(|error| LogError::Serde(error.to_string()))?;
407                    let Some(job) = jobs.get_mut(&release.job_event_id) else {
408                        continue;
409                    };
410                    if job
411                        .active_claim
412                        .as_ref()
413                        .is_some_and(|active| active.claim_id == release.claim_id)
414                    {
415                        job.active_claim = None;
416                    }
417                }
418                "job_acked" => {
419                    let ack: WorkerQueueAckRecord = serde_json::from_value(event.payload)
420                        .map_err(|error| LogError::Serde(error.to_string()))?;
421                    let Some(job) = jobs.get_mut(&ack.job_event_id) else {
422                        continue;
423                    };
424                    if ack.claim_id.is_empty() || job.seen_claim_ids.contains(&ack.claim_id) {
425                        job.acked = true;
426                        job.active_claim = None;
427                    }
428                }
429                "job_purged" => {
430                    let purge: WorkerQueuePurgeRecord = serde_json::from_value(event.payload)
431                        .map_err(|error| LogError::Serde(error.to_string()))?;
432                    let Some(job) = jobs.get_mut(&purge.job_event_id) else {
433                        continue;
434                    };
435                    if !job.acked {
436                        job.purged = true;
437                        job.active_claim = None;
438                    }
439                }
440                _ => {}
441            }
442        }
443
444        let responses = response_events
445            .into_iter()
446            .filter(|(_, event)| event.kind == "job_response")
447            .map(|(_, event)| {
448                serde_json::from_value::<WorkerQueueResponseRecord>(event.payload)
449                    .map_err(|error| LogError::Serde(error.to_string()))
450            })
451            .collect::<Result<Vec<_>, _>>()?;
452
453        let mut queue_state = WorkerQueueState {
454            queue: queue_name.to_string(),
455            responses,
456            jobs: jobs
457                .into_values()
458                .map(|mut job| {
459                    if job
460                        .active_claim
461                        .as_ref()
462                        .is_some_and(|active| active.expires_at_ms <= now_ms)
463                    {
464                        job.active_claim = None;
465                    }
466                    WorkerQueueJobState {
467                        job_event_id: job.job_event_id,
468                        enqueued_at_ms: job.enqueued_at_ms,
469                        job: job.job,
470                        active_claim: job.active_claim,
471                        acked: job.acked,
472                        purged: job.purged,
473                    }
474                })
475                .collect(),
476        };
477        queue_state
478            .jobs
479            .sort_by_key(|job| (job.enqueued_at_ms, job.job_event_id));
480        Ok(queue_state)
481    }
482
483    pub async fn queue_summaries(&self) -> Result<Vec<WorkerQueueSummary>, LogError> {
484        let now_ms = now_ms();
485        let mut summaries = Vec::new();
486        for queue in self.known_queues().await? {
487            let state = self.queue_state(&queue).await?;
488            summaries.push(state.summary(now_ms));
489        }
490        summaries.sort_by(|left, right| left.queue.cmp(&right.queue));
491        Ok(summaries)
492    }
493
494    pub async fn claim_next(
495        &self,
496        queue: &str,
497        consumer_id: &str,
498        ttl: StdDuration,
499    ) -> Result<Option<ClaimedWorkerJob>, LogError> {
500        let queue_name = queue.trim();
501        if queue_name.is_empty() {
502            return Err(LogError::Config(
503                "worker queue name cannot be empty".to_string(),
504            ));
505        }
506        if consumer_id.trim().is_empty() {
507            return Err(LogError::InvalidConsumer(
508                "worker queue consumer id cannot be empty".to_string(),
509            ));
510        }
511        let policy = self.policy();
512        for _ in 0..8 {
513            let now_ms = now_ms();
514            let state = self.queue_state(queue_name).await?;
515            let (job, fairness_key) = {
516                let mut states = self
517                    .scheduler_states
518                    .lock()
519                    .expect("scheduler state poisoned");
520                let scheduler_state = states.entry(queue_name.to_string()).or_default();
521                let Some(job) =
522                    state.next_ready_job_with_scheduler(scheduler_state, &policy, now_ms)
523                else {
524                    return Ok(None);
525                };
526                let job = job.clone();
527                let fairness_key = policy.fairness_key_of(&SchedulableJob::from_state(&job));
528                (job, fairness_key)
529            };
530            let claim = WorkerQueueClaimRecord {
531                job_event_id: job.job_event_id,
532                claim_id: Uuid::new_v4().to_string(),
533                consumer_id: consumer_id.to_string(),
534                claimed_at_ms: now_ms,
535                expires_at_ms: expiry_ms(now_ms, ttl),
536            };
537            self.event_log
538                .append(
539                    &claims_topic(queue_name)?,
540                    LogEvent::new(
541                        "job_claimed",
542                        serde_json::to_value(&claim)
543                            .map_err(|error| LogError::Serde(error.to_string()))?,
544                    ),
545                )
546                .await?;
547            let refreshed = self.queue_state(queue_name).await?;
548            if refreshed
549                .active_claim_for(job.job_event_id)
550                .is_some_and(|active| active.claim_id == claim.claim_id)
551            {
552                {
553                    let mut states = self
554                        .scheduler_states
555                        .lock()
556                        .expect("scheduler state poisoned");
557                    let scheduler_state = states.entry(queue_name.to_string()).or_default();
558                    scheduler_state.note_claim_committed(&fairness_key);
559                }
560                if let Some(metrics) = crate::active_metrics_registry() {
561                    let summary = refreshed.summary(now_ms);
562                    metrics.record_worker_queue_claim_age(
563                        queue_name,
564                        now_ms.saturating_sub(job.enqueued_at_ms) as f64 / 1000.0,
565                    );
566                    metrics.set_worker_queue_depth(
567                        queue_name,
568                        (summary.ready + summary.in_flight) as u64,
569                    );
570                    metrics.record_scheduler_selection(
571                        queue_name,
572                        policy.fairness_key.as_str(),
573                        &fairness_key,
574                    );
575                    if let Ok(snap) = self.inspect_queue(queue_name).await {
576                        for stat in &snap.scheduler.keys {
577                            metrics.set_scheduler_deficit(
578                                queue_name,
579                                policy.fairness_key.as_str(),
580                                &stat.fairness_key,
581                                stat.deficit,
582                            );
583                            metrics.set_scheduler_oldest_eligible_age(
584                                queue_name,
585                                policy.fairness_key.as_str(),
586                                &stat.fairness_key,
587                                stat.oldest_ready_age_ms,
588                            );
589                        }
590                    }
591                }
592                return Ok(Some(ClaimedWorkerJob {
593                    handle: WorkerQueueClaimHandle {
594                        queue: queue_name.to_string(),
595                        job_event_id: claim.job_event_id,
596                        claim_id: claim.claim_id,
597                        consumer_id: claim.consumer_id,
598                        expires_at_ms: claim.expires_at_ms,
599                    },
600                    job: job.job,
601                }));
602            }
603        }
604        Ok(None)
605    }
606
607    /// Build a fairness-aware inspect snapshot for `queue` that includes
608    /// scheduler state alongside the standard summary.
609    pub async fn inspect_queue(&self, queue: &str) -> Result<WorkerQueueInspectSnapshot, LogError> {
610        let queue_name = queue.trim();
611        if queue_name.is_empty() {
612            return Err(LogError::Config(
613                "worker queue name cannot be empty".to_string(),
614            ));
615        }
616        let now_ms = now_ms();
617        let state = self.queue_state(queue_name).await?;
618        let summary = state.summary(now_ms);
619        let policy = self.policy();
620        let ready = scheduler::ready_stats_by_key(&state.jobs, &policy, now_ms);
621        // Make sure in-flight stays authoritative against the rebuilt log.
622        let in_flight = scheduler::in_flight_by_key(&state.jobs, &policy);
623        let scheduler_snapshot = {
624            let mut states = self
625                .scheduler_states
626                .lock()
627                .expect("scheduler state poisoned");
628            let scheduler_state = states.entry(queue_name.to_string()).or_default();
629            scheduler_state.replace_in_flight(in_flight);
630            scheduler_state.snapshot(&policy, &ready)
631        };
632        Ok(WorkerQueueInspectSnapshot {
633            summary,
634            scheduler: scheduler_snapshot,
635        })
636    }
637
638    /// Inspect snapshots for every known queue.
639    pub async fn inspect_all_queues(&self) -> Result<Vec<WorkerQueueInspectSnapshot>, LogError> {
640        let mut snapshots = Vec::new();
641        for queue in self.known_queues().await? {
642            snapshots.push(self.inspect_queue(&queue).await?);
643        }
644        snapshots.sort_by(|left, right| left.summary.queue.cmp(&right.summary.queue));
645        Ok(snapshots)
646    }
647
648    pub async fn renew_claim(
649        &self,
650        handle: &WorkerQueueClaimHandle,
651        ttl: StdDuration,
652    ) -> Result<bool, LogError> {
653        let now_ms = now_ms();
654        let renewal = WorkerQueueClaimRenewalRecord {
655            job_event_id: handle.job_event_id,
656            claim_id: handle.claim_id.clone(),
657            consumer_id: handle.consumer_id.clone(),
658            renewed_at_ms: now_ms,
659            expires_at_ms: expiry_ms(now_ms, ttl),
660        };
661        self.event_log
662            .append(
663                &claims_topic(&handle.queue)?,
664                LogEvent::new(
665                    "claim_renewed",
666                    serde_json::to_value(&renewal)
667                        .map_err(|error| LogError::Serde(error.to_string()))?,
668                ),
669            )
670            .await?;
671        let refreshed = self.queue_state(&handle.queue).await?;
672        Ok(refreshed
673            .active_claim_for(handle.job_event_id)
674            .is_some_and(|active| active.claim_id == handle.claim_id))
675    }
676
677    pub async fn release_claim(
678        &self,
679        handle: &WorkerQueueClaimHandle,
680        reason: &str,
681    ) -> Result<(), LogError> {
682        let release = WorkerQueueReleaseRecord {
683            job_event_id: handle.job_event_id,
684            claim_id: handle.claim_id.clone(),
685            consumer_id: handle.consumer_id.clone(),
686            released_at_ms: now_ms(),
687            reason: if reason.trim().is_empty() {
688                None
689            } else {
690                Some(reason.to_string())
691            },
692        };
693        self.event_log
694            .append(
695                &claims_topic(&handle.queue)?,
696                LogEvent::new(
697                    "job_released",
698                    serde_json::to_value(&release)
699                        .map_err(|error| LogError::Serde(error.to_string()))?,
700                ),
701            )
702            .await?;
703        Ok(())
704    }
705
706    pub async fn append_response(
707        &self,
708        queue: &str,
709        response: &WorkerQueueResponseRecord,
710    ) -> Result<u64, LogError> {
711        self.event_log
712            .append(
713                &responses_topic(queue)?,
714                LogEvent::new(
715                    "job_response",
716                    serde_json::to_value(response)
717                        .map_err(|error| LogError::Serde(error.to_string()))?,
718                ),
719            )
720            .await
721    }
722
723    pub async fn ack_claim(&self, handle: &WorkerQueueClaimHandle) -> Result<u64, LogError> {
724        self.event_log
725            .append(
726                &claims_topic(&handle.queue)?,
727                LogEvent::new(
728                    "job_acked",
729                    serde_json::to_value(WorkerQueueAckRecord {
730                        job_event_id: handle.job_event_id,
731                        claim_id: handle.claim_id.clone(),
732                        consumer_id: handle.consumer_id.clone(),
733                        acked_at_ms: now_ms(),
734                    })
735                    .map_err(|error| LogError::Serde(error.to_string()))?,
736                ),
737            )
738            .await
739    }
740
741    pub async fn ack_job(
742        &self,
743        queue: &str,
744        job_event_id: u64,
745        consumer_id: &str,
746    ) -> Result<bool, LogError> {
747        let queue_name = queue.trim();
748        if queue_name.is_empty() {
749            return Err(LogError::Config(
750                "worker queue name cannot be empty".to_string(),
751            ));
752        }
753        let state = self.queue_state(queue_name).await?;
754        let Some(job) = state
755            .jobs
756            .iter()
757            .find(|job| job.job_event_id == job_event_id)
758        else {
759            return Ok(false);
760        };
761        if job.acked || job.purged {
762            return Ok(false);
763        }
764        self.event_log
765            .append(
766                &claims_topic(queue_name)?,
767                LogEvent::new(
768                    "job_acked",
769                    serde_json::to_value(WorkerQueueAckRecord {
770                        job_event_id,
771                        claim_id: String::new(),
772                        consumer_id: consumer_id.to_string(),
773                        acked_at_ms: now_ms(),
774                    })
775                    .map_err(|error| LogError::Serde(error.to_string()))?,
776                ),
777            )
778            .await?;
779        Ok(true)
780    }
781
782    pub async fn purge_unclaimed(
783        &self,
784        queue: &str,
785        purged_by: &str,
786        reason: Option<&str>,
787    ) -> Result<usize, LogError> {
788        let state = self.queue_state(queue).await?;
789        let ready_jobs: Vec<_> = state
790            .jobs
791            .into_iter()
792            .filter(|job| job.is_ready())
793            .map(|job| job.job_event_id)
794            .collect();
795        for job_event_id in &ready_jobs {
796            self.event_log
797                .append(
798                    &claims_topic(queue)?,
799                    LogEvent::new(
800                        "job_purged",
801                        serde_json::to_value(WorkerQueuePurgeRecord {
802                            job_event_id: *job_event_id,
803                            purged_by: purged_by.to_string(),
804                            purged_at_ms: now_ms(),
805                            reason: reason
806                                .filter(|value| !value.trim().is_empty())
807                                .map(|value| value.to_string()),
808                        })
809                        .map_err(|error| LogError::Serde(error.to_string()))?,
810                    ),
811                )
812                .await?;
813        }
814        Ok(ready_jobs.len())
815    }
816}
817
/// Internal per-job state for one queue, as read by `ack_job` and
/// `purge_unclaimed` through `queue_state(..).jobs`.
/// NOTE(review): presumably rebuilt by replaying the job and claims topics —
/// confirm against `queue_state`.
#[derive(Clone, Debug)]
struct WorkerQueueJobStateInternal {
    /// Event id of the job in the job topic. It serves as the job's key:
    /// `ack_job` matches on it, and purge/ack records carry it.
    job_event_id: u64,
    /// Enqueue time in milliseconds.
    /// NOTE(review): presumably the value passed to
    /// `WorkerQueuePriority::effective_rank` for age-based promotion — confirm.
    enqueued_at_ms: i64,
    /// The job payload as it was enqueued.
    job: WorkerQueueJob,
    /// The claim currently held on this job, if any (presumably cleared on
    /// release/expiry — confirm).
    active_claim: Option<WorkerQueueClaimHandle>,
    /// True once the job has been acknowledged; `ack_job` returns `Ok(false)`
    /// for such jobs.
    acked: bool,
    /// True once the job has been purged; `ack_job` returns `Ok(false)` for
    /// such jobs.
    purged: bool,
    /// Claim ids already observed for this job.
    /// NOTE(review): presumably used to dedupe replayed claim records — confirm.
    seen_claim_ids: BTreeSet<String>,
}
828
/// Payload for entries on the queue catalog topic (`WORKER_QUEUE_CATALOG_TOPIC`),
/// serialized as `{"queue": "<name>"}`.
/// NOTE(review): presumably one entry per queue that has received jobs —
/// confirm against the enqueue path.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueCatalogRecord {
    /// Queue name as given by the caller (not sanitized here).
    queue: String,
}
833
/// Payload of a `job_claimed` event on a queue's claims topic.
///
/// Records that `consumer_id` claimed job `job_event_id` under `claim_id`
/// until `expires_at_ms`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueClaimRecord {
    /// Event id of the claimed job in the job topic.
    job_event_id: u64,
    /// Identifier of this claim.
    claim_id: String,
    /// Consumer that holds the claim.
    consumer_id: String,
    /// Claim time in milliseconds since the Unix epoch.
    claimed_at_ms: i64,
    /// Lease expiry in milliseconds since the Unix epoch; once this is in the
    /// past the job can be claimed again.
    expires_at_ms: i64,
}
842
/// Payload recording that an existing claim's lease was renewed.
/// NOTE(review): the event kind used for this record is not visible in this
/// section — confirm where it is written.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueClaimRenewalRecord {
    /// Event id of the job whose claim is renewed.
    job_event_id: u64,
    /// Claim being renewed.
    claim_id: String,
    /// Consumer that renewed the claim.
    consumer_id: String,
    /// Renewal time in milliseconds.
    renewed_at_ms: i64,
    /// New lease expiry in milliseconds.
    expires_at_ms: i64,
}
851
/// Payload recording that a consumer released its claim on a job.
/// NOTE(review): presumably returns the job to the ready pool — confirm
/// against the state replay.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueReleaseRecord {
    /// Event id of the released job.
    job_event_id: u64,
    /// Claim being released.
    claim_id: String,
    /// Consumer releasing the claim.
    consumer_id: String,
    /// Release time in milliseconds.
    released_at_ms: i64,
    /// Optional free-text reason; deserializes as `None` when absent.
    #[serde(default)]
    reason: Option<String>,
}
861
/// Payload of a `job_acked` event on a queue's claims topic.
///
/// `ack_job` writes this with an empty `claim_id` when it settles a job
/// without going through a claim.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueAckRecord {
    /// Event id of the acknowledged job.
    job_event_id: u64,
    /// Claim that was acknowledged, or empty for claim-less acks.
    claim_id: String,
    /// Consumer (or caller label) that acknowledged the job.
    consumer_id: String,
    /// Acknowledgement time in milliseconds since the Unix epoch.
    acked_at_ms: i64,
}
869
/// Payload of a `job_purged` event on a queue's claims topic, written by
/// `purge_unclaimed`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueuePurgeRecord {
    /// Event id of the purged job.
    job_event_id: u64,
    /// Caller-supplied identity of whoever requested the purge.
    purged_by: String,
    /// Purge time in milliseconds since the Unix epoch.
    purged_at_ms: i64,
    /// Purge reason; `purge_unclaimed` stores `None` for a missing or blank
    /// reason, and deserialization defaults it to `None` when absent.
    #[serde(default)]
    reason: Option<String>,
}
878
879pub fn job_topic_name(queue: &str) -> String {
880    format!("worker.{}", sanitize_topic_component(queue))
881}
882
883pub fn claims_topic_name(queue: &str) -> String {
884    format!("{}{}", job_topic_name(queue), WORKER_QUEUE_CLAIMS_SUFFIX)
885}
886
887pub fn response_topic_name(queue: &str) -> String {
888    format!("{}{}", job_topic_name(queue), WORKER_QUEUE_RESPONSES_SUFFIX)
889}
890
891fn job_topic(queue: &str) -> Result<Topic, LogError> {
892    Topic::new(job_topic_name(queue))
893}
894
895fn claims_topic(queue: &str) -> Result<Topic, LogError> {
896    Topic::new(claims_topic_name(queue))
897}
898
899fn responses_topic(queue: &str) -> Result<Topic, LogError> {
900    Topic::new(response_topic_name(queue))
901}
902
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch. The
/// `u128` millisecond count saturates at `i64::MAX` instead of wrapping
/// through an `as` cast, consistent with how `expiry_ms` clamps TTLs.
fn now_ms() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|duration| i64::try_from(duration.as_millis()).unwrap_or(i64::MAX))
        .unwrap_or(0)
}
909
/// Returns the absolute expiry time, in milliseconds, of a lease that starts
/// at `now_ms` and lasts `ttl`.
///
/// A TTL longer than `i64::MAX` milliseconds is clamped to `i64::MAX`, and the
/// final sum saturates instead of overflowing.
fn expiry_ms(now_ms: i64, ttl: StdDuration) -> i64 {
    let ttl_ms = i64::try_from(ttl.as_millis()).unwrap_or(i64::MAX);
    now_ms.saturating_add(ttl_ms)
}
913
// Unit tests for the worker queue, run against an in-memory event log.
#[cfg(test)]
mod tests {
    use super::*;

    use crate::event_log::{AnyEventLog, MemoryEventLog};
    use crate::triggers::{
        event::{GenericWebhookPayload, KnownProviderPayload},
        scheduler::{self, SchedulerStrategy},
        ProviderId, ProviderPayload, SignatureStatus, TraceId, TriggerEvent,
    };

    /// Builds a verified GitHub `issues.opened` webhook event whose id and
    /// dedupe key are both `id`, with no tenant.
    fn test_event(id: &str) -> TriggerEvent {
        TriggerEvent {
            id: crate::triggers::TriggerEventId(id.to_string()),
            provider: ProviderId::from("github"),
            kind: "issues.opened".to_string(),
            trace_id: TraceId("trace-test".to_string()),
            dedupe_key: id.to_string(),
            tenant_id: None,
            headers: BTreeMap::new(),
            batch: None,
            raw_body: None,
            provider_payload: ProviderPayload::Known(KnownProviderPayload::Webhook(
                GenericWebhookPayload {
                    source: Some("worker-queue-test".to_string()),
                    content_type: Some("application/json".to_string()),
                    raw: serde_json::json!({"id": id}),
                },
            )),
            signature_status: SignatureStatus::Verified,
            received_at: time::OffsetDateTime::now_utc(),
            occurred_at: None,
            dedupe_claimed: false,
        }
    }

    /// Builds a job for `queue`, bound to `{trigger_id}@v1`, that carries
    /// `test_event(event_id)` at the given priority.
    fn test_job(
        queue: &str,
        trigger_id: &str,
        event_id: &str,
        priority: WorkerQueuePriority,
    ) -> WorkerQueueJob {
        WorkerQueueJob {
            queue: queue.to_string(),
            trigger_id: trigger_id.to_string(),
            binding_key: format!("{trigger_id}@v1"),
            binding_version: 1,
            event: test_event(event_id),
            replay_of_event_id: None,
            priority,
        }
    }

    #[tokio::test(flavor = "current_thread")]
    async fn enqueue_and_summarize_queue() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log);
        queue
            .enqueue(&test_job(
                "triage",
                "incoming-review-task",
                "evt-1",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();
        let summaries = queue.queue_summaries().await.unwrap();
        assert_eq!(summaries.len(), 1);
        assert_eq!(summaries[0].queue, "triage");
        assert_eq!(summaries[0].ready, 1);
        assert_eq!(summaries[0].in_flight, 0);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn claim_and_ack_remove_job_from_ready_pool() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log);
        queue
            .enqueue(&test_job(
                "triage",
                "incoming-review-task",
                "evt-1",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();
        let claimed = queue
            .claim_next("triage", "consumer-a", StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        let before_ack = queue.queue_state("triage").await.unwrap();
        assert_eq!(before_ack.summary(now_ms()).ready, 0);
        assert_eq!(before_ack.summary(now_ms()).in_flight, 1);
        queue
            .append_response(
                "triage",
                &WorkerQueueResponseRecord {
                    queue: "triage".to_string(),
                    job_event_id: claimed.handle.job_event_id,
                    consumer_id: "consumer-a".to_string(),
                    handled_at_ms: now_ms(),
                    outcome: Some(DispatchOutcome {
                        trigger_id: "incoming-review-task".to_string(),
                        binding_key: "incoming-review-task@v1".to_string(),
                        event_id: "evt-1".to_string(),
                        attempt_count: 1,
                        status: super::super::DispatchStatus::Succeeded,
                        handler_kind: "local".to_string(),
                        target_uri: "handlers::on_review".to_string(),
                        replay_of_event_id: None,
                        result: Some(serde_json::json!({"ok": true})),
                        error: None,
                    }),
                    error: None,
                },
            )
            .await
            .unwrap();
        queue.ack_claim(&claimed.handle).await.unwrap();
        let after_ack = queue.queue_state("triage").await.unwrap();
        let summary = after_ack.summary(now_ms());
        assert_eq!(summary.ready, 0);
        assert_eq!(summary.in_flight, 0);
        assert_eq!(summary.acked, 1);
        assert_eq!(summary.responses, 1);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn ack_job_acknowledges_without_active_claim() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log);
        let receipt = queue
            .enqueue(&test_job(
                "triage",
                "incoming-review-task",
                "evt-1",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();

        assert!(queue
            .ack_job("triage", receipt.job_event_id, "pipeline_lifecycle")
            .await
            .unwrap());
        let state = queue.queue_state("triage").await.unwrap();
        let summary = state.summary(now_ms());
        assert_eq!(summary.ready, 0);
        assert_eq!(summary.acked, 1);
        assert!(
            !queue
                .ack_job("triage", receipt.job_event_id, "pipeline_lifecycle")
                .await
                .unwrap(),
            "already acknowledged jobs should not produce a second settlement"
        );
    }

    #[tokio::test(flavor = "current_thread")]
    async fn expired_claim_allows_reclaim() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log.clone());
        let receipt = queue
            .enqueue(&test_job(
                "triage",
                "incoming-review-task",
                "evt-1",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();
        // Write an already-expired claim straight to the claims topic, so the
        // job looks claimed-but-stale without waiting on a real lease TTL.
        let expired_claim = WorkerQueueClaimRecord {
            job_event_id: receipt.job_event_id,
            claim_id: "expired-claim".to_string(),
            consumer_id: "consumer-a".to_string(),
            claimed_at_ms: now_ms().saturating_sub(2),
            expires_at_ms: now_ms().saturating_sub(1),
        };
        log.append(
            &claims_topic("triage").unwrap(),
            LogEvent::new("job_claimed", serde_json::to_value(&expired_claim).unwrap()),
        )
        .await
        .unwrap();
        let second = queue
            .claim_next("triage", "consumer-b", StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(second.job.event.id.0, "evt-1");
        assert_ne!(second.handle.claim_id, expired_claim.claim_id);
        assert_eq!(second.handle.consumer_id, "consumer-b");
    }

    #[tokio::test(flavor = "current_thread")]
    async fn high_priority_and_aged_normal_are_selected_first() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log.clone());

        let catalog_topic = Topic::new(WORKER_QUEUE_CATALOG_TOPIC).unwrap();
        log.append(
            &catalog_topic,
            LogEvent::new("queue_seen", serde_json::json!({"queue":"triage"})),
        )
        .await
        .unwrap();

        // Backdate a Normal job past NORMAL_PROMOTION_AGE_MS so it is promoted
        // to the same effective rank as High. The assertion below pins that the
        // earlier-enqueued job wins that tie.
        let topic = job_topic("triage").unwrap();
        let mut old_normal = LogEvent::new(
            "trigger_dispatch",
            serde_json::to_value(test_job(
                "triage",
                "incoming-review-task",
                "evt-old-normal",
                WorkerQueuePriority::Normal,
            ))
            .unwrap(),
        );
        old_normal.occurred_at_ms = now_ms() - NORMAL_PROMOTION_AGE_MS - 1_000;
        log.append(&topic, old_normal).await.unwrap();

        let high = LogEvent::new(
            "trigger_dispatch",
            serde_json::to_value(test_job(
                "triage",
                "incoming-review-task",
                "evt-high",
                WorkerQueuePriority::High,
            ))
            .unwrap(),
        );
        log.append(&topic, high).await.unwrap();

        let claimed = queue
            .claim_next("triage", "consumer-a", StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(claimed.job.event.id.0, "evt-old-normal");
    }

    /// Like `test_event`, but tagged with tenant `tenant`.
    fn tenant_event(id: &str, tenant: &str) -> TriggerEvent {
        let mut event = test_event(id);
        event.tenant_id = Some(crate::triggers::TenantId::new(tenant));
        event
    }

    /// Like `test_job`, but the carried event belongs to tenant `tenant`.
    fn tenant_job(
        queue: &str,
        trigger_id: &str,
        event_id: &str,
        tenant: &str,
        priority: WorkerQueuePriority,
    ) -> WorkerQueueJob {
        WorkerQueueJob {
            queue: queue.to_string(),
            trigger_id: trigger_id.to_string(),
            binding_key: format!("{trigger_id}@v1"),
            binding_version: 1,
            event: tenant_event(event_id, tenant),
            replay_of_event_id: None,
            priority,
        }
    }

    /// Records a successful response for `claim` and then acks it, settling
    /// the job the way a consumer would after handling it.
    async fn ack_and_respond(queue: &WorkerQueue, queue_name: &str, claim: &ClaimedWorkerJob) {
        queue
            .append_response(
                queue_name,
                &WorkerQueueResponseRecord {
                    queue: queue_name.to_string(),
                    job_event_id: claim.handle.job_event_id,
                    consumer_id: claim.handle.consumer_id.clone(),
                    handled_at_ms: now_ms(),
                    outcome: Some(DispatchOutcome {
                        trigger_id: claim.job.trigger_id.clone(),
                        binding_key: claim.job.binding_key.clone(),
                        event_id: claim.job.event.id.0.clone(),
                        attempt_count: 1,
                        status: super::super::DispatchStatus::Succeeded,
                        handler_kind: "local".to_string(),
                        target_uri: "test::handler".to_string(),
                        replay_of_event_id: None,
                        result: None,
                        error: None,
                    }),
                    error: None,
                },
            )
            .await
            .unwrap();
        queue.ack_claim(&claim.handle).await.unwrap();
    }

    #[tokio::test(flavor = "current_thread")]
    async fn drr_policy_rotates_across_tenants_through_claim_next() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(256)));
        let queue = WorkerQueue::with_policy(
            log,
            SchedulerPolicy::deficit_round_robin(scheduler::FairnessKey::Tenant),
        );

        // Tenant A enqueues 8 jobs before tenant B enqueues a single job.
        for idx in 0..8 {
            queue
                .enqueue(&tenant_job(
                    "triage",
                    "trigger",
                    &format!("a-{idx}"),
                    "tenant-a",
                    WorkerQueuePriority::Normal,
                ))
                .await
                .unwrap();
        }
        queue
            .enqueue(&tenant_job(
                "triage",
                "trigger",
                "b-1",
                "tenant-b",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();

        // Claim+ack 4 jobs back-to-back. Under FIFO, tenant B would not be
        // reached in these four claims because all 8 tenant-a jobs were
        // enqueued first. Under fair-share, B must be served early; the
        // assertion below requires it within the first four claims.
        let mut tenants_seen = Vec::new();
        for n in 0..4 {
            let consumer = format!("c-{n}");
            let claim = queue
                .claim_next("triage", &consumer, StdDuration::from_secs(60))
                .await
                .unwrap()
                .expect("queue should still have ready jobs");
            tenants_seen.push(
                claim
                    .job
                    .event
                    .tenant_id
                    .as_ref()
                    .map(|t| t.0.clone())
                    .unwrap_or_default(),
            );
            ack_and_respond(&queue, "triage", &claim).await;
        }

        let saw_b = tenants_seen.iter().any(|t| t == "tenant-b");
        assert!(
            saw_b,
            "tenant-b should have been served within the first 4 claims, got {tenants_seen:?}",
        );
    }

    #[tokio::test(flavor = "current_thread")]
    async fn fifo_policy_preserves_legacy_behavior() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(64)));
        let queue = WorkerQueue::with_policy(log, SchedulerPolicy::fifo());

        // Fill queue with tenant-a jobs first, then a single tenant-b job.
        for idx in 0..4 {
            queue
                .enqueue(&tenant_job(
                    "triage",
                    "trigger",
                    &format!("a-{idx}"),
                    "tenant-a",
                    WorkerQueuePriority::Normal,
                ))
                .await
                .unwrap();
        }
        queue
            .enqueue(&tenant_job(
                "triage",
                "trigger",
                "b-1",
                "tenant-b",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();

        // FIFO must drain all of tenant-a before touching tenant-b. Each claim
        // is settled before the next, so in-flight limits cannot affect the
        // order being checked.
        let mut tenants_seen = Vec::new();
        for n in 0..5 {
            let consumer = format!("c-{n}");
            let claim = queue
                .claim_next("triage", &consumer, StdDuration::from_secs(60))
                .await
                .unwrap()
                .expect("queue should still have ready jobs");
            tenants_seen.push(claim.job.event.tenant_id.clone().unwrap().0);
            ack_and_respond(&queue, "triage", &claim).await;
        }
        assert_eq!(
            tenants_seen,
            ["tenant-a", "tenant-a", "tenant-a", "tenant-a", "tenant-b"],
            "FIFO must serve every tenant-a job before tenant-b",
        );
    }

    #[tokio::test(flavor = "current_thread")]
    async fn inspect_queue_reports_per_tenant_fairness_state() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(64)));
        let queue = WorkerQueue::with_policy(
            log,
            SchedulerPolicy::deficit_round_robin(scheduler::FairnessKey::Tenant)
                .with_weight("tenant-a", 2)
                .with_weight("tenant-b", 1),
        );

        for idx in 0..3 {
            queue
                .enqueue(&tenant_job(
                    "triage",
                    "trigger",
                    &format!("a-{idx}"),
                    "tenant-a",
                    WorkerQueuePriority::Normal,
                ))
                .await
                .unwrap();
        }
        queue
            .enqueue(&tenant_job(
                "triage",
                "trigger",
                "b-1",
                "tenant-b",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();

        for n in 0..2 {
            let consumer = format!("c-{n}");
            let claim = queue
                .claim_next("triage", &consumer, StdDuration::from_secs(60))
                .await
                .unwrap()
                .unwrap();
            ack_and_respond(&queue, "triage", &claim).await;
        }

        let snap = queue.inspect_queue("triage").await.unwrap();
        assert_eq!(snap.scheduler.strategy, "drr");
        assert_eq!(snap.scheduler.fairness_key, "tenant");
        assert!(snap
            .scheduler
            .keys
            .iter()
            .any(|k| k.fairness_key == "tenant-a"));
        let weights: BTreeMap<String, u32> = snap
            .scheduler
            .keys
            .iter()
            .map(|k| (k.fairness_key.clone(), k.weight))
            .collect();
        assert_eq!(weights.get("tenant-a").copied(), Some(2));
        assert_eq!(weights.get("tenant-b").copied(), Some(1));
    }

    #[tokio::test(flavor = "current_thread")]
    async fn drr_with_max_concurrent_per_key_throttles_hot_tenant() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(128)));
        let queue = WorkerQueue::with_policy(
            log,
            SchedulerPolicy::deficit_round_robin(scheduler::FairnessKey::Tenant)
                .with_max_concurrent_per_key(1),
        );

        for idx in 0..4 {
            queue
                .enqueue(&tenant_job(
                    "triage",
                    "trigger",
                    &format!("a-{idx}"),
                    "tenant-a",
                    WorkerQueuePriority::Normal,
                ))
                .await
                .unwrap();
        }
        queue
            .enqueue(&tenant_job(
                "triage",
                "trigger",
                "b-1",
                "tenant-b",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();

        let first = queue
            .claim_next("triage", "consumer-a", StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        // Without releasing the first claim, the second pick must skip the
        // capped tenant-a and serve tenant-b instead.
        let second = queue
            .claim_next("triage", "consumer-b", StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        let pair = [
            first.job.event.tenant_id.clone().unwrap().0,
            second.job.event.tenant_id.clone().unwrap().0,
        ];
        assert!(
            pair.contains(&"tenant-a".to_string()) && pair.contains(&"tenant-b".to_string()),
            "max_concurrent_per_key=1 must release tenant-b within two claims, got {pair:?}",
        );
    }

    #[test]
    fn from_env_parses_drr_policy_from_lookup() {
        // The weights string also contains blank (` : `) and malformed
        // (`bad:abc`) entries. The assertions below pin only the well-formed
        // entries and the default-weight fallback.
        let lookup = |name: &str| -> Option<String> {
            match name {
                "HARN_SCHEDULER_STRATEGY" => Some("drr".to_string()),
                "HARN_SCHEDULER_FAIRNESS_KEY" => Some("tenant-and-binding".to_string()),
                "HARN_SCHEDULER_QUANTUM" => Some("3".to_string()),
                "HARN_SCHEDULER_STARVATION_AGE_MS" => Some("750".to_string()),
                "HARN_SCHEDULER_MAX_CONCURRENT_PER_KEY" => Some("4".to_string()),
                "HARN_SCHEDULER_DEFAULT_WEIGHT" => Some("2".to_string()),
                "HARN_SCHEDULER_WEIGHTS" => Some("tenant-a:5,tenant-b:1, : ,bad:abc".to_string()),
                _ => None,
            }
        };
        let policy = SchedulerPolicy::from_env_lookup(lookup);
        match policy.strategy {
            SchedulerStrategy::DeficitRoundRobin {
                quantum,
                starvation_age_ms,
            } => {
                assert_eq!(quantum, 3);
                assert_eq!(starvation_age_ms, Some(750));
            }
            other => panic!("expected DRR strategy, got {other:?}"),
        }
        assert_eq!(
            policy.fairness_key,
            scheduler::FairnessKey::TenantAndBinding
        );
        assert_eq!(policy.max_concurrent_per_key, 4);
        assert_eq!(policy.default_weight, 2);
        assert_eq!(policy.weight_for("tenant-a"), 5);
        assert_eq!(policy.weight_for("tenant-b"), 1);
        // Unknown key falls back to default_weight.
        assert_eq!(policy.weight_for("tenant-c"), 2);
    }

    #[test]
    fn from_env_defaults_to_fifo_when_missing() {
        let policy = SchedulerPolicy::from_env_lookup(|_| None);
        assert!(matches!(policy.strategy, SchedulerStrategy::Fifo));
    }
}