Skip to main content

harn_vm/triggers/
worker_queue.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::Arc;
3use std::time::Duration as StdDuration;
4
5use serde::{Deserialize, Serialize};
6use uuid::Uuid;
7
8use crate::event_log::{
9    sanitize_topic_component, AnyEventLog, EventLog, LogError, LogEvent, Topic,
10};
11
12use super::{DispatchOutcome, TriggerEvent};
13
/// Event-log topic that every `WorkerQueue::enqueue` call appends a `queue_seen` record to;
/// `WorkerQueue::known_queues` replays it to list queues.
///
/// NOTE(review): `job_topic_name("queues")` presumably also yields `"worker.queues"`. Readers
/// filter by event kind, so the two streams would share a topic rather than corrupt each
/// other — confirm this overlap is intended.
pub const WORKER_QUEUE_CATALOG_TOPIC: &str = "worker.queues";
/// Suffix appended to a queue's job topic to form the topic holding claim/renew/release/ack/purge records.
const WORKER_QUEUE_CLAIMS_SUFFIX: &str = ".claims";
/// Suffix appended to a queue's job topic to form the topic holding `job_response` records.
const WORKER_QUEUE_RESPONSES_SUFFIX: &str = ".responses";
/// Wait time (15 minutes, in milliseconds) after which a `Normal` job ranks alongside `High` jobs.
const NORMAL_PROMOTION_AGE_MS: i64 = 15 * 60 * 1000;
18
/// Scheduling priority of a queued job. Serialized in lowercase (`"high"`, `"normal"`, `"low"`);
/// jobs without an explicit priority default to `Normal`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum WorkerQueuePriority {
    High,
    #[default]
    Normal,
    Low,
}
27
28impl WorkerQueuePriority {
29    pub fn as_str(self) -> &'static str {
30        match self {
31            Self::High => "high",
32            Self::Normal => "normal",
33            Self::Low => "low",
34        }
35    }
36
37    fn effective_rank(self, enqueued_at_ms: i64, now_ms: i64) -> u8 {
38        match self {
39            Self::High => 0,
40            Self::Normal if now_ms.saturating_sub(enqueued_at_ms) >= NORMAL_PROMOTION_AGE_MS => 0,
41            Self::Normal => 1,
42            Self::Low => 2,
43        }
44    }
45}
46
/// A trigger dispatch waiting in a worker queue; stored as the payload of a
/// `trigger_dispatch` event on the queue's job topic.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct WorkerQueueJob {
    /// Target queue name (trimmed by `WorkerQueue::enqueue` before topics are derived).
    pub queue: String,
    pub trigger_id: String,
    pub binding_key: String,
    pub binding_version: u32,
    /// The trigger event to be handled by whichever consumer claims this job.
    pub event: TriggerEvent,
    /// Optional id of an earlier event this job replays; absent in older records.
    #[serde(default)]
    pub replay_of_event_id: Option<String>,
    /// Scheduling priority; defaults to `Normal` when absent from the stored record.
    #[serde(default)]
    pub priority: WorkerQueuePriority,
}
59
/// Returned by `WorkerQueue::enqueue` to identify the appended job.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerQueueEnqueueReceipt {
    /// Normalized (trimmed) queue name the job was written to.
    pub queue: String,
    /// Event-log id of the `trigger_dispatch` record; used as the job's identity.
    pub job_event_id: u64,
    /// Topic name that responses for this queue are appended to.
    pub response_topic: String,
}
66
/// A consumer's lease on one job; pass it back to renew, release, or ack the claim.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerQueueClaimHandle {
    pub queue: String,
    /// Event-log id of the claimed job's `trigger_dispatch` record.
    pub job_event_id: u64,
    /// Random per-claim identifier (a v4 UUID when created by `claim_next`).
    pub claim_id: String,
    pub consumer_id: String,
    /// Lease expiry in milliseconds since the Unix epoch.
    pub expires_at_ms: i64,
}
75
/// A successfully claimed job together with the lease that now owns it.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ClaimedWorkerJob {
    pub handle: WorkerQueueClaimHandle,
    pub job: WorkerQueueJob,
}
81
/// Result of handling a job, appended as a `job_response` event on the queue's response topic.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct WorkerQueueResponseRecord {
    pub queue: String,
    /// Event-log id of the job this response belongs to.
    pub job_event_id: u64,
    pub consumer_id: String,
    /// Milliseconds since the Unix epoch when the job was handled.
    pub handled_at_ms: i64,
    // NOTE(review): presumably exactly one of `outcome` / `error` is set — not enforced here.
    pub outcome: Option<DispatchOutcome>,
    pub error: Option<String>,
}
91
/// Point-in-time counters for one queue, produced by `WorkerQueueState::summary`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerQueueSummary {
    pub queue: String,
    /// Jobs that are not acked, not purged, and hold no live claim.
    pub ready: usize,
    /// Jobs that are not acked or purged but are held by a live claim.
    pub in_flight: usize,
    pub acked: usize,
    pub purged: usize,
    /// Number of `job_response` records on the queue's response topic.
    pub responses: usize,
    /// Age in milliseconds of the oldest ready job; `None` when nothing is ready.
    pub oldest_unclaimed_age_ms: Option<u64>,
}
102
/// Replayed state of a single job within a queue.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct WorkerQueueJobState {
    pub job_event_id: u64,
    /// `occurred_at_ms` of the job's `trigger_dispatch` event.
    pub enqueued_at_ms: i64,
    pub job: WorkerQueueJob,
    /// Live claim on the job, if any; leases already expired at replay time are cleared.
    pub active_claim: Option<WorkerQueueClaimHandle>,
    pub acked: bool,
    pub purged: bool,
}
112
113impl WorkerQueueJobState {
114    pub fn is_ready(&self) -> bool {
115        !self.acked && !self.purged && self.active_claim.is_none()
116    }
117}
118
/// Full replayed state of one queue, as returned by `WorkerQueue::queue_state`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct WorkerQueueState {
    pub queue: String,
    /// Every `job_response` record found on the queue's response topic, in log order.
    pub responses: Vec<WorkerQueueResponseRecord>,
    /// All jobs ever enqueued, sorted by enqueue time then event id.
    pub jobs: Vec<WorkerQueueJobState>,
}
125
126impl WorkerQueueState {
127    pub fn summary(&self, now_ms: i64) -> WorkerQueueSummary {
128        let ready = self.jobs.iter().filter(|job| job.is_ready()).count();
129        let in_flight = self
130            .jobs
131            .iter()
132            .filter(|job| !job.acked && !job.purged && job.active_claim.is_some())
133            .count();
134        let acked = self.jobs.iter().filter(|job| job.acked).count();
135        let purged = self.jobs.iter().filter(|job| job.purged).count();
136        let oldest_unclaimed_age_ms = self
137            .jobs
138            .iter()
139            .filter(|job| job.is_ready())
140            .map(|job| now_ms.saturating_sub(job.enqueued_at_ms).max(0) as u64)
141            .max();
142        WorkerQueueSummary {
143            queue: self.queue.clone(),
144            ready,
145            in_flight,
146            acked,
147            purged,
148            responses: self.responses.len(),
149            oldest_unclaimed_age_ms,
150        }
151    }
152
153    fn next_ready_job(&self, now_ms: i64) -> Option<&WorkerQueueJobState> {
154        self.jobs
155            .iter()
156            .filter(|job| job.is_ready())
157            .min_by_key(|job| {
158                (
159                    job.job.priority.effective_rank(job.enqueued_at_ms, now_ms),
160                    job.enqueued_at_ms,
161                    job.job_event_id,
162                )
163            })
164    }
165
166    fn active_claim_for(&self, job_event_id: u64) -> Option<&WorkerQueueClaimHandle> {
167        self.jobs
168            .iter()
169            .find(|job| job.job_event_id == job_event_id)
170            .and_then(|job| job.active_claim.as_ref())
171    }
172}
173
/// Worker queue built on an event log: jobs, claims, and responses are appended as events to
/// per-queue topics, and queue state is rebuilt by replaying those topics on each read.
#[derive(Clone)]
pub struct WorkerQueue {
    // Shared event log; every operation reads from or appends to it.
    event_log: Arc<AnyEventLog>,
}
178
179impl WorkerQueue {
180    pub fn new(event_log: Arc<AnyEventLog>) -> Self {
181        Self { event_log }
182    }
183
184    pub async fn enqueue(
185        &self,
186        job: &WorkerQueueJob,
187    ) -> Result<WorkerQueueEnqueueReceipt, LogError> {
188        let queue = job.queue.trim();
189        if queue.is_empty() {
190            return Err(LogError::Config(
191                "worker queue name cannot be empty".to_string(),
192            ));
193        }
194        let queue_name = queue.to_string();
195        let catalog_topic = Topic::new(WORKER_QUEUE_CATALOG_TOPIC)
196            .expect("static worker queue catalog topic should always be valid");
197        self.event_log
198            .append(
199                &catalog_topic,
200                LogEvent::new(
201                    "queue_seen",
202                    serde_json::to_value(WorkerQueueCatalogRecord {
203                        queue: queue_name.clone(),
204                    })
205                    .map_err(|error| LogError::Serde(error.to_string()))?,
206                ),
207            )
208            .await?;
209
210        let job_topic = job_topic(&queue_name)?;
211        let mut headers = BTreeMap::new();
212        headers.insert("queue".to_string(), queue_name.clone());
213        headers.insert("trigger_id".to_string(), job.trigger_id.clone());
214        headers.insert("binding_key".to_string(), job.binding_key.clone());
215        headers.insert("event_id".to_string(), job.event.id.0.clone());
216        headers.insert("priority".to_string(), job.priority.as_str().to_string());
217        let job_event_id = self
218            .event_log
219            .append(
220                &job_topic,
221                LogEvent::new(
222                    "trigger_dispatch",
223                    serde_json::to_value(job)
224                        .map_err(|error| LogError::Serde(error.to_string()))?,
225                )
226                .with_headers(headers),
227            )
228            .await?;
229        Ok(WorkerQueueEnqueueReceipt {
230            queue: queue_name.clone(),
231            job_event_id,
232            response_topic: response_topic_name(&queue_name),
233        })
234    }
235
236    pub async fn known_queues(&self) -> Result<Vec<String>, LogError> {
237        let topic = Topic::new(WORKER_QUEUE_CATALOG_TOPIC)
238            .expect("static worker queue catalog topic should always be valid");
239        let events = self.event_log.read_range(&topic, None, usize::MAX).await?;
240        let mut queues = BTreeSet::new();
241        for (_, event) in events {
242            if event.kind != "queue_seen" {
243                continue;
244            }
245            let record: WorkerQueueCatalogRecord = serde_json::from_value(event.payload)
246                .map_err(|error| LogError::Serde(error.to_string()))?;
247            if !record.queue.trim().is_empty() {
248                queues.insert(record.queue);
249            }
250        }
251        Ok(queues.into_iter().collect())
252    }
253
254    pub async fn queue_state(&self, queue: &str) -> Result<WorkerQueueState, LogError> {
255        let queue_name = queue.trim();
256        if queue_name.is_empty() {
257            return Err(LogError::Config(
258                "worker queue name cannot be empty".to_string(),
259            ));
260        }
261        let now_ms = now_ms();
262        let job_events = self
263            .event_log
264            .read_range(&job_topic(queue_name)?, None, usize::MAX)
265            .await?;
266        let claim_events = self
267            .event_log
268            .read_range(&claims_topic(queue_name)?, None, usize::MAX)
269            .await?;
270        let response_events = self
271            .event_log
272            .read_range(&responses_topic(queue_name)?, None, usize::MAX)
273            .await?;
274
275        let mut jobs = BTreeMap::<u64, WorkerQueueJobStateInternal>::new();
276        for (job_event_id, event) in job_events {
277            if event.kind != "trigger_dispatch" {
278                continue;
279            }
280            let job: WorkerQueueJob = serde_json::from_value(event.payload)
281                .map_err(|error| LogError::Serde(error.to_string()))?;
282            jobs.insert(
283                job_event_id,
284                WorkerQueueJobStateInternal {
285                    job_event_id,
286                    enqueued_at_ms: event.occurred_at_ms,
287                    job,
288                    active_claim: None,
289                    acked: false,
290                    purged: false,
291                    seen_claim_ids: BTreeSet::new(),
292                },
293            );
294        }
295
296        for (_, event) in claim_events {
297            match event.kind.as_str() {
298                "job_claimed" => {
299                    let claim: WorkerQueueClaimRecord = serde_json::from_value(event.payload)
300                        .map_err(|error| LogError::Serde(error.to_string()))?;
301                    let Some(job) = jobs.get_mut(&claim.job_event_id) else {
302                        continue;
303                    };
304                    if job.acked || job.purged {
305                        continue;
306                    }
307                    job.seen_claim_ids.insert(claim.claim_id.clone());
308                    let can_take = job
309                        .active_claim
310                        .as_ref()
311                        .is_none_or(|active| active.expires_at_ms <= claim.claimed_at_ms);
312                    if can_take {
313                        job.active_claim = Some(WorkerQueueClaimHandle {
314                            queue: queue_name.to_string(),
315                            job_event_id: claim.job_event_id,
316                            claim_id: claim.claim_id,
317                            consumer_id: claim.consumer_id,
318                            expires_at_ms: claim.expires_at_ms,
319                        });
320                    }
321                }
322                "claim_renewed" => {
323                    let renewal: WorkerQueueClaimRenewalRecord =
324                        serde_json::from_value(event.payload)
325                            .map_err(|error| LogError::Serde(error.to_string()))?;
326                    let Some(job) = jobs.get_mut(&renewal.job_event_id) else {
327                        continue;
328                    };
329                    if let Some(active) = job.active_claim.as_mut() {
330                        if active.claim_id == renewal.claim_id {
331                            active.expires_at_ms = renewal.expires_at_ms;
332                        }
333                    }
334                }
335                "job_released" => {
336                    let release: WorkerQueueReleaseRecord =
337                        serde_json::from_value(event.payload)
338                            .map_err(|error| LogError::Serde(error.to_string()))?;
339                    let Some(job) = jobs.get_mut(&release.job_event_id) else {
340                        continue;
341                    };
342                    if job
343                        .active_claim
344                        .as_ref()
345                        .is_some_and(|active| active.claim_id == release.claim_id)
346                    {
347                        job.active_claim = None;
348                    }
349                }
350                "job_acked" => {
351                    let ack: WorkerQueueAckRecord = serde_json::from_value(event.payload)
352                        .map_err(|error| LogError::Serde(error.to_string()))?;
353                    let Some(job) = jobs.get_mut(&ack.job_event_id) else {
354                        continue;
355                    };
356                    if ack.claim_id.is_empty() || job.seen_claim_ids.contains(&ack.claim_id) {
357                        job.acked = true;
358                        job.active_claim = None;
359                    }
360                }
361                "job_purged" => {
362                    let purge: WorkerQueuePurgeRecord = serde_json::from_value(event.payload)
363                        .map_err(|error| LogError::Serde(error.to_string()))?;
364                    let Some(job) = jobs.get_mut(&purge.job_event_id) else {
365                        continue;
366                    };
367                    if !job.acked {
368                        job.purged = true;
369                        job.active_claim = None;
370                    }
371                }
372                _ => {}
373            }
374        }
375
376        let responses = response_events
377            .into_iter()
378            .filter(|(_, event)| event.kind == "job_response")
379            .map(|(_, event)| {
380                serde_json::from_value::<WorkerQueueResponseRecord>(event.payload)
381                    .map_err(|error| LogError::Serde(error.to_string()))
382            })
383            .collect::<Result<Vec<_>, _>>()?;
384
385        let mut queue_state = WorkerQueueState {
386            queue: queue_name.to_string(),
387            responses,
388            jobs: jobs
389                .into_values()
390                .map(|mut job| {
391                    if job
392                        .active_claim
393                        .as_ref()
394                        .is_some_and(|active| active.expires_at_ms <= now_ms)
395                    {
396                        job.active_claim = None;
397                    }
398                    WorkerQueueJobState {
399                        job_event_id: job.job_event_id,
400                        enqueued_at_ms: job.enqueued_at_ms,
401                        job: job.job,
402                        active_claim: job.active_claim,
403                        acked: job.acked,
404                        purged: job.purged,
405                    }
406                })
407                .collect(),
408        };
409        queue_state
410            .jobs
411            .sort_by_key(|job| (job.enqueued_at_ms, job.job_event_id));
412        Ok(queue_state)
413    }
414
415    pub async fn queue_summaries(&self) -> Result<Vec<WorkerQueueSummary>, LogError> {
416        let now_ms = now_ms();
417        let mut summaries = Vec::new();
418        for queue in self.known_queues().await? {
419            let state = self.queue_state(&queue).await?;
420            summaries.push(state.summary(now_ms));
421        }
422        summaries.sort_by(|left, right| left.queue.cmp(&right.queue));
423        Ok(summaries)
424    }
425
426    pub async fn claim_next(
427        &self,
428        queue: &str,
429        consumer_id: &str,
430        ttl: StdDuration,
431    ) -> Result<Option<ClaimedWorkerJob>, LogError> {
432        let queue_name = queue.trim();
433        if queue_name.is_empty() {
434            return Err(LogError::Config(
435                "worker queue name cannot be empty".to_string(),
436            ));
437        }
438        if consumer_id.trim().is_empty() {
439            return Err(LogError::InvalidConsumer(
440                "worker queue consumer id cannot be empty".to_string(),
441            ));
442        }
443        for _ in 0..8 {
444            let now_ms = now_ms();
445            let state = self.queue_state(queue_name).await?;
446            let Some(job) = state.next_ready_job(now_ms).cloned() else {
447                return Ok(None);
448            };
449            let claim = WorkerQueueClaimRecord {
450                job_event_id: job.job_event_id,
451                claim_id: Uuid::new_v4().to_string(),
452                consumer_id: consumer_id.to_string(),
453                claimed_at_ms: now_ms,
454                expires_at_ms: expiry_ms(now_ms, ttl),
455            };
456            self.event_log
457                .append(
458                    &claims_topic(queue_name)?,
459                    LogEvent::new(
460                        "job_claimed",
461                        serde_json::to_value(&claim)
462                            .map_err(|error| LogError::Serde(error.to_string()))?,
463                    ),
464                )
465                .await?;
466            let refreshed = self.queue_state(queue_name).await?;
467            if refreshed
468                .active_claim_for(job.job_event_id)
469                .is_some_and(|active| active.claim_id == claim.claim_id)
470            {
471                return Ok(Some(ClaimedWorkerJob {
472                    handle: WorkerQueueClaimHandle {
473                        queue: queue_name.to_string(),
474                        job_event_id: claim.job_event_id,
475                        claim_id: claim.claim_id,
476                        consumer_id: claim.consumer_id,
477                        expires_at_ms: claim.expires_at_ms,
478                    },
479                    job: job.job,
480                }));
481            }
482        }
483        Ok(None)
484    }
485
486    pub async fn renew_claim(
487        &self,
488        handle: &WorkerQueueClaimHandle,
489        ttl: StdDuration,
490    ) -> Result<bool, LogError> {
491        let now_ms = now_ms();
492        let renewal = WorkerQueueClaimRenewalRecord {
493            job_event_id: handle.job_event_id,
494            claim_id: handle.claim_id.clone(),
495            consumer_id: handle.consumer_id.clone(),
496            renewed_at_ms: now_ms,
497            expires_at_ms: expiry_ms(now_ms, ttl),
498        };
499        self.event_log
500            .append(
501                &claims_topic(&handle.queue)?,
502                LogEvent::new(
503                    "claim_renewed",
504                    serde_json::to_value(&renewal)
505                        .map_err(|error| LogError::Serde(error.to_string()))?,
506                ),
507            )
508            .await?;
509        let refreshed = self.queue_state(&handle.queue).await?;
510        Ok(refreshed
511            .active_claim_for(handle.job_event_id)
512            .is_some_and(|active| active.claim_id == handle.claim_id))
513    }
514
515    pub async fn release_claim(
516        &self,
517        handle: &WorkerQueueClaimHandle,
518        reason: &str,
519    ) -> Result<(), LogError> {
520        let release = WorkerQueueReleaseRecord {
521            job_event_id: handle.job_event_id,
522            claim_id: handle.claim_id.clone(),
523            consumer_id: handle.consumer_id.clone(),
524            released_at_ms: now_ms(),
525            reason: if reason.trim().is_empty() {
526                None
527            } else {
528                Some(reason.to_string())
529            },
530        };
531        self.event_log
532            .append(
533                &claims_topic(&handle.queue)?,
534                LogEvent::new(
535                    "job_released",
536                    serde_json::to_value(&release)
537                        .map_err(|error| LogError::Serde(error.to_string()))?,
538                ),
539            )
540            .await?;
541        Ok(())
542    }
543
544    pub async fn append_response(
545        &self,
546        queue: &str,
547        response: &WorkerQueueResponseRecord,
548    ) -> Result<u64, LogError> {
549        self.event_log
550            .append(
551                &responses_topic(queue)?,
552                LogEvent::new(
553                    "job_response",
554                    serde_json::to_value(response)
555                        .map_err(|error| LogError::Serde(error.to_string()))?,
556                ),
557            )
558            .await
559    }
560
561    pub async fn ack_claim(&self, handle: &WorkerQueueClaimHandle) -> Result<u64, LogError> {
562        self.event_log
563            .append(
564                &claims_topic(&handle.queue)?,
565                LogEvent::new(
566                    "job_acked",
567                    serde_json::to_value(WorkerQueueAckRecord {
568                        job_event_id: handle.job_event_id,
569                        claim_id: handle.claim_id.clone(),
570                        consumer_id: handle.consumer_id.clone(),
571                        acked_at_ms: now_ms(),
572                    })
573                    .map_err(|error| LogError::Serde(error.to_string()))?,
574                ),
575            )
576            .await
577    }
578
579    pub async fn purge_unclaimed(
580        &self,
581        queue: &str,
582        purged_by: &str,
583        reason: Option<&str>,
584    ) -> Result<usize, LogError> {
585        let state = self.queue_state(queue).await?;
586        let ready_jobs: Vec<_> = state
587            .jobs
588            .into_iter()
589            .filter(|job| job.is_ready())
590            .map(|job| job.job_event_id)
591            .collect();
592        for job_event_id in &ready_jobs {
593            self.event_log
594                .append(
595                    &claims_topic(queue)?,
596                    LogEvent::new(
597                        "job_purged",
598                        serde_json::to_value(WorkerQueuePurgeRecord {
599                            job_event_id: *job_event_id,
600                            purged_by: purged_by.to_string(),
601                            purged_at_ms: now_ms(),
602                            reason: reason
603                                .filter(|value| !value.trim().is_empty())
604                                .map(|value| value.to_string()),
605                        })
606                        .map_err(|error| LogError::Serde(error.to_string()))?,
607                    ),
608                )
609                .await?;
610        }
611        Ok(ready_jobs.len())
612    }
613}
614
/// Mutable per-job accumulator used while `WorkerQueue::queue_state` replays the claims topic.
#[derive(Clone, Debug)]
struct WorkerQueueJobStateInternal {
    job_event_id: u64,
    enqueued_at_ms: i64,
    job: WorkerQueueJob,
    active_claim: Option<WorkerQueueClaimHandle>,
    acked: bool,
    purged: bool,
    // Every claim id observed for this job (winning or not); acks carrying one of these ids
    // are accepted even if that claim is no longer the active one.
    seen_claim_ids: BTreeSet<String>,
}
625
/// Payload of a `queue_seen` event on the catalog topic.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueCatalogRecord {
    queue: String,
}
630
/// Payload of a `job_claimed` event on a queue's claims topic.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueClaimRecord {
    job_event_id: u64,
    claim_id: String,
    consumer_id: String,
    /// Claimer's wall-clock time (ms since the Unix epoch); compared with the current holder's expiry.
    claimed_at_ms: i64,
    expires_at_ms: i64,
}
639
/// Payload of a `claim_renewed` event; moves the matching live claim's expiry to `expires_at_ms`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueClaimRenewalRecord {
    job_event_id: u64,
    claim_id: String,
    consumer_id: String,
    renewed_at_ms: i64,
    expires_at_ms: i64,
}
648
/// Payload of a `job_released` event; clears the job's claim if `claim_id` is the active one.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueReleaseRecord {
    job_event_id: u64,
    claim_id: String,
    consumer_id: String,
    released_at_ms: i64,
    /// Optional free-text reason; `None` when the caller passed a blank string.
    #[serde(default)]
    reason: Option<String>,
}
658
/// Payload of a `job_acked` event. Accepted on replay when `claim_id` is empty or was
/// previously seen as a claim on the job.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueAckRecord {
    job_event_id: u64,
    claim_id: String,
    consumer_id: String,
    acked_at_ms: i64,
}
666
/// Payload of a `job_purged` event written by `WorkerQueue::purge_unclaimed`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueuePurgeRecord {
    job_event_id: u64,
    /// Caller-supplied identity of whoever requested the purge.
    purged_by: String,
    purged_at_ms: i64,
    /// Optional free-text reason; `None` when absent or blank.
    #[serde(default)]
    reason: Option<String>,
}
675
676pub fn job_topic_name(queue: &str) -> String {
677    format!("worker.{}", sanitize_topic_component(queue))
678}
679
680pub fn claims_topic_name(queue: &str) -> String {
681    format!("{}{}", job_topic_name(queue), WORKER_QUEUE_CLAIMS_SUFFIX)
682}
683
684pub fn response_topic_name(queue: &str) -> String {
685    format!("{}{}", job_topic_name(queue), WORKER_QUEUE_RESPONSES_SUFFIX)
686}
687
688fn job_topic(queue: &str) -> Result<Topic, LogError> {
689    Topic::new(job_topic_name(queue))
690}
691
692fn claims_topic(queue: &str) -> Result<Topic, LogError> {
693    Topic::new(claims_topic_name(queue))
694}
695
696fn responses_topic(queue: &str) -> Result<Topic, LogError> {
697    Topic::new(response_topic_name(queue))
698}
699
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch. A millisecond count
/// that does not fit in an `i64` saturates at `i64::MAX` instead of being silently truncated.
fn now_ms() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|elapsed| i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX))
        .unwrap_or(0)
}
706
/// Lease expiry `ttl` after `now_ms`, saturating at `i64::MAX` for huge TTLs or timestamps.
fn expiry_ms(now_ms: i64, ttl: StdDuration) -> i64 {
    let ttl_ms = i64::try_from(ttl.as_millis()).unwrap_or(i64::MAX);
    now_ms.saturating_add(ttl_ms)
}
710
#[cfg(test)]
mod tests {
    use super::*;

    use crate::event_log::{AnyEventLog, MemoryEventLog};
    use crate::triggers::{
        event::{GenericWebhookPayload, KnownProviderPayload},
        ProviderId, ProviderPayload, SignatureStatus, TraceId, TriggerEvent,
    };

    /// Builds a verified GitHub `issues.opened` webhook event whose id and dedupe key are `id`.
    fn test_event(id: &str) -> TriggerEvent {
        TriggerEvent {
            id: crate::triggers::TriggerEventId(id.to_string()),
            provider: ProviderId::from("github"),
            kind: "issues.opened".to_string(),
            trace_id: TraceId("trace-test".to_string()),
            dedupe_key: id.to_string(),
            tenant_id: None,
            headers: BTreeMap::new(),
            batch: None,
            raw_body: None,
            provider_payload: ProviderPayload::Known(KnownProviderPayload::Webhook(
                GenericWebhookPayload {
                    source: Some("worker-queue-test".to_string()),
                    content_type: Some("application/json".to_string()),
                    raw: serde_json::json!({"id": id}),
                },
            )),
            signature_status: SignatureStatus::Verified,
            received_at: time::OffsetDateTime::now_utc(),
            occurred_at: None,
            dedupe_claimed: false,
        }
    }

    /// Builds a job for `queue` wrapping `test_event(event_id)`, bound to `"{trigger_id}@v1"`.
    fn test_job(
        queue: &str,
        trigger_id: &str,
        event_id: &str,
        priority: WorkerQueuePriority,
    ) -> WorkerQueueJob {
        WorkerQueueJob {
            queue: queue.to_string(),
            trigger_id: trigger_id.to_string(),
            binding_key: format!("{trigger_id}@v1"),
            binding_version: 1,
            event: test_event(event_id),
            replay_of_event_id: None,
            priority,
        }
    }

    // A freshly enqueued job shows up in the catalog-driven summaries as ready, not in flight.
    #[tokio::test(flavor = "current_thread")]
    async fn enqueue_and_summarize_queue() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log);
        queue
            .enqueue(&test_job(
                "triage",
                "incoming-review-task",
                "evt-1",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();
        let summaries = queue.queue_summaries().await.unwrap();
        assert_eq!(summaries.len(), 1);
        assert_eq!(summaries[0].queue, "triage");
        assert_eq!(summaries[0].ready, 1);
        assert_eq!(summaries[0].in_flight, 0);
    }

    // Full lifecycle: claim moves the job to in-flight; response + ack leave it acked with one
    // recorded response.
    #[tokio::test(flavor = "current_thread")]
    async fn claim_and_ack_remove_job_from_ready_pool() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log);
        queue
            .enqueue(&test_job(
                "triage",
                "incoming-review-task",
                "evt-1",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();
        let claimed = queue
            .claim_next("triage", "consumer-a", StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        let before_ack = queue.queue_state("triage").await.unwrap();
        assert_eq!(before_ack.summary(now_ms()).ready, 0);
        assert_eq!(before_ack.summary(now_ms()).in_flight, 1);
        queue
            .append_response(
                "triage",
                &WorkerQueueResponseRecord {
                    queue: "triage".to_string(),
                    job_event_id: claimed.handle.job_event_id,
                    consumer_id: "consumer-a".to_string(),
                    handled_at_ms: now_ms(),
                    outcome: Some(DispatchOutcome {
                        trigger_id: "incoming-review-task".to_string(),
                        binding_key: "incoming-review-task@v1".to_string(),
                        event_id: "evt-1".to_string(),
                        attempt_count: 1,
                        status: super::super::DispatchStatus::Succeeded,
                        handler_kind: "local".to_string(),
                        target_uri: "handlers::on_review".to_string(),
                        replay_of_event_id: None,
                        result: Some(serde_json::json!({"ok": true})),
                        error: None,
                    }),
                    error: None,
                },
            )
            .await
            .unwrap();
        queue.ack_claim(&claimed.handle).await.unwrap();
        let after_ack = queue.queue_state("triage").await.unwrap();
        let summary = after_ack.summary(now_ms());
        assert_eq!(summary.ready, 0);
        assert_eq!(summary.in_flight, 0);
        assert_eq!(summary.acked, 1);
        assert_eq!(summary.responses, 1);
    }

    // A 15 ms lease is left to lapse (30 ms sleep) so a second consumer can take the same job
    // under a new claim id.
    #[tokio::test(flavor = "current_thread")]
    async fn expired_claim_allows_reclaim() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log);
        queue
            .enqueue(&test_job(
                "triage",
                "incoming-review-task",
                "evt-1",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();
        let first = queue
            .claim_next("triage", "consumer-a", StdDuration::from_millis(15))
            .await
            .unwrap()
            .unwrap();
        tokio::time::sleep(StdDuration::from_millis(30)).await;
        let second = queue
            .claim_next("triage", "consumer-b", StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(first.job.event.id.0, second.job.event.id.0);
        assert_ne!(first.handle.claim_id, second.handle.claim_id);
        assert_eq!(second.handle.consumer_id, "consumer-b");
    }

    // Events are written straight to the log so the Normal job's `occurred_at_ms` can be
    // back-dated past NORMAL_PROMOTION_AGE_MS. Once promoted it ties the High job on rank and
    // wins on its older enqueue time.
    #[tokio::test(flavor = "current_thread")]
    async fn high_priority_and_aged_normal_are_selected_first() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log.clone());

        let catalog_topic = Topic::new(WORKER_QUEUE_CATALOG_TOPIC).unwrap();
        log.append(
            &catalog_topic,
            LogEvent::new("queue_seen", serde_json::json!({"queue":"triage"})),
        )
        .await
        .unwrap();

        let topic = job_topic("triage").unwrap();
        let mut old_normal = LogEvent::new(
            "trigger_dispatch",
            serde_json::to_value(test_job(
                "triage",
                "incoming-review-task",
                "evt-old-normal",
                WorkerQueuePriority::Normal,
            ))
            .unwrap(),
        );
        old_normal.occurred_at_ms = now_ms() - NORMAL_PROMOTION_AGE_MS - 1_000;
        log.append(&topic, old_normal).await.unwrap();

        let high = LogEvent::new(
            "trigger_dispatch",
            serde_json::to_value(test_job(
                "triage",
                "incoming-review-task",
                "evt-high",
                WorkerQueuePriority::High,
            ))
            .unwrap(),
        );
        log.append(&topic, high).await.unwrap();

        let claimed = queue
            .claim_next("triage", "consumer-a", StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(claimed.job.event.id.0, "evt-old-normal");
    }
}