1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::Arc;
3use std::time::Duration as StdDuration;
4
5use serde::{Deserialize, Serialize};
6use uuid::Uuid;
7
8use crate::event_log::{
9 sanitize_topic_component, AnyEventLog, EventLog, LogError, LogEvent, Topic,
10};
11
12use super::{DispatchOutcome, TriggerEvent};
13
/// Topic that records queue names; `WorkerQueue::enqueue` appends a `queue_seen`
/// event here on every enqueue.
pub const WORKER_QUEUE_CATALOG_TOPIC: &str = "worker.queues";
/// Suffix appended to a queue's job topic to form its claims topic
/// (claims, renewals, releases, acks, purges).
const WORKER_QUEUE_CLAIMS_SUFFIX: &str = ".claims";
/// Suffix appended to a queue's job topic to form its response topic.
const WORKER_QUEUE_RESPONSES_SUFFIX: &str = ".responses";
/// Wait time (15 minutes) after which a `Normal` job ranks alongside `High` jobs.
const NORMAL_PROMOTION_AGE_MS: i64 = 15 * 60_000;
18
/// Scheduling priority of a worker-queue job.
///
/// Serialized in lowercase (`"high"`, `"normal"`, `"low"`). `Normal` is the
/// `Default`, which is what a `WorkerQueueJob` payload without a `priority`
/// field deserializes to.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum WorkerQueuePriority {
    /// Always ranked first.
    High,
    /// Default priority; ranked alongside `High` once the job has waited at least
    /// `NORMAL_PROMOTION_AGE_MS`.
    #[default]
    Normal,
    /// Ranked after `High` and `Normal`; never promoted.
    Low,
}
27
28impl WorkerQueuePriority {
29 pub fn as_str(self) -> &'static str {
30 match self {
31 Self::High => "high",
32 Self::Normal => "normal",
33 Self::Low => "low",
34 }
35 }
36
37 fn effective_rank(self, enqueued_at_ms: i64, now_ms: i64) -> u8 {
38 match self {
39 Self::High => 0,
40 Self::Normal if now_ms.saturating_sub(enqueued_at_ms) >= NORMAL_PROMOTION_AGE_MS => 0,
41 Self::Normal => 1,
42 Self::Low => 2,
43 }
44 }
45}
46
/// A trigger dispatch waiting on a named worker queue.
///
/// Stored as the payload of a `trigger_dispatch` event on the queue's job topic.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct WorkerQueueJob {
    /// Queue name; `WorkerQueue::enqueue` trims it when choosing topics.
    pub queue: String,
    pub trigger_id: String,
    pub binding_key: String,
    pub binding_version: u32,
    /// The trigger event to dispatch.
    pub event: TriggerEvent,
    /// Presumably the id of the event being replayed, if this job is a replay;
    /// `None` when absent from the payload.
    #[serde(default)]
    pub replay_of_event_id: Option<String>,
    /// Scheduling priority; `Normal` when absent from the payload.
    #[serde(default)]
    pub priority: WorkerQueuePriority,
}
59
/// Returned by `WorkerQueue::enqueue`: where the job landed and where responses
/// for it are written.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerQueueEnqueueReceipt {
    /// Trimmed queue name the job was enqueued on.
    pub queue: String,
    /// Event-log id of the appended `trigger_dispatch` event; identifies the job.
    pub job_event_id: u64,
    /// Name of the queue's response topic.
    pub response_topic: String,
}

/// Lease on a claimed job; passed back to renew, release, or ack the claim.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerQueueClaimHandle {
    pub queue: String,
    /// Event-log id of the claimed job's `trigger_dispatch` event.
    pub job_event_id: u64,
    /// Identifier of this specific claim; renewals, releases, and acks match on it.
    pub claim_id: String,
    pub consumer_id: String,
    /// Milliseconds since the Unix epoch at which the lease lapses.
    pub expires_at_ms: i64,
}

/// A job paired with the handle of the claim that leases it.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ClaimedWorkerJob {
    pub handle: WorkerQueueClaimHandle,
    pub job: WorkerQueueJob,
}
81
/// Result a consumer reports for a job; stored as a `job_response` event on the
/// queue's response topic.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct WorkerQueueResponseRecord {
    pub queue: String,
    pub job_event_id: u64,
    pub consumer_id: String,
    pub handled_at_ms: i64,
    /// Dispatch outcome, if one was produced.
    pub outcome: Option<DispatchOutcome>,
    /// Presumably set when handling failed; `None` otherwise.
    pub error: Option<String>,
}

/// Point-in-time counts for one queue, produced by `WorkerQueueState::summary`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerQueueSummary {
    pub queue: String,
    /// Jobs that are not acked, not purged, and have no active claim.
    pub ready: usize,
    /// Jobs that are not acked or purged and hold an active claim.
    pub in_flight: usize,
    pub acked: usize,
    pub purged: usize,
    /// Number of `job_response` records for the queue.
    pub responses: usize,
    /// Age of the oldest ready job, or `None` when no job is ready.
    pub oldest_unclaimed_age_ms: Option<u64>,
}

/// Replayed state of a single job on a queue.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct WorkerQueueJobState {
    /// Event-log id of the job's `trigger_dispatch` event.
    pub job_event_id: u64,
    /// `occurred_at_ms` of the `trigger_dispatch` event.
    pub enqueued_at_ms: i64,
    pub job: WorkerQueueJob,
    /// Claim currently leasing the job; `None` if unclaimed, released, or expired.
    pub active_claim: Option<WorkerQueueClaimHandle>,
    pub acked: bool,
    pub purged: bool,
}
112
113impl WorkerQueueJobState {
114 pub fn is_ready(&self) -> bool {
115 !self.acked && !self.purged && self.active_claim.is_none()
116 }
117}
118
/// Full replayed state of one queue: its responses and every job it has seen.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct WorkerQueueState {
    /// Trimmed queue name.
    pub queue: String,
    /// All `job_response` records, in log order.
    pub responses: Vec<WorkerQueueResponseRecord>,
    /// Every job, including acked and purged ones. `WorkerQueue::queue_state`
    /// sorts them by `(enqueued_at_ms, job_event_id)`.
    pub jobs: Vec<WorkerQueueJobState>,
}
125
126impl WorkerQueueState {
127 pub fn summary(&self, now_ms: i64) -> WorkerQueueSummary {
128 let ready = self.jobs.iter().filter(|job| job.is_ready()).count();
129 let in_flight = self
130 .jobs
131 .iter()
132 .filter(|job| !job.acked && !job.purged && job.active_claim.is_some())
133 .count();
134 let acked = self.jobs.iter().filter(|job| job.acked).count();
135 let purged = self.jobs.iter().filter(|job| job.purged).count();
136 let oldest_unclaimed_age_ms = self
137 .jobs
138 .iter()
139 .filter(|job| job.is_ready())
140 .map(|job| now_ms.saturating_sub(job.enqueued_at_ms).max(0) as u64)
141 .max();
142 WorkerQueueSummary {
143 queue: self.queue.clone(),
144 ready,
145 in_flight,
146 acked,
147 purged,
148 responses: self.responses.len(),
149 oldest_unclaimed_age_ms,
150 }
151 }
152
153 fn next_ready_job(&self, now_ms: i64) -> Option<&WorkerQueueJobState> {
154 self.jobs
155 .iter()
156 .filter(|job| job.is_ready())
157 .min_by_key(|job| {
158 (
159 job.job.priority.effective_rank(job.enqueued_at_ms, now_ms),
160 job.enqueued_at_ms,
161 job.job_event_id,
162 )
163 })
164 }
165
166 fn active_claim_for(&self, job_event_id: u64) -> Option<&WorkerQueueClaimHandle> {
167 self.jobs
168 .iter()
169 .find(|job| job.job_event_id == job_event_id)
170 .and_then(|job| job.active_claim.as_ref())
171 }
172}
173
/// Work queue backed entirely by an event log.
///
/// Jobs, claims, and responses are all appended as events, and queue state is
/// rebuilt by replaying them on each read. Clones share the same `Arc`'d log.
#[derive(Clone)]
pub struct WorkerQueue {
    event_log: Arc<AnyEventLog>,
}
178
179impl WorkerQueue {
180 pub fn new(event_log: Arc<AnyEventLog>) -> Self {
181 Self { event_log }
182 }
183
184 pub async fn enqueue(
185 &self,
186 job: &WorkerQueueJob,
187 ) -> Result<WorkerQueueEnqueueReceipt, LogError> {
188 let queue = job.queue.trim();
189 if queue.is_empty() {
190 return Err(LogError::Config(
191 "worker queue name cannot be empty".to_string(),
192 ));
193 }
194 let queue_name = queue.to_string();
195 let catalog_topic = Topic::new(WORKER_QUEUE_CATALOG_TOPIC)
196 .expect("static worker queue catalog topic should always be valid");
197 self.event_log
198 .append(
199 &catalog_topic,
200 LogEvent::new(
201 "queue_seen",
202 serde_json::to_value(WorkerQueueCatalogRecord {
203 queue: queue_name.clone(),
204 })
205 .map_err(|error| LogError::Serde(error.to_string()))?,
206 ),
207 )
208 .await?;
209
210 let job_topic = job_topic(&queue_name)?;
211 let mut headers = BTreeMap::new();
212 headers.insert("queue".to_string(), queue_name.clone());
213 headers.insert("trigger_id".to_string(), job.trigger_id.clone());
214 headers.insert("binding_key".to_string(), job.binding_key.clone());
215 headers.insert("event_id".to_string(), job.event.id.0.clone());
216 headers.insert("priority".to_string(), job.priority.as_str().to_string());
217 let job_event_id = self
218 .event_log
219 .append(
220 &job_topic,
221 LogEvent::new(
222 "trigger_dispatch",
223 serde_json::to_value(job)
224 .map_err(|error| LogError::Serde(error.to_string()))?,
225 )
226 .with_headers(headers),
227 )
228 .await?;
229 if let Some(metrics) = crate::active_metrics_registry() {
230 if let Ok(state) = self.queue_state(&queue_name).await {
231 let summary = state.summary(now_ms());
232 metrics.set_worker_queue_depth(
233 &queue_name,
234 (summary.ready + summary.in_flight) as u64,
235 );
236 }
237 }
238 Ok(WorkerQueueEnqueueReceipt {
239 queue: queue_name.clone(),
240 job_event_id,
241 response_topic: response_topic_name(&queue_name),
242 })
243 }
244
245 pub async fn known_queues(&self) -> Result<Vec<String>, LogError> {
246 let topic = Topic::new(WORKER_QUEUE_CATALOG_TOPIC)
247 .expect("static worker queue catalog topic should always be valid");
248 let events = self.event_log.read_range(&topic, None, usize::MAX).await?;
249 let mut queues = BTreeSet::new();
250 for (_, event) in events {
251 if event.kind != "queue_seen" {
252 continue;
253 }
254 let record: WorkerQueueCatalogRecord = serde_json::from_value(event.payload)
255 .map_err(|error| LogError::Serde(error.to_string()))?;
256 if !record.queue.trim().is_empty() {
257 queues.insert(record.queue);
258 }
259 }
260 Ok(queues.into_iter().collect())
261 }
262
263 pub async fn queue_state(&self, queue: &str) -> Result<WorkerQueueState, LogError> {
264 let queue_name = queue.trim();
265 if queue_name.is_empty() {
266 return Err(LogError::Config(
267 "worker queue name cannot be empty".to_string(),
268 ));
269 }
270 let now_ms = now_ms();
271 let job_events = self
272 .event_log
273 .read_range(&job_topic(queue_name)?, None, usize::MAX)
274 .await?;
275 let claim_events = self
276 .event_log
277 .read_range(&claims_topic(queue_name)?, None, usize::MAX)
278 .await?;
279 let response_events = self
280 .event_log
281 .read_range(&responses_topic(queue_name)?, None, usize::MAX)
282 .await?;
283
284 let mut jobs = BTreeMap::<u64, WorkerQueueJobStateInternal>::new();
285 for (job_event_id, event) in job_events {
286 if event.kind != "trigger_dispatch" {
287 continue;
288 }
289 let job: WorkerQueueJob = serde_json::from_value(event.payload)
290 .map_err(|error| LogError::Serde(error.to_string()))?;
291 jobs.insert(
292 job_event_id,
293 WorkerQueueJobStateInternal {
294 job_event_id,
295 enqueued_at_ms: event.occurred_at_ms,
296 job,
297 active_claim: None,
298 acked: false,
299 purged: false,
300 seen_claim_ids: BTreeSet::new(),
301 },
302 );
303 }
304
305 for (_, event) in claim_events {
306 match event.kind.as_str() {
307 "job_claimed" => {
308 let claim: WorkerQueueClaimRecord = serde_json::from_value(event.payload)
309 .map_err(|error| LogError::Serde(error.to_string()))?;
310 let Some(job) = jobs.get_mut(&claim.job_event_id) else {
311 continue;
312 };
313 if job.acked || job.purged {
314 continue;
315 }
316 job.seen_claim_ids.insert(claim.claim_id.clone());
317 let can_take = job
318 .active_claim
319 .as_ref()
320 .is_none_or(|active| active.expires_at_ms <= claim.claimed_at_ms);
321 if can_take {
322 job.active_claim = Some(WorkerQueueClaimHandle {
323 queue: queue_name.to_string(),
324 job_event_id: claim.job_event_id,
325 claim_id: claim.claim_id,
326 consumer_id: claim.consumer_id,
327 expires_at_ms: claim.expires_at_ms,
328 });
329 }
330 }
331 "claim_renewed" => {
332 let renewal: WorkerQueueClaimRenewalRecord =
333 serde_json::from_value(event.payload)
334 .map_err(|error| LogError::Serde(error.to_string()))?;
335 let Some(job) = jobs.get_mut(&renewal.job_event_id) else {
336 continue;
337 };
338 if let Some(active) = job.active_claim.as_mut() {
339 if active.claim_id == renewal.claim_id {
340 active.expires_at_ms = renewal.expires_at_ms;
341 }
342 }
343 }
344 "job_released" => {
345 let release: WorkerQueueReleaseRecord =
346 serde_json::from_value(event.payload)
347 .map_err(|error| LogError::Serde(error.to_string()))?;
348 let Some(job) = jobs.get_mut(&release.job_event_id) else {
349 continue;
350 };
351 if job
352 .active_claim
353 .as_ref()
354 .is_some_and(|active| active.claim_id == release.claim_id)
355 {
356 job.active_claim = None;
357 }
358 }
359 "job_acked" => {
360 let ack: WorkerQueueAckRecord = serde_json::from_value(event.payload)
361 .map_err(|error| LogError::Serde(error.to_string()))?;
362 let Some(job) = jobs.get_mut(&ack.job_event_id) else {
363 continue;
364 };
365 if ack.claim_id.is_empty() || job.seen_claim_ids.contains(&ack.claim_id) {
366 job.acked = true;
367 job.active_claim = None;
368 }
369 }
370 "job_purged" => {
371 let purge: WorkerQueuePurgeRecord = serde_json::from_value(event.payload)
372 .map_err(|error| LogError::Serde(error.to_string()))?;
373 let Some(job) = jobs.get_mut(&purge.job_event_id) else {
374 continue;
375 };
376 if !job.acked {
377 job.purged = true;
378 job.active_claim = None;
379 }
380 }
381 _ => {}
382 }
383 }
384
385 let responses = response_events
386 .into_iter()
387 .filter(|(_, event)| event.kind == "job_response")
388 .map(|(_, event)| {
389 serde_json::from_value::<WorkerQueueResponseRecord>(event.payload)
390 .map_err(|error| LogError::Serde(error.to_string()))
391 })
392 .collect::<Result<Vec<_>, _>>()?;
393
394 let mut queue_state = WorkerQueueState {
395 queue: queue_name.to_string(),
396 responses,
397 jobs: jobs
398 .into_values()
399 .map(|mut job| {
400 if job
401 .active_claim
402 .as_ref()
403 .is_some_and(|active| active.expires_at_ms <= now_ms)
404 {
405 job.active_claim = None;
406 }
407 WorkerQueueJobState {
408 job_event_id: job.job_event_id,
409 enqueued_at_ms: job.enqueued_at_ms,
410 job: job.job,
411 active_claim: job.active_claim,
412 acked: job.acked,
413 purged: job.purged,
414 }
415 })
416 .collect(),
417 };
418 queue_state
419 .jobs
420 .sort_by_key(|job| (job.enqueued_at_ms, job.job_event_id));
421 Ok(queue_state)
422 }
423
424 pub async fn queue_summaries(&self) -> Result<Vec<WorkerQueueSummary>, LogError> {
425 let now_ms = now_ms();
426 let mut summaries = Vec::new();
427 for queue in self.known_queues().await? {
428 let state = self.queue_state(&queue).await?;
429 summaries.push(state.summary(now_ms));
430 }
431 summaries.sort_by(|left, right| left.queue.cmp(&right.queue));
432 Ok(summaries)
433 }
434
435 pub async fn claim_next(
436 &self,
437 queue: &str,
438 consumer_id: &str,
439 ttl: StdDuration,
440 ) -> Result<Option<ClaimedWorkerJob>, LogError> {
441 let queue_name = queue.trim();
442 if queue_name.is_empty() {
443 return Err(LogError::Config(
444 "worker queue name cannot be empty".to_string(),
445 ));
446 }
447 if consumer_id.trim().is_empty() {
448 return Err(LogError::InvalidConsumer(
449 "worker queue consumer id cannot be empty".to_string(),
450 ));
451 }
452 for _ in 0..8 {
453 let now_ms = now_ms();
454 let state = self.queue_state(queue_name).await?;
455 let Some(job) = state.next_ready_job(now_ms).cloned() else {
456 return Ok(None);
457 };
458 let claim = WorkerQueueClaimRecord {
459 job_event_id: job.job_event_id,
460 claim_id: Uuid::new_v4().to_string(),
461 consumer_id: consumer_id.to_string(),
462 claimed_at_ms: now_ms,
463 expires_at_ms: expiry_ms(now_ms, ttl),
464 };
465 self.event_log
466 .append(
467 &claims_topic(queue_name)?,
468 LogEvent::new(
469 "job_claimed",
470 serde_json::to_value(&claim)
471 .map_err(|error| LogError::Serde(error.to_string()))?,
472 ),
473 )
474 .await?;
475 let refreshed = self.queue_state(queue_name).await?;
476 if refreshed
477 .active_claim_for(job.job_event_id)
478 .is_some_and(|active| active.claim_id == claim.claim_id)
479 {
480 if let Some(metrics) = crate::active_metrics_registry() {
481 let summary = refreshed.summary(now_ms);
482 metrics.record_worker_queue_claim_age(
483 queue_name,
484 now_ms.saturating_sub(job.enqueued_at_ms) as f64 / 1000.0,
485 );
486 metrics.set_worker_queue_depth(
487 queue_name,
488 (summary.ready + summary.in_flight) as u64,
489 );
490 }
491 return Ok(Some(ClaimedWorkerJob {
492 handle: WorkerQueueClaimHandle {
493 queue: queue_name.to_string(),
494 job_event_id: claim.job_event_id,
495 claim_id: claim.claim_id,
496 consumer_id: claim.consumer_id,
497 expires_at_ms: claim.expires_at_ms,
498 },
499 job: job.job,
500 }));
501 }
502 }
503 Ok(None)
504 }
505
506 pub async fn renew_claim(
507 &self,
508 handle: &WorkerQueueClaimHandle,
509 ttl: StdDuration,
510 ) -> Result<bool, LogError> {
511 let now_ms = now_ms();
512 let renewal = WorkerQueueClaimRenewalRecord {
513 job_event_id: handle.job_event_id,
514 claim_id: handle.claim_id.clone(),
515 consumer_id: handle.consumer_id.clone(),
516 renewed_at_ms: now_ms,
517 expires_at_ms: expiry_ms(now_ms, ttl),
518 };
519 self.event_log
520 .append(
521 &claims_topic(&handle.queue)?,
522 LogEvent::new(
523 "claim_renewed",
524 serde_json::to_value(&renewal)
525 .map_err(|error| LogError::Serde(error.to_string()))?,
526 ),
527 )
528 .await?;
529 let refreshed = self.queue_state(&handle.queue).await?;
530 Ok(refreshed
531 .active_claim_for(handle.job_event_id)
532 .is_some_and(|active| active.claim_id == handle.claim_id))
533 }
534
535 pub async fn release_claim(
536 &self,
537 handle: &WorkerQueueClaimHandle,
538 reason: &str,
539 ) -> Result<(), LogError> {
540 let release = WorkerQueueReleaseRecord {
541 job_event_id: handle.job_event_id,
542 claim_id: handle.claim_id.clone(),
543 consumer_id: handle.consumer_id.clone(),
544 released_at_ms: now_ms(),
545 reason: if reason.trim().is_empty() {
546 None
547 } else {
548 Some(reason.to_string())
549 },
550 };
551 self.event_log
552 .append(
553 &claims_topic(&handle.queue)?,
554 LogEvent::new(
555 "job_released",
556 serde_json::to_value(&release)
557 .map_err(|error| LogError::Serde(error.to_string()))?,
558 ),
559 )
560 .await?;
561 Ok(())
562 }
563
564 pub async fn append_response(
565 &self,
566 queue: &str,
567 response: &WorkerQueueResponseRecord,
568 ) -> Result<u64, LogError> {
569 self.event_log
570 .append(
571 &responses_topic(queue)?,
572 LogEvent::new(
573 "job_response",
574 serde_json::to_value(response)
575 .map_err(|error| LogError::Serde(error.to_string()))?,
576 ),
577 )
578 .await
579 }
580
581 pub async fn ack_claim(&self, handle: &WorkerQueueClaimHandle) -> Result<u64, LogError> {
582 self.event_log
583 .append(
584 &claims_topic(&handle.queue)?,
585 LogEvent::new(
586 "job_acked",
587 serde_json::to_value(WorkerQueueAckRecord {
588 job_event_id: handle.job_event_id,
589 claim_id: handle.claim_id.clone(),
590 consumer_id: handle.consumer_id.clone(),
591 acked_at_ms: now_ms(),
592 })
593 .map_err(|error| LogError::Serde(error.to_string()))?,
594 ),
595 )
596 .await
597 }
598
599 pub async fn purge_unclaimed(
600 &self,
601 queue: &str,
602 purged_by: &str,
603 reason: Option<&str>,
604 ) -> Result<usize, LogError> {
605 let state = self.queue_state(queue).await?;
606 let ready_jobs: Vec<_> = state
607 .jobs
608 .into_iter()
609 .filter(|job| job.is_ready())
610 .map(|job| job.job_event_id)
611 .collect();
612 for job_event_id in &ready_jobs {
613 self.event_log
614 .append(
615 &claims_topic(queue)?,
616 LogEvent::new(
617 "job_purged",
618 serde_json::to_value(WorkerQueuePurgeRecord {
619 job_event_id: *job_event_id,
620 purged_by: purged_by.to_string(),
621 purged_at_ms: now_ms(),
622 reason: reason
623 .filter(|value| !value.trim().is_empty())
624 .map(|value| value.to_string()),
625 })
626 .map_err(|error| LogError::Serde(error.to_string()))?,
627 ),
628 )
629 .await?;
630 }
631 Ok(ready_jobs.len())
632 }
633}
634
/// Mutable per-job state used while `WorkerQueue::queue_state` replays the claims
/// topic; converted to `WorkerQueueJobState` once replay finishes.
#[derive(Clone, Debug)]
struct WorkerQueueJobStateInternal {
    job_event_id: u64,
    enqueued_at_ms: i64,
    job: WorkerQueueJob,
    active_claim: Option<WorkerQueueClaimHandle>,
    acked: bool,
    purged: bool,
    /// Every claim id ever recorded for this job. An ack is accepted when its claim
    /// id appears here, even if that claim is no longer the active one.
    seen_claim_ids: BTreeSet<String>,
}
645
/// Catalog-topic payload (`queue_seen`) recording that a queue name has been used.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueCatalogRecord {
    queue: String,
}

/// Claims-topic payload (`job_claimed`): a consumer taking a lease on a job.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueClaimRecord {
    /// Event id of the claimed job's `trigger_dispatch` event.
    job_event_id: u64,
    /// Unique per claim attempt (`claim_next` uses a v4 UUID).
    claim_id: String,
    consumer_id: String,
    claimed_at_ms: i64,
    /// The lease stops being active once the clock reaches this value.
    expires_at_ms: i64,
}

/// Claims-topic payload (`claim_renewed`): a new expiry for an existing claim.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueClaimRenewalRecord {
    job_event_id: u64,
    claim_id: String,
    consumer_id: String,
    renewed_at_ms: i64,
    expires_at_ms: i64,
}

/// Claims-topic payload (`job_released`): a consumer giving up its claim.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueReleaseRecord {
    job_event_id: u64,
    claim_id: String,
    consumer_id: String,
    released_at_ms: i64,
    /// Optional explanation; absent when the caller gave a blank reason.
    #[serde(default)]
    reason: Option<String>,
}

/// Claims-topic payload (`job_acked`): a job finished under some claim.
///
/// During replay, an empty `claim_id` acks the job regardless of which claims exist.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueAckRecord {
    job_event_id: u64,
    claim_id: String,
    consumer_id: String,
    acked_at_ms: i64,
}

/// Claims-topic payload (`job_purged`): an unclaimed job removed from the queue.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueuePurgeRecord {
    job_event_id: u64,
    purged_by: String,
    purged_at_ms: i64,
    /// Optional explanation; absent when the caller gave none or a blank one.
    #[serde(default)]
    reason: Option<String>,
}
695
696pub fn job_topic_name(queue: &str) -> String {
697 format!("worker.{}", sanitize_topic_component(queue))
698}
699
700pub fn claims_topic_name(queue: &str) -> String {
701 format!("{}{}", job_topic_name(queue), WORKER_QUEUE_CLAIMS_SUFFIX)
702}
703
704pub fn response_topic_name(queue: &str) -> String {
705 format!("{}{}", job_topic_name(queue), WORKER_QUEUE_RESPONSES_SUFFIX)
706}
707
708fn job_topic(queue: &str) -> Result<Topic, LogError> {
709 Topic::new(job_topic_name(queue))
710}
711
712fn claims_topic(queue: &str) -> Result<Topic, LogError> {
713 Topic::new(claims_topic_name(queue))
714}
715
716fn responses_topic(queue: &str) -> Result<Topic, LogError> {
717 Topic::new(response_topic_name(queue))
718}
719
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Falls back to `0` if the system clock reads earlier than the epoch.
fn now_ms() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as i64,
        Err(_) => 0,
    }
}
726
/// Lease expiry `ttl` after `now_ms`, saturating at `i64::MAX` instead of overflowing.
fn expiry_ms(now_ms: i64, ttl: StdDuration) -> i64 {
    let ttl_ms = i64::try_from(ttl.as_millis()).unwrap_or(i64::MAX);
    now_ms.saturating_add(ttl_ms)
}
730
#[cfg(test)]
mod tests {
    use super::*;

    use crate::event_log::{AnyEventLog, MemoryEventLog};
    use crate::triggers::{
        event::{GenericWebhookPayload, KnownProviderPayload},
        ProviderId, ProviderPayload, SignatureStatus, TraceId, TriggerEvent,
    };

    /// Builds a verified GitHub `issues.opened` webhook event whose id and dedupe key
    /// are both `id`.
    fn test_event(id: &str) -> TriggerEvent {
        TriggerEvent {
            id: crate::triggers::TriggerEventId(id.to_string()),
            provider: ProviderId::from("github"),
            kind: "issues.opened".to_string(),
            trace_id: TraceId("trace-test".to_string()),
            dedupe_key: id.to_string(),
            tenant_id: None,
            headers: BTreeMap::new(),
            batch: None,
            raw_body: None,
            provider_payload: ProviderPayload::Known(KnownProviderPayload::Webhook(
                GenericWebhookPayload {
                    source: Some("worker-queue-test".to_string()),
                    content_type: Some("application/json".to_string()),
                    raw: serde_json::json!({"id": id}),
                },
            )),
            signature_status: SignatureStatus::Verified,
            received_at: time::OffsetDateTime::now_utc(),
            occurred_at: None,
            dedupe_claimed: false,
        }
    }

    /// Builds a job on `queue`, bound to `{trigger_id}@v1`, carrying `test_event(event_id)`.
    fn test_job(
        queue: &str,
        trigger_id: &str,
        event_id: &str,
        priority: WorkerQueuePriority,
    ) -> WorkerQueueJob {
        WorkerQueueJob {
            queue: queue.to_string(),
            trigger_id: trigger_id.to_string(),
            binding_key: format!("{trigger_id}@v1"),
            binding_version: 1,
            event: test_event(event_id),
            replay_of_event_id: None,
            priority,
        }
    }

    // A single enqueue registers the queue in the catalog and shows up as one ready job.
    #[tokio::test(flavor = "current_thread")]
    async fn enqueue_and_summarize_queue() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log);
        queue
            .enqueue(&test_job(
                "triage",
                "incoming-review-task",
                "evt-1",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();
        let summaries = queue.queue_summaries().await.unwrap();
        assert_eq!(summaries.len(), 1);
        assert_eq!(summaries[0].queue, "triage");
        assert_eq!(summaries[0].ready, 1);
        assert_eq!(summaries[0].in_flight, 0);
    }

    // Claiming moves the job from ready to in-flight; acking moves it to acked, and the
    // response record is counted.
    #[tokio::test(flavor = "current_thread")]
    async fn claim_and_ack_remove_job_from_ready_pool() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log);
        queue
            .enqueue(&test_job(
                "triage",
                "incoming-review-task",
                "evt-1",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();
        let claimed = queue
            .claim_next("triage", "consumer-a", StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        let before_ack = queue.queue_state("triage").await.unwrap();
        assert_eq!(before_ack.summary(now_ms()).ready, 0);
        assert_eq!(before_ack.summary(now_ms()).in_flight, 1);
        queue
            .append_response(
                "triage",
                &WorkerQueueResponseRecord {
                    queue: "triage".to_string(),
                    job_event_id: claimed.handle.job_event_id,
                    consumer_id: "consumer-a".to_string(),
                    handled_at_ms: now_ms(),
                    outcome: Some(DispatchOutcome {
                        trigger_id: "incoming-review-task".to_string(),
                        binding_key: "incoming-review-task@v1".to_string(),
                        event_id: "evt-1".to_string(),
                        attempt_count: 1,
                        status: super::super::DispatchStatus::Succeeded,
                        handler_kind: "local".to_string(),
                        target_uri: "handlers::on_review".to_string(),
                        replay_of_event_id: None,
                        result: Some(serde_json::json!({"ok": true})),
                        error: None,
                    }),
                    error: None,
                },
            )
            .await
            .unwrap();
        queue.ack_claim(&claimed.handle).await.unwrap();
        let after_ack = queue.queue_state("triage").await.unwrap();
        let summary = after_ack.summary(now_ms());
        assert_eq!(summary.ready, 0);
        assert_eq!(summary.in_flight, 0);
        assert_eq!(summary.acked, 1);
        assert_eq!(summary.responses, 1);
    }

    // A claim written directly to the log with an expiry already in the past must not
    // block another consumer from claiming the job.
    #[tokio::test(flavor = "current_thread")]
    async fn expired_claim_allows_reclaim() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log.clone());
        let receipt = queue
            .enqueue(&test_job(
                "triage",
                "incoming-review-task",
                "evt-1",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();
        let expired_claim = WorkerQueueClaimRecord {
            job_event_id: receipt.job_event_id,
            claim_id: "expired-claim".to_string(),
            consumer_id: "consumer-a".to_string(),
            claimed_at_ms: now_ms().saturating_sub(2),
            expires_at_ms: now_ms().saturating_sub(1),
        };
        log.append(
            &claims_topic("triage").unwrap(),
            LogEvent::new("job_claimed", serde_json::to_value(&expired_claim).unwrap()),
        )
        .await
        .unwrap();
        let second = queue
            .claim_next("triage", "consumer-b", StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(second.job.event.id.0, "evt-1");
        assert_ne!(second.handle.claim_id, expired_claim.claim_id);
        assert_eq!(second.handle.consumer_id, "consumer-b");
    }

    // A Normal job older than NORMAL_PROMOTION_AGE_MS shares rank 0 with High. Its
    // earlier enqueue time then wins the tie-break, so it is claimed before the fresh
    // High job. The events are appended directly so `occurred_at_ms` can be backdated.
    #[tokio::test(flavor = "current_thread")]
    async fn high_priority_and_aged_normal_are_selected_first() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log.clone());

        let catalog_topic = Topic::new(WORKER_QUEUE_CATALOG_TOPIC).unwrap();
        log.append(
            &catalog_topic,
            LogEvent::new("queue_seen", serde_json::json!({"queue":"triage"})),
        )
        .await
        .unwrap();

        let topic = job_topic("triage").unwrap();
        let mut old_normal = LogEvent::new(
            "trigger_dispatch",
            serde_json::to_value(test_job(
                "triage",
                "incoming-review-task",
                "evt-old-normal",
                WorkerQueuePriority::Normal,
            ))
            .unwrap(),
        );
        old_normal.occurred_at_ms = now_ms() - NORMAL_PROMOTION_AGE_MS - 1_000;
        log.append(&topic, old_normal).await.unwrap();

        let high = LogEvent::new(
            "trigger_dispatch",
            serde_json::to_value(test_job(
                "triage",
                "incoming-review-task",
                "evt-high",
                WorkerQueuePriority::High,
            ))
            .unwrap(),
        );
        log.append(&topic, high).await.unwrap();

        let claimed = queue
            .claim_next("triage", "consumer-a", StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(claimed.job.event.id.0, "evt-old-normal");
    }
}