1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::Arc;
3use std::time::Duration as StdDuration;
4
5use serde::{Deserialize, Serialize};
6use uuid::Uuid;
7
8use crate::event_log::{
9 sanitize_topic_component, AnyEventLog, EventLog, LogError, LogEvent, Topic,
10};
11
12use super::{DispatchOutcome, TriggerEvent};
13
/// Topic that receives one `queue_seen` record per enqueue and is read back to
/// enumerate the queues that have ever been used.
pub const WORKER_QUEUE_CATALOG_TOPIC: &str = "worker.queues";
/// Suffix appended to a queue's job topic name to form its claims topic name.
const WORKER_QUEUE_CLAIMS_SUFFIX: &str = ".claims";
/// Suffix appended to a queue's job topic name to form its responses topic name.
const WORKER_QUEUE_RESPONSES_SUFFIX: &str = ".responses";
/// Waiting time (15 minutes, in milliseconds) after which a `Normal` job is
/// ranked the same as a `High` job.
const NORMAL_PROMOTION_AGE_MS: i64 = 15 * 60 * 1000;
18
/// Scheduling priority attached to a queued job.
///
/// Serialized in lowercase (`"high"`, `"normal"`, `"low"`). `Normal` is the
/// value produced by `Default`, and therefore also the value used when the
/// field is missing from a serialized job.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum WorkerQueuePriority {
    /// Always given the best (lowest) rank.
    High,
    /// Default priority; ranked like `High` once the job has waited long enough.
    #[default]
    Normal,
    /// Always given the worst rank; never promoted by age.
    Low,
}
27
28impl WorkerQueuePriority {
29 pub fn as_str(self) -> &'static str {
30 match self {
31 Self::High => "high",
32 Self::Normal => "normal",
33 Self::Low => "low",
34 }
35 }
36
37 fn effective_rank(self, enqueued_at_ms: i64, now_ms: i64) -> u8 {
38 match self {
39 Self::High => 0,
40 Self::Normal if now_ms.saturating_sub(enqueued_at_ms) >= NORMAL_PROMOTION_AGE_MS => 0,
41 Self::Normal => 1,
42 Self::Low => 2,
43 }
44 }
45}
46
/// A unit of work placed on a worker queue; serialized as the payload of a
/// `trigger_dispatch` event on the queue's job topic.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct WorkerQueueJob {
    /// Name of the target queue. It is trimmed on enqueue and must not be blank.
    pub queue: String,
    /// Identifier of the trigger this job belongs to (also copied into the event headers).
    pub trigger_id: String,
    /// Binding key of the trigger (also copied into the event headers).
    pub binding_key: String,
    /// Version number of the binding.
    pub binding_version: u32,
    /// The trigger event to be handled by the worker.
    pub event: TriggerEvent,
    /// Optional id of an earlier event; `None` when absent from the serialized form.
    /// NOTE(review): presumably set when this job replays that event — confirm with callers.
    #[serde(default)]
    pub replay_of_event_id: Option<String>,
    /// Scheduling priority; falls back to the default (`Normal`) when absent.
    #[serde(default)]
    pub priority: WorkerQueuePriority,
}
59
/// Result of a successful enqueue.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerQueueEnqueueReceipt {
    /// Trimmed name of the queue the job was appended to.
    pub queue: String,
    /// Id the event log returned for the appended job event.
    pub job_event_id: u64,
    /// Name of the topic on which responses for this queue are recorded.
    pub response_topic: String,
}
66
/// Identifies one consumer's claim on one job; passed back to renew, release,
/// or acknowledge that claim.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerQueueClaimHandle {
    /// Queue the claimed job lives on.
    pub queue: String,
    /// Event-log id of the claimed job.
    pub job_event_id: u64,
    /// Unique id of this claim (a freshly generated UUID string when created by `claim_next`).
    pub claim_id: String,
    /// Id of the consumer holding the claim.
    pub consumer_id: String,
    /// Timestamp in milliseconds at or after which the claim counts as expired.
    pub expires_at_ms: i64,
}
75
/// A job together with the claim handle that was granted for it.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ClaimedWorkerJob {
    /// Handle describing the granted claim.
    pub handle: WorkerQueueClaimHandle,
    /// The job payload that was claimed.
    pub job: WorkerQueueJob,
}
81
/// Payload of a `job_response` event on a queue's responses topic.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct WorkerQueueResponseRecord {
    /// Queue the handled job belongs to.
    pub queue: String,
    /// Event-log id of the job this response refers to.
    pub job_event_id: u64,
    /// Id of the consumer that produced the response.
    pub consumer_id: String,
    /// Timestamp in milliseconds at which the job was handled.
    pub handled_at_ms: i64,
    /// Dispatch outcome, when one is available.
    /// NOTE(review): presumably `outcome` and `error` are mutually exclusive;
    /// nothing in this file enforces that — confirm with producers.
    pub outcome: Option<DispatchOutcome>,
    /// Error text, when one is available.
    pub error: Option<String>,
}
91
/// Aggregate counters for one queue, as computed by `WorkerQueueState::summary`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerQueueSummary {
    /// Name of the summarized queue.
    pub queue: String,
    /// Jobs that are neither acked nor purged and have no active claim.
    pub ready: usize,
    /// Jobs that are neither acked nor purged and currently have an active claim.
    pub in_flight: usize,
    /// Jobs flagged as acked.
    pub acked: usize,
    /// Jobs flagged as purged.
    pub purged: usize,
    /// Number of response records recorded for the queue.
    pub responses: usize,
    /// Largest age in milliseconds among ready jobs, measured against the
    /// timestamp passed to `summary`; `None` when no job is ready.
    pub oldest_unclaimed_age_ms: Option<u64>,
}
102
/// Replayed state of a single job on a queue.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct WorkerQueueJobState {
    /// Event-log id of the job event.
    pub job_event_id: u64,
    /// Timestamp in milliseconds taken from the job event's `occurred_at_ms`.
    pub enqueued_at_ms: i64,
    /// The job payload.
    pub job: WorkerQueueJob,
    /// The claim currently holding the job, if any.
    pub active_claim: Option<WorkerQueueClaimHandle>,
    /// Whether an accepted acknowledgement was seen for the job.
    pub acked: bool,
    /// Whether the job was purged.
    pub purged: bool,
}
112
113impl WorkerQueueJobState {
114 pub fn is_ready(&self) -> bool {
115 !self.acked && !self.purged && self.active_claim.is_none()
116 }
117}
118
/// Snapshot of a queue reconstructed from its event-log topics.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct WorkerQueueState {
    /// Name of the queue this snapshot describes.
    pub queue: String,
    /// Response records recorded for the queue.
    pub responses: Vec<WorkerQueueResponseRecord>,
    /// Per-job state for every job found on the queue.
    pub jobs: Vec<WorkerQueueJobState>,
}
125
126impl WorkerQueueState {
127 pub fn summary(&self, now_ms: i64) -> WorkerQueueSummary {
128 let ready = self.jobs.iter().filter(|job| job.is_ready()).count();
129 let in_flight = self
130 .jobs
131 .iter()
132 .filter(|job| !job.acked && !job.purged && job.active_claim.is_some())
133 .count();
134 let acked = self.jobs.iter().filter(|job| job.acked).count();
135 let purged = self.jobs.iter().filter(|job| job.purged).count();
136 let oldest_unclaimed_age_ms = self
137 .jobs
138 .iter()
139 .filter(|job| job.is_ready())
140 .map(|job| now_ms.saturating_sub(job.enqueued_at_ms).max(0) as u64)
141 .max();
142 WorkerQueueSummary {
143 queue: self.queue.clone(),
144 ready,
145 in_flight,
146 acked,
147 purged,
148 responses: self.responses.len(),
149 oldest_unclaimed_age_ms,
150 }
151 }
152
153 fn next_ready_job(&self, now_ms: i64) -> Option<&WorkerQueueJobState> {
154 self.jobs
155 .iter()
156 .filter(|job| job.is_ready())
157 .min_by_key(|job| {
158 (
159 job.job.priority.effective_rank(job.enqueued_at_ms, now_ms),
160 job.enqueued_at_ms,
161 job.job_event_id,
162 )
163 })
164 }
165
166 fn active_claim_for(&self, job_event_id: u64) -> Option<&WorkerQueueClaimHandle> {
167 self.jobs
168 .iter()
169 .find(|job| job.job_event_id == job_event_id)
170 .and_then(|job| job.active_claim.as_ref())
171 }
172}
173
/// Worker queue whose entire state lives in an event log: jobs, claims, and
/// responses are appended as events and the state is rebuilt by replaying them.
#[derive(Clone)]
pub struct WorkerQueue {
    /// Shared event log used for every read and append.
    event_log: Arc<AnyEventLog>,
}
178
179impl WorkerQueue {
180 pub fn new(event_log: Arc<AnyEventLog>) -> Self {
181 Self { event_log }
182 }
183
184 pub async fn enqueue(
185 &self,
186 job: &WorkerQueueJob,
187 ) -> Result<WorkerQueueEnqueueReceipt, LogError> {
188 let queue = job.queue.trim();
189 if queue.is_empty() {
190 return Err(LogError::Config(
191 "worker queue name cannot be empty".to_string(),
192 ));
193 }
194 let queue_name = queue.to_string();
195 let catalog_topic = Topic::new(WORKER_QUEUE_CATALOG_TOPIC)
196 .expect("static worker queue catalog topic should always be valid");
197 self.event_log
198 .append(
199 &catalog_topic,
200 LogEvent::new(
201 "queue_seen",
202 serde_json::to_value(WorkerQueueCatalogRecord {
203 queue: queue_name.clone(),
204 })
205 .map_err(|error| LogError::Serde(error.to_string()))?,
206 ),
207 )
208 .await?;
209
210 let job_topic = job_topic(&queue_name)?;
211 let mut headers = BTreeMap::new();
212 headers.insert("queue".to_string(), queue_name.clone());
213 headers.insert("trigger_id".to_string(), job.trigger_id.clone());
214 headers.insert("binding_key".to_string(), job.binding_key.clone());
215 headers.insert("event_id".to_string(), job.event.id.0.clone());
216 headers.insert("priority".to_string(), job.priority.as_str().to_string());
217 let job_event_id = self
218 .event_log
219 .append(
220 &job_topic,
221 LogEvent::new(
222 "trigger_dispatch",
223 serde_json::to_value(job)
224 .map_err(|error| LogError::Serde(error.to_string()))?,
225 )
226 .with_headers(headers),
227 )
228 .await?;
229 Ok(WorkerQueueEnqueueReceipt {
230 queue: queue_name.clone(),
231 job_event_id,
232 response_topic: response_topic_name(&queue_name),
233 })
234 }
235
236 pub async fn known_queues(&self) -> Result<Vec<String>, LogError> {
237 let topic = Topic::new(WORKER_QUEUE_CATALOG_TOPIC)
238 .expect("static worker queue catalog topic should always be valid");
239 let events = self.event_log.read_range(&topic, None, usize::MAX).await?;
240 let mut queues = BTreeSet::new();
241 for (_, event) in events {
242 if event.kind != "queue_seen" {
243 continue;
244 }
245 let record: WorkerQueueCatalogRecord = serde_json::from_value(event.payload)
246 .map_err(|error| LogError::Serde(error.to_string()))?;
247 if !record.queue.trim().is_empty() {
248 queues.insert(record.queue);
249 }
250 }
251 Ok(queues.into_iter().collect())
252 }
253
    /// Rebuilds the current state of `queue` by replaying its job, claims, and
    /// responses topics.
    ///
    /// Claim events are applied in the order the log returns them; expiry of
    /// the surviving claim is evaluated against the wall clock sampled at the
    /// start of the call. The returned jobs are sorted by enqueue time, then
    /// by job event id.
    ///
    /// # Errors
    ///
    /// * `LogError::Config` when the trimmed queue name is empty.
    /// * `LogError::Serde` when any considered event payload fails to deserialize.
    /// * Any error from building a topic or reading the event log.
    pub async fn queue_state(&self, queue: &str) -> Result<WorkerQueueState, LogError> {
        let queue_name = queue.trim();
        if queue_name.is_empty() {
            return Err(LogError::Config(
                "worker queue name cannot be empty".to_string(),
            ));
        }
        // Sampled before the reads; used at the end to drop expired claims.
        let now_ms = now_ms();
        let job_events = self
            .event_log
            .read_range(&job_topic(queue_name)?, None, usize::MAX)
            .await?;
        let claim_events = self
            .event_log
            .read_range(&claims_topic(queue_name)?, None, usize::MAX)
            .await?;
        let response_events = self
            .event_log
            .read_range(&responses_topic(queue_name)?, None, usize::MAX)
            .await?;

        // Seed one entry per `trigger_dispatch` event, keyed by its event id.
        let mut jobs = BTreeMap::<u64, WorkerQueueJobStateInternal>::new();
        for (job_event_id, event) in job_events {
            if event.kind != "trigger_dispatch" {
                continue;
            }
            let job: WorkerQueueJob = serde_json::from_value(event.payload)
                .map_err(|error| LogError::Serde(error.to_string()))?;
            jobs.insert(
                job_event_id,
                WorkerQueueJobStateInternal {
                    job_event_id,
                    enqueued_at_ms: event.occurred_at_ms,
                    job,
                    active_claim: None,
                    acked: false,
                    purged: false,
                    seen_claim_ids: BTreeSet::new(),
                },
            );
        }

        // Apply claim-topic events; events for unknown jobs are ignored.
        for (_, event) in claim_events {
            match event.kind.as_str() {
                "job_claimed" => {
                    let claim: WorkerQueueClaimRecord = serde_json::from_value(event.payload)
                        .map_err(|error| LogError::Serde(error.to_string()))?;
                    let Some(job) = jobs.get_mut(&claim.job_event_id) else {
                        continue;
                    };
                    if job.acked || job.purged {
                        continue;
                    }
                    // Remember the claim id even if this claim loses, so a
                    // later ack carrying it is still recognized.
                    job.seen_claim_ids.insert(claim.claim_id.clone());
                    // A claim wins when the job is unclaimed or the current
                    // claim had already expired when this one was made.
                    let can_take = job
                        .active_claim
                        .as_ref()
                        .is_none_or(|active| active.expires_at_ms <= claim.claimed_at_ms);
                    if can_take {
                        job.active_claim = Some(WorkerQueueClaimHandle {
                            queue: queue_name.to_string(),
                            job_event_id: claim.job_event_id,
                            claim_id: claim.claim_id,
                            consumer_id: claim.consumer_id,
                            expires_at_ms: claim.expires_at_ms,
                        });
                    }
                }
                "claim_renewed" => {
                    let renewal: WorkerQueueClaimRenewalRecord =
                        serde_json::from_value(event.payload)
                            .map_err(|error| LogError::Serde(error.to_string()))?;
                    let Some(job) = jobs.get_mut(&renewal.job_event_id) else {
                        continue;
                    };
                    // Only the claim that currently holds the job can be extended.
                    if let Some(active) = job.active_claim.as_mut() {
                        if active.claim_id == renewal.claim_id {
                            active.expires_at_ms = renewal.expires_at_ms;
                        }
                    }
                }
                "job_released" => {
                    let release: WorkerQueueReleaseRecord =
                        serde_json::from_value(event.payload)
                            .map_err(|error| LogError::Serde(error.to_string()))?;
                    let Some(job) = jobs.get_mut(&release.job_event_id) else {
                        continue;
                    };
                    // A release only clears the claim it names.
                    if job
                        .active_claim
                        .as_ref()
                        .is_some_and(|active| active.claim_id == release.claim_id)
                    {
                        job.active_claim = None;
                    }
                }
                "job_acked" => {
                    let ack: WorkerQueueAckRecord = serde_json::from_value(event.payload)
                        .map_err(|error| LogError::Serde(error.to_string()))?;
                    let Some(job) = jobs.get_mut(&ack.job_event_id) else {
                        continue;
                    };
                    // Accepted when the ack carries no claim id or one that was
                    // seen for this job, so an ack from an expired or
                    // superseded claim still counts.
                    // NOTE(review): this branch does not check `job.purged`, so
                    // a job can end up both purged and acked — confirm intended.
                    if ack.claim_id.is_empty() || job.seen_claim_ids.contains(&ack.claim_id) {
                        job.acked = true;
                        job.active_claim = None;
                    }
                }
                "job_purged" => {
                    let purge: WorkerQueuePurgeRecord = serde_json::from_value(event.payload)
                        .map_err(|error| LogError::Serde(error.to_string()))?;
                    let Some(job) = jobs.get_mut(&purge.job_event_id) else {
                        continue;
                    };
                    // A purge never overrides an earlier ack.
                    if !job.acked {
                        job.purged = true;
                        job.active_claim = None;
                    }
                }
                _ => {}
            }
        }

        // Collect `job_response` payloads; other event kinds are skipped.
        let responses = response_events
            .into_iter()
            .filter(|(_, event)| event.kind == "job_response")
            .map(|(_, event)| {
                serde_json::from_value::<WorkerQueueResponseRecord>(event.payload)
                    .map_err(|error| LogError::Serde(error.to_string()))
            })
            .collect::<Result<Vec<_>, _>>()?;

        let mut queue_state = WorkerQueueState {
            queue: queue_name.to_string(),
            responses,
            jobs: jobs
                .into_values()
                .map(|mut job| {
                    // Drop a claim that has expired relative to `now_ms`.
                    if job
                        .active_claim
                        .as_ref()
                        .is_some_and(|active| active.expires_at_ms <= now_ms)
                    {
                        job.active_claim = None;
                    }
                    WorkerQueueJobState {
                        job_event_id: job.job_event_id,
                        enqueued_at_ms: job.enqueued_at_ms,
                        job: job.job,
                        active_claim: job.active_claim,
                        acked: job.acked,
                        purged: job.purged,
                    }
                })
                .collect(),
        };
        queue_state
            .jobs
            .sort_by_key(|job| (job.enqueued_at_ms, job.job_event_id));
        Ok(queue_state)
    }
414
415 pub async fn queue_summaries(&self) -> Result<Vec<WorkerQueueSummary>, LogError> {
416 let now_ms = now_ms();
417 let mut summaries = Vec::new();
418 for queue in self.known_queues().await? {
419 let state = self.queue_state(&queue).await?;
420 summaries.push(state.summary(now_ms));
421 }
422 summaries.sort_by(|left, right| left.queue.cmp(&right.queue));
423 Ok(summaries)
424 }
425
426 pub async fn claim_next(
427 &self,
428 queue: &str,
429 consumer_id: &str,
430 ttl: StdDuration,
431 ) -> Result<Option<ClaimedWorkerJob>, LogError> {
432 let queue_name = queue.trim();
433 if queue_name.is_empty() {
434 return Err(LogError::Config(
435 "worker queue name cannot be empty".to_string(),
436 ));
437 }
438 if consumer_id.trim().is_empty() {
439 return Err(LogError::InvalidConsumer(
440 "worker queue consumer id cannot be empty".to_string(),
441 ));
442 }
443 for _ in 0..8 {
444 let now_ms = now_ms();
445 let state = self.queue_state(queue_name).await?;
446 let Some(job) = state.next_ready_job(now_ms).cloned() else {
447 return Ok(None);
448 };
449 let claim = WorkerQueueClaimRecord {
450 job_event_id: job.job_event_id,
451 claim_id: Uuid::new_v4().to_string(),
452 consumer_id: consumer_id.to_string(),
453 claimed_at_ms: now_ms,
454 expires_at_ms: expiry_ms(now_ms, ttl),
455 };
456 self.event_log
457 .append(
458 &claims_topic(queue_name)?,
459 LogEvent::new(
460 "job_claimed",
461 serde_json::to_value(&claim)
462 .map_err(|error| LogError::Serde(error.to_string()))?,
463 ),
464 )
465 .await?;
466 let refreshed = self.queue_state(queue_name).await?;
467 if refreshed
468 .active_claim_for(job.job_event_id)
469 .is_some_and(|active| active.claim_id == claim.claim_id)
470 {
471 return Ok(Some(ClaimedWorkerJob {
472 handle: WorkerQueueClaimHandle {
473 queue: queue_name.to_string(),
474 job_event_id: claim.job_event_id,
475 claim_id: claim.claim_id,
476 consumer_id: claim.consumer_id,
477 expires_at_ms: claim.expires_at_ms,
478 },
479 job: job.job,
480 }));
481 }
482 }
483 Ok(None)
484 }
485
486 pub async fn renew_claim(
487 &self,
488 handle: &WorkerQueueClaimHandle,
489 ttl: StdDuration,
490 ) -> Result<bool, LogError> {
491 let now_ms = now_ms();
492 let renewal = WorkerQueueClaimRenewalRecord {
493 job_event_id: handle.job_event_id,
494 claim_id: handle.claim_id.clone(),
495 consumer_id: handle.consumer_id.clone(),
496 renewed_at_ms: now_ms,
497 expires_at_ms: expiry_ms(now_ms, ttl),
498 };
499 self.event_log
500 .append(
501 &claims_topic(&handle.queue)?,
502 LogEvent::new(
503 "claim_renewed",
504 serde_json::to_value(&renewal)
505 .map_err(|error| LogError::Serde(error.to_string()))?,
506 ),
507 )
508 .await?;
509 let refreshed = self.queue_state(&handle.queue).await?;
510 Ok(refreshed
511 .active_claim_for(handle.job_event_id)
512 .is_some_and(|active| active.claim_id == handle.claim_id))
513 }
514
515 pub async fn release_claim(
516 &self,
517 handle: &WorkerQueueClaimHandle,
518 reason: &str,
519 ) -> Result<(), LogError> {
520 let release = WorkerQueueReleaseRecord {
521 job_event_id: handle.job_event_id,
522 claim_id: handle.claim_id.clone(),
523 consumer_id: handle.consumer_id.clone(),
524 released_at_ms: now_ms(),
525 reason: if reason.trim().is_empty() {
526 None
527 } else {
528 Some(reason.to_string())
529 },
530 };
531 self.event_log
532 .append(
533 &claims_topic(&handle.queue)?,
534 LogEvent::new(
535 "job_released",
536 serde_json::to_value(&release)
537 .map_err(|error| LogError::Serde(error.to_string()))?,
538 ),
539 )
540 .await?;
541 Ok(())
542 }
543
544 pub async fn append_response(
545 &self,
546 queue: &str,
547 response: &WorkerQueueResponseRecord,
548 ) -> Result<u64, LogError> {
549 self.event_log
550 .append(
551 &responses_topic(queue)?,
552 LogEvent::new(
553 "job_response",
554 serde_json::to_value(response)
555 .map_err(|error| LogError::Serde(error.to_string()))?,
556 ),
557 )
558 .await
559 }
560
561 pub async fn ack_claim(&self, handle: &WorkerQueueClaimHandle) -> Result<u64, LogError> {
562 self.event_log
563 .append(
564 &claims_topic(&handle.queue)?,
565 LogEvent::new(
566 "job_acked",
567 serde_json::to_value(WorkerQueueAckRecord {
568 job_event_id: handle.job_event_id,
569 claim_id: handle.claim_id.clone(),
570 consumer_id: handle.consumer_id.clone(),
571 acked_at_ms: now_ms(),
572 })
573 .map_err(|error| LogError::Serde(error.to_string()))?,
574 ),
575 )
576 .await
577 }
578
579 pub async fn purge_unclaimed(
580 &self,
581 queue: &str,
582 purged_by: &str,
583 reason: Option<&str>,
584 ) -> Result<usize, LogError> {
585 let state = self.queue_state(queue).await?;
586 let ready_jobs: Vec<_> = state
587 .jobs
588 .into_iter()
589 .filter(|job| job.is_ready())
590 .map(|job| job.job_event_id)
591 .collect();
592 for job_event_id in &ready_jobs {
593 self.event_log
594 .append(
595 &claims_topic(queue)?,
596 LogEvent::new(
597 "job_purged",
598 serde_json::to_value(WorkerQueuePurgeRecord {
599 job_event_id: *job_event_id,
600 purged_by: purged_by.to_string(),
601 purged_at_ms: now_ms(),
602 reason: reason
603 .filter(|value| !value.trim().is_empty())
604 .map(|value| value.to_string()),
605 })
606 .map_err(|error| LogError::Serde(error.to_string()))?,
607 ),
608 )
609 .await?;
610 }
611 Ok(ready_jobs.len())
612 }
613}
614
/// Working copy of a job's state used while replaying claim events; it carries
/// the same fields as `WorkerQueueJobState` plus the set of claim ids seen.
#[derive(Clone, Debug)]
struct WorkerQueueJobStateInternal {
    /// Event-log id of the job event.
    job_event_id: u64,
    /// Timestamp in milliseconds taken from the job event.
    enqueued_at_ms: i64,
    /// The job payload.
    job: WorkerQueueJob,
    /// The claim currently holding the job, if any.
    active_claim: Option<WorkerQueueClaimHandle>,
    /// Whether an accepted acknowledgement has been replayed.
    acked: bool,
    /// Whether a purge has been replayed.
    purged: bool,
    /// Every claim id replayed for this job while it was neither acked nor
    /// purged, including claims that did not win; used to validate acks.
    seen_claim_ids: BTreeSet<String>,
}
625
/// Payload of a `queue_seen` event on the catalog topic.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueCatalogRecord {
    /// Name of the queue that was enqueued to.
    queue: String,
}
630
/// Payload of a `job_claimed` event on a queue's claims topic.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueClaimRecord {
    /// Event-log id of the job being claimed.
    job_event_id: u64,
    /// Unique id of this claim attempt.
    claim_id: String,
    /// Id of the consumer making the claim.
    consumer_id: String,
    /// Timestamp in milliseconds at which the claim was made; compared with
    /// the expiry of any existing claim during replay.
    claimed_at_ms: i64,
    /// Timestamp in milliseconds at which the claim expires.
    expires_at_ms: i64,
}
639
/// Payload of a `claim_renewed` event on a queue's claims topic.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueClaimRenewalRecord {
    /// Event-log id of the job whose claim is renewed.
    job_event_id: u64,
    /// Id of the claim being renewed.
    claim_id: String,
    /// Id of the consumer renewing the claim.
    consumer_id: String,
    /// Timestamp in milliseconds at which the renewal was issued.
    renewed_at_ms: i64,
    /// New expiry timestamp in milliseconds.
    expires_at_ms: i64,
}
648
/// Payload of a `job_released` event on a queue's claims topic.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueReleaseRecord {
    /// Event-log id of the job whose claim is released.
    job_event_id: u64,
    /// Id of the claim being released.
    claim_id: String,
    /// Id of the consumer releasing the claim.
    consumer_id: String,
    /// Timestamp in milliseconds at which the release was issued.
    released_at_ms: i64,
    /// Optional free-text reason; `None` when absent from the serialized form.
    #[serde(default)]
    reason: Option<String>,
}
658
/// Payload of a `job_acked` event on a queue's claims topic.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueAckRecord {
    /// Event-log id of the job being acknowledged.
    job_event_id: u64,
    /// Id of the claim under which the job was handled; an empty string is
    /// accepted during replay without matching a known claim.
    claim_id: String,
    /// Id of the consumer acknowledging the job.
    consumer_id: String,
    /// Timestamp in milliseconds at which the ack was issued.
    acked_at_ms: i64,
}
666
/// Payload of a `job_purged` event on a queue's claims topic.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueuePurgeRecord {
    /// Event-log id of the job being purged.
    job_event_id: u64,
    /// Identifier of whoever requested the purge.
    purged_by: String,
    /// Timestamp in milliseconds at which the purge was issued.
    purged_at_ms: i64,
    /// Optional free-text reason; `None` when absent from the serialized form.
    #[serde(default)]
    reason: Option<String>,
}
675
676pub fn job_topic_name(queue: &str) -> String {
677 format!("worker.{}", sanitize_topic_component(queue))
678}
679
680pub fn claims_topic_name(queue: &str) -> String {
681 format!("{}{}", job_topic_name(queue), WORKER_QUEUE_CLAIMS_SUFFIX)
682}
683
684pub fn response_topic_name(queue: &str) -> String {
685 format!("{}{}", job_topic_name(queue), WORKER_QUEUE_RESPONSES_SUFFIX)
686}
687
688fn job_topic(queue: &str) -> Result<Topic, LogError> {
689 Topic::new(job_topic_name(queue))
690}
691
692fn claims_topic(queue: &str) -> Result<Topic, LogError> {
693 Topic::new(claims_topic_name(queue))
694}
695
696fn responses_topic(queue: &str) -> Result<Topic, LogError> {
697 Topic::new(response_topic_name(queue))
698}
699
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reads earlier than the epoch. The value is
/// clamped to `i64::MAX` rather than wrapped, matching how `expiry_ms` clamps
/// durations.
fn now_ms() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        // `as_millis` is a `u128`; clamp before narrowing so the cast cannot wrap.
        .map(|elapsed| elapsed.as_millis().min(i64::MAX as u128) as i64)
        .unwrap_or(0)
}
706
/// Returns the timestamp `ttl` after `now_ms`, in milliseconds.
///
/// The duration is clamped to `i64::MAX` milliseconds and the addition
/// saturates, so the result never overflows.
fn expiry_ms(now_ms: i64, ttl: StdDuration) -> i64 {
    let ttl_millis = ttl.as_millis();
    let clamped_ttl = if ttl_millis > i64::MAX as u128 {
        i64::MAX
    } else {
        ttl_millis as i64
    };
    now_ms.saturating_add(clamped_ttl)
}
710
711#[cfg(test)]
712mod tests {
713 use super::*;
714
715 use crate::event_log::{AnyEventLog, MemoryEventLog};
716 use crate::triggers::{
717 event::{GenericWebhookPayload, KnownProviderPayload},
718 ProviderId, ProviderPayload, SignatureStatus, TraceId, TriggerEvent,
719 };
720
    /// Builds a verified GitHub `issues.opened` webhook event whose id, dedupe
    /// key, and raw payload are all derived from `id`.
    fn test_event(id: &str) -> TriggerEvent {
        TriggerEvent {
            id: crate::triggers::TriggerEventId(id.to_string()),
            provider: ProviderId::from("github"),
            kind: "issues.opened".to_string(),
            trace_id: TraceId("trace-test".to_string()),
            dedupe_key: id.to_string(),
            tenant_id: None,
            headers: BTreeMap::new(),
            batch: None,
            provider_payload: ProviderPayload::Known(KnownProviderPayload::Webhook(
                GenericWebhookPayload {
                    source: Some("worker-queue-test".to_string()),
                    content_type: Some("application/json".to_string()),
                    raw: serde_json::json!({"id": id}),
                },
            )),
            signature_status: SignatureStatus::Verified,
            received_at: time::OffsetDateTime::now_utc(),
            occurred_at: None,
            dedupe_claimed: false,
        }
    }
744
    /// Builds a version-1 job for `queue` wrapping a test event with id
    /// `event_id`; the binding key is `"<trigger_id>@v1"`.
    fn test_job(
        queue: &str,
        trigger_id: &str,
        event_id: &str,
        priority: WorkerQueuePriority,
    ) -> WorkerQueueJob {
        WorkerQueueJob {
            queue: queue.to_string(),
            trigger_id: trigger_id.to_string(),
            binding_key: format!("{trigger_id}@v1"),
            binding_version: 1,
            event: test_event(event_id),
            replay_of_event_id: None,
            priority,
        }
    }
761
    /// Enqueueing one job registers the queue in the catalog and shows up as
    /// one ready, zero in-flight job in the summaries.
    #[tokio::test(flavor = "current_thread")]
    async fn enqueue_and_summarize_queue() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log);
        queue
            .enqueue(&test_job(
                "triage",
                "incoming-review-task",
                "evt-1",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();
        let summaries = queue.queue_summaries().await.unwrap();
        assert_eq!(summaries.len(), 1);
        assert_eq!(summaries[0].queue, "triage");
        assert_eq!(summaries[0].ready, 1);
        assert_eq!(summaries[0].in_flight, 0);
    }
781
    /// A claimed job moves from ready to in-flight, and after a response plus
    /// an ack it is counted as acked with one recorded response.
    #[tokio::test(flavor = "current_thread")]
    async fn claim_and_ack_remove_job_from_ready_pool() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log);
        queue
            .enqueue(&test_job(
                "triage",
                "incoming-review-task",
                "evt-1",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();
        let claimed = queue
            .claim_next("triage", "consumer-a", StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        // While the claim is live the job is in flight, not ready.
        let before_ack = queue.queue_state("triage").await.unwrap();
        assert_eq!(before_ack.summary(now_ms()).ready, 0);
        assert_eq!(before_ack.summary(now_ms()).in_flight, 1);
        queue
            .append_response(
                "triage",
                &WorkerQueueResponseRecord {
                    queue: "triage".to_string(),
                    job_event_id: claimed.handle.job_event_id,
                    consumer_id: "consumer-a".to_string(),
                    handled_at_ms: now_ms(),
                    outcome: Some(DispatchOutcome {
                        trigger_id: "incoming-review-task".to_string(),
                        binding_key: "incoming-review-task@v1".to_string(),
                        event_id: "evt-1".to_string(),
                        attempt_count: 1,
                        status: super::super::DispatchStatus::Succeeded,
                        handler_kind: "local".to_string(),
                        target_uri: "handlers::on_review".to_string(),
                        replay_of_event_id: None,
                        result: Some(serde_json::json!({"ok": true})),
                        error: None,
                    }),
                    error: None,
                },
            )
            .await
            .unwrap();
        queue.ack_claim(&claimed.handle).await.unwrap();
        // After the ack the job is neither ready nor in flight.
        let after_ack = queue.queue_state("triage").await.unwrap();
        let summary = after_ack.summary(now_ms());
        assert_eq!(summary.ready, 0);
        assert_eq!(summary.in_flight, 0);
        assert_eq!(summary.acked, 1);
        assert_eq!(summary.responses, 1);
    }
836
    /// Once a short-lived claim has expired, a second consumer can claim the
    /// same job under a new claim id.
    #[tokio::test(flavor = "current_thread")]
    async fn expired_claim_allows_reclaim() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log);
        queue
            .enqueue(&test_job(
                "triage",
                "incoming-review-task",
                "evt-1",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();
        let first = queue
            .claim_next("triage", "consumer-a", StdDuration::from_millis(15))
            .await
            .unwrap()
            .unwrap();
        // Sleep past the 15 ms TTL so the first claim is expired.
        tokio::time::sleep(StdDuration::from_millis(30)).await;
        let second = queue
            .claim_next("triage", "consumer-b", StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(first.job.event.id.0, second.job.event.id.0);
        assert_ne!(first.handle.claim_id, second.handle.claim_id);
        assert_eq!(second.handle.consumer_id, "consumer-b");
    }
865
    /// A `Normal` job older than the promotion age ranks with `High` jobs and,
    /// being enqueued earlier, is claimed before a fresh `High` job.
    #[tokio::test(flavor = "current_thread")]
    async fn high_priority_and_aged_normal_are_selected_first() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log.clone());

        // Events are written straight to the log so the job timestamp can be backdated.
        let catalog_topic = Topic::new(WORKER_QUEUE_CATALOG_TOPIC).unwrap();
        log.append(
            &catalog_topic,
            LogEvent::new("queue_seen", serde_json::json!({"queue":"triage"})),
        )
        .await
        .unwrap();

        let topic = job_topic("triage").unwrap();
        let mut old_normal = LogEvent::new(
            "trigger_dispatch",
            serde_json::to_value(test_job(
                "triage",
                "incoming-review-task",
                "evt-old-normal",
                WorkerQueuePriority::Normal,
            ))
            .unwrap(),
        );
        // Backdate the job one second beyond the promotion threshold.
        old_normal.occurred_at_ms = now_ms() - NORMAL_PROMOTION_AGE_MS - 1_000;
        log.append(&topic, old_normal).await.unwrap();

        let high = LogEvent::new(
            "trigger_dispatch",
            serde_json::to_value(test_job(
                "triage",
                "incoming-review-task",
                "evt-high",
                WorkerQueuePriority::High,
            ))
            .unwrap(),
        );
        log.append(&topic, high).await.unwrap();

        let claimed = queue
            .claim_next("triage", "consumer-a", StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(claimed.job.event.id.0, "evt-old-normal");
    }
912}