use std::collections::{BTreeMap, BTreeSet};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration as StdDuration;

use serde::{Deserialize, Serialize};
use uuid::Uuid;

use crate::event_log::{
    sanitize_topic_component, AnyEventLog, EventLog, LogError, LogEvent, Topic,
};

use super::scheduler::{self, SchedulableJob, SchedulerPolicy, SchedulerSnapshot, SchedulerState};
use super::{DispatchOutcome, TriggerEvent};

pub const WORKER_QUEUE_CATALOG_TOPIC: &str = "worker.queues";
const WORKER_QUEUE_CLAIMS_SUFFIX: &str = ".claims";
const WORKER_QUEUE_RESPONSES_SUFFIX: &str = ".responses";
const NORMAL_PROMOTION_AGE_MS: i64 = 15 * 60 * 1000;

#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum WorkerQueuePriority {
    High,
    #[default]
    Normal,
    Low,
}

impl WorkerQueuePriority {
    pub fn as_str(self) -> &'static str {
        match self {
            Self::High => "high",
            Self::Normal => "normal",
            Self::Low => "low",
        }
    }

    pub fn effective_rank(self, enqueued_at_ms: i64, now_ms: i64) -> u8 {
        match self {
            Self::High => 0,
            Self::Normal if now_ms.saturating_sub(enqueued_at_ms) >= NORMAL_PROMOTION_AGE_MS => 0,
            Self::Normal => 1,
            Self::Low => 2,
        }
    }
}
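
// A note on ranks: `effective_rank` yields the ordering value the scheduler
// compares (lower is served first), and a `Normal` job that has waited at
// least NORMAL_PROMOTION_AGE_MS (15 minutes) is promoted to rank 0 so it
// competes with `High` work. For any epoch-millisecond `t`:
// `WorkerQueuePriority::Normal.effective_rank(t, t + NORMAL_PROMOTION_AGE_MS) == 0`.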

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct WorkerQueueJob {
    pub queue: String,
    pub trigger_id: String,
    pub binding_key: String,
    pub binding_version: u32,
    pub event: TriggerEvent,
    #[serde(default)]
    pub replay_of_event_id: Option<String>,
    #[serde(default)]
    pub priority: WorkerQueuePriority,
}
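
// `replay_of_event_id` and `priority` are `#[serde(default)]`, so job payloads
// that omit them still deserialize, defaulting to "not a replay" and `Normal`
// priority respectively.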

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerQueueEnqueueReceipt {
    pub queue: String,
    pub job_event_id: u64,
    pub response_topic: String,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerQueueClaimHandle {
    pub queue: String,
    pub job_event_id: u64,
    pub claim_id: String,
    pub consumer_id: String,
    pub expires_at_ms: i64,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ClaimedWorkerJob {
    pub handle: WorkerQueueClaimHandle,
    pub job: WorkerQueueJob,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct WorkerQueueResponseRecord {
    pub queue: String,
    pub job_event_id: u64,
    pub consumer_id: String,
    pub handled_at_ms: i64,
    pub outcome: Option<DispatchOutcome>,
    pub error: Option<String>,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerQueueSummary {
    pub queue: String,
    pub ready: usize,
    pub in_flight: usize,
    pub acked: usize,
    pub purged: usize,
    pub responses: usize,
    pub oldest_unclaimed_age_ms: Option<u64>,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct WorkerQueueJobState {
    pub job_event_id: u64,
    pub enqueued_at_ms: i64,
    pub job: WorkerQueueJob,
    pub active_claim: Option<WorkerQueueClaimHandle>,
    pub acked: bool,
    pub purged: bool,
}

impl WorkerQueueJobState {
    pub fn is_ready(&self) -> bool {
        !self.acked && !self.purged && self.active_claim.is_none()
    }
}

#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct WorkerQueueState {
    pub queue: String,
    pub responses: Vec<WorkerQueueResponseRecord>,
    pub jobs: Vec<WorkerQueueJobState>,
}

impl WorkerQueueState {
    pub fn summary(&self, now_ms: i64) -> WorkerQueueSummary {
        let ready = self.jobs.iter().filter(|job| job.is_ready()).count();
        let in_flight = self
            .jobs
            .iter()
            .filter(|job| !job.acked && !job.purged && job.active_claim.is_some())
            .count();
        let acked = self.jobs.iter().filter(|job| job.acked).count();
        let purged = self.jobs.iter().filter(|job| job.purged).count();
        let oldest_unclaimed_age_ms = self
            .jobs
            .iter()
            .filter(|job| job.is_ready())
            .map(|job| now_ms.saturating_sub(job.enqueued_at_ms).max(0) as u64)
            .max();
        WorkerQueueSummary {
            queue: self.queue.clone(),
            ready,
            in_flight,
            acked,
            purged,
            responses: self.responses.len(),
            oldest_unclaimed_age_ms,
        }
    }

    fn next_ready_job_with_scheduler(
        &self,
        scheduler_state: &mut SchedulerState,
        policy: &SchedulerPolicy,
        now_ms: i64,
    ) -> Option<&WorkerQueueJobState> {
        let candidates: Vec<&WorkerQueueJobState> =
            self.jobs.iter().filter(|job| job.is_ready()).collect();
        if candidates.is_empty() {
            return None;
        }
        let views: Vec<SchedulableJob<'_>> = candidates
            .iter()
            .map(|state| SchedulableJob::from_state(state))
            .collect();

        let in_flight = scheduler::in_flight_by_key(&self.jobs, policy);
        scheduler_state.replace_in_flight(in_flight);

        let pick = scheduler_state.select(&views, policy, now_ms)?;
        candidates
            .into_iter()
            .find(|job| job.job_event_id == pick.job_event_id)
    }

    fn active_claim_for(&self, job_event_id: u64) -> Option<&WorkerQueueClaimHandle> {
        self.jobs
            .iter()
            .find(|job| job.job_event_id == job_event_id)
            .and_then(|job| job.active_claim.as_ref())
    }
}

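/// Event-sourced worker queue. Nothing is mutated in place: enqueues, claims,
/// renewals, releases, acks, and purges are appended to per-queue topics, and
/// `queue_state` folds those events back into the current picture.
///
/// A minimal lifecycle sketch (assuming the in-memory log variant used by the
/// tests below; the queue name, consumer id, `job`, `response`, and `ttl` are
/// illustrative):
///
/// ```ignore
/// let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
/// let queue = WorkerQueue::new(log);
/// let receipt = queue.enqueue(&job).await?;
/// if let Some(claimed) = queue.claim_next("triage", "worker-1", ttl).await? {
///     // ... run the handler, record the outcome ...
///     queue.append_response("triage", &response).await?;
///     queue.ack_claim(&claimed.handle).await?;
/// }
/// ```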
#[derive(Clone)]
pub struct WorkerQueue {
    event_log: Arc<AnyEventLog>,
    policy: Arc<RwLock<SchedulerPolicy>>,
    scheduler_states: Arc<Mutex<BTreeMap<String, SchedulerState>>>,
}

#[derive(Clone, Debug, Serialize)]
pub struct WorkerQueueInspectSnapshot {
    pub summary: WorkerQueueSummary,
    pub scheduler: SchedulerSnapshot,
}

impl WorkerQueue {
    pub fn new(event_log: Arc<AnyEventLog>) -> Self {
        Self::with_policy(event_log, SchedulerPolicy::from_env())
    }

    pub fn with_policy(event_log: Arc<AnyEventLog>, policy: SchedulerPolicy) -> Self {
        Self {
            event_log,
            policy: Arc::new(RwLock::new(policy)),
            scheduler_states: Arc::new(Mutex::new(BTreeMap::new())),
        }
    }

    pub fn set_policy(&self, policy: SchedulerPolicy) {
        *self.policy.write().expect("scheduler policy poisoned") = policy;
    }

    pub fn policy(&self) -> SchedulerPolicy {
        self.policy
            .read()
            .expect("scheduler policy poisoned")
            .clone()
    }

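    /// Records the queue in the shared catalog topic, then appends the job to
    /// the queue's job topic. The receipt carries the event id the log
    /// assigned to the job and the topic on which responses will appear.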
    pub async fn enqueue(
        &self,
        job: &WorkerQueueJob,
    ) -> Result<WorkerQueueEnqueueReceipt, LogError> {
        let queue = job.queue.trim();
        if queue.is_empty() {
            return Err(LogError::Config(
                "worker queue name cannot be empty".to_string(),
            ));
        }
        let queue_name = queue.to_string();
        let catalog_topic = Topic::new(WORKER_QUEUE_CATALOG_TOPIC)
            .expect("static worker queue catalog topic should always be valid");
        self.event_log
            .append(
                &catalog_topic,
                LogEvent::new(
                    "queue_seen",
                    serde_json::to_value(WorkerQueueCatalogRecord {
                        queue: queue_name.clone(),
                    })
                    .map_err(|error| LogError::Serde(error.to_string()))?,
                ),
            )
            .await?;

        let job_topic = job_topic(&queue_name)?;
        let mut headers = BTreeMap::new();
        headers.insert("queue".to_string(), queue_name.clone());
        headers.insert("trigger_id".to_string(), job.trigger_id.clone());
        headers.insert("binding_key".to_string(), job.binding_key.clone());
        headers.insert("event_id".to_string(), job.event.id.0.clone());
        headers.insert("priority".to_string(), job.priority.as_str().to_string());
        let job_event_id = self
            .event_log
            .append(
                &job_topic,
                LogEvent::new(
                    "trigger_dispatch",
                    serde_json::to_value(job)
                        .map_err(|error| LogError::Serde(error.to_string()))?,
                )
                .with_headers(headers),
            )
            .await?;
        if let Some(metrics) = crate::active_metrics_registry() {
            if let Ok(state) = self.queue_state(&queue_name).await {
                let summary = state.summary(now_ms());
                metrics.set_worker_queue_depth(
                    &queue_name,
                    (summary.ready + summary.in_flight) as u64,
                );
            }
        }
        Ok(WorkerQueueEnqueueReceipt {
            queue: queue_name.clone(),
            job_event_id,
            response_topic: response_topic_name(&queue_name),
        })
    }

    pub async fn known_queues(&self) -> Result<Vec<String>, LogError> {
        let topic = Topic::new(WORKER_QUEUE_CATALOG_TOPIC)
            .expect("static worker queue catalog topic should always be valid");
        let events = self.event_log.read_range(&topic, None, usize::MAX).await?;
        let mut queues = BTreeSet::new();
        for (_, event) in events {
            if event.kind != "queue_seen" {
                continue;
            }
            let record: WorkerQueueCatalogRecord = serde_json::from_value(event.payload)
                .map_err(|error| LogError::Serde(error.to_string()))?;
            if !record.queue.trim().is_empty() {
                queues.insert(record.queue);
            }
        }
        Ok(queues.into_iter().collect())
    }

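    /// Rebuilds the full queue picture by replaying the job, claim, and
    /// response topics in order. Claims that have expired by the time of the
    /// read are dropped, so their jobs count as ready again.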
    pub async fn queue_state(&self, queue: &str) -> Result<WorkerQueueState, LogError> {
        let queue_name = queue.trim();
        if queue_name.is_empty() {
            return Err(LogError::Config(
                "worker queue name cannot be empty".to_string(),
            ));
        }
        let now_ms = now_ms();
        let job_events = self
            .event_log
            .read_range(&job_topic(queue_name)?, None, usize::MAX)
            .await?;
        let claim_events = self
            .event_log
            .read_range(&claims_topic(queue_name)?, None, usize::MAX)
            .await?;
        let response_events = self
            .event_log
            .read_range(&responses_topic(queue_name)?, None, usize::MAX)
            .await?;

        let mut jobs = BTreeMap::<u64, WorkerQueueJobStateInternal>::new();
        for (job_event_id, event) in job_events {
            if event.kind != "trigger_dispatch" {
                continue;
            }
            let job: WorkerQueueJob = serde_json::from_value(event.payload)
                .map_err(|error| LogError::Serde(error.to_string()))?;
            jobs.insert(
                job_event_id,
                WorkerQueueJobStateInternal {
                    job_event_id,
                    enqueued_at_ms: event.occurred_at_ms,
                    job,
                    active_claim: None,
                    acked: false,
                    purged: false,
                    seen_claim_ids: BTreeSet::new(),
                },
            );
        }

        for (_, event) in claim_events {
            match event.kind.as_str() {
                "job_claimed" => {
                    let claim: WorkerQueueClaimRecord = serde_json::from_value(event.payload)
                        .map_err(|error| LogError::Serde(error.to_string()))?;
                    let Some(job) = jobs.get_mut(&claim.job_event_id) else {
                        continue;
                    };
                    if job.acked || job.purged {
                        continue;
                    }
                    job.seen_claim_ids.insert(claim.claim_id.clone());
                    let can_take = job
                        .active_claim
                        .as_ref()
                        .is_none_or(|active| active.expires_at_ms <= claim.claimed_at_ms);
                    if can_take {
                        job.active_claim = Some(WorkerQueueClaimHandle {
                            queue: queue_name.to_string(),
                            job_event_id: claim.job_event_id,
                            claim_id: claim.claim_id,
                            consumer_id: claim.consumer_id,
                            expires_at_ms: claim.expires_at_ms,
                        });
                    }
                }
                "claim_renewed" => {
                    let renewal: WorkerQueueClaimRenewalRecord =
                        serde_json::from_value(event.payload)
                            .map_err(|error| LogError::Serde(error.to_string()))?;
                    let Some(job) = jobs.get_mut(&renewal.job_event_id) else {
                        continue;
                    };
                    if let Some(active) = job.active_claim.as_mut() {
                        if active.claim_id == renewal.claim_id {
                            active.expires_at_ms = renewal.expires_at_ms;
                        }
                    }
                }
                "job_released" => {
                    let release: WorkerQueueReleaseRecord =
                        serde_json::from_value(event.payload)
                            .map_err(|error| LogError::Serde(error.to_string()))?;
                    let Some(job) = jobs.get_mut(&release.job_event_id) else {
                        continue;
                    };
                    if job
                        .active_claim
                        .as_ref()
                        .is_some_and(|active| active.claim_id == release.claim_id)
                    {
                        job.active_claim = None;
                    }
                }
                "job_acked" => {
                    let ack: WorkerQueueAckRecord = serde_json::from_value(event.payload)
                        .map_err(|error| LogError::Serde(error.to_string()))?;
                    let Some(job) = jobs.get_mut(&ack.job_event_id) else {
                        continue;
                    };
                    if ack.claim_id.is_empty() || job.seen_claim_ids.contains(&ack.claim_id) {
                        job.acked = true;
                        job.active_claim = None;
                    }
                }
                "job_purged" => {
                    let purge: WorkerQueuePurgeRecord = serde_json::from_value(event.payload)
                        .map_err(|error| LogError::Serde(error.to_string()))?;
                    let Some(job) = jobs.get_mut(&purge.job_event_id) else {
                        continue;
                    };
                    if !job.acked {
                        job.purged = true;
                        job.active_claim = None;
                    }
                }
                _ => {}
            }
        }

        let responses = response_events
            .into_iter()
            .filter(|(_, event)| event.kind == "job_response")
            .map(|(_, event)| {
                serde_json::from_value::<WorkerQueueResponseRecord>(event.payload)
                    .map_err(|error| LogError::Serde(error.to_string()))
            })
            .collect::<Result<Vec<_>, _>>()?;

        let mut queue_state = WorkerQueueState {
            queue: queue_name.to_string(),
            responses,
            jobs: jobs
                .into_values()
                .map(|mut job| {
                    if job
                        .active_claim
                        .as_ref()
                        .is_some_and(|active| active.expires_at_ms <= now_ms)
                    {
                        job.active_claim = None;
                    }
                    WorkerQueueJobState {
                        job_event_id: job.job_event_id,
                        enqueued_at_ms: job.enqueued_at_ms,
                        job: job.job,
                        active_claim: job.active_claim,
                        acked: job.acked,
                        purged: job.purged,
                    }
                })
                .collect(),
        };
        queue_state
            .jobs
            .sort_by_key(|job| (job.enqueued_at_ms, job.job_event_id));
        Ok(queue_state)
    }

    pub async fn queue_summaries(&self) -> Result<Vec<WorkerQueueSummary>, LogError> {
        let now_ms = now_ms();
        let mut summaries = Vec::new();
        for queue in self.known_queues().await? {
            let state = self.queue_state(&queue).await?;
            summaries.push(state.summary(now_ms));
        }
        summaries.sort_by(|left, right| left.queue.cmp(&right.queue));
        Ok(summaries)
    }

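    /// Asks the scheduler for the next ready job, appends an optimistic
    /// `job_claimed` record, and then re-reads the queue to verify that this
    /// claim actually won (a concurrent consumer may have claimed first). On
    /// a lost race it retries, up to eight attempts, before giving up with
    /// `Ok(None)`.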
    pub async fn claim_next(
        &self,
        queue: &str,
        consumer_id: &str,
        ttl: StdDuration,
    ) -> Result<Option<ClaimedWorkerJob>, LogError> {
        let queue_name = queue.trim();
        if queue_name.is_empty() {
            return Err(LogError::Config(
                "worker queue name cannot be empty".to_string(),
            ));
        }
        if consumer_id.trim().is_empty() {
            return Err(LogError::InvalidConsumer(
                "worker queue consumer id cannot be empty".to_string(),
            ));
        }
        let policy = self.policy();
        for _ in 0..8 {
            let now_ms = now_ms();
            let state = self.queue_state(queue_name).await?;
            let (job, fairness_key) = {
                let mut states = self
                    .scheduler_states
                    .lock()
                    .expect("scheduler state poisoned");
                let scheduler_state = states.entry(queue_name.to_string()).or_default();
                let Some(job) =
                    state.next_ready_job_with_scheduler(scheduler_state, &policy, now_ms)
                else {
                    return Ok(None);
                };
                let job = job.clone();
                let fairness_key = policy.fairness_key_of(&SchedulableJob::from_state(&job));
                (job, fairness_key)
            };
            let claim = WorkerQueueClaimRecord {
                job_event_id: job.job_event_id,
                claim_id: Uuid::new_v4().to_string(),
                consumer_id: consumer_id.to_string(),
                claimed_at_ms: now_ms,
                expires_at_ms: expiry_ms(now_ms, ttl),
            };
            self.event_log
                .append(
                    &claims_topic(queue_name)?,
                    LogEvent::new(
                        "job_claimed",
                        serde_json::to_value(&claim)
                            .map_err(|error| LogError::Serde(error.to_string()))?,
                    ),
                )
                .await?;
            let refreshed = self.queue_state(queue_name).await?;
            if refreshed
                .active_claim_for(job.job_event_id)
                .is_some_and(|active| active.claim_id == claim.claim_id)
            {
                {
                    let mut states = self
                        .scheduler_states
                        .lock()
                        .expect("scheduler state poisoned");
                    let scheduler_state = states.entry(queue_name.to_string()).or_default();
                    scheduler_state.note_claim_committed(&fairness_key);
                }
                if let Some(metrics) = crate::active_metrics_registry() {
                    let summary = refreshed.summary(now_ms);
                    metrics.record_worker_queue_claim_age(
                        queue_name,
                        now_ms.saturating_sub(job.enqueued_at_ms) as f64 / 1000.0,
                    );
                    metrics.set_worker_queue_depth(
                        queue_name,
                        (summary.ready + summary.in_flight) as u64,
                    );
                    metrics.record_scheduler_selection(
                        queue_name,
                        policy.fairness_key.as_str(),
                        &fairness_key,
                    );
                    if let Ok(snap) = self.inspect_queue(queue_name).await {
                        for stat in &snap.scheduler.keys {
                            metrics.set_scheduler_deficit(
                                queue_name,
                                policy.fairness_key.as_str(),
                                &stat.fairness_key,
                                stat.deficit,
                            );
                            metrics.set_scheduler_oldest_eligible_age(
                                queue_name,
                                policy.fairness_key.as_str(),
                                &stat.fairness_key,
                                stat.oldest_ready_age_ms,
                            );
                        }
                    }
                }
                return Ok(Some(ClaimedWorkerJob {
                    handle: WorkerQueueClaimHandle {
                        queue: queue_name.to_string(),
                        job_event_id: claim.job_event_id,
                        claim_id: claim.claim_id,
                        consumer_id: claim.consumer_id,
                        expires_at_ms: claim.expires_at_ms,
                    },
                    job: job.job,
                }));
            }
        }
        Ok(None)
    }

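    /// Combines the queue summary with a snapshot of the scheduler's
    /// per-fairness-key state (deficits, weights, oldest ready ages) for the
    /// queue's current ready and in-flight jobs.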
    pub async fn inspect_queue(&self, queue: &str) -> Result<WorkerQueueInspectSnapshot, LogError> {
        let queue_name = queue.trim();
        if queue_name.is_empty() {
            return Err(LogError::Config(
                "worker queue name cannot be empty".to_string(),
            ));
        }
        let now_ms = now_ms();
        let state = self.queue_state(queue_name).await?;
        let summary = state.summary(now_ms);
        let policy = self.policy();
        let ready = scheduler::ready_stats_by_key(&state.jobs, &policy, now_ms);
        let in_flight = scheduler::in_flight_by_key(&state.jobs, &policy);
        let scheduler_snapshot = {
            let mut states = self
                .scheduler_states
                .lock()
                .expect("scheduler state poisoned");
            let scheduler_state = states.entry(queue_name.to_string()).or_default();
            scheduler_state.replace_in_flight(in_flight);
            scheduler_state.snapshot(&policy, &ready)
        };
        Ok(WorkerQueueInspectSnapshot {
            summary,
            scheduler: scheduler_snapshot,
        })
    }

    pub async fn inspect_all_queues(&self) -> Result<Vec<WorkerQueueInspectSnapshot>, LogError> {
        let mut snapshots = Vec::new();
        for queue in self.known_queues().await? {
            snapshots.push(self.inspect_queue(&queue).await?);
        }
        snapshots.sort_by(|left, right| left.summary.queue.cmp(&right.summary.queue));
        Ok(snapshots)
    }

    pub async fn renew_claim(
        &self,
        handle: &WorkerQueueClaimHandle,
        ttl: StdDuration,
    ) -> Result<bool, LogError> {
        let now_ms = now_ms();
        let renewal = WorkerQueueClaimRenewalRecord {
            job_event_id: handle.job_event_id,
            claim_id: handle.claim_id.clone(),
            consumer_id: handle.consumer_id.clone(),
            renewed_at_ms: now_ms,
            expires_at_ms: expiry_ms(now_ms, ttl),
        };
        self.event_log
            .append(
                &claims_topic(&handle.queue)?,
                LogEvent::new(
                    "claim_renewed",
                    serde_json::to_value(&renewal)
                        .map_err(|error| LogError::Serde(error.to_string()))?,
                ),
            )
            .await?;
        let refreshed = self.queue_state(&handle.queue).await?;
        Ok(refreshed
            .active_claim_for(handle.job_event_id)
            .is_some_and(|active| active.claim_id == handle.claim_id))
    }

    pub async fn release_claim(
        &self,
        handle: &WorkerQueueClaimHandle,
        reason: &str,
    ) -> Result<(), LogError> {
        let release = WorkerQueueReleaseRecord {
            job_event_id: handle.job_event_id,
            claim_id: handle.claim_id.clone(),
            consumer_id: handle.consumer_id.clone(),
            released_at_ms: now_ms(),
            reason: if reason.trim().is_empty() {
                None
            } else {
                Some(reason.to_string())
            },
        };
        self.event_log
            .append(
                &claims_topic(&handle.queue)?,
                LogEvent::new(
                    "job_released",
                    serde_json::to_value(&release)
                        .map_err(|error| LogError::Serde(error.to_string()))?,
                ),
            )
            .await?;
        Ok(())
    }

    pub async fn append_response(
        &self,
        queue: &str,
        response: &WorkerQueueResponseRecord,
    ) -> Result<u64, LogError> {
        self.event_log
            .append(
                &responses_topic(queue)?,
                LogEvent::new(
                    "job_response",
                    serde_json::to_value(response)
                        .map_err(|error| LogError::Serde(error.to_string()))?,
                ),
            )
            .await
    }

    pub async fn ack_claim(&self, handle: &WorkerQueueClaimHandle) -> Result<u64, LogError> {
        self.event_log
            .append(
                &claims_topic(&handle.queue)?,
                LogEvent::new(
                    "job_acked",
                    serde_json::to_value(WorkerQueueAckRecord {
                        job_event_id: handle.job_event_id,
                        claim_id: handle.claim_id.clone(),
                        consumer_id: handle.consumer_id.clone(),
                        acked_at_ms: now_ms(),
                    })
                    .map_err(|error| LogError::Serde(error.to_string()))?,
                ),
            )
            .await
    }

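    /// Appends a `job_purged` record for every job that is currently ready
    /// (unclaimed, unacked, unpurged) and returns how many were purged.
    /// In-flight jobs are left untouched.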
    pub async fn purge_unclaimed(
        &self,
        queue: &str,
        purged_by: &str,
        reason: Option<&str>,
    ) -> Result<usize, LogError> {
        let state = self.queue_state(queue).await?;
        let ready_jobs: Vec<_> = state
            .jobs
            .into_iter()
            .filter(|job| job.is_ready())
            .map(|job| job.job_event_id)
            .collect();
        for job_event_id in &ready_jobs {
            self.event_log
                .append(
                    &claims_topic(queue)?,
                    LogEvent::new(
                        "job_purged",
                        serde_json::to_value(WorkerQueuePurgeRecord {
                            job_event_id: *job_event_id,
                            purged_by: purged_by.to_string(),
                            purged_at_ms: now_ms(),
                            reason: reason
                                .filter(|value| !value.trim().is_empty())
                                .map(|value| value.to_string()),
                        })
                        .map_err(|error| LogError::Serde(error.to_string()))?,
                    ),
                )
                .await?;
        }
        Ok(ready_jobs.len())
    }
}

#[derive(Clone, Debug)]
struct WorkerQueueJobStateInternal {
    job_event_id: u64,
    enqueued_at_ms: i64,
    job: WorkerQueueJob,
    active_claim: Option<WorkerQueueClaimHandle>,
    acked: bool,
    purged: bool,
    seen_claim_ids: BTreeSet<String>,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueCatalogRecord {
    queue: String,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueClaimRecord {
    job_event_id: u64,
    claim_id: String,
    consumer_id: String,
    claimed_at_ms: i64,
    expires_at_ms: i64,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueClaimRenewalRecord {
    job_event_id: u64,
    claim_id: String,
    consumer_id: String,
    renewed_at_ms: i64,
    expires_at_ms: i64,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueReleaseRecord {
    job_event_id: u64,
    claim_id: String,
    consumer_id: String,
    released_at_ms: i64,
    #[serde(default)]
    reason: Option<String>,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueAckRecord {
    job_event_id: u64,
    claim_id: String,
    consumer_id: String,
    acked_at_ms: i64,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueuePurgeRecord {
    job_event_id: u64,
    purged_by: String,
    purged_at_ms: i64,
    #[serde(default)]
    reason: Option<String>,
}

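// Topic layout per queue: jobs land on `worker.<queue>`, claim lifecycle
// events on `worker.<queue>.claims`, and handler responses on
// `worker.<queue>.responses`, with the queue component sanitized first.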
pub fn job_topic_name(queue: &str) -> String {
    format!("worker.{}", sanitize_topic_component(queue))
}

pub fn claims_topic_name(queue: &str) -> String {
    format!("{}{}", job_topic_name(queue), WORKER_QUEUE_CLAIMS_SUFFIX)
}

pub fn response_topic_name(queue: &str) -> String {
    format!("{}{}", job_topic_name(queue), WORKER_QUEUE_RESPONSES_SUFFIX)
}

fn job_topic(queue: &str) -> Result<Topic, LogError> {
    Topic::new(job_topic_name(queue))
}

fn claims_topic(queue: &str) -> Result<Topic, LogError> {
    Topic::new(claims_topic_name(queue))
}

fn responses_topic(queue: &str) -> Result<Topic, LogError> {
    Topic::new(response_topic_name(queue))
}

fn now_ms() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|duration| duration.as_millis() as i64)
        .unwrap_or(0)
}

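/// Saturating `now + ttl` in milliseconds: the TTL is clamped to `i64::MAX`
/// before the add, so oversized durations cannot overflow the deadline.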
fn expiry_ms(now_ms: i64, ttl: StdDuration) -> i64 {
    now_ms.saturating_add(ttl.as_millis().min(i64::MAX as u128) as i64)
}

#[cfg(test)]
mod tests {
    use super::*;

    use crate::event_log::{AnyEventLog, MemoryEventLog};
    use crate::triggers::{
        event::{GenericWebhookPayload, KnownProviderPayload},
        scheduler::{self, SchedulerStrategy},
        ProviderId, ProviderPayload, SignatureStatus, TraceId, TriggerEvent,
    };

    fn test_event(id: &str) -> TriggerEvent {
        TriggerEvent {
            id: crate::triggers::TriggerEventId(id.to_string()),
            provider: ProviderId::from("github"),
            kind: "issues.opened".to_string(),
            trace_id: TraceId("trace-test".to_string()),
            dedupe_key: id.to_string(),
            tenant_id: None,
            headers: BTreeMap::new(),
            batch: None,
            raw_body: None,
            provider_payload: ProviderPayload::Known(KnownProviderPayload::Webhook(
                GenericWebhookPayload {
                    source: Some("worker-queue-test".to_string()),
                    content_type: Some("application/json".to_string()),
                    raw: serde_json::json!({"id": id}),
                },
            )),
            signature_status: SignatureStatus::Verified,
            received_at: time::OffsetDateTime::now_utc(),
            occurred_at: None,
            dedupe_claimed: false,
        }
    }

    fn test_job(
        queue: &str,
        trigger_id: &str,
        event_id: &str,
        priority: WorkerQueuePriority,
    ) -> WorkerQueueJob {
        WorkerQueueJob {
            queue: queue.to_string(),
            trigger_id: trigger_id.to_string(),
            binding_key: format!("{trigger_id}@v1"),
            binding_version: 1,
            event: test_event(event_id),
            replay_of_event_id: None,
            priority,
        }
    }

    #[tokio::test(flavor = "current_thread")]
    async fn enqueue_and_summarize_queue() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log);
        queue
            .enqueue(&test_job(
                "triage",
                "incoming-review-task",
                "evt-1",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();
        let summaries = queue.queue_summaries().await.unwrap();
        assert_eq!(summaries.len(), 1);
        assert_eq!(summaries[0].queue, "triage");
        assert_eq!(summaries[0].ready, 1);
        assert_eq!(summaries[0].in_flight, 0);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn claim_and_ack_remove_job_from_ready_pool() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log);
        queue
            .enqueue(&test_job(
                "triage",
                "incoming-review-task",
                "evt-1",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();
        let claimed = queue
            .claim_next("triage", "consumer-a", StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        let before_ack = queue.queue_state("triage").await.unwrap();
        assert_eq!(before_ack.summary(now_ms()).ready, 0);
        assert_eq!(before_ack.summary(now_ms()).in_flight, 1);
        queue
            .append_response(
                "triage",
                &WorkerQueueResponseRecord {
                    queue: "triage".to_string(),
                    job_event_id: claimed.handle.job_event_id,
                    consumer_id: "consumer-a".to_string(),
                    handled_at_ms: now_ms(),
                    outcome: Some(DispatchOutcome {
                        trigger_id: "incoming-review-task".to_string(),
                        binding_key: "incoming-review-task@v1".to_string(),
                        event_id: "evt-1".to_string(),
                        attempt_count: 1,
                        status: super::super::DispatchStatus::Succeeded,
                        handler_kind: "local".to_string(),
                        target_uri: "handlers::on_review".to_string(),
                        replay_of_event_id: None,
                        result: Some(serde_json::json!({"ok": true})),
                        error: None,
                    }),
                    error: None,
                },
            )
            .await
            .unwrap();
        queue.ack_claim(&claimed.handle).await.unwrap();
        let after_ack = queue.queue_state("triage").await.unwrap();
        let summary = after_ack.summary(now_ms());
        assert_eq!(summary.ready, 0);
        assert_eq!(summary.in_flight, 0);
        assert_eq!(summary.acked, 1);
        assert_eq!(summary.responses, 1);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn expired_claim_allows_reclaim() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log.clone());
        let receipt = queue
            .enqueue(&test_job(
                "triage",
                "incoming-review-task",
                "evt-1",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();
        let expired_claim = WorkerQueueClaimRecord {
            job_event_id: receipt.job_event_id,
            claim_id: "expired-claim".to_string(),
            consumer_id: "consumer-a".to_string(),
            claimed_at_ms: now_ms().saturating_sub(2),
            expires_at_ms: now_ms().saturating_sub(1),
        };
        log.append(
            &claims_topic("triage").unwrap(),
            LogEvent::new("job_claimed", serde_json::to_value(&expired_claim).unwrap()),
        )
        .await
        .unwrap();
        let second = queue
            .claim_next("triage", "consumer-b", StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(second.job.event.id.0, "evt-1");
        assert_ne!(second.handle.claim_id, expired_claim.claim_id);
        assert_eq!(second.handle.consumer_id, "consumer-b");
    }

    #[tokio::test(flavor = "current_thread")]
    async fn high_priority_and_aged_normal_are_selected_first() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log.clone());

        let catalog_topic = Topic::new(WORKER_QUEUE_CATALOG_TOPIC).unwrap();
        log.append(
            &catalog_topic,
            LogEvent::new("queue_seen", serde_json::json!({"queue":"triage"})),
        )
        .await
        .unwrap();

        let topic = job_topic("triage").unwrap();
        let mut old_normal = LogEvent::new(
            "trigger_dispatch",
            serde_json::to_value(test_job(
                "triage",
                "incoming-review-task",
                "evt-old-normal",
                WorkerQueuePriority::Normal,
            ))
            .unwrap(),
        );
        old_normal.occurred_at_ms = now_ms() - NORMAL_PROMOTION_AGE_MS - 1_000;
        log.append(&topic, old_normal).await.unwrap();

        let high = LogEvent::new(
            "trigger_dispatch",
            serde_json::to_value(test_job(
                "triage",
                "incoming-review-task",
                "evt-high",
                WorkerQueuePriority::High,
            ))
            .unwrap(),
        );
        log.append(&topic, high).await.unwrap();

        let claimed = queue
            .claim_next("triage", "consumer-a", StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(claimed.job.event.id.0, "evt-old-normal");
    }

    fn tenant_event(id: &str, tenant: &str) -> TriggerEvent {
        let mut event = test_event(id);
        event.tenant_id = Some(crate::triggers::TenantId::new(tenant));
        event
    }

    fn tenant_job(
        queue: &str,
        trigger_id: &str,
        event_id: &str,
        tenant: &str,
        priority: WorkerQueuePriority,
    ) -> WorkerQueueJob {
        WorkerQueueJob {
            queue: queue.to_string(),
            trigger_id: trigger_id.to_string(),
            binding_key: format!("{trigger_id}@v1"),
            binding_version: 1,
            event: tenant_event(event_id, tenant),
            replay_of_event_id: None,
            priority,
        }
    }

    async fn ack_and_respond(queue: &WorkerQueue, queue_name: &str, claim: &ClaimedWorkerJob) {
        queue
            .append_response(
                queue_name,
                &WorkerQueueResponseRecord {
                    queue: queue_name.to_string(),
                    job_event_id: claim.handle.job_event_id,
                    consumer_id: claim.handle.consumer_id.clone(),
                    handled_at_ms: now_ms(),
                    outcome: Some(DispatchOutcome {
                        trigger_id: claim.job.trigger_id.clone(),
                        binding_key: claim.job.binding_key.clone(),
                        event_id: claim.job.event.id.0.clone(),
                        attempt_count: 1,
                        status: super::super::DispatchStatus::Succeeded,
                        handler_kind: "local".to_string(),
                        target_uri: "test::handler".to_string(),
                        replay_of_event_id: None,
                        result: None,
                        error: None,
                    }),
                    error: None,
                },
            )
            .await
            .unwrap();
        queue.ack_claim(&claim.handle).await.unwrap();
    }

    #[tokio::test(flavor = "current_thread")]
    async fn drr_policy_rotates_across_tenants_through_claim_next() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(256)));
        let queue = WorkerQueue::with_policy(
            log,
            SchedulerPolicy::deficit_round_robin(scheduler::FairnessKey::Tenant),
        );

        for idx in 0..8 {
            queue
                .enqueue(&tenant_job(
                    "triage",
                    "trigger",
                    &format!("a-{idx}"),
                    "tenant-a",
                    WorkerQueuePriority::Normal,
                ))
                .await
                .unwrap();
        }
        queue
            .enqueue(&tenant_job(
                "triage",
                "trigger",
                "b-1",
                "tenant-b",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();

        let mut tenants_seen = Vec::new();
        for n in 0..4 {
            let consumer = format!("c-{n}");
            let claim = queue
                .claim_next("triage", &consumer, StdDuration::from_secs(60))
                .await
                .unwrap()
                .expect("queue should still have ready jobs");
            tenants_seen.push(
                claim
                    .job
                    .event
                    .tenant_id
                    .as_ref()
                    .map(|t| t.0.clone())
                    .unwrap_or_default(),
            );
            ack_and_respond(&queue, "triage", &claim).await;
        }

        let saw_b = tenants_seen.iter().any(|t| t == "tenant-b");
        assert!(
            saw_b,
            "tenant-b should have been served within the first 4 claims, got {tenants_seen:?}",
        );
    }

    #[tokio::test(flavor = "current_thread")]
    async fn fifo_policy_preserves_legacy_behavior() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(64)));
        let queue = WorkerQueue::with_policy(log, SchedulerPolicy::fifo());

        for idx in 0..4 {
            queue
                .enqueue(&tenant_job(
                    "triage",
                    "trigger",
                    &format!("a-{idx}"),
                    "tenant-a",
                    WorkerQueuePriority::Normal,
                ))
                .await
                .unwrap();
        }
        queue
            .enqueue(&tenant_job(
                "triage",
                "trigger",
                "b-1",
                "tenant-b",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();

        let first = queue
            .claim_next("triage", "c-0", StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(first.job.event.tenant_id.unwrap().0, "tenant-a");
    }

    #[tokio::test(flavor = "current_thread")]
    async fn inspect_queue_reports_per_tenant_fairness_state() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(64)));
        let queue = WorkerQueue::with_policy(
            log,
            SchedulerPolicy::deficit_round_robin(scheduler::FairnessKey::Tenant)
                .with_weight("tenant-a", 2)
                .with_weight("tenant-b", 1),
        );

        for idx in 0..3 {
            queue
                .enqueue(&tenant_job(
                    "triage",
                    "trigger",
                    &format!("a-{idx}"),
                    "tenant-a",
                    WorkerQueuePriority::Normal,
                ))
                .await
                .unwrap();
        }
        queue
            .enqueue(&tenant_job(
                "triage",
                "trigger",
                "b-1",
                "tenant-b",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();

        for n in 0..2 {
            let consumer = format!("c-{n}");
            let claim = queue
                .claim_next("triage", &consumer, StdDuration::from_secs(60))
                .await
                .unwrap()
                .unwrap();
            ack_and_respond(&queue, "triage", &claim).await;
        }

        let snap = queue.inspect_queue("triage").await.unwrap();
        assert_eq!(snap.scheduler.strategy, "drr");
        assert_eq!(snap.scheduler.fairness_key, "tenant");
        assert!(snap
            .scheduler
            .keys
            .iter()
            .any(|k| k.fairness_key == "tenant-a"));
        let weights: BTreeMap<String, u32> = snap
            .scheduler
            .keys
            .iter()
            .map(|k| (k.fairness_key.clone(), k.weight))
            .collect();
        assert_eq!(weights.get("tenant-a").copied(), Some(2));
        assert_eq!(weights.get("tenant-b").copied(), Some(1));
    }

    #[tokio::test(flavor = "current_thread")]
    async fn drr_with_max_concurrent_per_key_throttles_hot_tenant() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(128)));
        let queue = WorkerQueue::with_policy(
            log,
            SchedulerPolicy::deficit_round_robin(scheduler::FairnessKey::Tenant)
                .with_max_concurrent_per_key(1),
        );

        for idx in 0..4 {
            queue
                .enqueue(&tenant_job(
                    "triage",
                    "trigger",
                    &format!("a-{idx}"),
                    "tenant-a",
                    WorkerQueuePriority::Normal,
                ))
                .await
                .unwrap();
        }
        queue
            .enqueue(&tenant_job(
                "triage",
                "trigger",
                "b-1",
                "tenant-b",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();

        let first = queue
            .claim_next("triage", "consumer-a", StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        let second = queue
            .claim_next("triage", "consumer-b", StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        let pair = [
            first.job.event.tenant_id.clone().unwrap().0,
            second.job.event.tenant_id.clone().unwrap().0,
        ];
        assert!(
            pair.contains(&"tenant-a".to_string()) && pair.contains(&"tenant-b".to_string()),
            "max_concurrent_per_key=1 must release tenant-b within two claims, got {pair:?}",
        );
    }

    #[test]
    fn from_env_parses_drr_policy_from_lookup() {
        let lookup = |name: &str| -> Option<String> {
            match name {
                "HARN_SCHEDULER_STRATEGY" => Some("drr".to_string()),
                "HARN_SCHEDULER_FAIRNESS_KEY" => Some("tenant-and-binding".to_string()),
                "HARN_SCHEDULER_QUANTUM" => Some("3".to_string()),
                "HARN_SCHEDULER_STARVATION_AGE_MS" => Some("750".to_string()),
                "HARN_SCHEDULER_MAX_CONCURRENT_PER_KEY" => Some("4".to_string()),
                "HARN_SCHEDULER_DEFAULT_WEIGHT" => Some("2".to_string()),
                "HARN_SCHEDULER_WEIGHTS" => Some("tenant-a:5,tenant-b:1, : ,bad:abc".to_string()),
                _ => None,
            }
        };
        let policy = SchedulerPolicy::from_env_lookup(lookup);
        match policy.strategy {
            SchedulerStrategy::DeficitRoundRobin {
                quantum,
                starvation_age_ms,
            } => {
                assert_eq!(quantum, 3);
                assert_eq!(starvation_age_ms, Some(750));
            }
            other => panic!("expected DRR strategy, got {other:?}"),
        }
        assert_eq!(
            policy.fairness_key,
            scheduler::FairnessKey::TenantAndBinding
        );
        assert_eq!(policy.max_concurrent_per_key, 4);
        assert_eq!(policy.default_weight, 2);
        assert_eq!(policy.weight_for("tenant-a"), 5);
        assert_eq!(policy.weight_for("tenant-b"), 1);
        assert_eq!(policy.weight_for("tenant-c"), 2);
    }

    #[test]
    fn from_env_defaults_to_fifo_when_missing() {
        let policy = SchedulerPolicy::from_env_lookup(|_| None);
        assert!(matches!(policy.strategy, SchedulerStrategy::Fifo));
    }
}